Skip to content

Commit

Permalink
Merge branch 'main' into shobs-min-iam
Browse files Browse the repository at this point in the history
  • Loading branch information
shobsi authored Mar 7, 2024
2 parents 5a35b59 + 38bd2ba commit 6c17eb4
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 15 deletions.
2 changes: 1 addition & 1 deletion bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def join(
self,
other: ArrayValue,
join_def: join_def.JoinDefinition,
allow_row_identity_join: bool = True,
allow_row_identity_join: bool = False,
):
return ArrayValue(
nodes.JoinNode(
Expand Down
4 changes: 2 additions & 2 deletions bigframes/core/compile/single_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def join_by_column_ordered(
left: compiled.OrderedIR,
right: compiled.OrderedIR,
join: join_defs.JoinDefinition,
allow_row_identity_join: bool = True,
allow_row_identity_join: bool = False,
) -> compiled.OrderedIR:
"""Join two expressions by column equality.
Expand Down Expand Up @@ -134,7 +134,7 @@ def join_by_column_unordered(
left: compiled.UnorderedIR,
right: compiled.UnorderedIR,
join: join_defs.JoinDefinition,
allow_row_identity_join: bool = True,
allow_row_identity_join: bool = False,
) -> compiled.UnorderedIR:
"""Join two expressions by column equality.
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class JoinNode(BigFrameNode):
left_child: BigFrameNode
right_child: BigFrameNode
join: JoinDefinition
allow_row_identity_join: bool = True
allow_row_identity_join: bool = False

@property
def row_preserving(self) -> bool:
Expand Down
8 changes: 7 additions & 1 deletion bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,11 +1253,17 @@ def apply(
ex.message += f"\n{_remote_function_recommendation_message}"
raise

# We are working with a remote function at this point
reprojected_series = Series(self._block._force_reproject())
return reprojected_series._apply_unary_op(
result_series = reprojected_series._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
)

# Return a Series with the materialized result so that any error in the
# remote function is caught early.
materialized_series = result_series._cached()
return materialized_series

def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
    """Return a new Series built from this Series' block with ``prefix`` applied.

    The work is delegated to the underlying block's ``add_prefix``.
    ``axis`` is accepted in the signature but is not forwarded to the block.
    """
    prefixed_block = self._get_block().add_prefix(prefix)
    return Series(prefixed_block)

Expand Down
7 changes: 7 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ def scalars_table_id(test_data_tables) -> str:
return test_data_tables["scalars"]


@pytest.fixture(scope="session")
def baseball_schedules_df(session: bigframes.Session) -> bigframes.dataframe.DataFrame:
    """Session-scoped DataFrame read from a public BigQuery table.

    Reads ``bigquery-public-data.baseball.schedules`` through the given
    session's ``read_gbq``.
    """
    return session.read_gbq("bigquery-public-data.baseball.schedules")


@pytest.fixture(scope="session")
def hockey_table_id(test_data_tables) -> str:
    """Return the entry stored under ``"hockey_players"`` in ``test_data_tables``."""
    table_id = test_data_tables["hockey_players"]
    return table_id
Expand Down
70 changes: 70 additions & 0 deletions tests/system/small/regression/test_issue355_merge_after_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import pytest

from tests.system.utils import assert_pandas_df_equal


@pytest.mark.parametrize("merge_how", ["inner", "outer", "left", "right"])
def test_merge_after_filter(baseball_schedules_df, merge_how):
    """Regression test for issue 355: merge two filtered views of one table.

    Both sides of the merge are derived from the same source DataFrame with
    different row filters applied. The merged result must match what pandas
    produces for the same filters and the same ``how``.

    Args:
        baseball_schedules_df: DataFrame fixture over the public baseball
            schedules table.
        merge_how: Join type passed to ``merge`` ("inner", "outer", "left"
            or "right").
    """
    on = ["awayTeamName"]
    # Both sides select the same columns; only the row filter differs.
    columns = [
        "gameId",
        "year",
        "homeTeamName",
        "awayTeamName",
        "duration_minutes",
    ]

    left = baseball_schedules_df[columns]
    left = left[left["homeTeamName"] == "Rays"]
    # Offset the rows somewhat so that outer join can have an effect.
    right = baseball_schedules_df[columns]
    right = right[right["homeTeamName"] == "White Sox"]

    bf_result = left.merge(right, on=on, how=merge_how).to_pandas()

    # Download the source table once and derive both pandas operands from it,
    # instead of materializing the full table separately for each side.
    schedules_pandas = baseball_schedules_df.to_pandas()[columns]
    left_pandas = schedules_pandas[schedules_pandas["homeTeamName"] == "Rays"]
    right_pandas = schedules_pandas[
        schedules_pandas["homeTeamName"] == "White Sox"
    ]

    # Pass ``how`` and ``on`` by keyword so the call does not depend on the
    # positional order of pd.merge's parameters.
    pd_result = pd.merge(
        left_pandas,
        right_pandas,
        how=merge_how,
        on=on,
        sort=True,
    )

    assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)
21 changes: 11 additions & 10 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3935,6 +3935,11 @@ def nlargest(self, n: int, columns, keep: str = "first"):
``df.sort_values(columns, ascending=False).head(n)``, but more
performant.
.. note::
This function cannot be used with all column types. For example, when
specifying columns with `object` or `category` dtypes, ``TypeError`` is
raised.
**Examples:**
>>> import bigframes.pandas as bpd
Expand Down Expand Up @@ -4002,11 +4007,6 @@ def nlargest(self, n: int, columns, keep: str = "first"):
Returns:
DataFrame: The first `n` rows ordered by the given columns in descending order.
.. note::
This function cannot be used with all column types. For example, when
specifying columns with `object` or `category` dtypes, ``TypeError`` is
raised.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

Expand All @@ -4022,6 +4022,12 @@ def nsmallest(self, n: int, columns, keep: str = "first"):
``df.sort_values(columns, ascending=True).head(n)``, but more
performant.
.. note::
This function cannot be used with all column types. For example, when
specifying columns with `object` or `category` dtypes, ``TypeError`` is
raised.
**Examples:**
>>> import bigframes.pandas as bpd
Expand Down Expand Up @@ -4090,11 +4096,6 @@ def nsmallest(self, n: int, columns, keep: str = "first"):
Returns:
DataFrame: The first `n` rows ordered by the given columns in ascending order.
.. note::
This function cannot be used with all column types. For example, when
specifying columns with `object` or `category` dtypes, ``TypeError`` is
raised.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

Expand Down

0 comments on commit 6c17eb4

Please sign in to comment.