Support compression codec choice in TPC-H data generation and write with pyarrow #1209

Merged (8 commits) on Dec 14, 2023

tests/tpch/generate-data.py: 70 additions & 15 deletions (85 changes)
@@ -1,6 +1,8 @@
import enum
import functools
import pathlib
import tempfile
import uuid
import warnings

import boto3
@@ -11,15 +13,26 @@
import duckdb
import psutil
import pyarrow.compute as pc
import pyarrow.parquet as pq

REGION = None


class CompressionCodec(enum.Enum):
SNAPPY = "SNAPPY"
LZ4 = "LZ4"
ZSTD = "ZSTD"
GZIP = "GZIP"
BROTLI = "BROTLI"
NONE = "NONE"


def generate(
scale: int = 10,
partition_size: str = "128 MiB",
path: str = "./tpch-data",
relaxed_schema: bool = False,
compression: CompressionCodec = CompressionCodec.LZ4,
):
if str(path).startswith("s3"):
path += "/" if not path.endswith("/") else ""
@@ -41,6 +54,7 @@ def generate(
path=path,
relaxed_schema=relaxed_schema,
partition_size=partition_size,
compression=compression,
)

if use_coiled:
@@ -75,7 +89,12 @@ def _(*args, **kwargs):

@retry
def _tpch_data_gen(
step: int, scale: int, path: str, partition_size: str, relaxed_schema: bool
step: int,
scale: int,
path: str,
partition_size: str,
relaxed_schema: bool,
compression: CompressionCodec,
):
"""
Run TPC-H dbgen for generating the <step>th part of a multi-part load or update set
@@ -150,27 +169,44 @@ def _tpch_data_gen(
# TODO: duckdb doesn't (yet) support writing parquet files capped at a target file size,
# so we estimate the number of rows per output file needed to land near the target size
n_rows_total = con.sql(f"select count(*) from {table}").fetchone()[0]
n_rows_per_page = rows_approx_mb(con, table, partition_size=partition_size)
n_rows_per_page = rows_approx_mb(
con, table, partition_size=partition_size, compression=compression
)
if n_rows_total == 0:
continue # In case of step based production, some tables may already be fully generated

for offset in range(0, n_rows_total, n_rows_per_page):
print(
f"Start Exporting Page from {table} - Page {offset} - {offset + n_rows_per_page}"
)
con.sql(
f"""
copy
(select * from {table} offset {offset} limit {n_rows_per_page} )
to '{out}'
(format parquet, per_thread_output true, filename_pattern "{table}_{{uuid}}", overwrite_or_ignore)
"""
stmt = (
f"""select * from {table} offset {offset} limit {n_rows_per_page}"""
)
df = con.sql(stmt).arrow()

# DuckDB doesn't support LZ4, and we want to use PyArrow to handle
# compression codecs.
# ref: https://github.com/duckdb/duckdb/discussions/8950
# ref: https://github.com/coiled/benchmarks/pull/1209#issuecomment-1829620531
file = f"{table}_{uuid.uuid4()}.{compression.value.lower()}.parquet"
if isinstance(out, str) and out.startswith("s3"):
out_ = f"{out}/{file}"
else:
out_ = pathlib.Path(out)
out_.mkdir(exist_ok=True, parents=True)
out_ = str(out_ / file)
pq.write_table(
df,
out_,
compression=compression.value.lower(),
write_statistics=True,
write_page_index=True,
)
print(f"Finished exporting table {table}!")
print("Finished exporting all data!")


def rows_approx_mb(con, table_name, partition_size: str):
def rows_approx_mb(con, table_name, partition_size: str, compression: CompressionCodec):
"""
Estimate the number of rows from this table required to
result in a parquet file output size of `partition_size`
@@ -180,9 +216,15 @@ def rows_approx_mb(con, table_name, partition_size: str):
table = con.sql(f"select * from {table_name} limit {sample_size}").arrow()

with tempfile.TemporaryDirectory() as tmpdir:
tmp = pathlib.Path(tmpdir) / "out.parquet"
con.sql(
f"copy (select * from {table_name} limit {sample_size}) to '{tmp}' (format parquet)"
tmp = pathlib.Path(tmpdir) / "tmp.parquet"
stmt = f"select * from {table_name} limit {sample_size}"
df = con.sql(stmt).arrow()
pq.write_table(
df,
tmp,
compression=compression.value.lower(),
write_statistics=True,
write_page_index=True,
Member commented:
pyarrow doesn't store page indices by default, and I'm not even sure reading them is implemented.
Whether this is a good idea depends on many factors (file size, row group size, number of columns, etc.), and for some combinations it can add overhead.
Before we enable this blindly, I would like to make sure it does not negatively impact anything.

Contributor Author replied:
I see a small difference in file size (see metadata below) when adding the page index, and no real performance difference:

❯ ls -lhs lineitem*                                                                                                         (tmp)
126M -rw-r--r--. 1 milesg milesg 126M Nov 28 08:47 lineitem.duckdb.snappy.parquet
102M -rw-r--r--. 1 milesg milesg 102M Nov 28 12:09 lineitem-stats-no-index.snappy.parquet
102M -rw-r--r--. 1 milesg milesg 102M Nov 30 11:53 lineitem-stats-and-index.snappy.parquet
❯ hyperfine 'python read-no-page-index.py' 'python read-page-index.py'                                                      (tmp)
Benchmark 1: python read-no-page-index.py
  Time (mean ± σ):      1.299 s ±  0.160 s    [User: 2.656 s, System: 2.409 s]
  Range (min … max):    0.994 s …  1.572 s    10 runs

Benchmark 2: python read-page-index.py
  Time (mean ± σ):      1.272 s ±  0.159 s    [User: 2.514 s, System: 2.381 s]
  Range (min … max):    1.084 s …  1.538 s    10 runs

Summary
  python read-page-index.py ran
    1.02 ± 0.18 times faster than python read-no-page-index.py

Then the metadata:

In [5]: pq.ParquetFile('lineitem-stats-no-index.snappy.parquet').metadata
Out[5]:
<pyarrow._parquet.FileMetaData object at 0x7fc36c2da520>
  created_by: parquet-cpp-arrow version 13.0.0
  num_columns: 16
  num_rows: 2735597
  num_row_groups: 3
  format_version: 2.6
  serialized_size: 6900

In [6]: pq.ParquetFile('lineitem-stats-and-index.snappy.parquet').metadata
Out[6]:
<pyarrow._parquet.FileMetaData object at 0x7fc36c4f4fe0>
  created_by: parquet-cpp-arrow version 13.0.0
  num_columns: 16
  num_rows: 2735597
  num_row_groups: 3
  format_version: 2.6
  serialized_size: 7584
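
For reference, a minimal sketch of the kind of with/without page-index comparison described above. The input file name is a placeholder, and only the pyarrow.parquet calls already used in this PR (write_table, ParquetFile) are assumed:

import pyarrow.parquet as pq

# Any existing Parquet file works as input; "lineitem.parquet" is a placeholder.
table = pq.read_table("lineitem.parquet")

for name, page_index in [("no-index", False), ("and-index", True)]:
    out = f"lineitem-stats-{name}.snappy.parquet"
    pq.write_table(
        table,
        out,
        compression="snappy",
        write_statistics=True,
        write_page_index=page_index,
    )
    # serialized_size grows slightly when the page index is written
    print(name, pq.ParquetFile(out).metadata.serialized_size)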

)
mb = tmp.stat().st_size
return int(
@@ -250,8 +292,21 @@ def get_bucket_region(path: str):
flag_value=True,
help="Set flag to convert official TPC-H types decimal -> float and date -> timestamp_s",
)
def main(scale: int, partition_size: str, path: str, relaxed_schema: bool):
generate(scale, partition_size, path, relaxed_schema)
@click.option(
"--compression",
type=click.Choice(v.lower() for v in CompressionCodec.__members__),
callback=lambda _c, _p, v: getattr(CompressionCodec, v.upper()),
default=CompressionCodec.LZ4.value,
help="Set compression codec",
)
def main(
scale: int,
partition_size: str,
path: str,
relaxed_schema: bool,
compression: CompressionCodec,
):
generate(scale, partition_size, path, relaxed_schema, compression)


if __name__ == "__main__":
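
As a usage note, a minimal sketch of what the new --compression option does: click validates the lowercase choice and the callback maps it back onto the CompressionCodec enum. The enum below mirrors the one added in this diff; the helper name codec_from_cli is hypothetical:

import enum


class CompressionCodec(enum.Enum):
    SNAPPY = "SNAPPY"
    LZ4 = "LZ4"
    ZSTD = "ZSTD"
    GZIP = "GZIP"
    BROTLI = "BROTLI"
    NONE = "NONE"


def codec_from_cli(value: str) -> CompressionCodec:
    # Same mapping as the click callback above: "zstd" -> CompressionCodec.ZSTD
    return getattr(CompressionCodec, value.upper())


assert codec_from_cli("zstd") is CompressionCodec.ZSTD
assert codec_from_cli("lz4") is CompressionCodec.LZ4

On the command line this presumably looks like python tests/tpch/generate-data.py --compression zstd, with the other options unchanged.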