Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: specific pyarrow mappings for decimal, bytes types #283

Merged
merged 9 commits into from
Dec 21, 2023
2 changes: 1 addition & 1 deletion bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
if len(index_columns) != 1:
raise ValueError("only method 'linear' supports multi-index")
xvalues = block.index_columns[0]
if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES:
if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
raise ValueError("Can only interpolate on numeric index.")

for column in original_columns:
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.AggregateOp]:
stats: list[agg_ops.AggregateOp] = [agg_ops.count_op]
if dtype not in bigframes.dtypes.UNORDERED_DTYPES:
stats += [agg_ops.min_op, agg_ops.max_op]
if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES:
if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
# Notable exclusions:
# prod op tends to cause overflows
# Also, var_op is redundant as can be derived from std
Expand Down
14 changes: 12 additions & 2 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value:
raise ValueError(
"Column name {} not in set of values: {}".format(key, self.column_ids)
)
return typing.cast(ibis_types.Value, self._column_names[key])
return typing.cast(
ibis_types.Value,
bigframes.dtypes.ibis_value_to_canonical_type(self._column_names[key]),
)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
ibis_type = typing.cast(
Expand Down Expand Up @@ -1177,7 +1180,14 @@ def _to_ibis_expr(
# Make sure all dtypes are the "canonical" ones for BigFrames. This is
# important for operations like UNION where the schema must match.
table = self._table.select(
bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns
bigframes.dtypes.ibis_value_to_canonical_type(
column.resolve(self._table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred)
else column
)
for column in columns
)
base_table = table
if self._reduced_predicate is not None:
Expand Down
7 changes: 5 additions & 2 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ def _convert_index(self, dataframe: df.DataFrame):

def _raise_on_non_numeric(self, op: str):
if not all(
dtype in dtypes.NUMERIC_BIGFRAMES_TYPES for dtype in self._block.dtypes
dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
for dtype in self._block.dtypes
):
raise NotImplementedError(
f"'{op}' does not support non-numeric columns. "
Expand All @@ -371,7 +372,9 @@ def _raise_on_non_numeric(self, op: str):
def _aggregated_columns(self, numeric_only: bool = False) -> typing.Sequence[str]:
valid_agg_cols: list[str] = []
for col_id in self._selected_cols:
is_numeric = self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES
is_numeric = (
self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
)
if is_numeric or not numeric_only:
valid_agg_cols.append(col_id)
return valid_agg_cols
Expand Down
20 changes: 11 additions & 9 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,7 +1800,7 @@ def agg(
) -> DataFrame | bigframes.series.Series:
if utils.is_list_like(func):
if any(
dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES
dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
for dtype in self.dtypes
):
raise NotImplementedError(
Expand Down Expand Up @@ -1867,7 +1867,7 @@ def melt(
)

def describe(self) -> DataFrame:
df_numeric = self._drop_non_numeric(keep_bool=False)
df_numeric = self._drop_non_numeric(permissive=False)
if len(df_numeric.columns) == 0:
raise NotImplementedError(
f"df.describe() currently only supports numeric values. {constants.FEEDBACK_LINK}"
Expand Down Expand Up @@ -2005,10 +2005,12 @@ def unstack(self, level: LevelsType = -1):
)
return DataFrame(pivot_block)

def _drop_non_numeric(self, keep_bool=True) -> DataFrame:
types_to_keep = set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
if not keep_bool:
types_to_keep -= set(bigframes.dtypes.BOOL_BIGFRAMES_TYPES)
def _drop_non_numeric(self, permissive=True) -> DataFrame:
types_to_keep = (
set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
if permissive
else set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE)
)
non_numeric_cols = [
col_id
for col_id, dtype in zip(self._block.value_columns, self._block.dtypes)
Expand All @@ -2026,7 +2028,7 @@ def _drop_non_bool(self) -> DataFrame:

def _raise_on_non_numeric(self, op: str):
if not all(
dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES
dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
for dtype in self._block.dtypes
):
raise NotImplementedError(
Expand Down Expand Up @@ -2301,7 +2303,7 @@ def notna(self) -> DataFrame:

def cumsum(self):
is_numeric_types = [
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
for _, dtype in self.dtypes.items()
]
if not all(is_numeric_types):
Expand All @@ -2313,7 +2315,7 @@ def cumsum(self):

def cumprod(self) -> DataFrame:
is_numeric_types = [
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
(dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE)
for _, dtype in self.dtypes.items()
]
if not all(is_numeric_types):
Expand Down
139 changes: 102 additions & 37 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Mappings for Pandas dtypes supported by BigQuery DataFrames package"""

import datetime
import decimal
import textwrap
import typing
from typing import Any, Dict, Iterable, Literal, Tuple, Union
Expand All @@ -30,6 +31,7 @@

import bigframes.constants as constants
import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers
import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops

# Type hints for Pandas dtypes supported by BigQuery DataFrame
Dtype = Union[
Expand All @@ -40,9 +42,6 @@
pd.ArrowDtype,
]

# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation)
NUMERIC_BIGFRAMES_TYPES = [pd.BooleanDtype(), pd.Float64Dtype(), pd.Int64Dtype()]

# On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
UNORDERED_DTYPES = [gpd.array.GeometryDtype()]

Expand All @@ -57,6 +56,9 @@
"timestamp[us][pyarrow]",
"date32[day][pyarrow]",
"time64[us][pyarrow]",
"decimal128(38, 9)[pyarrow]",
"decimal256(38, 9)[pyarrow]",
"binary[pyarrow]",
]

# Type hints for Ibis data types supported by BigQuery DataFrame
Expand All @@ -72,8 +74,17 @@

BOOL_BIGFRAMES_TYPES = [pd.BooleanDtype()]

# Several operations are restricted to these types.
NUMERIC_BIGFRAMES_TYPES = [pd.BooleanDtype(), pd.Float64Dtype(), pd.Int64Dtype()]
# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation)
# Pandas is inconsistent, so two definitions are provided, each used in different contexts
NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE = [
pd.Float64Dtype(),
pd.Int64Dtype(),
]
NUMERIC_BIGFRAMES_TYPES_PERMISSIVE = NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + [
pd.BooleanDtype(),
pd.ArrowDtype(pa.decimal128(38, 9)),
pd.ArrowDtype(pa.decimal256(76, 38)),
]

# Type hints for Ibis data types that can be read to Python objects by BigQuery DataFrame
ReadOnlyIbisDtype = Union[
Expand All @@ -97,6 +108,15 @@
ibis_dtypes.Timestamp(timezone="UTC"),
pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
),
(ibis_dtypes.binary, pd.ArrowDtype(pa.binary())),
(
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True),
pd.ArrowDtype(pa.decimal128(38, 9)),
),
(
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True),
pd.ArrowDtype(pa.decimal256(76, 38)),
),
)

BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
Expand All @@ -112,6 +132,9 @@
ibis_dtypes.time: pa.time64("us"),
ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"),
ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"),
ibis_dtypes.binary: pa.binary(),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): pa.decimal128(38, 9),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): pa.decimal256(76, 38),
}

ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}
Expand All @@ -125,10 +148,6 @@
)
IBIS_TO_BIGFRAMES.update(
{
ibis_dtypes.binary: np.dtype("O"),
ibis_dtypes.json: np.dtype("O"),
ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): np.dtype("O"),
ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): np.dtype("O"),
ibis_dtypes.GeoSpatial(
geotype="geography", srid=4326, nullable=True
): gpd.array.GeometryDtype(),
Expand Down Expand Up @@ -178,7 +197,7 @@ def ibis_dtype_to_bigframes_dtype(
# our IO returns them as objects. Eventually, we should support them as
# ArrowDType (and update the IO accordingly)
if isinstance(ibis_dtype, ibis_dtypes.Array):
return np.dtype("O")
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))
Expand All @@ -200,7 +219,9 @@ def ibis_dtype_to_bigframes_dtype(

def ibis_dtype_to_arrow_dtype(ibis_dtype: ibis_dtypes.DataType) -> pa.DataType:
if isinstance(ibis_dtype, ibis_dtypes.Array):
return pa.list_(ibis_dtype_to_arrow_dtype(ibis_dtype.value_type))
return pa.list_(
ibis_dtype_to_arrow_dtype(ibis_dtype.value_type.copy(nullable=True))
)

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pa.struct(
Expand All @@ -224,21 +245,13 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
ibis_type = value.type()
name = value.get_name()
if ibis_type.is_json():
value = vendored_ibis_ops.ToJsonString(value).to_expr()
return value.name(name)
# Allow REQUIRED fields to be joined with NULLABLE fields.
nullable_type = ibis_type.copy(nullable=True)
return value.cast(nullable_type).name(value.get_name())


def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
"""Converts an Ibis table expression to canonical types.

This is useful in cases where multiple types correspond to the same BigFrames dtype.
"""
casted_columns = []
for column_name in table.columns:
column = typing.cast(ibis_types.Value, table[column_name])
casted_columns.append(ibis_value_to_canonical_type(column))
return table.select(*casted_columns)
return value.cast(nullable_type).name(name)


def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType:
Expand Down Expand Up @@ -386,15 +399,35 @@ def cast_ibis_value(
ibis_dtypes.bool,
ibis_dtypes.float64,
ibis_dtypes.string,
ibis_dtypes.Decimal(precision=38, scale=9),
ibis_dtypes.Decimal(precision=76, scale=38),
),
ibis_dtypes.float64: (
ibis_dtypes.string,
ibis_dtypes.int64,
ibis_dtypes.Decimal(precision=38, scale=9),
ibis_dtypes.Decimal(precision=76, scale=38),
),
ibis_dtypes.string: (
ibis_dtypes.int64,
ibis_dtypes.float64,
ibis_dtypes.Decimal(precision=38, scale=9),
ibis_dtypes.Decimal(precision=76, scale=38),
ibis_dtypes.binary,
),
ibis_dtypes.float64: (ibis_dtypes.string, ibis_dtypes.int64),
ibis_dtypes.string: (ibis_dtypes.int64, ibis_dtypes.float64),
ibis_dtypes.date: (ibis_dtypes.string,),
ibis_dtypes.Decimal(precision=38, scale=9): (ibis_dtypes.float64,),
ibis_dtypes.Decimal(precision=76, scale=38): (ibis_dtypes.float64,),
ibis_dtypes.Decimal(precision=38, scale=9): (
ibis_dtypes.float64,
ibis_dtypes.Decimal(precision=76, scale=38),
),
ibis_dtypes.Decimal(precision=76, scale=38): (
ibis_dtypes.float64,
ibis_dtypes.Decimal(precision=38, scale=9),
),
ibis_dtypes.time: (),
ibis_dtypes.timestamp: (ibis_dtypes.Timestamp(timezone="UTC"),),
ibis_dtypes.Timestamp(timezone="UTC"): (ibis_dtypes.timestamp,),
ibis_dtypes.binary: (ibis_dtypes.string,),
}

value = ibis_value_to_canonical_type(value)
Expand Down Expand Up @@ -458,30 +491,62 @@ def is_dtype(scalar: typing.Any, dtype: Dtype) -> bool:
return False


def is_patype(scalar: typing.Any, pa_type: pa.DataType) -> bool:
    """Determine whether a scalar's Python type matches a given pyarrow type.

    Args:
        scalar: value to check (e.g. a ``datetime.time``, ``bytes``,
            ``decimal.Decimal``, ...).
        pa_type: pyarrow type the scalar must be representable as.

    Returns:
        True if the scalar's type is compatible with ``pa_type``,
        False otherwise (including for unrecognized ``pa_type`` values).
    """
    if pa_type == pa.time64("us"):
        return isinstance(scalar, datetime.time)
    elif pa_type == pa.timestamp("us"):
        # Timezone-naive values only; tz-aware values match the tz="UTC" type.
        if isinstance(scalar, datetime.datetime):
            return not scalar.tzinfo
        if isinstance(scalar, pd.Timestamp):
            return not scalar.tzinfo
    elif pa_type == pa.timestamp("us", tz="UTC"):
        if isinstance(scalar, datetime.datetime):
            return scalar.tzinfo == datetime.timezone.utc
        if isinstance(scalar, pd.Timestamp):
            return scalar.tzinfo == datetime.timezone.utc
    elif pa_type == pa.date32():
        return isinstance(scalar, datetime.date)
    elif pa_type == pa.binary():
        return isinstance(scalar, bytes)
    elif pa_type in (pa.decimal128(38, 9), pa.decimal256(76, 38)):
        # decimal.Decimal is a superset of both BigQuery NUMERIC and
        # BIGNUMERIC, but ibis performs out-of-bounds and loss-of-precision
        # checks downstream.
        return isinstance(scalar, decimal.Decimal)
    return False


def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:
    """Whether ``scalar`` can be compared to items of ``dtype`` (though maybe
    requiring coercion).

    Args:
        scalar: the literal value being compared.
        dtype: the BigFrames dtype of the other operand.

    Returns:
        The dtype that must be used for the comparison, or ``None`` if the
        scalar is not comparable to ``dtype``.
    """
    if is_dtype(scalar, dtype):
        return dtype
    # Implicit conversion currently only supported for numeric types.
    if pd.api.types.is_bool(scalar):
        return lcd_type(pd.BooleanDtype(), dtype)
    if pd.api.types.is_float(scalar):
        return lcd_type(pd.Float64Dtype(), dtype)
    if pd.api.types.is_integer(scalar):
        return lcd_type(pd.Int64Dtype(), dtype)
    if isinstance(scalar, decimal.Decimal):
        # TODO: Check context to see if can use NUMERIC instead of BIGNUMERIC.
        # BUGFIX: pyarrow's decimal128 caps precision at 38, so
        # pa.decimal128(76, 38) raises; precision-76/scale-38 (BIGNUMERIC)
        # requires decimal256, matching the hierarchy used by lcd_type.
        return lcd_type(pd.ArrowDtype(pa.decimal256(76, 38)), dtype)
    return None


def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
    """Return the least-common-denominator dtype of two numeric dtypes.

    The result is the wider of the two according to the numeric promotion
    order BOOLEAN < INT64 < FLOAT64 < NUMERIC < BIGNUMERIC; returns ``None``
    when either dtype is not numeric.
    """
    # Implicit conversion currently only supported for numeric types.
    promotion_order: list[Dtype] = [
        pd.BooleanDtype(),
        pd.Int64Dtype(),
        pd.Float64Dtype(),
        pd.ArrowDtype(pa.decimal128(38, 9)),
        pd.ArrowDtype(pa.decimal256(76, 38)),
    ]
    if dtype1 not in promotion_order or dtype2 not in promotion_order:
        return None
    rank1 = promotion_order.index(dtype1)
    rank2 = promotion_order.index(dtype2)
    return promotion_order[max(rank1, rank2)]
Loading