Skip to content

Commit

Permalink
Add tqdm progress bar for to_dataframe downloads (#7552)
Browse files Browse the repository at this point in the history
Add progress_bar_type argument to to_dataframe

Install tqdm to use this feature.

If there are any tqdm errors during progress bar
construction, a warning is displayed and no progress
bar is displayed.
  • Loading branch information
JohnPaton authored and tswast committed Mar 28, 2019
1 parent 70be116 commit 800a6bb
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 20 deletions.
2 changes: 1 addition & 1 deletion bigquery/docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@

# Example configuration for intersphinx: refer to the Python standard library.
# Example configuration for intersphinx: refer to the Python standard library.
# NOTE: keys must be unique — the diff residue listed "python" twice, which in
# a real dict silently keeps only the last entry. Entries are kept sorted.
intersphinx_mapping = {
    "gax": ("https://gax-python.readthedocs.org/en/latest/", None),
    "pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
    "python": ("http://python.readthedocs.org/en/latest/", None),
}

# Napoleon settings
Expand Down
12 changes: 11 additions & 1 deletion bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2810,7 +2810,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table = Table(dest_table_ref, schema=schema)
return self._client.list_rows(dest_table, retry=retry)

def to_dataframe(self, bqstorage_client=None, dtypes=None):
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Return a pandas DataFrame from a QueryJob
Args:
Expand All @@ -2837,6 +2837,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.
See
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.
.. versionadded:: 1.11.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
Expand Down
78 changes: 73 additions & 5 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
except ImportError: # pragma: NO COVER
pandas = None

# tqdm is an optional dependency, used only for progress bar support in
# to_dataframe(). Fall back to ``tqdm = None`` so the rest of the module
# can feature-detect it instead of failing at import time.
try:
    import tqdm
except ImportError:  # pragma: NO COVER
    tqdm = None

from google.api_core.page_iterator import HTTPIterator

import google.cloud._helpers
Expand All @@ -44,6 +49,10 @@
"The pandas library is not installed, please install "
"pandas to use the to_dataframe() function."
)
# Warning text emitted when a progress bar was requested via
# ``progress_bar_type`` but tqdm is not installed or failed to construct one.
_NO_TQDM_ERROR = (
    "A progress bar was requested, but there was an error loading the tqdm "
    "library. Please install tqdm to use the progress bar functionality."
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()

Expand Down Expand Up @@ -1330,12 +1339,22 @@ def _to_dataframe_dtypes(self, page, column_names, dtypes):
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=column_names)

def _to_dataframe_tabledata_list(self, dtypes):
def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_names = [field.name for field in self.schema]
frames = []

for page in iter(self.pages):
frames.append(self._to_dataframe_dtypes(page, column_names, dtypes))
current_frame = self._to_dataframe_dtypes(page, column_names, dtypes)
frames.append(current_frame)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(current_frame))

return pandas.concat(frames)

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
Expand Down Expand Up @@ -1385,10 +1404,37 @@ def get_dataframe(stream):
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def to_dataframe(self, bqstorage_client=None, dtypes=None):
def _get_progress_bar(self, progress_bar_type):
    """Construct a tqdm progress bar object, if tqdm is installed.

    Args:
        progress_bar_type (Optional[str]):
            One of ``"tqdm"``, ``"tqdm_notebook"``, or ``"tqdm_gui"``.
            ``None`` disables the progress bar.

    Returns:
        Optional[object]: A tqdm progress bar, or ``None`` when no bar
        could (or should) be constructed. A :class:`UserWarning` is
        emitted whenever a bar was requested but cannot be shown.
    """
    if tqdm is None:
        if progress_bar_type is not None:
            warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
        return None

    description = "Downloading"
    unit = "rows"

    try:
        if progress_bar_type == "tqdm":
            return tqdm.tqdm(desc=description, total=self.total_rows, unit=unit)
        elif progress_bar_type == "tqdm_notebook":
            return tqdm.tqdm_notebook(
                desc=description, total=self.total_rows, unit=unit
            )
        elif progress_bar_type == "tqdm_gui":
            return tqdm.tqdm_gui(desc=description, total=self.total_rows, unit=unit)
    except (KeyError, TypeError):
        # Protect ourselves from any tqdm errors. In case of
        # unexpected tqdm behavior, just fall back to showing
        # no progress bar.
        warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
        return None

    if progress_bar_type is not None:
        # Previously an unrecognized (but non-None) progress_bar_type was
        # silently ignored, so typos such as "tqdm_notbook" produced no bar
        # and no hint why. Warn instead, consistent with the missing-tqdm path.
        warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
    return None

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Create a pandas DataFrame by loading all pages of a query.
Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
Expand All @@ -1413,6 +1459,26 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.
Possible values of ``progress_bar_type`` include:
``None``
No progress bar.
``'tqdm'``
Use the :func:`tqdm.tqdm` function to print a progress bar
to :data:`sys.stderr`.
``'tqdm_notebook'``
Use the :func:`tqdm.tqdm_notebook` function to display a
progress bar as a Jupyter notebook widget.
``'tqdm_gui'``
Use the :func:`tqdm.tqdm_gui` function to display a
progress bar as a graphical dialog box.
.. versionadded:: 1.11.0
Returns:
pandas.DataFrame:
Expand All @@ -1429,10 +1495,12 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
if dtypes is None:
dtypes = {}

