Skip to content

Commit

Permalink
feat: add DataFrame.peek() as an efficient alternative to head()
Browse files Browse the repository at this point in the history
…results preview (#318)

* feat: add efficient peek dataframe preview

* add force parameter to peek to cache full dataframe

* add df.peek docstring

* set peek to default force=False

* update peek docstring and error type

---------

Co-authored-by: Tim Swast <[email protected]>
  • Loading branch information
TrevorBergeron and tswast authored Jan 25, 2024
1 parent d88c562 commit 9c34d83
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 24 deletions.
4 changes: 2 additions & 2 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
return self._compile_ordered().get_column_type(key)

def _compile_ordered(self) -> compiling.OrderedIR:
return compiling.compile_ordered(self.node)
return compiling.compile_ordered_ir(self.node)

def _compile_unordered(self) -> compiling.UnorderedIR:
return compiling.compile_unordered(self.node)
return compiling.compile_unordered_ir(self.node)

def row_count(self) -> ArrayValue:
"""Get number of rows in ArrayValue as a single-entry ArrayValue."""
Expand Down
10 changes: 10 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,18 @@ def to_pandas(
downsampling=sampling, ordered=ordered
)
)
df.set_axis(self.column_labels, axis=1, copy=False)
return df, query_job

def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]:
    """Return up to ``n`` arbitrary rows as a pandas DataFrame, or ``None``.

    Succeeds only when the underlying expression tree supports an efficient
    preview (``expr.node.peekable``); otherwise returns ``None`` so the
    caller can decide whether to force a full materialization instead.
    """
    if not self.expr.node.peekable:
        return None
    rows, _ = self.session._peek(self.expr, n)
    result = self._to_dataframe(rows)
    self._copy_index_to_pandas(result)
    return result

def to_pandas_batches(self):
"""Download results one message at a time."""
dtypes = dict(zip(self.index_columns, self.index_dtypes))
Expand Down
6 changes: 3 additions & 3 deletions bigframes/core/compile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
# limitations under the License.

from bigframes.core.compile.compiled import OrderedIR, UnorderedIR
from bigframes.core.compile.compiler import compile_ordered, compile_unordered
from bigframes.core.compile.compiler import compile_ordered_ir, compile_unordered_ir

__all__ = [
"compile_ordered",
"compile_unordered",
"compile_ordered_ir",
"compile_unordered_ir",
"OrderedIR",
"UnorderedIR",
]
7 changes: 7 additions & 0 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ def builder(self):
predicates=self._predicates,
)

def peek_sql(self, n: int):
    """Compile a SQL string that previews up to ``n`` rows.

    Currently implemented as a top-level LIMIT on the compiled expression;
    the execution engine handles pushing the limit down. A future version
    may push limits/filters down during compilation instead.
    """
    limited = self._to_ibis_expr().limit(n)
    return typing.cast(str, ibis_bigquery.Backend().compile(limited))

