diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 123dbea7bb07..b7668fd28ed1 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -16,6 +16,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import copy
 import datetime
 import json
@@ -1315,14 +1316,24 @@ def total_rows(self):
         """int: The total number of rows in the table."""
         return self._total_rows
 
-    def _to_dataframe_tabledata_list(self):
+    def _to_dataframe_dtypes(self, page, column_names, dtypes):
+        columns = collections.defaultdict(list)
+        for row in page:
+            for column in column_names:
+                columns[column].append(row[column])
+        for column in dtypes:
+            columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
+        return pandas.DataFrame(columns, columns=column_names)
+
+    def _to_dataframe_tabledata_list(self, dtypes):
         """Use (slower, but free) tabledata.list to construct a DataFrame."""
-        column_headers = [field.name for field in self.schema]
-        # Use generator, rather than pulling the whole rowset into memory.
-        rows = (row.values() for row in iter(self))
-        return pandas.DataFrame(rows, columns=column_headers)
+        column_names = [field.name for field in self.schema]
+        frames = []
+        for page in iter(self.pages):
+            frames.append(self._to_dataframe_dtypes(page, column_names, dtypes))
+        return pandas.concat(frames)
 
-    def _to_dataframe_bqstorage(self, bqstorage_client):
+    def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         """Use (faster, but billable) BQ Storage API to construct DataFrame."""
         import concurrent.futures
         from google.cloud import bigquery_storage_v1beta1
@@ -1360,7 +1371,7 @@ def _to_dataframe_bqstorage(self, bqstorage_client):
         def get_dataframe(stream):
             position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
             rowstream = bqstorage_client.read_rows(position)
-            return rowstream.to_dataframe(session)
+            return rowstream.to_dataframe(session, dtypes=dtypes)
 
         with concurrent.futures.ThreadPoolExecutor() as pool:
             frames = pool.map(get_dataframe, session.streams)
@@ -1369,16 +1380,16 @@ def get_dataframe(stream):
         # the end using manually-parsed schema.
         return pandas.concat(frames)[columns]
 
-    def to_dataframe(self, bqstorage_client=None):
+    def to_dataframe(self, bqstorage_client=None, dtypes=None):
         """Create a pandas DataFrame from the query results.
 
         Args:
             bqstorage_client ( \
                 google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
             ):
-                Optional. A BigQuery Storage API client. If supplied, use the
-                faster BigQuery Storage API to fetch rows from BigQuery. This
-                API is a billable API.
+                **Alpha Feature** Optional. A BigQuery Storage API client. If
+                supplied, use the faster BigQuery Storage API to fetch rows
+                from BigQuery. This API is a billable API.
 
                 This method requires the ``fastavro`` and
                 ``google-cloud-bigquery-storage`` libraries.
@@ -1389,6 +1400,13 @@ def to_dataframe(self, bqstorage_client=None):
                 **Caution**: There is a known issue reading small anonymous
                 query result tables with the BQ Storage API. Write your query
                 results to a destination table to work around this issue.
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series
+                for the column specified. Otherwise, the default pandas
+                behavior is used.
 
         Returns:
             pandas.DataFrame:
@@ -1402,11 +1420,13 @@ def to_dataframe(self, bqstorage_client=None):
         """
         if pandas is None:
             raise ValueError(_NO_PANDAS_ERROR)
+        if dtypes is None:
+            dtypes = {}
 
         if bqstorage_client is not None:
-            return self._to_dataframe_bqstorage(bqstorage_client)
+            return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
         else:
-            return self._to_dataframe_tabledata_list()
+            return self._to_dataframe_tabledata_list(dtypes)
 
 
 class _EmptyRowIterator(object):
diff --git a/bigquery/noxfile.py b/bigquery/noxfile.py
index 121993e36595..089a82375606 100644
--- a/bigquery/noxfile.py
+++ b/bigquery/noxfile.py
@@ -22,6 +22,9 @@
 LOCAL_DEPS = (
     os.path.join('..', 'api_core[grpc]'),
     os.path.join('..', 'core'),
+    # TODO: Move bigquery_storage back to dev_install once dtypes feature is
+    # released. Issue #7049
+    os.path.join('..', 'bigquery_storage[pandas,fastavro]'),
 )
 
 
@@ -40,9 +43,9 @@ def default(session):
 
     # Pyarrow does not support Python 3.7
    if session.python == '3.7':
-        dev_install = '.[bqstorage, pandas]'
+        dev_install = '.[pandas]'
     else:
-        dev_install = '.[bqstorage, pandas, pyarrow]'
+        dev_install = '.[pandas, pyarrow]'
     session.install('-e', dev_install)
 
     # IPython does not support Python 2 after version 5.x
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 3f8f1c7f9388..5d155e24d1a8 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -34,7 +34,7 @@
     'google-resumable-media >= 0.3.1',
 ]
 extras = {
-    'bqstorage': 'google-cloud-bigquery-storage<=2.0.0dev',
+    'bqstorage': 'google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev',
     'pandas': 'pandas>=0.17.1',
     # Exclude PyArrow dependency from Windows Python 2.7.
     'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index ad67e7b035bd..1f0755cd64e8 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -1733,13 +1733,22 @@ def test_nested_table_to_dataframe(self):
                     ),
                 ],
             ),
