Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix case where df.peek would fail to execute even with force=True #511

Merged
merged 5 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import bigframes.core.guid as guid
import bigframes.core.join_def as join_defs
import bigframes.core.ordering as ordering
import bigframes.core.tree_properties as tree_properties
import bigframes.core.utils
import bigframes.core.utils as utils
import bigframes.dtypes
Expand Down Expand Up @@ -444,7 +445,7 @@ def to_pandas(
return df, query_job

def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]:
if self.expr.node.peekable:
if tree_properties.peekable(self.expr.node):
iterator, _ = self.session._peek(self.expr, n)
df = self._to_dataframe(iterator)
self._copy_index_to_pandas(df)
Expand Down
37 changes: 1 addition & 36 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ def session(self):
def _node_hash(self):
return hash(tuple(hash(getattr(self, field.name)) for field in fields(self)))

@property
def peekable(self) -> bool:
"""Indicates whether the node can be sampled efficiently"""
return all(child.peekable for child in self.child_nodes)

@property
def roots(self) -> typing.Set[BigFrameNode]:
roots = itertools.chain.from_iterable(
Expand Down Expand Up @@ -143,12 +138,6 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
children_peekable = all(child.peekable for child in self.child_nodes)
single_root = len(self.roots) == 1
return children_peekable and single_root

@functools.cached_property
def schema(self) -> schemata.ArraySchema:
def join_mapping_to_schema_item(mapping: JoinColumnMapping):
Expand Down Expand Up @@ -204,10 +193,6 @@ class ReadLocalNode(BigFrameNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return True

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}
Expand All @@ -233,10 +218,6 @@ def session(self):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return True

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}
Expand All @@ -261,13 +242,9 @@ class PromoteOffsetsNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False

@property
def non_local(self) -> bool:
return False
return True

@property
def schema(self) -> schemata.ArraySchema:
Expand Down Expand Up @@ -365,10 +342,6 @@ def row_preserving(self) -> bool:
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False

@property
def non_local(self) -> bool:
return True
Expand Down Expand Up @@ -401,10 +374,6 @@ class WindowOpNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def peekable(self) -> bool:
return False

@property
def non_local(self) -> bool:
return True
Expand Down Expand Up @@ -453,10 +422,6 @@ def row_preserving(self) -> bool:
def non_local(self) -> bool:
return True

@property
def peekable(self) -> bool:
return False

@functools.cached_property
def schema(self) -> schemata.ArraySchema:
def infer_dtype(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import bigframes.core.nodes as nodes

# TODO: Convert these functions to iterative implementations, or enforce a hard limit on tree depth. The recursive algorithms below can exceed the interpreter's stack limit on deep trees.


def is_trivially_executable(node: nodes.BigFrameNode) -> bool:
if local_only(node):
Expand All @@ -25,3 +28,11 @@ def is_trivially_executable(node: nodes.BigFrameNode) -> bool:

def local_only(node: nodes.BigFrameNode) -> bool:
    """Return True when every root of ``node`` is a ``nodes.ReadLocalNode``.

    A node with no roots is considered local-only (vacuously true).
    """
    for root in node.roots:
        if not isinstance(root, nodes.ReadLocalNode):
            return False
    return True


def peekable(node: nodes.BigFrameNode) -> bool:
    """Report whether the tree rooted at ``node`` can be peeked.

    A subtree for which ``local_only`` is true is always peekable. Otherwise
    the node itself must not be ``non_local`` and the same rule must hold for
    each of its ``child_nodes``.

    The tree is walked with an explicit stack instead of recursion, so this
    function does not add one Python frame per tree level.

    Args:
        node: Root of the expression tree to inspect.

    Returns:
        True if no visited node outside a local-only subtree is ``non_local``.
    """
    # NOTE(review): local_only() reads ``node.roots``, which may itself be
    # recursive -- confirm before relying on this for very deep trees.
    pending = [node]
    while pending:
        current = pending.pop()
        if local_only(current):
            # Entirely local subtree: peekable regardless of what it contains,
            # so there is no need to descend into its children.
            continue
        if current.non_local:
            return False
        pending.extend(current.child_nodes)
    return True
5 changes: 3 additions & 2 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@
import bigframes.core.guid as guid
from bigframes.core.ordering import IntegerEncoding
import bigframes.core.ordering as order
import bigframes.core.traversal as traversals
import bigframes.core.tree_properties as traversals
import bigframes.core.tree_properties as tree_properties
import bigframes.core.utils as utils
import bigframes.dtypes
import bigframes.formatting_helpers as formatting_helpers
Expand Down Expand Up @@ -1848,7 +1849,7 @@ def _peek(
self, array_value: core.ArrayValue, n_rows: int
) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""A 'peek' efficiently accesses a small number of rows in the dataframe."""
if not array_value.node.peekable:
if not tree_properties.peekable(array_value.node):
raise NotImplementedError("cannot efficient peek this dataframe")
sql = self._compile_unordered(array_value).peek_sql(n_rows)
return self._start_query(
Expand Down
11 changes: 11 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,17 @@ def test_df_peek_force_default(scalars_dfs):
assert len(peek_result) == 3


def test_df_peek_reset_index(scalars_dfs):
    """Peek after ``reset_index(drop=True)`` keeps the selected columns and returns n rows."""
    scalars_df, scalars_pandas_df = scalars_dfs
    selected = ["int64_col", "int64_too"]

    # Build the reset-index frame first, then peek at three rows of it.
    bf_subset = scalars_df[selected].reset_index(drop=True)
    peek_result = bf_subset.peek(n=3)

    expected_columns = scalars_pandas_df[selected].columns
    pd.testing.assert_index_equal(expected_columns, peek_result.columns)
    assert len(peek_result) == 3


def test_repr_w_all_rows(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs

Expand Down