Add option to choose dtypes by column in to_dataframe. (#7126)
* Add option to choose dtypes by column in to_dataframe.

This allows pandas users to select different-sized floats, trading
accuracy for performance. With pandas 0.24, it will also allow pandas
users to use the new pandas.Int64Dtype() for nullable integer columns.

* Adjust deps for testing. Blacken.
tswast authored Jan 17, 2019
1 parent 64d1dc8 commit 8c2d0fd
Showing 10 changed files with 138 additions and 33 deletions.
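As described in the commit message, columns listed in the new ``dtypes`` mapping are coerced to the requested pandas dtype, while all other columns keep the default pandas behavior. A minimal usage sketch (the query, project, and column names here are illustrative, not part of this commit):

from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query(
    "SELECT station_id, latitude, longitude FROM `my-project.my_dataset.stations`"
)

# Only columns named in ``dtypes`` get an explicit dtype; the rest use
# pandas' default type inference.
df = query_job.result().to_dataframe(dtypes={"latitude": "float16"})

# With pandas 0.24+, a nullable integer column could instead be requested
# with dtypes={"station_id": pandas.Int64Dtype()}.

The same argument is accepted by RowIterator.to_dataframe for list_rows results and, when a BigQuery Storage client is supplied, is forwarded to the storage API reader.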
46 changes: 33 additions & 13 deletions bigquery/google/cloud/bigquery/table.py
@@ -16,6 +16,7 @@

from __future__ import absolute_import

import collections
import copy
import datetime
import json
@@ -1315,14 +1316,24 @@ def total_rows(self):
"""int: The total number of rows in the table."""
return self._total_rows

def _to_dataframe_tabledata_list(self):
def _to_dataframe_dtypes(self, page, column_names, dtypes):
columns = collections.defaultdict(list)
for row in page:
for column in column_names:
columns[column].append(row[column])
for column in dtypes:
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=column_names)

def _to_dataframe_tabledata_list(self, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_headers = [field.name for field in self.schema]
# Use generator, rather than pulling the whole rowset into memory.
rows = (row.values() for row in iter(self))
return pandas.DataFrame(rows, columns=column_headers)
column_names = [field.name for field in self.schema]
frames = []
for page in iter(self.pages):
frames.append(self._to_dataframe_dtypes(page, column_names, dtypes))
return pandas.concat(frames)

def _to_dataframe_bqstorage(self, bqstorage_client):
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
import concurrent.futures
from google.cloud import bigquery_storage_v1beta1
@@ -1360,7 +1371,7 @@ def _to_dataframe_bqstorage(self, bqstorage_client):
def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position)
return rowstream.to_dataframe(session)
return rowstream.to_dataframe(session, dtypes=dtypes)

with concurrent.futures.ThreadPoolExecutor() as pool:
frames = pool.map(get_dataframe, session.streams)
Expand All @@ -1369,16 +1380,16 @@ def get_dataframe(stream):
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def to_dataframe(self, bqstorage_client=None):
def to_dataframe(self, bqstorage_client=None, dtypes=None):
"""Create a pandas DataFrame from the query results.
Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
Optional. A BigQuery Storage API client. If supplied, use the
faster BigQuery Storage API to fetch rows from BigQuery. This
API is a billable API.
**Alpha Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.
This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.
@@ -1389,6 +1400,13 @@ def to_dataframe(self, bqstorage_client=None):
**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. Write your query
results to a destination table to work around this issue.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
@@ -1402,11 +1420,13 @@
"""
if pandas is None:
raise ValueError(_NO_PANDAS_ERROR)
if dtypes is None:
dtypes = {}

if bqstorage_client is not None:
return self._to_dataframe_bqstorage(bqstorage_client)
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
else:
return self._to_dataframe_tabledata_list()
return self._to_dataframe_tabledata_list(dtypes)


class _EmptyRowIterator(object):
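The new ``_to_dataframe_dtypes`` helper above builds each page of results column by column so that selected columns can be constructed as a ``pandas.Series`` with an explicit dtype. A self-contained sketch of that pattern with made-up rows and column names:

import collections

import pandas

rows = [
    {"bigfloat": 1.25, "lilfloat": 30.5},
    {"bigfloat": 2.5, "lilfloat": 21.125},
]
column_names = ["bigfloat", "lilfloat"]
dtypes = {"lilfloat": "float16"}

columns = collections.defaultdict(list)
for row in rows:
    for column in column_names:
        columns[column].append(row[column])
for column in dtypes:
    # Columns named in ``dtypes`` are rebuilt with the requested dtype; the
    # rest are left to pandas' default inference (float64 here).
    columns[column] = pandas.Series(columns[column], dtype=dtypes[column])

frame = pandas.DataFrame(columns, columns=column_names)
print(frame.dtypes)  # bigfloat: float64, lilfloat: float16

Passing ``columns=column_names`` to the DataFrame constructor preserves the schema's column order, which a plain dict of Series would not guarantee on older pandas versions.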
7 changes: 5 additions & 2 deletions bigquery/noxfile.py
@@ -22,6 +22,9 @@
LOCAL_DEPS = (
os.path.join('..', 'api_core[grpc]'),
os.path.join('..', 'core'),
# TODO: Move bigquery_storage back to dev_install once dtypes feature is
# released. Issue #7049
os.path.join('..', 'bigquery_storage[pandas,fastavro]'),
)


@@ -40,9 +43,9 @@ def default(session):

# Pyarrow does not support Python 3.7
if session.python == '3.7':
dev_install = '.[bqstorage, pandas]'
dev_install = '.[pandas]'
else:
dev_install = '.[bqstorage, pandas, pyarrow]'
dev_install = '.[pandas, pyarrow]'
session.install('-e', dev_install)

# IPython does not support Python 2 after version 5.x
2 changes: 1 addition & 1 deletion bigquery/setup.py
@@ -34,7 +34,7 @@
'google-resumable-media >= 0.3.1',
]
extras = {
'bqstorage': 'google-cloud-bigquery-storage<=2.0.0dev',
'bqstorage': 'google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev',
'pandas': 'pandas>=0.17.1',
# Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
20 changes: 17 additions & 3 deletions bigquery/tests/system.py
@@ -1733,13 +1733,22 @@ def test_nested_table_to_dataframe(self):
),
],
),
SF("bigfloat_col", "FLOAT", mode="NULLABLE"),
SF("smallfloat_col", "FLOAT", mode="NULLABLE"),
]
record = {
"nested_string": "another string value",
"nested_repeated": [0, 1, 2],
"nested_record": {"nested_nested_string": "some deep insight"},
}
to_insert = [{"string_col": "Some value", "record_col": record}]
to_insert = [
{
"string_col": "Some value",
"record_col": record,
"bigfloat_col": 3.14,
"smallfloat_col": 2.72,
}
]
rows = [json.dumps(row) for row in to_insert]
body = six.BytesIO("{}\n".format("\n".join(rows)).encode("ascii"))
table_id = "test_table"
@@ -1753,11 +1762,13 @@
# Load a table using a local JSON file from memory.
Config.CLIENT.load_table_from_file(body, table, job_config=job_config).result()

df = Config.CLIENT.list_rows(table, selected_fields=schema).to_dataframe()
df = Config.CLIENT.list_rows(table, selected_fields=schema).to_dataframe(
dtypes={"smallfloat_col": "float16"}
)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 1) # verify the number of rows
exp_columns = ["string_col", "record_col"]
exp_columns = ["string_col", "record_col", "bigfloat_col", "smallfloat_col"]
self.assertEqual(list(df), exp_columns) # verify the column names
row = df.iloc[0]
# verify the row content
@@ -1769,6 +1780,9 @@
row["record_col"]["nested_record"]["nested_nested_string"],
"some deep insight",
)
# verify dtypes
self.assertEqual(df.dtypes["bigfloat_col"].name, "float64")
self.assertEqual(df.dtypes["smallfloat_col"].name, "float16")

def test_list_rows_empty_table(self):
from google.cloud.bigquery.table import RowIterator
10 changes: 6 additions & 4 deletions bigquery/tests/unit/test_table.py
@@ -1472,21 +1472,22 @@ def test_to_dataframe_column_dtypes(self):
SchemaField("start_timestamp", "TIMESTAMP"),
SchemaField("seconds", "INT64"),
SchemaField("miles", "FLOAT64"),
SchemaField("km", "FLOAT64"),
SchemaField("payment_type", "STRING"),
SchemaField("complete", "BOOL"),
SchemaField("date", "DATE"),
]
row_data = [
["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"],
["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
row_iterator = RowIterator(_mock_client(), api_request, path, schema)

df = row_iterator.to_dataframe()
df = row_iterator.to_dataframe(dtypes={"km": "float16"})

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 3) # verify the number of rows
@@ -1496,6 +1497,7 @@
self.assertEqual(df.start_timestamp.dtype.name, "datetime64[ns, UTC]")
self.assertEqual(df.seconds.dtype.name, "int64")
self.assertEqual(df.miles.dtype.name, "float64")
self.assertEqual(df.km.dtype.name, "float16")
self.assertEqual(df.payment_type.dtype.name, "object")
self.assertEqual(df.complete.dtype.name, "bool")
self.assertEqual(df.date.dtype.name, "object")
38 changes: 32 additions & 6 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -14,6 +14,7 @@

from __future__ import absolute_import

import collections
import itertools
import json

@@ -155,11 +156,11 @@ def rows(self, read_session):
if fastavro is None:
raise ImportError(_FASTAVRO_REQUIRED)

avro_schema = _avro_schema(read_session)
avro_schema, _ = _avro_schema(read_session)
blocks = (_avro_rows(block, avro_schema) for block in self)
return itertools.chain.from_iterable(blocks)

def to_dataframe(self, read_session):
def to_dataframe(self, read_session, dtypes=None):
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
This method requires the pandas library to create a data frame and the
@@ -176,6 +177,13 @@ def to_dataframe(self, read_session):
The read session associated with this read rows stream. This
contains the schema, which is required to parse the data
blocks.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
@@ -186,14 +194,29 @@
if pandas is None:
raise ImportError("pandas is required to create a DataFrame")

avro_schema = _avro_schema(read_session)
if dtypes is None:
dtypes = {}

avro_schema, column_names = _avro_schema(read_session)
frames = []
for block in self:
dataframe = pandas.DataFrame(list(_avro_rows(block, avro_schema)))
dataframe = _to_dataframe_with_dtypes(
_avro_rows(block, avro_schema), column_names, dtypes
)
frames.append(dataframe)
return pandas.concat(frames)


def _to_dataframe_with_dtypes(rows, column_names, dtypes):
columns = collections.defaultdict(list)
for row in rows:
for column in row:
columns[column].append(row[column])
for column in dtypes:
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=column_names)


def _avro_schema(read_session):
"""Extract and parse Avro schema from a read session.
@@ -206,10 +229,13 @@ def _avro_schema(read_session):
blocks.
Returns:
A parsed Avro schema, using :func:`fastavro.schema.parse_schema`.
Tuple[fastavro.schema, Tuple[str]]:
A parsed Avro schema, using :func:`fastavro.schema.parse_schema`
and the column names for a read session.
"""
json_schema = json.loads(read_session.avro_schema.schema)
return fastavro.parse_schema(json_schema)
column_names = tuple((field["name"] for field in json_schema["fields"]))
return fastavro.parse_schema(json_schema), column_names


def _avro_rows(block, avro_schema):
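``_avro_schema`` now returns the column names alongside the parsed schema so that ``to_dataframe`` can build each block's DataFrame in the original column order. A sketch of that extraction, using a made-up Avro schema string in place of ``read_session.avro_schema.schema``:

import json

import fastavro

schema_json = json.dumps(
    {
        "type": "record",
        "name": "__root__",
        "fields": [
            {"name": "bigfloat", "type": "double"},
            {"name": "lilfloat", "type": "double"},
        ],
    }
)

json_schema = json.loads(schema_json)
column_names = tuple(field["name"] for field in json_schema["fields"])
avro_schema = fastavro.parse_schema(json_schema)
print(column_names)  # ('bigfloat', 'lilfloat')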
1 change: 0 additions & 1 deletion bigquery_storage/noxfile.py
@@ -43,7 +43,6 @@ def default(session):
session.run(
'py.test',
'--quiet',
'--cov=google.cloud.bigquery_storage',
'--cov=google.cloud.bigquery_storage_v1beta1',
'--cov=tests.unit',
'--cov-append',
2 changes: 1 addition & 1 deletion bigquery_storage/setup.py
@@ -21,7 +21,7 @@

name = 'google-cloud-bigquery-storage'
description = 'BigQuery Storage API API client library'
version = '0.1.1'
version = '0.2.0dev1'
release_status = 'Development Status :: 3 - Alpha'
dependencies = [
'google-api-core[grpc] >= 1.6.0, < 2.0.0dev',
7 changes: 6 additions & 1 deletion bigquery_storage/tests/system/test_system.py
@@ -17,6 +17,7 @@

import os

import numpy
import pytest

from google.cloud import bigquery_storage_v1beta1
@@ -78,11 +79,15 @@ def test_read_rows_to_dataframe(client, project_id):
stream=session.streams[0]
)

frame = client.read_rows(stream_pos).to_dataframe(session)
frame = client.read_rows(stream_pos).to_dataframe(
session, dtypes={"latitude": numpy.float16}
)

# Station ID is a required field (no nulls), so the datatype should always
# be integer.
assert frame.station_id.dtype.name == "int64"
assert frame.latitude.dtype.name == "float16"
assert frame.longitude.dtype.name == "float64"
assert frame["name"].str.startswith("Central Park").any()


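The system test above exercises the BigQuery Storage fast path end to end. A condensed sketch of the same flow (the parent project is illustrative; the table and column names assume the public Citi Bike stations table used by the test):

import numpy

from google.cloud import bigquery_storage_v1beta1

client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "new_york_citibike"
table_ref.table_id = "citibike_stations"

session = client.create_read_session(
    table_ref, "projects/my-project", requested_streams=1
)
stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
    stream=session.streams[0]
)

# The dtypes mapping is applied to each Avro block's DataFrame before the
# blocks are concatenated, so the coercion holds for the whole result.
frame = client.read_rows(stream_pos).to_dataframe(
    session, dtypes={"latitude": numpy.float16}
)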
38 changes: 37 additions & 1 deletion bigquery_storage/tests/unit/test_reader.py
@@ -55,6 +55,7 @@
{"name": "time_col", "type": "time"},
{"name": "ts_col", "type": "timestamp"},
]
SCALAR_COLUMN_NAMES = [field["name"] for field in SCALAR_COLUMNS]
SCALAR_BLOCKS = [
[
{
@@ -281,7 +282,9 @@ def test_to_dataframe_w_scalars(class_under_test):
)
got = reader.to_dataframe(read_session)

expected = pandas.DataFrame(list(itertools.chain.from_iterable(SCALAR_BLOCKS)))
expected = pandas.DataFrame(
list(itertools.chain.from_iterable(SCALAR_BLOCKS)), columns=SCALAR_COLUMN_NAMES
)
# fastavro provides its own UTC definition, so
# compare the timestamp columns separately.
got_ts = got["ts_col"]
@@ -301,6 +304,39 @@
)


def test_to_dataframe_w_dtypes(class_under_test):
# Coerce one of the two float columns to float16 via the dtypes mapping.
avro_schema = _bq_to_avro_schema(
[
{"name": "bigfloat", "type": "float64"},
{"name": "lilfloat", "type": "float64"},
]
)
read_session = _generate_read_session(avro_schema)
blocks = [
[{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}],
[{"bigfloat": 3.75, "lilfloat": 11.0}],
]
avro_blocks = _bq_to_avro_blocks(blocks, avro_schema)

reader = class_under_test(
avro_blocks, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {}
)
got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})

expected = pandas.DataFrame(
{
"bigfloat": [1.25, 2.5, 3.75],
"lilfloat": pandas.Series([30.5, 21.125, 11.0], dtype="float16"),
},
columns=["bigfloat", "lilfloat"],
)
pandas.testing.assert_frame_equal(
got.reset_index(drop=True), # reset_index to ignore row labels
expected.reset_index(drop=True),
)


def test_copy_stream_position(mut):
read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream={"name": "test"}, offset=41
