From 9553d8d9d7685d5c86de015f025e0f56dc48955a Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 21 Jun 2019 14:45:19 -0700 Subject: [PATCH 1/9] BQ Storage: Add basic arrow stream parser --- .../google/cloud/bigquery/_pandas_helpers.py | 2 + .../cloud/bigquery_storage_v1beta1/reader.py | 89 +++++++++++++++++-- bigquery_storage/noxfile.py | 2 +- bigquery_storage/setup.py | 1 + bigquery_storage/tests/unit/test_reader.py | 76 +++++++++++++--- 5 files changed, 149 insertions(+), 21 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 5261c2b99efd..21eabac3fa91 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -75,6 +75,8 @@ def pyarrow_timestamp(): if pyarrow: + # This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py + # When modifying it be sure to update it there as well. BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index ac45d7022d5d..e66bd728868c 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -29,6 +29,11 @@ pandas = None import six +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None + from google.cloud.bigquery_storage_v1beta1 import types @@ -37,6 +42,9 @@ "fastavro is required to parse ReadRowResponse messages with Avro bytes." ) _PANDAS_REQUIRED = "pandas is required to create a DataFrame" +_PYARROW_REQUIRED = ( + "pyarrow is required to parse ReadRowResponse messages with Arrow bytes." +) class ReadRowsStream(object): @@ -152,9 +160,6 @@ def rows(self, read_session): Iterable[Mapping]: A sequence of rows, represented as dictionaries. """ - if fastavro is None: - raise ImportError(_FASTAVRO_REQUIRED) - return ReadRowsIterable(self, read_session) def to_dataframe(self, read_session, dtypes=None): @@ -186,8 +191,6 @@ def to_dataframe(self, read_session, dtypes=None): pandas.DataFrame: A data frame of all rows in the stream. """ - if fastavro is None: - raise ImportError(_FASTAVRO_REQUIRED) if pandas is None: raise ImportError(_PANDAS_REQUIRED) @@ -212,6 +215,7 @@ def __init__(self, reader, read_session): self._status = None self._reader = reader self._read_session = read_session + self._stream_parser = _StreamParser.from_read_session(self._read_session) @property def total_rows(self): @@ -231,10 +235,9 @@ def pages(self): """ # Each page is an iterator of rows. But also has num_items, remaining, # and to_dataframe. 
- stream_parser = _StreamParser(self._read_session) for message in self._reader: self._status = message.status - yield ReadRowsPage(stream_parser, message) + yield ReadRowsPage(self._stream_parser, message) def __iter__(self): """Iterator for each row in all pages.""" @@ -355,16 +358,39 @@ def to_dataframe(self, dtypes=None): class _StreamParser(object): + def to_dataframe(self, message, dtypes=None): + raise NotImplementedError("Not implemented.") + + def to_rows(self, message): + raise NotImplementedError("Not implemented.") + + @staticmethod + def from_read_session(read_session): + schema_type = read_session.WhichOneof("schema") + if schema_type == "avro_schema": + return _AvroStreamParser(read_session) + elif schema_type == "arrow_schema": + return _ArrowStreamParser(read_session) + else: + raise TypeError( + "Unsupported schema type in read_session: {0}".format(schema_type) + ) + + +class _AvroStreamParser(_StreamParser): """Helper to parse Avro messages into useful representations.""" def __init__(self, read_session): - """Construct a _StreamParser. + """Construct an _AvroStreamParser. Args: read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession): A read session. This is required because it contains the schema used in the stream messages. """ + if fastavro is None: + raise ImportError(_FASTAVRO_REQUIRED) + self._read_session = read_session self._avro_schema_json = None self._fastavro_schema = None @@ -447,6 +473,53 @@ def to_rows(self, message): break # Finished with message +class _ArrowStreamParser(_StreamParser): + def __init__(self, read_session): + if pyarrow is None: + raise ImportError(_PYARROW_REQUIRED) + + self._read_session = read_session + self._schema = None + + def to_rows(self, message): + record_batch = self._parse_arrow_message(message) + + # Iterate through each column simultaneously, and make a dict from the + # row values + for row in zip(*record_batch.columns): + yield dict(zip(self._column_names, row)) + + def to_dataframe(self, message, dtypes=None): + record_batch = self._parse_arrow_message(message) + + if dtypes is None: + dtypes = {} + + df = record_batch.to_pandas() + + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + + return df + + def _parse_arrow_message(self, message): + self._parse_arrow_schema() + + return pyarrow.read_record_batch( + pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch), + self._schema, + ) + + def _parse_arrow_schema(self): + if self._schema: + return + + self._schema = pyarrow.read_schema( + pyarrow.py_buffer(self._read_session.arrow_schema.serialized_schema) + ) + self._column_names = [field.name for field in self._schema] + + def _copy_stream_position(position): """Copy a StreamPosition. diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index 3840ad8d6638..32cce79a68df 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -37,7 +37,7 @@ def default(session): session.install('mock', 'pytest', 'pytest-cov') for local_dep in LOCAL_DEPS: session.install('-e', local_dep) - session.install('-e', '.[pandas,fastavro]') + session.install('-e', '.[pandas,fastavro,pyarrow]') # Run py.test against the unit tests. 
session.run( diff --git a/bigquery_storage/setup.py b/bigquery_storage/setup.py index 8471b55485d1..bfdd6d3cabbd 100644 --- a/bigquery_storage/setup.py +++ b/bigquery_storage/setup.py @@ -31,6 +31,7 @@ extras = { 'pandas': 'pandas>=0.17.1', 'fastavro': 'fastavro>=0.21.2', + 'pyarrow': 'pyarrow>=0.13.0', } package_root = os.path.abspath(os.path.dirname(__file__)) diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index a39309b55de5..3376caba7bcf 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -20,6 +20,7 @@ import json import fastavro +import pyarrow import mock import pandas import pandas.testing @@ -44,6 +45,20 @@ "time": {"type": "long", "logicalType": "time-micros"}, "timestamp": {"type": "long", "logicalType": "timestamp-micros"}, } +# This dictionary is duplicated in bigquery/google/cloud/bigquery/_pandas_helpers.py +# When modifying it be sure to update it there as well. +BQ_TO_ARROW_TYPES = { + "int64": pyarrow.int64(), + "float64": pyarrow.float64(), + "bool": pyarrow.bool_(), + "numeric": pyarrow.decimal128(38, 9), + "string": pyarrow.utf8(), + "bytes": pyarrow.binary(), + "date": pyarrow.date32(), # int32 days since epoch + "datetime": pyarrow.timestamp("us"), + "time": pyarrow.time64("us"), + "timestamp": pyarrow.timestamp("us", tz="UTC"), +} SCALAR_COLUMNS = [ {"name": "int_col", "type": "int64"}, {"name": "float_col", "type": "float64"}, @@ -143,11 +158,17 @@ def _avro_blocks_w_deadline(avro_blocks): raise google.api_core.exceptions.DeadlineExceeded("test: timeout, don't reconnect") -def _generate_read_session(avro_schema_json): +def _generate_avro_read_session(avro_schema_json): schema = json.dumps(avro_schema_json) return bigquery_storage_v1beta1.types.ReadSession(avro_schema={"schema": schema}) +def _generate_arrow_read_session(arrow_schema): + return bigquery_storage_v1beta1.types.ReadSession( + arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()} + ) + + def _bq_to_avro_schema(bq_columns): fields = [] avro_schema = {"type": "record", "name": "__root__", "fields": fields} @@ -166,6 +187,18 @@ def _bq_to_avro_schema(bq_columns): return avro_schema +def _bq_to_arrow_schema(bq_columns): + def bq_col_as_field(column): + doc = column.get("description") + name = column["name"] + type_ = BQ_TO_ARROW_TYPES[column["type"]] + mode = column.get("mode", "nullable").lower() + + return pyarrow.field(name, type_, mode == "nullable", {"description": doc}) + + return pyarrow.schema(bq_col_as_field(c) for c in bq_columns) + + def _get_avro_bytes(rows, avro_schema): avro_file = six.BytesIO() for row in rows: @@ -173,12 +206,31 @@ def _get_avro_bytes(rows, avro_schema): return avro_file.getvalue() -def test_rows_raises_import_error(mut, class_under_test, mock_client, monkeypatch): +def test_avro_rows_raises_import_error(mut, class_under_test, mock_client, monkeypatch): monkeypatch.setattr(mut, "fastavro", None) reader = class_under_test( [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} ) - read_session = bigquery_storage_v1beta1.types.ReadSession() + + bq_columns = [{"name": "int_col", "type": "int64"}] + avro_schema = _bq_to_avro_schema(bq_columns) + read_session = _generate_avro_read_session(avro_schema) + + with pytest.raises(ImportError): + reader.rows(read_session) + + +def test_pyarrow_rows_raises_import_error( + mut, class_under_test, mock_client, monkeypatch +): + monkeypatch.setattr(mut, "pyarrow", None) + reader = class_under_test( + 
[], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + + bq_columns = [{"name": "int_col", "type": "int64"}] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) with pytest.raises(ImportError): reader.rows(read_session) @@ -187,7 +239,7 @@ def test_rows_raises_import_error(mut, class_under_test, mock_client, monkeypatc def test_rows_w_empty_stream(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) reader = class_under_test( [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} ) @@ -199,7 +251,7 @@ def test_rows_w_empty_stream(class_under_test, mock_client): def test_rows_w_scalars(class_under_test, mock_client): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema) reader = class_under_test( @@ -214,7 +266,7 @@ def test_rows_w_scalars(class_under_test, mock_client): def test_rows_w_timeout(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) bq_blocks_1 = [ [{"int_col": 123}, {"int_col": 234}], [{"int_col": 345}, {"int_col": 456}], @@ -248,7 +300,7 @@ def test_rows_w_timeout(class_under_test, mock_client): def test_rows_w_reconnect(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) bq_blocks_1 = [ [{"int_col": 123}, {"int_col": 234}], [{"int_col": 345}, {"int_col": 456}], @@ -295,7 +347,7 @@ def test_rows_w_reconnect(class_under_test, mock_client): def test_rows_w_reconnect_by_page(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) bq_blocks_1 = [ [{"int_col": 123}, {"int_col": 234}], [{"int_col": 345}, {"int_col": 456}], @@ -358,7 +410,7 @@ def test_to_dataframe_no_pandas_raises_import_error( ): monkeypatch.setattr(mut, "pandas", None) avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema) reader = class_under_test( @@ -390,7 +442,7 @@ def test_to_dataframe_no_fastavro_raises_import_error( def test_to_dataframe_w_scalars(class_under_test): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema) reader = class_under_test( @@ -427,7 +479,7 @@ def test_to_dataframe_w_dtypes(class_under_test): {"name": "lilfloat", "type": "float64"}, ] ) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) blocks = [ [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], [{"bigfloat": 3.75, "lilfloat": 11.0}], 
@@ -458,7 +510,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client): {"name": "bool_col", "type": "bool"}, ] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) block_1 = [{"int_col": 123, "bool_col": True}, {"int_col": 234, "bool_col": False}] block_2 = [{"int_col": 345, "bool_col": True}, {"int_col": 456, "bool_col": False}] block_3 = [{"int_col": 567, "bool_col": True}, {"int_col": 789, "bool_col": False}] From 48735626a2ba9534d2672c7bec108ab1bf2371b3 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 27 Jun 2019 15:36:51 -0700 Subject: [PATCH 2/9] BQ Storage: Add tests for to_dataframe with arrow data --- bigquery_storage/tests/unit/test_reader.py | 118 ++++++++++++++++++++- 1 file changed, 115 insertions(+), 3 deletions(-) diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 3376caba7bcf..94f33019433a 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -146,6 +146,28 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): return avro_blocks +def _bq_to_arrow_batches(bq_blocks, arrow_schema): + arrow_batches = [] + for block in bq_blocks: + arrays = [] + for name in arrow_schema.names: + arrays.append( + pyarrow.array( + (row[name] for row in block), + type=arrow_schema.field_by_name(name).type, + size=len(block), + ) + ) + record_batch = pyarrow.RecordBatch.from_arrays(arrays, arrow_schema) + + response = bigquery_storage_v1beta1.types.ReadRowsResponse() + response.arrow_record_batch.serialized_record_batch = ( + record_batch.serialize().to_pybytes() + ) + arrow_batches.append(response) + return arrow_batches + + def _avro_blocks_w_unavailable(avro_blocks): for block in avro_blocks: yield block @@ -236,6 +258,18 @@ def test_pyarrow_rows_raises_import_error( reader.rows(read_session) +def test_rows_no_schema_set_raises_type_error( + mut, class_under_test, mock_client, monkeypatch +): + reader = class_under_test( + [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + read_session = bigquery_storage_v1beta1.types.ReadSession() + + with pytest.raises(TypeError): + reader.rows(read_session) + + def test_rows_w_empty_stream(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) @@ -249,6 +283,19 @@ def test_rows_w_empty_stream(class_under_test, mock_client): assert tuple(got) == () +def test_rows_w_empty_stream_arrow(class_under_test, mock_client): + bq_columns = [{"name": "int_col", "type": "int64"}] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) + reader = class_under_test( + [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + + got = reader.rows(read_session) + assert got.total_rows is None + assert tuple(got) == () + + def test_rows_w_scalars(class_under_test, mock_client): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) read_session = _generate_avro_read_session(avro_schema) @@ -263,6 +310,20 @@ def test_rows_w_scalars(class_under_test, mock_client): assert got == expected +def test_rows_w_scalars_arrow(class_under_test, mock_client): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + + reader = class_under_test( + arrow_batches, mock_client, 
bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = tuple(reader.rows(read_session)) + + expected = tuple(itertools.chain.from_iterable(SCALAR_BLOCKS)) + assert got == expected + + def test_rows_w_timeout(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) @@ -427,16 +488,15 @@ def test_to_dataframe_no_pandas_raises_import_error( next(reader.rows(read_session).pages).to_dataframe() -def test_to_dataframe_no_fastavro_raises_import_error( +def test_to_dataframe_no_schema_set_raises_type_error( mut, class_under_test, mock_client, monkeypatch ): - monkeypatch.setattr(mut, "fastavro", None) reader = class_under_test( [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} ) read_session = bigquery_storage_v1beta1.types.ReadSession() - with pytest.raises(ImportError): + with pytest.raises(TypeError): reader.to_dataframe(read_session) @@ -472,6 +532,26 @@ def test_to_dataframe_w_scalars(class_under_test): ) +def test_to_dataframe_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = reader.to_dataframe(read_session) + + expected = pandas.DataFrame( + list(itertools.chain.from_iterable(SCALAR_BLOCKS)), columns=SCALAR_COLUMN_NAMES + ) + + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + def test_to_dataframe_w_dtypes(class_under_test): avro_schema = _bq_to_avro_schema( [ @@ -504,6 +584,38 @@ def test_to_dataframe_w_dtypes(class_under_test): ) +def test_to_dataframe_w_dtypes_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema( + [ + {"name": "bigfloat", "type": "float64"}, + {"name": "lilfloat", "type": "float64"}, + ] + ) + read_session = _generate_arrow_read_session(arrow_schema) + blocks = [ + [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], + [{"bigfloat": 3.75, "lilfloat": 11.0}], + ] + arrow_batches = _bq_to_arrow_batches(blocks, arrow_schema) + + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) + + expected = pandas.DataFrame( + { + "bigfloat": [1.25, 2.5, 3.75], + "lilfloat": pandas.Series([30.5, 21.125, 11.0], dtype="float16"), + }, + columns=["bigfloat", "lilfloat"], + ) + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + def test_to_dataframe_by_page(class_under_test, mock_client): bq_columns = [ {"name": "int_col", "type": "int64"}, From 45bd1e90ef4708c187660d3f669014903c00071c Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Mon, 1 Jul 2019 10:32:29 -0700 Subject: [PATCH 3/9] Use Arrow format in client.list_rows(..).to_dataframe(..) 
with BQ Storage client --- bigquery/google/cloud/bigquery/_pandas_helpers.py | 1 + bigquery/google/cloud/bigquery/table.py | 2 +- bigquery/setup.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 21eabac3fa91..0f021dc84d05 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -293,6 +293,7 @@ def download_dataframe_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.proto.storage_pb2.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 7af3bc6f48b4..c25af2079519 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1444,7 +1444,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non supplied, use the faster BigQuery Storage API to fetch rows from BigQuery. This API is a billable API. - This method requires the ``fastavro`` and + This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. Reading from a specific partition or snapshot is not diff --git a/bigquery/setup.py b/bigquery/setup.py index 8592a232ecb3..62af0c3f05f6 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ extras = { "bqstorage": [ "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", - "fastavro>=0.21.2", + "pyarrow >= 0.4.1", ], "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. From 06f990e4f3ce25550e21fd36d90768ed3db8f970 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 10 Jul 2019 11:42:06 -0500 Subject: [PATCH 4/9] Add system test for arrow wire format. --- bigquery_storage/tests/system/test_system.py | 36 +++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index 3e86a7fc2263..6a86cffa016f 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -67,7 +67,7 @@ def test_read_rows_full_table(client, project_id, small_table_reference): assert len(block.avro_rows.serialized_binary_rows) > 0 -def test_read_rows_to_dataframe(client, project_id): +def test_read_rows_to_dataframe_w_avro(client, project_id): table_ref = bigquery_storage_v1beta1.types.TableReference() table_ref.project_id = "bigquery-public-data" table_ref.dataset_id = "new_york_citibike" @@ -75,6 +75,40 @@ def test_read_rows_to_dataframe(client, project_id): session = client.create_read_session( table_ref, "projects/{}".format(project_id), requested_streams=1 ) + schema_type = session.WhichOneof("schema") + assert schema_type == "avro_schema" + + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + frame = client.read_rows(stream_pos).to_dataframe( + session, dtypes={"latitude": numpy.float16} + ) + + # Station ID is a required field (no nulls), so the datatype should always + # be integer. 
+ assert frame.station_id.dtype.name == "int64" + assert frame.latitude.dtype.name == "float16" + assert frame.longitude.dtype.name == "float64" + assert frame["name"].str.startswith("Central Park").any() + + +def test_read_rows_to_dataframe_w_arrow(client, project_id): + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = "bigquery-public-data" + table_ref.dataset_id = "new_york_citibike" + table_ref.table_id = "citibike_stations" + + session = client.create_read_session( + table_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, + requested_streams=1 + ) + schema_type = session.WhichOneof("schema") + assert schema_type == "arrow_schema" + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( stream=session.streams[0] ) From 5e7a403313d434e481c2ecd269a546471550e617 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 10 Jul 2019 14:31:27 -0500 Subject: [PATCH 5/9] Add pyarrow to system tests deps. --- bigquery_storage/noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index 32cce79a68df..bb1be8dec998 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -121,7 +121,7 @@ def system(session): session.install('-e', os.path.join('..', 'test_utils')) for local_dep in LOCAL_DEPS: session.install('-e', local_dep) - session.install('-e', '.[pandas,fastavro]') + session.install('-e', '.[fastavro,pandas,pyarrow]') # Run py.test against the system tests. session.run('py.test', '--quiet', 'tests/system/') From 5155ac91f657b31ff0d8c21ba9ed2ec71d013ff4 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 10 Jul 2019 16:11:53 -0500 Subject: [PATCH 6/9] Add to_arrow with BQ Storage API. --- .../google/cloud/bigquery/_pandas_helpers.py | 76 +++++++++++++----- bigquery/google/cloud/bigquery/table.py | 46 ++++++++++- bigquery/tests/unit/test_job.py | 2 + .../cloud/bigquery_storage_v1beta1/reader.py | 80 ++++++++++++++++--- bigquery_storage/tests/system/test_system.py | 2 +- bigquery_storage/tests/unit/test_reader.py | 2 +- 6 files changed, 171 insertions(+), 37 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 39f709765397..d77aa67d5cf5 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -15,6 +15,7 @@ """Shared helper functions for connecting BigQuery and pandas.""" import concurrent.futures +import functools import warnings from six.moves import queue @@ -271,14 +272,18 @@ def download_dataframe_tabledata_list(pages, schema, dtypes): yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) -def _download_dataframe_bqstorage_stream( - download_state, - bqstorage_client, - column_names, - dtypes, - session, - stream, - worker_queue, +def _bqstorage_page_to_arrow(page): + return page.to_arrow() + + +def _bqstorage_page_to_dataframe(column_names, dtypes, page): + # page.to_dataframe() does not preserve column order in some versions + # of google-cloud-bigquery-storage. Access by column name to rearrange. 
+ return page.to_dataframe(dtypes=dtypes)[column_names] + + +def _download_table_bqstorage_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -286,10 +291,8 @@ def _download_dataframe_bqstorage_stream( for page in rowstream.pages: if download_state.done: return - # page.to_dataframe() does not preserve column order in some versions - # of google-cloud-bigquery-storage. Access by column name to rearrange. - frame = page.to_dataframe(dtypes=dtypes)[column_names] - worker_queue.put(frame) + item = page_to_item(page) + worker_queue.put(item) def _nowait(futures): @@ -306,14 +309,13 @@ def _nowait(futures): return done, not_done -def download_dataframe_bqstorage( +def _download_table_bqstorage( project_id, table, bqstorage_client, - column_names, - dtypes, preserve_order=False, selected_fields=None, + page_to_item=None, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if "$" in table.table_id: @@ -335,15 +337,13 @@ def download_dataframe_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), - format_=bigquery_storage_v1beta1.proto.storage_pb2.DataFormat.ARROW, + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) - # Avoid reading rows from an empty table. pandas.concat will fail on an - # empty list. + # Avoid reading rows from an empty table. if not session.streams: - yield pandas.DataFrame(columns=column_names) return total_streams = len(session.streams) @@ -363,14 +363,13 @@ def download_dataframe_bqstorage( # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ pool.submit( - _download_dataframe_bqstorage_stream, + _download_table_bqstorage_stream, download_state, bqstorage_client, - column_names, - dtypes, session, stream, worker_queue, + page_to_item, ) for stream in session.streams ] @@ -413,3 +412,36 @@ def download_dataframe_bqstorage( # Shutdown all background threads, now that they should know to # exit early. 
pool.shutdown(wait=True) + + +def download_arrow_bqstorage( + project_id, table, bqstorage_client, preserve_order=False, selected_fields=None +): + return _download_table_bqstorage( + project_id, + table, + bqstorage_client, + preserve_order=preserve_order, + selected_fields=selected_fields, + page_to_item=_bqstorage_page_to_arrow, + ) + + +def download_dataframe_bqstorage( + project_id, + table, + bqstorage_client, + column_names, + dtypes, + preserve_order=False, + selected_fields=None, +): + page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) + return _download_table_bqstorage( + project_id, + table, + bqstorage_client, + preserve_order=preserve_order, + selected_fields=selected_fields, + page_to_item=page_to_item, + ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index c670af164ce8..4999de3b0d8d 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1403,14 +1403,42 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None - def _to_arrow_iterable(self): + def _to_arrow_iterable(self, bqstorage_client=None): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" + if bqstorage_client is not None: + column_names = [field.name for field in self._schema] + try: + # Iterate over the stream so that read errors are raised (and + # the method can then fallback to tabledata.list). + for record_batch in _pandas_helpers.download_arrow_bqstorage( + self._project, + self._table, + bqstorage_client, + column_names, + preserve_order=self._preserve_order, + selected_fields=self._selected_fields, + ): + yield record_batch + return + except google.api_core.exceptions.Forbidden: + # Don't hide errors such as insufficient permissions to create + # a read session, or the API is not enabled. Both of those are + # clearly problems if the developer has explicitly asked for + # BigQuery Storage API support. + raise + except google.api_core.exceptions.GoogleAPICallError: + # There is a known issue with reading from small anonymous + # query results tables, so some errors are expected. Rather + # than throw those errors, try reading the DataFrame again, but + # with the tabledata.list API. + pass + for record_batch in _pandas_helpers.download_arrow_tabledata_list( iter(self.pages), self.schema ): yield record_batch - def to_arrow(self, progress_bar_type=None): + def to_arrow(self, progress_bar_type=None, bqstorage_client=None): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1433,6 +1461,18 @@ def to_arrow(self, progress_bar_type=None): ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. + bqstorage_client ( \ + google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \ + ): + **Beta Feature** Optional. A BigQuery Storage API client. If + supplied, use the faster BigQuery Storage API to fetch rows + from BigQuery. This API is a billable API. + + This method requires the ``pyarrow`` and + ``google-cloud-bigquery-storage`` libraries. + + Reading from a specific partition or snapshot is not + currently supported by this method. 
Returns: pyarrow.Table @@ -1452,7 +1492,7 @@ def to_arrow(self, progress_bar_type=None): progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] - for record_batch in self._to_arrow_iterable(): + for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): record_batches.append(record_batch) if progress_bar is not None: diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 22809c245d4b..dcc90b2d96a8 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -4897,6 +4897,7 @@ def test_to_dataframe_bqstorage(self): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/{}".format(self.PROJECT), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use default number of streams for best performance. requested_streams=0, @@ -5340,6 +5341,7 @@ def test_to_dataframe_bqstorage_preserve_order(query): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/test-project", + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use a single stream to preserve row order. requested_streams=1, diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index e66bd728868c..8c73e4443a5a 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -27,6 +27,10 @@ import pandas except ImportError: # pragma: NO COVER pandas = None +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None import six try: @@ -38,13 +42,12 @@ _STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,) -_FASTAVRO_REQUIRED = ( - "fastavro is required to parse ReadRowResponse messages with Avro bytes." -) + +_AVRO_BYTES_OPERATION = "parse ReadRowResponse messages with Avro bytes" +_ARROW_BYTES_OPERATION = "parse ReadRowResponse messages with Arrow bytes" +_FASTAVRO_REQUIRED = "fastavro is required to {operation}." _PANDAS_REQUIRED = "pandas is required to create a DataFrame" -_PYARROW_REQUIRED = ( - "pyarrow is required to parse ReadRowResponse messages with Arrow bytes." -) +_PYARROW_REQUIRED = "pyarrow is required to {operation}." class ReadRowsStream(object): @@ -121,7 +124,7 @@ def __iter__(self): while True: try: for message in self._wrapped: - rowcount = message.avro_rows.row_count + rowcount = message.row_count self._position.offset += rowcount yield message @@ -162,6 +165,26 @@ def rows(self, read_session): """ return ReadRowsIterable(self, read_session) + def to_arrow(self, read_session): + """Create a :class:`pyarrow.Table` of all rows in the stream. + + This method requires the pyarrow library and a stream using the Arrow + format. + + Args: + read_session ( \ + ~google.cloud.bigquery_storage_v1beta1.types.ReadSession \ + ): + The read session associated with this read rows stream. This + contains the schema, which is required to parse the data + messages. + + Returns: + pyarrow.Table: + A table of all rows in the stream. + """ + return self.rows(read_session).to_arrow() + def to_dataframe(self, read_session, dtypes=None): """Create a :class:`pandas.DataFrame` of all rows in the stream. @@ -245,6 +268,21 @@ def __iter__(self): for row in page: yield row + def to_arrow(self): + """Create a :class:`pyarrow.Table` of all rows in the stream. 
+ + This method requires the pyarrow library and a stream using the Arrow + format. + + Returns: + pyarrow.Table: + A table of all rows in the stream. + """ + record_batches = [] + for page in self.pages: + record_batches.append(page.to_arrow()) + return pyarrow.Table.from_batches(record_batches) + def to_dataframe(self, dtypes=None): """Create a :class:`pandas.DataFrame` of all rows in the stream. @@ -294,8 +332,8 @@ def __init__(self, stream_parser, message): self._stream_parser = stream_parser self._message = message self._iter_rows = None - self._num_items = self._message.avro_rows.row_count - self._remaining = self._message.avro_rows.row_count + self._num_items = self._message.row_count + self._remaining = self._message.row_count def _parse_rows(self): """Parse rows from the message only once.""" @@ -358,6 +396,9 @@ def to_dataframe(self, dtypes=None): class _StreamParser(object): + def to_arrow(self, message): + raise NotImplementedError("Not implemented.") + def to_dataframe(self, message, dtypes=None): raise NotImplementedError("Not implemented.") @@ -396,6 +437,20 @@ def __init__(self, read_session): self._fastavro_schema = None self._column_names = None + def to_arrow(self, message): + """Create an :class:`pyarrow.RecordBatch` of rows in the page. + + Args: + message (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse): + Protocol buffer from the read rows stream, to convert into an + Arrow record batch. + + Returns: + pyarrow.RecordBatch: + Rows from the message, as an Arrow record batch. + """ + raise NotImplementedError("to_arrow not implemented for Avro streams.") + def to_dataframe(self, message, dtypes=None): """Create a :class:`pandas.DataFrame` of rows in the page. @@ -476,11 +531,16 @@ def to_rows(self, message): class _ArrowStreamParser(_StreamParser): def __init__(self, read_session): if pyarrow is None: - raise ImportError(_PYARROW_REQUIRED) + raise ImportError( + _PYARROW_REQUIRED.format(operation=_ARROW_BYTES_OPERATION) + ) self._read_session = read_session self._schema = None + def to_arrow(self, message): + return self._parse_arrow_message(message) + def to_rows(self, message): record_batch = self._parse_arrow_message(message) diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index 6a86cffa016f..ee4b06cb43ce 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -104,7 +104,7 @@ def test_read_rows_to_dataframe_w_arrow(client, project_id): table_ref, "projects/{}".format(project_id), format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, - requested_streams=1 + requested_streams=1, ) schema_type = session.WhichOneof("schema") assert schema_type == "arrow_schema" diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 94f33019433a..03e8779a8b8f 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -140,7 +140,7 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): fastavro.schemaless_writer(blockio, avro_schema, row) response = bigquery_storage_v1beta1.types.ReadRowsResponse() - response.avro_rows.row_count = len(block) + response.row_count = len(block) response.avro_rows.serialized_binary_rows = blockio.getvalue() avro_blocks.append(response) return avro_blocks From 5a5edd51740f621412180b2cbb726fea2747f8ef Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 08:44:06 -0500 Subject: [PATCH 7/9] Revert changes to bigquery so that 
bigquery_storage can be released separately. --- .../google/cloud/bigquery/_pandas_helpers.py | 77 +++++-------------- bigquery/google/cloud/bigquery/table.py | 48 +----------- bigquery/setup.py | 2 +- bigquery/tests/unit/test_job.py | 2 - 4 files changed, 26 insertions(+), 103 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index d77aa67d5cf5..5a3a9833b572 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -15,7 +15,6 @@ """Shared helper functions for connecting BigQuery and pandas.""" import concurrent.futures -import functools import warnings from six.moves import queue @@ -75,8 +74,6 @@ def pyarrow_timestamp(): if pyarrow: - # This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py - # When modifying it be sure to update it there as well. BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, @@ -272,18 +269,14 @@ def download_dataframe_tabledata_list(pages, schema, dtypes): yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) -def _bqstorage_page_to_arrow(page): - return page.to_arrow() - - -def _bqstorage_page_to_dataframe(column_names, dtypes, page): - # page.to_dataframe() does not preserve column order in some versions - # of google-cloud-bigquery-storage. Access by column name to rearrange. - return page.to_dataframe(dtypes=dtypes)[column_names] - - -def _download_table_bqstorage_stream( - download_state, bqstorage_client, session, stream, worker_queue, page_to_item +def _download_dataframe_bqstorage_stream( + download_state, + bqstorage_client, + column_names, + dtypes, + session, + stream, + worker_queue, ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -291,8 +284,10 @@ def _download_table_bqstorage_stream( for page in rowstream.pages: if download_state.done: return - item = page_to_item(page) - worker_queue.put(item) + # page.to_dataframe() does not preserve column order in some versions + # of google-cloud-bigquery-storage. Access by column name to rearrange. + frame = page.to_dataframe(dtypes=dtypes)[column_names] + worker_queue.put(frame) def _nowait(futures): @@ -309,13 +304,14 @@ def _nowait(futures): return done, not_done -def _download_table_bqstorage( +def download_dataframe_bqstorage( project_id, table, bqstorage_client, + column_names, + dtypes, preserve_order=False, selected_fields=None, - page_to_item=None, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if "$" in table.table_id: @@ -337,13 +333,14 @@ def _download_table_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) - # Avoid reading rows from an empty table. + # Avoid reading rows from an empty table. pandas.concat will fail on an + # empty list. 
if not session.streams: + yield pandas.DataFrame(columns=column_names) return total_streams = len(session.streams) @@ -363,13 +360,14 @@ def _download_table_bqstorage( # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ pool.submit( - _download_table_bqstorage_stream, + _download_dataframe_bqstorage_stream, download_state, bqstorage_client, + column_names, + dtypes, session, stream, worker_queue, - page_to_item, ) for stream in session.streams ] @@ -412,36 +410,3 @@ def _download_table_bqstorage( # Shutdown all background threads, now that they should know to # exit early. pool.shutdown(wait=True) - - -def download_arrow_bqstorage( - project_id, table, bqstorage_client, preserve_order=False, selected_fields=None -): - return _download_table_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=preserve_order, - selected_fields=selected_fields, - page_to_item=_bqstorage_page_to_arrow, - ) - - -def download_dataframe_bqstorage( - project_id, - table, - bqstorage_client, - column_names, - dtypes, - preserve_order=False, - selected_fields=None, -): - page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) - return _download_table_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=preserve_order, - selected_fields=selected_fields, - page_to_item=page_to_item, - ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 4999de3b0d8d..8aa7788acdfa 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1403,42 +1403,14 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None - def _to_arrow_iterable(self, bqstorage_client=None): + def _to_arrow_iterable(self): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" - if bqstorage_client is not None: - column_names = [field.name for field in self._schema] - try: - # Iterate over the stream so that read errors are raised (and - # the method can then fallback to tabledata.list). - for record_batch in _pandas_helpers.download_arrow_bqstorage( - self._project, - self._table, - bqstorage_client, - column_names, - preserve_order=self._preserve_order, - selected_fields=self._selected_fields, - ): - yield record_batch - return - except google.api_core.exceptions.Forbidden: - # Don't hide errors such as insufficient permissions to create - # a read session, or the API is not enabled. Both of those are - # clearly problems if the developer has explicitly asked for - # BigQuery Storage API support. - raise - except google.api_core.exceptions.GoogleAPICallError: - # There is a known issue with reading from small anonymous - # query results tables, so some errors are expected. Rather - # than throw those errors, try reading the DataFrame again, but - # with the tabledata.list API. - pass - for record_batch in _pandas_helpers.download_arrow_tabledata_list( iter(self.pages), self.schema ): yield record_batch - def to_arrow(self, progress_bar_type=None, bqstorage_client=None): + def to_arrow(self, progress_bar_type=None): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1461,18 +1433,6 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. 
- bqstorage_client ( \ - google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \ - ): - **Beta Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. This API is a billable API. - - This method requires the ``pyarrow`` and - ``google-cloud-bigquery-storage`` libraries. - - Reading from a specific partition or snapshot is not - currently supported by this method. Returns: pyarrow.Table @@ -1492,7 +1452,7 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] - for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): + for record_batch in self._to_arrow_iterable(): record_batches.append(record_batch) if progress_bar is not None: @@ -1559,7 +1519,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non supplied, use the faster BigQuery Storage API to fetch rows from BigQuery. This API is a billable API. - This method requires the ``pyarrow`` and + This method requires the ``fastavro`` and ``google-cloud-bigquery-storage`` libraries. Reading from a specific partition or snapshot is not diff --git a/bigquery/setup.py b/bigquery/setup.py index 84a0b384816c..5637c0f4fd53 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ extras = { "bqstorage": [ "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", - "pyarrow >= 0.4.1", + "fastavro>=0.21.2", ], "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index dcc90b2d96a8..22809c245d4b 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -4897,7 +4897,6 @@ def test_to_dataframe_bqstorage(self): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/{}".format(self.PROJECT), - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use default number of streams for best performance. requested_streams=0, @@ -5341,7 +5340,6 @@ def test_to_dataframe_bqstorage_preserve_order(query): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/test-project", - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use a single stream to preserve row order. requested_streams=1, From 4986bd1cdc8f1cc500f3fcac1f09c62f2c527fda Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 09:43:15 -0500 Subject: [PATCH 8/9] Add tests for to_arrow. --- .../cloud/bigquery_storage_v1beta1/reader.py | 9 ++++ bigquery_storage/tests/system/test_system.py | 35 +++++++++++++++ bigquery_storage/tests/unit/test_reader.py | 44 ++++++++++++++++++- 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index 8c73e4443a5a..5d753e3f0132 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -367,6 +367,15 @@ def next(self): # Alias needed for Python 2/3 support. __next__ = next + def to_arrow(self): + """Create an :class:`pyarrow.RecordBatch` of rows in the page. + + Returns: + pyarrow.RecordBatch: + Rows from the message, as an Arrow record batch. 
+ """ + return self._stream_parser.to_arrow(self._message) + def to_dataframe(self, dtypes=None): """Create a :class:`pandas.DataFrame` of rows in the page. diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index ee4b06cb43ce..aa5dd5db868f 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -18,6 +18,7 @@ import os import numpy +import pyarrow.types import pytest from google.cloud import bigquery_storage_v1beta1 @@ -67,6 +68,40 @@ def test_read_rows_full_table(client, project_id, small_table_reference): assert len(block.avro_rows.serialized_binary_rows) > 0 +def test_read_rows_to_arrow(client, project_id): + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = "bigquery-public-data" + table_ref.dataset_id = "new_york_citibike" + table_ref.table_id = "citibike_stations" + + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.selected_fields.append("station_id") + read_options.selected_fields.append("latitude") + read_options.selected_fields.append("longitude") + read_options.selected_fields.append("name") + session = client.create_read_session( + table_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, + read_options=read_options, + requested_streams=1, + ) + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + tbl = client.read_rows(stream_pos).to_arrow(session) + + assert tbl.num_columns == 4 + schema = tbl.schema + # Use field_by_name because the order doesn't currently match that of + # selected_fields. + assert pyarrow.types.is_int64(schema.field_by_name("station_id").type) + assert pyarrow.types.is_float64(schema.field_by_name("latitude").type) + assert pyarrow.types.is_float64(schema.field_by_name("longitude").type) + assert pyarrow.types.is_string(schema.field_by_name("name").type) + + def test_read_rows_to_dataframe_w_avro(client, project_id): table_ref = bigquery_storage_v1beta1.types.TableReference() table_ref.project_id = "bigquery-public-data" diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 03e8779a8b8f..748a45608f3a 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -146,7 +146,7 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): return avro_blocks -def _bq_to_arrow_batches(bq_blocks, arrow_schema): +def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): arrow_batches = [] for block in bq_blocks: arrays = [] @@ -158,8 +158,13 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema): size=len(block), ) ) - record_batch = pyarrow.RecordBatch.from_arrays(arrays, arrow_schema) + arrow_batches.append(pyarrow.RecordBatch.from_arrays(arrays, arrow_schema)) + return arrow_batches + +def _bq_to_arrow_batches(bq_blocks, arrow_schema): + arrow_batches = [] + for record_batch in _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): response = bigquery_storage_v1beta1.types.ReadRowsResponse() response.arrow_record_batch.serialized_record_batch = ( record_batch.serialize().to_pybytes() @@ -466,6 +471,41 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client): assert page_4.remaining == 0 +def test_to_arrow_no_pyarrow_raises_import_error( + mut, class_under_test, mock_client, monkeypatch +): + monkeypatch.setattr(mut, "pyarrow", None) + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session 
= _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + + with pytest.raises(ImportError): + reader.to_arrow(read_session) + + with pytest.raises(ImportError): + reader.rows(read_session).to_arrow() + + with pytest.raises(ImportError): + next(reader.rows(read_session).pages).to_arrow() + + +def test_to_arrow_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + actual_table = reader.to_arrow(read_session) + expected_table = pyarrow.Table.from_batches( + _bq_to_arrow_batch_objects(SCALAR_BLOCKS, arrow_schema) + ) + assert actual_table == expected_table + + def test_to_dataframe_no_pandas_raises_import_error( mut, class_under_test, mock_client, monkeypatch ): From f6470622b33db112daa010176198beee89bd2d5a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 14:39:57 -0500 Subject: [PATCH 9/9] Remove parameterized error messages. --- .../cloud/bigquery_storage_v1beta1/reader.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index 5d753e3f0132..138fae4110eb 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -43,11 +43,13 @@ _STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,) -_AVRO_BYTES_OPERATION = "parse ReadRowResponse messages with Avro bytes" -_ARROW_BYTES_OPERATION = "parse ReadRowResponse messages with Arrow bytes" -_FASTAVRO_REQUIRED = "fastavro is required to {operation}." +_FASTAVRO_REQUIRED = ( + "fastavro is required to parse ReadRowResponse messages with Avro bytes." +) _PANDAS_REQUIRED = "pandas is required to create a DataFrame" -_PYARROW_REQUIRED = "pyarrow is required to {operation}." +_PYARROW_REQUIRED = ( + "pyarrow is required to parse ReadRowResponse messages with Arrow bytes." +) class ReadRowsStream(object): @@ -540,9 +542,7 @@ def to_rows(self, message): class _ArrowStreamParser(_StreamParser): def __init__(self, read_session): if pyarrow is None: - raise ImportError( - _PYARROW_REQUIRED.format(operation=_ARROW_BYTES_OPERATION) - ) + raise ImportError(_PYARROW_REQUIRED) self._read_session = read_session self._schema = None
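
For reference, a minimal usage sketch of the Arrow read path this series adds, not part of the patch series itself. It is illustrative only: it assumes default application credentials, the billing project string is a placeholder, and the table reference mirrors the public citibike table used in the system tests above.

    from google.cloud import bigquery_storage_v1beta1

    # Client construction assumes default application credentials.
    client = bigquery_storage_v1beta1.BigQueryStorageClient()

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = "bigquery-public-data"
    table_ref.dataset_id = "new_york_citibike"
    table_ref.table_id = "citibike_stations"

    # Request the Arrow wire format instead of the default Avro format.
    session = client.create_read_session(
        table_ref,
        "projects/your-project-id",  # placeholder billing project
        format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
        requested_streams=1,
    )

    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )

    # New in this series: materialize the whole stream as a pyarrow.Table.
    arrow_table = client.read_rows(stream_pos).to_arrow(session)

    # to_dataframe() also works against an Arrow session; a fresh reader is
    # needed here because the stream above has already been consumed.
    # df = client.read_rows(stream_pos).to_dataframe(
    #     session, dtypes={"latitude": "float16"}
    # )

The same reader API continues to work for Avro sessions: _StreamParser.from_read_session picks the Avro or Arrow parser based on which schema oneof the read session carries, and raises TypeError when neither is set.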