From de4fe508b19dd9ae5649f51f116821aa55051d80 Mon Sep 17 00:00:00 2001
From: Colin Ho <colin.ho99@gmail.com>
Date: Wed, 4 Dec 2024 09:30:00 -0800
Subject: [PATCH] [FEAT] Dynamically parallel local parquet reader (#3310)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implement a dynamically parallel local streaming parquet reader.

### Background
The current local streaming parquet reader, while fast, has some
problems:
- It reads and deserializes **ALL row groups and ALL columns in
parallel.**
- It does not respect **downstream back-pressure**: the crossbeam
channels are bounded only by the expected number of chunks per row
group, so the reader is free to fill them up regardless of how fast the
consumer is.

This leads to unnecessarily high memory usage, and it can potentially
starve downstream tasks.

### Solution
Instead of launching all tasks at once, we cap the number of parallel
row-group deserialization tasks based on two factors (see the sketch
below):
- Number of CPUs
- Number of columns

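Concretely, the cap mirrors the computation in `local_parquet_stream`
(`src/daft-parquet/src/stream_reader.rs`) in the diff below; the helper
function name here is just for illustration:

```
use std::{cmp::max, num::NonZeroUsize};

// Cap on concurrently deserialized row groups:
// ceil(2 * available threads / number of columns being read).
fn max_parallel_row_group_tasks(num_columns: usize) -> usize {
    let threads = std::thread::available_parallelism()
        .unwrap_or(NonZeroUsize::new(2).unwrap())
        .get();
    ((2 * threads) as f64 / max(num_columns, 1) as f64).ceil() as usize
}

fn main() {
    // On a 16-thread machine: a 16-column file allows ceil(32 / 16) = 2
    // row groups in flight, while a 1024-column file allows only 1.
    println!("{}", max_parallel_row_group_tasks(16));
    println!("{}", max_parallel_row_group_tasks(1024));
}
```

A `tokio::sync::Semaphore` is created with this many permits; each
row-group task acquires a permit before it starts deserializing and
releases it only once its row group has been fully streamed to its
(bounded) output channel, keeping both parallelism and peak memory in
check.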

### Results
The most visible benefit is the memory usage of streaming queries, for
example:
```
next(daft.read_parquet("data/tpch-dbgen/1_0/1/parquet/lineitem").iter_partitions()) # read lineitem tpch sf1
```

The new implementation peaks at 300 MB, while the old one goes over
1 GB.
<img width="1186" alt="Screenshot 2024-11-18 at 11 35 36 PM"
src="https://github.com/user-attachments/assets/45fb9fab-3215-4ff6-a7fe-63a428fd9c7b">
<img width="1170" alt="Screenshot 2024-11-18 at 11 36 15 PM"
src="https://github.com/user-attachments/assets/591b9bad-25e9-46ed-ba53-caaa892f50eb">

Another example, where we stream the entire file but consume it slowly:
```
for _ in daft.read_parquet("/Users/colinho/Desktop/Daft/z/daft_tpch_100g_32part_64RG.parquet").iter_partitions():
    time.sleep(0.1)
```

The new implementation peaks at 1.2 GB, while the old one goes over
3 GB.
<img width="1188" alt="Screenshot 2024-11-18 at 11 42 01 PM"
src="https://github.com/user-attachments/assets/de9976c7-9c7f-46b4-bd24-1b0ade8a4a86">
<img width="1172" alt="Screenshot 2024-11-18 at 11 42 44 PM"
src="https://github.com/user-attachments/assets/5a2f1bbc-35ed-45a8-93c1-d1853cdbfc89">

To verify performance parity, I also wrote benchmarks for parquet files
with differing rows / cols / row groups. The results show that the new
implementation is pretty much on par, with some slight differences.
<img width="1432" alt="Screenshot 2024-11-18 at 11 29 30 PM"
src="https://github.com/user-attachments/assets/cf8c7f2c-3fa4-4d43-979a-da2b0f8a1f35">
<img width="1407" alt="Screenshot 2024-11-18 at 11 29 38 PM"
src="https://github.com/user-attachments/assets/b5afb8ca-fe8e-4f6e-b6a1-ea8c45be36cb">

On reading a TPC-H SF1 lineitem table, though, the results are pretty
much the same (~0.2s).

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: EC2 Default User <ec2-user@ip-172-31-62-80.us-west-2.compute.internal>
---
 benchmarking/parquet/test_local.py    | 134 ++++++++++
 src/daft-parquet/src/read.rs          |   1 +
 src/daft-parquet/src/stream_reader.rs | 364 +++++++++++++++++---------
 3 files changed, 381 insertions(+), 118 deletions(-)
 create mode 100644 benchmarking/parquet/test_local.py

diff --git a/benchmarking/parquet/test_local.py b/benchmarking/parquet/test_local.py
new file mode 100644
index 0000000000..173d83f97b
--- /dev/null
+++ b/benchmarking/parquet/test_local.py
@@ -0,0 +1,134 @@
+from pathlib import Path
+
+import pytest
+
+from tests.assets import get_asset_dir
+
+
+def generate_parquet(dir: str, num_rows: int, num_cols: int, num_rowgroups: int):
+    import numpy as np
+    import pandas as pd
+    import pyarrow as pa
+    import pyarrow.parquet as papq
+    from faker import Faker
+
+    # Initialize Faker
+    Faker.seed(0)  # For reproducibility
+    fake = Faker()
+
+    # Generate small pools of fake data that we'll sample from
+    POOL_SIZE = 1000  # Size of initial fake data pools
+
+    # Pre-generate data pools
+    name_pool = [fake.name() for _ in range(POOL_SIZE)]
+    email_pool = [fake.email() for _ in range(POOL_SIZE)]
+    company_pool = [fake.company() for _ in range(POOL_SIZE)]
+    job_pool = [fake.job() for _ in range(POOL_SIZE)]
+    address_pool = [fake.address().replace("\n", ", ") for _ in range(POOL_SIZE)]
+
+    # Pre-generate date pools
+    recent_dates = pd.date_range(end=pd.Timestamp.now(), periods=POOL_SIZE, freq="H")
+    past_dates = pd.date_range(end=pd.Timestamp.now(), periods=POOL_SIZE, freq="D")
+    future_dates = pd.date_range(start=pd.Timestamp.now(), periods=POOL_SIZE, freq="D")
+
+    data = {}
+    for i in range(num_cols):
+        col_type = i % 5
+
+        if col_type == 0:
+            # Integer columns (vectorized operations)
+            data_type = i % 3
+            if data_type == 0:
+                data[f"age_{i}"] = np.random.randint(0, 100, size=num_rows)
+            elif data_type == 1:
+                data[f"price_{i}"] = np.random.randint(0, 1000, size=num_rows)
+            else:
+                data[f"views_{i}"] = np.random.randint(0, 1000000, size=num_rows)
+
+        elif col_type == 1:
+            # Float columns (vectorized operations)
+            data_type = i % 3
+            if data_type == 0:
+                data[f"rating_{i}"] = np.round(np.random.uniform(0, 5, size=num_rows), 1)
+            elif data_type == 1:
+                data[f"temp_{i}"] = np.round(np.random.uniform(-20, 45, size=num_rows), 1)
+            else:
+                data[f"percentage_{i}"] = np.round(np.random.uniform(0, 100, size=num_rows), 2)
+
+        elif col_type == 2:
+            # String columns (sampling from pre-generated pools)
+            data_type = i % 5
+            if data_type == 0:
+                data[f"name_{i}"] = np.random.choice(name_pool, size=num_rows)
+            elif data_type == 1:
+                data[f"email_{i}"] = np.random.choice(email_pool, size=num_rows)
+            elif data_type == 2:
+                data[f"company_{i}"] = np.random.choice(company_pool, size=num_rows)
+            elif data_type == 3:
+                data[f"address_{i}"] = np.random.choice(address_pool, size=num_rows)
+            else:
+                data[f"job_{i}"] = np.random.choice(job_pool, size=num_rows)
+
+        elif col_type == 3:
+            # Boolean columns (vectorized operations)
+            data[f"is_active_{i}"] = np.random.choice([True, False], size=num_rows)
+
+        else:
+            # Timestamp columns (sampling from pre-generated date ranges)
+            data_type = i % 3
+            if data_type == 0:
+                data[f"recent_date_{i}"] = np.random.choice(recent_dates, size=num_rows)
+            elif data_type == 1:
+                data[f"past_date_{i}"] = np.random.choice(past_dates, size=num_rows)
+            else:
+                data[f"future_date_{i}"] = np.random.choice(future_dates, size=num_rows)
+
+    df = pd.DataFrame(data)
+    papq.write_table(
+        table=pa.Table.from_pandas(df),
+        where=dir,
+        row_group_size=num_rows // num_rowgroups,
+    )
+    print(f"Finished writing {dir}")
+
+
+SIZE_CONFIGS = [
+    (10_000_000, 1),  # Lots of rows, single col
+    (1_000_000, 32),  # Balanced
+    (10_000, 1024),  # Few rows, many cols
+]
+
+ROWGROUP_CONFIGS = [1, 8, 64]
+
+LOCAL_DATA_FIXTURE_PATH = Path(get_asset_dir()) / "../../benchmarking/parquet/local_data"
+
+
+def get_param_id(param):
+    (num_rows, num_cols), num_rowgroups = param
+    return f"{num_rows}_rows_{num_cols}_cols_{num_rowgroups}_rowgroups"
+
+
+@pytest.fixture(
+    scope="session",
+    params=[(size, rowgroup) for size in SIZE_CONFIGS for rowgroup in ROWGROUP_CONFIGS],
+    ids=lambda param: get_param_id(param),
+)
+def parquet_file(request):
+    (num_rows, num_cols), num_rowgroups = request.param
+    filepath = LOCAL_DATA_FIXTURE_PATH / f"test_{num_rows}rows_{num_cols}cols_{num_rowgroups}groups.parquet"
+    if not filepath.parent.exists():
+        filepath.parent.mkdir(parents=True)
+    if not filepath.exists():
+        generate_parquet(str(filepath), num_rows, num_cols, num_rowgroups)
+    return str(filepath)
+
+
+@pytest.mark.benchmark(group="read_parquet_local")
+def test_read_parquet(parquet_file, benchmark):
+    import daft
+
+    def read_parquet():
+        df = daft.read_parquet(parquet_file)
+        return df.to_arrow()
+
+    benchmark(read_parquet)
diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs
index 6d3ee8b74b..3810532b35 100644
--- a/src/daft-parquet/src/read.rs
+++ b/src/daft-parquet/src/read.rs
@@ -418,6 +418,7 @@ async fn stream_parquet_single(
             maintain_order,
             io_stats,
         )
+        .await
     } else {
         let builder = ParquetReaderBuilder::from_uri(
             uri.as_str(),
diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs
index 4755386fe9..cd879fce13 100644
--- a/src/daft-parquet/src/stream_reader.rs
+++ b/src/daft-parquet/src/stream_reader.rs
@@ -1,12 +1,15 @@
 use std::{
+    cmp::max,
     collections::HashSet,
     fs::File,
     io::{Read, Seek},
+    num::NonZeroUsize,
     sync::Arc,
 };
 
 use arrow2::{bitmap::Bitmap, io::parquet::read};
 use common_error::DaftResult;
+use common_runtime::{get_compute_runtime, RuntimeTask};
 use daft_core::{prelude::*, utils::arrow::cast_array_for_daft_if_needed};
 use daft_dsl::ExprRef;
 use daft_io::IOStatsRef;
@@ -18,6 +21,7 @@ use rayon::{
     prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge},
 };
 use snafu::ResultExt;
+use tokio_stream::wrappers::ReceiverStream;
 
 use crate::{
     file::{build_row_ranges, RowGroupRange},
@@ -86,73 +90,190 @@ pub fn arrow_column_iters_to_table_iter(
         let chunk = chunk.with_context(|_| {
             super::UnableToCreateChunkFromStreamingFileReaderSnafu { path: uri.clone() }
         })?;
-        let all_series = chunk
-            .into_iter()
-            .zip(owned_schema_ref.as_ref().fields.iter())
-            .filter_map(|(mut arr, (f_name, _))| {
-                if (index_so_far + arr.len()) < row_range_start {
-                    // No need to process arrays that are less than the start offset
-                    return None;
-                }
-                if index_so_far < row_range_start {
-                    // Slice arrays that are partially needed
-                    let offset = row_range_start.saturating_sub(index_so_far);
-                    arr = arr.sliced(offset, arr.len() - offset);
-                }
-                let series_result =
-                    Series::try_from((f_name.as_str(), cast_array_for_daft_if_needed(arr)));
-                Some(series_result)
-            })
-            .collect::<DaftResult<Vec<_>>>()?;
-
-        let len = all_series
-            .first()
-            .map(daft_core::series::Series::len)
-            .expect("All series should not be empty when creating table from parquet chunks");
-        if all_series.iter().any(|s| s.len() != len) {
-            return Err(super::Error::ParquetColumnsDontHaveEqualRows { path: uri.clone() }.into());
+        arrow_chunk_to_table(
+            chunk,
+            &owned_schema_ref,
+            &uri,
+            row_range_start,
+            &mut index_so_far,
+            delete_rows.as_deref(),
+            &mut curr_delete_row_idx,
+            predicate.clone(),
+            original_columns.as_deref(),
+            original_num_rows,
+        )
+    });
+    Some(table_iter)
+}
+
+#[allow(clippy::too_many_arguments)]
+fn arrow_chunk_to_table(
+    arrow_chunk: ArrowChunk,
+    schema_ref: &SchemaRef,
+    uri: &str,
+    row_range_start: usize,
+    index_so_far: &mut usize,
+    delete_rows: Option<&[i64]>,
+    curr_delete_row_idx: &mut usize,
+    predicate: Option<ExprRef>,
+    original_columns: Option<&[String]>,
+    original_num_rows: Option<usize>,
+) -> DaftResult<Table> {
+    let all_series = arrow_chunk
+        .into_iter()
+        .zip(schema_ref.fields.iter())
+        .filter_map(|(mut arr, (f_name, _))| {
+            if (*index_so_far + arr.len()) < row_range_start {
+                // No need to process arrays that are less than the start offset
+                return None;
+            }
+            if *index_so_far < row_range_start {
+                // Slice arrays that are partially needed
+                let offset = row_range_start.saturating_sub(*index_so_far);
+                arr = arr.sliced(offset, arr.len() - offset);
+            }
+            let series_result =
+                Series::try_from((f_name.as_str(), cast_array_for_daft_if_needed(arr)));
+            Some(series_result)
+        })
+        .collect::<DaftResult<Vec<_>>>()?;
+
+    let len = all_series
+        .first()
+        .map(daft_core::series::Series::len)
+        .expect("All series should not be empty when creating table from parquet chunks");
+    if all_series.iter().any(|s| s.len() != len) {
+        return Err(super::Error::ParquetColumnsDontHaveEqualRows {
+            path: uri.to_string(),
         }
+        .into());
+    }
 
-        let mut table = Table::new_with_size(
-            Schema::new(all_series.iter().map(|s| s.field().clone()).collect())?,
-            all_series,
-            len,
-        )
-        .with_context(|_| super::UnableToCreateTableFromChunkSnafu { path: uri.clone() })?;
+    let mut table = Table::new_with_size(
+        Schema::new(all_series.iter().map(|s| s.field().clone()).collect())?,
+        all_series,
+        len,
+    )
+    .with_context(|_| super::UnableToCreateTableFromChunkSnafu {
+        path: uri.to_string(),
+    })?;
 
-        // Apply delete rows if needed
-        if let Some(delete_rows) = &delete_rows
-            && !delete_rows.is_empty()
+    // Apply delete rows if needed
+    if let Some(delete_rows) = &delete_rows
+        && !delete_rows.is_empty()
+    {
+        let mut selection_mask = Bitmap::new_trued(table.len()).make_mut();
+        while *curr_delete_row_idx < delete_rows.len()
+            && delete_rows[*curr_delete_row_idx] < *index_so_far as i64 + len as i64
         {
-            let mut selection_mask = Bitmap::new_trued(table.len()).make_mut();
-            while curr_delete_row_idx < delete_rows.len()
-                && delete_rows[curr_delete_row_idx] < index_so_far as i64 + len as i64
-            {
-                let table_row = delete_rows[curr_delete_row_idx] as usize - index_so_far;
-                unsafe {
-                    selection_mask.set_unchecked(table_row, false);
-                }
-                curr_delete_row_idx += 1;
+            let table_row = delete_rows[*curr_delete_row_idx] as usize - *index_so_far;
+            unsafe {
+                selection_mask.set_unchecked(table_row, false);
             }
-            let selection_mask: BooleanArray =
-                ("selection_mask", Bitmap::from(selection_mask)).into();
-            table = table.mask_filter(&selection_mask.into_series())?;
+            *curr_delete_row_idx += 1;
+        }
+        let selection_mask: BooleanArray = ("selection_mask", Bitmap::from(selection_mask)).into();
+        table = table.mask_filter(&selection_mask.into_series())?;
+    }
+    *index_so_far += len;
+
+    // Apply pushdowns if needed
+    if let Some(predicate) = predicate {
+        table = table.filter(&[predicate])?;
+        if let Some(oc) = &original_columns {
+            table = table.get_columns(oc)?;
+        }
+        if let Some(nr) = original_num_rows {
+            table = table.head(nr)?;
+        }
+    }
+    Ok(table)
+}
+
+/// Spawns a task that reads the column iterators and converts them into a table.
+#[allow(clippy::too_many_arguments)]
+pub fn spawn_column_iters_to_table_task(
+    arr_iters: ArrowChunkIters,
+    rg_range: RowGroupRange,
+    schema_ref: SchemaRef,
+    uri: String,
+    predicate: Option<ExprRef>,
+    original_columns: Option<Vec<String>>,
+    original_num_rows: Option<usize>,
+    delete_rows: Option<Vec<i64>>,
+    output_sender: tokio::sync::mpsc::Sender<DaftResult<Table>>,
+    permit: tokio::sync::OwnedSemaphorePermit,
+) -> RuntimeTask<DaftResult<()>> {
+    let (arrow_chunk_senders, mut arrow_chunk_receivers): (Vec<_>, Vec<_>) = arr_iters
+        .iter()
+        .map(|_| tokio::sync::mpsc::channel(1))
+        .unzip();
+
+    let compute_runtime = get_compute_runtime();
+
+    let deserializer_handles = arrow_chunk_senders
+        .into_iter()
+        .zip(arr_iters)
+        .map(|(sender, arr_iter)| {
+            let deserialization_task = async move {
+                for arr in arr_iter {
+                    if sender.send(arr?).await.is_err() {
+                        break;
+                    }
+                }
+                DaftResult::Ok(())
+            };
+            compute_runtime.spawn(deserialization_task)
+        })
+        .collect::<Vec<_>>();
+
+    compute_runtime.spawn(async move {
+        if deserializer_handles.is_empty() {
+            let empty = Table::new_with_size(schema_ref.clone(), vec![], rg_range.num_rows);
+            let _ = output_sender.send(empty).await;
+            return Ok(());
         }
-        index_so_far += len;
 
-        // Apply pushdowns if needed
-        if let Some(predicate) = &predicate {
-            table = table.filter(&[predicate.clone()])?;
-            if let Some(oc) = &original_columns {
-                table = table.get_columns(oc)?;
+        let mut curr_delete_row_idx = 0;
+        // Keep track of the current index in the row group so we can throw away arrays that are not needed
+        // and slice arrays that are partially needed.
+        let mut index_so_far = 0;
+        loop {
+            let chunk =
+                futures::future::join_all(arrow_chunk_receivers.iter_mut().map(|s| s.recv()))
+                    .await
+                    .into_iter()
+                    .flatten()
+                    .collect::<Vec<_>>();
+            if chunk.len() != deserializer_handles.len() {
+                break;
             }
-            if let Some(nr) = original_num_rows {
-                table = table.head(nr)?;
+            let table = arrow_chunk_to_table(
+                chunk,
+                &schema_ref,
+                &uri,
+                rg_range.start,
+                &mut index_so_far,
+                delete_rows.as_deref(),
+                &mut curr_delete_row_idx,
+                predicate.clone(),
+                original_columns.as_deref(),
+                original_num_rows,
+            )?;
+
+            if output_sender.send(Ok(table)).await.is_err() {
+                break;
             }
         }
-        Ok(table)
-    });
-    Some(table_iter)
+
+        futures::future::try_join_all(deserializer_handles)
+            .await?
+            .into_iter()
+            .collect::<DaftResult<()>>()?;
+
+        drop(permit);
+        DaftResult::Ok(())
+    })
 }
 
 struct CountingReader<R> {
@@ -517,7 +638,7 @@ pub async fn local_parquet_read_async(
 }
 
 #[allow(clippy::too_many_arguments)]
-pub fn local_parquet_stream(
+pub async fn local_parquet_stream(
     uri: &str,
     original_columns: Option<Vec<String>>,
     columns: Option<Vec<String>>,
@@ -547,75 +668,82 @@ pub fn local_parquet_stream(
         io_stats,
     )?;
 
-    // Create a channel for each row group to send the processed tables to the stream
-    // Each channel is expected to have a number of chunks equal to the number of chunks in the row group
-    let (senders, receivers): (Vec<_>, Vec<_>) = row_ranges
+    // We use a semaphore to limit the number of concurrent row group deserialization tasks.
+    // Set the maximum number of concurrent tasks to ceil(number of available threads / columns).
+    let num_parallel_tasks = (std::thread::available_parallelism()
+        .unwrap_or(NonZeroUsize::new(2).unwrap())
+        .checked_mul(2.try_into().unwrap())
+        .unwrap()
+        .get() as f64
+        / max(schema_ref.fields.len(), 1) as f64)
+        .ceil() as usize;
+
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(num_parallel_tasks));
+
+    let owned_uri = uri.to_string();
+    let compute_runtime = get_compute_runtime();
+
+    let (output_senders, output_receivers): (Vec<_>, Vec<_>) = row_ranges
         .iter()
-        .map(|rg_range| {
-            let expected_num_chunks =
-                f32::ceil(rg_range.num_rows as f32 / chunk_size as f32) as usize;
-            crossbeam_channel::bounded(expected_num_chunks)
-        })
+        .map(|_| tokio::sync::mpsc::channel(1))
         .unzip();
 
-    let owned_uri = uri.to_string();
+    let parquet_task = compute_runtime.spawn(async move {
+        let mut table_tasks = Vec::with_capacity(row_ranges.len());
+        for ((column_iters, sender), rg_range) in column_iters.zip(output_senders).zip(row_ranges) {
+            if let Err(e) = column_iters {
+                let _ = sender.send(Err(e.into())).await;
+                break;
+            }
 
-    rayon::spawn(move || {
-        // Once a row group has been read into memory and we have the column iterators,
-        // we can start processing them in parallel.
-        let par_column_iters = column_iters.zip(row_ranges).zip(senders).par_bridge();
-
-        // For each vec of column iters, iterate through them in parallel lock step such that each iteration
-        // produces a chunk of the row group that can be converted into a table.
-        par_column_iters.for_each(move |((rg_column_iters_result, rg_range), tx)| {
-            let table_iter = match rg_column_iters_result {
-                Ok(rg_column_iters) => {
-                    let table_iter = arrow_column_iters_to_table_iter(
-                        rg_column_iters,
-                        rg_range.start,
-                        schema_ref.clone(),
-                        owned_uri.clone(),
-                        predicate.clone(),
-                        original_columns.clone(),
-                        original_num_rows,
-                        delete_rows.clone(),
-                    );
-                    // Even if there are no columns to read, we still need to create a empty table with the correct number of rows
-                    // This is because the columns may be present in other files. See https://github.com/Eventual-Inc/Daft/pull/2514
-                    if let Some(table_iter) = table_iter {
-                        table_iter
+            let permit = semaphore.clone().acquire_owned().await.unwrap();
+            let table_task = spawn_column_iters_to_table_task(
+                column_iters.unwrap(),
+                rg_range,
+                schema_ref.clone(),
+                owned_uri.clone(),
+                predicate.clone(),
+                original_columns.clone(),
+                original_num_rows,
+                delete_rows.clone(),
+                sender,
+                permit,
+            );
+            table_tasks.push(table_task);
+        }
+
+        futures::future::try_join_all(table_tasks)
+            .await?
+            .into_iter()
+            .collect::<DaftResult<()>>()?;
+
+        DaftResult::Ok(())
+    });
+
+    let stream_of_streams =
+        futures::stream::iter(output_receivers.into_iter().map(ReceiverStream::new));
+    let flattened = match maintain_order {
+        true => stream_of_streams.flatten().boxed(),
+        false => stream_of_streams.flatten_unordered(None).boxed(),
+    };
+    let combined_stream = futures::stream::unfold(
+        (Some(parquet_task), flattened),
+        |(task, mut stream)| async move {
+            task.as_ref()?;
+            match stream.next().await {
+                Some(v) => Some((v, (task, stream))),
+                None => {
+                    if let Err(e) = task.unwrap().await {
+                        Some((Err(e), (None, stream)))
                     } else {
-                        let table =
-                            Table::new_with_size(schema_ref.clone(), vec![], rg_range.num_rows);
-                        if let Err(crossbeam_channel::TrySendError::Full(_)) = tx.try_send(table) {
-                            panic!("Parquet stream channel should not be full")
-                        }
-                        return;
+                        None
                     }
                 }
-                Err(e) => {
-                    let _ = tx.send(Err(e.into()));
-                    return;
-                }
-            };
-            for table_result in table_iter {
-                let table_err = table_result.is_err();
-                if let Err(crossbeam_channel::TrySendError::Full(_)) = tx.try_send(table_result) {
-                    panic!("Parquet stream channel should not be full")
-                }
-                if table_err {
-                    break;
-                }
             }
-        });
-    });
+        },
+    );
 
-    let result_stream = futures::stream::iter(receivers.into_iter().map(futures::stream::iter));
-
-    match maintain_order {
-        true => Ok((metadata, Box::pin(result_stream.flatten()))),
-        false => Ok((metadata, Box::pin(result_stream.flatten_unordered(None)))),
-    }
+    Ok((metadata, combined_stream.boxed()))
 }
 
 #[allow(clippy::too_many_arguments)]