Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow join-free alignment of analytic expressions #1168

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,14 +449,40 @@ def relational_join(
)
return ArrayValue(join_node), (l_mapping, r_mapping)

def try_align_as_projection(
def try_new_row_join(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just try_row_join since we plan on this being the only one early next year?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self,
other: ArrayValue,
conditions: typing.Tuple[typing.Tuple[str, str], ...] = (),
) -> Optional[
typing.Tuple[ArrayValue, typing.Tuple[dict[str, str], dict[str, str]]]
]:
import bigframes.core.node_align

result_node = bigframes.core.node_align.join_as_projection(
self.node, other.node, conditions
)
if result_node is None:
return None

new_ids = list(field.id.name for field in result_node.fields)
self_pre_fields = list(field.id.name for field in self.node.fields)
other_pre_fields = list(field.id.name for field in other.node.fields)
return (
ArrayValue(result_node),
(
dict(zip(self_pre_fields, new_ids[: len(self_pre_fields) :])),
dict(zip(other_pre_fields, new_ids[len(self_pre_fields) :])),
),
)

def try_legacy_row_join(
self,
other: ArrayValue,
join_type: join_def.JoinType,
join_keys: typing.Tuple[join_def.CoalescedColumnMapping, ...],
mappings: typing.Tuple[join_def.JoinColumnMapping, ...],
) -> typing.Optional[ArrayValue]:
result = bigframes.core.rewrite.join_as_projection(
result = bigframes.core.rewrite.legacy_join_as_projection(
self.node, other.node, join_keys, mappings, join_type
)
if result is not None:
Expand Down Expand Up @@ -488,11 +514,4 @@ def _gen_namespaced_uid(self) -> str:
return self._gen_namespaced_uids(1)[0]

def _gen_namespaced_uids(self, n: int) -> List[str]:
i = len(self.node.defined_variables)
genned_ids: List[str] = []
while len(genned_ids) < n:
attempted_id = f"col_{i}"
if attempted_id not in self.node.defined_variables:
genned_ids.append(attempted_id)
i = i + 1
return genned_ids
return [ids.ColumnId.unique().name for _ in range(n)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make the column IDs less deterministic than the previous logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not any less deterministic than we are now. If we want an isomorphism between query structure and output syntax, there is a bit more work that needs to be done to the system, which I think basically amounts to late binding identifiers serially through the tree.

40 changes: 36 additions & 4 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2341,7 +2341,9 @@ def join(
# Handle null index, which only supports row join
# This is the canonical way of aligning on null index, so always allow (ignore block_identity_join)
if self.index.nlevels == other.index.nlevels == 0:
result = try_row_join(self, other, how=how)
result = try_legacy_row_join(self, other, how=how) or try_new_row_join(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No how necessary in the new row join?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed now, since we are now emulating the more recent pandas alignment logic, where the sides need to match exactly, without any view-like interpretations of filtered objects.

self, other
)
if result is not None:
return result
raise bigframes.exceptions.NullIndexError(
Expand All @@ -2354,7 +2356,9 @@ def join(
and (self.index.nlevels == other.index.nlevels)
and (self.index.dtypes == other.index.dtypes)
):
result = try_row_join(self, other, how=how)
result = try_legacy_row_join(self, other, how=how) or try_new_row_join(
self, other
)
if result is not None:
return result

Expand Down Expand Up @@ -2693,7 +2697,35 @@ def is_uniquely_named(self: BlockIndexProperties):
return len(set(self.names)) == len(self.names)


def try_row_join(
def try_new_row_join(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, re try_row_join here, but perhaps in a separate PR since I see that it would make it harder to identify the ones that should be replaced with try_legacy_row_join if we did that.

left: Block, right: Block
) -> Optional[Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]]:
join_keys = tuple(
(left_id, right_id)
for left_id, right_id in zip(left.index_columns, right.index_columns)
)
join_result = left.expr.try_new_row_join(right.expr, join_keys)
if join_result is None: # did not succeed
return None
combined_expr, (get_column_left, get_column_right) = join_result
# Keep the left index column, and drop the matching right column
index_cols_post_join = [get_column_left[id] for id in left.index_columns]
combined_expr = combined_expr.drop_columns(
[get_column_right[id] for id in right.index_columns]
)
block = Block(
combined_expr,
index_columns=index_cols_post_join,
column_labels=left.column_labels.append(right.column_labels),
index_labels=left.index.names,
)
return (
block,
(get_column_left, get_column_right),
)


def try_legacy_row_join(
left: Block,
right: Block,
*,
Expand Down Expand Up @@ -2727,7 +2759,7 @@ def try_row_join(
)
for id in right.value_columns
]
combined_expr = left_expr.try_align_as_projection(
combined_expr = left_expr.try_legacy_row_join(
right_expr,
join_type=how,
join_keys=join_keys,
Expand Down
6 changes: 6 additions & 0 deletions bigframes/core/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import itertools
from typing import Generator

import bigframes.core.guid


def standard_id_strings(prefix: str = "col_") -> Generator[str, None, None]:
i = 0
Expand Down Expand Up @@ -47,6 +49,10 @@ def local_normalized(self) -> ColumnId:
def __lt__(self, other: ColumnId) -> bool:
return self.sql < other.sql

@classmethod
def unique(cls) -> ColumnId:
return ColumnId(name=bigframes.core.guid.generate_guid())


@dataclasses.dataclass(frozen=True)
class SerialColumnId(ColumnId):
Expand Down
167 changes: 167 additions & 0 deletions bigframes/core/node_align.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import dataclasses
from typing import Optional, Tuple

import bigframes.core.expression
import bigframes.core.guid
import bigframes.core.identifiers
import bigframes.core.join_def
import bigframes.core.nodes
import bigframes.core.window_spec
import bigframes.operations.aggregations

ADDITIVE_NODES = (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be helpful to have some comments about what these node types have in common. How would I decide if a node type should be added to this list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment explaining

bigframes.core.nodes.ProjectionNode,
bigframes.core.nodes.WindowOpNode,
bigframes.core.nodes.PromoteOffsetsNode,
)


@dataclasses.dataclass(frozen=True)
class ExpressionSpec:
    """An expression paired with the node it was resolved against.

    Instances are what ``get_expression_spec`` returns; ``join_as_projection``
    compares two of them for equality to decide whether a pair of join keys
    match. The dataclass is frozen, so instances are immutable.
    """

    # Expression for the column after references have been bound through the
    # selection/projection nodes walked by ``get_expression_spec``.
    expression: bigframes.core.expression.Expression
    # Node at which ``get_expression_spec`` stopped walking down the tree.
    node: bigframes.core.nodes.BigFrameNode


def get_expression_spec(
node: bigframes.core.nodes.BigFrameNode, id: bigframes.core.identifiers.ColumnId
) -> ExpressionSpec:
"""Normalizes column value by chaining expressions across multiple selection and projection nodes if possible"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on conversations we've had with Shevak, perhaps we should be more explicit that this is a BFET -> BFET rewrite in terms of our module organization?

I suppose a bigframes/core/rewrites/... directory would help with that?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, could you describe a bit more about the purpose? I'm slightly confused as to why we'd need to do this normalization one column at a time? Maybe for Series?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, we are trying to identify whether the join keys are equal, so we want to compare the join key expressions without evaluating them. The same value can be achieved in different ways with different names, so the goal here is to normalize the key expression as much as possible in order to evaluate equivalence.

expression: bigframes.core.expression.Expression = (
bigframes.core.expression.DerefOp(id)
)
curr_node = node
while True:
if isinstance(curr_node, bigframes.core.nodes.SelectionNode):
select_mappings = {
col_id: ref for ref, col_id in curr_node.input_output_pairs
}
expression = expression.bind_refs(
select_mappings, allow_partial_bindings=True
)
elif isinstance(curr_node, bigframes.core.nodes.ProjectionNode):
proj_mappings = {col_id: expr for expr, col_id in curr_node.assignments}
expression = expression.bind_refs(
proj_mappings, allow_partial_bindings=True
)
elif isinstance(
curr_node,
(
bigframes.core.nodes.WindowOpNode,
bigframes.core.nodes.PromoteOffsetsNode,
),
):
pass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment here as to why there's nothing to bind?

else:
return ExpressionSpec(expression, curr_node)
curr_node = curr_node.child


def _linearize_trees(
base_tree: bigframes.core.nodes.BigFrameNode,
append_tree: bigframes.core.nodes.BigFrameNode,
) -> bigframes.core.nodes.BigFrameNode:
"""Linearize two divergent tree who only diverge through different additive nodes."""
assert append_tree.projection_base == base_tree.projection_base
if append_tree == append_tree.projection_base:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would it be its own projection base? Could you give some examples in a comment?

return base_tree
else:
assert isinstance(append_tree, ADDITIVE_NODES)
return append_tree.replace_child(_linearize_trees(base_tree, append_tree.child))


def combine_nodes(
    l_node: bigframes.core.nodes.BigFrameNode,
    r_node: bigframes.core.nodes.BigFrameNode,
) -> bigframes.core.nodes.BigFrameNode:
    """Merge two trees that share a projection base into one tree.

    Selections are pulled up out of both sides, the stripped right tree is
    linearized on top of the stripped left tree, and a single ``SelectionNode``
    is placed over the result that exposes the left selections followed by
    the right selections.

    Args:
        l_node: Left tree; its variables keep their ids.
        r_node: Right tree; its variables are renamed.

    Returns:
        A ``SelectionNode`` over the merged tree.
    """
    assert l_node.projection_base == r_node.projection_base
    left_stripped, left_pairs = pull_up_selection(l_node)
    # Rename only the right side's vars to avoid collisions with left vars.
    right_stripped, right_pairs = pull_up_selection(r_node, rename_vars=True)
    linear_tree = _linearize_trees(left_stripped, right_stripped)
    return bigframes.core.nodes.SelectionNode(
        linear_tree, (*left_pairs, *right_pairs)
    )


def join_as_projection(
l_node: bigframes.core.nodes.BigFrameNode,
r_node: bigframes.core.nodes.BigFrameNode,
join_keys: Tuple[Tuple[str, str], ...],
) -> Optional[bigframes.core.nodes.BigFrameNode]:
"""Joins the two nodes"""
if l_node.projection_base != r_node.projection_base:
return None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are return Nones, should this be try_join_as_projection?

# check join key
for l_key, r_key in join_keys:
# Caller is block, so they still work with raw strings rather than ids
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fix that? e.g. make block use ColumnId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should do that refactor, but it's a bit of an undertaking. For now, block uses string ids, and ArrayValue is responsible for wrapping them up as ColumnIds.

left_id = bigframes.core.identifiers.ColumnId(l_key)
right_id = bigframes.core.identifiers.ColumnId(r_key)
if get_expression_spec(l_node, left_id) != get_expression_spec(
r_node, right_id
):
return None
return combine_nodes(l_node, r_node)


def pull_up_selection(
node: bigframes.core.nodes.BigFrameNode, rename_vars: bool = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of the rename_vars parameter? A docstring would be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docstring. It's intended to make sure that when we combine columns from two sides, we don't get conflicts.

) -> Tuple[
bigframes.core.nodes.BigFrameNode,
Tuple[
Tuple[bigframes.core.expression.DerefOp, bigframes.core.identifiers.ColumnId],
...,
],
]:
"""Remove all selection nodes above the base node. Returns stripped tree."""
if node == node.projection_base: # base case
return node, tuple(
(bigframes.core.expression.DerefOp(field.id), field.id)
for field in node.fields
)
assert isinstance(node, (bigframes.core.nodes.SelectionNode, *ADDITIVE_NODES))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I remembering correctly that SelectionNode represents a WHERE clause, based on Selection from relational algebra terminology?

Given the high-level that implicit joiner no longer handles these, I'm a little confused why we need to rewrite these? Is it so that they are all consolidated so that the nodes we can combine appear together in the tree? I would appreciate a bit more info.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually poorly named. SelectionNode just renames/reorders existing columns, without any scalar transforms.

child_node, child_selections = pull_up_selection(node.child)
mapping = {out: ref.id for ref, out in child_selections}
if isinstance(node, ADDITIVE_NODES):
new_node: bigframes.core.nodes.BigFrameNode = node.replace_child(child_node)
new_node = new_node.remap_refs(mapping)
if rename_vars:
var_renames = {
field.id: bigframes.core.identifiers.ColumnId.unique()
for field in node.added_fields
}
new_node = new_node.remap_vars(var_renames)
assert isinstance(new_node, ADDITIVE_NODES)
added_selections = (
(bigframes.core.expression.DerefOp(field.id), field.id)
for field in new_node.added_fields
)
new_selection = (*child_selections, *added_selections)
assert all(ref.id in new_node.ids for ref, _ in new_selection)
return new_node, new_selection
elif isinstance(node, bigframes.core.nodes.SelectionNode):
new_selection = tuple(
(
bigframes.core.expression.DerefOp(mapping[ref.id]),
(bigframes.core.identifiers.ColumnId.unique() if rename_vars else out),
)
for ref, out in node.input_output_pairs
)
if not (all(ref.id in child_node.ids for ref, _ in new_selection)):
raise ValueError()
return child_node, new_selection
raise ValueError(f"Couldn't pull up select from node: {node}")
36 changes: 36 additions & 0 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
"""Direct children of this node"""
return tuple([])

@property
def projection_base(self) -> BigFrameNode:
return self

@property
@abc.abstractmethod
def row_count(self) -> typing.Optional[int]:
Expand Down Expand Up @@ -307,6 +311,14 @@ def transform_children(
return self
return transformed

def replace_child(
self, new_child: BigFrameNode, validate: bool = False
) -> UnaryNode:
new_self = replace(self, child=new_child) # type: ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to open up the file to figure out this is from dataclasses. Per https://google.github.io/styleguide/pyguide.html#s2.2-imports please import dataclasses, not individual functions / classes from it. dataclasses is not one of the allowed exceptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed this import

if validate:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, in what cases is it necessary to have a validate argument instead of a public validate() method? If it's for method chaining, we could have validate() return self.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, I think I'll remove that; it's not being used, and yeah, we could run validations as a separate invocation.

new_self._validate()
return new_self

@property
def order_ambiguous(self) -> bool:
return self.child.order_ambiguous
Expand Down Expand Up @@ -903,6 +915,14 @@ def row_count(self) -> Optional[int]:
def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
return (self.col_id,)

@property
def projection_base(self) -> BigFrameNode:
return self.child.projection_base

@property
def added_fields(self) -> Tuple[Field, ...]:
return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
if self.col_id not in used_cols:
return self.child.prune(used_cols)
Expand Down Expand Up @@ -1072,6 +1092,10 @@ def variables_introduced(self) -> int:
def defines_namespace(self) -> bool:
return True

@property
def projection_base(self) -> BigFrameNode:
return self.child.projection_base

@property
def row_count(self) -> Optional[int]:
return self.child.row_count
Expand Down Expand Up @@ -1146,6 +1170,10 @@ def variables_introduced(self) -> int:
def row_count(self) -> Optional[int]:
return self.child.row_count

@property
def projection_base(self) -> BigFrameNode:
return self.child.projection_base

@property
def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
return tuple(id for _, id in self.assignments)
Expand Down Expand Up @@ -1324,6 +1352,14 @@ def fields(self) -> Iterable[Field]:
def variables_introduced(self) -> int:
return 1

@property
def projection_base(self) -> BigFrameNode:
return self.child.projection_base

@property
def added_fields(self) -> Tuple[Field, ...]:
return (self.added_field,)

@property
def relation_ops_created(self) -> int:
# Assume that if not reprojecting, that there is a sequence of window operations sharing the same window
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/rewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def expand(self) -> nodes.BigFrameNode:
)


def join_as_projection(
def legacy_join_as_projection(
l_node: nodes.BigFrameNode,
r_node: nodes.BigFrameNode,
join_keys: Tuple[join_defs.CoalescedColumnMapping, ...],
Expand Down
Loading
Loading