Skip to content

Commit

Permalink
[FEAT] Dynamically parallel local parquet reader (#3310)
Browse files Browse the repository at this point in the history
Implement a dynamically parallel local streaming parquet reader.

### Background
The current streaming local parquet reader, while fast and streaming,
has some problems:
- It reads and deserializes **ALL row groups and ALL columns in
parallel.**
- It does not respect **downstream back-pressure** (the crossbeam
channels are all bounded by max chunks, it's free to fill it up).

This leads to unnecessarily high memory usage, and it potentially
starves downstream tasks.

### Solution
Instead of launching all tasks at once, we can cap the number of
parallel tasks based on certain factors:
- Number of CPUs
- Number of Columns.


### Results
Most glaringly, the benefits of these are in memory usage of streaming
queries, for example:
```
next(daft.read_parquet("data/tpch-dbgen/1_0/1/parquet/lineitem").iter_partitions()) # read lineitem tpch sf1
```

The new implementation hits a peak of 300mb, while the old goes over
1gb.
<img width="1186" alt="Screenshot 2024-11-18 at 11 35 36 PM"
src="https://github.com/user-attachments/assets/45fb9fab-3215-4ff6-a7fe-63a428fd9c7b">
<img width="1170" alt="Screenshot 2024-11-18 at 11 36 15 PM"
src="https://github.com/user-attachments/assets/591b9bad-25e9-46ed-ba53-caaa892f50eb">

Another example, where we stream the entire file, but the consumption is
slow:
```
for _ in daft.read_parquet("/Users/colinho/Desktop/Daft/z/daft_tpch_100g_32part_64RG.parquet").iter_partitions():
    time.sleep(0.1)
```

The new implementation hits a peak of 1.2gb, while the old goes over
3gb.
<img width="1188" alt="Screenshot 2024-11-18 at 11 42 01 PM"
src="https://github.com/user-attachments/assets/de9976c7-9c7f-46b4-bd24-1b0ade8a4a86">
<img width="1172" alt="Screenshot 2024-11-18 at 11 42 44 PM"
src="https://github.com/user-attachments/assets/5a2f1bbc-35ed-45a8-93c1-d1853cdbfc89">

To maintain perfomance parity, I also wrote some benchmarks for parquet
files with differing rows / cols / row groups, the results show that the
new implementation is pretty much on par, with some slight differences.
<img width="1432" alt="Screenshot 2024-11-18 at 11 29 30 PM"
src="https://github.com/user-attachments/assets/cf8c7f2c-3fa4-4d43-979a-da2b0f8a1f35">
<img width="1407" alt="Screenshot 2024-11-18 at 11 29 38 PM"
src="https://github.com/user-attachments/assets/b5afb8ca-fe8e-4f6e-b6a1-ea8c45be36cb">

On reading a tpch sf-1 lineitem table though: the results are pretty
much the same: (~0.2s)

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: EC2 Default User <[email protected]>
  • Loading branch information
3 people authored Dec 4, 2024
1 parent 6d30e30 commit de4fe50
Show file tree
Hide file tree
Showing 3 changed files with 381 additions and 118 deletions.
134 changes: 134 additions & 0 deletions benchmarking/parquet/test_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from pathlib import Path

import pytest

from tests.assets import get_asset_dir


def generate_parquet(dir: str, num_rows: int, num_cols: int, num_rowgroups: int):
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as papq
from faker import Faker

# Initialize Faker
Faker.seed(0) # For reproducibility
fake = Faker()

# Generate small pools of fake data that we'll sample from
POOL_SIZE = 1000 # Size of initial fake data pools

# Pre-generate data pools
name_pool = [fake.name() for _ in range(POOL_SIZE)]
email_pool = [fake.email() for _ in range(POOL_SIZE)]
company_pool = [fake.company() for _ in range(POOL_SIZE)]
job_pool = [fake.job() for _ in range(POOL_SIZE)]
address_pool = [fake.address().replace("\n", ", ") for _ in range(POOL_SIZE)]

# Pre-generate date pools
recent_dates = pd.date_range(end=pd.Timestamp.now(), periods=POOL_SIZE, freq="H")
past_dates = pd.date_range(end=pd.Timestamp.now(), periods=POOL_SIZE, freq="D")
future_dates = pd.date_range(start=pd.Timestamp.now(), periods=POOL_SIZE, freq="D")

data = {}
for i in range(num_cols):
col_type = i % 5

if col_type == 0:
# Integer columns (vectorized operations)
data_type = i % 3
if data_type == 0:
data[f"age_{i}"] = np.random.randint(0, 100, size=num_rows)
elif data_type == 1:
data[f"price_{i}"] = np.random.randint(0, 1000, size=num_rows)
else:
data[f"views_{i}"] = np.random.randint(0, 1000000, size=num_rows)

elif col_type == 1:
# Float columns (vectorized operations)
data_type = i % 3
if data_type == 0:
data[f"rating_{i}"] = np.round(np.random.uniform(0, 5, size=num_rows), 1)
elif data_type == 1:
data[f"temp_{i}"] = np.round(np.random.uniform(-20, 45, size=num_rows), 1)
else:
data[f"percentage_{i}"] = np.round(np.random.uniform(0, 100, size=num_rows), 2)

elif col_type == 2:
# String columns (sampling from pre-generated pools)
data_type = i % 5
if data_type == 0:
data[f"name_{i}"] = np.random.choice(name_pool, size=num_rows)
elif data_type == 1:
data[f"email_{i}"] = np.random.choice(email_pool, size=num_rows)
elif data_type == 2:
data[f"company_{i}"] = np.random.choice(company_pool, size=num_rows)
elif data_type == 3:
data[f"address_{i}"] = np.random.choice(address_pool, size=num_rows)
else:
data[f"job_{i}"] = np.random.choice(job_pool, size=num_rows)

elif col_type == 3:
# Boolean columns (vectorized operations)
data[f"is_active_{i}"] = np.random.choice([True, False], size=num_rows)

else:
# Timestamp columns (sampling from pre-generated date ranges)
data_type = i % 3
if data_type == 0:
data[f"recent_date_{i}"] = np.random.choice(recent_dates, size=num_rows)
elif data_type == 1:
data[f"past_date_{i}"] = np.random.choice(past_dates, size=num_rows)
else:
data[f"future_date_{i}"] = np.random.choice(future_dates, size=num_rows)

df = pd.DataFrame(data)
papq.write_table(
table=pa.Table.from_pandas(df),
where=dir,
row_group_size=num_rows // num_rowgroups,
)
print(f"Finished writing {dir}")


SIZE_CONFIGS = [
(10_000_000, 1), # Lots of rows, single col
(1_000_000, 32), # Balanced
(10_000, 1024), # Few rows, many cols
]

ROWGROUP_CONFIGS = [1, 8, 64]

LOCAL_DATA_FIXTURE_PATH = Path(get_asset_dir()) / "../../benchmarking/parquet/local_data"


def get_param_id(param):
(num_rows, num_cols), num_rowgroups = param
return f"{num_rows}_rows_{num_cols}_cols_{num_rowgroups}_rowgroups"


@pytest.fixture(
scope="session",
params=[(size, rowgroup) for size in SIZE_CONFIGS for rowgroup in ROWGROUP_CONFIGS],
ids=lambda param: get_param_id(param),
)
def parquet_file(request):
(num_rows, num_cols), num_rowgroups = request.param
filepath = LOCAL_DATA_FIXTURE_PATH / f"test_{num_rows}rows_{num_cols}cols_{num_rowgroups}groups.parquet"
if not filepath.parent.exists():
filepath.parent.mkdir(parents=True)
if not filepath.exists():
generate_parquet(str(filepath), num_rows, num_cols, num_rowgroups)
return str(filepath)


@pytest.mark.benchmark(group="read_parquet_local")
def test_read_parquet(parquet_file, benchmark):
import daft

def read_parquet():
df = daft.read_parquet(parquet_file)
return df.to_arrow()

benchmark(read_parquet)
1 change: 1 addition & 0 deletions src/daft-parquet/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ async fn stream_parquet_single(
maintain_order,
io_stats,
)
.await
} else {
let builder = ParquetReaderBuilder::from_uri(
uri.as_str(),
Expand Down
Loading

0 comments on commit de4fe50

Please sign in to comment.