Skip to content

Commit

Permalink
fix: read_pandas inline respects location (#412)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes b/327544164 🦕
  • Loading branch information
GarrettWu authored Mar 12, 2024
1 parent 763edeb commit ae0e3ea
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 8 deletions.
4 changes: 2 additions & 2 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def from_ibis(
return cls(node)

@classmethod
def from_pandas(cls, pd_df: pandas.DataFrame):
def from_pandas(cls, pd_df: pandas.DataFrame, session: bigframes.Session):
iobytes = io.BytesIO()
# Use alphanumeric identifiers, to avoid downstream problems with escaping.
as_ids = [
Expand All @@ -78,7 +78,7 @@ def from_pandas(cls, pd_df: pandas.DataFrame):
]
unique_ids = tuple(bigframes.core.utils.disambiguate_ids(as_ids))
pd_df.reset_index(drop=True).set_axis(unique_ids, axis=1).to_feather(iobytes)
node = nodes.ReadLocalNode(iobytes.getvalue())
node = nodes.ReadLocalNode(feather_bytes=iobytes.getvalue(), session=session)
return cls(node)

@property
Expand Down
4 changes: 2 additions & 2 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(
self._stats_cache[" ".join(self.index_columns)] = {}

@classmethod
def from_local(cls, data) -> Block:
def from_local(cls, data, session: bigframes.Session) -> Block:
pd_data = pd.DataFrame(data)
columns = pd_data.columns

Expand All @@ -162,7 +162,7 @@ def from_local(cls, data) -> Block:
)
index_ids = pd_data.columns[: len(index_labels)]

keys_expr = core.ArrayValue.from_pandas(pd_data)
keys_expr = core.ArrayValue.from_pandas(pd_data, session)
return cls(
keys_expr,
column_labels=columns,
Expand Down
1 change: 1 addition & 0 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def __hash__(self):
@dataclass(frozen=True)
class ReadLocalNode(BigFrameNode):
feather_bytes: bytes
session: typing.Optional[bigframes.session.Session] = None

def __hash__(self):
return self._node_hash
Expand Down
2 changes: 1 addition & 1 deletion bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,7 @@ def _reindex_rows(
raise NotImplementedError(
"Cannot reindex with index with different nlevels"
)
new_indexer = DataFrame(index=index)[[]]
new_indexer = DataFrame(index=index, session=self._session)[[]]
# multiindex join is senstive to index names, so we will set all these
result = new_indexer.rename_axis(range(new_indexer.index.nlevels)).join(
self.rename_axis(range(self.index.nlevels)),
Expand Down
2 changes: 1 addition & 1 deletion bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ def _read_pandas(
def _read_pandas_inline(
self, pandas_dataframe: pandas.DataFrame
) -> dataframe.DataFrame:
return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe))
return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self))

def _read_pandas_load_job(
self, pandas_dataframe: pandas.DataFrame, api_name: str
Expand Down
2 changes: 1 addition & 1 deletion notebooks/location/regionalized.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2791,7 +2791,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.9"
},
"orig_nbformat": 4
},
Expand Down
17 changes: 17 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ def test_df_construct_from_dict():
)


def test_df_construct_inline_respects_location():
import bigframes.pandas as bpd

bpd.close_session()
bpd.options.bigquery.location = "europe-west1"

df = bpd.DataFrame([[1, 2, 3], [4, 5, 6]])
repr(df)

table = bpd.get_global_session().bqclient.get_table(df.query_job.destination)
assert table.location == "europe-west1"

# Reset global session
bpd.close_session()
bpd.options.bigquery.location = "us"


def test_get_column(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
col_name = "int64_col"
Expand Down
11 changes: 11 additions & 0 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,17 @@ def test_read_pandas(session, scalars_dfs):
pd.testing.assert_frame_equal(result, expected)


def test_read_pandas_inline_respects_location():
options = bigframes.BigQueryOptions(location="europe-west1")
session = bigframes.Session(options)

df = session.read_pandas(pd.DataFrame([[1, 2, 3], [4, 5, 6]]))
repr(df)

table = session.bqclient.get_table(df.query_job.destination)
assert table.location == "europe-west1"


def test_read_pandas_col_label_w_space(session: bigframes.Session):
expected = pd.DataFrame(
{
Expand Down
9 changes: 8 additions & 1 deletion tests/unit/core/test_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import mock

import pandas
import pandas.testing
import pytest

import bigframes
import bigframes.core.blocks as blocks


Expand Down Expand Up @@ -74,8 +77,12 @@
)
def test_block_from_local(data):
expected = pandas.DataFrame(data)
mock_session = mock.create_autospec(spec=bigframes.Session)

# hard-coded the returned dimension of the session for that each of the test case contains 3 rows.
mock_session._execute.return_value = (iter([[3]]), None)

block = blocks.Block.from_local(data)
block = blocks.Block.from_local(data, mock_session)

pandas.testing.assert_index_equal(block.column_labels, expected.columns)
assert tuple(block.index.names) == tuple(expected.index.names)
Expand Down

0 comments on commit ae0e3ea

Please sign in to comment.