feat: new bytes, json, decimal type mappings
TrevorBergeron committed Dec 19, 2023
1 parent dab2f2c commit 620533e
Showing 5 changed files with 58 additions and 31 deletions.
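
The practical effect of this commit: BigQuery BYTES, NUMERIC, and BIGNUMERIC columns now surface in BigFrames as pyarrow-backed pandas dtypes instead of NumPy object columns, and JSON values are canonicalized to pyarrow-backed strings. A minimal sketch of what the new canonical dtypes look like from the pandas side (illustrative only, not BigFrames code; column names are made up, and it assumes pandas 2.x with pyarrow installed):

import decimal

import pandas as pd
import pyarrow as pa

# The three new canonical mappings introduced by this commit.
bytes_dtype = pd.ArrowDtype(pa.binary())                 # BigQuery BYTES
numeric_dtype = pd.ArrowDtype(pa.decimal128(38, 9))      # BigQuery NUMERIC
bignumeric_dtype = pd.ArrowDtype(pa.decimal256(76, 38))  # BigQuery BIGNUMERIC

df = pd.DataFrame(
    {
        "bytes_col": pd.Series([b"\x01\x02", None], dtype=bytes_dtype),
        "numeric_col": pd.Series(
            [decimal.Decimal("1.5"), None], dtype=numeric_dtype
        ),
    }
)
print(df.dtypes)  # binary[pyarrow], decimal128(38, 9)[pyarrow]
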
14 changes: 12 additions & 2 deletions bigframes/core/compile/compiled.py
@@ -220,7 +220,10 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value:
raise ValueError(
"Column name {} not in set of values: {}".format(key, self.column_ids)
)
return typing.cast(ibis_types.Value, self._column_names[key])
return typing.cast(
ibis_types.Value,
bigframes.dtypes.ibis_value_to_canonical_type(self._column_names[key]),
)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
ibis_type = typing.cast(
@@ -1177,7 +1180,14 @@ def _to_ibis_expr(
# Make sure all dtypes are the "canonical" ones for BigFrames. This is
# important for operations like UNION where the schema must match.
table = self._table.select(
bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns
bigframes.dtypes.ibis_value_to_canonical_type(
column.resolve(self._table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred)
else column
)
for column in columns
)
base_table = table
if self._reduced_predicate is not None:
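For context on the new column.resolve(self._table) branch above: the columns iterable passed to _to_ibis_expr may now contain ibis Deferred expressions, which are unbound until resolved against a concrete table. A rough sketch of that resolution step in plain ibis (assumes ibis 7.x; the table and column names here are hypothetical):

import ibis
from ibis import _  # deferred expression builder

t = ibis.memtable({"x": [1, 2, 3]})
deferred = _.x                 # a Deferred: not yet attached to any table
column = deferred.resolve(t)   # bind the deferred expression to `t`
print(column.type())           # int64
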
41 changes: 23 additions & 18 deletions bigframes/dtypes.py
@@ -29,6 +29,7 @@

import bigframes.constants as constants
import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers
import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops

# Type hints for Pandas dtypes supported by BigQuery DataFrame
Dtype = Union[
@@ -96,6 +97,15 @@
ibis_dtypes.Timestamp(timezone="UTC"),
pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
),
(ibis_dtypes.binary, pd.ArrowDtype(pa.binary())),
(
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True),
pd.ArrowDtype(pa.decimal128(38, 9)),
),
(
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True),
pd.ArrowDtype(pa.decimal256(76, 38)),
),
)

BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
@@ -111,6 +121,13 @@
ibis_dtypes.time: pa.time64("us"),
ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"),
ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"),
ibis_dtypes.binary: pd.ArrowDtype(pa.binary()),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): pd.ArrowDtype(
pa.decimal128(38, 9)
),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): pd.ArrowDtype(
pa.decimal256(76, 38)
),
}

ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}
@@ -124,10 +141,6 @@
)
IBIS_TO_BIGFRAMES.update(
{
ibis_dtypes.binary: np.dtype("O"),
ibis_dtypes.json: np.dtype("O"),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): np.dtype("O"),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): np.dtype("O"),
ibis_dtypes.GeoSpatial(
geotype="geography", srid=4326, nullable=True
): gpd.array.GeometryDtype(),
@@ -177,7 +190,7 @@ def ibis_dtype_to_bigframes_dtype(
# our IO returns them as objects. Eventually, we should support them as
# ArrowDType (and update the IO accordingly)
if isinstance(ibis_dtype, ibis_dtypes.Array):
return np.dtype("O")
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))
@@ -223,21 +236,13 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
ibis_type = value.type()
name = value.get_name()
if ibis_type.is_json():
value = vendored_ibis_ops.ToJsonString(value).to_expr()
return value.name(name)
# Allow REQUIRED fields to be joined with NULLABLE fields.
nullable_type = ibis_type.copy(nullable=True)
return value.cast(nullable_type).name(value.get_name())


def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
"""Converts an Ibis table expression to canonical types.
This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
casted_columns = []
for column_name in table.columns:
column = typing.cast(ibis_types.Value, table[column_name])
casted_columns.append(ibis_value_to_canonical_type(column))
return table.select(*casted_columns)
return value.cast(nullable_type).name(name)


def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType:
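The reworked ibis_value_to_canonical_type now handles two cases: JSON values are rendered to strings through the vendored ToJsonString op, and everything else is cast to the nullable copy of its own type so REQUIRED columns can be joined or unioned with NULLABLE ones. A minimal sketch of the nullable-cast branch in plain ibis (the JSON branch is omitted because ToJsonString is a vendored BigFrames op, not public ibis API; the table here is hypothetical):

import ibis

t = ibis.table({"n": "!int64"}, name="t")  # "!" marks the column non-nullable
v = t.n
nullable_type = v.type().copy(nullable=True)
canonical = v.cast(nullable_type).name(v.get_name())
print(canonical.type().nullable)  # True: now compatible with NULLABLE columns
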
3 changes: 2 additions & 1 deletion tests/system/small/test_dataframe_io.py
@@ -91,7 +91,8 @@ def test_load_json(session):
expected = pd.DataFrame(
{
"json_column": ['{"bar":true,"foo":10}'],
}
},
dtype=pd.StringDtype(storage="pyarrow"),
)
expected.index = expected.index.astype("Int64")
pd.testing.assert_series_equal(result.dtypes, expected.dtypes)
24 changes: 18 additions & 6 deletions tests/system/utils.py
@@ -133,16 +133,28 @@ def convert_pandas_dtypes(df: pd.DataFrame, bytes_col: bool):
df["geography_col"].replace({np.nan: None})
)

# Convert bytes types column.
if bytes_col:
if not isinstance(df["bytes_col"].dtype, pd.ArrowDtype):
df["bytes_col"] = df["bytes_col"].apply(
lambda value: base64.b64decode(value) if not pd.isnull(value) else value
)
arrow_table = pa.Table.from_pandas(
pd.DataFrame(df, columns=["bytes_col"]),
schema=pa.schema([("bytes_col", pa.binary())]),
)
df["bytes_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)["bytes_col"]

# Convert numeric types column.
df["numeric_col"] = df["numeric_col"].apply(
lambda value: decimal.Decimal(str(value)) if value else None # type: ignore
)
if not isinstance(df["numeric_col"].dtype, pd.ArrowDtype):
# Convert numeric types column.
df["numeric_col"] = df["numeric_col"].apply(
lambda value: decimal.Decimal(str(value)) if value else None # type: ignore
)
arrow_table = pa.Table.from_pandas(
pd.DataFrame(df, columns=["numeric_col"]),
schema=pa.schema([("numeric_col", pa.decimal128(38, 9))]),
)
df["numeric_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)[
"numeric_col"
]


def assert_pandas_df_equal_pca_components(actual, expected, **kwargs):
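The test helper above settles on one pattern for upgrading an object column to an Arrow-backed one: round-trip the column through a pyarrow Table with an explicit schema, then convert back with types_mapper=pd.ArrowDtype. The same pattern in isolation (a sketch; the frame and its "price" column are made up):

import decimal

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"price": [decimal.Decimal("1.50"), None]})  # dtype: object
arrow_table = pa.Table.from_pandas(
    pd.DataFrame(df, columns=["price"]),
    schema=pa.schema([("price", pa.decimal128(38, 9))]),
)
df["price"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)["price"]
print(df["price"].dtype)  # decimal128(38, 9)[pyarrow]
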
7 changes: 3 additions & 4 deletions tests/unit/test_dtypes.py
@@ -31,11 +31,11 @@
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
pytest.param(
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True),
np.dtype("O"),
pd.ArrowDtype(pa.decimal256(76, 38)),
id="bignumeric",
),
pytest.param(ibis_dtypes.boolean, pd.BooleanDtype(), id="bool"),
pytest.param(ibis_dtypes.binary, np.dtype("O"), id="bytes"),
pytest.param(ibis_dtypes.binary, pd.ArrowDtype(pa.binary()), id="bytes"),
pytest.param(ibis_dtypes.date, pd.ArrowDtype(pa.date32()), id="date"),
pytest.param(
ibis_dtypes.Timestamp(), pd.ArrowDtype(pa.timestamp("us")), id="datetime"
@@ -49,10 +49,9 @@
pytest.param(ibis_dtypes.int8, pd.Int64Dtype(), id="int8-as-int64"),
pytest.param(ibis_dtypes.int64, pd.Int64Dtype(), id="int64"),
# TODO(tswast): custom dtype (or at least string dtype) for JSON objects
pytest.param(ibis_dtypes.json, np.dtype("O"), id="json"),
pytest.param(
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True),
np.dtype("O"),
pd.ArrowDtype(pa.decimal128(38, 9)),
id="numeric",
),
pytest.param(
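These parametrized cases all flow through ibis_dtype_to_bigframes_dtype. A direct call matching the updated expectations might look like this (a sketch; it assumes bigframes and its ibis dependency are importable in the test environment):

import ibis.expr.datatypes as ibis_dtypes
import pandas as pd
import pyarrow as pa

import bigframes.dtypes

# BYTES and NUMERIC now map to Arrow-backed dtypes rather than np.dtype("O").
assert bigframes.dtypes.ibis_dtype_to_bigframes_dtype(
    ibis_dtypes.binary
) == pd.ArrowDtype(pa.binary())
assert bigframes.dtypes.ibis_dtype_to_bigframes_dtype(
    ibis_dtypes.Decimal(precision=38, scale=9, nullable=True)
) == pd.ArrowDtype(pa.decimal128(38, 9))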
