Skip to content

Commit

Permalink
feat: Allow join-free alignment of analytic expressions (#1168)
Browse files Browse the repository at this point in the history
* feat: Allow join-free alignment of analytic expressions

* address PR comments

* fix bugs in pull_up_selection

* fix unit test and remove validations

* fix test failures
  • Loading branch information
TrevorBergeron authored Nov 26, 2024
1 parent 1c8d510 commit daef4f0
Show file tree
Hide file tree
Showing 13 changed files with 747 additions and 341 deletions.
84 changes: 55 additions & 29 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import pyarrow as pa
import pyarrow.feather as pa_feather

import bigframes.core.compile
import bigframes.core.expression as ex
import bigframes.core.guid
import bigframes.core.identifiers as ids
Expand All @@ -35,15 +34,13 @@
import bigframes.core.nodes as nodes
from bigframes.core.ordering import OrderingExpression
import bigframes.core.ordering as orderings
import bigframes.core.rewrite
import bigframes.core.schema as schemata
import bigframes.core.tree_properties
import bigframes.core.utils
from bigframes.core.window_spec import WindowSpec
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.session._io.bigquery

if typing.TYPE_CHECKING:
from bigframes.session import Session
Expand Down Expand Up @@ -199,6 +196,8 @@ def as_cached(

def _try_evaluate_local(self):
    """Run the test-only evaluator over this expression's node.

    Use only for unit testing paths - not fully featured. A failure surfaces
    as an exception rather than through the return value.
    """
    # Function-scope import: the compile package is only loaded when this
    # helper actually runs.
    import bigframes.core.compile as compiler

    return compiler.test_only_try_evaluate(self.node)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
Expand Down Expand Up @@ -422,22 +421,7 @@ def relational_join(
l_mapping = { # Identity mapping, only rename right side
lcol.name: lcol.name for lcol in self.node.ids
}
r_mapping = { # Rename conflicting names
rcol.name: rcol.name
if (rcol.name not in l_mapping)
else bigframes.core.guid.generate_guid()
for rcol in other.node.ids
}
other_node = other.node
if set(other_node.ids) & set(self.node.ids):
other_node = nodes.SelectionNode(
other_node,
tuple(
(ex.deref(old_id), ids.ColumnId(new_id))
for old_id, new_id in r_mapping.items()
),
)

other_node, r_mapping = self.prepare_join_names(other)
join_node = nodes.JoinNode(
left_child=self.node,
right_child=other_node,
Expand All @@ -449,14 +433,63 @@ def relational_join(
)
return ArrayValue(join_node), (l_mapping, r_mapping)

def try_align_as_projection(
def try_row_join(
    self,
    other: ArrayValue,
    conditions: typing.Tuple[typing.Tuple[str, str], ...] = (),
) -> Optional[
    typing.Tuple[ArrayValue, typing.Tuple[dict[str, str], dict[str, str]]]
]:
    """Attempt to combine ``self`` and ``other`` via the projection rewrite.

    Args:
        other: Right-hand expression to combine with this one.
        conditions: Pairs of column ids, forwarded unchanged to
            ``bigframes.core.rewrite.try_join_as_projection``.

    Returns:
        ``None`` when the rewrite yields no node. Otherwise
        ``(joined, (l_mapping, r_mapping))``, where ``l_mapping`` is the
        identity over this side's column names and ``r_mapping`` is the
        mapping returned by ``prepare_join_names`` for ``other``.
    """
    # The left side is never renamed, so its mapping is the identity.
    left_names = [column.name for column in self.node.ids]
    l_mapping = dict(zip(left_names, left_names))

    # NOTE(review): `conditions` is passed on as given, not translated through
    # r_mapping -- confirm the rewrite expects pre-rename right-side ids.
    other_node, r_mapping = self.prepare_join_names(other)

    import bigframes.core.rewrite as rewrite

    projected = rewrite.try_join_as_projection(self.node, other_node, conditions)
    if projected is not None:
        return ArrayValue(projected), (l_mapping, r_mapping)
    return None

def prepare_join_names(
    self, other: ArrayValue
) -> Tuple[bigframes.core.nodes.BigFrameNode, dict[str, str]]:
    """Produce a right-hand node whose column ids do not clash with ``self``.

    Args:
        other: The expression about to be combined with this one.

    Returns:
        A ``(node, r_mapping)`` pair. When the two sides share no ids,
        ``node`` is ``other.node`` itself and ``r_mapping`` maps each entry
        of ``other.column_ids`` to itself. Otherwise ``node`` is a
        ``SelectionNode`` over ``other.node`` and ``r_mapping`` maps each
        right column name to its name in that node: unchanged if it is not in
        ``self.column_ids``, else a value from
        ``bigframes.core.guid.generate_guid()``.
    """
    if set(other.node.ids).isdisjoint(self.node.ids):
        # Nothing collides: reuse the node untouched.
        return other.node, {col_id: col_id for col_id in other.column_ids}

    # Build the lookup once so each membership test below is a hash probe
    # instead of a scan over the left side's columns.
    taken = frozenset(self.column_ids)
    r_mapping = {
        rcol.name: (
            bigframes.core.guid.generate_guid() if rcol.name in taken else rcol.name
        )
        for rcol in other.node.ids
    }
    renamed = nodes.SelectionNode(
        other.node,
        tuple(
            (ex.deref(old_id), ids.ColumnId(new_id))
            for old_id, new_id in r_mapping.items()
        ),
    )
    return renamed, r_mapping

def try_legacy_row_join(
self,
other: ArrayValue,
join_type: join_def.JoinType,
join_keys: typing.Tuple[join_def.CoalescedColumnMapping, ...],
mappings: typing.Tuple[join_def.JoinColumnMapping, ...],
) -> typing.Optional[ArrayValue]:
result = bigframes.core.rewrite.join_as_projection(
import bigframes.core.rewrite

result = bigframes.core.rewrite.legacy_join_as_projection(
self.node, other.node, join_keys, mappings, join_type
)
if result is not None:
Expand Down Expand Up @@ -488,11 +521,4 @@ def _gen_namespaced_uid(self) -> str:
return self._gen_namespaced_uids(1)[0]

def _gen_namespaced_uids(self, n: int) -> List[str]:
i = len(self.node.defined_variables)
genned_ids: List[str] = []
while len(genned_ids) < n:
attempted_id = f"col_{i}"
if attempted_id not in self.node.defined_variables:
genned_ids.append(attempted_id)
i = i + 1
return genned_ids
return [ids.ColumnId.unique().name for _ in range(n)]
40 changes: 36 additions & 4 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2341,7 +2341,9 @@ def join(
# Handle null index, which only supports row join
# This is the canonical way of aligning on null index, so always allow (ignore block_identity_join)
if self.index.nlevels == other.index.nlevels == 0:
result = try_row_join(self, other, how=how)
result = try_legacy_row_join(self, other, how=how) or try_new_row_join(
self, other
)
if result is not None:
return result
raise bigframes.exceptions.NullIndexError(
Expand All @@ -2354,7 +2356,9 @@ def join(
and (self.index.nlevels == other.index.nlevels)
and (self.index.dtypes == other.index.dtypes)
):
result = try_row_join(self, other, how=how)
result = try_legacy_row_join(self, other, how=how) or try_new_row_join(
self, other
)
if result is not None:
return result

Expand Down Expand Up @@ -2693,7 +2697,35 @@ def is_uniquely_named(self: BlockIndexProperties):
return len(set(self.names)) == len(self.names)


def try_row_join(
def try_new_row_join(
    left: Block, right: Block
) -> Optional[Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]]]]:
    """Try to combine two blocks with ``ArrayValue.try_row_join``.

    Args:
        left: Block whose index columns and index labels are kept.
        right: Block whose columns are appended; its index columns are
            dropped from the combined expression.

    Returns:
        ``None`` if ``left.expr.try_row_join`` returns ``None``. Otherwise
        ``(block, (get_column_left, get_column_right))``: the combined block
        and the two column-id mappings returned by the expression-level join.
    """
    # Pair index columns positionally; zip stops at the shorter side.
    join_keys = tuple(zip(left.index_columns, right.index_columns))
    join_result = left.expr.try_row_join(right.expr, join_keys)
    if join_result is None:  # did not succeed
        return None
    combined_expr, (get_column_left, get_column_right) = join_result

    # Keep the left index columns, and drop the matching right columns.
    index_cols_post_join = [get_column_left[col_id] for col_id in left.index_columns]
    right_index_cols = [get_column_right[col_id] for col_id in right.index_columns]
    combined_expr = combined_expr.drop_columns(right_index_cols)

    block = Block(
        combined_expr,
        index_columns=index_cols_post_join,
        column_labels=left.column_labels.append(right.column_labels),
        index_labels=left.index.names,
    )
    return block, (get_column_left, get_column_right)


def try_legacy_row_join(
left: Block,
right: Block,
*,
Expand Down Expand Up @@ -2727,7 +2759,7 @@ def try_row_join(
)
for id in right.value_columns
]
combined_expr = left_expr.try_align_as_projection(
combined_expr = left_expr.try_legacy_row_join(
right_expr,
join_type=how,
join_keys=join_keys,
Expand Down
6 changes: 6 additions & 0 deletions bigframes/core/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import itertools
from typing import Generator

import bigframes.core.guid


def standard_id_strings(prefix: str = "col_") -> Generator[str, None, None]:
i = 0
Expand Down Expand Up @@ -47,6 +49,10 @@ def local_normalized(self) -> ColumnId:
def __lt__(self, other: ColumnId) -> bool:
    """Order two column ids by comparing their ``sql`` attributes."""
    mine, theirs = self.sql, other.sql
    return mine < theirs

@classmethod
def unique(cls) -> ColumnId:
    """Create a ``ColumnId`` named with a newly generated GUID.

    Always constructs a plain ``ColumnId``; ``cls`` is not used.
    """
    fresh_name = bigframes.core.guid.generate_guid()
    return ColumnId(name=fresh_name)


@dataclasses.dataclass(frozen=True)
class SerialColumnId(ColumnId):
Expand Down
Loading

0 comments on commit daef4f0

Please sign in to comment.