progress_bar = self._get_progress_bar(progress_bar_type)

if bqstorage_client is not None:
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
else:
return self._to_dataframe_tabledata_list(dtypes)
return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)


class _EmptyRowIterator(object):
Expand Down
4 changes: 2 additions & 2 deletions bigquery/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def default(session):

# Pyarrow does not support Python 3.7
if session.python == '3.7':
dev_install = '.[pandas]'
dev_install = '.[pandas, tqdm]'
else:
dev_install = '.[pandas, pyarrow]'
dev_install = '.[pandas, pyarrow, tqdm]'
session.install('-e', dev_install)

# IPython does not support Python 2 after version 5.x
Expand Down
1 change: 1 addition & 0 deletions bigquery/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
# Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
'pyarrow>=0.4.1',
'tqdm': 'tqdm >= 4.0.0, <5.0.0dev',
'fastparquet': ['fastparquet', 'python-snappy'],
}

Expand Down
138 changes: 129 additions & 9 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import itertools
import json
import unittest
import warnings

import mock
import pytest
Expand All @@ -29,6 +30,11 @@
except (ImportError, AttributeError): # pragma: NO COVER
pandas = None

try:
from tqdm import tqdm
except (ImportError, AttributeError): # pragma: NO COVER
tqdm = None

from google.cloud.bigquery.dataset import DatasetReference


Expand Down Expand Up @@ -901,7 +907,6 @@ def test_time_partitioning_setter_none(self):
self.assertIsNone(table.time_partitioning)

def test_partitioning_type_setter(self):
import warnings
from google.cloud.bigquery.table import TimePartitioningType

dataset = DatasetReference(self.PROJECT, self.DS_ID)
Expand All @@ -920,7 +925,6 @@ def test_partitioning_type_setter(self):
self.assertIs(warning.category, PendingDeprecationWarning)

def test_partitioning_type_setter_w_time_partitioning_set(self):
import warnings
from google.cloud.bigquery.table import TimePartitioning

dataset = DatasetReference(self.PROJECT, self.DS_ID)
Expand All @@ -938,7 +942,6 @@ def test_partitioning_type_setter_w_time_partitioning_set(self):
self.assertIs(warning.category, PendingDeprecationWarning)

def test_partitioning_expiration_setter_w_time_partitioning_set(self):
import warnings
from google.cloud.bigquery.table import TimePartitioning

dataset = DatasetReference(self.PROJECT, self.DS_ID)
Expand All @@ -956,8 +959,6 @@ def test_partitioning_expiration_setter_w_time_partitioning_set(self):
self.assertIs(warning.category, PendingDeprecationWarning)

def test_partition_expiration_setter(self):
import warnings

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)
Expand Down Expand Up @@ -1112,8 +1113,6 @@ def _make_one(self, *args, **kw):
return self._get_target_class()(*args, **kw)

def test_ctor(self):
import warnings

project = "test-project"
dataset_id = "test_dataset"
table_id = "coffee_table"
Expand Down Expand Up @@ -1191,8 +1190,6 @@ def test_ctor_view(self):
self.assertTrue(table.view_use_legacy_sql)

def test_ctor_missing_properties(self):
import warnings

