Skip to content

Commit

Permalink
perf: improve series.unique performance and replace drop_duplicates in tpch test (
Browse files Browse the repository at this point in the history
#1108)

* perf: improve series.unique performance and replace drop_duplicates in tpch test.

* change variable name

* update docstring and index.

* update flag to control behavior.

* update examples

* delete empty line

* keep order error

* reduce query_count

* improve q16

* benchmark updates
  • Loading branch information
Genesis929 authored Oct 29, 2024
1 parent 254875c commit 499f24a
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 45 deletions.
13 changes: 10 additions & 3 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,9 +1609,16 @@ def drop_duplicates(self, *, keep: str = "first") -> Series:
block = block_ops.drop_duplicates(self._block, (self._value_column,), keep)
return Series(block)

@validations.requires_ordering()
def unique(self) -> Series:
return self.drop_duplicates()
def unique(self, keep_order: bool = True) -> Series:
    """Return the unique values of this Series.

    Args:
        keep_order: If True (default), preserve the order of first
            appearance; this delegates to ``drop_duplicates`` and requires
            an ordered session. If False, deduplicate via an unordered
            aggregation instead, which can be faster but does not preserve
            the original order.

    Returns:
        Series: The unique values.
    """
    if keep_order:
        # Order-preserving dedup requires ordering support; raise a clear
        # error (naming the keep_order escape hatch) when it is missing.
        validations.enforce_ordered(self, "unique(keep_order != False)")
        return self.drop_duplicates()
    # Unordered path: group by the value column and take any value per
    # group. dropna=False keeps NULL as its own group — presumably so NULL
    # appears once in the output like in the ordered path; TODO confirm.
    block, result = self._block.aggregate(
        [self._value_column],
        [(self._value_column, agg_ops.AnyValueOp())],
        dropna=False,
    )
    # reset_index swaps the group-key index for a default sequential index.
    return Series(block.select_columns(result).reset_index())

def duplicated(self, keep: str = "first") -> Series:
if keep is not False:
Expand Down
18 changes: 16 additions & 2 deletions third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,18 @@ def nunique(self) -> int:
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def unique(self) -> Series:
def unique(self, keep_order=True) -> Series:
"""
Return unique values of Series object.
Uniques are returned in order of appearance. Hash table-based unique,
By default, uniques are returned in order of appearance. Hash table-based unique,
therefore does NOT sort.
Args:
keep_order (bool, default True):
If True, preserves the order of the first appearance of each unique value.
If False, returns the elements in ascending order, which can be faster.
**Examples:**
>>> import bigframes.pandas as bpd
Expand All @@ -664,12 +669,21 @@ def unique(self) -> Series:
2 3
3 3
Name: A, dtype: Int64
Example with order preservation: Slower, but keeps order
>>> s.unique()
0 2
1 1
2 3
Name: A, dtype: Int64
Example without order preservation: Faster, but loses original order
>>> s.unique(keep_order=False)
0 1
1 2
2 3
Name: A, dtype: Int64
Returns:
Series: The unique values returned as a Series.
"""
Expand Down
30 changes: 17 additions & 13 deletions third_party/bigframes_vendored/tpch/queries/q10.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
var1 = date(1993, 10, 1)
var2 = date(1994, 1, 1)

q_final = customer.merge

q_final = (
customer.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
Expand Down Expand Up @@ -61,15 +59,21 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
as_index=False,
).agg(REVENUE=bpd.NamedAgg(column="INTERMEDIATE_REVENUE", aggfunc="sum"))

q_final[
[
"C_CUSTKEY",
"C_NAME",
"REVENUE",
"C_ACCTBAL",
"N_NAME",
"C_ADDRESS",
"C_PHONE",
"C_COMMENT",
q_final = (
q_final[
[
"C_CUSTKEY",
"C_NAME",
"REVENUE",
"C_ACCTBAL",
"N_NAME",
"C_ADDRESS",
"C_PHONE",
"C_COMMENT",
]
]
].sort_values(by="REVENUE", ascending=False).head(20).to_gbq()
.sort_values(by="REVENUE", ascending=False)
.head(20)
)

q_final.to_gbq()
12 changes: 9 additions & 3 deletions third_party/bigframes_vendored/tpch/queries/q11.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):

grouped["VALUE"] = grouped["VALUE"].round(2)

total_value = (filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).sum()
threshold = total_value * 0.0001
total_value = (
(filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).to_frame().sum()
)
threshold = (total_value * 0.0001).rename("THRESHOLD")

grouped = grouped.merge(threshold, how="cross")

result_df = grouped[grouped["VALUE"] > threshold]
result_df = grouped[grouped["VALUE"] > grouped["THRESHOLD"]].drop(
columns="THRESHOLD"
)

result_df = result_df.sort_values(by="VALUE", ascending=False)

Expand Down
9 changes: 7 additions & 2 deletions third_party/bigframes_vendored/tpch/queries/q15.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
supplier, grouped_revenue, left_on="S_SUPPKEY", right_on="SUPPLIER_NO"
)

max_revenue = joined_data["TOTAL_REVENUE"].max()
max_revenue_suppliers = joined_data[joined_data["TOTAL_REVENUE"] == max_revenue]
max_revenue = joined_data[["TOTAL_REVENUE"]].max().rename("MAX_REVENUE")

joined_data = joined_data.merge(max_revenue, how="cross")

max_revenue_suppliers = joined_data[
joined_data["TOTAL_REVENUE"] == joined_data["MAX_REVENUE"]
]