+            SF("bigfloat_col", "FLOAT", mode="NULLABLE"),
+            SF("smallfloat_col", "FLOAT", mode="NULLABLE"),
         ]
         record = {
             "nested_string": "another string value",
             "nested_repeated": [0, 1, 2],
             "nested_record": {"nested_nested_string": "some deep insight"},
         }
-        to_insert = [{"string_col": "Some value", "record_col": record}]
+        to_insert = [
+            {
+                "string_col": "Some value",
+                "record_col": record,
+                "bigfloat_col": 3.14,
+                "smallfloat_col": 2.72,
+            }
+        ]
         rows = [json.dumps(row) for row in to_insert]
         body = six.BytesIO("{}\n".format("\n".join(rows)).encode("ascii"))
         table_id = "test_table"
@@ -1753,11 +1762,13 @@ def test_nested_table_to_dataframe(self):
 
         # Load a table using a local JSON file from memory.
         Config.CLIENT.load_table_from_file(body, table, job_config=job_config).result()
 
-        df = Config.CLIENT.list_rows(table, selected_fields=schema).to_dataframe()
+        df = Config.CLIENT.list_rows(table, selected_fields=schema).to_dataframe(
+            dtypes={"smallfloat_col": "float16"}
+        )
 
         self.assertIsInstance(df, pandas.DataFrame)
         self.assertEqual(len(df), 1)  # verify the number of rows
-        exp_columns = ["string_col", "record_col"]
+        exp_columns = ["string_col", "record_col", "bigfloat_col", "smallfloat_col"]
         self.assertEqual(list(df), exp_columns)  # verify the column names
 
         row = df.iloc[0]  # verify the row content
@@ -1769,6 +1780,9 @@ def test_nested_table_to_dataframe(self):
             row["record_col"]["nested_record"]["nested_nested_string"],
             "some deep insight",
         )
+        # verify dtypes
+        self.assertEqual(df.dtypes["bigfloat_col"].name, "float64")
+        self.assertEqual(df.dtypes["smallfloat_col"].name, "float16")
 
     def test_list_rows_empty_table(self):
         from google.cloud.bigquery.table import RowIterator
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 9ed6eea2a3d0..af20c396ac88 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -1472,21 +1472,22 @@ def test_to_dataframe_column_dtypes(self):
             SchemaField("start_timestamp", "TIMESTAMP"),
             SchemaField("seconds", "INT64"),
             SchemaField("miles", "FLOAT64"),
