From 499f24a5f22ce484db96eb09cd3a0ce972398d81 Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Mon, 28 Oct 2024 21:43:01 -0700 Subject: [PATCH] =?UTF-8?q?perf:=20improve=20series.unique=20performance?= =?UTF-8?q?=20and=20replace=20drop=5Fduplicates=20i=E2=80=A6=20(#1108)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: improve series.unique performance and replace drop_duplicates in tpch test. * change variable name * update docstring and index. * update flag to control behavior. * update examples * delete empty line * keep order error * reduce query_count * improve q16 * benchmark updates --- bigframes/series.py | 13 ++++++-- .../bigframes_vendored/pandas/core/series.py | 18 +++++++++-- .../bigframes_vendored/tpch/queries/q10.py | 30 +++++++++++-------- .../bigframes_vendored/tpch/queries/q11.py | 12 ++++++-- .../bigframes_vendored/tpch/queries/q15.py | 9 ++++-- .../bigframes_vendored/tpch/queries/q16.py | 14 ++++++--- .../bigframes_vendored/tpch/queries/q17.py | 4 +-- .../bigframes_vendored/tpch/queries/q20.py | 7 ++--- .../bigframes_vendored/tpch/queries/q22.py | 17 ++++++----- .../bigframes_vendored/tpch/queries/q4.py | 5 +--- 10 files changed, 84 insertions(+), 45 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index 215f4473ee..d311742861 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1609,9 +1609,16 @@ def drop_duplicates(self, *, keep: str = "first") -> Series: block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) - @validations.requires_ordering() - def unique(self) -> Series: - return self.drop_duplicates() + def unique(self, keep_order=True) -> Series: + if keep_order: + validations.enforce_ordered(self, "unique(keep_order != False)") + return self.drop_duplicates() + block, result = self._block.aggregate( + [self._value_column], + [(self._value_column, agg_ops.AnyValueOp())], + dropna=False, + ) + return Series(block.select_columns(result).reset_index()) def duplicated(self, keep: str = "first") -> Series: if keep is not False: diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 845d623e2a..a3b85205a9 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -645,13 +645,18 @@ def nunique(self) -> int: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def unique(self) -> Series: + def unique(self, keep_order=True) -> Series: """ Return unique values of Series object. - Uniques are returned in order of appearance. Hash table-based unique, + By default, uniques are returned in order of appearance. Hash table-based unique, therefore does NOT sort. + Args: + keep_order (bool, default True): + If True, preserves the order of the first appearance of each unique value. + If False, returns the elements in ascending order, which can be faster. + **Examples:** >>> import bigframes.pandas as bpd @@ -664,12 +669,21 @@ def unique(self) -> Series: 2 3 3 3 Name: A, dtype: Int64 + + Example with order preservation: Slower, but keeps order >>> s.unique() 0 2 1 1 2 3 Name: A, dtype: Int64 + Example without order preservation: Faster, but loses original order + >>> s.unique(keep_order=False) + 0 1 + 1 2 + 2 3 + Name: A, dtype: Int64 + Returns: Series: The unique values returned as a Series. """ diff --git a/third_party/bigframes_vendored/tpch/queries/q10.py b/third_party/bigframes_vendored/tpch/queries/q10.py index 75a8f2de7f..1650e9ca34 100644 --- a/third_party/bigframes_vendored/tpch/queries/q10.py +++ b/third_party/bigframes_vendored/tpch/queries/q10.py @@ -28,8 +28,6 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): var1 = date(1993, 10, 1) var2 = date(1994, 1, 1) - q_final = customer.merge - q_final = ( customer.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY") .merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY") @@ -61,15 +59,21 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): as_index=False, ).agg(REVENUE=bpd.NamedAgg(column="INTERMEDIATE_REVENUE", aggfunc="sum")) - q_final[ - [ - "C_CUSTKEY", - "C_NAME", - "REVENUE", - "C_ACCTBAL", - "N_NAME", - "C_ADDRESS", - "C_PHONE", - "C_COMMENT", + q_final = ( + q_final[ + [ + "C_CUSTKEY", + "C_NAME", + "REVENUE", + "C_ACCTBAL", + "N_NAME", + "C_ADDRESS", + "C_PHONE", + "C_COMMENT", + ] ] - ].sort_values(by="REVENUE", ascending=False).head(20).to_gbq() + .sort_values(by="REVENUE", ascending=False) + .head(20) + ) + + q_final.to_gbq() diff --git a/third_party/bigframes_vendored/tpch/queries/q11.py b/third_party/bigframes_vendored/tpch/queries/q11.py index 484a7c0001..385393f781 100644 --- a/third_party/bigframes_vendored/tpch/queries/q11.py +++ b/third_party/bigframes_vendored/tpch/queries/q11.py @@ -30,10 +30,16 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): grouped["VALUE"] = grouped["VALUE"].round(2) - total_value = (filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).sum() - threshold = total_value * 0.0001 + total_value = ( + (filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).to_frame().sum() + ) + threshold = (total_value * 0.0001).rename("THRESHOLD") + + grouped = grouped.merge(threshold, how="cross") - result_df = grouped[grouped["VALUE"] > threshold] + result_df = grouped[grouped["VALUE"] > grouped["THRESHOLD"]].drop( + columns="THRESHOLD" + ) result_df = result_df.sort_values(by="VALUE", ascending=False) diff --git a/third_party/bigframes_vendored/tpch/queries/q15.py b/third_party/bigframes_vendored/tpch/queries/q15.py index 042adbda8b..adf37f9892 100644 --- a/third_party/bigframes_vendored/tpch/queries/q15.py +++ b/third_party/bigframes_vendored/tpch/queries/q15.py @@ -36,8 +36,13 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): supplier, grouped_revenue, left_on="S_SUPPKEY", right_on="SUPPLIER_NO" ) - max_revenue = joined_data["TOTAL_REVENUE"].max() - max_revenue_suppliers = joined_data[joined_data["TOTAL_REVENUE"] == max_revenue] + max_revenue = joined_data[["TOTAL_REVENUE"]].max().rename("MAX_REVENUE") + + joined_data = joined_data.merge(max_revenue, how="cross") + + max_revenue_suppliers = joined_data[ + joined_data["TOTAL_REVENUE"] == joined_data["MAX_REVENUE"] + ] max_revenue_suppliers["TOTAL_REVENUE"] = max_revenue_suppliers[ "TOTAL_REVENUE" diff --git a/third_party/bigframes_vendored/tpch/queries/q16.py b/third_party/bigframes_vendored/tpch/queries/q16.py index 1bd2795c42..79f42ec42c 100644 --- a/third_party/bigframes_vendored/tpch/queries/q16.py +++ b/third_party/bigframes_vendored/tpch/queries/q16.py @@ -20,16 +20,22 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): var1 = "Brand#45" - supplier = supplier[ - supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True) - ]["S_SUPPKEY"] + supplier = ( + supplier[ + ~supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True) + ]["S_SUPPKEY"] + .unique(keep_order=False) + .to_frame() + ) q_filtered = part.merge(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY") q_filtered = q_filtered[q_filtered["P_BRAND"] != var1] q_filtered = q_filtered[~q_filtered["P_TYPE"].str.contains("MEDIUM POLISHED")] q_filtered = q_filtered[q_filtered["P_SIZE"].isin([49, 14, 23, 45, 19, 3, 36, 9])] - final_df = q_filtered[~q_filtered["PS_SUPPKEY"].isin(supplier)] + final_df = q_filtered.merge( + supplier, left_on=["PS_SUPPKEY"], right_on=["S_SUPPKEY"] + ) grouped = final_df.groupby(["P_BRAND", "P_TYPE", "P_SIZE"], as_index=False) result = grouped.agg( diff --git a/third_party/bigframes_vendored/tpch/queries/q17.py b/third_party/bigframes_vendored/tpch/queries/q17.py index 0bd1c44315..56289d57ad 100644 --- a/third_party/bigframes_vendored/tpch/queries/q17.py +++ b/third_party/bigframes_vendored/tpch/queries/q17.py @@ -33,8 +33,8 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): q_final = q_final[q_final["L_QUANTITY"] < q_final["AVG_QUANTITY"]] - q_final = bpd.DataFrame( - {"AVG_YEARLY": [(q_final["L_EXTENDEDPRICE"].sum() / 7.0).round(2)]} + q_final = ( + (q_final[["L_EXTENDEDPRICE"]].sum() / 7.0).round(2).to_frame(name="AVG_YEARLY") ) q_final.to_gbq() diff --git a/third_party/bigframes_vendored/tpch/queries/q20.py b/third_party/bigframes_vendored/tpch/queries/q20.py index 26651a31c4..fded5f5c97 100644 --- a/third_party/bigframes_vendored/tpch/queries/q20.py +++ b/third_party/bigframes_vendored/tpch/queries/q20.py @@ -46,7 +46,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): if not session._strictly_ordered: filtered_parts = filtered_parts[["P_PARTKEY"]].sort_values(by=["P_PARTKEY"]) - filtered_parts = filtered_parts[["P_PARTKEY"]].drop_duplicates() + filtered_parts = filtered_parts["P_PARTKEY"].unique(keep_order=False).to_frame() joined_parts = filtered_parts.merge( partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY" ) @@ -56,10 +56,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): ) final_filtered = final_join[final_join["PS_AVAILQTY"] > final_join["SUM_QUANTITY"]] - final_filtered = final_filtered[["PS_SUPPKEY"]] - if not session._strictly_ordered: - final_filtered = final_filtered.sort_values(by="PS_SUPPKEY") - final_filtered = final_filtered.drop_duplicates() + final_filtered = final_filtered["PS_SUPPKEY"].unique(keep_order=False).to_frame() final_result = final_filtered.merge(q3, left_on="PS_SUPPKEY", right_on="S_SUPPKEY") final_result = final_result[["S_NAME", "S_ADDRESS"]].sort_values(by="S_NAME") diff --git a/third_party/bigframes_vendored/tpch/queries/q22.py b/third_party/bigframes_vendored/tpch/queries/q22.py index 137a7d5c36..bc648ef392 100644 --- a/third_party/bigframes_vendored/tpch/queries/q22.py +++ b/third_party/bigframes_vendored/tpch/queries/q22.py @@ -18,13 +18,15 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): customer["CNTRYCODE"] = customer["C_PHONE"].str.slice(0, 2) - avg_acctbal = customer[ - (customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0) - ]["C_ACCTBAL"].mean() + avg_acctbal = ( + customer[ + (customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0) + ][["C_ACCTBAL"]] + .mean() + .rename("AVG_ACCTBAL") + ) - if not session._strictly_ordered: - orders = orders.sort_values(by="O_CUSTKEY") - orders_unique = orders.drop_duplicates(subset=["O_CUSTKEY"]) + orders_unique = orders["O_CUSTKEY"].unique(keep_order=False).to_frame() matched_customers = customer.merge( orders_unique, left_on="C_CUSTKEY", right_on="O_CUSTKEY" @@ -35,10 +37,11 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): matched_customers[["C_CUSTKEY", "IS_IN_ORDERS"]], on="C_CUSTKEY", how="left" ) customer["IS_IN_ORDERS"] = customer["IS_IN_ORDERS"].fillna(False) + customer = customer.merge(avg_acctbal, how="cross") filtered_customers = customer[ (customer["CNTRYCODE"].isin(country_codes)) - & (customer["C_ACCTBAL"] > avg_acctbal) + & (customer["C_ACCTBAL"] > customer["AVG_ACCTBAL"]) & (~customer["IS_IN_ORDERS"]) ] diff --git a/third_party/bigframes_vendored/tpch/queries/q4.py b/third_party/bigframes_vendored/tpch/queries/q4.py index b89f70845f..d149a71f71 100644 --- a/third_party/bigframes_vendored/tpch/queries/q4.py +++ b/third_party/bigframes_vendored/tpch/queries/q4.py @@ -26,10 +26,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): jn = jn[(jn["O_ORDERDATE"] >= var1) & (jn["O_ORDERDATE"] < var2)] jn = jn[jn["L_COMMITDATE"] < jn["L_RECEIPTDATE"]] - if not session._strictly_ordered: - jn = jn.sort_values(by=["O_ORDERPRIORITY", "L_ORDERKEY"]) - - jn = jn.drop_duplicates(subset=["O_ORDERPRIORITY", "L_ORDERKEY"]) + jn = jn.groupby(["O_ORDERPRIORITY", "L_ORDERKEY"], as_index=False).agg("size") gb = jn.groupby("O_ORDERPRIORITY", as_index=False) agg = gb.agg(ORDER_COUNT=bpd.NamedAgg(column="L_ORDERKEY", aggfunc="count"))