
Commit

Respect progress_bar_type in to_dataframe when used with BQ Storage API (#7697)

* fix: `to_dataframe` respects `progress_bar_type` with BQ Storage API

* Add unit test for progress bar.

* Add test for full queue.

* Add worker queue for progress bar to prevent lost tqdm updates.

The worker queue is drained by a background thread, so it is more likely
to keep up with the worker threads that add updates to it (see the sketch
after this list).

* Test that progress bar updates more than once.
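
To make the worker-queue mechanism concrete, here is a minimal, self-contained sketch of the pattern (not the library's actual code; download_stream, forward_updates, drain_progress, the stream count, and the row counts are all illustrative): worker threads push per-page row counts onto a worker queue, a background thread batches them into a progress queue, and the main thread drains that queue to update the tqdm bar.

import concurrent.futures
import queue
import threading
import time

import tqdm

_PROGRESS_INTERVAL = 0.2  # seconds between progress-bar refreshes on the main thread


def download_stream(stream_id, worker_queue):
    """Simulate downloading pages of rows, reporting each page's row count."""
    for _ in range(3):
        time.sleep(0.1)  # pretend to parse a page of rows
        try:
            worker_queue.put_nowait(100)  # rows in this page (made-up number)
        except queue.Full:
            pass  # dropping an update is fine; don't slow down parsing


def forward_updates(worker_queue, progress_queue, done_event):
    """Background thread: collect worker updates so none are lost."""
    pending = 0
    while True:
        try:
            pending += worker_queue.get(timeout=_PROGRESS_INTERVAL)
        except queue.Empty:
            if done_event.is_set():
                progress_queue.put(pending)
                return
        else:
            progress_queue.put(pending)
            pending = 0


def drain_progress(progress_queue, bar):
    """Main thread: apply all queued updates to the tqdm bar."""
    while True:
        try:
            bar.update(progress_queue.get_nowait())
        except queue.Empty:
            return


if __name__ == "__main__":
    worker_queue, progress_queue = queue.Queue(), queue.Queue()
    done_event = threading.Event()
    bar = tqdm.tqdm(total=600, unit="rows")  # 2 streams x 3 pages x 100 rows
    updater = threading.Thread(
        target=forward_updates, args=(worker_queue, progress_queue, done_event)
    )
    updater.start()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = [pool.submit(download_stream, i, worker_queue) for i in range(2)]
        while not all(f.done() for f in futures):
            time.sleep(_PROGRESS_INTERVAL)
            drain_progress(progress_queue, bar)  # tqdm writes happen on the main thread only
    done_event.set()
    updater.join()
    drain_progress(progress_queue, bar)
    bar.close()

Keeping every tqdm update on the main thread avoids contention over stdout/stderr, which is the same reason the real implementation drains its progress queue from the thread that created the bar.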
tswast authored Apr 23, 2019
1 parent fb8c802 commit 4dc8c36
Showing 2 changed files with 200 additions and 15 deletions.
101 changes: 97 additions & 4 deletions bigquery/google/cloud/bigquery/table.py
@@ -22,9 +22,12 @@
import datetime
import json
import operator
import threading
import time
import warnings

import six
from six.moves import queue

try:
from google.cloud import bigquery_storage_v1beta1
@@ -66,7 +69,12 @@
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()
_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds.
_PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds.

# Send multiple updates from the worker threads, so there are at least a few
# waiting next time the progress bar is updated.
_PROGRESS_UPDATES_PER_INTERVAL = 3
_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL


def _reference_getter(table):
@@ -1274,6 +1282,16 @@ def __repr__(self):
return "Row({}, {})".format(self._xxx_values, f2i)


class _NoopProgressBarQueue(object):
"""A fake Queue class that does nothing.
This is used when there is no progress bar to send updates to.
"""

def put_nowait(self, item):
"""Don't actually do anything with the item."""


class RowIterator(HTTPIterator):
"""A class for iterating through HTTP/JSON API row list responses.
@@ -1392,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
return pandas.concat(frames)

def _to_dataframe_bqstorage_stream(
self, bqstorage_client, dtypes, columns, session, stream
self, bqstorage_client, dtypes, columns, session, stream, worker_queue
):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1403,6 +1421,13 @@ def _to_dataframe_bqstorage_stream(
return
frames.append(page.to_dataframe(dtypes=dtypes))

try:
worker_queue.put_nowait(page.num_items)
except queue.Full:
# It's okay if we miss a few progress updates. Don't slow
# down parsing for that.
pass

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
@@ -1412,7 +1437,47 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
def _process_worker_updates(self, worker_queue, progress_queue):
last_update_time = time.time()
current_update = 0

# Sum all updates in a constant loop.
while True:
try:
current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL)

# Time to send to the progress bar queue?
current_time = time.time()
elapsed_time = current_time - last_update_time
if elapsed_time > _PROGRESS_WORKER_INTERVAL:
progress_queue.put(current_update)
last_update_time = current_time
current_update = 0

except queue.Empty:
# Keep going, unless there probably aren't going to be any
# additional updates.
if self._to_dataframe_finished:
progress_queue.put(current_update)
return

def _process_progress_updates(self, progress_queue, progress_bar):
if progress_bar is None:
return

# Output all updates since the last interval.
while True:
try:
next_update = progress_queue.get_nowait()
progress_bar.update(next_update)
except queue.Empty:
break

if self._to_dataframe_finished:
progress_bar.close()
return

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
if bigquery_storage_v1beta1 is None:
raise ValueError(_NO_BQSTORAGE_ERROR)
@@ -1451,6 +1516,18 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
# See: https://stackoverflow.com/a/29237343/101923
self._to_dataframe_finished = False

# Create a queue to track progress updates across threads.
worker_queue = _NoopProgressBarQueue()
progress_queue = None
progress_thread = None
if progress_bar is not None:
worker_queue = queue.Queue()
progress_queue = queue.Queue()
progress_thread = threading.Thread(
target=self._process_worker_updates, args=(worker_queue, progress_queue)
)
progress_thread.start()

def get_frames(pool):
frames = []

@@ -1466,6 +1543,7 @@ def get_frames(pool):
columns,
session,
stream,
worker_queue,
)
for stream in session.streams
]
@@ -1475,6 +1553,11 @@ def get_frames(pool):
not_done, timeout=_PROGRESS_INTERVAL
)
frames.extend([future.result() for future in done])

# The progress bar needs to update on the main thread to avoid
# contention over stdout / stderr.
self._process_progress_updates(progress_queue, progress_bar)

return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
@@ -1486,6 +1569,14 @@ def get_frames(pool):
# definition (enforced by the global interpreter lock).
self._to_dataframe_finished = True

# Shutdown all background threads, now that they should know to
# exit early.
pool.shutdown(wait=True)
if progress_thread is not None:
progress_thread.join()

# Update the progress bar one last time to close it.
self._process_progress_updates(progress_queue, progress_bar)
return pandas.concat(frames)

def _get_progress_bar(self, progress_bar_type):
Expand Down Expand Up @@ -1585,7 +1676,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non

if bqstorage_client is not None:
try:
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
return self._to_dataframe_bqstorage(
bqstorage_client, dtypes, progress_bar=progress_bar
)
except google.api_core.exceptions.Forbidden:
# Don't hide errors such as insufficient permissions to create
# a read session, or the API is not enabled. Both of those are
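
For reference, a hedged usage sketch of what the change above enables (the project, query, and client construction are illustrative, not part of this commit): when both a BQ Storage client and a progress_bar_type are passed, to_dataframe now shows a tqdm bar while rows are downloaded.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

query_job = client.query(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013`"
)
# Download the result set with the BQ Storage API and render a tqdm bar.
df = query_job.result().to_dataframe(
    bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
)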
114 changes: 103 additions & 11 deletions bigquery/tests/unit/test_table.py
@@ -22,6 +22,7 @@
import mock
import pytest
import six
from six.moves import queue

import google.api_core.exceptions

@@ -1816,9 +1817,12 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
)
streams = [
# Use two streams so we can check that frames are read from each stream.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
session.avro_schema.schema = json.dumps(
{
"fields": [
@@ -1836,20 +1840,25 @@ def test_to_dataframe_w_bqstorage_nonempty(self):

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows
page_items = [
{"colA": 1, "colB": "abc", "colC": 2.0},
{"colA": -1, "colB": "def", "colC": 4.0},
]

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)
return pandas.DataFrame(page_items, columns=["colA", "colB", "colC"])

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages
mock_pages = (mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

# Test that full queue errors are ignored.
mock_queue = mock.create_autospec(mut._NoopProgressBarQueue)
mock_queue().put_nowait.side_effect = queue.Full

schema = [
schema.SchemaField("colA", "IGNORED"),
@@ -1866,17 +1875,100 @@ def blocking_to_dataframe(*args, **kwargs):
selected_fields=schema,
)

with mock.patch(
with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch(
"concurrent.futures.wait", wraps=concurrent.futures.wait
) as mock_wait:
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

# Are the columns in the expected order?
column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 6)

# Have expected number of rows?
total_pages = len(streams) * len(mock_pages)
total_rows = len(page_items) * total_pages
self.assertEqual(len(got.index), total_rows)

# Make sure that this test looped through multiple progress intervals.
self.assertGreaterEqual(mock_wait.call_count, 2)

# Make sure that this test pushed to the progress queue.
self.assertEqual(mock_queue().put_nowait.call_count, total_pages)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
@mock.patch("tqdm.tqdm")
def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
streams = [
# Use two streams so we can check that progress bar updates are sent
# from each stream.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows
mock_page = mock.create_autospec(reader.ReadRowsPage)
page_items = [-1, 0, 1]
type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval. This ensures the
# progress_queue gets written to more than once because it gives
# the worker->progress updater time to sum intermediate updates.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame({"testcol": page_items})

mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

schema = [schema.SchemaField("testcol", "IGNORED")]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

row_iterator.to_dataframe(
bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
)

# Make sure that this test updated the progress bar once per page from
# each stream.
total_pages = len(streams) * len(mock_pages)
expected_total_rows = total_pages * len(page_items)
progress_updates = [
args[0] for args, kwargs in tqdm_mock().update.call_args_list
]
# Should have sent >1 update due to delay in blocking_to_dataframe.
self.assertGreater(len(progress_updates), 1)
self.assertEqual(sum(progress_updates), expected_total_rows)
tqdm_mock().close.assert_called_once()

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
Expand Down
