diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py
index 79869b7790..ad21eada01 100644
--- a/tests/tpch/conftest.py
+++ b/tests/tpch/conftest.py
@@ -56,10 +56,10 @@ def restart(request):
 @pytest.fixture(scope="session")
 def dataset_path(local, scale):
     remote_paths = {
-        10: "s3://coiled-runtime-ci/tpc-h/scale-10/",
-        100: "s3://coiled-runtime-ci/tpc-h/scale-100/",
-        1000: "s3://coiled-runtime-ci/tpc-h/scale-1000/",
-        10000: "s3://coiled-runtime-ci/tpc-h/scale-10000/",
+        10: "s3://coiled-runtime-ci/tpc-h/snappy/scale-10/",
+        100: "s3://coiled-runtime-ci/tpc-h/snappy/scale-100/",
+        1000: "s3://coiled-runtime-ci/tpc-h/snappy/scale-1000/",
+        10000: "s3://coiled-runtime-ci/tpc-h/snappy/scale-10000/",
     }
     local_paths = {
         1: "./tpch-data/scale-1/",
diff --git a/tests/tpch/generate-data.py b/tests/tpch/generate-data.py
index c5476cd71a..0ad83a1b99 100644
--- a/tests/tpch/generate-data.py
+++ b/tests/tpch/generate-data.py
@@ -1,6 +1,8 @@
+import enum
 import functools
 import pathlib
 import tempfile
+import uuid
 import warnings
 
 import boto3
@@ -11,15 +13,26 @@
 import duckdb
 import psutil
 import pyarrow.compute as pc
+import pyarrow.parquet as pq
 
 REGION = None
 
 
+class CompressionCodec(enum.Enum):
+    SNAPPY = "SNAPPY"
+    LZ4 = "LZ4"
+    ZSTD = "ZSTD"
+    GZIP = "GZIP"
+    BROTLI = "BROTLI"
+    NONE = "NONE"
+
+
 def generate(
     scale: int = 10,
     partition_size: str = "128 MiB",
     path: str = "./tpch-data",
     relaxed_schema: bool = False,
+    compression: CompressionCodec = CompressionCodec.SNAPPY,
 ):
     if str(path).startswith("s3"):
         path += "/" if not path.endswith("/") else ""
@@ -41,6 +54,7 @@ def generate(
         path=path,
         relaxed_schema=relaxed_schema,
         partition_size=partition_size,
+        compression=compression,
     )
 
     if use_coiled:
@@ -75,7 +89,12 @@ def _(*args, **kwargs):
 
 @retry
 def _tpch_data_gen(
-    step: int, scale: int, path: str, partition_size: str, relaxed_schema: bool
+    step: int,
+    scale: int,
+    path: str,
+    partition_size: str,
+    relaxed_schema: bool,
+    compression: CompressionCodec,
 ):
     """
     Run TPC-H dbgen for generating the <step>th part of a multi-part load or update set
@@ -150,7 +169,9 @@ def _tpch_data_gen(
            # TODO: duckdb doesn't (yet) support writing parquet files by limited file size
            # so we estimate the page size required for each table to get files of about a target size
            n_rows_total = con.sql(f"select count(*) from {table}").fetchone()[0]
-            n_rows_per_page = rows_approx_mb(con, table, partition_size=partition_size)
+            n_rows_per_page = rows_approx_mb(
+                con, table, partition_size=partition_size, compression=compression
+            )
            if n_rows_total == 0:
                continue  # In case of step based production, some tables may already be fully generated
 
@@ -158,19 +179,34 @@ def _tpch_data_gen(
                print(
                    f"Start Exporting Page from {table} - Page {offset} - {offset + n_rows_per_page}"
                )
-                con.sql(
-                    f"""
-                    copy
-                        (select * from {table} offset {offset} limit {n_rows_per_page} )
-                    to '{out}'
-                    (format parquet, per_thread_output true, filename_pattern "{table}_{{uuid}}", overwrite_or_ignore)
-                    """
+                stmt = (
+                    f"""select * from {table} offset {offset} limit {n_rows_per_page}"""
+                )
+                df = con.sql(stmt).arrow()
+
+                # DuckDB doesn't support LZ4, and we want to use PyArrow to handle
+                # compression codecs.
+                # ref: https://github.com/duckdb/duckdb/discussions/8950
+                # ref: https://github.com/coiled/benchmarks/pull/1209#issuecomment-1829620531
+                file = f"{table}_{uuid.uuid4()}.{compression.value.lower()}.parquet"
+                if isinstance(out, str) and out.startswith("s3"):
+                    out_ = f"{out}/{file}"
+                else:
+                    out_ = pathlib.Path(out)
+                    out_.mkdir(exist_ok=True, parents=True)
+                    out_ = str(out_ / file)
+                pq.write_table(
+                    df,
+                    out_,
+                    compression=compression.value.lower(),
+                    write_statistics=True,
+                    write_page_index=True,
                )
            print(f"Finished exporting table {table}!")
        print("Finished exporting all data!")
 
 
-def rows_approx_mb(con, table_name, partition_size: str):
+def rows_approx_mb(con, table_name, partition_size: str, compression: CompressionCodec):
     """
     Estimate the number of rows from this table required to result in a parquet file
     output size of `partition_size`
@@ -180,9 +216,15 @@ def rows_approx_mb(con, table_name, partition_size: str):
     table = con.sql(f"select * from {table_name} limit {sample_size}").arrow()
 
     with tempfile.TemporaryDirectory() as tmpdir:
-        tmp = pathlib.Path(tmpdir) / "out.parquet"
-        con.sql(
-            f"copy (select * from {table_name} limit {sample_size}) to '{tmp}' (format parquet)"
+        tmp = pathlib.Path(tmpdir) / "tmp.parquet"
+        stmt = f"select * from {table_name} limit {sample_size}"
+        df = con.sql(stmt).arrow()
+        pq.write_table(
+            df,
+            tmp,
+            compression=compression.value.lower(),
+            write_statistics=True,
+            write_page_index=True,
        )
        mb = tmp.stat().st_size
    return int(
@@ -250,8 +292,21 @@ def get_bucket_region(path: str):
     flag_value=True,
     help="Set flag to convert official TPC-H types decimal -> float and date -> timestamp_s",
 )
-def main(scale: int, partition_size: str, path: str, relaxed_schema: bool):
-    generate(scale, partition_size, path, relaxed_schema)
+@click.option(
+    "--compression",
+    type=click.Choice(v.lower() for v in CompressionCodec.__members__),
+    callback=lambda _c, _p, v: getattr(CompressionCodec, v.upper()),
+    default=CompressionCodec.SNAPPY.value,
+    help="Set compression codec",
+)
+def main(
+    scale: int,
+    partition_size: str,
+    path: str,
+    relaxed_schema: bool,
+    compression: CompressionCodec,
+):
+    generate(scale, partition_size, path, relaxed_schema, compression)
 
 
 if __name__ == "__main__":
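
Not part of the patch, but useful context for review: `rows_approx_mb` now receives the codec because the rows-per-file estimate depends on how well the chosen codec compresses the sampled data. The sketch below is standalone and illustrative only (the table contents, column names, and file names are made up, not code from this repository); it writes the same Arrow table once per codec in `CompressionCodec` and prints the resulting file sizes.

```python
# Standalone sketch: same Arrow table, one parquet file per codec.
import os
import tempfile

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical sample data; any columnar data works here.
table = pa.table(
    {
        "id": list(range(100_000)),
        "comment": ["tpch-style filler text, repeated a lot"] * 100_000,
    }
)

with tempfile.TemporaryDirectory() as tmpdir:
    # Mirrors the lowercased CompressionCodec values passed to pq.write_table in the patch.
    for codec in ("snappy", "lz4", "zstd", "gzip", "brotli", "none"):
        out = os.path.join(tmpdir, f"sample.{codec}.parquet")
        pq.write_table(
            table,
            out,
            compression=codec,
            write_statistics=True,
            write_page_index=True,
        )
        print(f"{codec:>7}: {os.path.getsize(out) / 2**20:.2f} MiB")
```

With the patch applied, the codec for a real run is selected via the new `--compression` option, which accepts the lowercased codec names and defaults to `snappy`.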