resource = {
"tableReference": {
"projectId": "testproject",
Expand Down Expand Up @@ -1413,6 +1410,129 @@ def test_to_dataframe(self):
self.assertEqual(df.name.dtype.name, "object")
self.assertEqual(df.age.dtype.name, "int64")

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
@mock.patch("tqdm.tqdm_gui")
@mock.patch("tqdm.tqdm_notebook")
@mock.patch("tqdm.tqdm")
def test_to_dataframe_progress_bar(
    self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock
):
    """Each supported progress_bar_type constructs and updates its bar."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    people = (
        ("Phred Phlyntstone", "32"),
        ("Bharney Rhubble", "33"),
        ("Wylma Phlyntstone", "29"),
        ("Bhettye Rhubble", "27"),
    )
    rows = [{"f": [{"v": name}, {"v": age}]} for name, age in people]
    path = "/foo"
    api_request = mock.Mock(return_value={"rows": rows})

    # Map each accepted progress_bar_type string to the mock that the
    # corresponding tqdm factory should have been patched into.
    bar_mocks = {
        "tqdm": tqdm_mock,
        "tqdm_notebook": tqdm_notebook_mock,
        "tqdm_gui": tqdm_gui_mock,
    }

    for bar_type, bar_mock in bar_mocks.items():
        iterator = RowIterator(_mock_client(), api_request, path, schema)
        df = iterator.to_dataframe(progress_bar_type=bar_type)

        bar_mock.assert_called()
        bar_mock().update.assert_called()
        self.assertEqual(len(df), 4)
@unittest.skipIf(pandas is None, "Requires `pandas`")
@mock.patch("google.cloud.bigquery.table.tqdm", new=None)
def test_to_dataframe_no_tqdm_no_progress_bar(self):
    """With tqdm unavailable and no bar requested, no warning is emitted."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    rows = [
        {"f": [{"v": name}, {"v": age}]}
        for name, age in (
            ("Phred Phlyntstone", "32"),
            ("Bharney Rhubble", "33"),
            ("Wylma Phlyntstone", "29"),
            ("Bhettye Rhubble", "27"),
        )
    ]
    api_request = mock.Mock(return_value={"rows": rows})
    row_iterator = RowIterator(_mock_client(), api_request, "/foo", schema)

    with warnings.catch_warnings(record=True) as warned:
        df = row_iterator.to_dataframe()

    self.assertEqual(len(warned), 0)
    self.assertEqual(len(df), 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@mock.patch("google.cloud.bigquery.table.tqdm", new=None)
def test_to_dataframe_no_tqdm(self):
    """Requesting a progress bar when tqdm is unavailable emits exactly one
    UserWarning, and the DataFrame is still downloaded."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    rows = [
        {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
        {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
        {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
        {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
    ]
    path = "/foo"
    api_request = mock.Mock(return_value={"rows": rows})
    row_iterator = RowIterator(_mock_client(), api_request, path, schema)

    with warnings.catch_warnings(record=True) as warned:
        # tqdm is patched to None, so a bar is requested but cannot load.
        df = row_iterator.to_dataframe(progress_bar_type="tqdm")

    self.assertEqual(len(warned), 1)
    for warning in warned:
        self.assertIs(warning.category, UserWarning)

    # Even though the progress bar won't show, downloading the dataframe
    # should still work.
    self.assertEqual(len(df), 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
@mock.patch("tqdm.tqdm_gui", new=None)  # will raise TypeError on call
@mock.patch("tqdm.tqdm_notebook", new=None)  # will raise TypeError on call
@mock.patch("tqdm.tqdm", new=None)  # will raise TypeError on call
def test_to_dataframe_tqdm_error(self):
    """Errors while constructing a tqdm bar (here: TypeError from calling
    None) fall back to no progress bar; the download still succeeds for
    every supported progress_bar_type."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    rows = [
        {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
        {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
        {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
        {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
    ]
    path = "/foo"

    for progress_bar_type in ("tqdm", "tqdm_notebook", "tqdm_gui"):
        api_request = mock.Mock(return_value={"rows": rows})
        row_iterator = RowIterator(_mock_client(), api_request, path, schema)
        df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type)

        self.assertEqual(len(df), 4)  # all should be well

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_w_empty_results(self):
from google.cloud.bigquery.table import RowIterator
Expand Down
Loading

0 comments on commit 800a6bb

Please sign in to comment.