
feat: add efficient peek dataframe preview #318

Merged (14 commits) on Jan 25, 2024
4 changes: 2 additions & 2 deletions bigframes/core/__init__.py
@@ -104,10 +104,10 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
         return self._compile_ordered().get_column_type(key)
 
     def _compile_ordered(self) -> compiling.OrderedIR:
-        return compiling.compile_ordered(self.node)
+        return compiling.compile_ordered_ir(self.node)
 
     def _compile_unordered(self) -> compiling.UnorderedIR:
-        return compiling.compile_unordered(self.node)
+        return compiling.compile_unordered_ir(self.node)
 
     def row_count(self) -> ArrayValue:
         """Get number of rows in ArrayValue as a single-entry ArrayValue."""
10 changes: 10 additions & 0 deletions bigframes/core/blocks.py
@@ -432,8 +432,18 @@ def to_pandas(
                 downsampling=sampling, ordered=ordered
             )
         )
         df.set_axis(self.column_labels, axis=1, copy=False)
         return df, query_job
 
+    def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]:
+        if self.expr.node.peekable:
+            iterator, _ = self.session._peek(self.expr, n)
+            df = self._to_dataframe(iterator)
+            self._copy_index_to_pandas(df)
+            return df
+        else:
+            return None
+
     def to_pandas_batches(self):
         """Download results one message at a time."""
         dtypes = dict(zip(self.index_columns, self.index_dtypes))
6 changes: 3 additions & 3 deletions bigframes/core/compile/__init__.py
@@ -13,11 +13,11 @@
 # limitations under the License.
 
 from bigframes.core.compile.compiled import OrderedIR, UnorderedIR
-from bigframes.core.compile.compiler import compile_ordered, compile_unordered
+from bigframes.core.compile.compiler import compile_ordered_ir, compile_unordered_ir
 
 __all__ = [
-    "compile_ordered",
-    "compile_unordered",
+    "compile_ordered_ir",
+    "compile_unordered_ir",
     "OrderedIR",
     "UnorderedIR",
 ]
7 changes: 7 additions & 0 deletions bigframes/core/compile/compiled.py
@@ -233,6 +233,13 @@ def builder(self):
             predicates=self._predicates,
         )
 
+    def peek_sql(self, n: int):
+        # Peek currently implemented as top level LIMIT op.
+        # Execution engine handles limit pushdown.
+        # In future, may push down limit/filters in compilation.
+        sql = ibis_bigquery.Backend().compile(self._to_ibis_expr().limit(n))
+        return typing.cast(str, sql)
+
     def to_sql(
         self,
         offset_column: typing.Optional[str] = None,
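For intuition, here is a minimal sketch of exercising this compile path directly. `df` is assumed to be an existing bigframes DataFrame backed by a peekable plan, and the underscored attributes are internal, so treat this as an illustration rather than supported API:

    import bigframes.core.compile as compiling

    # `df` is assumed: any bigframes DataFrame whose plan is a plain scan/filter.
    node = df._block.expr.node  # internal plan tree (see nodes.py below)
    if node.peekable:
        ir = compiling.compile_unordered_ir(node)
        sql = ir.peek_sql(5)  # ordinary query with a top-level LIMIT 5
        print(sql)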
38 changes: 21 additions & 17 deletions bigframes/core/compile/compiler.py
@@ -29,14 +29,18 @@
 import bigframes.session
 
 
-def compile_ordered(node: nodes.BigFrameNode) -> compiled.OrderedIR:
+def compile_ordered_ir(node: nodes.BigFrameNode) -> compiled.OrderedIR:
     return typing.cast(compiled.OrderedIR, compile_node(node, True))
 
 
-def compile_unordered(node: nodes.BigFrameNode) -> compiled.UnorderedIR:
+def compile_unordered_ir(node: nodes.BigFrameNode) -> compiled.UnorderedIR:
     return typing.cast(compiled.UnorderedIR, compile_node(node, False))
 
 
+def compile_peak_sql(node: nodes.BigFrameNode, n_rows: int) -> typing.Optional[str]:
+    return compile_unordered_ir(node).peek_sql(n_rows)
+
+
 @functools.cache
 def compile_node(
     node: nodes.BigFrameNode, ordered: bool = True
@@ -56,8 +60,8 @@ def _compile_node(
 @_compile_node.register
 def compile_join(node: nodes.JoinNode, ordered: bool = True):
     if ordered:
-        left_ordered = compile_ordered(node.left_child)
-        right_ordered = compile_ordered(node.right_child)
+        left_ordered = compile_ordered_ir(node.left_child)
+        right_ordered = compile_ordered_ir(node.right_child)
         return bigframes.core.compile.single_column.join_by_column_ordered(
             left_ordered,
             node.left_column_ids,
@@ -67,8 +71,8 @@ def compile_join(node: nodes.JoinNode, ordered: bool = True):
             allow_row_identity_join=node.allow_row_identity_join,
         )
     else:
-        left_unordered = compile_unordered(node.left_child)
-        right_unordered = compile_unordered(node.right_child)
+        left_unordered = compile_unordered_ir(node.left_child)
+        right_unordered = compile_unordered_ir(node.right_child)
         return bigframes.core.compile.single_column.join_by_column_unordered(
             left_unordered,
             node.left_column_ids,
@@ -117,7 +121,7 @@ def compile_readgbq(node: nodes.ReadGbqNode, ordered: bool = True):
 
 @_compile_node.register
 def compile_promote_offsets(node: nodes.PromoteOffsetsNode, ordered: bool = True):
-    result = compile_ordered(node.child).promote_offsets(node.col_id)
+    result = compile_ordered_ir(node.child).promote_offsets(node.col_id)
     return result if ordered else result.to_unordered()
 
 
@@ -129,17 +133,17 @@ def compile_filter(node: nodes.FilterNode, ordered: bool = True):
 @_compile_node.register
 def compile_orderby(node: nodes.OrderByNode, ordered: bool = True):
     if ordered:
-        return compile_ordered(node.child).order_by(node.by)
+        return compile_ordered_ir(node.child).order_by(node.by)
     else:
-        return compile_unordered(node.child)
+        return compile_unordered_ir(node.child)
 
 
 @_compile_node.register
 def compile_reversed(node: nodes.ReversedNode, ordered: bool = True):
     if ordered:
-        return compile_ordered(node.child).reversed()
+        return compile_ordered_ir(node.child).reversed()
     else:
-        return compile_unordered(node.child)
+        return compile_unordered_ir(node.child)
 
 
 @_compile_node.register
@@ -153,36 +157,36 @@ def compile_projection(node: nodes.ProjectionNode, ordered: bool = True):
 @_compile_node.register
 def compile_concat(node: nodes.ConcatNode, ordered: bool = True):
     if ordered:
-        compiled_ordered = [compile_ordered(node) for node in node.children]
+        compiled_ordered = [compile_ordered_ir(node) for node in node.children]
         return concat_impl.concat_ordered(compiled_ordered)
     else:
-        compiled_unordered = [compile_unordered(node) for node in node.children]
+        compiled_unordered = [compile_unordered_ir(node) for node in node.children]
         return concat_impl.concat_unordered(compiled_unordered)
 
 
 @_compile_node.register
 def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True):
-    result = compile_unordered(node.child).row_count()
+    result = compile_unordered_ir(node.child).row_count()
     return result if ordered else result.to_unordered()
 
 
 @_compile_node.register
 def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True):
-    result = compile_unordered(node.child).aggregate(
+    result = compile_unordered_ir(node.child).aggregate(
         node.aggregations, node.by_column_ids, node.dropna
     )
     return result if ordered else result.to_unordered()
 
 
 @_compile_node.register
 def compile_corr(node: nodes.CorrNode, ordered: bool = True):
-    result = compile_unordered(node.child).corr_aggregate(node.corr_aggregations)
+    result = compile_unordered_ir(node.child).corr_aggregate(node.corr_aggregations)
     return result if ordered else result.to_unordered()
 
 
 @_compile_node.register
 def compile_window(node: nodes.WindowOpNode, ordered: bool = True):
-    result = compile_ordered(node.child).project_window_op(
+    result = compile_ordered_ir(node.child).project_window_op(
         node.column_name,
         node.op,
         node.window_spec,
55 changes: 55 additions & 0 deletions bigframes/core/nodes.py
@@ -16,6 +16,7 @@
 
 from dataclasses import dataclass, field, fields
 import functools
+import itertools
 import typing
 from typing import Tuple
 
@@ -73,6 +74,18 @@ def session(self):
     def _node_hash(self):
         return hash(tuple(hash(getattr(self, field.name)) for field in fields(self)))
 
+    @property
+    def peekable(self) -> bool:
+        """Indicates whether the node can be sampled efficiently"""
+        return all(child.peekable for child in self.child_nodes)
+
+    @property
+    def roots(self) -> typing.Set[BigFrameNode]:
+        roots = itertools.chain.from_iterable(
+            map(lambda child: child.roots, self.child_nodes)
+        )
+        return set(roots)
+
 
 @dataclass(frozen=True)
 class UnaryNode(BigFrameNode):
@@ -105,6 +118,12 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        children_peekable = all(child.peekable for child in self.child_nodes)
+        single_root = len(self.roots) == 1
+        return children_peekable and single_root
+
 
 @dataclass(frozen=True)
 class ConcatNode(BigFrameNode):
@@ -126,6 +145,14 @@ class ReadLocalNode(BigFrameNode):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return True
+
+    @property
+    def roots(self) -> typing.Set[BigFrameNode]:
+        return {self}
+
 
 # TODO: Refactor to take raw gbq object reference
 @dataclass(frozen=True)
@@ -143,6 +170,14 @@ def session(self):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return True
+
+    @property
+    def roots(self) -> typing.Set[BigFrameNode]:
+        return {self}
+
 
 # Unary nodes
 @dataclass(frozen=True)
@@ -160,6 +195,10 @@ class PromoteOffsetsNode(UnaryNode):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return False
+
 
 @dataclass(frozen=True)
 class FilterNode(UnaryNode):
@@ -218,6 +257,10 @@ class AggregateNode(UnaryNode):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return False
+
 
 # TODO: Unify into aggregate
 @dataclass(frozen=True)
@@ -227,6 +270,10 @@ class CorrNode(UnaryNode):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return False
+
 
 @dataclass(frozen=True)
 class WindowOpNode(UnaryNode):
@@ -240,6 +287,10 @@ class WindowOpNode(UnaryNode):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return False
+
 
 @dataclass(frozen=True)
 class ReprojectOpNode(UnaryNode):
@@ -263,6 +314,10 @@ class UnpivotNode(UnaryNode):
     def __hash__(self):
         return self._node_hash
 
+    @property
+    def peekable(self) -> bool:
+        return False
+
 
 @dataclass(frozen=True)
 class AssignNode(UnaryNode):
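The default peekable walks the whole tree, so a single non-peekable node anywhere in the plan disqualifies the peek fast path. A standalone toy model of that recursion (illustrative classes, not the real node types):

    from dataclasses import dataclass
    from typing import Tuple


    @dataclass(frozen=True)
    class ToyNode:
        children: Tuple["ToyNode", ...] = ()

        @property
        def peekable(self) -> bool:
            # Default: peekable only if every child is (mirrors BigFrameNode).
            return all(child.peekable for child in self.children)


    @dataclass(frozen=True)
    class ToyScan(ToyNode):
        # Leaf table scan: always peekable (mirrors ReadGbqNode/ReadLocalNode).
        @property
        def peekable(self) -> bool:
            return True


    @dataclass(frozen=True)
    class ToyWindow(ToyNode):
        # Window/aggregate ops opt out (mirrors WindowOpNode, AggregateNode).
        @property
        def peekable(self) -> bool:
            return False


    assert ToyNode(children=(ToyScan(),)).peekable  # filter over a scan: ok
    assert not ToyNode(children=(ToyWindow((ToyScan(),)),)).peekable  # poisoned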
8 changes: 8 additions & 0 deletions bigframes/dataframe.py
@@ -1066,6 +1066,14 @@ def head(self, n: int = 5) -> DataFrame:
     def tail(self, n: int = 5) -> DataFrame:
         return typing.cast(DataFrame, self.iloc[-n:])
 
+    def peek(self, n: int = 5) -> pandas.DataFrame:
+        maybe_result = self._block.try_peek(n)
+        if maybe_result is None:
+            raise NotImplementedError(
+                "Cannot peek efficiently when data has aggregates, joins or window functions applied."
+            )
+        return maybe_result.set_axis(self._block.column_labels, axis=1, copy=False)
+
     def nlargest(
         self,
         n: int,

Review thread on the NotImplementedError above:

Collaborator: It's not always going to be obvious when the data has aggregates, windows, or especially not joins (due to implicit joins on the index). Could we instead call _cached() in this case and then peek?

Author: Added a force=True param to peek. This will cause the dataframe to execute and cache the block if it is not peekable. Are you sure this is the best default behavior? One of my goals with peek was for users to avoid fully computing the dataframe.

Collaborator: In general, I'd say the pandas API skews towards the most usable default, not the most efficient one. That said, our main motivation here is to address the feedback of how expensive head() can be, so I see the argument for having force=False as the default.
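To summarize the thread above as a hypothetical usage sketch: the force parameter is described in the review discussion but does not appear in this diff, so its exact spelling here is an assumption.

    import bigframes.pandas as bpd

    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

    df.peek(n=3)  # plain scan: peekable, returns 3 arbitrary rows cheaply
    df[df.body_mass_g > 0].peek(n=3)  # filters stay peekable

    agg = df[["species", "body_mass_g"]].groupby("species").sum()
    # agg.peek(n=3)  # raises NotImplementedError: aggregates block peeking
    agg.peek(n=3, force=True)  # assumed API: executes and caches, then peeks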
15 changes: 13 additions & 2 deletions bigframes/session/__init__.py
@@ -1508,6 +1508,17 @@ def _execute(
             job_config=job_config,
         )
 
+    def _peek(
+        self, array_value: core.ArrayValue, n_rows: int
+    ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+        """A 'peek' efficiently accesses a small number of rows in the dataframe."""
+        if not array_value.node.peekable:
+            raise NotImplementedError("cannot efficiently peek this dataframe")
+        sql = self._compile_unordered(array_value).peek_sql(n_rows)
+        return self._start_query(
+            sql=sql,
+        )
+
     def _to_sql(
         self,
         array_value: core.ArrayValue,
@@ -1528,12 +1539,12 @@ def _to_sql(
     def _compile_ordered(
         self, array_value: core.ArrayValue
    ) -> bigframes.core.compile.OrderedIR:
-        return bigframes.core.compile.compile_ordered(array_value.node)
+        return bigframes.core.compile.compile_ordered_ir(array_value.node)
 
     def _compile_unordered(
         self, array_value: core.ArrayValue
     ) -> bigframes.core.compile.UnorderedIR:
-        return bigframes.core.compile.compile_unordered(array_value.node)
+        return bigframes.core.compile.compile_unordered_ir(array_value.node)
 
     def _get_table_size(self, destination_table):
         table = self.bqclient.get_table(destination_table)
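For reviewers tracing the full path, the successful call chain reconstructed from this diff (all private APIs) is summarized below as a commented sketch:

    # End-to-end trace of a peek, reconstructed from this diff:
    #
    #   DataFrame.peek(n)
    #     -> Block.try_peek(n)            # returns None if plan not peekable
    #       -> Session._peek(expr, n)     # raises if expr.node.peekable is False
    #         -> compile_unordered_ir(node).peek_sql(n)  # SQL with LIMIT n
    #         -> Session._start_query(sql)  # (RowIterator, QueryJob)
    #     -> set_axis(column_labels)      # restore the original column labels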
22 changes: 22 additions & 0 deletions tests/system/small/test_dataframe.py
@@ -412,6 +412,28 @@ def test_rename(scalars_dfs):
     )
 
 
+def test_df_peek(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    peek_result = scalars_df.peek(n=3)
+    pd.testing.assert_index_equal(scalars_pandas_df.columns, peek_result.columns)
+    assert len(peek_result) == 3
+
+
+def test_df_peek_filtered(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    peek_result = scalars_df[scalars_df.int64_col != 0].peek(n=3)
+    pd.testing.assert_index_equal(scalars_pandas_df.columns, peek_result.columns)
+    assert len(peek_result) == 3
+
+
+def test_df_peek_exception(scalars_dfs):
+    scalars_df, _ = scalars_dfs
+
+    with pytest.raises(NotImplementedError):
+        # Window ops aren't compatible with efficient peeking
+        scalars_df[["int64_col", "int64_too"]].cumsum().peek(n=3)
+
+
 def test_repr_w_all_rows(scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
 