Add page iterator to ReadRowsStream
This allows readers to read blocks (called pages for compatibility with
BigQuery client library) one at a time from a stream. This enables use
cases such as progress bar support and streaming workers that expect
pandas DataFrames.
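
For context, a minimal sketch of the progress-bar use case this enables. The names session, position, and handle_frame are placeholders: the session and stream position are assumed to come from create_read_session on the v1beta1 client, and handle_frame stands in for a streaming worker that expects pandas DataFrames.

# Sketch only, under the assumptions above.
from google.cloud import bigquery_storage_v1beta1

client = bigquery_storage_v1beta1.BigQueryStorageClient()
rows = client.read_rows(position).rows(session)

processed = 0
for page in rows.pages:
    frame = page.to_dataframe()   # one stream block -> one pandas DataFrame
    handle_frame(frame)           # hand off to the streaming worker
    processed += page.num_items
    print("read {} of ~{} rows".format(processed, rows.total_rows))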
tswast committed Apr 9, 2019
1 parent 958935e commit df5500e
Showing 2 changed files with 332 additions and 32 deletions.
207 changes: 182 additions & 25 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -38,6 +38,7 @@
google.api_core.exceptions.ServiceUnavailable,
)
_FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"


class ReadRowsStream(object):
@@ -156,9 +157,7 @@ def rows(self, read_session):
if fastavro is None:
raise ImportError(_FASTAVRO_REQUIRED)

avro_schema, _ = _avro_schema(read_session)
blocks = (_avro_rows(block, avro_schema) for block in self)
return itertools.chain.from_iterable(blocks)
return ReadRowsIterable(self, read_session)

def to_dataframe(self, read_session, dtypes=None):
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -192,29 +191,186 @@ def to_dataframe(self, read_session, dtypes=None):
if fastavro is None:
raise ImportError(_FASTAVRO_REQUIRED)
if pandas is None:
raise ImportError("pandas is required to create a DataFrame")
raise ImportError(_PANDAS_REQUIRED)

if dtypes is None:
dtypes = {}
return self.rows(read_session).to_dataframe(dtypes=dtypes)


class ReadRowsIterable(object):
"""An iterable of rows from a read session.
Args:
reader (google.cloud.bigquery_storage_v1beta1.reader.ReadRowsStream):
A read rows stream.
read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession):
A read session. This is required because it contains the schema
used in the stream blocks.
"""

# This class is modeled after the google.cloud.bigquery.table.RowIterator
# and aims to be API compatible where possible.

def __init__(self, reader, read_session):
self._status = None
self._reader = reader
self._read_session = read_session

@property
def total_rows(self):
"""int: Number of estimated rows in the current stream.
May change over time.
"""
return getattr(self._status, "estimated_row_count", None)

@property
def pages(self):
"""A generator of all pages in the stream.
Returns:
types.GeneratorType[google.cloud.bigquery_storage_v1beta1.ReadRowsPage]:
A generator of pages.
"""
# Each page is an iterator of rows, and also exposes num_items, remaining,
# and to_dataframe.
avro_schema, column_names = _avro_schema(self._read_session)
for block in self._reader:
self._status = block.status
yield ReadRowsPage(avro_schema, column_names, block)

def __iter__(self):
"""Iterator for each row in all pages."""
for page in self.pages:
for row in page:
yield row

def to_dataframe(self, dtypes=None):
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
This method requires the pandas library to create a data frame and the
fastavro library to parse row blocks.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings in the fastavro library.
Args:
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A data frame of all rows in the stream.
"""
if pandas is None:
raise ImportError(_PANDAS_REQUIRED)

avro_schema, column_names = _avro_schema(read_session)
frames = []
for block in self:
dataframe = _to_dataframe_with_dtypes(
_avro_rows(block, avro_schema), column_names, dtypes
)
frames.append(dataframe)
for page in self.pages:
frames.append(page.to_dataframe(dtypes=dtypes))
return pandas.concat(frames)


def _to_dataframe_with_dtypes(rows, column_names, dtypes):
columns = collections.defaultdict(list)
for row in rows:
for column in row:
columns[column].append(row[column])
for column in dtypes:
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=column_names)
class ReadRowsPage(object):
"""An iterator of rows from a read session block.
Args:
avro_schema (fastavro.schema):
A parsed Avro schema, using :func:`fastavro.schema.parse_schema`
column_names (Tuple[str]):
A read session's column names (in requested order).
block (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse):
A block of data from a read rows stream.
"""

# This class is modeled after google.api_core.page_iterator.Page and aims
# to provide API compatibility where possible.

def __init__(self, avro_schema, column_names, block):
self._avro_schema = avro_schema
self._column_names = column_names
self._block = block
self._iter_rows = None
self._num_items = None
self._remaining = None

def _parse_block(self):
"""Parse metadata and rows from the block only once."""
if self._iter_rows is not None:
return

rows = _avro_rows(self._block, self._avro_schema)
self._num_items = self._block.avro_rows.row_count
self._remaining = self._block.avro_rows.row_count
self._iter_rows = iter(rows)

@property
def num_items(self):
"""int: Total items in the page."""
self._parse_block()
return self._num_items

@property
def remaining(self):
"""int: Remaining items in the page."""
self._parse_block()
return self._remaining

def __iter__(self):
"""A ``ReadRowsPage`` is an iterator."""
return self

def next(self):
"""Get the next row in the page."""
self._parse_block()
if self._remaining > 0:
self._remaining -= 1
return six.next(self._iter_rows)

# Alias needed for Python 2/3 support.
__next__ = next

def to_dataframe(self, dtypes=None):
"""Create a :class:`pandas.DataFrame` of rows in the page.
This method requires the pandas library to create a data frame and the
fastavro library to parse row blocks.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings in the fastavro library.
Args:
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A data frame of all rows in the page.
"""
if pandas is None:
raise ImportError(_PANDAS_REQUIRED)

if dtypes is None:
dtypes = {}

columns = collections.defaultdict(list)
for row in self:
for column in row:
columns[column].append(row[column])
for column in dtypes:
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=self._column_names)


def _avro_schema(read_session):
@@ -242,12 +398,13 @@ def _avro_rows(block, avro_schema):
"""Parse all rows in a stream block.
Args:
read_session ( \
~google.cloud.bigquery_storage_v1beta1.types.ReadSession \
block ( \
~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
):
The read session associated with this read rows stream. This
contains the schema, which is required to parse the data
blocks.
A block containing Avro bytes to parse into rows.
avro_schema (fastavro.schema):
A parsed Avro schema, used to deserialize the bytes in the
block.
Returns:
Iterable[Mapping]:
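With this change, ReadRowsStream.to_dataframe delegates to ReadRowsIterable.to_dataframe, which builds one DataFrame per page and concatenates the results. A short sketch of the dtype override path, reusing the hypothetical client, session, and position from the sketch above; the column name comment_count is made up:

# Sketch only: pin the pandas dtype for one column; the remaining columns
# keep the default pandas behavior.
frame = client.read_rows(position).rows(session).to_dataframe(
    dtypes={"comment_count": "int32"}
)
print(frame.dtypes)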
