diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index a8b79223f4..c01d321943 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -67,11 +67,20 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
         iobytes = io.BytesIO()
         pa_feather.write_feather(adapted_table, iobytes)
+        # Scan all columns by default; this list can be pruned later while preserving source_def
+        scan_list = nodes.ScanList(
+            tuple(
+                nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
+                for item in schema.items
+            )
+        )
+
         node = nodes.ReadLocalNode(
             iobytes.getvalue(),
             data_schema=schema,
             session=session,
             n_rows=arrow_table.num_rows,
+            scan_list=scan_list,
         )
         return cls(node)
@@ -94,14 +103,30 @@ def from_table(
             "Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
             bigframes.exceptions.PreviewWarning,
         )
+        # Define the data source over only the needed columns; this makes row-hashing cheaper
+        table = nodes.GbqTable.from_table(table, columns=schema.names)
+
+        # Create an ordering from the offsets column or primary key, if available
+        ordering = None
+        if offsets_col:
+            ordering = orderings.TotalOrdering.from_offset_col(offsets_col)
+        elif primary_key:
+            ordering = orderings.TotalOrdering.from_primary_key(primary_key)
+
+        # Scan all columns by default; this list can be pruned later while preserving source_def
+        scan_list = nodes.ScanList(
+            tuple(
+                nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
+                for item in schema.items
+            )
+        )
+        source_def = nodes.BigqueryDataSource(
+            table=table, at_time=at_time, sql_predicate=predicate, ordering=ordering
+        )
         node = nodes.ReadTableNode(
-            table=nodes.GbqTable.from_table(table),
-            total_order_cols=(offsets_col,) if offsets_col else tuple(primary_key),
-            order_col_is_sequential=(offsets_col is not None),
-            columns=schema,
-            at_time=at_time,
+            source=source_def,
+            scan_list=scan_list,
             table_session=session,
-            sql_predicate=predicate,
         )
         return cls(node)
@@ -147,12 +172,22 @@ def as_cached(
         ordering: Optional[orderings.RowOrdering],
     ) -> ArrayValue:
         """
-        Replace the node with an equivalent one that references a tabel where the value has been materialized to.
+        Replace the node with an equivalent one that references a table where the value has been materialized to.
         """
+        table = nodes.GbqTable.from_table(cache_table)
+        source = nodes.BigqueryDataSource(table, ordering=ordering)
+        # Assumption: GBQ cached table uses field name as bq column name
+        scan_list = nodes.ScanList(
+            tuple(
+                nodes.ScanItem(field.id, field.dtype, field.id.name)
+                for field in self.node.fields
+            )
+        )
         node = nodes.CachedTableNode(
             original_node=self.node,
-            table=nodes.GbqTable.from_table(cache_table),
-            ordering=ordering,
+            source=source,
+            table_session=self.session,
+            scan_list=scan_list,
        )
        return ArrayValue(node)
@@ -369,28 +404,33 @@ def relational_join(
         conditions: typing.Tuple[typing.Tuple[str, str], ...] = (),
         type: typing.Literal["inner", "outer", "left", "right", "cross"] = "inner",
     ) -> typing.Tuple[ArrayValue, typing.Tuple[dict[str, str], dict[str, str]]]:
+        l_mapping = {  # Identity mapping, only rename right side
+            lcol.name: lcol.name for lcol in self.node.ids
+        }
+        r_mapping = {  # Rename only the conflicting right-side names
+            rcol.name: rcol.name
+            if (rcol.name not in l_mapping)
+            else bigframes.core.guid.generate_guid()
+            for rcol in other.node.ids
+        }
+        other_node = other.node
+        if set(other_node.ids) & set(self.node.ids):
+            other_node = nodes.SelectionNode(
+                other_node,
+                tuple(
+                    (ex.deref(old_id), ids.ColumnId(new_id))
+                    for old_id, new_id in r_mapping.items()
+                ),
+            )
+
         join_node = nodes.JoinNode(
             left_child=self.node,
-            right_child=other.node,
+            right_child=other_node,
             conditions=tuple(
                 (ex.deref(l_col), ex.deref(r_col)) for l_col, r_col in conditions
             ),
             type=type,
         )
-        # Maps input ids to output ids for caller convenience
-        l_size = len(self.node.schema)
-        l_mapping = {
-            lcol: ocol
-            for lcol, ocol in zip(
-                self.node.schema.names, join_node.schema.names[:l_size]
-            )
-        }
-        r_mapping = {
-            rcol: ocol
-            for rcol, ocol in zip(
-                other.node.schema.names, join_node.schema.names[l_size:]
-            )
-        }
         return ArrayValue(join_node), (l_mapping, r_mapping)
 
     def try_align_as_projection(
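Reviewer note: relational_join no longer renumbers every output column; it keeps left-side names and assigns fresh guids only to right-side names that collide. A self-contained sketch of the mapping rule (toy names and a stand-in guid generator, not the real bigframes.core.guid):

    import itertools

    _counter = itertools.count()

    def generate_guid() -> str:  # stand-in for bigframes.core.guid.generate_guid
        return f"bfcol_{next(_counter)}"

    left_ids = ["a", "b"]
    right_ids = ["b", "c"]
    l_mapping = {col: col for col in left_ids}  # identity; left side is never renamed
    r_mapping = {
        col: col if col not in l_mapping else generate_guid() for col in right_ids
    }
    assert l_mapping == {"a": "a", "b": "b"}
    assert r_mapping == {"b": "bfcol_0", "c": "c"}

Only the colliding "b" gets a new id, so caller-visible names stay stable and the JoinNode invariant added below (disjoint child ids) holds.
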
diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index cbf3519651..a66cb66b1e 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -16,7 +16,6 @@
 import dataclasses
 import functools
 import io
-import itertools
 import typing
 
 import ibis
@@ -29,9 +28,12 @@
 import bigframes.core.compile.concat as concat_impl
 import bigframes.core.compile.default_ordering as default_ordering
 import bigframes.core.compile.ibis_types
+import bigframes.core.compile.scalar_op_compiler
+import bigframes.core.compile.scalar_op_compiler as compile_scalar
 import bigframes.core.compile.schema_translator
 import bigframes.core.compile.single_column
-import bigframes.core.expression as ex
+import bigframes.core.guid as guids
+import bigframes.core.identifiers as ids
 import bigframes.core.nodes as nodes
 import bigframes.core.ordering as bf_ordering
 
@@ -45,6 +47,7 @@ class Compiler:
     # In strict mode, ordering will always be deterministic
     # In unstrict mode, ordering from ReadTable or after joins may be ambiguous to improve query performance.
     strict: bool = True
+    scalar_op_compiler = compile_scalar.ScalarOpCompiler()
 
     def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
         ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True))
@@ -121,81 +124,30 @@ def compile_readlocal(self, node: nodes.ReadLocalNode, ordered: bool = True):
         else:
             return ordered_ir.to_unordered()
 
-    @_compile_node.register
-    def compile_cached_table(self, node: nodes.CachedTableNode, ordered: bool = True):
-        full_table_name = (
-            f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}"
-        )
-        physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
-            node.table.physical_schema
-        )
-        ibis_table = ibis.table(physical_schema, full_table_name)
-        if ordered:
-            if node.ordering is None:
-                # If this happens, session malfunctioned while applying cached results.
-                raise ValueError(
-                    "Cannot use unordered cached value. Result requires ordering information."
-                )
-            if self.strict and not isinstance(node.ordering, bf_ordering.TotalOrdering):
-                raise ValueError(
-                    "Cannot use partially ordered cached value. Result requires total ordering information."
-                )
-            ir = compiled.OrderedIR(
-                ibis_table,
-                columns=tuple(
-                    bigframes.core.compile.ibis_types.ibis_value_to_canonical_type(
-                        ibis_table[col.sql]
-                    )
-                    for col in itertools.chain(
-                        map(lambda x: x.id, node.fields), node._hidden_columns
-                    )
-                ),
-                ordering=node.ordering,
-            )
-            ir = ir._select(
-                tuple(ir._get_ibis_column(name) for name in node.schema.names)
-            )
-            return ir
-        else:
-            return compiled.UnorderedIR(
-                ibis_table,
-                columns=tuple(
-                    bigframes.core.compile.ibis_types.ibis_value_to_canonical_type(
-                        ibis_table[col]
-                    )
-                    for col in node.schema.names
-                ),
-            )
-
     @_compile_node.register
     def compile_readtable(self, node: nodes.ReadTableNode, ordered: bool = True):
         if ordered:
-            return self.compile_read_table_ordered(node)
+            return self.compile_read_table_ordered(node.source, node.scan_list)
         else:
-            return self.compile_read_table_unordered(node)
+            return self.compile_read_table_unordered(node.source, node.scan_list)
 
     def read_table_as_unordered_ibis(
-        self, node: nodes.ReadTableNode
+        self, source: nodes.BigqueryDataSource
     ) -> ibis.expr.types.Table:
-        full_table_name = (
-            f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}"
-        )
-        used_columns = (
-            *node.schema.names,
-            *[i for i in node.total_order_cols if i not in node.schema.names],
-        )
+        full_table_name = f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
+        used_columns = tuple(col.name for col in source.table.physical_schema)
         # Physical schema might include unused columns, unsupported datatypes like JSON
         physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
-            list(i for i in node.table.physical_schema if i.name in used_columns)
+            list(i for i in source.table.physical_schema if i.name in used_columns)
         )
-        if node.at_time is not None or node.sql_predicate is not None:
+        if source.at_time is not None or source.sql_predicate is not None:
             import bigframes.session._io.bigquery
 
             sql = bigframes.session._io.bigquery.to_query(
                 full_table_name,
                 columns=used_columns,
-                sql_predicate=node.sql_predicate,
-                time_travel_timestamp=node.at_time,
+                sql_predicate=source.sql_predicate,
+                time_travel_timestamp=source.at_time,
             )
             return ibis.backends.bigquery.Backend().sql(
                 schema=physical_schema, query=sql
@@ -203,56 +155,64 @@ def read_table_as_unordered_ibis(
         else:
             return ibis.table(physical_schema, full_table_name)
 
-    def compile_read_table_unordered(self, node: nodes.ReadTableNode):
-        ibis_table = self.read_table_as_unordered_ibis(node)
+    def compile_read_table_unordered(
+        self, source: nodes.BigqueryDataSource, scan: nodes.ScanList
+    ):
+        ibis_table = self.read_table_as_unordered_ibis(source)
         return compiled.UnorderedIR(
             ibis_table,
             tuple(
                 bigframes.core.compile.ibis_types.ibis_value_to_canonical_type(
-                    ibis_table[col]
+                    ibis_table[scan_item.source_id].name(scan_item.id.sql)
                 )
-                for col in node.schema.names
+                for scan_item in scan.items
            ),
        )
 
-    def compile_read_table_ordered(self, node: nodes.ReadTableNode):
-        ibis_table = self.read_table_as_unordered_ibis(node)
-        if node.total_order_cols:
-            ordering_value_columns = tuple(
-                bf_ordering.ascending_over(col) for col in node.total_order_cols
+    def compile_read_table_ordered(
+        self, source: nodes.BigqueryDataSource, scan_list: nodes.ScanList
+    ):
+        ibis_table = self.read_table_as_unordered_ibis(source)
+        if source.ordering is not None:
+            visible_column_mapping = {
+                ids.ColumnId(scan_item.source_id): scan_item.id
+                for scan_item in scan_list.items
+            }
+            full_mapping = {
+                ids.ColumnId(col.name): ids.ColumnId(guids.generate_guid())
+                for col in source.ordering.referenced_columns
+            }
+            full_mapping.update(visible_column_mapping)
+
+            ordering = source.ordering.remap_column_refs(full_mapping)
+            hidden_columns = tuple(
+                ibis_table[source_id.sql].name(out_id.sql)
+                for source_id, out_id in full_mapping.items()
+                if source_id not in visible_column_mapping
             )
-            if node.order_col_is_sequential:
-                integer_encoding = bf_ordering.IntegerEncoding(
-                    is_encoded=True, is_sequential=True
+        elif self.strict:  # In strict mode, we fall back to ordering by row hash
+            order_values = [
+                col.name(guids.generate_guid())
+                for col in default_ordering.gen_default_ordering(
+                    ibis_table, use_double_hash=True
                 )
-            else:
-                integer_encoding = bf_ordering.IntegerEncoding()
-            ordering: bf_ordering.RowOrdering = bf_ordering.TotalOrdering(
-                ordering_value_columns,
-                integer_encoding=integer_encoding,
-                total_ordering_columns=frozenset(map(ex.deref, node.total_order_cols)),
-            )
-            hidden_columns = ()
-        elif self.strict:
-            ibis_table, ordering = default_ordering.gen_default_ordering(
-                ibis_table, use_double_hash=True
-            )
-            hidden_columns = tuple(
-                ibis_table[col]
-                for col in ibis_table.columns
-                if col not in node.schema.names
+            ]
+            ordering = bf_ordering.TotalOrdering.from_primary_key(
+                [value.get_name() for value in order_values]
             )
+            hidden_columns = tuple(order_values)
         else:
             # In unstrict mode, don't generate total ordering from hashing as this is
             # expensive (prevent removing any columns from table scan)
             ordering, hidden_columns = bf_ordering.RowOrdering(), ()
+
         return compiled.OrderedIR(
             ibis_table,
             columns=tuple(
                 bigframes.core.compile.ibis_types.ibis_value_to_canonical_type(
-                    ibis_table[col]
+                    ibis_table[scan_item.source_id].name(scan_item.id.sql)
                )
-                for col in node.schema.names
+                for scan_item in scan_list.items
            ),
            ordering=ordering,
            hidden_ordering_columns=hidden_columns,
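Reviewer note: in compile_read_table_ordered above, ordering columns that are not in the scan list are carried as hidden columns under fresh guids, while scan columns order by their visible output ids. A stdlib-only model of the mapping logic (illustrative names):

    visible = {"a": "a_out", "b": "b_out"}  # scan_item.source_id -> scan_item.id
    ordering_refs = ["a", "ts"]             # columns referenced by the ordering
    full = {col: f"hidden_{col}" for col in ordering_refs}
    full.update(visible)                    # visible mapping wins over a hidden guid
    hidden = [src for src in full if src not in visible]
    assert full == {"a": "a_out", "ts": "hidden_ts", "b": "b_out"}
    assert hidden == ["ts"]

So `a` is remapped to its visible output column, while `ts` survives only as a hidden ordering column.
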
diff --git a/bigframes/core/compile/default_ordering.py b/bigframes/core/compile/default_ordering.py
index 910f822d63..bafeebddc9 100644
--- a/bigframes/core/compile/default_ordering.py
+++ b/bigframes/core/compile/default_ordering.py
@@ -18,7 +18,6 @@
 
 from __future__ import annotations
 
-import itertools
 from typing import cast
 
 import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
@@ -27,9 +26,7 @@
 import ibis.expr.datatypes as ibis_dtypes
 import ibis.expr.types as ibis_types
 
-import bigframes.core.expression as ex
 import bigframes.core.guid as guid
-import bigframes.core.ordering as order
 
 
 def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue:
@@ -59,7 +56,9 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringVa
     return cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)
 
 
-def gen_default_ordering(table: ibis.table, use_double_hash: bool = True):
+def gen_default_ordering(
+    table: ibis.table, use_double_hash: bool = True
+) -> list[ibis.Value]:
     ordering_hash_part = guid.generate_guid("bigframes_ordering_")
     ordering_hash_part2 = guid.generate_guid("bigframes_ordering_")
     ordering_rand_part = guid.generate_guid("bigframes_ordering_")
@@ -82,18 +81,4 @@ def gen_default_ordering(table: ibis.table, use_double_hash: bool = True):
         if use_double_hash
         else [full_row_hash, random_value]
     )
-
-    original_column_ids = table.columns
-    table_with_ordering = table.select(
-        itertools.chain(original_column_ids, order_values)
-    )
-
-    ordering = order.TotalOrdering(
-        ordering_value_columns=tuple(
-            order.ascending_over(col.get_name()) for col in order_values
-        ),
-        total_ordering_columns=frozenset(
-            ex.deref(col.get_name()) for col in order_values
-        ),
-    )
-    return table_with_ordering, ordering
+    return order_values
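Reviewer note: gen_default_ordering now returns just the hash/tiebreaker value expressions; selecting them into the table and building the TotalOrdering are the caller's job (see the strict branch in compiler.py above). A rough sketch of the new contract, assuming an unbound ibis table:

    import ibis
    import bigframes.core.compile.default_ordering as default_ordering

    t = ibis.table({"a": "int64", "b": "string"}, name="t")
    order_values = default_ordering.gen_default_ordering(t, use_double_hash=True)
    # With use_double_hash=True: two row-hash parts plus a random tiebreaker,
    # each under a fresh "bigframes_ordering_" guid name.
    print([v.get_name() for v in order_values])
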
diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py
index 48fa52974e..ccb4561ab4 100644
--- a/bigframes/core/compile/single_column.py
+++ b/bigframes/core/compile/single_column.py
@@ -52,9 +52,7 @@ def join_by_column_ordered(
     """
 
-    # Do not reset the generator
-    id_generator = ids.standard_identifiers()
-    l_value_mapping = dict(zip(left.column_ids, id_generator))
-    r_value_mapping = dict(zip(right.column_ids, id_generator))
+    l_value_mapping = dict(zip(left.column_ids, left.column_ids))
+    r_value_mapping = dict(zip(right.column_ids, right.column_ids))
 
     l_hidden_mapping = {
         id: guids.generate_guid("hidden_") for id in left._hidden_column_ids
@@ -143,18 +142,11 @@ def join_by_column_unordered(
         first the coalesced join keys, then, all the left columns, and finally,
         all the right columns.
     """
-    id_generator = ids.standard_identifiers()
-    l_mapping = dict(zip(left.column_ids, id_generator))
-    r_mapping = dict(zip(right.column_ids, id_generator))
-    left_table = left._to_ibis_expr(
-        col_id_overrides=l_mapping,
-    )
-    right_table = right._to_ibis_expr(
-        col_id_overrides=r_mapping,
-    )
+    left_table = left._to_ibis_expr()
+    right_table = right._to_ibis_expr()
     join_conditions = [
-        value_to_join_key(left_table[l_mapping[left_index]])
-        == value_to_join_key(right_table[r_mapping[right_index]])
+        value_to_join_key(left_table[left_index])
+        == value_to_join_key(right_table[right_index])
         for left_index, right_index in conditions
     ]
 
@@ -166,8 +158,8 @@ def join_by_column_unordered(
     )
     # We could filter out the original join columns, but predicates/ordering
     # might still reference them in implicit joins.
-    columns = [combined_table[l_mapping[col.get_name()]] for col in left.columns] + [
-        combined_table[r_mapping[col.get_name()]] for col in right.columns
+    columns = [combined_table[col.get_name()] for col in left.columns] + [
+        combined_table[col.get_name()] for col in right.columns
     ]
     return compiled.UnorderedIR(
         combined_table,
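Reviewer note: the per-join renumbering is gone because JoinNode now guarantees disjoint child ids up front (see the __post_init__ assert added in nodes.py below), so compiled joins can keep the original column names. The invariant itself is trivial to model:

    left_ids = {"a", "b"}
    right_ids = {"bfcol_0", "c"}  # collisions were already renamed upstream
    assert not (left_ids & right_ids), "Join ids collide"
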
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 2649b41227..9f8b80ce7e 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -20,7 +20,7 @@
 import functools
 import itertools
 import typing
-from typing import Callable, Tuple
+from typing import Callable, Iterable, Sequence, Tuple
 
 import google.cloud.bigquery as bq
 
@@ -110,9 +110,13 @@ def roots(self) -> typing.Set[BigFrameNode]:
     # TODO: For deep trees, this can create a lot of overhead, maybe use zero-copy persistent datastructure?
     @property
     @abc.abstractmethod
-    def fields(self) -> Tuple[Field, ...]:
+    def fields(self) -> Iterable[Field]:
         ...
 
+    @property
+    def ids(self) -> Iterable[bfet_ids.ColumnId]:
+        return (field.id for field in self.fields)
+
     @property
     @abc.abstractmethod
     def variables_introduced(self) -> int:
@@ -226,7 +230,7 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
         return (self.child,)
 
     @functools.cached_property
-    def fields(self) -> Tuple[Field, ...]:
+    def fields(self) -> Iterable[Field]:
         return self.child.fields
 
     @property
@@ -250,6 +254,11 @@ class JoinNode(BigFrameNode):
     conditions: typing.Tuple[typing.Tuple[ex.DerefOp, ex.DerefOp], ...]
     type: typing.Literal["inner", "outer", "left", "right", "cross"]
 
+    def __post_init__(self):
+        assert not (
+            set(self.left_child.ids) & set(self.right_child.ids)
+        ), "Join ids collide"
+
     @property
     def row_preserving(self) -> bool:
         return False
@@ -275,13 +284,8 @@ def __hash__(self):
         return self._node_hash
 
     @functools.cached_property
-    def fields(self) -> Tuple[Field, ...]:
-        items = []
-        schema_items = itertools.chain(self.left_child.fields, self.right_child.fields)
-        identifiers = map(bfet_ids.ColumnId, bfet_ids.standard_identifiers())
-        for id, item in zip(identifiers, schema_items):
-            items.append(Field(id, item.dtype))
-        return tuple(items)
+    def fields(self) -> Iterable[Field]:
+        return tuple(itertools.chain(self.left_child.fields, self.right_child.fields))
 
     @functools.cached_property
     def variables_introduced(self) -> int:
@@ -375,11 +379,24 @@ def row_count(self) -> typing.Optional[int]:
         return None
 
 
+class ScanItem(typing.NamedTuple):
+    id: bfet_ids.ColumnId
+    dtype: bigframes.dtypes.Dtype  # Might be multiple logical types for a given physical source type
+    source_id: str  # Flexible enough for both local data and bq data
+
+
+@dataclass(frozen=True)
+class ScanList:
+    items: typing.Tuple[ScanItem, ...]
+
+
 @dataclass(frozen=True)
 class ReadLocalNode(LeafNode):
     feather_bytes: bytes
     data_schema: schemata.ArraySchema
     n_rows: int
+    # Mapping of local ids to bfet id.
+    scan_list: ScanList
     session: typing.Optional[bigframes.session.Session] = None
 
     def __hash__(self):
@@ -387,14 +404,12 @@ def __hash__(self):
 
     @functools.cached_property
     def fields(self) -> Tuple[Field, ...]:
-        return tuple(
-            Field(bfet_ids.ColumnId(i.column), i.dtype) for i in self.data_schema.items
-        )
+        return tuple(Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items)
 
     @functools.cached_property
     def variables_introduced(self) -> int:
         """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
-        return len(self.schema.items) + 1
+        return len(self.scan_list.items) + 1
 
     @property
     def supports_fast_head(self) -> bool:
@@ -423,12 +438,17 @@ class GbqTable:
     cluster_cols: typing.Optional[Tuple[str, ...]]
 
     @staticmethod
-    def from_table(table: bq.Table) -> GbqTable:
+    def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
+        # Subsetting fields with columns can reduce cost of row-hash default ordering
+        if columns:
+            schema = tuple(item for item in table.schema if item.name in columns)
+        else:
+            schema = tuple(table.schema)
         return GbqTable(
             project_id=table.project,
             dataset_id=table.dataset_id,
             table_id=table.table_id,
-            physical_schema=tuple(table.schema),
+            physical_schema=schema,
             n_rows=table.num_rows,
             cluster_cols=None
             if table.clustering_fields is None
@@ -436,32 +456,40 @@ def from_table(table: bq.Table) -> GbqTable:
         )
 
 
-## Put ordering in here or just add order_by node above?
 @dataclass(frozen=True)
-class ReadTableNode(LeafNode):
-    table: GbqTable
-    # Subset of physical schema columns, with chosen BQ types
-    columns: schemata.ArraySchema = field()
+class BigqueryDataSource:
+    """
+    Google BigQuery Data source.
 
-    table_session: bigframes.session.Session = field()
-    # Empty tuple if no primary key (primary key can be any set of columns that together form a unique key)
-    # Empty if no known unique key
-    total_order_cols: Tuple[str, ...] = field()
-    # indicates a primary key that is exactly offsets 0, 1, 2, ..., N-2, N-1
-    order_col_is_sequential: bool = False
+    This should not be modified once defined, as all attributes contribute to the default ordering.
+    """
+
+    table: GbqTable
     at_time: typing.Optional[datetime.datetime] = None
     # Added for backwards compatibility, not validated
     sql_predicate: typing.Optional[str] = None
+    ordering: typing.Optional[orderings.RowOrdering] = None
+
+
+## Put ordering in here or just add order_by node above?
+@dataclass(frozen=True)
+class ReadTableNode(LeafNode):
+    source: BigqueryDataSource
+    # Subset of physical schema columns
+    # Mapping of table schema ids to bfet id.
+    scan_list: ScanList
+
+    table_session: bigframes.session.Session = field()
 
     def __post_init__(self):
         # enforce invariants
-        physical_names = set(map(lambda i: i.name, self.table.physical_schema))
-        if not set(self.columns.names).issubset(physical_names):
+        physical_names = set(map(lambda i: i.name, self.source.table.physical_schema))
+        if not set(scan.source_id for scan in self.scan_list.items).issubset(
+            physical_names
+        ):
             raise ValueError(
-                f"Requested schema {self.columns} cannot be derived from table schemal {self.table.physical_schema}"
+                f"Requested schema {self.scan_list} cannot be derived from table schema {self.source.table.physical_schema}"
             )
-        if self.order_col_is_sequential and len(self.total_order_cols) != 1:
-            raise ValueError("Sequential primary key must have only one component")
 
     @property
     def session(self):
@@ -472,9 +500,7 @@ def __hash__(self):
 
     @functools.cached_property
     def fields(self) -> Tuple[Field, ...]:
-        return tuple(
-            Field(bfet_ids.ColumnId(i.column), i.dtype) for i in self.columns.items
-        )
+        return tuple(Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items)
 
     @property
     def relation_ops_created(self) -> int:
@@ -486,94 +512,38 @@ def supports_fast_head(self) -> bool:
         # Fast head is only supported when row offsets are available.
         # In the future, ORDER BY+LIMIT optimizations may allow fast head when
         # clustered and/or partitioned on ordering key
-        return self.order_col_is_sequential
+        return (self.source.ordering is not None) and self.source.ordering.is_sequential
 
     @property
     def order_ambiguous(self) -> bool:
-        return len(self.total_order_cols) == 0
+        return (
+            self.source.ordering is None
+        ) or not self.source.ordering.is_total_ordering
 
     @property
     def explicitly_ordered(self) -> bool:
-        return len(self.total_order_cols) > 0
+        return self.source.ordering is not None
 
     @functools.cached_property
     def variables_introduced(self) -> int:
-        return len(self.schema.items) + 1
+        return len(self.scan_list.items) + 1
 
     @property
     def row_count(self) -> typing.Optional[int]:
-        if self.sql_predicate is None:
-            return self.table.n_rows
+        if self.source.sql_predicate is None:
+            return self.source.table.n_rows
         return None
 
-# This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
 @dataclass(frozen=True)
-class CachedTableNode(LeafNode):
+class CachedTableNode(ReadTableNode):
     # The original BFET subtree that was cached
     # note: this isn't a "child" node.
     original_node: BigFrameNode = field()
-    # reference to cached materialization of original_node
-    table: GbqTable
-    ordering: typing.Optional[orderings.RowOrdering] = field()
-
-    def __post_init__(self):
-        # enforce invariants
-        physical_names = set(map(lambda i: i.name, self.table.physical_schema))
-        logical_names = self.original_node.schema.names
-        if not set(logical_names).issubset(physical_names):
-            raise ValueError(
-                f"Requested schema {logical_names} cannot be derived from table schema {self.table.physical_schema}"
-            )
-
-    @property
-    def session(self):
-        return self.original_node.session
 
     def __hash__(self):
         return self._node_hash
 
-    @property
-    def fields(self) -> Tuple[Field, ...]:
-        return self.original_node.fields
-
-    @functools.cached_property
-    def variables_introduced(self) -> int:
-        return len(self.fields) + OVERHEAD_VARIABLES
-
-    @property
-    def _hidden_columns(self) -> typing.Tuple[bfet_ids.ColumnId, ...]:
-        """Physical columns used to define ordering but not directly exposed as value columns."""
-        field_names = set(field.id for field in self.fields)
-        if self.ordering is None:
-            return ()
-        return tuple(
-            col
-            for col in sorted(self.ordering.referenced_columns)
-            if col not in field_names
-        )
-
-    @property
-    def supports_fast_head(self) -> bool:
-        # Fast head is only supported when row offsets are available.
-        # In the future, ORDER BY+LIMIT optimizations may allow fast head when
-        # clustered and/or partitioned on ordering key
-        return (self.ordering is None) or self.ordering.is_sequential
-
-    @property
-    def order_ambiguous(self) -> bool:
-        return not isinstance(self.ordering, orderings.TotalOrdering)
-
-    @property
-    def explicitly_ordered(self) -> bool:
-        return (self.ordering is not None) and len(
-            self.ordering.all_ordering_columns
-        ) > 0
-
-    @property
-    def row_count(self) -> typing.Optional[int]:
-        return self.table.n_rows
-
 
 # Unary nodes
 @dataclass(frozen=True)
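Reviewer note: a ScanItem pairs an output column id and logical dtype with the physical source column, which is what lets a scan list be pruned without touching the source definition (or the ordering derived from it). A simplified, runnable model:

    import typing

    class ScanItem(typing.NamedTuple):
        id: str         # bfet_ids.ColumnId in the real code
        dtype: str      # bigframes.dtypes.Dtype in the real code
        source_id: str  # physical column name

    items = (ScanItem("col_0", "Int64", "user_id"), ScanItem("col_1", "Float64", "value"))
    pruned = tuple(item for item in items if item.id == "col_0")
    # Only the scan shrinks; the BigqueryDataSource and its ordering are untouched.
    assert [item.source_id for item in pruned] == ["user_id"]
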
diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py
index daf08d5e87..96788b4e00 100644
--- a/bigframes/core/ordering.py
+++ b/bigframes/core/ordering.py
@@ -132,6 +132,10 @@ def is_string_encoded(self) -> bool:
     def is_sequential(self) -> bool:
         return self.integer_encoding.is_encoded and self.integer_encoding.is_sequential
 
+    @property
+    def is_total_ordering(self) -> bool:
+        return False
+
     @property
     def total_order_col(self) -> Optional[OrderingExpression]:
         """Returns column id of columns that defines total ordering, if such as column exists"""
@@ -229,6 +233,19 @@ def from_offset_col(cls, col: str) -> TotalOrdering:
             total_ordering_columns=frozenset({expression.deref(col)}),
         )
 
+    @classmethod
+    def from_primary_key(cls, primary_key: Sequence[str]) -> TotalOrdering:
+        return TotalOrdering(
+            tuple(ascending_over(col) for col in primary_key),
+            total_ordering_columns=frozenset(
+                {expression.deref(col) for col in primary_key}
+            ),
+        )
+
+    @property
+    def is_total_ordering(self) -> bool:
+        return True
+
     def with_non_sequential(self):
         """Create a copy that is marked as non-sequential.
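Reviewer note: the new from_primary_key constructor is what both bigframes/core/__init__.py (tables with a primary key) and the strict row-hash fallback in compiler.py now use. Quick usage sketch (column names assumed):

    from bigframes.core.ordering import TotalOrdering

    ordering = TotalOrdering.from_primary_key(["user_id", "event_ts"])
    assert ordering.is_total_ordering
    assert not ordering.is_sequential  # defaults to non-sequential encoding, unlike from_offset_col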