def to_sql(
self,
offset_column: typing.Optional[str] = None,
Expand Down
38 changes: 21 additions & 17 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import bigframes.session


def compile_ordered(node: nodes.BigFrameNode) -> compiled.OrderedIR:
def compile_ordered_ir(node: nodes.BigFrameNode) -> compiled.OrderedIR:
    """Compile the node tree into an order-preserving intermediate representation."""
    return typing.cast(compiled.OrderedIR, compile_node(node, True))


def compile_unordered(node: nodes.BigFrameNode) -> compiled.UnorderedIR:
def compile_unordered_ir(node: nodes.BigFrameNode) -> compiled.UnorderedIR:
    """Compile the node tree into an intermediate representation without ordering."""
    return typing.cast(compiled.UnorderedIR, compile_node(node, False))


# NOTE(review): "peak" looks like a typo for "peek" (it delegates to peek_sql);
# renaming would break callers, so flagging only. Also, the Optional[str] return
# annotation appears wider than necessary — peek_sql returns str. TODO confirm.
def compile_peak_sql(node: nodes.BigFrameNode, n_rows: int) -> typing.Optional[str]:
    """Compile SQL that previews up to ``n_rows`` rows from the unordered IR."""
    return compile_unordered_ir(node).peek_sql(n_rows)


@functools.cache
def compile_node(
node: nodes.BigFrameNode, ordered: bool = True
Expand All @@ -56,17 +60,17 @@ def _compile_node(
@_compile_node.register
def compile_join(node: nodes.JoinNode, ordered: bool = True):
if ordered:
left_ordered = compile_ordered(node.left_child)
right_ordered = compile_ordered(node.right_child)
left_ordered = compile_ordered_ir(node.left_child)
right_ordered = compile_ordered_ir(node.right_child)
return bigframes.core.compile.single_column.join_by_column_ordered(
left=left_ordered,
right=right_ordered,
join=node.join,
allow_row_identity_join=node.allow_row_identity_join,
)
else:
left_unordered = compile_unordered(node.left_child)
right_unordered = compile_unordered(node.right_child)
left_unordered = compile_unordered_ir(node.left_child)
right_unordered = compile_unordered_ir(node.right_child)
return bigframes.core.compile.single_column.join_by_column_unordered(
left=left_unordered,
right=right_unordered,
Expand Down Expand Up @@ -103,7 +107,7 @@ def compile_readgbq(node: nodes.ReadGbqNode, ordered: bool = True):

@_compile_node.register
def compile_promote_offsets(node: nodes.PromoteOffsetsNode, ordered: bool = True):
result = compile_ordered(node.child).promote_offsets(node.col_id)
result = compile_ordered_ir(node.child).promote_offsets(node.col_id)
return result if ordered else result.to_unordered()


Expand All @@ -115,17 +119,17 @@ def compile_filter(node: nodes.FilterNode, ordered: bool = True):
@_compile_node.register
def compile_orderby(node: nodes.OrderByNode, ordered: bool = True):
if ordered:
return compile_ordered(node.child).order_by(node.by)
return compile_ordered_ir(node.child).order_by(node.by)
else:
return compile_unordered(node.child)
return compile_unordered_ir(node.child)


@_compile_node.register
def compile_reversed(node: nodes.ReversedNode, ordered: bool = True):
if ordered:
return compile_ordered(node.child).reversed()
return compile_ordered_ir(node.child).reversed()
else:
return compile_unordered(node.child)
return compile_unordered_ir(node.child)


@_compile_node.register
Expand All @@ -137,36 +141,36 @@ def compile_projection(node: nodes.ProjectionNode, ordered: bool = True):
@_compile_node.register
def compile_concat(node: nodes.ConcatNode, ordered: bool = True):
if ordered:
compiled_ordered = [compile_ordered(node) for node in node.children]
compiled_ordered = [compile_ordered_ir(node) for node in node.children]
return concat_impl.concat_ordered(compiled_ordered)
else:
compiled_unordered = [compile_unordered(node) for node in node.children]
compiled_unordered = [compile_unordered_ir(node) for node in node.children]
return concat_impl.concat_unordered(compiled_unordered)


@_compile_node.register
def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True):
result = compile_unordered(node.child).row_count()
result = compile_unordered_ir(node.child).row_count()
return result if ordered else result.to_unordered()


@_compile_node.register
def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True):
result = compile_unordered(node.child).aggregate(
result = compile_unordered_ir(node.child).aggregate(
node.aggregations, node.by_column_ids, node.dropna
)
return result if ordered else result.to_unordered()


@_compile_node.register
def compile_corr(node: nodes.CorrNode, ordered: bool = True):
result = compile_unordered(node.child).corr_aggregate(node.corr_aggregations)
result = compile_unordered_ir(node.child).corr_aggregate(node.corr_aggregations)
return result if ordered else result.to_unordered()


@_compile_node.register
def compile_window(node: nodes.WindowOpNode, ordered: bool = True):
result = compile_ordered(node.child).project_window_op(
result = compile_ordered_ir(node.child).project_window_op(
node.column_name,
node.op,
node.window_spec,
Expand Down
55 changes: 55 additions & 0 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from dataclasses import dataclass, field, fields
import functools
import itertools
import typing
from typing import Tuple

Expand Down Expand Up @@ -74,6 +75,18 @@ def session(self):
def _node_hash(self):
return hash(tuple(hash(getattr(self, field.name)) for field in fields(self)))

@property
def peekable(self) -> bool:
    """Indicates whether the node can be sampled efficiently.

    Default: a node is peekable iff all of its children are peekable.
    Subclasses override this for sources (always peekable) and for
    operations that require scanning the full input (never peekable).
    """
    return all(child.peekable for child in self.child_nodes)

@property
def roots(self) -> typing.Set[BigFrameNode]:
    """The set of leaf (source) nodes reachable from this node."""
    result: typing.Set[BigFrameNode] = set()
    for child in self.child_nodes:
        result.update(child.roots)
    return result


@dataclass(frozen=True)
class UnaryNode(BigFrameNode):
Expand All @@ -98,6 +111,12 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    """Peekable iff every child is peekable and there is exactly one source node."""
    if len(self.roots) != 1:
        return False
    return all(child.peekable for child in self.child_nodes)


@dataclass(frozen=True)
class ConcatNode(BigFrameNode):
Expand All @@ -119,6 +138,14 @@ class ReadLocalNode(BigFrameNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # Local (in-memory) data can always be previewed cheaply.
    return True

@property
def roots(self) -> typing.Set[BigFrameNode]:
    # Leaf node: its only root is itself.
    return {self}


# TODO: Refactor to take raw gbq object reference
@dataclass(frozen=True)
Expand All @@ -136,6 +163,14 @@ def session(self):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # A table scan can serve a LIMIT-n preview without reading the full table.
    return True

@property
def roots(self) -> typing.Set[BigFrameNode]:
    # Leaf node: its only root is itself.
    return {self}


# Unary nodes
@dataclass(frozen=True)
Expand All @@ -145,6 +180,10 @@ class PromoteOffsetsNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # Not peekable: generating offsets presumably requires ordering the full
    # input rather than sampling a few rows — TODO confirm.
    return False


@dataclass(frozen=True)
class FilterNode(UnaryNode):
Expand Down Expand Up @@ -194,6 +233,10 @@ class AggregateNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # Not peekable: aggregate results depend on every input row, so a
    # partial preview of the input cannot produce correct output.
    return False


# TODO: Unify into aggregate
@dataclass(frozen=True)
Expand All @@ -203,6 +246,10 @@ class CorrNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # Not peekable: correlation is an aggregate over all input rows.
    return False


@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
Expand All @@ -216,6 +263,10 @@ class WindowOpNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # Not peekable: window functions presumably need the full partition to
    # compute each row's value — TODO confirm.
    return False


@dataclass(frozen=True)
class ReprojectOpNode(UnaryNode):
Expand All @@ -239,6 +290,10 @@ class UnpivotNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
    # Not peekable: marked inefficient to preview — NOTE(review): reason not
    # evident from this diff; confirm against UnpivotNode's implementation.
    return False


@dataclass(frozen=True)
class RandomSampleNode(UnaryNode):
Expand Down
31 changes: 31 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,37 @@ def head(self, n: int = 5) -> DataFrame:
def tail(self, n: int = 5) -> DataFrame:
return typing.cast(DataFrame, self.iloc[-n:])

def peek(self, n: int = 5, *, force: bool = False) -> pandas.DataFrame:
    """
    Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.

    ``DataFrame.peek(force=False)`` will always be very fast, but will not succeed if data
    requires full data scanning. Using ``force=True`` will always succeed, but may perform
    expensive computations.

    Args:
        n (int, default 5):
            The number of rows to select from the dataframe. Which N rows are returned is
            non-deterministic.
        force (bool, default False):
            If the data cannot be peeked efficiently, the dataframe will instead be fully
            materialized as part of the operation if ``force=True``. If ``force=False``,
            the operation will throw a ValueError.

    Returns:
        pandas.DataFrame: A pandas DataFrame with n rows.

    Raises:
        ValueError: If force=False and data cannot be efficiently peeked.
    """
    maybe_result = self._block.try_peek(n)
    if maybe_result is None:
        if not force:
            raise ValueError(
                "Cannot peek efficiently when data has aggregates, joins or window functions applied. Use force=True to fully compute dataframe."
            )
        # Materialize (cache) the full dataframe; a peek of the cached result
        # is then guaranteed to succeed.
        self._cached()
        maybe_result = self._block.try_peek(n)
        assert maybe_result is not None
    # Relabel columns with the user-visible labels from the block.
    return maybe_result.set_axis(self._block.column_labels, axis=1, copy=False)

def nlargest(
self,
n: int,
Expand Down
15 changes: 13 additions & 2 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,17 @@ def _execute(
job_config=job_config,
)

def _peek(
    self, array_value: core.ArrayValue, n_rows: int
) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
    """A 'peek' efficiently accesses a small number of rows in the dataframe.

    Args:
        array_value: The expression tree to preview.
        n_rows: Maximum number of rows to fetch.

    Returns:
        The result row iterator and the query job that produced it.

    Raises:
        NotImplementedError: If the expression tree is not peekable.
    """
    if not array_value.node.peekable:
        raise NotImplementedError("cannot efficiently peek this dataframe")
    sql = self._compile_unordered(array_value).peek_sql(n_rows)
    return self._start_query(
        sql=sql,
    )

def _to_sql(
self,
array_value: core.ArrayValue,
Expand All @@ -1528,12 +1539,12 @@ def _to_sql(
def _compile_ordered(
self, array_value: core.ArrayValue
) -> bigframes.core.compile.OrderedIR:
return bigframes.core.compile.compile_ordered(array_value.node)
return bigframes.core.compile.compile_ordered_ir(array_value.node)

def _compile_unordered(
self, array_value: core.ArrayValue
) -> bigframes.core.compile.UnorderedIR:
return bigframes.core.compile.compile_unordered(array_value.node)
return bigframes.core.compile.compile_unordered_ir(array_value.node)

def _get_table_size(self, destination_table):
table = self.bqclient.get_table(destination_table)
Expand Down
Loading

0 comments on commit 9c34d83

Please sign in to comment.