refactor: move query execution from ArrayValue to Session (#255)
TrevorBergeron authored Dec 16, 2023
1 parent 7cbbb7d commit 02f7ab6
Showing 9 changed files with 155 additions and 111 deletions.
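At a glance, the refactor inverts the ownership of query execution: ArrayValue (the expression tree) no longer compiles and runs its own SQL; the Session that owns the data does. A minimal sketch of the call-site change, with signatures simplified from the diffs below (not the exact library API):

    # Before: the expression tree executed itself, reaching back into its session.
    results_iterator, query_job = expr.start_query(max_results=100, sorted=True)

    # After: the Session is the single entry point for execution; the expression
    # tree stays a passive description of the computation.
    results_iterator, query_job = session._execute(expr, max_results=100, sorted=True)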
110 changes: 15 additions & 95 deletions bigframes/core/__init__.py
@@ -16,10 +16,8 @@
from dataclasses import dataclass
import io
import typing
- from typing import Iterable, Literal, Optional, Sequence, Tuple
+ from typing import Iterable, Literal, Sequence

- from google.cloud import bigquery
- import ibis
import ibis.expr.types as ibis_types
import pandas

@@ -86,7 +84,17 @@ def session(self) -> Session:
required_session = self.node.session
from bigframes import get_global_session

return self.node.session[0] if required_session else get_global_session()
return (
required_session if (required_session is not None) else get_global_session()
)

+     def _try_evaluate_local(self):
+         """Use only for unit testing paths - not fully featured. Will throw exception if fails."""
+         import ibis
+
+         return ibis.pandas.connect({}).execute(
+             self._compile_ordered()._to_ibis_expr(ordering_mode="unordered")
+         )

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
return self._compile_ordered().get_column_type(key)
@@ -97,97 +105,9 @@ def _compile_ordered(self) -> compiled.OrderedIR:
def _compile_unordered(self) -> compiled.UnorderedIR:
return compiler.compile_unordered(self.node)

-     def shape(self) -> typing.Tuple[int, int]:
-         """Returns dimensions as (length, width) tuple."""
-         width = len(self._compile_unordered().columns)
-         count_expr = self._compile_unordered()._to_ibis_expr().count()
-
-         # Support in-memory engines for hermetic unit tests.
-         if not self.node.session:
-             try:
-                 length = ibis.pandas.connect({}).execute(count_expr)
-                 return (length, width)
-             except Exception:
-                 # Not all cases can be handled by pandas engine
-                 pass
-
-         sql = self.session.ibis_client.compile(count_expr)
-         row_iterator, _ = self.session._start_query(
-             sql=sql,
-             max_results=1,
-         )
-         length = next(row_iterator)[0]
-         return (length, width)
-
-     def to_sql(
-         self,
-         offset_column: typing.Optional[str] = None,
-         col_id_overrides: typing.Mapping[str, str] = {},
-         sorted: bool = False,
-     ) -> str:
-         array_value = self
-         if offset_column:
-             array_value = self.promote_offsets(offset_column)
-         if sorted:
-             return array_value._compile_ordered().to_sql(
-                 col_id_overrides=col_id_overrides,
-                 sorted=sorted,
-             )
-         else:
-             return array_value._compile_unordered().to_sql(
-                 col_id_overrides=col_id_overrides
-             )
-
-     def start_query(
-         self,
-         job_config: Optional[bigquery.job.QueryJobConfig] = None,
-         max_results: Optional[int] = None,
-         *,
-         sorted: bool = True,
-     ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
-         """Execute a query and return metadata about the results."""
-         # TODO(swast): Cache the job ID so we can look it up again if they ask
-         # for the results? We'd need a way to invalidate the cache if DataFrame
-         # becomes mutable, though. Or move this method to the immutable
-         # expression class.
-         # TODO(swast): We might want to move this method to Session and/or
-         # provide our own minimal metadata class. Tight coupling to the
-         # BigQuery client library isn't ideal, especially if we want to support
-         # a LocalSession for unit testing.
-         # TODO(swast): Add a timeout here? If the query is taking a long time,
-         # maybe we just print the job metadata that we have so far?
-         sql = self.to_sql(sorted=sorted)  # type:ignore
-         return self.session._start_query(
-             sql=sql,
-             job_config=job_config,
-             max_results=max_results,
-         )
-
-     def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue:
-         """Write the ArrayValue to a session table and create a new block object that references it."""
-         compiled_value = self._compile_ordered()
-         ibis_expr = compiled_value._to_ibis_expr(
-             ordering_mode="unordered", expose_hidden_cols=True
-         )
-         tmp_table = self.session._ibis_to_temp_table(
-             ibis_expr, cluster_cols=cluster_cols, api_name="cached"
-         )
-
-         table_expression = self.session.ibis_client.table(
-             f"{tmp_table.project}.{tmp_table.dataset_id}.{tmp_table.table_id}"
-         )
-         new_columns = [table_expression[column] for column in compiled_value.column_ids]
-         new_hidden_columns = [
-             table_expression[column]
-             for column in compiled_value._hidden_ordering_column_names
-         ]
-         return ArrayValue.from_ibis(
-             self.session,
-             table_expression,
-             columns=new_columns,
-             hidden_ordering_columns=new_hidden_columns,
-             ordering=compiled_value._ordering,
-         )
+     def row_count(self) -> ArrayValue:
+         """Get number of rows in ArrayValue as a single-entry ArrayValue."""
+         return ArrayValue(nodes.RowCountNode(child=self.node))

# Operations

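With these changes, ArrayValue stays purely declarative: row counting now builds a RowCountNode for the compiler instead of issuing a query immediately, and unit tests get a hermetic escape hatch. A hedged usage sketch (method names from the diff above; the local path only works for plans the ibis pandas backend can evaluate):

    count_expr = array_value.row_count()            # ArrayValue wrapping nodes.RowCountNode
    local_count = count_expr._try_evaluate_local()  # unit-test path; raises if the
                                                    # pandas engine can't handle the plan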
37 changes: 26 additions & 11 deletions bigframes/core/blocks.py
@@ -137,8 +137,19 @@ def index(self) -> indexes.IndexValue:
@functools.cached_property
def shape(self) -> typing.Tuple[int, int]:
"""Returns dimensions as (length, width) tuple."""
-         impl_length, _ = self._expr.shape()
-         return (impl_length, len(self.value_columns))
+         row_count_expr = self.expr.row_count()
+
+         # Support in-memory engines for hermetic unit tests.
+         if self.expr.node.session is None:
+             try:
+                 row_count = row_count_expr._try_evaluate_local().squeeze()
+                 return (row_count, len(self.value_columns))
+             except Exception:
+                 pass
+
+         iter, _ = self.session._execute(row_count_expr, sorted=False)
+         row_count = next(iter)[0]
+         return (row_count, len(self.value_columns))

@property
def index_columns(self) -> Sequence[str]:
@@ -182,6 +193,10 @@ def index_dtypes(
"""Returns the dtypes of the index columns."""
return [self.expr.get_column_type(col) for col in self.index_columns]

+     @property
+     def session(self) -> core.Session:
+         return self._expr.session

@functools.cached_property
def col_id_to_label(self) -> typing.Mapping[str, Label]:
"""Get column label for value columns, or index name for index columns"""
@@ -376,7 +391,7 @@ def _to_dataframe(self, result) -> pd.DataFrame:
"""Convert BigQuery data to pandas DataFrame with specific dtypes."""
dtypes = dict(zip(self.index_columns, self.index_dtypes))
dtypes.update(zip(self.value_columns, self.dtypes))
-         return self._expr.session._rows_to_dataframe(result, dtypes)
+         return self.session._rows_to_dataframe(result, dtypes)

def to_pandas(
self,
@@ -404,9 +419,9 @@ def to_pandas_batches(self):
"""Download results one message at a time."""
dtypes = dict(zip(self.index_columns, self.index_dtypes))
dtypes.update(zip(self.value_columns, self.dtypes))
-         results_iterator, _ = self._expr.start_query()
+         results_iterator, _ = self.session._execute(self.expr, sorted=True)
for arrow_table in results_iterator.to_arrow_iterable(
-             bqstorage_client=self._expr.session.bqstoragereadclient
+             bqstorage_client=self.session.bqstoragereadclient
):
df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
self._copy_index_to_pandas(df)
@@ -460,12 +475,12 @@ def _compute_and_count(

expr = self._apply_value_keys_to_expr(value_keys=value_keys)

-         results_iterator, query_job = expr.start_query(
-             max_results=max_results, sorted=ordered
+         results_iterator, query_job = self.session._execute(
+             expr, max_results=max_results, sorted=ordered
)

table_size = (
-             expr.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
+             self.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
)
fraction = (
max_download_size / table_size
@@ -607,7 +622,7 @@ def _compute_dry_run(
) -> bigquery.QueryJob:
expr = self._apply_value_keys_to_expr(value_keys=value_keys)
job_config = bigquery.QueryJobConfig(dry_run=True)
-         _, query_job = expr.start_query(job_config=job_config)
+         _, query_job = self.session._execute(expr, job_config=job_config, dry_run=True)
return query_job

def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
@@ -1668,7 +1683,7 @@ def to_sql_query(
# the BigQuery unicode column name feature?
substitutions[old_id] = new_id

-         sql = array_value.to_sql(col_id_overrides=substitutions)
+         sql = self.session._to_sql(array_value, col_id_overrides=substitutions)
return (
sql,
new_ids[: len(idx_labels)],
@@ -1678,7 +1693,7 @@ def cached(self) -> Block:
def cached(self) -> Block:
"""Write the block to a session table and create a new block object that references it."""
return Block(
-             self.expr.cached(cluster_cols=self.index_columns),
+             self.session._execute_and_cache(self.expr, cluster_cols=self.index_columns),
index_columns=self.index_columns,
column_labels=self.column_labels,
index_labels=self.index_labels,
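Every blocks.py call site now funnels through new Session methods (_execute, _to_sql, _execute_and_cache) whose definitions live in a file not rendered on this page. Inferring from the ArrayValue.start_query body deleted above, _execute presumably reduces to compile-then-run, roughly as follows (a sketch, not the actual session code; _compute_dry_run additionally passes dry_run=True through):

    def _execute(self, array_value, job_config=None, max_results=None, *, sorted=True):
        # Compile the expression tree to SQL, then submit it as a BigQuery job.
        sql = self._to_sql(array_value, sorted=sorted)
        return self._start_query(sql=sql, job_config=job_config, max_results=max_results)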
16 changes: 16 additions & 0 deletions bigframes/core/compile/compiled.py
@@ -268,6 +268,22 @@ def to_sql(
)
return typing.cast(str, sql)

+     def row_count(self) -> OrderedIR:
+         original_table = self._to_ibis_expr()
+         ibis_table = original_table.agg(
+             [
+                 original_table.count().name("count"),
+             ]
+         )
+         return OrderedIR(
+             ibis_table,
+             (ibis_table["count"],),
+             ordering=ExpressionOrdering(
+                 ordering_value_columns=(OrderingColumnReference("count"),),
+                 total_ordering_columns=frozenset(["count"]),
+             ),
+         )

def _to_ibis_expr(
self,
*,
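The new OrderedIR.row_count() is a standard ibis aggregation: collapse the table to a single row holding the count, then order by that one column so the result is trivially totally ordered. A self-contained illustration of the underlying ibis pattern (assumes ibis with the pandas backend installed; the table and column are made up for the example):

    import ibis
    import pandas as pd

    con = ibis.pandas.connect({"t": pd.DataFrame({"x": [1, 2, 3]})})
    t = con.table("t")
    counted = t.agg([t.count().name("count")])  # one-row table with a "count" column
    print(counted.execute())                    # count == 3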
6 changes: 6 additions & 0 deletions bigframes/core/compile/compiler.py
@@ -173,6 +173,12 @@ def compile_concat(node: nodes.ConcatNode, ordered: bool = True):
return concat_impl.concat_unordered(compiled_unordered)


+ @_compile_node.register
+ def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True):
+     result = compile_unordered(node.child).row_count()
+     return result if ordered else result.to_unordered()


@_compile_node.register
def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True):
result = compile_unordered(node.child).aggregate(
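compile_rowcount plugs into a functools.singledispatch registry: _compile_node dispatches on the concrete node type, which @_compile_node.register reads from the parameter annotation. A minimal standalone illustration of that mechanism (toy names, not the bigframes compiler):

    import functools
    from dataclasses import dataclass

    @dataclass(frozen=True)
    class RowCountNode:
        child: object = None

    @functools.singledispatch
    def _compile_node(node, ordered: bool = True):
        raise NotImplementedError(type(node))

    @_compile_node.register  # dispatch type taken from the `node` annotation
    def _(node: RowCountNode, ordered: bool = True):
        return f"row_count(ordered={ordered})"

    print(_compile_node(RowCountNode()))  # -> row_count(ordered=True)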
6 changes: 5 additions & 1 deletion bigframes/core/indexes/index.py
@@ -396,6 +396,10 @@ def dtypes(
) -> typing.Sequence[typing.Union[bf_dtypes.Dtype, np.dtype[typing.Any]]]:
return self._block.index_dtypes

+     @property
+     def session(self) -> core.Session:
+         return self._expr.session

def __repr__(self) -> str:
"""Converts an Index to a string."""
# TODO(swast): Add a timeout here? If the query is taking a long time,
@@ -411,7 +415,7 @@ def to_pandas(self) -> pandas.Index:
index_columns = list(self._block.index_columns)
dtypes = dict(zip(index_columns, self.dtypes))
expr = self._expr.select_columns(index_columns)
-         results, _ = expr.start_query()
+         results, _ = self.session._execute(expr)
df = expr.session._rows_to_dataframe(results, dtypes)
df = df.set_index(index_columns)
index = df.index
8 changes: 7 additions & 1 deletion bigframes/core/nodes.py
@@ -139,7 +139,7 @@ class ReadGbqNode(BigFrameNode):

@property
def session(self):
-         return (self.table_session,)
+         return self.table_session

def __hash__(self):
return self._node_hash
@@ -229,6 +229,12 @@ def __hash__(self):
return self._node_hash


+ # TODO: Merge RowCount and Corr into Aggregate Node
+ @dataclass(frozen=True)
+ class RowCountNode(UnaryNode):
+     pass


@dataclass(frozen=True)
class AggregateNode(UnaryNode):
aggregations: typing.Tuple[typing.Tuple[str, agg_ops.AggregateOp, str], ...]
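The one-line ReadGbqNode.session fix is what lets ArrayValue.session above drop its tuple indexing. Returning the session wrapped in a one-tuple made truthiness checks fragile, since a one-tuple is truthy even when it wraps None; the new code returns the session itself and checks `is not None` explicitly. An illustration of the pitfall:

    wrapped = (None,)             # old style: session delivered as a one-tuple
    print(bool(wrapped))          # True -- a missing session still looks "present"
    print(wrapped[0] is None)     # True -- but the actual session is absent

    session = None                # new style: return the session directly
    print(session is not None)    # False -- the explicit check behaves correctly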
3 changes: 2 additions & 1 deletion bigframes/dataframe.py
@@ -2701,7 +2701,8 @@ def _create_io_query(self, index: bool, ordering_id: Optional[str]) -> str:

if ordering_id is not None:
array_value = array_value.promote_offsets(ordering_id)
-         return array_value.to_sql(
+         return self._block.session._to_sql(
+             array_value=array_value,
col_id_overrides=id_overrides,
)


