From c71d5f8e6a1afde833c3c410dca2b4832ba5fcb5 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 15 Jan 2020 08:42:51 -0600 Subject: [PATCH] refactor(bigquery): `to_dataframe` uses faster `to_arrow` + `to_pandas` when `pyarrow` is available (#10027) * fix(bigquery): to_dataframe uses 2x faster to_arrow + to_pandas when pyarrow is available * fix: skip to_arrow tests when pyarrow is missing * test: update test to work around numpy array encoding of nested arrays * test: add test for tabledata.list with no rows * test: boost test coverage * chore: fix lint --- bigquery/google/cloud/bigquery/table.py | 63 ++++-- bigquery/tests/system.py | 7 +- bigquery/tests/unit/test_table.py | 278 ++++++++++++++++++++---- 3 files changed, 286 insertions(+), 62 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 585676490c38..555f529f3670 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1519,6 +1519,17 @@ def to_arrow( if pyarrow is None: raise ValueError(_NO_PYARROW_ERROR) + if ( + bqstorage_client or create_bqstorage_client + ) and self.max_results is not None: + warnings.warn( + "Cannot use bqstorage_client if max_results is set, " + "reverting to fetching data with the tabledata.list endpoint.", + stacklevel=2, + ) + create_bqstorage_client = False + bqstorage_client = None + owns_bqstorage_client = False if not bqstorage_client and create_bqstorage_client: owns_bqstorage_client = True @@ -1707,33 +1718,39 @@ def to_dataframe( create_bqstorage_client = False bqstorage_client = None - owns_bqstorage_client = False - if not bqstorage_client and create_bqstorage_client: - owns_bqstorage_client = True - bqstorage_client = self.client._create_bqstorage_client() - - try: - progress_bar = self._get_progress_bar(progress_bar_type) + if pyarrow is not None: + # If pyarrow is available, calling to_arrow, then converting to a + # pandas dataframe is about 2x faster. This is because pandas.concat is + # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is + # usually no-copy. + record_batch = self.to_arrow( + progress_bar_type=progress_bar_type, + bqstorage_client=bqstorage_client, + create_bqstorage_client=create_bqstorage_client, + ) + df = record_batch.to_pandas() + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + return df - frames = [] - for frame in self.to_dataframe_iterable( - bqstorage_client=bqstorage_client, dtypes=dtypes - ): - frames.append(frame) + # The bqstorage_client is only used if pyarrow is available, so the + # rest of this method only needs to account for tabledata.list. + progress_bar = self._get_progress_bar(progress_bar_type) - if progress_bar is not None: - # In some cases, the number of total rows is not populated - # until the first page of rows is fetched. Update the - # progress bar's total to keep an accurate count. - progress_bar.total = progress_bar.total or self.total_rows - progress_bar.update(len(frame)) + frames = [] + for frame in self.to_dataframe_iterable(dtypes=dtypes): + frames.append(frame) if progress_bar is not None: - # Indicate that the download has finished. - progress_bar.close() - finally: - if owns_bqstorage_client: - bqstorage_client.transport.channel.close() + # In some cases, the number of total rows is not populated + # until the first page of rows is fetched. Update the + # progress bar's total to keep an accurate count. 
+ progress_bar.total = progress_bar.total or self.total_rows + progress_bar.update(len(frame)) + + if progress_bar is not None: + # Indicate that the download has finished. + progress_bar.close() # Avoid concatting an empty list. if not frames: diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index cd72352c29fd..4a1c032717f5 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -2372,7 +2372,12 @@ def test_nested_table_to_dataframe(self): row = df.iloc[0] # verify the row content self.assertEqual(row["string_col"], "Some value") - self.assertEqual(row["record_col"], record) + expected_keys = tuple(sorted(record.keys())) + row_keys = tuple(sorted(row["record_col"].keys())) + self.assertEqual(row_keys, expected_keys) + # Can't compare numpy arrays, which pyarrow encodes the embedded + # repeated column to, so convert to list. + self.assertEqual(list(row["record_col"]["nested_repeated"]), [0, 1, 2]) # verify that nested data can be accessed with indices/keys self.assertEqual(row["record_col"]["nested_repeated"][0], 0) self.assertEqual( diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 73fe1c10d49b..6e8958cdc46c 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1809,6 +1809,46 @@ def test_to_arrow_w_empty_table(self): self.assertEqual(child_field.type.value_type[0].name, "name") self.assertEqual(child_field.type.value_type[1].name, "age") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_arrow_max_results_w_create_bqstorage_warning(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + mock_client = _mock_client() + + row_iterator = self._make_one( + client=mock_client, + api_request=api_request, + path=path, + schema=schema, + max_results=42, + ) + + with warnings.catch_warnings(record=True) as warned: + row_iterator.to_arrow(create_bqstorage_client=True) + + matches = [ + warning + for warning in warned + if warning.category is UserWarning + and "cannot use bqstorage_client" in str(warning).lower() + and "tabledata.list" in str(warning) + ] + self.assertEqual(len(matches), 1, msg="User warning was not emitted.") + mock_client._create_bqstorage_client.assert_not_called() + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" @@ -1856,7 +1896,7 @@ def test_to_arrow_w_bqstorage(self): mock_page = mock.create_autospec(reader.ReadRowsPage) mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays( - page_items, arrow_schema + page_items, schema=arrow_schema ) mock_pages = (mock_page, mock_page, mock_page) type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) @@ -2057,6 +2097,80 @@ def test_to_dataframe_iterable(self): self.assertEqual(df_2["name"][0], "Sven") self.assertEqual(df_2["age"][0], 33) + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def 
test_to_dataframe_iterable_w_bqstorage(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + arrow_fields = [ + pyarrow.field("colA", pyarrow.int64()), + # Not alphabetical to test column order. + pyarrow.field("colC", pyarrow.float64()), + pyarrow.field("colB", pyarrow.utf8()), + ] + arrow_schema = pyarrow.schema(arrow_fields) + + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + bqstorage_client.transport = mock.create_autospec( + big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + ) + streams = [ + # Use two streams we want to check frames are read from each stream. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage_v1beta1.types.ReadSession( + streams=streams, + arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, + ) + bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + page_dataframe = pandas.DataFrame( + {"colA": [1, -1], "colC": [2.0, 4.0], "colB": ["abc", "def"]}, + ) + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_dataframe.return_value = page_dataframe + mock_pages = (mock_page, mock_page, mock_page) + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + got = list( + row_iterator.to_dataframe_iterable(bqstorage_client=bqstorage_client) + ) + + # Have expected number of rows? + total_pages = len(streams) * len(mock_pages) + self.assertEqual(len(got), total_pages) + + # Don't close the client if it was passed in. 
+ bqstorage_client.transport.channel.close.assert_not_called() + @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_iterable_error_if_pandas_is_none(self): from google.cloud.bigquery.schema import SchemaField @@ -2140,6 +2254,45 @@ def test_to_dataframe_progress_bar( progress_bar_mock().close.assert_called_once() self.assertEqual(len(df), 4) + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(tqdm is None, "Requires `tqdm`") + @mock.patch("tqdm.tqdm_gui") + @mock.patch("tqdm.tqdm_notebook") + @mock.patch("tqdm.tqdm") + def test_to_dataframe_progress_bar_wo_pyarrow( + self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock + ): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + + progress_bars = ( + ("tqdm", tqdm_mock), + ("tqdm_notebook", tqdm_notebook_mock), + ("tqdm_gui", tqdm_gui_mock), + ) + + for progress_bar_type, progress_bar_mock in progress_bars: + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + with mock.patch("google.cloud.bigquery.table.pyarrow", None): + df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type) + + progress_bar_mock.assert_called() + progress_bar_mock().update.assert_called() + progress_bar_mock().close.assert_called_once() + self.assertEqual(len(df), 4) + @unittest.skipIf(pandas is None, "Requires `pandas`") @mock.patch("google.cloud.bigquery.table.tqdm", new=None) def test_to_dataframe_no_tqdm_no_progress_bar(self): @@ -2246,6 +2399,47 @@ def test_to_dataframe_w_empty_results(self): self.assertEqual(len(df), 0) # verify the number of rows self.assertEqual(list(df), ["name", "age"]) # verify the column names + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_to_dataframe_w_empty_results_wo_pyarrow(self): + from google.cloud.bigquery.schema import SchemaField + + with mock.patch("google.cloud.bigquery.table.pyarrow", None): + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + api_request = mock.Mock(return_value={"rows": []}) + row_iterator = self._make_one(_mock_client(), api_request, schema=schema) + + df = row_iterator.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 0) # verify the number of rows + self.assertEqual(list(df), ["name", "age"]) # verify the column names + + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_to_dataframe_w_no_results_wo_pyarrow(self): + from google.cloud.bigquery.schema import SchemaField + + with mock.patch("google.cloud.bigquery.table.pyarrow", None): + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + api_request = mock.Mock(return_value={"rows": []}) + row_iterator = self._make_one(_mock_client(), api_request, schema=schema) + + def empty_iterable(dtypes=None): + return [] + + row_iterator.to_dataframe_iterable = empty_iterable + + df = row_iterator.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 0) # verify the number of rows + self.assertEqual(list(df), ["name", "age"]) # verify the 
column names + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_logs_tabledata_list(self): from google.cloud.bigquery.table import Table @@ -2278,9 +2472,9 @@ def test_to_dataframe_w_various_types_nullable(self): ] row_data = [ [None, None, None, None, None, None], - ["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"], - ["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"], - ["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"], + ["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"], + ["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"], + ["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"], ] rows = [{"f": [{"v": field} for field in row]} for row in row_data] path = "/foo" @@ -2300,7 +2494,7 @@ def test_to_dataframe_w_various_types_nullable(self): else: self.assertIsInstance(row.start_timestamp, pandas.Timestamp) self.assertIsInstance(row.seconds, float) - self.assertIsInstance(row.payment_type, str) + self.assertIsInstance(row.payment_type, six.string_types) self.assertIsInstance(row.complete, bool) self.assertIsInstance(row.date, datetime.date) @@ -2318,9 +2512,9 @@ def test_to_dataframe_column_dtypes(self): SchemaField("date", "DATE"), ] row_data = [ - ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"], - ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"], - ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"], + ["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"], + ["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"], + ["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"], ] rows = [{"f": [{"v": field} for field in row]} for row in row_data] path = "/foo" @@ -2486,9 +2680,9 @@ def test_to_dataframe_w_bqstorage_no_streams(self): api_request=None, path=None, schema=[ - schema.SchemaField("colA", "IGNORED"), - schema.SchemaField("colC", "IGNORED"), - schema.SchemaField("colB", "IGNORED"), + schema.SchemaField("colA", "INTEGER"), + schema.SchemaField("colC", "FLOAT"), + schema.SchemaField("colB", "STRING"), ], table=mut.TableReference.from_string("proj.dset.tbl"), ) @@ -2560,10 +2754,11 @@ def test_to_dataframe_w_bqstorage_empty_streams(self): mock_pages = mock.PropertyMock(return_value=()) type(mock_rows).pages = mock_pages + # Schema is required when there are no record batches in the stream. 
schema = [ - schema.SchemaField("colA", "IGNORED"), - schema.SchemaField("colC", "IGNORED"), - schema.SchemaField("colB", "IGNORED"), + schema.SchemaField("colA", "INTEGER"), + schema.SchemaField("colC", "FLOAT"), + schema.SchemaField("colB", "STRING"), ] row_iterator = mut.RowIterator( @@ -2622,14 +2817,15 @@ def test_to_dataframe_w_bqstorage_nonempty(self): mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_rowstream.rows.return_value = mock_rows page_items = [ - {"colA": 1, "colB": "abc", "colC": 2.0}, - {"colA": -1, "colB": "def", "colC": 4.0}, + pyarrow.array([1, -1]), + pyarrow.array([2.0, 4.0]), + pyarrow.array(["abc", "def"]), ] - - mock_page = mock.create_autospec(reader.ReadRowsPage) - mock_page.to_dataframe.return_value = pandas.DataFrame( - page_items, columns=["colA", "colB", "colC"] + page_record_batch = pyarrow.RecordBatch.from_arrays( + page_items, schema=arrow_schema ) + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_arrow.return_value = page_record_batch mock_pages = (mock_page, mock_page, mock_page) type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) @@ -2656,7 +2852,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self): # Have expected number of rows? total_pages = len(streams) * len(mock_pages) - total_rows = len(page_items) * total_pages + total_rows = len(page_items[0]) * total_pages self.assertEqual(len(got.index), total_rows) # Don't close the client if it was passed in. @@ -2695,11 +2891,14 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self): mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_rowstream.rows.return_value = mock_rows - page_data_frame = pandas.DataFrame( - [{"colA": 1}, {"colA": -1}], columns=["colA"] + page_items = [ + pyarrow.array([1, -1]), + ] + page_record_batch = pyarrow.RecordBatch.from_arrays( + page_items, schema=arrow_schema ) mock_page = mock.create_autospec(reader.ReadRowsPage) - mock_page.to_dataframe.return_value = page_data_frame + mock_page.to_arrow.return_value = page_record_batch mock_pages = (mock_page, mock_page, mock_page) type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) @@ -2711,7 +2910,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self): self.assertEqual(list(got), ["colA"]) total_pages = len(streams) * len(mock_pages) - total_rows = len(page_data_frame) * total_pages + total_rows = len(page_items[0]) * total_pages self.assertEqual(len(got.index), total_rows) self.assertTrue(got.index.is_unique) @@ -2757,14 +2956,15 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): page_items = [-1, 0, 1] type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items)) - def blocking_to_dataframe(*args, **kwargs): - # Sleep for longer than the waiting interval. This ensures the - # progress_queue gets written to more than once because it gives - # the worker->progress updater time to sum intermediate updates. + def blocking_to_arrow(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. 
time.sleep(2 * mut._PROGRESS_INTERVAL) - return pandas.DataFrame({"testcol": page_items}) + return pyarrow.RecordBatch.from_arrays( + [pyarrow.array(page_items)], schema=arrow_schema + ) - mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_page.to_arrow.side_effect = blocking_to_arrow mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page) type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) @@ -2790,7 +2990,7 @@ def blocking_to_dataframe(*args, **kwargs): progress_updates = [ args[0] for args, kwargs in tqdm_mock().update.call_args_list ] - # Should have sent >1 update due to delay in blocking_to_dataframe. + # Should have sent >1 update due to delay in blocking_to_arrow. self.assertGreater(len(progress_updates), 1) self.assertEqual(sum(progress_updates), expected_total_rows) tqdm_mock().close.assert_called_once() @@ -2830,18 +3030,20 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self): arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) bqstorage_client.create_read_session.return_value = session + page_items = [ + pyarrow.array([1, -1]), + pyarrow.array([2.0, 4.0]), + pyarrow.array(["abc", "def"]), + ] - def blocking_to_dataframe(*args, **kwargs): + def blocking_to_arrow(*args, **kwargs): # Sleep for longer than the waiting interval so that we know we're # only reading one page per loop at most. time.sleep(2 * mut._PROGRESS_INTERVAL) - return pandas.DataFrame( - {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, - columns=["colA", "colB", "colC"], - ) + return pyarrow.RecordBatch.from_arrays(page_items, schema=arrow_schema) mock_page = mock.create_autospec(reader.ReadRowsPage) - mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_page.to_arrow.side_effect = blocking_to_arrow mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) type(mock_rows).pages = mock_pages
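
For context, a minimal usage-level sketch of the behavior this patch describes, not part of the diff itself: it assumes pandas and pyarrow are installed alongside google-cloud-bigquery (and google-cloud-bigquery-storage for the second call), and the project, dataset, table, and query strings are placeholders only.

    from google.cloud import bigquery

    client = bigquery.Client()

    # With pyarrow installed, to_dataframe() now goes through to_arrow() and a
    # single Arrow-to-pandas conversion instead of concatenating per-page
    # DataFrames, which is what makes it roughly 2x faster.
    rows = client.query("SELECT name, age FROM `proj.dset.tbl`").result()
    df = rows.to_dataframe()

    # When max_results is set, the BigQuery Storage API cannot be used; per the
    # warning added in this patch, the client emits a UserWarning and falls
    # back to the tabledata.list endpoint.
    table_ref = bigquery.TableReference.from_string("proj.dset.tbl")
    small_df = client.list_rows(table_ref, max_results=42).to_dataframe(
        create_bqstorage_client=True
    )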