Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add efficient peek dataframe preview #318

Merged
merged 14 commits into from
Jan 25, 2024
Merged
4 changes: 2 additions & 2 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
return self._compile_ordered().get_column_type(key)

def _compile_ordered(self) -> compiling.OrderedIR:
return compiling.compile_ordered(self.node)
return compiling.compile_ordered_ir(self.node)

def _compile_unordered(self) -> compiling.UnorderedIR:
return compiling.compile_unordered(self.node)
return compiling.compile_unordered_ir(self.node)

def row_count(self) -> ArrayValue:
"""Get number of rows in ArrayValue as a single-entry ArrayValue."""
Expand Down
10 changes: 10 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,18 @@ def to_pandas(
downsampling=sampling, ordered=ordered
)
)
df.set_axis(self.column_labels, axis=1, copy=False)
return df, query_job

def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]:
if self.expr.node.peekable:
iterator, _ = self.session._peek(self.expr, n)
df = self._to_dataframe(iterator)
self._copy_index_to_pandas(df)
return df
else:
return None

def to_pandas_batches(self):
"""Download results one message at a time."""
dtypes = dict(zip(self.index_columns, self.index_dtypes))
Expand Down
6 changes: 3 additions & 3 deletions bigframes/core/compile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
# limitations under the License.

from bigframes.core.compile.compiled import OrderedIR, UnorderedIR
from bigframes.core.compile.compiler import compile_ordered, compile_unordered
from bigframes.core.compile.compiler import compile_ordered_ir, compile_unordered_ir

__all__ = [
"compile_ordered",
"compile_unordered",
"compile_ordered_ir",
"compile_unordered_ir",
"OrderedIR",
"UnorderedIR",
]
7 changes: 7 additions & 0 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ def builder(self):
predicates=self._predicates,
)

def peek_sql(self, n: int):
# Peek currently implemented as top level LIMIT op.
# Execution engine handles limit pushdown.
# In future, may push down limit/filters in compilation.
sql = ibis_bigquery.Backend().compile(self._to_ibis_expr().limit(n))
return typing.cast(str, sql)

