
Commit

refactor: complete dtype rules for expression tree transformations (#376)
TrevorBergeron authored Mar 20, 2024
1 parent 2b9a01d commit 43d0864
Showing 24 changed files with 794 additions and 178 deletions.
59 changes: 47 additions & 12 deletions bigframes/core/__init__.py
@@ -14,21 +14,26 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
+import functools
 import io
 import typing
 from typing import Iterable, Sequence
 
 import ibis.expr.types as ibis_types
 import pandas
+import pyarrow as pa
+import pyarrow.feather as pa_feather
 
 import bigframes.core.compile as compiling
 import bigframes.core.expression as ex
 import bigframes.core.guid
 import bigframes.core.join_def as join_def
+import bigframes.core.local_data as local_data
 import bigframes.core.nodes as nodes
 from bigframes.core.ordering import OrderingColumnReference
 import bigframes.core.ordering as orderings
 import bigframes.core.rewrite
+import bigframes.core.schema as schemata
 import bigframes.core.utils
 from bigframes.core.window_spec import WindowSpec
 import bigframes.dtypes
@@ -63,28 +68,32 @@ def from_ibis(
         node = nodes.ReadGbqNode(
             table=table,
             table_session=session,
-            columns=tuple(columns),
+            columns=tuple(
+                bigframes.dtypes.ibis_value_to_canonical_type(column)
+                for column in columns
+            ),
             hidden_ordering_columns=tuple(hidden_ordering_columns),
             ordering=ordering,
         )
         return cls(node)
 
     @classmethod
-    def from_pandas(cls, pd_df: pandas.DataFrame, session: bigframes.Session):
+    def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
+        adapted_table = local_data.adapt_pa_table(arrow_table)
+        schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)
+
         iobytes = io.BytesIO()
-        # Use alphanumeric identifiers, to avoid downstream problems with escaping.
-        as_ids = [
-            bigframes.core.utils.label_to_identifier(label, strict=True)
-            for label in pd_df.columns
-        ]
-        unique_ids = tuple(bigframes.core.utils.disambiguate_ids(as_ids))
-        pd_df.reset_index(drop=True).set_axis(unique_ids, axis=1).to_feather(iobytes)
-        node = nodes.ReadLocalNode(feather_bytes=iobytes.getvalue(), session=session)
+        pa_feather.write_feather(adapted_table, iobytes)
+        node = nodes.ReadLocalNode(
+            iobytes.getvalue(),
+            data_schema=schema,
+            session=session,
+        )
         return cls(node)
 
     @property
     def column_ids(self) -> typing.Sequence[str]:
-        return self._compile_ordered().column_ids
+        return self.schema.names
 
     @property
     def session(self) -> Session:
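
Note: from_pyarrow replaces the pandas-based constructor above; callers now hand ArrayValue a pyarrow Table. A minimal sketch of the intended call pattern (mirroring the blocks.py change below; the `session` object is an assumption here):

    import pandas as pd
    import pyarrow as pa

    import bigframes.core as core

    df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    table = pa.Table.from_pandas(df, preserve_index=False)  # drop the pandas index
    value = core.ArrayValue.from_pyarrow(table, session=session)  # session: bigframes.Session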
@@ -95,6 +104,32 @@ def session(self) -> Session:
             required_session if (required_session is not None) else get_global_session()
         )
 
+    @functools.cached_property
+    def schema(self) -> schemata.ArraySchema:
+        # TODO: switch to use self.node.schema
+        return self._compiled_schema
+
+    @functools.cached_property
+    def _compiled_schema(self) -> schemata.ArraySchema:
+        compiled = self._compile_unordered()
+        items = tuple(
+            schemata.SchemaItem(id, compiled.get_column_type(id))
+            for id in compiled.column_ids
+        )
+        return schemata.ArraySchema(items)
+
+    def validate_schema(self):
+        tree_derived = self.node.schema
+        ibis_derived = self._compiled_schema
+        if tree_derived.names != ibis_derived.names:
+            raise ValueError(
+                f"Unexpected names internal {tree_derived.names} vs compiled {ibis_derived.names}"
+            )
+        if tree_derived.dtypes != ibis_derived.dtypes:
+            raise ValueError(
+                f"Unexpected types internal {tree_derived.dtypes} vs compiled {ibis_derived.dtypes}"
+            )
+
     def _try_evaluate_local(self):
         """Use only for unit testing paths - not fully featured. Will throw exception if fails."""
         import ibis
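
Note: schema is now derived once and cached, and validate_schema cross-checks the tree-derived schema against the ibis-compiled one, raising ValueError on any name or dtype mismatch. A hedged sketch of how a test might exercise it (assumes `value` is an ArrayValue built as in the sketch above):

    value.validate_schema()    # raises ValueError if the two derivations disagree
    print(value.schema.names)  # computed once, then cached via functools.cached_property
    print(value.get_column_type(value.schema.names[0]))  # answered from the cached schema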
Expand All @@ -104,7 +139,7 @@ def _try_evaluate_local(self):
)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
return self._compile_ordered().get_column_type(key)
return self.schema.get_type(key)

def _compile_ordered(self) -> compiling.OrderedIR:
return compiling.compile_ordered_ir(self.node)
40 changes: 16 additions & 24 deletions bigframes/core/blocks.py
@@ -29,9 +29,9 @@
 from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
 import warnings
 
-import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
 import google.cloud.bigquery as bigquery
 import pandas as pd
+import pyarrow as pa
 
 import bigframes._config.sampling_options as sampling_options
 import bigframes.constants as constants
@@ -141,32 +141,23 @@ def __init__(
         self._stats_cache[" ".join(self.index_columns)] = {}
 
     @classmethod
-    def from_local(cls, data, session: bigframes.Session) -> Block:
-        pd_data = pd.DataFrame(data)
-        columns = pd_data.columns
-
-        # Make a flattened version to treat as a table.
-        if len(pd_data.columns.names) > 1:
-            pd_data.columns = columns.to_flat_index()
-
+    def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
+        # Assumes caller has already converted datatypes to bigframes ones.
+        pd_data = data
+        column_labels = pd_data.columns
         index_labels = list(pd_data.index.names)
-        # The ArrayValue layer doesn't know about indexes, so make sure indexes
-        # are real columns with unique IDs.
-        pd_data = pd_data.reset_index(
-            names=[f"level_{level}" for level in range(len(index_labels))]
-        )
-        pd_data = pd_data.set_axis(
-            vendored_pandas_io_common.dedup_names(
-                list(pd_data.columns), is_potential_multiindex=False
-            ),
-            axis="columns",
-        )
-        index_ids = pd_data.columns[: len(index_labels)]
 
-        keys_expr = core.ArrayValue.from_pandas(pd_data, session)
+        # unique internal ids
+        column_ids = [f"column_{i}" for i in range(len(pd_data.columns))]
+        index_ids = [f"level_{level}" for level in range(pd_data.index.nlevels)]
+
+        pd_data = pd_data.set_axis(column_ids, axis=1)
+        pd_data = pd_data.reset_index(names=index_ids)
+        as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
+        array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
         return cls(
-            keys_expr,
-            column_labels=columns,
+            array_value,
+            column_labels=column_labels,
            index_columns=index_ids,
            index_labels=index_labels,
        )
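
Note: from_local now separates user-facing labels from internal IDs: value columns become column_0, column_1, … and index levels become level_0, level_1, …, while the original labels live only in block metadata. A sketch under those assumptions (`session` assumed to exist):

    df = pd.DataFrame({"price": [1.0], "qty": [2]})  # default RangeIndex
    block = blocks.Block.from_local(df, session)
    # index_columns -> ["level_0"]; value columns -> ["column_0", "column_1"]
    # user-facing metadata: column_labels = ["price", "qty"], index_labels = [None]

This is also why _copy_index_to_pandas below now restores df.columns from column_labels when materializing results.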
@@ -484,6 +475,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
         # general Sequence[Label] that BigQuery DataFrames has.
         # See: https://github.com/pandas-dev/pandas-stubs/issues/804
         df.index.names = self.index.names  # type: ignore
+        df.columns = self.column_labels
 
     def _materialize_local(
         self, materialize_options: MaterializationOptions = MaterializationOptions()
57 changes: 16 additions & 41 deletions bigframes/core/compile/compiled.py
@@ -36,6 +36,7 @@
     IntegerEncoding,
     OrderingColumnReference,
 )
+import bigframes.core.schema as schemata
 import bigframes.core.utils as utils
 from bigframes.core.window_spec import WindowSpec
 import bigframes.dtypes
@@ -627,56 +628,30 @@ def __init__(
     def from_pandas(
         cls,
         pd_df: pandas.DataFrame,
+        schema: schemata.ArraySchema,
     ) -> OrderedIR:
         """
         Builds an in-memory only (SQL only) expr from a pandas dataframe.
+        Assumes the dataframe has unique string column names and bigframes-supported dtypes.
         """
-        # We can't include any hidden columns in the ArrayValue constructor, so
-        # grab the column names before we add the hidden ordering column.
-        column_names = [str(column) for column in pd_df.columns]
-        # Make sure column names are all strings.
-        pd_df = pd_df.set_axis(column_names, axis="columns")
-        pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
 
         # ibis memtable cannot handle NA, must convert to None
-        pd_df = pd_df.astype("object")  # type: ignore
-        pd_df = pd_df.where(pandas.notnull(pd_df), None)
 
-        # NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases.
-        keys_memtable = ibis.memtable(pd_df)
-        schema = keys_memtable.schema()
-        new_schema = []
-        for column_index, column in enumerate(schema):
-            if column == ORDER_ID_COLUMN:
-                new_type: ibis_dtypes.DataType = ibis_dtypes.int64
-            else:
-                column_type = schema[column]
-                # The autodetected type might not be one we can support, such
-                # as NULL type for empty rows, so convert to a type we do
-                # support.
-                new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype(
-                    bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type)
-                )
-                # TODO(swast): Ibis memtable doesn't use backticks in struct
-                # field names, so spaces and other characters aren't allowed in
-                # the memtable context. Blocked by
-                # https://github.com/ibis-project/ibis/issues/7187
-                column = f"col_{column_index}"
-            new_schema.append((column, new_type))
-
-        # must set non-null column labels. these are not the user-facing labels
-        pd_df = pd_df.set_axis(
-            [column for column, _ in new_schema],
-            axis="columns",
-        )
-        keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))
+        # this destroys the schema however
+        ibis_values = pd_df.astype("object").where(pandas.notnull(pd_df), None)  # type: ignore
+        ibis_values = ibis_values.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
+        # derive the ibis schema from the original pandas schema
+        ibis_schema = [
+            (name, bigframes.dtypes.bigframes_dtype_to_ibis_dtype(dtype))
+            for name, dtype in zip(schema.names, schema.dtypes)
+        ]
+        ibis_schema.append((ORDER_ID_COLUMN, ibis_dtypes.int64))
+
+        keys_memtable = ibis.memtable(ibis_values, schema=ibis.schema(ibis_schema))
 
         return cls(
             keys_memtable,
-            columns=[
-                keys_memtable[f"col_{column_index}"].name(column)
-                for column_index, column in enumerate(column_names)
-            ],
+            columns=[keys_memtable[column].name(column) for column in pd_df.columns],
             ordering=ExpressionOrdering(
                 ordering_value_columns=tuple(
                     [OrderingColumnReference(ORDER_ID_COLUMN)]
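
Note: the rewrite stops round-tripping types through ibis autodetection (which degrades all-None columns to NULL type) and instead builds the memtable with an explicit schema derived from the known bigframes dtypes. The underlying ibis pattern, as a standalone sketch with made-up column names:

    import ibis
    import pandas as pd

    df = pd.DataFrame({"a": [None, None]}, dtype="object")  # would autodetect as NULL type
    memtable = ibis.memtable(df, schema=ibis.schema([("a", "int64")]))
    print(memtable.schema())  # column "a" stays int64 instead of null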
5 changes: 3 additions & 2 deletions bigframes/core/compile/compiler.py
@@ -41,7 +41,8 @@ def compile_peak_sql(node: nodes.BigFrameNode, n_rows: int) -> typing.Optional[str]:
     return compile_unordered_ir(node).peek_sql(n_rows)
 
 
-@functools.cache
+# TODO: Remove cache when schema no longer requires compilation to derive schema (and therefore only compiles for execution)
+@functools.lru_cache(maxsize=5000)
 def compile_node(
     node: nodes.BigFrameNode, ordered: bool = True
 ) -> compiled.UnorderedIR | compiled.OrderedIR:
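
Note: swapping @functools.cache for a bounded lru_cache caps memory, since schema derivation now compiles nodes (see the TODO above) and an unbounded cache would grow with every distinct expression tree. Standard-library behavior, for reference:

    import functools

    @functools.lru_cache(maxsize=5000)
    def compile_expensive(key: int) -> int:
        return key * key  # stand-in for real compilation work

    compile_expensive(7)
    print(compile_expensive.cache_info())  # hits=0, misses=1, maxsize=5000, currsize=1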
@@ -80,7 +81,7 @@ def compile_join(node: nodes.JoinNode, ordered: bool = True):
 @_compile_node.register
 def compile_readlocal(node: nodes.ReadLocalNode, ordered: bool = True):
     array_as_pd = pd.read_feather(io.BytesIO(node.feather_bytes))
-    ordered_ir = compiled.OrderedIR.from_pandas(array_as_pd)
+    ordered_ir = compiled.OrderedIR.from_pandas(array_as_pd, node.schema)
     if ordered:
         return ordered_ir
     else:
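
Note: compile_readlocal is the read side of the Feather round trip that from_pyarrow starts: bytes written by pyarrow.feather come back as a pandas DataFrame at compile time, with node.schema supplying the authoritative dtypes. A minimal sketch of that round trip:

    import io

    import pandas as pd
    import pyarrow as pa
    import pyarrow.feather as pa_feather

    table = pa.Table.from_pandas(pd.DataFrame({"a": [1, 2]}))
    buf = io.BytesIO()
    pa_feather.write_feather(table, buf)                    # serialize (ingestion side)
    restored = pd.read_feather(io.BytesIO(buf.getvalue()))  # deserialize (compile side)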
