From 16f65e6ae15979217ceea6c6d398c9057a363a13 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Tue, 24 Aug 2021 10:29:04 -0400 Subject: [PATCH] feat: Support using GeoPandas for GEOGRAPHY columns (#848) --- docs/conf.py | 2 + docs/usage/pandas.rst | 15 ++ google/cloud/bigquery/_pandas_helpers.py | 71 ++++++- google/cloud/bigquery/job/query.py | 119 ++++++++++- google/cloud/bigquery/table.py | 196 +++++++++++++++++- owlbot.py | 4 + samples/geography/requirements.txt | 44 ++++ samples/geography/to_geodataframe.py | 32 +++ samples/geography/to_geodataframe_test.py | 25 +++ setup.py | 1 + testing/constraints-3.6.txt | 4 +- tests/system/test_client.py | 3 - tests/system/test_pandas.py | 143 +++++++++++++ tests/unit/job/test_query_pandas.py | 130 ++++++++++-- tests/unit/test__pandas_helpers.py | 100 +++++++++ tests/unit/test_table.py | 242 ++++++++++++++++++++++ 16 files changed, 1102 insertions(+), 29 deletions(-) create mode 100644 samples/geography/to_geodataframe.py create mode 100644 samples/geography/to_geodataframe_test.py diff --git a/docs/conf.py b/docs/conf.py index 09f7ea414..59a2d8fb3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -366,6 +366,8 @@ "grpc": ("https://grpc.github.io/grpc/python/", None), "proto-plus": ("https://proto-plus-python.readthedocs.io/en/latest/", None), "protobuf": ("https://googleapis.dev/python/protobuf/latest/", None), + "pandas": ("http://pandas.pydata.org/pandas-docs/dev", None), + "geopandas": ("https://geopandas.org/", None), } diff --git a/docs/usage/pandas.rst b/docs/usage/pandas.rst index 9db98dfbb..92eee67cf 100644 --- a/docs/usage/pandas.rst +++ b/docs/usage/pandas.rst @@ -37,6 +37,21 @@ To retrieve table rows as a :class:`pandas.DataFrame`: :start-after: [START bigquery_list_rows_dataframe] :end-before: [END bigquery_list_rows_dataframe] + +Retrieve BigQuery GEOGRAPHY data as a GeoPandas GeoDataFrame +------------------------------------------------------------ + +`GeoPandas `_ adds geospatial analytics +capabilities to Pandas. To retrieve query results containing +GEOGRAPHY data as a :class:`geopandas.GeoDataFrame`: + +.. literalinclude:: ../samples/geography/to_geodataframe.py + :language: python + :dedent: 4 + :start-after: [START bigquery_query_results_geodataframe] + :end-before: [END bigquery_query_results_geodataframe] + + Load a Pandas DataFrame to a BigQuery Table ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index f49980645..ab58b1729 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -24,6 +24,36 @@ import pandas except ImportError: # pragma: NO COVER pandas = None +else: + import numpy + +try: + # _BaseGeometry is used to detect shapely objevys in `bq_to_arrow_array` + from shapely.geometry.base import BaseGeometry as _BaseGeometry +except ImportError: # pragma: NO COVER + # No shapely, use NoneType for _BaseGeometry as a placeholder. + _BaseGeometry = type(None) +else: + if pandas is not None: # pragma: NO COVER + + def _to_wkb(): + # Create a closure that: + # - Adds a not-null check. This allows the returned function to + # be used directly with apply, unlike `shapely.wkb.dumps`. + # - Avoid extra work done by `shapely.wkb.dumps` that we don't need. + # - Caches the WKBWriter (and write method lookup :) ) + # - Avoids adding WKBWriter, lgeos, and notnull to the module namespace. + from shapely.geos import WKBWriter, lgeos + + write = WKBWriter(lgeos).write + notnull = pandas.notnull + + def _to_wkb(v): + return write(v) if notnull(v) else v + + return _to_wkb + + _to_wkb = _to_wkb() try: import pyarrow @@ -69,6 +99,7 @@ "uint8": "INTEGER", "uint16": "INTEGER", "uint32": "INTEGER", + "geometry": "GEOGRAPHY", } @@ -193,14 +224,16 @@ def bq_to_arrow_data_type(field): return data_type_constructor() -def bq_to_arrow_field(bq_field): +def bq_to_arrow_field(bq_field, array_type=None): """Return the Arrow field, corresponding to a given BigQuery column. Returns: None: if the Arrow type cannot be determined. """ arrow_type = bq_to_arrow_data_type(bq_field) - if arrow_type: + if arrow_type is not None: + if array_type is not None: + arrow_type = array_type # For GEOGRAPHY, at least initially is_nullable = bq_field.mode.upper() == "NULLABLE" return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable) @@ -225,7 +258,24 @@ def bq_to_arrow_schema(bq_schema): def bq_to_arrow_array(series, bq_field): - arrow_type = bq_to_arrow_data_type(bq_field) + if bq_field.field_type.upper() == "GEOGRAPHY": + arrow_type = None + first = _first_valid(series) + if first is not None: + if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry): + arrow_type = pyarrow.binary() + # Convert shapey geometry to WKB binary format: + series = series.apply(_to_wkb) + elif isinstance(first, bytes): + arrow_type = pyarrow.binary() + elif series.dtype.name == "geometry": + # We have a GeoSeries containing all nulls, convert it to a pandas series + series = pandas.Series(numpy.array(series)) + + if arrow_type is None: + arrow_type = bq_to_arrow_data_type(bq_field) + else: + arrow_type = bq_to_arrow_data_type(bq_field) field_type_upper = bq_field.field_type.upper() if bq_field.field_type else "" @@ -279,6 +329,12 @@ def list_columns_and_indexes(dataframe): return columns_and_indexes +def _first_valid(series): + first_valid_index = series.first_valid_index() + if first_valid_index is not None: + return series.at[first_valid_index] + + def dataframe_to_bq_schema(dataframe, bq_schema): """Convert a pandas DataFrame schema to a BigQuery schema. @@ -319,6 +375,13 @@ def dataframe_to_bq_schema(dataframe, bq_schema): # Otherwise, try to automatically determine the type based on the # pandas dtype. bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) + if bq_type is None: + sample_data = _first_valid(dataframe[column]) + if ( + isinstance(sample_data, _BaseGeometry) + and sample_data is not None # Paranoia + ): + bq_type = "GEOGRAPHY" bq_field = schema.SchemaField(column, bq_type) bq_schema_out.append(bq_field) @@ -450,11 +513,11 @@ def dataframe_to_arrow(dataframe, bq_schema): arrow_names = [] arrow_fields = [] for bq_field in bq_schema: - arrow_fields.append(bq_to_arrow_field(bq_field)) arrow_names.append(bq_field.name) arrow_arrays.append( bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field) ) + arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type)) if all((field is not None for field in arrow_fields)): return pyarrow.Table.from_arrays( diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 3ab47b0f9..0cb4798be 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -53,6 +53,7 @@ # Assumption: type checks are only used by library developers and CI environments # that have all optional dependencies installed, thus no conditional imports. import pandas + import geopandas import pyarrow from google.api_core import retry as retries from google.cloud import bigquery_storage @@ -1487,6 +1488,7 @@ def to_dataframe( create_bqstorage_client: bool = True, date_as_object: bool = True, max_results: Optional[int] = None, + geography_as_object: bool = False, ) -> "pandas.DataFrame": """Return a pandas DataFrame from a QueryJob @@ -1538,13 +1540,27 @@ def to_dataframe( .. versionadded:: 2.21.0 + geography_as_object (Optional[bool]): + If ``True``, convert GEOGRAPHY data to :mod:`shapely` + geometry objects. If ``False`` (default), don't cast + geography data to :mod:`shapely` geometry objects. + + .. versionadded:: 2.24.0 + Returns: - A :class:`~pandas.DataFrame` populated with row data and column - headers from the query results. The column headers are derived - from the destination table's schema. + pandas.DataFrame: + A :class:`~pandas.DataFrame` populated with row data + and column headers from the query results. The column + headers are derived from the destination table's + schema. Raises: - ValueError: If the `pandas` library cannot be imported. + ValueError: + If the :mod:`pandas` library cannot be imported, or + the :mod:`google.cloud.bigquery_storage_v1` module is + required but cannot be imported. Also if + `geography_as_object` is `True`, but the + :mod:`shapely` library cannot be imported. """ query_result = wait_for_query(self, progress_bar_type, max_results=max_results) return query_result.to_dataframe( @@ -1553,6 +1569,101 @@ def to_dataframe( progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, date_as_object=date_as_object, + geography_as_object=geography_as_object, + ) + + # If changing the signature of this method, make sure to apply the same + # changes to table.RowIterator.to_dataframe(), except for the max_results parameter + # that should only exist here in the QueryJob method. + def to_geodataframe( + self, + bqstorage_client: "bigquery_storage.BigQueryReadClient" = None, + dtypes: Dict[str, Any] = None, + progress_bar_type: str = None, + create_bqstorage_client: bool = True, + date_as_object: bool = True, + max_results: Optional[int] = None, + geography_column: Optional[str] = None, + ) -> "geopandas.GeoDataFrame": + """Return a GeoPandas GeoDataFrame from a QueryJob + + Args: + bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]): + A BigQuery Storage API client. If supplied, use the faster + BigQuery Storage API to fetch rows from BigQuery. This + API is a billable API. + + This method requires the ``fastavro`` and + ``google-cloud-bigquery-storage`` libraries. + + Reading from a specific partition or snapshot is not + currently supported by this method. + + dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]): + A dictionary of column names pandas ``dtype``s. The provided + ``dtype`` is used when constructing the series for the column + specified. Otherwise, the default pandas behavior is used. + + progress_bar_type (Optional[str]): + If set, use the `tqdm `_ library to + display a progress bar while the data downloads. Install the + ``tqdm`` package to use this feature. + + See + :func:`~google.cloud.bigquery.table.RowIterator.to_dataframe` + for details. + + .. versionadded:: 1.11.0 + create_bqstorage_client (Optional[bool]): + If ``True`` (default), create a BigQuery Storage API client + using the default API settings. The BigQuery Storage API + is a faster way to fetch rows from BigQuery. See the + ``bqstorage_client`` parameter for more information. + + This argument does nothing if ``bqstorage_client`` is supplied. + + .. versionadded:: 1.24.0 + + date_as_object (Optional[bool]): + If ``True`` (default), cast dates to objects. If ``False``, convert + to datetime64[ns] dtype. + + .. versionadded:: 1.26.0 + + max_results (Optional[int]): + Maximum number of rows to include in the result. No limit by default. + + .. versionadded:: 2.21.0 + + geography_column (Optional[str]): + If there are more than one GEOGRAPHY column, + identifies which one to use to construct a GeoPandas + GeoDataFrame. This option can be ommitted if there's + only one GEOGRAPHY column. + + Returns: + geopandas.GeoDataFrame: + A :class:`geopandas.GeoDataFrame` populated with row + data and column headers from the query results. The + column headers are derived from the destination + table's schema. + + Raises: + ValueError: + If the :mod:`geopandas` library cannot be imported, or the + :mod:`google.cloud.bigquery_storage_v1` module is + required but cannot be imported. + + .. versionadded:: 2.24.0 + """ + query_result = wait_for_query(self, progress_bar_type, max_results=max_results) + return query_result.to_geodataframe( + bqstorage_client=bqstorage_client, + dtypes=dtypes, + progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, + date_as_object=date_as_object, + geography_column=geography_column, ) def __iter__(self): diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 62f888001..609c0b57e 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -29,6 +29,20 @@ except ImportError: # pragma: NO COVER pandas = None +try: + import geopandas +except ImportError: + geopandas = None +else: + _COORDINATE_REFERENCE_SYSTEM = "EPSG:4326" + +try: + import shapely.geos +except ImportError: + shapely = None +else: + _read_wkt = shapely.geos.WKTReader(shapely.geos.lgeos).read + try: import pyarrow except ImportError: # pragma: NO COVER @@ -52,6 +66,7 @@ # Unconditionally import optional dependencies again to tell pytype that # they are not None, avoiding false "no attribute" errors. import pandas + import geopandas import pyarrow from google.cloud import bigquery_storage @@ -60,6 +75,14 @@ "The pandas library is not installed, please install " "pandas to use the to_dataframe() function." ) +_NO_GEOPANDAS_ERROR = ( + "The geopandas library is not installed, please install " + "geopandas to use the to_geodataframe() function." +) +_NO_SHAPELY_ERROR = ( + "The shapely library is not installed, please install " + "shapely to use the geography_as_object option." +) _NO_PYARROW_ERROR = ( "The pyarrow library is not installed, please install " "pyarrow to use the to_arrow() function." @@ -1878,6 +1901,7 @@ def to_dataframe( progress_bar_type: str = None, create_bqstorage_client: bool = True, date_as_object: bool = True, + geography_as_object: bool = False, ) -> "pandas.DataFrame": """Create a pandas DataFrame by loading all pages of a query. @@ -1933,6 +1957,13 @@ def to_dataframe( .. versionadded:: 1.26.0 + geography_as_object (Optional[bool]): + If ``True``, convert GEOGRAPHY data to :mod:`shapely` + geometry objects. If ``False`` (default), don't cast + geography data to :mod:`shapely` geometry objects. + + .. versionadded:: 2.24.0 + Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data and column @@ -1941,13 +1972,18 @@ def to_dataframe( Raises: ValueError: - If the :mod:`pandas` library cannot be imported, or the - :mod:`google.cloud.bigquery_storage_v1` module is - required but cannot be imported. + If the :mod:`pandas` library cannot be imported, or + the :mod:`google.cloud.bigquery_storage_v1` module is + required but cannot be imported. Also if + `geography_as_object` is `True`, but the + :mod:`shapely` library cannot be imported. """ if pandas is None: raise ValueError(_NO_PANDAS_ERROR) + if geography_as_object and shapely is None: + raise ValueError(_NO_SHAPELY_ERROR) + if dtypes is None: dtypes = {} @@ -1988,8 +2024,136 @@ def to_dataframe( for column in dtypes: df[column] = pandas.Series(df[column], dtype=dtypes[column]) + if geography_as_object: + for field in self.schema: + if field.field_type.upper() == "GEOGRAPHY": + df[field.name] = df[field.name].dropna().apply(_read_wkt) + return df + # If changing the signature of this method, make sure to apply the same + # changes to job.QueryJob.to_geodataframe() + def to_geodataframe( + self, + bqstorage_client: "bigquery_storage.BigQueryReadClient" = None, + dtypes: Dict[str, Any] = None, + progress_bar_type: str = None, + create_bqstorage_client: bool = True, + date_as_object: bool = True, + geography_column: Optional[str] = None, + ) -> "geopandas.GeoDataFrame": + """Create a GeoPandas GeoDataFrame by loading all pages of a query. + + Args: + bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]): + A BigQuery Storage API client. If supplied, use the faster + BigQuery Storage API to fetch rows from BigQuery. + + This method requires the ``pyarrow`` and + ``google-cloud-bigquery-storage`` libraries. + + This method only exposes a subset of the capabilities of the + BigQuery Storage API. For full access to all features + (projections, filters, snapshots) use the Storage API directly. + + dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]): + A dictionary of column names pandas ``dtype``s. The provided + ``dtype`` is used when constructing the series for the column + specified. Otherwise, the default pandas behavior is used. + progress_bar_type (Optional[str]): + If set, use the `tqdm `_ library to + display a progress bar while the data downloads. Install the + ``tqdm`` package to use this feature. + + Possible values of ``progress_bar_type`` include: + + ``None`` + No progress bar. + ``'tqdm'`` + Use the :func:`tqdm.tqdm` function to print a progress bar + to :data:`sys.stderr`. + ``'tqdm_notebook'`` + Use the :func:`tqdm.tqdm_notebook` function to display a + progress bar as a Jupyter notebook widget. + ``'tqdm_gui'`` + Use the :func:`tqdm.tqdm_gui` function to display a + progress bar as a graphical dialog box. + + create_bqstorage_client (Optional[bool]): + If ``True`` (default), create a BigQuery Storage API client + using the default API settings. The BigQuery Storage API + is a faster way to fetch rows from BigQuery. See the + ``bqstorage_client`` parameter for more information. + + This argument does nothing if ``bqstorage_client`` is supplied. + + date_as_object (Optional[bool]): + If ``True`` (default), cast dates to objects. If ``False``, convert + to datetime64[ns] dtype. + + geography_column (Optional[str]): + If there are more than one GEOGRAPHY column, + identifies which one to use to construct a geopandas + GeoDataFrame. This option can be ommitted if there's + only one GEOGRAPHY column. + + Returns: + geopandas.GeoDataFrame: + A :class:`geopandas.GeoDataFrame` populated with row + data and column headers from the query results. The + column headers are derived from the destination + table's schema. + + Raises: + ValueError: + If the :mod:`geopandas` library cannot be imported, or the + :mod:`google.cloud.bigquery_storage_v1` module is + required but cannot be imported. + + .. versionadded:: 2.24.0 + """ + if geopandas is None: + raise ValueError(_NO_GEOPANDAS_ERROR) + + geography_columns = set( + field.name + for field in self.schema + if field.field_type.upper() == "GEOGRAPHY" + ) + if not geography_columns: + raise TypeError( + "There must be at least one GEOGRAPHY column" + " to create a GeoDataFrame" + ) + + if geography_column: + if geography_column not in geography_columns: + raise ValueError( + f"The given geography column, {geography_column}, doesn't name" + f" a GEOGRAPHY column in the result." + ) + elif len(geography_columns) == 1: + [geography_column] = geography_columns + else: + raise ValueError( + "There is more than one GEOGRAPHY column in the result. " + "The geography_column argument must be used to specify which " + "one to use to create a GeoDataFrame" + ) + + df = self.to_dataframe( + bqstorage_client, + dtypes, + progress_bar_type, + create_bqstorage_client, + date_as_object, + geography_as_object=True, + ) + + return geopandas.GeoDataFrame( + df, crs=_COORDINATE_REFERENCE_SYSTEM, geometry=geography_column + ) + class _EmptyRowIterator(RowIterator): """An empty row iterator. @@ -2042,6 +2206,7 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, + geography_as_object=False, ) -> "pandas.DataFrame": """Create an empty dataframe. @@ -2059,6 +2224,31 @@ def to_dataframe( raise ValueError(_NO_PANDAS_ERROR) return pandas.DataFrame() + def to_geodataframe( + self, + bqstorage_client=None, + dtypes=None, + progress_bar_type=None, + create_bqstorage_client=True, + date_as_object=True, + geography_column: Optional[str] = None, + ) -> "pandas.DataFrame": + """Create an empty dataframe. + + Args: + bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. + dtypes (Any): Ignored. Added for compatibility with RowIterator. + progress_bar_type (Any): Ignored. Added for compatibility with RowIterator. + create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. + date_as_object (bool): Ignored. Added for compatibility with RowIterator. + + Returns: + pandas.DataFrame: An empty :class:`~pandas.DataFrame`. + """ + if geopandas is None: + raise ValueError(_NO_GEOPANDAS_ERROR) + return geopandas.GeoDataFrame(crs=_COORDINATE_REFERENCE_SYSTEM) + def to_dataframe_iterable( self, bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, diff --git a/owlbot.py b/owlbot.py index 09845480a..ea9904cdb 100644 --- a/owlbot.py +++ b/owlbot.py @@ -97,6 +97,10 @@ samples=True, microgenerator=True, split_system_tests=True, + intersphinx_dependencies={ + "pandas": 'http://pandas.pydata.org/pandas-docs/dev', + "geopandas": "https://geopandas.org/", + } ) # BigQuery has a custom multiprocessing note diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index ac804c81c..7a76b4033 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -1,4 +1,48 @@ +attrs==21.2.0 +cachetools==4.2.2 +certifi==2021.5.30 +cffi==1.14.6 +charset-normalizer==2.0.4 +click==8.0.1 +click-plugins==1.1.1 +cligj==0.7.2 +dataclasses==0.6; python_version < '3.7' +Fiona==1.8.20 geojson==2.5.0 +geopandas==0.9.0 +google-api-core==1.31.2 +google-auth==1.35.0 google-cloud-bigquery==2.24.1 google-cloud-bigquery-storage==2.6.3 +google-cloud-core==1.7.2 +google-crc32c==1.1.2 +google-resumable-media==1.3.3 +googleapis-common-protos==1.53.0 +grpcio==1.39.0 +idna==3.2 +importlib-metadata==4.6.4 +libcst==0.3.20 +munch==2.5.0 +mypy-extensions==0.4.3 +numpy==1.19.5 +packaging==21.0 +pandas==1.1.5 +proto-plus==1.19.0 +protobuf==3.17.3 +pyarrow==5.0.0 +pyasn1==0.4.8 +pyasn1-modules==0.2.8 +pycparser==2.20 +pyparsing==2.4.7 +pyproj==3.0.1 +python-dateutil==2.8.2 +pytz==2021.1 +PyYAML==5.4.1 +requests==2.26.0 +rsa==4.7.2 Shapely==1.7.1 +six==1.16.0 +typing-extensions==3.10.0.0 +typing-inspect==0.7.1 +urllib3==1.26.6 +zipp==3.5.0 diff --git a/samples/geography/to_geodataframe.py b/samples/geography/to_geodataframe.py new file mode 100644 index 000000000..fa8073fef --- /dev/null +++ b/samples/geography/to_geodataframe.py @@ -0,0 +1,32 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud import bigquery + +client = bigquery.Client() + + +def get_austin_service_requests_as_geography(): + # [START bigquery_query_results_geodataframe] + + sql = """ + SELECT created_date, complaint_description, + ST_GEOGPOINT(longitude, latitude) as location + FROM bigquery-public-data.austin_311.311_service_requests + LIMIT 10 + """ + + df = client.query(sql).to_geodataframe() + # [END bigquery_query_results_geodataframe] + return df diff --git a/samples/geography/to_geodataframe_test.py b/samples/geography/to_geodataframe_test.py new file mode 100644 index 000000000..7a2ba6937 --- /dev/null +++ b/samples/geography/to_geodataframe_test.py @@ -0,0 +1,25 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from .to_geodataframe import get_austin_service_requests_as_geography + + +def test_get_austin_service_requests_as_geography(): + geopandas = pytest.importorskip("geopandas") + df = get_austin_service_requests_as_geography() + assert isinstance(df, geopandas.GeoDataFrame) + assert len(list(df)) == 3 # verify the number of columns + assert len(df) == 10 # verify the number of rows diff --git a/setup.py b/setup.py index a1b3b61a0..e7515493d 100644 --- a/setup.py +++ b/setup.py @@ -56,6 +56,7 @@ "grpcio >= 1.38.1, < 2.0dev", "pyarrow >= 3.0.0, < 6.0dev", ], + "geopandas": ["geopandas>=0.9.0, <1.0dev", "Shapely>=1.6.0, <2.0dev"], "pandas": ["pandas>=0.23.0", "pyarrow >= 3.0.0, < 6.0dev"], "bignumeric_type": ["pyarrow >= 3.0.0, < 6.0dev"], "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index ce012f0d7..be1a992fa 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -5,6 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 +geopandas==0.9.0 google-api-core==1.29.0 google-cloud-bigquery-storage==2.0.0 google-cloud-core==1.4.1 @@ -13,10 +14,11 @@ grpcio==1.38.1 opentelemetry-api==0.11b0 opentelemetry-instrumentation==0.11b0 opentelemetry-sdk==0.11b0 -pandas==0.23.0 +pandas==0.24.2 proto-plus==1.10.0 protobuf==3.12.0 pyarrow==3.0.0 requests==2.18.0 +shapely==1.6.0 six==1.13.0 tqdm==4.7.4 diff --git a/tests/system/test_client.py b/tests/system/test_client.py index 4250111b4..9da45ee6e 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -2360,9 +2360,6 @@ def test_create_table_rows_fetch_nested_schema(self): self.assertEqual(found[7], e_favtime) self.assertEqual(found[8], decimal.Decimal(expected["FavoriteNumber"])) - def _fetch_dataframe(self, query): - return Config.CLIENT.query(query).result().to_dataframe() - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( bigquery_storage is None, "Requires `google-cloud-bigquery-storage`" diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py index 371dcea71..836f93210 100644 --- a/tests/system/test_pandas.py +++ b/tests/system/test_pandas.py @@ -798,3 +798,146 @@ def test_list_rows_max_results_w_bqstorage(bigquery_client): dataframe = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) assert len(dataframe.index) == 100 + + +def test_to_dataframe_geography_as_objects(bigquery_client, dataset_id): + wkt = pytest.importorskip("shapely.wkt") + bigquery_client.query( + f"create table {dataset_id}.lake (name string, geog geography)" + ).result() + bigquery_client.query( + f""" + insert into {dataset_id}.lake (name, geog) values + ('foo', st_geogfromtext('point(0 0)')), + ('bar', st_geogfromtext('point(0 1)')), + ('baz', null) + """ + ).result() + df = bigquery_client.query( + f"select * from {dataset_id}.lake order by name" + ).to_dataframe(geography_as_object=True) + assert list(df["name"]) == ["bar", "baz", "foo"] + assert df["geog"][0] == wkt.loads("point(0 1)") + assert pandas.isna(df["geog"][1]) + assert df["geog"][2] == wkt.loads("point(0 0)") + + +def test_to_geodataframe(bigquery_client, dataset_id): + geopandas = pytest.importorskip("geopandas") + from shapely import wkt + + bigquery_client.query( + f"create table {dataset_id}.geolake (name string, geog geography)" + ).result() + bigquery_client.query( + f""" + insert into {dataset_id}.geolake (name, geog) values + ('foo', st_geogfromtext('point(0 0)')), + ('bar', st_geogfromtext('polygon((0 0, 1 0, 1 1, 0 0))')), + ('baz', null) + """ + ).result() + df = bigquery_client.query( + f"select * from {dataset_id}.geolake order by name" + ).to_geodataframe() + assert df["geog"][0] == wkt.loads("polygon((0 0, 1 0, 1 1, 0 0))") + assert pandas.isna(df["geog"][1]) + assert df["geog"][2] == wkt.loads("point(0 0)") + assert isinstance(df, geopandas.GeoDataFrame) + assert isinstance(df["geog"], geopandas.GeoSeries) + assert df.area[0] == 0.5 + assert pandas.isna(df.area[1]) + assert df.area[2] == 0.0 + assert df.crs.srs == "EPSG:4326" + assert df.crs.name == "WGS 84" + assert df.geog.crs.srs == "EPSG:4326" + assert df.geog.crs.name == "WGS 84" + + +def test_load_geodataframe(bigquery_client, dataset_id): + geopandas = pytest.importorskip("geopandas") + import pandas + from shapely import wkt + from google.cloud.bigquery.schema import SchemaField + + df = geopandas.GeoDataFrame( + pandas.DataFrame( + dict( + name=["foo", "bar"], + geo1=[None, None], + geo2=[None, wkt.loads("Point(1 1)")], + ) + ), + geometry="geo1", + ) + + table_id = f"{dataset_id}.lake_from_gp" + bigquery_client.load_table_from_dataframe(df, table_id).result() + + table = bigquery_client.get_table(table_id) + assert table.schema == [ + SchemaField("name", "STRING", "NULLABLE"), + SchemaField("geo1", "GEOGRAPHY", "NULLABLE"), + SchemaField("geo2", "GEOGRAPHY", "NULLABLE"), + ] + assert sorted(map(list, bigquery_client.list_rows(table_id))) == [ + ["bar", None, "POINT(1 1)"], + ["foo", None, None], + ] + + +def test_load_dataframe_w_shapely(bigquery_client, dataset_id): + wkt = pytest.importorskip("shapely.wkt") + from google.cloud.bigquery.schema import SchemaField + + df = pandas.DataFrame( + dict(name=["foo", "bar"], geo=[None, wkt.loads("Point(1 1)")]) + ) + + table_id = f"{dataset_id}.lake_from_shapes" + bigquery_client.load_table_from_dataframe(df, table_id).result() + + table = bigquery_client.get_table(table_id) + assert table.schema == [ + SchemaField("name", "STRING", "NULLABLE"), + SchemaField("geo", "GEOGRAPHY", "NULLABLE"), + ] + assert sorted(map(list, bigquery_client.list_rows(table_id))) == [ + ["bar", "POINT(1 1)"], + ["foo", None], + ] + + bigquery_client.load_table_from_dataframe(df, table_id).result() + assert sorted(map(list, bigquery_client.list_rows(table_id))) == [ + ["bar", "POINT(1 1)"], + ["bar", "POINT(1 1)"], + ["foo", None], + ["foo", None], + ] + + +def test_load_dataframe_w_wkb(bigquery_client, dataset_id): + wkt = pytest.importorskip("shapely.wkt") + from shapely import wkb + from google.cloud.bigquery.schema import SchemaField + + df = pandas.DataFrame( + dict(name=["foo", "bar"], geo=[None, wkb.dumps(wkt.loads("Point(1 1)"))]) + ) + + table_id = f"{dataset_id}.lake_from_wkb" + # We create the table first, to inform the interpretation of the wkb data + bigquery_client.query( + f"create table {table_id} (name string, geo GEOGRAPHY)" + ).result() + bigquery_client.load_table_from_dataframe(df, table_id).result() + + table = bigquery_client.get_table(table_id) + assert table.schema == [ + SchemaField("name", "STRING", "NULLABLE"), + SchemaField("geo", "GEOGRAPHY", "NULLABLE"), + ] + assert sorted(map(list, bigquery_client.list_rows(table_id))) == [ + ["bar", "POINT(1 1)"], + ["foo", None], + ] diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index c537802f4..b5af90c0b 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -23,6 +23,14 @@ import pandas except (ImportError, AttributeError): # pragma: NO COVER pandas = None +try: + import shapely +except (ImportError, AttributeError): # pragma: NO COVER + shapely = None +try: + import geopandas +except (ImportError, AttributeError): # pragma: NO COVER + geopandas = None try: import pyarrow except (ImportError, AttributeError): # pragma: NO COVER @@ -425,38 +433,41 @@ def test_to_arrow_w_tqdm_wo_query_plan(): result_patch_tqdm.assert_called() -@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -def test_to_dataframe(): +def _make_job(schema=(), rows=()): from google.cloud.bigquery.job import QueryJob as target_class begun_resource = _make_job_resource(job_type="query") query_resource = { "jobComplete": True, "jobReference": begun_resource["jobReference"], - "totalRows": "4", + "totalRows": str(len(rows)), "schema": { "fields": [ - {"name": "name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, + dict(name=field[0], type=field[1], mode=field[2]) for field in schema ] }, } - tabledata_resource = { - "rows": [ - {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ] - } + tabledata_resource = {"rows": [{"f": [{"v": v} for v in row]} for row in rows]} done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} connection = _make_connection( begun_resource, query_resource, done_resource, tabledata_resource ) client = _make_client(connection=connection) - job = target_class.from_api_repr(begun_resource, client) + return target_class.from_api_repr(begun_resource, client) + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_to_dataframe(): + job = _make_job( + (("name", "STRING", "NULLABLE"), ("age", "INTEGER", "NULLABLE")), + ( + ("Phred Phlyntstone", "32"), + ("Bharney Rhubble", "33"), + ("Wylma Phlyntstone", "29"), + ("Bhettye Rhubble", "27"), + ), + ) df = job.to_dataframe(create_bqstorage_client=False) assert isinstance(df, pandas.DataFrame) @@ -868,3 +879,94 @@ def test_to_dataframe_w_tqdm_max_results(): result_patch_tqdm.assert_called_with( timeout=_PROGRESS_BAR_UPDATE_INTERVAL, max_results=3 ) + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(shapely is None, reason="Requires `shapely`") +def test_to_dataframe_geography_as_object(): + job = _make_job( + (("name", "STRING", "NULLABLE"), ("geog", "GEOGRAPHY", "NULLABLE")), + ( + ("Phred Phlyntstone", "Point(0 0)"), + ("Bharney Rhubble", "Point(0 1)"), + ("Wylma Phlyntstone", None), + ), + ) + df = job.to_dataframe(create_bqstorage_client=False, geography_as_object=True) + + assert isinstance(df, pandas.DataFrame) + assert len(df) == 3 # verify the number of rows + assert list(df) == ["name", "geog"] # verify the column names + assert [v.__class__.__name__ for v in df.geog] == [ + "Point", + "Point", + "float", + ] # float because nan + + +@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") +def test_to_geodataframe(): + job = _make_job( + (("name", "STRING", "NULLABLE"), ("geog", "GEOGRAPHY", "NULLABLE")), + ( + ("Phred Phlyntstone", "Point(0 0)"), + ("Bharney Rhubble", "Point(0 1)"), + ("Wylma Phlyntstone", None), + ), + ) + df = job.to_geodataframe(create_bqstorage_client=False) + + assert isinstance(df, geopandas.GeoDataFrame) + assert len(df) == 3 # verify the number of rows + assert list(df) == ["name", "geog"] # verify the column names + assert [v.__class__.__name__ for v in df.geog] == [ + "Point", + "Point", + "NoneType", + ] # float because nan + assert isinstance(df.geog, geopandas.GeoSeries) + + +@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") +@mock.patch("google.cloud.bigquery.job.query.wait_for_query") +def test_query_job_to_geodataframe_delegation(wait_for_query): + """ + QueryJob.to_geodataframe just delegates to RowIterator.to_geodataframe. + + This test just demonstrates that. We don't need to test all the + variations, which are tested for RowIterator. + """ + import numpy + + job = _make_job() + bqstorage_client = object() + dtypes = dict(xxx=numpy.dtype("int64")) + progress_bar_type = "normal" + create_bqstorage_client = False + date_as_object = False + max_results = 42 + geography_column = "g" + + df = job.to_geodataframe( + bqstorage_client=bqstorage_client, + dtypes=dtypes, + progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, + date_as_object=date_as_object, + max_results=max_results, + geography_column=geography_column, + ) + + wait_for_query.assert_called_once_with( + job, progress_bar_type, max_results=max_results + ) + row_iterator = wait_for_query.return_value + row_iterator.to_geodataframe.assert_called_once_with( + bqstorage_client=bqstorage_client, + dtypes=dtypes, + progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, + date_as_object=date_as_object, + geography_column=geography_column, + ) + assert df is row_iterator.to_geodataframe.return_value diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index b9cb56572..a9b0ae21f 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -36,6 +36,11 @@ # Mock out pyarrow when missing, because methods from pyarrow.types are # used in test parameterization. pyarrow = mock.Mock() +try: + import geopandas +except ImportError: # pragma: NO COVER + geopandas = None + import pytest from google import api_core @@ -584,6 +589,60 @@ def test_bq_to_arrow_array_w_special_floats(module_under_test): assert roundtrip[3] is None +@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_geography_dtype(module_under_test): + from shapely import wkb, wkt + + bq_field = schema.SchemaField("field_name", "GEOGRAPHY") + + series = geopandas.GeoSeries([None, wkt.loads("point(0 0)")]) + array = module_under_test.bq_to_arrow_array(series, bq_field) + # The result is binary, because we use wkb format + assert array.type == pyarrow.binary() + assert array.to_pylist() == [None, wkb.dumps(series[1])] + + # All na: + series = geopandas.GeoSeries([None, None]) + array = module_under_test.bq_to_arrow_array(series, bq_field) + assert array.type == pyarrow.string() + assert array.to_pylist() == list(series) + + +@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_geography_type_shapely_data(module_under_test): + from shapely import wkb, wkt + + bq_field = schema.SchemaField("field_name", "GEOGRAPHY") + + series = pandas.Series([None, wkt.loads("point(0 0)")]) + array = module_under_test.bq_to_arrow_array(series, bq_field) + # The result is binary, because we use wkb format + assert array.type == pyarrow.binary() + assert array.to_pylist() == [None, wkb.dumps(series[1])] + + # All na: + series = pandas.Series([None, None]) + array = module_under_test.bq_to_arrow_array(series, bq_field) + assert array.type == pyarrow.string() + assert array.to_pylist() == list(series) + + +@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_geography_type_wkb_data(module_under_test): + from shapely import wkb, wkt + + bq_field = schema.SchemaField("field_name", "GEOGRAPHY") + + series = pandas.Series([None, wkb.dumps(wkt.loads("point(0 0)"))]) + array = module_under_test.bq_to_arrow_array(series, bq_field) + # The result is binary, because we use wkb format + assert array.type == pyarrow.binary() + assert array.to_pylist() == list(series) + + @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") def test_bq_to_arrow_schema_w_unknown_type(module_under_test): fields = ( @@ -1158,6 +1217,28 @@ def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test): assert "struct_field" in str(expected_warnings[0]) +@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") +def test_dataframe_to_bq_schema_geography(module_under_test): + from shapely import wkt + + df = geopandas.GeoDataFrame( + pandas.DataFrame( + dict( + name=["foo", "bar"], + geo1=[None, None], + geo2=[None, wkt.loads("Point(1 1)")], + ) + ), + geometry="geo1", + ) + bq_schema = module_under_test.dataframe_to_bq_schema(df, []) + assert bq_schema == ( + schema.SchemaField("name", "STRING"), + schema.SchemaField("geo1", "GEOGRAPHY"), + schema.SchemaField("geo2", "GEOGRAPHY"), + ) + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") def test_augment_schema_type_detection_succeeds(module_under_test): @@ -1554,3 +1635,22 @@ def test_download_dataframe_row_iterator_dict_sequence_schema(module_under_test) def test_table_data_listpage_to_dataframe_skips_stop_iteration(module_under_test): dataframe = module_under_test._row_iterator_page_to_dataframe([], [], {}) assert isinstance(dataframe, pandas.DataFrame) + + +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +def test_bq_to_arrow_field_type_override(module_under_test): + # When loading pandas data, we may need to override the type + # decision based on data contents, because GEOGRAPHY data can be + # stored as either text or binary. + + assert ( + module_under_test.bq_to_arrow_field(schema.SchemaField("g", "GEOGRAPHY")).type + == pyarrow.string() + ) + + assert ( + module_under_test.bq_to_arrow_field( + schema.SchemaField("g", "GEOGRAPHY"), pyarrow.binary(), + ).type + == pyarrow.binary() + ) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 50d573345..1ce930ee4 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -14,6 +14,7 @@ import datetime import logging +import re import time import types import unittest @@ -39,6 +40,11 @@ except (ImportError, AttributeError): # pragma: NO COVER pandas = None +try: + import geopandas +except (ImportError, AttributeError): # pragma: NO COVER + geopandas = None + try: import pyarrow import pyarrow.types @@ -1842,6 +1848,27 @@ def test_to_dataframe_iterable(self): self.assertEqual(len(df), 0) # Verify the number of rows. self.assertEqual(len(df.columns), 0) + @mock.patch("google.cloud.bigquery.table.geopandas", new=None) + def test_to_geodataframe_if_geopandas_is_none(self): + row_iterator = self._make_one() + with self.assertRaisesRegex( + ValueError, + re.escape( + "The geopandas library is not installed, please install " + "geopandas to use the to_geodataframe() function." + ), + ): + row_iterator.to_geodataframe(create_bqstorage_client=False) + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_geodataframe(self): + row_iterator = self._make_one() + df = row_iterator.to_geodataframe(create_bqstorage_client=False) + self.assertIsInstance(df, geopandas.GeoDataFrame) + self.assertEqual(len(df), 0) # verify the number of rows + self.assertEqual(df.crs.srs, "EPSG:4326") + self.assertEqual(df.crs.name, "WGS 84") + class TestRowIterator(unittest.TestCase): def _class_under_test(self): @@ -1879,6 +1906,16 @@ def _make_one( client, api_request, path, schema, table=table, **kwargs ) + def _make_one_from_data(self, schema=(), rows=()): + from google.cloud.bigquery.schema import SchemaField + + schema = [SchemaField(*a) for a in schema] + rows = [{"f": [{"v": v} for v in row]} for row in rows] + + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + return self._make_one(_mock_client(), api_request, path, schema) + def test_constructor(self): from google.cloud.bigquery.table import _item_to_row from google.cloud.bigquery.table import _rows_page_start @@ -3170,6 +3207,18 @@ def test_to_dataframe_error_if_pandas_is_none(self): with self.assertRaises(ValueError): row_iterator.to_dataframe() + @unittest.skipIf(pandas is None, "Requires `pandas`") + @mock.patch("google.cloud.bigquery.table.shapely", new=None) + def test_to_dataframe_error_if_shapely_is_none(self): + with self.assertRaisesRegex( + ValueError, + re.escape( + "The shapely library is not installed, please install " + "shapely to use the geography_as_object option." + ), + ): + self._make_one_from_data().to_dataframe(geography_as_object=True) + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_max_results_w_bqstorage_warning(self): from google.cloud.bigquery.schema import SchemaField @@ -3927,6 +3976,199 @@ def test_to_dataframe_concat_categorical_dtype_w_pyarrow(self): # Don't close the client if it was passed in. bqstorage_client._transport.grpc_channel.close.assert_not_called() + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_dataframe_geography_as_object(self): + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("geog", "GEOGRAPHY")), + ( + ("foo", "Point(0 0)"), + ("bar", None), + ("baz", "Polygon((0 0, 0 1, 1 0, 0 0))"), + ), + ) + df = row_iterator.to_dataframe( + create_bqstorage_client=False, geography_as_object=True, + ) + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 3) # verify the number of rows + self.assertEqual(list(df), ["name", "geog"]) # verify the column names + self.assertEqual(df.name.dtype.name, "object") + self.assertEqual(df.geog.dtype.name, "object") + self.assertIsInstance(df.geog, pandas.Series) + self.assertEqual( + [v.__class__.__name__ for v in df.geog], ["Point", "float", "Polygon"] + ) + + @mock.patch("google.cloud.bigquery.table.geopandas", new=None) + def test_to_geodataframe_error_if_geopandas_is_none(self): + with self.assertRaisesRegex( + ValueError, + re.escape( + "The geopandas library is not installed, please install " + "geopandas to use the to_geodataframe() function." + ), + ): + self._make_one_from_data().to_geodataframe() + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_geodataframe(self): + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("geog", "GEOGRAPHY")), + ( + ("foo", "Point(0 0)"), + ("bar", None), + ("baz", "Polygon((0 0, 0 1, 1 0, 0 0))"), + ), + ) + df = row_iterator.to_geodataframe(create_bqstorage_client=False) + self.assertIsInstance(df, geopandas.GeoDataFrame) + self.assertEqual(len(df), 3) # verify the number of rows + self.assertEqual(list(df), ["name", "geog"]) # verify the column names + self.assertEqual(df.name.dtype.name, "object") + self.assertEqual(df.geog.dtype.name, "geometry") + self.assertIsInstance(df.geog, geopandas.GeoSeries) + self.assertEqual(list(map(str, df.area)), ["0.0", "nan", "0.5"]) + self.assertEqual(list(map(str, df.geog.area)), ["0.0", "nan", "0.5"]) + self.assertEqual(df.crs.srs, "EPSG:4326") + self.assertEqual(df.crs.name, "WGS 84") + self.assertEqual(df.geog.crs.srs, "EPSG:4326") + self.assertEqual(df.geog.crs.name, "WGS 84") + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_geodataframe_ambiguous_geog(self): + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("geog", "GEOGRAPHY"), ("geog2", "GEOGRAPHY")), () + ) + with self.assertRaisesRegex( + ValueError, + re.escape( + "There is more than one GEOGRAPHY column in the result. " + "The geography_column argument must be used to specify which " + "one to use to create a GeoDataFrame" + ), + ): + row_iterator.to_geodataframe(create_bqstorage_client=False) + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_geodataframe_bad_geography_column(self): + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("geog", "GEOGRAPHY"), ("geog2", "GEOGRAPHY")), () + ) + with self.assertRaisesRegex( + ValueError, + re.escape( + "The given geography column, xxx, doesn't name" + " a GEOGRAPHY column in the result." + ), + ): + row_iterator.to_geodataframe( + create_bqstorage_client=False, geography_column="xxx" + ) + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_geodataframe_no_geog(self): + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("geog", "STRING")), () + ) + with self.assertRaisesRegex( + TypeError, + re.escape( + "There must be at least one GEOGRAPHY column" + " to create a GeoDataFrame" + ), + ): + row_iterator.to_geodataframe(create_bqstorage_client=False) + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + def test_to_geodataframe_w_geography_column(self): + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("geog", "GEOGRAPHY"), ("geog2", "GEOGRAPHY")), + ( + ("foo", "Point(0 0)", "Point(1 1)"), + ("bar", None, "Point(2 2)"), + ("baz", "Polygon((0 0, 0 1, 1 0, 0 0))", "Point(3 3)"), + ), + ) + df = row_iterator.to_geodataframe( + create_bqstorage_client=False, geography_column="geog" + ) + self.assertIsInstance(df, geopandas.GeoDataFrame) + self.assertEqual(len(df), 3) # verify the number of rows + self.assertEqual(list(df), ["name", "geog", "geog2"]) # verify the column names + self.assertEqual(df.name.dtype.name, "object") + self.assertEqual(df.geog.dtype.name, "geometry") + self.assertEqual(df.geog2.dtype.name, "object") + self.assertIsInstance(df.geog, geopandas.GeoSeries) + self.assertEqual(list(map(str, df.area)), ["0.0", "nan", "0.5"]) + self.assertEqual(list(map(str, df.geog.area)), ["0.0", "nan", "0.5"]) + self.assertEqual( + [v.__class__.__name__ for v in df.geog], ["Point", "NoneType", "Polygon"] + ) + + # Geog2 isn't a GeoSeries, but it contains geomentries: + self.assertIsInstance(df.geog2, pandas.Series) + self.assertEqual( + [v.__class__.__name__ for v in df.geog2], ["Point", "Point", "Point"] + ) + # and can easily be converted to a GeoSeries + self.assertEqual( + list(map(str, geopandas.GeoSeries(df.geog2).area)), ["0.0", "0.0", "0.0"] + ) + + @unittest.skipIf(geopandas is None, "Requires `geopandas`") + @mock.patch("google.cloud.bigquery.table.RowIterator.to_dataframe") + def test_rowiterator_to_geodataframe_delegation(self, to_dataframe): + """ + RowIterator.to_geodataframe just delegates to RowIterator.to_dataframe. + + This test just demonstrates that. We don't need to test all the + variations, which are tested for to_dataframe. + """ + import numpy + from shapely import wkt + + row_iterator = self._make_one_from_data( + (("name", "STRING"), ("g", "GEOGRAPHY")) + ) + bqstorage_client = object() + dtypes = dict(xxx=numpy.dtype("int64")) + progress_bar_type = "normal" + create_bqstorage_client = False + date_as_object = False + geography_column = "g" + + to_dataframe.return_value = pandas.DataFrame( + dict(name=["foo"], g=[wkt.loads("point(0 0)")],) + ) + + df = row_iterator.to_geodataframe( + bqstorage_client=bqstorage_client, + dtypes=dtypes, + progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, + date_as_object=date_as_object, + geography_column=geography_column, + ) + + to_dataframe.assert_called_once_with( + bqstorage_client, + dtypes, + progress_bar_type, + create_bqstorage_client, + date_as_object, + geography_as_object=True, + ) + + self.assertIsInstance(df, geopandas.GeoDataFrame) + self.assertEqual(len(df), 1) # verify the number of rows + self.assertEqual(list(df), ["name", "g"]) # verify the column names + self.assertEqual(df.name.dtype.name, "object") + self.assertEqual(df.g.dtype.name, "geometry") + self.assertIsInstance(df.g, geopandas.GeoSeries) + self.assertEqual(list(map(str, df.area)), ["0.0"]) + self.assertEqual(list(map(str, df.g.area)), ["0.0"]) + self.assertEqual([v.__class__.__name__ for v in df.g], ["Point"]) + class TestPartitionRange(unittest.TestCase): def _get_target_class(self):