Skip to content

Commit

Permalink
feat: specific pyarrow mappings for decimal, bytes types (#283)
Browse files Browse the repository at this point in the history
* feat: new bytes, json, decimal type mappings

* amend tests to reflect new types

* add implicit type conversion for df.replace

* more type casting tests

* skip pandas 1.x for more tests

---------

Co-authored-by: Tim Swast <[email protected]>
  • Loading branch information
TrevorBergeron and tswast authored Dec 21, 2023
1 parent ad67465 commit a1c0631
Show file tree
Hide file tree
Showing 21 changed files with 267 additions and 101 deletions.
2 changes: 1 addition & 1 deletion bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
if len(index_columns) != 1:
raise ValueError("only method 'linear' supports multi-index")
xvalues = block.index_columns[0]
if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES:
if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
raise ValueError("Can only interpolate on numeric index.")

for column in original_columns:
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.AggregateOp]:
stats: list[agg_ops.AggregateOp] = [agg_ops.count_op]
if dtype not in bigframes.dtypes.UNORDERED_DTYPES:
stats += [agg_ops.min_op, agg_ops.max_op]
if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES:
if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
# Notable exclusions:
# prod op tends to cause overflows
# Also, var_op is redundant as can be derived from std
Expand Down
14 changes: 12 additions & 2 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value:
raise ValueError(
"Column name {} not in set of values: {}".format(key, self.column_ids)
)
return typing.cast(ibis_types.Value, self._column_names[key])
return typing.cast(
ibis_types.Value,
bigframes.dtypes.ibis_value_to_canonical_type(self._column_names[key]),
)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
ibis_type = typing.cast(
Expand Down Expand Up @@ -1177,7 +1180,14 @@ def _to_ibis_expr(
# Make sure all dtypes are the "canonical" ones for BigFrames. This is
# important for operations like UNION where the schema must match.
table = self._table.select(
bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns
bigframes.dtypes.ibis_value_to_canonical_type(
column.resolve(self._table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred)
else column
)
for column in columns
)
base_table = table
if self._reduced_predicate is not None:
Expand Down
7 changes: 5 additions & 2 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ def _convert_index(self, dataframe: df.DataFrame):

def _raise_on_non_numeric(self, op: str):
if not all(
dtype in dtypes.NUMERIC_BIGFRAMES_TYPES for dtype in self._block.dtypes
dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
for dtype in self._block.dtypes
):
raise NotImplementedError(
f"'{op}' does not support non-numeric columns. "
Expand All @@ -371,7 +372,9 @@ def _raise_on_non_numeric(self, op: str):
def _aggregated_columns(self, numeric_only: bool = False) -> typing.Sequence[str]:
valid_agg_cols: list[str] = []
for col_id in self._selected_cols:
is_numeric = self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES
is_numeric = (
self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
)
if is_numeric or not numeric_only:
valid_agg_cols.append(col_id)
return valid_agg_cols
Expand Down
20 changes: 11 additions & 9 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,7 +1800,7 @@ def agg(
) -> DataFrame | bigframes.series.Series:
if utils.is_list_like(func):
if any(
dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES
dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
for dtype in self.dtypes
):
raise NotImplementedError(
Expand Down Expand Up @@ -1867,7 +1867,7 @@ def melt(
)

def describe(self) -> DataFrame:
df_numeric = self._drop_non_numeric(keep_bool=False)
df_numeric = self._drop_non_numeric(permissive=False)
if len(df_numeric.columns) == 0:
raise NotImplementedError(
f"df.describe() currently only supports numeric values. {constants.FEEDBACK_LINK}"
Expand Down Expand Up @@ -2005,10 +2005,12 @@ def unstack(self, level: LevelsType = -1):
)
return DataFrame(pivot_block)

def _drop_non_numeric(self, keep_bool=True) -> DataFrame:
types_to_keep = set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
if not keep_bool:
types_to_keep -= set(bigframes.dtypes.BOOL_BIGFRAMES_TYPES)
def _drop_non_numeric(self, permissive=True) -> DataFrame:
types_to_keep = (
set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
if permissive
else set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE)
)
non_numeric_cols = [
col_id
for col_id, dtype in zip(self._block.value_columns, self._block.dtypes)
Expand All @@ -2026,7 +2028,7 @@ def _drop_non_bool(self) -> DataFrame:

def _raise_on_non_numeric(self, op: str):
if not all(
dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES
dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
for dtype in self._block.dtypes
):
raise NotImplementedError(
Expand Down Expand Up @@ -2301,7 +2303,7 @@ def notna(self) -> DataFrame:

def cumsum(self):
is_numeric_types = [
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
for _, dtype in self.dtypes.items()
]
if not all(is_numeric_types):
Expand All @@ -2313,7 +2315,7 @@ def cumsum(self):

def cumprod(self) -> DataFrame:
is_numeric_types = [
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
for _, dtype in self.dtypes.items()
]
if not all(is_numeric_types):
Expand Down
139 changes: 102 additions & 37 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Mappings for Pandas dtypes supported by BigQuery DataFrames package"""

import datetime
import decimal
import textwrap
import typing
from typing import Any, Dict, Iterable, Literal, Tuple, Union
Expand All @@ -30,6 +31,7 @@

import bigframes.constants as constants
import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers
import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops

# Type hints for Pandas dtypes supported by BigQuery DataFrame
Dtype = Union[
Expand All @@ -40,9 +42,6 @@
pd.ArrowDtype,
]

# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation)
NUMERIC_BIGFRAMES_TYPES = [pd.BooleanDtype(), pd.Float64Dtype(), pd.Int64Dtype()]

# On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
UNORDERED_DTYPES = [gpd.array.GeometryDtype()]

Expand All @@ -57,6 +56,9 @@
"timestamp[us][pyarrow]",
"date32[day][pyarrow]",
"time64[us][pyarrow]",
"decimal128(38, 9)[pyarrow]",
"decimal256(38, 9)[pyarrow]",
"binary[pyarrow]",
]

# Type hints for Ibis data types supported by BigQuery DataFrame
Expand All @@ -72,8 +74,17 @@

BOOL_BIGFRAMES_TYPES = [pd.BooleanDtype()]

# Several operations are restricted to these types.
NUMERIC_BIGFRAMES_TYPES = [pd.BooleanDtype(), pd.Float64Dtype(), pd.Int64Dtype()]
# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation)
# Pandas is inconsistent, so two definitions are provided, each used in different contexts
NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE = [
pd.Float64Dtype(),
pd.Int64Dtype(),
]
NUMERIC_BIGFRAMES_TYPES_PERMISSIVE = NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + [
pd.BooleanDtype(),
pd.ArrowDtype(pa.decimal128(38, 9)),
pd.ArrowDtype(pa.decimal256(76, 38)),
]

# Type hints for Ibis data types that can be read to Python objects by BigQuery DataFrame
ReadOnlyIbisDtype = Union[
Expand All @@ -97,6 +108,15 @@
ibis_dtypes.Timestamp(timezone="UTC"),
pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
),
(ibis_dtypes.binary, pd.ArrowDtype(pa.binary())),
(
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True),
pd.ArrowDtype(pa.decimal128(38, 9)),
),
(
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True),
pd.ArrowDtype(pa.decimal256(76, 38)),
),
)

BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
Expand All @@ -112,6 +132,9 @@
ibis_dtypes.time: pa.time64("us"),
ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"),
ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"),
ibis_dtypes.binary: pa.binary(),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): pa.decimal128(38, 9),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): pa.decimal256(76, 38),
}

ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}
Expand All @@ -125,10 +148,6 @@
)
IBIS_TO_BIGFRAMES.update(
{
ibis_dtypes.binary: np.dtype("O"),
ibis_dtypes.json: np.dtype("O"),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): np.dtype("O"),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): np.dtype("O"),
ibis_dtypes.GeoSpatial(
geotype="geography", srid=4326, nullable=True
): gpd.array.GeometryDtype(),
Expand Down Expand Up @@ -178,7 +197,7 @@ def ibis_dtype_to_bigframes_dtype(
# our IO returns them as objects. Eventually, we should support them as
# ArrowDType (and update the IO accordingly)
if isinstance(ibis_dtype, ibis_dtypes.Array):
return np.dtype("O")
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))
Expand All @@ -200,7 +219,9 @@ def ibis_dtype_to_bigframes_dtype(

def ibis_dtype_to_arrow_dtype(ibis_dtype: ibis_dtypes.DataType) -> pa.DataType:
if isinstance(ibis_dtype, ibis_dtypes.Array):
return pa.list_(ibis_dtype_to_arrow_dtype(ibis_dtype.value_type))
return pa.list_(
ibis_dtype_to_arrow_dtype(ibis_dtype.value_type.copy(nullable=True))
)

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pa.struct(
Expand All @@ -224,21 +245,13 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
ibis_type = value.type()
name = value.get_name()
if ibis_type.is_json():
value = vendored_ibis_ops.ToJsonString(value).to_expr()
return value.name(name)
# Allow REQUIRED fields to be joined with NULLABLE fields.
nullable_type = ibis_type.copy(nullable=True)
return value.cast(nullable_type).name(value.get_name())


def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
"""Converts an Ibis table expression to canonical types.
This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
casted_columns = []
for column_name in table.columns:
column = typing.cast(ibis_types.Value, table[column_name])
casted_columns.append(ibis_value_to_canonical_type(column))
return table.select(*casted_columns)
return value.cast(nullable_type).name(name)


def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType:
Expand Down Expand Up @@ -386,15 +399,35 @@ def cast_ibis_value(
ibis_dtypes.bool,
ibis_dtypes.float64,
ibis_dtypes.string,
ibis_dtypes.Decimal(precision=38, scale=9),
ibis_dtypes.Decimal(precision=76, scale=38),
),
ibis_dtypes.float64: (
ibis_dtypes.string,
ibis_dtypes.int64,
ibis_dtypes.Decimal(precision=38, scale=9),
ibis_dtypes.Decimal(precision=76, scale=38),
),
ibis_dtypes.string: (
ibis_dtypes.int64,
ibis_dtypes.float64,
ibis_dtypes.Decimal(precision=38, scale=9),
ibis_dtypes.Decimal(precision=76, scale=38),
ibis_dtypes.binary,
),
ibis_dtypes.float64: (ibis_dtypes.string, ibis_dtypes.int64),
ibis_dtypes.string: (ibis_dtypes.int64, ibis_dtypes.float64),
ibis_dtypes.date: (ibis_dtypes.string,),
ibis_dtypes.Decimal(precision=38, scale=9): (ibis_dtypes.float64,),
ibis_dtypes.Decimal(precision=76, scale=38): (ibis_dtypes.float64,),
ibis_dtypes.Decimal(precision=38, scale=9): (
ibis_dtypes.float64,
ibis_dtypes.Decimal(precision=76, scale=38),
),
ibis_dtypes.Decimal(precision=76, scale=38): (
ibis_dtypes.float64,
ibis_dtypes.Decimal(precision=38, scale=9),
),
ibis_dtypes.time: (),
ibis_dtypes.timestamp: (ibis_dtypes.Timestamp(timezone="UTC"),),
ibis_dtypes.Timestamp(timezone="UTC"): (ibis_dtypes.timestamp,),
ibis_dtypes.binary: (ibis_dtypes.string,),
}

value = ibis_value_to_canonical_type(value)
Expand Down Expand Up @@ -458,30 +491,62 @@ def is_dtype(scalar: typing.Any, dtype: Dtype) -> bool:
return False


# string is binary
def is_patype(scalar: typing.Any, pa_type: pa.DataType) -> bool:
"""Determine whether a scalar's type matches a given pyarrow type."""
if pa_type == pa.time64("us"):
return isinstance(scalar, datetime.time)
if pa_type == pa.timestamp("us"):
elif pa_type == pa.timestamp("us"):
if isinstance(scalar, datetime.datetime):
return not scalar.tzinfo
if isinstance(scalar, pd.Timestamp):
return not scalar.tzinfo
if pa_type == pa.timestamp("us", tz="UTC"):
elif pa_type == pa.timestamp("us", tz="UTC"):
if isinstance(scalar, datetime.datetime):
return scalar.tzinfo == datetime.timezone.utc
if isinstance(scalar, pd.Timestamp):
return scalar.tzinfo == datetime.timezone.utc
if pa_type == pa.date32():
elif pa_type == pa.date32():
return isinstance(scalar, datetime.date)
elif pa_type == pa.binary():
return isinstance(scalar, bytes)
elif pa_type == pa.decimal128(38, 9):
# decimal.Decimal is a superset, but ibis performs out-of-bounds and loss-of-precision checks
return isinstance(scalar, decimal.Decimal)
elif pa_type == pa.decimal256(76, 38):
# decimal.Decimal is a superset, but ibis performs out-of-bounds and loss-of-precision checks
return isinstance(scalar, decimal.Decimal)
return False


def is_comparable(scalar: typing.Any, dtype: Dtype) -> bool:
"""Whether scalar can be compared to items of dtype (though maybe requiring coercion)"""
def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:
"""Whether scalar can be compared to items of dtype (though maybe requiring coercion). Returns the datatype that must be used for the comparison."""
if is_dtype(scalar, dtype):
return True
return dtype
elif pd.api.types.is_numeric_dtype(dtype):
return pd.api.types.is_number(scalar)
else:
return False
# Implicit conversion currently only supported for numeric types
if pd.api.types.is_bool(scalar):
return lcd_type(pd.BooleanDtype(), dtype)
if pd.api.types.is_float(scalar):
return lcd_type(pd.Float64Dtype(), dtype)
if pd.api.types.is_integer(scalar):
return lcd_type(pd.Int64Dtype(), dtype)
if isinstance(scalar, decimal.Decimal):
# TODO: Check context to see if can use NUMERIC instead of BIGNUMERIC
return lcd_type(pd.ArrowDtype(pa.decimal128(76, 38)), dtype)
return None


def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
    """Return the least-common-denominator dtype for two numeric dtypes.

    Both dtypes are looked up in a fixed widening hierarchy
    (boolean -> int64 -> float64 -> decimal128(38, 9) -> decimal256(76, 38))
    and the wider of the two is returned, i.e. the type an implicit
    conversion should target when comparing or combining the two.

    Args:
        dtype1: First dtype to reconcile.
        dtype2: Second dtype to reconcile.

    Returns:
        The wider of the two dtypes per the hierarchy, or ``None`` if either
        dtype is outside the hierarchy (implicit conversion is currently
        only supported for numeric types).
    """
    # Implicit conversion currently only supported for numeric types
    hierarchy: list[Dtype] = [
        pd.BooleanDtype(),
        pd.Int64Dtype(),
        pd.Float64Dtype(),
        pd.ArrowDtype(pa.decimal128(38, 9)),
        pd.ArrowDtype(pa.decimal256(76, 38)),
    ]
    if (dtype1 not in hierarchy) or (dtype2 not in hierarchy):
        return None
    # Later entries in the hierarchy are wider, so the max index wins.
    lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2))
    return hierarchy[lcd_index]
Loading

0 comments on commit a1c0631

Please sign in to comment.