+            SchemaField("km", "FLOAT64"),
             SchemaField("payment_type", "STRING"),
             SchemaField("complete", "BOOL"),
             SchemaField("date", "DATE"),
         ]
         row_data = [
-            ["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
         api_request = mock.Mock(return_value={"rows": rows})
         row_iterator = RowIterator(_mock_client(), api_request, path, schema)
 
-        df = row_iterator.to_dataframe()
+        df = row_iterator.to_dataframe(dtypes={"km": "float16"})
 
         self.assertIsInstance(df, pandas.DataFrame)
         self.assertEqual(len(df), 3)  # verify the number of rows
@@ -1496,6 +1497,7 @@ def test_to_dataframe_column_dtypes(self):
         self.assertEqual(df.start_timestamp.dtype.name, "datetime64[ns, UTC]")
         self.assertEqual(df.seconds.dtype.name, "int64")
         self.assertEqual(df.miles.dtype.name, "float64")
+        self.assertEqual(df.km.dtype.name, "float16")
         self.assertEqual(df.payment_type.dtype.name, "object")
         self.assertEqual(df.complete.dtype.name, "bool")
         self.assertEqual(df.date.dtype.name, "object")
diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
index 0cf6af2d494d..64748b0bc7cf 100644
--- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
+++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -14,6 +14,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import itertools
 import json
 
@@ -155,11 +156,11 @@ def rows(self, read_session):
         if fastavro is None:
             raise ImportError(_FASTAVRO_REQUIRED)
 
-        avro_schema = _avro_schema(read_session)
+        avro_schema, _ = _avro_schema(read_session)
         blocks = (_avro_rows(block, avro_schema) for block in self)
         return itertools.chain.from_iterable(blocks)
 
-    def to_dataframe(self, read_session):
+    def to_dataframe(self, read_session, dtypes=None):
         """Create a :class:`pandas.DataFrame` of all rows in the stream.
 
         This method requires the pandas libary to create a data frame and the
@@ -176,6 +177,13 @@ def to_dataframe(self, read_session):
                 The read session associated with this read rows stream. This
                 contains the schema, which is required to parse the data
                 blocks.
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series
+                for the column specified. Otherwise, the default pandas
+                behavior is used.
 
         Returns:
             pandas.DataFrame:
@@ -186,14 +194,29 @@ def to_dataframe(self, read_session):
         if pandas is None:
             raise ImportError("pandas is required to create a DataFrame")
 
-        avro_schema = _avro_schema(read_session)
+        if dtypes is None:
+            dtypes = {}
+
+        avro_schema, column_names = _avro_schema(read_session)
         frames = []
         for block in self:
-            dataframe = pandas.DataFrame(list(_avro_rows(block, avro_schema)))
+            dataframe = _to_dataframe_with_dtypes(
+                _avro_rows(block, avro_schema), column_names, dtypes
+            )
             frames.append(dataframe)
         return pandas.concat(frames)
 
 
+def _to_dataframe_with_dtypes(rows, column_names, dtypes):
+    columns = collections.defaultdict(list)
+    for row in rows:
+        for column in row:
+            columns[column].append(row[column])
+    for column in dtypes:
+        columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
+    return pandas.DataFrame(columns, columns=column_names)
+
+
 def _avro_schema(read_session):
     """Extract and parse Avro schema from a read session.
 
@@ -206,10 +229,13 @@ def _avro_schema(read_session):
             blocks.
 
     Returns:
-        A parsed Avro schema, using :func:`fastavro.schema.parse_schema`.
+        Tuple[fastavro.schema, Tuple[str]]:
+            A parsed Avro schema, using :func:`fastavro.schema.parse_schema`,
+            and the column names for a read session.
""" json_schema = json.loads(read_session.avro_schema.schema) - return fastavro.parse_schema(json_schema) + column_names = tuple((field["name"] for field in json_schema["fields"])) + return fastavro.parse_schema(json_schema), column_names def _avro_rows(block, avro_schema): diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index a067c095588f..e5aa557c8611 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -43,7 +43,6 @@ def default(session): session.run( 'py.test', '--quiet', - '--cov=google.cloud.bigquery_storage', '--cov=google.cloud.bigquery_storage_v1beta1', '--cov=tests.unit', '--cov-append', diff --git a/bigquery_storage/setup.py b/bigquery_storage/setup.py index e5107e47147d..027f79e11fb2 100644 --- a/bigquery_storage/setup.py +++ b/bigquery_storage/setup.py @@ -21,7 +21,7 @@ name = 'google-cloud-bigquery-storage' description = 'BigQuery Storage API API client library' -version = '0.1.1' +version = '0.2.0dev1' release_status = 'Development Status :: 3 - Alpha' dependencies = [ 'google-api-core[grpc] >= 1.6.0, < 2.0.0dev', diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index a1581e51acf6..4050f72455ca 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -17,6 +17,7 @@ import os +import numpy import pytest from google.cloud import bigquery_storage_v1beta1 @@ -78,11 +79,15 @@ def test_read_rows_to_dataframe(client, project_id): stream=session.streams[0] ) - frame = client.read_rows(stream_pos).to_dataframe(session) + frame = client.read_rows(stream_pos).to_dataframe( + session, dtypes={"latitude": numpy.float16} + ) # Station ID is a required field (no nulls), so the datatype should always # be integer. assert frame.station_id.dtype.name == "int64" + assert frame.latitude.dtype.name == "float16" + assert frame.longitude.dtype.name == "float64" assert frame["name"].str.startswith("Central Park").any() diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 51bf2fd99ddc..b191a034fff0 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -55,6 +55,7 @@ {"name": "time_col", "type": "time"}, {"name": "ts_col", "type": "timestamp"}, ] +SCALAR_COLUMN_NAMES = [field["name"] for field in SCALAR_COLUMNS] SCALAR_BLOCKS = [ [ { @@ -281,7 +282,9 @@ def test_to_dataframe_w_scalars(class_under_test): ) got = reader.to_dataframe(read_session) - expected = pandas.DataFrame(list(itertools.chain.from_iterable(SCALAR_BLOCKS))) + expected = pandas.DataFrame( + list(itertools.chain.from_iterable(SCALAR_BLOCKS)), columns=SCALAR_COLUMN_NAMES + ) # fastavro provides its own UTC definition, so # compare the timestamp columns separately. 
got_ts = got["ts_col"] @@ -301,6 +304,39 @@ def test_to_dataframe_w_scalars(class_under_test): ) +def test_to_dataframe_w_dtypes(class_under_test): + # TODOTODOTODOTODO + avro_schema = _bq_to_avro_schema( + [ + {"name": "bigfloat", "type": "float64"}, + {"name": "lilfloat", "type": "float64"}, + ] + ) + read_session = _generate_read_session(avro_schema) + blocks = [ + [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], + [{"bigfloat": 3.75, "lilfloat": 11.0}], + ] + avro_blocks = _bq_to_avro_blocks(blocks, avro_schema) + + reader = class_under_test( + avro_blocks, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) + + expected = pandas.DataFrame( + { + "bigfloat": [1.25, 2.5, 3.75], + "lilfloat": pandas.Series([30.5, 21.125, 11.0], dtype="float16"), + }, + columns=["bigfloat", "lilfloat"], + ) + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + def test_copy_stream_position(mut): read_position = bigquery_storage_v1beta1.types.StreamPosition( stream={"name": "test"}, offset=41