perf: derive expression schema without ibis compilation
TrevorBergeron committed Feb 8, 2024
1 parent d5518b2 commit 1b79355
Showing 10 changed files with 412 additions and 87 deletions.
61 changes: 50 additions & 11 deletions bigframes/core/__init__.py
@@ -14,12 +14,15 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
+import functools
 import io
 import typing
 from typing import Iterable, Sequence
 
 import ibis.expr.types as ibis_types
 import pandas
+import pyarrow as pa
+import pyarrow.feather as pa_feather
 
 import bigframes.core.compile as compiling
 import bigframes.core.expression as ex
@@ -28,6 +31,7 @@
 import bigframes.core.nodes as nodes
 from bigframes.core.ordering import OrderingColumnReference
 import bigframes.core.ordering as orderings
+import bigframes.core.schema as schemata
 import bigframes.core.utils
 from bigframes.core.window_spec import WindowSpec
 import bigframes.dtypes
@@ -41,6 +45,10 @@
 ORDER_ID_COLUMN = "bigframes_ordering_id"
 PREDICATE_COLUMN = "bigframes_predicate"
 
+# Enable only for testing, will slow down performance
+# DO NOT COMMIT : Set to False
+_VALIDATE_SCHEMA_WITH_IBIS = True
+
 
 @dataclass(frozen=True)
 class ArrayValue:
@@ -50,6 +58,10 @@ class ArrayValue:
 
     node: nodes.BigFrameNode
 
+    def __post_init__(self):
+        if _VALIDATE_SCHEMA_WITH_IBIS:
+            self.validate_schema()
+
     @classmethod
     def from_ibis(
         cls,
@@ -69,21 +81,27 @@ def from_ibis(
         return cls(node)
 
     @classmethod
-    def from_pandas(cls, pd_df: pandas.DataFrame):
+    def from_pyarrow(cls, arrow_table: pa.Table):
         iobytes = io.BytesIO()
-        # Use alphanumeric identifiers, to avoid downstream problems with escaping.
-        as_ids = [
-            bigframes.core.utils.label_to_identifier(label, strict=True)
-            for label in pd_df.columns
-        ]
-        unique_ids = tuple(bigframes.core.utils.disambiguate_ids(as_ids))
-        pd_df.reset_index(drop=True).set_axis(unique_ids, axis=1).to_feather(iobytes)
-        node = nodes.ReadLocalNode(iobytes.getvalue())
+        pa_feather.write_feather(arrow_table, iobytes)
+        schema_items = tuple(
+            schemata.SchemaItem(
+                field.name,
+                bigframes.dtypes.ibis_dtype_to_bigframes_dtype(
+                    bigframes.dtypes.arrow_dtype_to_ibis_dtype(field.type)
+                ),
+            )
+            for field in arrow_table.schema
+        )
+        node = nodes.ReadLocalNode(
+            iobytes.getvalue(), data_schema=schemata.ArraySchema(schema_items)
+        )
+
         return cls(node)
 
     @property
     def column_ids(self) -> typing.Sequence[str]:
-        return self._compile_ordered().column_ids
+        return self.schema.names
 
     @property
     def session(self) -> Session:
@@ -94,6 +112,27 @@ def session(self) -> Session:
             required_session if (required_session is not None) else get_global_session()
         )
 
+    @functools.cached_property
+    def schema(self) -> schemata.ArraySchema:
+        return self.node.schema
+
+    def validate_schema(self):
+        schema = self.schema
+        compiled = self._compile_unordered()
+        items = [
+            schemata.SchemaItem(id, compiled.get_column_type(id))
+            for id in compiled.column_ids
+        ]
+        ibis_schema = schemata.ArraySchema(items)
+        if schema.names != ibis_schema.names:
+            raise ValueError(
+                f"Unexpected names internal {schema.names} vs generated {ibis_schema.names}"
+            )
+        if schema.dtypes != ibis_schema.dtypes:
+            raise ValueError(
+                f"Unexpected types internal {schema.dtypes} vs generated {ibis_schema.dtypes}"
+            )
+
     def _try_evaluate_local(self):
         """Use only for unit testing paths - not fully featured. Will throw exception if fails."""
         import ibis
@@ -103,7 +142,7 @@ def _try_evaluate_local(self):
         )
 
     def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
-        return self._compile_ordered().get_column_type(key)
+        return self.schema.get_type(key)
 
     def _compile_ordered(self) -> compiling.OrderedIR:
         return compiling.compile_ordered_ir(self.node)
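The heart of the change is visible above: ArrayValue now answers column_ids and get_column_type from a schema derived structurally via self.node.schema, and only falls back to ibis compilation for the opt-in validate_schema cross-check. A minimal, self-contained sketch of the idea — the classes below are simplified stand-ins for illustration, not the real bigframes types:

# Sketch: derive an output schema from the node tree alone, no compilation.
# SchemaItem/ArraySchema/node classes here are simplified stand-ins.
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SchemaItem:
    column: str
    dtype: str  # bigframes stores pandas dtypes; strings keep the sketch small


@dataclass(frozen=True)
class ArraySchema:
    items: Tuple[SchemaItem, ...]

    @property
    def names(self) -> Tuple[str, ...]:
        return tuple(item.column for item in self.items)

    def get_type(self, name: str) -> str:
        return next(item.dtype for item in self.items if item.column == name)


@dataclass(frozen=True)
class ReadLocalNode:
    schema: ArraySchema  # recorded at construction time


@dataclass(frozen=True)
class SelectColumnsNode:
    child: ReadLocalNode
    ids: Tuple[str, ...]

    @property
    def schema(self) -> ArraySchema:
        # Each node type maps its child's schema to an output schema structurally.
        by_name = {item.column: item for item in self.child.schema.items}
        return ArraySchema(tuple(by_name[id] for id in self.ids))


leaf = ReadLocalNode(ArraySchema((SchemaItem("a", "Int64"), SchemaItem("b", "string"))))
print(SelectColumnsNode(leaf, ("b",)).schema.get_type("b"))  # string

Because every node can compute its schema from its children, schema queries walk the node tree instead of triggering a full ibis compile.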
52 changes: 31 additions & 21 deletions bigframes/core/blocks.py
@@ -30,7 +30,9 @@
 import warnings
 
 import google.cloud.bigquery as bigquery
+import ibis
 import pandas as pd
+import pyarrow as pa
 
 import bigframes._config.sampling_options as sampling_options
 import bigframes.constants as constants
@@ -45,7 +47,6 @@
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.session._io.pandas
-import third_party.bigframes_vendored.pandas.io.common as vendored_pandas_io_common
 
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
@@ -141,35 +142,44 @@ def __init__(
 
     @classmethod
     def from_local(cls, data) -> Block:
-        # Try to intrpet data into columns of supported dtypes
         pd_data = pd.DataFrame(data)
         columns = pd_data.columns
 
         # Make a flattened version to treat as a table.
         if len(pd_data.columns.names) > 1:
             pd_data.columns = columns.to_flat_index()
 
+        column_labels = pd_data.columns
         index_labels = list(pd_data.index.names)
-        # The ArrayValue layer doesn't know about indexes, so make sure indexes
-        # are real columns with unique IDs.
-        pd_data = pd_data.reset_index(
-            names=[f"level_{level}" for level in range(len(index_labels))]
-        )
-        pd_data = pd_data.set_axis(
-            vendored_pandas_io_common.dedup_names(
-                list(pd_data.columns), is_potential_multiindex=False
-            ),
-            axis="columns",
-        )
-        index_ids = pd_data.columns[: len(index_labels)]
 
-        keys_expr = core.ArrayValue.from_pandas(pd_data)
+        # unique internal ids
+        column_ids = [f"column_{i}" for i in range(len(pd_data.columns))]
+        index_ids = [f"level_{level}" for level in range(pd_data.index.nlevels)]
+
+        pd_data = pd_data.set_axis(column_ids, axis=1)
+        pd_data = pd_data.reset_index(names=index_ids)
+        pd_data = cls._adapt_pandas_schema(pd_data)
+        as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
+        keys_expr = core.ArrayValue.from_pyarrow(as_pyarrow)
         return cls(
             keys_expr,
-            column_labels=columns,
+            column_labels=column_labels,
             index_columns=index_ids,
             index_labels=index_labels,
         )
 
+    @classmethod
+    def _adapt_pandas_schema(cls, pd_df: pd.DataFrame) -> pd.DataFrame:
+        """Adapts a pandas dataframe to use BigFrames compatible types (or throw if not possible)"""
+        # TODO: Take ownership of this logic rather than rely on Ibis.
+        # ibis memtable cannot handle NA, must convert to None
+        pd_df = pd_df.astype("object")  # type: ignore
+        pd_df = pd_df.where(pd.notnull(pd_df), None)
+        keys_memtable = ibis.memtable(pd_df)
+        schema = keys_memtable.schema()
+
+        def convert_series_to_bf_type(col: pd.Series) -> pd.Series:
+            return col.astype(
+                bigframes.dtypes.ibis_dtype_to_bigframes_dtype(schema[col.name])
+            )
+
+        return pd_df.apply(convert_series_to_bf_type)
+
     @property
     def index(self) -> BlockIndexProperties:
         """Row identities for values in the Block."""
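from_local now assigns positional internal ids up front — column_{i} for value columns, level_{j} for index levels — rather than deduplicating user labels with the vendored pandas helper, and ships the frame to ArrayValue.from_pyarrow as a pyarrow.Table. A rough illustration of that renaming step (toy data, not the real entry point):

import pandas as pd
import pyarrow as pa

# Toy frame whose labels would be awkward as SQL identifiers.
pd_data = pd.DataFrame({"total sales": [1, 2], "total sales ": [3.0, 4.0]})

# Positional ids sidestep escaping and duplicate-label handling entirely.
column_ids = [f"column_{i}" for i in range(len(pd_data.columns))]
index_ids = [f"level_{level}" for level in range(pd_data.index.nlevels)]

pd_data = pd_data.set_axis(column_ids, axis=1)
pd_data = pd_data.reset_index(names=index_ids)

as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
print(as_pyarrow.schema.names)  # ['level_0', 'column_0', 'column_1']

The user-facing labels are kept separately as column_labels, so only the internal layer ever sees the positional names.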
42 changes: 6 additions & 36 deletions bigframes/core/compile/compiled.py
@@ -630,53 +630,23 @@ def from_pandas(
     ) -> OrderedIR:
         """
         Builds an in-memory only (SQL only) expr from a pandas dataframe.
+        Assumed that the dataframe has unique string column names and bigframes-suppported dtypes.
         """
-        # We can't include any hidden columns in the ArrayValue constructor, so
-        # grab the column names before we add the hidden ordering column.
         column_names = [str(column) for column in pd_df.columns]
-        # Make sure column names are all strings.
-        pd_df = pd_df.set_axis(column_names, axis="columns")
         pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
         pd_df[ORDER_ID_COLUMN] = pd_df[ORDER_ID_COLUMN].astype(
             bigframes.dtypes.INT_DTYPE
         )
 
-        # ibis memtable cannot handle NA, must convert to None
-        pd_df = pd_df.astype("object")  # type: ignore
-        pd_df = pd_df.where(pandas.notnull(pd_df), None)
-
-        # NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases.
         keys_memtable = ibis.memtable(pd_df)
-        schema = keys_memtable.schema()
-        new_schema = []
-        for column_index, column in enumerate(schema):
-            if column == ORDER_ID_COLUMN:
-                new_type: ibis_dtypes.DataType = ibis_dtypes.int64
-            else:
-                column_type = schema[column]
-                # The autodetected type might not be one we can support, such
-                # as NULL type for empty rows, so convert to a type we do
-                # support.
-                new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype(
-                    bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type)
-                )
-                # TODO(swast): Ibis memtable doesn't use backticks in struct
-                # field names, so spaces and other characters aren't allowed in
-                # the memtable context. Blocked by
-                # https://github.com/ibis-project/ibis/issues/7187
-                column = f"col_{column_index}"
-            new_schema.append((column, new_type))
-
-        # must set non-null column labels. these are not the user-facing labels
-        pd_df = pd_df.set_axis(
-            [column for column, _ in new_schema],
-            axis="columns",
-        )
-        keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))
-
         return cls(
             keys_memtable,
-            columns=[
-                keys_memtable[f"col_{column_index}"].name(column)
-                for column_index, column in enumerate(column_names)
-            ],
+            columns=[keys_memtable[column].name(column) for column in column_names],
             ordering=ExpressionOrdering(
                 ordering_value_columns=tuple(
                     [OrderingColumnReference(ORDER_ID_COLUMN)]
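OrderedIR.from_pandas sheds its type-inference loop because callers are now expected to pass a frame that already uses supported dtypes and unique string names (see _adapt_pandas_schema above); all that remains is appending the integer ordering column and building the memtable. Roughly, assuming ibis is installed and accepts the nullable pandas dtypes:

import ibis
import pandas as pd

ORDER_ID_COLUMN = "bigframes_ordering_id"

# A frame already adapted to BigFrames-compatible (nullable) dtypes.
pd_df = pd.DataFrame({"a": pd.array([1, None], dtype="Int64")})
pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
pd_df[ORDER_ID_COLUMN] = pd_df[ORDER_ID_COLUMN].astype("Int64")

keys_memtable = ibis.memtable(pd_df)
print(keys_memtable.schema())  # a: int64, bigframes_ordering_id: int64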
4 changes: 3 additions & 1 deletion bigframes/core/compile/compiler.py
@@ -81,7 +81,9 @@ def compile_join(node: nodes.JoinNode, ordered: bool = True):
 
 @_compile_node.register
 def compile_readlocal(node: nodes.ReadLocalNode, ordered: bool = True):
-    array_as_pd = pd.read_feather(io.BytesIO(node.feather_bytes))
+    array_as_pd = pd.read_feather(io.BytesIO(node.feather_bytes)).astype(
+        {field.column: field.dtype for field in node.schema.items}
+    )
     ordered_ir = compiled.OrderedIR.from_pandas(array_as_pd)
     if ordered:
         return ordered_ir
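compile_readlocal can no longer rely on ibis to re-infer types, so it restores the dtypes recorded in the node's schema with astype after the feather round trip. A standalone sketch of that round trip (hypothetical one-column table, not the real node):

import io

import pandas as pd
import pyarrow as pa
import pyarrow.feather as pa_feather

# Hypothetical one-column table with a null, as a local node might store it.
table = pa.table({"column_0": pa.array([1, None, 3], type=pa.int64())})
iobytes = io.BytesIO()
pa_feather.write_feather(table, iobytes)

# read_feather alone yields float64 here (NaN for the null); applying the
# recorded schema restores the nullable Int64 dtype.
roundtripped = pd.read_feather(io.BytesIO(iobytes.getvalue())).astype({"column_0": "Int64"})
print(roundtripped["column_0"].dtype)  # Int64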
20 changes: 19 additions & 1 deletion bigframes/core/expression.py
@@ -39,6 +39,12 @@ class Aggregation(abc.ABC):
 
     op: agg_ops.WindowOp = dataclasses.field()
 
+    @abc.abstractmethod
+    def output_type(
+        self, input_types: dict[str, dtypes.ExpressionType]
+    ) -> dtypes.ExpressionType:
+        ...
+
 
 @dataclasses.dataclass(frozen=True)
 class UnaryAggregation(Aggregation):
@@ -47,6 +53,11 @@ class UnaryAggregation(Aggregation):
         UnboundVariableExpression, ScalarConstantExpression
     ] = dataclasses.field()
 
+    def output_type(
+        self, input_types: dict[str, bigframes.dtypes.Dtype]
+    ) -> dtypes.ExpressionType:
+        return self.op.output_type(self.arg.output_type(input_types))
+
 
 @dataclasses.dataclass(frozen=True)
 class BinaryAggregation(Aggregation):
@@ -58,6 +69,13 @@ class BinaryAggregation(Aggregation):
         UnboundVariableExpression, ScalarConstantExpression
     ] = dataclasses.field()
 
+    def output_type(
+        self, input_types: dict[str, bigframes.dtypes.Dtype]
+    ) -> dtypes.ExpressionType:
+        return self.op.output_type(
+            self.left.output_type(input_types), self.right.output_type(input_types)
+        )
+
 
 @dataclasses.dataclass(frozen=True)
 class Expression(abc.ABC):
@@ -126,7 +144,7 @@ def output_type(
         if self.id in input_types:
             return input_types[self.id]
         else:
-            raise ValueError("Type of variable has not been fixed.")
+            raise ValueError(f"Type of variable {self.id} has not been fixed.")
 
 
 @dataclasses.dataclass(frozen=True)
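With output_type on aggregations, the node layer can type an aggregate expression by pure recursion: the argument expression resolves its type from the input bindings, and the op maps input type(s) to an output type. A toy version of that recursion — the op and expression classes below are simplified stand-ins, not the bigframes originals:

# Toy recursion: type an aggregation without any backend compilation.
import dataclasses


@dataclasses.dataclass(frozen=True)
class UnboundVariable:
    id: str

    def output_type(self, input_types: dict) -> str:
        return input_types[self.id]


@dataclasses.dataclass(frozen=True)
class MeanOp:
    @staticmethod
    def output_type(input_type: str) -> str:
        return "Float64"  # mean always widens to float


@dataclasses.dataclass(frozen=True)
class UnaryAggregation:
    op: MeanOp
    arg: UnboundVariable

    def output_type(self, input_types: dict) -> str:
        # Resolve the argument's type, then let the op map it to a result type.
        return self.op.output_type(self.arg.output_type(input_types))


agg = UnaryAggregation(MeanOp(), UnboundVariable("col_a"))
print(agg.output_type({"col_a": "Int64"}))  # Float64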
(remaining 5 changed files not shown)
