From fd9862f3500aa56c12061025c65794579a2af4fa Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 7 Dec 2024 00:36:27 +0000 Subject: [PATCH] feat: Series.isin supports bigframes.Series arg --- bigframes/core/blocks.py | 37 ++++++++++++++++++++++++++++ bigframes/ml/utils.py | 2 +- bigframes/series.py | 3 ++- tests/system/small/test_series.py | 40 +++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 574bed00eb..892f34a75f 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2060,6 +2060,43 @@ def concat( result_block = result_block.reset_index() return result_block + def isin(self, other: Block): + # TODO: Support multiple other columns and match on label + # TODO: Model as explicit "IN" subquery/join to better allow db to optimize + assert len(other.value_columns) == 1 + unique_other_values = other.expr.select_columns( + [other.value_columns[0]] + ).aggregate((), by_column_ids=(other.value_columns[0],)) + block = self + # for each original column, join with other + for i in range(len(self.value_columns)): + block = block._isin_inner(block.value_columns[i], unique_other_values) + return block + + def _isin_inner(self: Block, col: str, unique_values: core.ArrayValue) -> Block: + unique_values, const = unique_values.create_constant( + True, dtype=bigframes.dtypes.BOOL_DTYPE + ) + expr, (l_map, r_map) = self._expr.relational_join( + unique_values, ((col, unique_values.column_ids[0]),), type="left" + ) + expr, matches = expr.project_to_id( + ops.eq_op.as_expr(ex.const(True), r_map[const]) + ) + + new_index_cols = tuple(l_map[idx_col] for idx_col in self.index_columns) + new_value_cols = tuple( + l_map[val_col] if val_col != col else matches + for val_col in self.value_columns + ) + expr = expr.select_columns((*new_index_cols, *new_value_cols)) + return Block( + expr, + index_columns=new_index_cols, + column_labels=self.column_labels, + index_labels=self._index_labels, + ) + def merge( self, other: Block, diff --git a/bigframes/ml/utils.py b/bigframes/ml/utils.py index 72d7054f45..e1620485d5 100644 --- a/bigframes/ml/utils.py +++ b/bigframes/ml/utils.py @@ -92,7 +92,7 @@ def _get_only_column(input: ArrayType) -> Union[pd.Series, bpd.Series]: label = typing.cast(Hashable, input.columns.tolist()[0]) if isinstance(input, pd.DataFrame): return typing.cast(pd.Series, input[label]) - return typing.cast(bpd.Series, input[label]) + return typing.cast(bpd.Series, input[label]) # type: ignore def parse_model_endpoint(model_endpoint: str) -> tuple[str, Optional[str]]: diff --git a/bigframes/series.py b/bigframes/series.py index b92da64aff..af09866bfe 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -718,12 +718,13 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series: ) def isin(self, values) -> "Series" | None: + if isinstance(values, (Series,)): + self._block.isin(values._block) if not _is_list_like(values): raise TypeError( "only list-like objects are allowed to be passed to " f"isin(), you passed a [{type(values).__name__}]" ) - return self._apply_unary_op( ops.IsInOp(values=tuple(values), match_nulls=True) ).fillna(value=False) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 218708a19d..692b221a19 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -1200,6 +1200,46 @@ def test_isin(scalars_dfs, col_name, test_set): ) +@pytest.mark.parametrize( + ( + "col_name", + "test_set", + ), + [ + ( + "int64_col", + [314159, 2.0, 3, pd.NA], + ), + ( + "int64_col", + [2, 55555, 4], + ), + ( + "float64_col", + [-123.456, 1.25, pd.NA], + ), + ( + "int64_too", + [1, 2, pd.NA], + ), + ( + "string_col", + ["Hello, World!", "Hi", "こんにちは"], + ), + ], +) +def test_isin_bigframes_values(scalars_dfs, col_name, test_set, session): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = ( + scalars_df[col_name].isin(series.Series(test_set, session=session)).to_pandas() + ) + pd_result = scalars_pandas_df[col_name].isin(test_set).astype("boolean") + pd.testing.assert_series_equal( + pd_result, + bf_result, + ) + + def test_isnull(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs col_name = "float64_col"