From 7c76508d39628e5f45086a9380598c1adeec65d7 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 5 Mar 2024 23:05:57 +0000 Subject: [PATCH 1/7] feat: add engine parameter to `read_parquet` --- bigframes/pandas/__init__.py | 4 +++- bigframes/session/__init__.py | 21 +++++++++++-------- tests/system/small/test_session.py | 18 +++++++++++++--- .../bigframes_vendored/pandas/io/parquet.py | 8 ++++++- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 3c9bb003cc..ad8d2b6b06 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -597,7 +597,9 @@ def read_pickle( read_pickle.__doc__ = inspect.getdoc(bigframes.session.Session.read_pickle) -def read_parquet(path: str | IO["bytes"]) -> bigframes.dataframe.DataFrame: +def read_parquet( + path: str | IO["bytes"], *, engine: str +) -> bigframes.dataframe.DataFrame: return global_session.with_default_session( bigframes.session.Session.read_parquet, path, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index ef4a349244..4d5e359b48 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1130,19 +1130,22 @@ def read_pickle( def read_parquet( self, path: str | IO["bytes"], + *, + engine: str = "auto", ) -> dataframe.DataFrame: - # Note: "engine" is omitted because it is redundant. Loading a table - # from a pandas DataFrame will just create another parquet file + load - # job anyway. table = bigframes_io.random_table(self._anonymous_dataset) - job_config = bigquery.LoadJobConfig() - job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED - job_config.source_format = bigquery.SourceFormat.PARQUET - job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY - job_config.labels = {"bigframes-api": "read_parquet"} + if engine == "bigquery": + job_config = bigquery.LoadJobConfig() + job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED + job_config.source_format = bigquery.SourceFormat.PARQUET + job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY + job_config.labels = {"bigframes-api": "read_parquet"} - return self._read_bigquery_load_job(path, table, job_config=job_config) + return self._read_bigquery_load_job(path, table, job_config=job_config) + else: + pandas_obj = pandas.read_parquet(path, engine=engine) + return self._read_pandas(pandas_obj, "read_parquet") def read_json( self, diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 85573472b9..dca3018c5a 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -856,11 +856,19 @@ def test_read_pickle_gcs(session, penguins_pandas_df_default_index, gcs_folder): pd.testing.assert_frame_equal(penguins_pandas_df_default_index, df.to_pandas()) -def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder): +@pytest.mark.parametrize( + ("engine",), + ( + ("auto",), + ("bigquery",), + ), +) +def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder, engine): scalars_df, _ = scalars_dfs # Include wildcard so that multiple files can be written/read if > 1 GB. # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files path = gcs_folder + test_read_parquet_gcs.__name__ + "*.parquet" + df_in: bigframes.dataframe.DataFrame = scalars_df.copy() # GEOGRAPHY not supported in parquet export. 
     df_in = df_in.drop(columns="geography_col")
@@ -869,8 +877,12 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
     df_write.to_parquet(path, index=True)
 
+    # Only the bigquery engine supports wildcards in the path when reading.
+    if engine != "bigquery":
+        path = path.replace("*", "000000000000")
+
     df_out = (
-        session.read_parquet(path)
+        session.read_parquet(path, engine=engine)
         # Restore order.
         .set_index(df_write.index.name).sort_index()
         # Restore index.
@@ -919,7 +931,7 @@ def test_read_parquet_gcs_compressed(
     df_write.to_parquet(path, compression=compression, index=True)
 
     df_out = (
-        session.read_parquet(path)
+        session.read_parquet(path, engine="bigquery")
         # Restore order.
         .set_index(df_write.index.name).sort_index()
         # Restore index.
diff --git a/third_party/bigframes_vendored/pandas/io/parquet.py b/third_party/bigframes_vendored/pandas/io/parquet.py
index 0f664e70fc..877a384b6d 100644
--- a/third_party/bigframes_vendored/pandas/io/parquet.py
+++ b/third_party/bigframes_vendored/pandas/io/parquet.py
@@ -9,6 +9,8 @@ class ParquetIOMixin:
     def read_parquet(
         self,
         path: str,
+        *,
+        engine: str = "auto",
     ):
         r"""Load a Parquet object from the file path (local or Cloud Storage),
         returning a DataFrame.
@@ -23,11 +25,15 @@ def read_parquet(
             >>> bpd.options.display.progress_bar = None
 
             >>> gcs_path = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
-            >>> df = bpd.read_parquet(path=gcs_path)
+            >>> df = bpd.read_parquet(path=gcs_path, engine="bigquery")
 
         Args:
             path (str):
                 Local or Cloud Storage path to Parquet file.
+            engine (str):
+                One of ``'auto'``, ``'pyarrow'``, ``'fastparquet'``, or ``'bigquery'``.
+                Parquet library to parse the file. If set to ``'bigquery'``,
+                row order is not preserved. Defaults to ``'auto'``.
 
         Returns:
             bigframes.dataframe.DataFrame: A BigQuery DataFrames.
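
Taken together, the first patch gives `read_parquet` two code paths: `engine="bigquery"`
runs a BigQuery load job directly against the (optionally wildcarded) GCS path, while
any pandas engine parses the file client-side and round-trips the result through
`_read_pandas`. A minimal usage sketch of the new parameter, reusing the sample file
from the docstring above (the local file name in the second call is hypothetical, and
non-BigQuery engines need a filesystem handler such as gcsfs to read gs:// paths):

    import bigframes.pandas as bpd

    # BigQuery load job: wildcard paths allowed, but row order is not preserved.
    df_bq = bpd.read_parquet(
        "gs://cloud-samples-data/bigquery/us-states/us-states.parquet",
        engine="bigquery",
    )

    # Client-side parse with pyarrow, then upload via a pandas DataFrame.
    df_local = bpd.read_parquet("us-states.parquet", engine="pyarrow")
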
From d27efac2d23fc70f1c7656b9a348063073f2554f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 5 Mar 2024 23:09:32 +0000 Subject: [PATCH 2/7] fix bigframes.pandas.read_parquet --- bigframes/pandas/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index ad8d2b6b06..3120e96b1a 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -598,11 +598,12 @@ def read_pickle( def read_parquet( - path: str | IO["bytes"], *, engine: str + path: str | IO["bytes"], *, engine: str = "auto" ) -> bigframes.dataframe.DataFrame: return global_session.with_default_session( bigframes.session.Session.read_parquet, path, + engine=engine, ) From 042f38d78ab1e6e11d2a1db8bbf651d64302f399 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Mar 2024 15:58:17 +0000 Subject: [PATCH 3/7] mypy failure --- bigframes/session/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 4d5e359b48..178339ec5a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1144,7 +1144,7 @@ def read_parquet( return self._read_bigquery_load_job(path, table, job_config=job_config) else: - pandas_obj = pandas.read_parquet(path, engine=engine) + pandas_obj = pandas.read_parquet(path, engine=engine) # type: ignore return self._read_pandas(pandas_obj, "read_parquet") def read_json( From 8c83b8bf414f4f0b7300b416480baecea3cc5384 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Mar 2024 16:30:24 +0000 Subject: [PATCH 4/7] preserve null integers --- bigframes/session/__init__.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 178339ec5a..21cfe425c5 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1144,7 +1144,17 @@ def read_parquet( return self._read_bigquery_load_job(path, table, job_config=job_config) else: - pandas_obj = pandas.read_parquet(path, engine=engine) # type: ignore + read_parquet_kwargs: Dict[str, Any] = {} + if pandas.__version__.startswith("1."): + read_parquet_kwargs["use_nullable_dtypes"] = True + else: + read_parquet_kwargs["dtype_backend"] = "pyarrow" + + pandas_obj = pandas.read_parquet( + path, + engine=engine, + **read_parquet_kwargs, + ) # type: ignore return self._read_pandas(pandas_obj, "read_parquet") def read_json( From ea3291c722c10dfa439fe960db0c336a927bf1ea Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Mar 2024 16:32:29 +0000 Subject: [PATCH 5/7] fix timestamp test --- tests/system/small/test_session.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index dca3018c5a..40bd9eb356 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -891,9 +891,10 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder, e # DATETIME gets loaded as TIMESTAMP in parquet. See: # https://cloud.google.com/bigquery/docs/exporting-data#parquet_export_details - df_out = df_out.assign( - datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]") - ) + if engine == "bigquery" or pd.__version__.startswith("1."): + df_out = df_out.assign( + datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]") + ) # Make sure we actually have at least some values before comparing. 
assert df_out.size != 0 From 659a8605aecaae45c1d6d318c18b9fd3a13239c0 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Mar 2024 16:36:41 +0000 Subject: [PATCH 6/7] fix timestamp test --- tests/system/small/test_session.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 40bd9eb356..2e2252be06 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -891,10 +891,10 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder, e # DATETIME gets loaded as TIMESTAMP in parquet. See: # https://cloud.google.com/bigquery/docs/exporting-data#parquet_export_details - if engine == "bigquery" or pd.__version__.startswith("1."): - df_out = df_out.assign( - datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]") - ) + df_out = df_out.assign( + datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]"), + timestamp_col=df_out["timestamp_col"].astype("timestamp[us, tz=UTC][pyarrow]"), + ) # Make sure we actually have at least some values before comparing. assert df_out.size != 0 From bed0ed69ccc46e5b7ceb1f27fd51db8c0facd262 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Mar 2024 20:05:54 +0000 Subject: [PATCH 7/7] mypy --- bigframes/session/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 21cfe425c5..4b30a3a9d1 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1152,9 +1152,9 @@ def read_parquet( pandas_obj = pandas.read_parquet( path, - engine=engine, + engine=engine, # type: ignore **read_parquet_kwargs, - ) # type: ignore + ) return self._read_pandas(pandas_obj, "read_parquet") def read_json(
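
For reference, the pandas-version dispatch that patch 4 introduces in the session can
be exercised on its own. A standalone sketch of the same logic (the file name is
hypothetical; the pandas 2.x branch assumes pyarrow is installed):

    from typing import Any, Dict

    import pandas

    # pandas 1.x spells the nullable-dtype opt-in as use_nullable_dtypes=True;
    # pandas 2.x replaces it with dtype_backend="pyarrow".
    read_parquet_kwargs: Dict[str, Any] = {}
    if pandas.__version__.startswith("1."):
        read_parquet_kwargs["use_nullable_dtypes"] = True
    else:
        read_parquet_kwargs["dtype_backend"] = "pyarrow"

    df = pandas.read_parquet("example.parquet", engine="pyarrow", **read_parquet_kwargs)

Either spelling keeps integer columns containing NULLs as nullable integers instead of
letting them decay to float64, which is what the "preserve null integers" commit is
guarding against.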
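
Patches 5 and 6 then settle on unconditionally coercing both timestamp columns to
fixed-precision pyarrow dtypes before comparing, because Parquet export writes
BigQuery DATETIME values as TIMESTAMP. A self-contained sketch of that coercion
(the frame below is hypothetical; assumes pandas 2.x with pyarrow):

    import pandas as pd

    df_out = pd.DataFrame(
        {
            "datetime_col": pd.to_datetime(["2024-03-06T16:36:41"]),
            "timestamp_col": pd.to_datetime(["2024-03-06T16:36:41Z"], utc=True),
        }
    )
    # Normalize to microsecond-precision pyarrow timestamps so that frames read
    # back through either engine compare equal.
    df_out = df_out.assign(
        datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]"),
        timestamp_col=df_out["timestamp_col"].astype("timestamp[us, tz=UTC][pyarrow]"),
    )
    print(df_out.dtypes)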