fix: replace amazon-reviews with ursa-labs public S3 bucket (#2460)
jaidisido authored Sep 14, 2023 · 1 parent e19e987 · commit 67d40c1
Showing 3 changed files with 16 additions and 30 deletions.
tests/glue_scripts/wrangler_blog_simple.py (28 changes: 12 additions & 16 deletions)
@@ -7,37 +7,33 @@
 glue_database = os.environ["glue-database"]
 glue_table = os.environ["glue-table"]
 
-category = "toys"
-
-# Read Parquet data (1.2 Gb parquet compressed)
-df = wr.s3.read_parquet(
-    path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
-)
+# Read 1.5 Gb Parquet data
+df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/")
 
-# Drop customer_id column
-df.drop("customer_id", axis=1, inplace=True)
+# Drop vendor_id column
+df.drop("vendor_id", axis=1, inplace=True)
 
-# Filter reviews with 5-star rating
-df5 = df[df["star_rating"] == 5]
+# Filter trips longer than 1 mile
+df1 = df[df["trip_distance"] > 1]
 
-# Write partitioned five stars reviews to S3 in Parquet format
+# Write partitioned trips to S3 in Parquet format
 wr.s3.to_parquet(
-    df5,
-    path=f"{output_path}output/{category}/",
-    partition_cols=["year", "marketplace"],
+    df1,
+    path=f"{output_path}output/{glue_table}/",
+    partition_cols=["passenger_count", "payment_type"],
     dataset=True,
     database=glue_database,
     table=glue_table,
 )
 
 # Read the data back to a modin df via Athena
-df5_athena = wr.athena.read_sql_query(
+df1_athena = wr.athena.read_sql_query(
     f"SELECT * FROM {glue_table}",
     database=glue_database,
     ctas_approach=False,
     unload_approach=True,
     workgroup=workgroup_name,
-    s3_output=f"{output_path}unload/{category}/",
+    s3_output=f"{output_path}unload/{glue_table}/",
 )
 
 # Delete table (required due to LF)
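Taken together, the updated script is a read/transform/write round trip against the new public bucket. The following standalone sketch (not part of the commit) condenses it; the bucket path, filter, and partition columns come from the diff above, while the output bucket and the Glue database/table names are hypothetical placeholders, and the workgroup argument is omitted:

import awswrangler as wr

# Read the 2017 NYC taxi Parquet data from the public ursa-labs bucket
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/")

# Keep trips longer than 1 mile, write them back partitioned by
# passenger_count and payment_type, and register the table in the Glue catalog
wr.s3.to_parquet(
    df[df["trip_distance"] > 1],
    path="s3://my-bucket/output/my_table/",  # hypothetical output location
    partition_cols=["passenger_count", "payment_type"],
    dataset=True,
    database="my_db",  # hypothetical Glue database
    table="my_table",  # hypothetical Glue table
)

# Read the data back through Athena using the UNLOAD approach
df_back = wr.athena.read_sql_query(
    "SELECT * FROM my_table",
    database="my_db",
    ctas_approach=False,
    unload_approach=True,
    s3_output="s3://my-bucket/unload/my_table/",  # hypothetical unload location
)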
tests/load/test_databases.py (8 changes: 4 additions & 4 deletions)
@@ -109,15 +109,15 @@ def test_redshift_copy_unload(
 
 @pytest.mark.parametrize("benchmark_time", [40])
 def test_athena_unload(benchmark_time: int, path: str, glue_table: str, glue_database: str, request) -> None:
-    df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Toys/", dataset=True)
+    df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/", dataset=True)
 
     wr.s3.to_parquet(
         df,
         path,
         dataset=True,
         table=glue_table,
         database=glue_database,
-        partition_cols=["year", "marketplace"],
+        partition_cols=["passenger_count", "payment_type"],
     )
 
     with ExecutionTimer(request) as timer:
@@ -136,7 +136,7 @@ def test_athena_unload(benchmark_time: int, path: str, glue_table: str, glue_dat
 
 @pytest.mark.parametrize("benchmark_time", [80])
 def test_lakeformation_read(benchmark_time: int, path: str, glue_table: str, glue_database: str, request) -> None:
-    df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Home/", dataset=True)
+    df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/", dataset=True)
 
     wr.s3.to_parquet(
         df,
@@ -145,7 +145,7 @@ def test_lakeformation_read(benchmark_time: int, path: str, glue_table: str, glu
         dataset=True,
         table=glue_table,
         database=glue_database,
-        partition_cols=["year", "marketplace"],
+        partition_cols=["passenger_count", "payment_type"],
         glue_table_settings={
             "table_type": "GOVERNED",
         },
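The second hunk writes the table as GOVERNED, i.e. managed by Lake Formation, but the timed read that follows is cut off in this view. It presumably goes through the Lake Formation reader, along these lines (a sketch with hypothetical names, not the test's actual body):

import awswrangler as wr

# Governed tables are read back through Lake Formation rather than plain S3;
# "my_db" and "my_table" are hypothetical stand-ins for the test fixtures.
df = wr.lakeformation.read_sql_query(
    sql="SELECT * FROM my_table",
    database="my_db",
)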
tests/load/test_s3.py (10 changes: 0 additions & 10 deletions)
@@ -96,16 +96,6 @@ def test_s3_read_parquet_many_files(
     assert timer.elapsed_time < benchmark_time
 
 
-@pytest.mark.parametrize("benchmark_time", [40])
-def test_s3_read_parquet_partition_filter(benchmark_time: float, request: pytest.FixtureRequest) -> None:
-    path = "s3://amazon-reviews-pds/parquet/"
-    with ExecutionTimer(request, data_paths=path) as timer:
-        filter = lambda x: True if x["product_category"].startswith("Wireless") else False  # noqa: E731
-        wr.s3.read_parquet(path=path, dataset=True, partition_filter=filter)
-
-    assert timer.elapsed_time < benchmark_time
-
-
 @pytest.mark.parametrize("benchmark_time", [5])
 @pytest.mark.parametrize("path_suffix", [None, "df.parquet"])
 def test_s3_write_parquet_simple(
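The deleted benchmark exercised read_parquet's partition_filter argument, a predicate that receives each Hive-style partition's column/value mapping (as strings) and returns True for the partitions to read. The replacement taxi data is not laid out as key=value partitions, so the benchmark has no direct equivalent there. For reference, a minimal sketch of the pattern against a hypothetical partitioned dataset:

import awswrangler as wr

# Keep only partitions whose product_category value starts with "Wireless";
# the bucket path below is a hypothetical Hive-partitioned dataset.
df = wr.s3.read_parquet(
    path="s3://my-bucket/parquet/",
    dataset=True,
    partition_filter=lambda x: x["product_category"].startswith("Wireless"),
)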