max_revenue_suppliers["TOTAL_REVENUE"] = max_revenue_suppliers[
"TOTAL_REVENUE"
Expand Down
14 changes: 10 additions & 4 deletions third_party/bigframes_vendored/tpch/queries/q16.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):

var1 = "Brand#45"

supplier = supplier[
supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True)
]["S_SUPPKEY"]
supplier = (
supplier[
~supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True)
]["S_SUPPKEY"]
.unique(keep_order=False)
.to_frame()
)

q_filtered = part.merge(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY")
q_filtered = q_filtered[q_filtered["P_BRAND"] != var1]
q_filtered = q_filtered[~q_filtered["P_TYPE"].str.contains("MEDIUM POLISHED")]
q_filtered = q_filtered[q_filtered["P_SIZE"].isin([49, 14, 23, 45, 19, 3, 36, 9])]

final_df = q_filtered[~q_filtered["PS_SUPPKEY"].isin(supplier)]
final_df = q_filtered.merge(
supplier, left_on=["PS_SUPPKEY"], right_on=["S_SUPPKEY"]
)

grouped = final_df.groupby(["P_BRAND", "P_TYPE", "P_SIZE"], as_index=False)
result = grouped.agg(
Expand Down
4 changes: 2 additions & 2 deletions third_party/bigframes_vendored/tpch/queries/q17.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):

q_final = q_final[q_final["L_QUANTITY"] < q_final["AVG_QUANTITY"]]

q_final = bpd.DataFrame(
{"AVG_YEARLY": [(q_final["L_EXTENDEDPRICE"].sum() / 7.0).round(2)]}
q_final = (
(q_final[["L_EXTENDEDPRICE"]].sum() / 7.0).round(2).to_frame(name="AVG_YEARLY")
)

q_final.to_gbq()
7 changes: 2 additions & 5 deletions third_party/bigframes_vendored/tpch/queries/q20.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):

if not session._strictly_ordered:
filtered_parts = filtered_parts[["P_PARTKEY"]].sort_values(by=["P_PARTKEY"])
filtered_parts = filtered_parts[["P_PARTKEY"]].drop_duplicates()
filtered_parts = filtered_parts["P_PARTKEY"].unique(keep_order=False).to_frame()
joined_parts = filtered_parts.merge(
partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY"
)
Expand All @@ -56,10 +56,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
)
final_filtered = final_join[final_join["PS_AVAILQTY"] > final_join["SUM_QUANTITY"]]

final_filtered = final_filtered[["PS_SUPPKEY"]]
if not session._strictly_ordered:
final_filtered = final_filtered.sort_values(by="PS_SUPPKEY")
final_filtered = final_filtered.drop_duplicates()
final_filtered = final_filtered["PS_SUPPKEY"].unique(keep_order=False).to_frame()

final_result = final_filtered.merge(q3, left_on="PS_SUPPKEY", right_on="S_SUPPKEY")
final_result = final_result[["S_NAME", "S_ADDRESS"]].sort_values(by="S_NAME")
Expand Down
17 changes: 10 additions & 7 deletions third_party/bigframes_vendored/tpch/queries/q22.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):

customer["CNTRYCODE"] = customer["C_PHONE"].str.slice(0, 2)

avg_acctbal = customer[
(customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0)
]["C_ACCTBAL"].mean()
avg_acctbal = (
customer[
(customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0)
][["C_ACCTBAL"]]
.mean()
.rename("AVG_ACCTBAL")
)

if not session._strictly_ordered:
orders = orders.sort_values(by="O_CUSTKEY")
orders_unique = orders.drop_duplicates(subset=["O_CUSTKEY"])
orders_unique = orders["O_CUSTKEY"].unique(keep_order=False).to_frame()

matched_customers = customer.merge(
orders_unique, left_on="C_CUSTKEY", right_on="O_CUSTKEY"
Expand All @@ -35,10 +37,11 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
matched_customers[["C_CUSTKEY", "IS_IN_ORDERS"]], on="C_CUSTKEY", how="left"
)
customer["IS_IN_ORDERS"] = customer["IS_IN_ORDERS"].fillna(False)
customer = customer.merge(avg_acctbal, how="cross")

filtered_customers = customer[
(customer["CNTRYCODE"].isin(country_codes))
& (customer["C_ACCTBAL"] > avg_acctbal)
& (customer["C_ACCTBAL"] > customer["AVG_ACCTBAL"])
& (~customer["IS_IN_ORDERS"])
]

Expand Down
5 changes: 1 addition & 4 deletions third_party/bigframes_vendored/tpch/queries/q4.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
jn = jn[(jn["O_ORDERDATE"] >= var1) & (jn["O_ORDERDATE"] < var2)]
jn = jn[jn["L_COMMITDATE"] < jn["L_RECEIPTDATE"]]

if not session._strictly_ordered:
jn = jn.sort_values(by=["O_ORDERPRIORITY", "L_ORDERKEY"])

jn = jn.drop_duplicates(subset=["O_ORDERPRIORITY", "L_ORDERKEY"])
jn = jn.groupby(["O_ORDERPRIORITY", "L_ORDERKEY"], as_index=False).agg("size")

gb = jn.groupby("O_ORDERPRIORITY", as_index=False)
agg = gb.agg(ORDER_COUNT=bpd.NamedAgg(column="L_ORDERKEY", aggfunc="count"))
Expand Down

0 comments on commit 499f24a

Please sign in to comment.