diff --git a/bigquery/docs/conf.py b/bigquery/docs/conf.py
index 62815ae73b38..c9ff82d8e72b 100644
--- a/bigquery/docs/conf.py
+++ b/bigquery/docs/conf.py
@@ -326,9 +326,9 @@

 # Example configuration for intersphinx: refer to the Python standard library.
 intersphinx_mapping = {
-    "python": ("http://python.readthedocs.org/en/latest/", None),
     "gax": ("https://gax-python.readthedocs.org/en/latest/", None),
     "pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
+    "python": ("http://python.readthedocs.org/en/latest/", None),
 }

 # Napoleon settings
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index f4c919a2d8fc..bc87f109a484 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -2810,7 +2810,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
             dest_table = Table(dest_table_ref, schema=schema)
             return self._client.list_rows(dest_table, retry=retry)

-    def to_dataframe(self, bqstorage_client=None, dtypes=None):
+    def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob

         Args:
@@ -2837,6 +2837,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
                 provided ``dtype`` is used when constructing the series for
                 the column specified. Otherwise, the default pandas behavior
                 is used.
+            progress_bar_type (Optional[str]):
+                If set, use the `tqdm `_ library to
+                display a progress bar while the data downloads. Install the
+                ``tqdm`` package to use this feature.
+
+                See
+                :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
+                for details.
+
+                .. versionadded:: 1.11.0

         Returns:
             A :class:`~pandas.DataFrame` populated with row data and column
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 46419e70a83e..dcb25d8bb3c6 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -30,6 +30,11 @@
 except ImportError:  # pragma: NO COVER
     pandas = None

+try:
+    import tqdm
+except ImportError:  # pragma: NO COVER
+    tqdm = None
+
 from google.api_core.page_iterator import HTTPIterator

 import google.cloud._helpers
@@ -44,6 +49,10 @@
     "The pandas library is not installed, please install "
     "pandas to use the to_dataframe() function."
 )
+_NO_TQDM_ERROR = (
+    "A progress bar was requested, but there was an error loading the tqdm "
+    "library. Please install tqdm to use the progress bar functionality."
+)
 _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
 _MARKER = object()

@@ -1330,12 +1339,22 @@ def _to_dataframe_dtypes(self, page, column_names, dtypes):
             columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
         return pandas.DataFrame(columns, columns=column_names)

-    def _to_dataframe_tabledata_list(self, dtypes):
+    def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
         """Use (slower, but free) tabledata.list to construct a DataFrame."""
         column_names = [field.name for field in self.schema]
         frames = []
+
         for page in iter(self.pages):
-            frames.append(self._to_dataframe_dtypes(page, column_names, dtypes))
+            current_frame = self._to_dataframe_dtypes(page, column_names, dtypes)
+            frames.append(current_frame)
+
+            if progress_bar is not None:
+                # In some cases, the number of total rows is not populated
+                # until the first page of rows is fetched. Update the
+                # progress bar's total to keep an accurate count.
+                progress_bar.total = progress_bar.total or self.total_rows
+                progress_bar.update(len(current_frame))
+
         return pandas.concat(frames)

     def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
@@ -1385,10 +1404,37 @@ def get_dataframe(stream):
         # the end using manually-parsed schema.
         return pandas.concat(frames)[columns]

-    def to_dataframe(self, bqstorage_client=None, dtypes=None):
+    def _get_progress_bar(self, progress_bar_type):
+        """Construct a tqdm progress bar object, if tqdm is installed."""
+        if tqdm is None:
+            if progress_bar_type is not None:
+                warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
+            return None
+
+        description = "Downloading"
+        unit = "rows"
+
+        try:
+            if progress_bar_type == "tqdm":
+                return tqdm.tqdm(desc=description, total=self.total_rows, unit=unit)
+            elif progress_bar_type == "tqdm_notebook":
+                return tqdm.tqdm_notebook(
+                    desc=description, total=self.total_rows, unit=unit
+                )
+            elif progress_bar_type == "tqdm_gui":
+                return tqdm.tqdm_gui(
+                    desc=description, total=self.total_rows, unit=unit
+                )
+        except (KeyError, TypeError):
+            # Protect ourselves from any tqdm errors. In case of
+            # unexpected tqdm behavior, just fall back to showing
+            # no progress bar.
+            warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
+        return None
+
+    def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Create a pandas DataFrame by loading all pages of a query.
-
         Args:
             bqstorage_client ( \
                 google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
             ):
@@ -1413,6 +1459,26 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
                 provided ``dtype`` is used when constructing the series for
                 the column specified. Otherwise, the default pandas behavior
                 is used.
+            progress_bar_type (Optional[str]):
+                If set, use the `tqdm `_ library to
+                display a progress bar while the data downloads. Install the
+                ``tqdm`` package to use this feature.
+
+                Possible values of ``progress_bar_type`` include:
+
+                ``None``
+                    No progress bar.
+                ``'tqdm'``
+                    Use the :func:`tqdm.tqdm` function to print a progress bar
+                    to :data:`sys.stderr`.
+                ``'tqdm_notebook'``
+                    Use the :func:`tqdm.tqdm_notebook` function to display a
+                    progress bar as a Jupyter notebook widget.
+                ``'tqdm_gui'``
+                    Use the :func:`tqdm.tqdm_gui` function to display a
+                    progress bar as a graphical dialog box.
+
+                .. versionadded:: 1.11.0

         Returns:
             pandas.DataFrame:
@@ -1429,10 +1495,12 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
         if dtypes is None:
             dtypes = {}

+        progress_bar = self._get_progress_bar(progress_bar_type)
+
         if bqstorage_client is not None:
             return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
         else:
-            return self._to_dataframe_tabledata_list(dtypes)
+            return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)


 class _EmptyRowIterator(object):
diff --git a/bigquery/noxfile.py b/bigquery/noxfile.py
index 82846604306e..2c11f5b67056 100644
--- a/bigquery/noxfile.py
+++ b/bigquery/noxfile.py
@@ -44,9 +44,9 @@ def default(session):

     # Pyarrow does not support Python 3.7
     if session.python == '3.7':
-        dev_install = '.[pandas]'
+        dev_install = '.[pandas, tqdm]'
     else:
-        dev_install = '.[pandas, pyarrow]'
+        dev_install = '.[pandas, pyarrow, tqdm]'
     session.install('-e', dev_install)

     # IPython does not support Python 2 after version 5.x
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 6b4edaf561c0..696e093cd6ff 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -39,6 +39,7 @@
     # Exclude PyArrow dependency from Windows Python 2.7.
     'pyarrow: platform_system != "Windows" or python_version >= "3.4"': 'pyarrow>=0.4.1',
+    'tqdm': 'tqdm >= 4.0.0, <5.0.0dev',
     'fastparquet': ['fastparquet', 'python-snappy'],
 }
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index d9ba9db3f05d..4500856ec2a4 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -15,6 +15,7 @@
 import itertools
 import json
 import unittest
+import warnings

 import mock
 import pytest
@@ -29,6 +30,11 @@
 except (ImportError, AttributeError):  # pragma: NO COVER
     pandas = None

+try:
+    from tqdm import tqdm
+except (ImportError, AttributeError):  # pragma: NO COVER
+    tqdm = None
+
 from google.cloud.bigquery.dataset import DatasetReference


@@ -901,7 +907,6 @@ def test_time_partitioning_setter_none(self):
         self.assertIsNone(table.time_partitioning)

     def test_partitioning_type_setter(self):
-        import warnings
         from google.cloud.bigquery.table import TimePartitioningType

         dataset = DatasetReference(self.PROJECT, self.DS_ID)
@@ -920,7 +925,6 @@ def test_partitioning_type_setter(self):
         self.assertIs(warning.category, PendingDeprecationWarning)

     def test_partitioning_type_setter_w_time_partitioning_set(self):
-        import warnings
         from google.cloud.bigquery.table import TimePartitioning

         dataset = DatasetReference(self.PROJECT, self.DS_ID)
@@ -938,7 +942,6 @@ def test_partitioning_type_setter_w_time_partitioning_set(self):
         self.assertIs(warning.category, PendingDeprecationWarning)

     def test_partitioning_expiration_setter_w_time_partitioning_set(self):
-        import warnings
         from google.cloud.bigquery.table import TimePartitioning

         dataset = DatasetReference(self.PROJECT, self.DS_ID)
@@ -956,8 +959,6 @@ def test_partitioning_expiration_setter_w_time_partitioning_set(self):
         self.assertIs(warning.category, PendingDeprecationWarning)

     def test_partition_expiration_setter(self):
-        import warnings
-
         dataset = DatasetReference(self.PROJECT, self.DS_ID)
         table_ref = dataset.table(self.TABLE_NAME)
         table = self._make_one(table_ref)
@@ -1112,8 +1113,6 @@ def _make_one(self, *args, **kw):
         return self._get_target_class()(*args, **kw)

     def test_ctor(self):
-        import warnings
-
         project = "test-project"
         dataset_id = "test_dataset"
         table_id = "coffee_table"
@@ -1191,8 +1190,6 @@ def test_ctor_view(self):
         self.assertTrue(table.view_use_legacy_sql)

     def test_ctor_missing_properties(self):
-        import warnings
-
         resource = {
             "tableReference": {
                 "projectId": "testproject",
@@ -1413,6 +1410,129 @@ def test_to_dataframe(self):
         self.assertEqual(df.name.dtype.name, "object")
         self.assertEqual(df.age.dtype.name, "int64")

+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @mock.patch("tqdm.tqdm_gui")
+    @mock.patch("tqdm.tqdm_notebook")
+    @mock.patch("tqdm.tqdm")
+    def test_to_dataframe_progress_bar(
+        self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock
+    ):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+            {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
+            {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+
+        progress_bars = (
+            ("tqdm", tqdm_mock),
+            ("tqdm_notebook", tqdm_notebook_mock),
+            ("tqdm_gui", tqdm_gui_mock),
+        )
+
+        for progress_bar_type, progress_bar_mock in progress_bars:
+            row_iterator = RowIterator(_mock_client(), api_request, path, schema)
+            df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type)
+
+            progress_bar_mock.assert_called()
+            progress_bar_mock().update.assert_called()
+            self.assertEqual(len(df), 4)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @mock.patch("google.cloud.bigquery.table.tqdm", new=None)
+    def test_to_dataframe_no_tqdm_no_progress_bar(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+            {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
+            {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+        row_iterator = RowIterator(_mock_client(), api_request, path, schema)
+
+        with warnings.catch_warnings(record=True) as warned:
+            df = row_iterator.to_dataframe()
+
+        self.assertEqual(len(warned), 0)
+        self.assertEqual(len(df), 4)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @mock.patch("google.cloud.bigquery.table.tqdm", new=None)
+    def test_to_dataframe_no_tqdm(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+            {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
+            {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+        row_iterator = RowIterator(_mock_client(), api_request, path, schema)
+
+        with warnings.catch_warnings(record=True) as warned:
+            df = row_iterator.to_dataframe(progress_bar_type="tqdm")
+
+        self.assertEqual(len(warned), 1)
+        for warning in warned:
+            self.assertIs(warning.category, UserWarning)
+
+        # Even though the progress bar won't show, downloading the dataframe
+        # should still work.
+        self.assertEqual(len(df), 4)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @mock.patch("tqdm.tqdm_gui", new=None)  # will raise TypeError on call
+    @mock.patch("tqdm.tqdm_notebook", new=None)  # will raise TypeError on call
+    @mock.patch("tqdm.tqdm", new=None)  # will raise TypeError on call
+    def test_to_dataframe_tqdm_error(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+            {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
+            {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
+        ]
+        path = "/foo"
+
+        for progress_bar_type in ("tqdm", "tqdm_notebook", "tqdm_gui"):
+            api_request = mock.Mock(return_value={"rows": rows})
+            row_iterator = RowIterator(_mock_client(), api_request, path, schema)
+            df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type)
+
+            self.assertEqual(len(df), 4)  # all should be well
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     def test_to_dataframe_w_empty_results(self):
         from google.cloud.bigquery.table import RowIterator
diff --git a/docs/conf.py b/docs/conf.py
index 584ce42e952a..1ff88dbae6f6 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -317,13 +317,13 @@

 # Configuration for intersphinx:
 intersphinx_mapping = {
+    'fastavro': ('https://fastavro.readthedocs.io/en/stable/', None),
     'google-auth': ('https://google-auth.readthedocs.io/en/stable', None),
     'google-gax': ('https://gax-python.readthedocs.io/en/latest/', None),
     'grpc': ('https://grpc.io/grpc/python/', None),
-    'requests': ('http://docs.python-requests.org/en/master/', None),
-    'fastavro': ('https://fastavro.readthedocs.io/en/stable/', None),
     'pandas': ('https://pandas.pydata.org/pandas-docs/stable/', None),
     'python': ('https://docs.python.org/3', None),
+    'requests': ('http://docs.python-requests.org/en/master/', None),
 }

 # Static HTML pages, e.g. to support redirects
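
Usage sketch (not taken from the patch itself): the snippet below shows how a caller could exercise the new ``progress_bar_type`` argument once this change ships in version 1.11.0. The query and the ``bigquery-public-data.usa_names.usa_1910_2013`` table are illustrative placeholders, and ``tqdm`` is assumed to be installed (for example via the ``tqdm`` extra added in setup.py); per the tests above, if ``tqdm`` is missing the call still returns the full DataFrame and only emits a ``UserWarning``.

    # Minimal sketch of the progress_bar_type argument added by this patch.
    # Assumes google-cloud-bigquery >= 1.11.0 with the tqdm extra installed;
    # the query and public dataset below are placeholders for illustration.
    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query(
        "SELECT name, SUM(number) AS total "
        "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
        "GROUP BY name"
    )

    # "tqdm" prints a text progress bar to stderr while rows download via
    # tabledata.list; "tqdm_notebook" renders a Jupyter widget and "tqdm_gui"
    # a graphical dialog. Leaving the argument as None (the default) disables
    # the progress bar entirely.
    df = query_job.to_dataframe(progress_bar_type="tqdm")
    print(df.head())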