def to_sql(
self,
offset_column: typing.Optional[str] = None,
Expand Down
38 changes: 21 additions & 17 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import bigframes.session


def compile_ordered(node: nodes.BigFrameNode) -> compiled.OrderedIR:
def compile_ordered_ir(node: nodes.BigFrameNode) -> compiled.OrderedIR:
return typing.cast(compiled.OrderedIR, compile_node(node, True))


def compile_unordered(node: nodes.BigFrameNode) -> compiled.UnorderedIR:
def compile_unordered_ir(node: nodes.BigFrameNode) -> compiled.UnorderedIR:
return typing.cast(compiled.UnorderedIR, compile_node(node, False))


def compile_peak_sql(node: nodes.BigFrameNode, n_rows: int) -> typing.Optional[str]:
return compile_unordered_ir(node).peek_sql(n_rows)


@functools.cache
def compile_node(
node: nodes.BigFrameNode, ordered: bool = True
Expand All @@ -56,17 +60,17 @@ def _compile_node(
@_compile_node.register
def compile_join(node: nodes.JoinNode, ordered: bool = True):
if ordered:
left_ordered = compile_ordered(node.left_child)
right_ordered = compile_ordered(node.right_child)
left_ordered = compile_ordered_ir(node.left_child)
right_ordered = compile_ordered_ir(node.right_child)
return bigframes.core.compile.single_column.join_by_column_ordered(
left=left_ordered,
right=right_ordered,
join=node.join,
allow_row_identity_join=node.allow_row_identity_join,
)
else:
left_unordered = compile_unordered(node.left_child)
right_unordered = compile_unordered(node.right_child)
left_unordered = compile_unordered_ir(node.left_child)
right_unordered = compile_unordered_ir(node.right_child)
return bigframes.core.compile.single_column.join_by_column_unordered(
left=left_unordered,
right=right_unordered,
Expand Down Expand Up @@ -103,7 +107,7 @@ def compile_readgbq(node: nodes.ReadGbqNode, ordered: bool = True):

@_compile_node.register
def compile_promote_offsets(node: nodes.PromoteOffsetsNode, ordered: bool = True):
result = compile_ordered(node.child).promote_offsets(node.col_id)
result = compile_ordered_ir(node.child).promote_offsets(node.col_id)
return result if ordered else result.to_unordered()


Expand All @@ -115,17 +119,17 @@ def compile_filter(node: nodes.FilterNode, ordered: bool = True):
@_compile_node.register
def compile_orderby(node: nodes.OrderByNode, ordered: bool = True):
if ordered:
return compile_ordered(node.child).order_by(node.by)
return compile_ordered_ir(node.child).order_by(node.by)
else:
return compile_unordered(node.child)
return compile_unordered_ir(node.child)


@_compile_node.register
def compile_reversed(node: nodes.ReversedNode, ordered: bool = True):
if ordered:
return compile_ordered(node.child).reversed()
return compile_ordered_ir(node.child).reversed()
else:
return compile_unordered(node.child)
return compile_unordered_ir(node.child)


@_compile_node.register
Expand All @@ -137,36 +141,36 @@ def compile_projection(node: nodes.ProjectionNode, ordered: bool = True):
@_compile_node.register
def compile_concat(node: nodes.ConcatNode, ordered: bool = True):
if ordered:
compiled_ordered = [compile_ordered(node) for node in node.children]
compiled_ordered = [compile_ordered_ir(node) for node in node.children]
return concat_impl.concat_ordered(compiled_ordered)
else:
compiled_unordered = [compile_unordered(node) for node in node.children]
compiled_unordered = [compile_unordered_ir(node) for node in node.children]
return concat_impl.concat_unordered(compiled_unordered)


@_compile_node.register
def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True):
result = compile_unordered(node.child).row_count()
result = compile_unordered_ir(node.child).row_count()
return result if ordered else result.to_unordered()


@_compile_node.register
def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True):
result = compile_unordered(node.child).aggregate(
result = compile_unordered_ir(node.child).aggregate(
node.aggregations, node.by_column_ids, node.dropna
)
return result if ordered else result.to_unordered()


@_compile_node.register
def compile_corr(node: nodes.CorrNode, ordered: bool = True):
result = compile_unordered(node.child).corr_aggregate(node.corr_aggregations)
result = compile_unordered_ir(node.child).corr_aggregate(node.corr_aggregations)
return result if ordered else result.to_unordered()


@_compile_node.register
def compile_window(node: nodes.WindowOpNode, ordered: bool = True):
result = compile_ordered(node.child).project_window_op(
result = compile_ordered_ir(node.child).project_window_op(
node.column_name,
node.op,
node.window_spec,
Expand Down
55 changes: 55 additions & 0 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from dataclasses import dataclass, field, fields
import functools
import itertools
import typing
from typing import Tuple

Expand Down Expand Up @@ -74,6 +75,18 @@ def session(self):
def _node_hash(self):
return hash(tuple(hash(getattr(self, field.name)) for field in fields(self)))

@property
def peekable(self) -> bool:
"""Indicates whether the node can be sampled efficiently"""
return all(child.peekable for child in self.child_nodes)

@property
def roots(self) -> typing.Set[BigFrameNode]:
roots = itertools.chain.from_iterable(
map(lambda child: child.roots, self.child_nodes)
)
return set(roots)


@dataclass(frozen=True)
class UnaryNode(BigFrameNode):
Expand All @@ -98,6 +111,12 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
children_peekable = all(child.peekable for child in self.child_nodes)
single_root = len(self.roots) == 1
return children_peekable and single_root


@dataclass(frozen=True)
class ConcatNode(BigFrameNode):
Expand All @@ -119,6 +138,14 @@ class ReadLocalNode(BigFrameNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return True

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}


# TODO: Refactor to take raw gbq object reference
@dataclass(frozen=True)
Expand All @@ -136,6 +163,14 @@ def session(self):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return True

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}


# Unary nodes
@dataclass(frozen=True)
Expand All @@ -145,6 +180,10 @@ class PromoteOffsetsNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False


@dataclass(frozen=True)
class FilterNode(UnaryNode):
Expand Down Expand Up @@ -194,6 +233,10 @@ class AggregateNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False


# TODO: Unify into aggregate
@dataclass(frozen=True)
Expand All @@ -203,6 +246,10 @@ class CorrNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False


@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
Expand All @@ -216,6 +263,10 @@ class WindowOpNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False


@dataclass(frozen=True)
class ReprojectOpNode(UnaryNode):
Expand All @@ -239,6 +290,10 @@ class UnpivotNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False


@dataclass(frozen=True)
class RandomSampleNode(UnaryNode):
Expand Down
31 changes: 31 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,37 @@ def head(self, n: int = 5) -> DataFrame:
def tail(self, n: int = 5) -> DataFrame:
return typing.cast(DataFrame, self.iloc[-n:])

def peek(self, n: int = 5, *, force: bool = False) -> pandas.DataFrame:
"""
Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.
DataFrame.peek(force=False) will always be very fast, but will not succeed if data requires
full data scanning. Using force=True will always succeed, but may be perform expensive
computations.

Args:
n (int, default 5):
The number of rows to select from the dataframe. Which N rows are returned is non-deterministic.
force (bool, default False):
If the data cannot be peeked efficiently, the dataframe will instead be fully materialized as part
of the operation if force=True. If force=False, the operation will throw a ValueError.
Returns:
pandas.DataFrame: A pandas DataFrame with n rows.

Raises:
ValueError: If force=False and data cannot be efficiently peeked.
"""
maybe_result = self._block.try_peek(n)
if maybe_result is None:
if force:
self._cached()
maybe_result = self._block.try_peek(n)
assert maybe_result is not None
else:
raise ValueError(
"Cannot peek efficiently when data has aggregates, joins or window functions applied. Use force=True to fully compute dataframe."
)
return maybe_result.set_axis(self._block.column_labels, axis=1, copy=False)

def nlargest(
self,
n: int,
Expand Down
15 changes: 13 additions & 2 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,17 @@ def _execute(
job_config=job_config,
)

def _peek(
self, array_value: core.ArrayValue, n_rows: int
) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""A 'peek' efficiently accesses a small number of rows in the dataframe."""
if not array_value.node.peekable:
raise NotImplementedError("cannot efficient peek this dataframe")
sql = self._compile_unordered(array_value).peek_sql(n_rows)
return self._start_query(
sql=sql,
)

def _to_sql(
self,
array_value: core.ArrayValue,
Expand All @@ -1528,12 +1539,12 @@ def _to_sql(
def _compile_ordered(
self, array_value: core.ArrayValue
) -> bigframes.core.compile.OrderedIR:
return bigframes.core.compile.compile_ordered(array_value.node)
return bigframes.core.compile.compile_ordered_ir(array_value.node)

def _compile_unordered(
self, array_value: core.ArrayValue
) -> bigframes.core.compile.UnorderedIR:
return bigframes.core.compile.compile_unordered(array_value.node)
return bigframes.core.compile.compile_unordered_ir(array_value.node)

def _get_table_size(self, destination_table):
table = self.bqclient.get_table(destination_table)
Expand Down
Loading