Skip to content

Commit

Permalink
Merge branch 'main' into b296390934-docstrings-type-annotation-raises
Browse files Browse the repository at this point in the history
  • Loading branch information
arwas11 authored Nov 6, 2024
2 parents 3b38cc6 + e7fa913 commit efe645c
Show file tree
Hide file tree
Showing 13 changed files with 614 additions and 128 deletions.
8 changes: 7 additions & 1 deletion bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,13 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
"""Append together multiple ArrayValue objects."""
return ArrayValue(
nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
nodes.ConcatNode(
children=tuple([self.node, *[val.node for val in other]]),
output_ids=tuple(
ids.ColumnId(bigframes.core.guid.generate_guid())
for id in self.column_ids
),
)
)

def compute_values(self, assignments: Sequence[ex.Expression]):
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3139,7 +3139,7 @@ def _pd_index_to_array_value(
rows = []
labels_as_tuples = utils.index_as_tuples(index)
for row_offset in range(len(index)):
id_gen = bigframes.core.identifiers.standard_identifiers()
id_gen = bigframes.core.identifiers.standard_id_strings()
row_label = labels_as_tuples[row_offset]
row_label = (row_label,) if not isinstance(row_label, tuple) else row_label
row = {}
Expand Down
28 changes: 13 additions & 15 deletions bigframes/core/compile/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import google.cloud.bigquery as bigquery

import bigframes.core.compile.compiler as compiler
import bigframes.core.rewrite as rewrites

if TYPE_CHECKING:
import bigframes.core.nodes
import bigframes.core.ordering
import bigframes.core.schema

_STRICT_COMPILER = compiler.Compiler(strict=True)
_STRICT_COMPILER = compiler.Compiler(
strict=True, enable_pruning=True, enable_densify_ids=True
)


class SQLCompiler:
Expand All @@ -34,7 +35,7 @@ def __init__(self, strict: bool = True):

def compile_peek(self, node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
"""Compile node into sql that selects N arbitrary rows, may not execute deterministically."""
return self._compiler.compile_unordered_ir(node).peek_sql(n_rows)
return self._compiler.compile_peek_sql(node, n_rows)

def compile_unordered(
self,
Expand All @@ -44,9 +45,8 @@ def compile_unordered(
) -> str:
"""Compile node into sql where rows are unsorted, and no ordering information is preserved."""
# TODO: Enable limit pullup, but only if not being used to write to clustered table.
return self._compiler.compile_unordered_ir(node).to_sql(
col_id_overrides=col_id_overrides
)
output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
return self._compiler.compile_sql(node, ordered=False, output_ids=output_ids)

def compile_ordered(
self,
Expand All @@ -56,10 +56,8 @@ def compile_ordered(
) -> str:
"""Compile node into sql where rows are sorted with ORDER BY."""
# If we are ordering the query anyway, compiling the slice as a limit is probably a good idea.
new_node, limit = rewrites.pullup_limit_from_slice(node)
return self._compiler.compile_ordered_ir(new_node).to_sql(
col_id_overrides=col_id_overrides, ordered=True, limit=limit
)
output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
return self._compiler.compile_sql(node, ordered=True, output_ids=output_ids)

def compile_raw(
self,
Expand All @@ -68,13 +66,12 @@ def compile_raw(
str, Sequence[bigquery.SchemaField], bigframes.core.ordering.RowOrdering
]:
"""Compile node into sql that exposes all columns, including hidden ordering-only columns."""
ir = self._compiler.compile_ordered_ir(node)
sql, schema = ir.raw_sql_and_schema()
return sql, schema, ir._ordering
return self._compiler.compile_raw(node)


def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
"""Use only for unit testing paths - not fully featured. Will raise an exception if evaluation fails."""
node = _STRICT_COMPILER._preprocess(node)
ibis = _STRICT_COMPILER.compile_ordered_ir(node)._to_ibis_expr(
ordering_mode="unordered"
)
Expand All @@ -85,9 +82,10 @@ def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
"""Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
import bigframes.core.schema

node = _STRICT_COMPILER._preprocess(node)
compiled = _STRICT_COMPILER.compile_unordered_ir(node)
items = tuple(
bigframes.core.schema.SchemaItem(id, compiled.get_column_type(id))
for id in compiled.column_ids
bigframes.core.schema.SchemaItem(name, compiled.get_column_type(ibis_id))
for name, ibis_id in zip(node.schema.names, compiled.column_ids)
)
return bigframes.core.schema.ArraySchema(items)
51 changes: 17 additions & 34 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ def _aggregate_base(
)
# Must have deterministic ordering, so order by the unique "by" column
ordering = TotalOrdering(
tuple([OrderingExpression(column_id) for column_id in by_column_ids]),
tuple(
[
OrderingExpression(ex.DerefOp(ref.id.local_normalized))
for ref in by_column_ids
]
),
total_ordering_columns=frozenset(
[ex.DerefOp(ref.id.local_normalized) for ref in by_column_ids]
),
Expand Down Expand Up @@ -266,31 +271,26 @@ def peek_sql(self, n: int):
def to_sql(
self,
offset_column: typing.Optional[str] = None,
col_id_overrides: typing.Mapping[str, str] = {},
ordered: bool = False,
) -> str:
if offset_column or ordered:
raise ValueError("Cannot produce sorted sql in partial ordering mode")
sql = ibis_bigquery.Backend().compile(
self._to_ibis_expr(
col_id_overrides=col_id_overrides,
)
)
sql = ibis_bigquery.Backend().compile(self._to_ibis_expr())
return typing.cast(str, sql)

def row_count(self) -> OrderedIR:
def row_count(self, name: str) -> OrderedIR:
original_table = self._to_ibis_expr()
ibis_table = original_table.agg(
[
original_table.count().name("count"),
original_table.count().name(name),
]
)
return OrderedIR(
ibis_table,
(ibis_table["count"],),
(ibis_table[name],),
ordering=TotalOrdering(
ordering_value_columns=(ascending_over("count"),),
total_ordering_columns=frozenset([ex.deref("count")]),
ordering_value_columns=(ascending_over(name),),
total_ordering_columns=frozenset([ex.deref(name)]),
),
)

Expand All @@ -299,7 +299,6 @@ def _to_ibis_expr(
*,
expose_hidden_cols: bool = False,
fraction: Optional[float] = None,
col_id_overrides: typing.Mapping[str, str] = {},
):
"""
Creates an Ibis table expression representing the DataFrame.
Expand All @@ -320,8 +319,6 @@ def _to_ibis_expr(
If True, include the hidden ordering columns in the results.
Only compatible with `order_by` and `unordered`
``ordering_mode``.
col_id_overrides:
overrides the column ids for the result
Returns:
An ibis expression representing the data held by the ArrayValue object.
"""
Expand All @@ -346,10 +343,6 @@ def _to_ibis_expr(
if self._reduced_predicate is not None:
table = table.filter(base_table[PREDICATE_COLUMN])
table = table.drop(*columns_to_drop)
if col_id_overrides:
table = table.rename(
{value: key for key, value in col_id_overrides.items()}
)
if fraction is not None:
table = table.filter(ibis.random() < ibis.literal(fraction))
return table
Expand Down Expand Up @@ -941,7 +934,6 @@ def _reproject_to_table(self) -> OrderedIR:

def to_sql(
self,
col_id_overrides: typing.Mapping[str, str] = {},
ordered: bool = False,
limit: Optional[int] = None,
) -> str:
Expand All @@ -951,17 +943,13 @@ def to_sql(
sql = ibis_bigquery.Backend().compile(
baked_ir._to_ibis_expr(
ordering_mode="unordered",
col_id_overrides=col_id_overrides,
expose_hidden_cols=True,
)
)
output_columns = [
col_id_overrides.get(col, col) for col in baked_ir.column_ids
]
sql = (
bigframes.core.compile.googlesql.Select()
.from_(sql)
.select(output_columns)
.select(self.column_ids)
.sql()
)

Expand All @@ -979,24 +967,26 @@ def to_sql(
sql = ibis_bigquery.Backend().compile(
self._to_ibis_expr(
ordering_mode="unordered",
col_id_overrides=col_id_overrides,
expose_hidden_cols=False,
)
)
return typing.cast(str, sql)

def raw_sql_and_schema(
self,
column_ids: typing.Sequence[str],
) -> typing.Tuple[str, typing.Sequence[google.cloud.bigquery.SchemaField]]:
"""Return sql with all hidden columns. Used to cache with ordering information.
Also returns schema, as the extra ordering columns are determined compile-time.
"""
col_id_overrides = dict(zip(self.column_ids, column_ids))
all_columns = (*self.column_ids, *self._hidden_ordering_column_names.keys())
as_ibis = self._to_ibis_expr(
ordering_mode="unordered",
expose_hidden_cols=True,
).select(all_columns)
)
as_ibis = as_ibis.select(all_columns).rename(col_id_overrides)

# Ibis will produce non-nullable schema types, but bigframes should always be nullable
fixed_ibis_schema = ibis_schema.Schema.from_tuples(
Expand All @@ -1013,7 +1003,6 @@ def _to_ibis_expr(
*,
expose_hidden_cols: bool = False,
fraction: Optional[float] = None,
col_id_overrides: typing.Mapping[str, str] = {},
ordering_mode: Literal["string_encoded", "unordered"],
order_col_name: Optional[str] = ORDER_ID_COLUMN,
):
Expand Down Expand Up @@ -1043,8 +1032,6 @@ def _to_ibis_expr(
order_col_name:
If the ordering mode outputs a single ordering or offsets
column, use this as the column name.
col_id_overrides:
overrides the column ids for the result
Returns:
An ibis expression representing the data held by the ArrayValue object.
"""
Expand Down Expand Up @@ -1086,10 +1073,6 @@ def _to_ibis_expr(
if self._reduced_predicate is not None:
table = table.filter(base_table[PREDICATE_COLUMN])
table = table.drop(*columns_to_drop)
if col_id_overrides:
table = table.rename(
{value: key for key, value in col_id_overrides.items()}
)
if fraction is not None:
table = table.filter(ibis.random() < ibis.literal(fraction))
return table
Expand Down
70 changes: 54 additions & 16 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io
import typing

import google.cloud.bigquery
import ibis
import ibis.backends
import ibis.backends.bigquery
Expand All @@ -32,6 +33,7 @@
import bigframes.core.compile.scalar_op_compiler as compile_scalar
import bigframes.core.compile.schema_translator
import bigframes.core.compile.single_column
import bigframes.core.expression as ex
import bigframes.core.guid as guids
import bigframes.core.identifiers as ids
import bigframes.core.nodes as nodes
Expand All @@ -50,31 +52,66 @@ class Compiler:
strict: bool = True
scalar_op_compiler = compile_scalar.ScalarOpCompiler()
enable_pruning: bool = False
enable_densify_ids: bool = False

def compile_sql(
self, node: nodes.BigFrameNode, ordered: bool, output_ids: typing.Sequence[str]
) -> str:
node = self.set_output_names(node, output_ids)
if ordered:
node, limit = rewrites.pullup_limit_from_slice(node)
return self.compile_ordered_ir(self._preprocess(node)).to_sql(
ordered=True, limit=limit
)
else:
return self.compile_unordered_ir(self._preprocess(node)).to_sql()

def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)

def compile_raw(
self,
node: bigframes.core.nodes.BigFrameNode,
) -> typing.Tuple[
str, typing.Sequence[google.cloud.bigquery.SchemaField], bf_ordering.RowOrdering
]:
ir = self.compile_ordered_ir(self._preprocess(node))
sql, schema = ir.raw_sql_and_schema(column_ids=node.schema.names)
return sql, schema, ir._ordering

def _preprocess(self, node: nodes.BigFrameNode):
if self.enable_pruning:
used_fields = frozenset(field.id for field in node.fields)
node = node.prune(used_fields)
node = functools.cache(rewrites.replace_slice_ops)(node)
if self.enable_densify_ids:
original_names = [id.name for id in node.ids]
node, _ = rewrites.remap_variables(
node, id_generator=ids.anonymous_serial_ids()
)
node = self.set_output_names(node, original_names)
return node

def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
ir = typing.cast(
compiled.OrderedIR, self.compile_node(self._preprocess(node), True)
def set_output_names(
self, node: bigframes.core.nodes.BigFrameNode, output_ids: typing.Sequence[str]
):
# TODO: Create specialized output operators that will handle final names
return nodes.SelectionNode(
node,
tuple(
(ex.DerefOp(old_id), ids.ColumnId(out_id))
for old_id, out_id in zip(node.ids, output_ids)
),
)

def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True))
if self.strict:
assert ir.has_total_order
return ir

def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR:
return typing.cast(
compiled.UnorderedIR, self.compile_node(self._preprocess(node), False)
)

def compile_peak_sql(
self, node: nodes.BigFrameNode, n_rows: int
) -> typing.Optional[str]:
return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
return typing.cast(compiled.UnorderedIR, self.compile_node(node, False))

# TODO: Remove cache when schema no longer requires compilation to derive schema (and therefore only compiles for execution)
@functools.lru_cache(maxsize=5000)
Expand Down Expand Up @@ -144,11 +181,11 @@ def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True):

labels_array_table = ibis.range(
joined_table[start_column], joined_table[end_column] + node.step, node.step
).name("labels")
).name(node.output_id.sql)
labels = (
typing.cast(ibis.expr.types.ArrayValue, labels_array_table)
.as_table()
.unnest(["labels"])
.unnest([node.output_id.sql])
)
if ordered:
return compiled.OrderedIR(
Expand Down Expand Up @@ -307,18 +344,19 @@ def compile_projection(self, node: nodes.ProjectionNode, ordered: bool = True):

@_compile_node.register
def compile_concat(self, node: nodes.ConcatNode, ordered: bool = True):
output_ids = [id.sql for id in node.output_ids]
if ordered:
compiled_ordered = [self.compile_ordered_ir(node) for node in node.children]
return concat_impl.concat_ordered(compiled_ordered)
return concat_impl.concat_ordered(compiled_ordered, output_ids)
else:
compiled_unordered = [
self.compile_unordered_ir(node) for node in node.children
]
return concat_impl.concat_unordered(compiled_unordered)
return concat_impl.concat_unordered(compiled_unordered, output_ids)

@_compile_node.register
def compile_rowcount(self, node: nodes.RowCountNode, ordered: bool = True):
result = self.compile_unordered_ir(node.child).row_count()
result = self.compile_unordered_ir(node.child).row_count(name=node.col_id.sql)
return result if ordered else result.to_unordered()

@_compile_node.register
Expand Down
Loading

0 comments on commit efe645c

Please sign in to comment.