diff --git a/tests/glue_scripts/wrangler_blog_simple.py b/tests/glue_scripts/wrangler_blog_simple.py
index 215d8e381..ce1b5a03e 100644
--- a/tests/glue_scripts/wrangler_blog_simple.py
+++ b/tests/glue_scripts/wrangler_blog_simple.py
@@ -7,37 +7,33 @@
 glue_database = os.environ["glue-database"]
 glue_table = os.environ["glue-table"]
 
-category = "toys"
+# Read 1.5 GB of Parquet data
+df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/")
 
-# Read Parquet data (1.2 Gb parquet compressed)
-df = wr.s3.read_parquet(
-    path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
-)
-
-# Drop customer_id column
-df.drop("customer_id", axis=1, inplace=True)
+# Drop vendor_id column
+df.drop("vendor_id", axis=1, inplace=True)
 
-# Filter reviews with 5-star rating
-df5 = df[df["star_rating"] == 5]
+# Filter trips over 1 mile
+df1 = df[df["trip_distance"] > 1]
 
-# Write partitioned five stars reviews to S3 in Parquet format
+# Write partitioned trips to S3 in Parquet format
 wr.s3.to_parquet(
-    df5,
-    path=f"{output_path}output/{category}/",
-    partition_cols=["year", "marketplace"],
+    df1,
+    path=f"{output_path}output/{glue_table}/",
+    partition_cols=["passenger_count", "payment_type"],
     dataset=True,
     database=glue_database,
     table=glue_table,
 )
 
 # Read the data back to a modin df via Athena
-df5_athena = wr.athena.read_sql_query(
+df1_athena = wr.athena.read_sql_query(
     f"SELECT * FROM {glue_table}",
     database=glue_database,
     ctas_approach=False,
     unload_approach=True,
     workgroup=workgroup_name,
-    s3_output=f"{output_path}unload/{category}/",
+    s3_output=f"{output_path}unload/{glue_table}/",
 )
 
 # Delete table (required due to LF)
diff --git a/tests/load/test_databases.py b/tests/load/test_databases.py
index 07bc59605..297615020 100644
--- a/tests/load/test_databases.py
+++ b/tests/load/test_databases.py
@@ -109,7 +109,7 @@ def test_redshift_copy_unload(
 
 @pytest.mark.parametrize("benchmark_time", [40])
 def test_athena_unload(benchmark_time: int, path: str, glue_table: str, glue_database: str, request) -> None:
-    df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Toys/", dataset=True)
+    df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/", dataset=True)
 
     wr.s3.to_parquet(
         df,
@@ -117,7 +117,7 @@ def test_athena_unload(benchmark_time: int, path: str, glue_dat
         dataset=True,
         table=glue_table,
         database=glue_database,
-        partition_cols=["year", "marketplace"],
+        partition_cols=["passenger_count", "payment_type"],
     )
 
     with ExecutionTimer(request) as timer:
@@ -136,7 +136,7 @@ def test_athena_unload(benchmark_time: int, path: str, glue_dat
 
 @pytest.mark.parametrize("benchmark_time", [80])
 def test_lakeformation_read(benchmark_time: int, path: str, glue_table: str, glue_database: str, request) -> None:
-    df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Home/", dataset=True)
+    df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/", dataset=True)
 
     wr.s3.to_parquet(
         df,
@@ -145,7 +145,7 @@ def test_lakeformation_read(benchmark_time: int, path: str, glu
         dataset=True,
         table=glue_table,
         database=glue_database,
-        partition_cols=["year", "marketplace"],
+        partition_cols=["passenger_count", "payment_type"],
         glue_table_settings={
             "table_type": "GOVERNED",
         },
diff --git a/tests/load/test_s3.py b/tests/load/test_s3.py
index ca20aa05c..4264e8126 100644
--- a/tests/load/test_s3.py
+++ b/tests/load/test_s3.py
@@ -96,16 +96,6 @@ def test_s3_read_parquet_many_files(
     assert timer.elapsed_time < benchmark_time
 
 
-@pytest.mark.parametrize("benchmark_time", [40])
-def test_s3_read_parquet_partition_filter(benchmark_time: float, request: pytest.FixtureRequest) -> None:
-    path = "s3://amazon-reviews-pds/parquet/"
-    with ExecutionTimer(request, data_paths=path) as timer:
-        filter = lambda x: True if x["product_category"].startswith("Wireless") else False  # noqa: E731
-        wr.s3.read_parquet(path=path, dataset=True, partition_filter=filter)
-
-    assert timer.elapsed_time < benchmark_time
-
-
 @pytest.mark.parametrize("benchmark_time", [5])
 @pytest.mark.parametrize("path_suffix", [None, "df.parquet"])
 def test_s3_write_parquet_simple(