Commit
Merge branch 'main' into remote_op_reproject
tswast authored Dec 27, 2024
2 parents d780dde + 4d854fd commit 477e50f
Showing 64 changed files with 4,396 additions and 327 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -12,7 +12,7 @@ jobs:
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
-          python-version: "3.9"
+          python-version: "3.10"
      - name: Install nox
        run: |
          python -m pip install --upgrade setuptools pip wheel
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
@@ -12,7 +12,7 @@ jobs:
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
-          python-version: "3.8"
+          python-version: "3.10"
      - name: Install nox
        run: |
          python -m pip install --upgrade setuptools pip wheel
2 changes: 1 addition & 1 deletion .github/workflows/unittest.yml
@@ -42,7 +42,7 @@ jobs:
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
-          python-version: "3.8"
+          python-version: "3.10"
      - name: Install coverage
        run: |
          python -m pip install --upgrade setuptools pip wheel
36 changes: 34 additions & 2 deletions bigframes/core/compile/scalar_op_compiler.py
@@ -967,6 +967,17 @@ def normalize_op_impl(x: ibis_types.Value):
return result.cast(result_type)


# Geo Ops
@scalar_op_compiler.register_unary_op(ops.geo_x_op)
def geo_x_op_impl(x: ibis_types.Value):
return typing.cast(ibis_types.GeoSpatialValue, x).x()


@scalar_op_compiler.register_unary_op(ops.geo_y_op)
def geo_y_op_impl(x: ibis_types.Value):
return typing.cast(ibis_types.GeoSpatialValue, x).y()


# Parameterized ops
@scalar_op_compiler.register_unary_op(ops.StructFieldOp, pass_op=True)
def struct_field_op_impl(x: ibis_types.Value, op: ops.StructFieldOp):
@@ -1201,8 +1212,13 @@ def json_extract_string_array_op_impl(

# Blob Ops
@scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op)
-def obj_fetch_metadata_op_impl(x: ibis_types.Value):
-    return obj_fetch_metadata(obj_ref=x)
+def obj_fetch_metadata_op_impl(obj_ref: ibis_types.Value):
+    return obj_fetch_metadata(obj_ref=obj_ref)
+
+
+@scalar_op_compiler.register_unary_op(ops.ObjGetAccessUrl, pass_op=True)
+def obj_get_access_url_op_impl(obj_ref: ibis_types.Value, op: ops.ObjGetAccessUrl):
+    return obj_get_access_url(obj_ref=obj_ref, mode=op.mode)


### Binary Ops
@@ -1724,6 +1740,12 @@ def binary_remote_function_op_impl(
return x_transformed


# Blob Ops
@scalar_op_compiler.register_binary_op(ops.obj_make_ref_op)
def obj_make_ref_op(x: ibis_types.Value, y: ibis_types.Value):
return obj_make_ref(uri=x, authorizer=y)


# Ternary Operations
@scalar_op_compiler.register_ternary_op(ops.where_op)
def where_op(
@@ -1895,3 +1917,13 @@ def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64:  # type: ignore
@ibis_udf.scalar.builtin(name="OBJ.FETCH_METADATA")
def obj_fetch_metadata(obj_ref: _OBJ_REF_IBIS_DTYPE) -> _OBJ_REF_IBIS_DTYPE: # type: ignore
"""Fetch metadata from ObjectRef Struct."""


@ibis_udf.scalar.builtin(name="OBJ.MAKE_REF")
def obj_make_ref(uri: str, authorizer: str) -> _OBJ_REF_IBIS_DTYPE: # type: ignore
"""Make ObjectRef Struct from uri and connection."""


@ibis_udf.scalar.builtin(name="OBJ.GET_ACCESS_URL")
def obj_get_access_url(obj_ref: _OBJ_REF_IBIS_DTYPE, mode: ibis_dtypes.String) -> ibis_dtypes.JSON: # type: ignore
"""Get access url (as ObjectRefRumtime JSON) from ObjectRef."""
3 changes: 2 additions & 1 deletion bigframes/core/reshape/api.py
@@ -13,7 +13,8 @@
# limitations under the License.

from bigframes.core.reshape.concat import concat
+from bigframes.core.reshape.encoding import get_dummies
from bigframes.core.reshape.merge import merge
from bigframes.core.reshape.tile import cut, qcut

__all__ = ["concat", "cut", "qcut", "merge"]
__all__ = ["concat", "get_dummies", "merge", "cut", "qcut"]
194 changes: 194 additions & 0 deletions bigframes/core/reshape/encoding.py
@@ -0,0 +1,194 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing
from typing import Any, List, Optional, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.reshape.encoding as vendored_pandas_encoding
import pandas

from bigframes import operations
from bigframes.core import blocks, expression
from bigframes.dataframe import DataFrame
from bigframes.series import Series


def get_dummies(
data: Union[DataFrame, Series],
prefix: Union[List, dict, str, None] = None,
prefix_sep: Union[List, dict, str, None] = "_",
dummy_na: bool = False,
columns: Optional[List] = None,
drop_first: bool = False,
dtype: Any = None,
) -> DataFrame:
# simplify input parameters into per-input-label lists
# also raise errors for invalid parameters
column_labels, prefixes, prefix_seps = _standardize_get_dummies_params(
data, prefix, prefix_sep, columns, dtype
)

# combine prefixes into per-column-id list
full_columns_prefixes, columns_ids = _determine_get_dummies_columns_from_labels(
data, column_labels, prefix is not None, prefixes, prefix_seps
)

# run queries to compute unique values
block = data._block
max_unique_value = (
blocks._BQ_MAX_COLUMNS - len(block.value_columns) - len(block.index_columns) - 1
) // len(column_labels)
columns_values = [
block._get_unique_values([col_id], max_unique_value) for col_id in columns_ids
]

# for each dummified column, add the content of the output columns via block operations
intermediate_col_ids = []
for i in range(len(columns_values)):
level = columns_values[i].get_level_values(0).sort_values().dropna()
if drop_first:
level = level[1:]
column_label = full_columns_prefixes[i]
column_id = columns_ids[i]
block, new_intermediate_col_ids = _perform_get_dummies_block_operations(
block, level, column_label, column_id, dummy_na
)
intermediate_col_ids.extend(new_intermediate_col_ids)

# drop dummified columns (and the intermediate columns we added)
block = block.drop_columns(columns_ids + intermediate_col_ids)
return DataFrame(block)


get_dummies.__doc__ = vendored_pandas_encoding.get_dummies.__doc__


def _standardize_get_dummies_params(
data: Union[DataFrame, Series],
prefix: Union[List, dict, str, None],
prefix_sep: Union[List, dict, str, None],
columns: Optional[List],
dtype: Any,
) -> Tuple[List, List[str], List[str]]:
block = data._block

if isinstance(data, Series):
columns = [block.column_labels[0]]
if columns is not None and not pandas.api.types.is_list_like(columns):
raise TypeError("Input must be a list-like for parameter `columns`")
if dtype is not None and dtype not in [
pandas.BooleanDtype,
bool,
"Boolean",
"boolean",
"bool",
]:
raise NotImplementedError(
f"Only Boolean dtype is currently supported. {constants.FEEDBACK_LINK}"
)

if columns is None:
default_dummy_types = [pandas.StringDtype, "string[pyarrow]"]
columns = []
columns_set = set()
for col_id in block.value_columns:
label = block.col_id_to_label[col_id]
if (
label not in columns_set
and block.expr.get_column_type(col_id) in default_dummy_types
):
columns.append(label)
columns_set.add(label)

column_labels: List = typing.cast(List, columns)

def parse_prefix_kwarg(kwarg, kwarg_name) -> Optional[List[str]]:
if kwarg is None:
return None
if isinstance(kwarg, str):
return [kwarg] * len(column_labels)
if isinstance(kwarg, dict):
return [kwarg[column] for column in column_labels]
kwarg = typing.cast(List, kwarg)
if pandas.api.types.is_list_like(kwarg) and len(kwarg) != len(column_labels):
raise ValueError(
f"Length of '{kwarg_name}' ({len(kwarg)}) did not match "
f"the length of the columns being encoded ({len(column_labels)})."
)
if pandas.api.types.is_list_like(kwarg):
return list(map(str, kwarg))
raise TypeError(f"{kwarg_name} kwarg must be a string, list, or dictionary")

prefix_seps = parse_prefix_kwarg(prefix_sep or "_", "prefix_sep")
prefix_seps = typing.cast(List, prefix_seps)
prefixes = parse_prefix_kwarg(prefix, "prefix")
if prefixes is None:
prefixes = column_labels
prefixes = typing.cast(List, prefixes)

return column_labels, prefixes, prefix_seps


def _determine_get_dummies_columns_from_labels(
data: Union[DataFrame, Series],
column_labels: List,
prefix_given: bool,
prefixes: List[str],
prefix_seps: List[str],
) -> Tuple[List[str], List[str]]:
block = data._block

columns_ids = []
columns_prefixes = []
for i in range(len(column_labels)):
label = column_labels[i]
empty_prefix = label is None or (isinstance(data, Series) and not prefix_given)
full_prefix = "" if empty_prefix else prefixes[i] + prefix_seps[i]

for col_id in block.label_to_col_id[label]:
columns_ids.append(col_id)
columns_prefixes.append(full_prefix)

return columns_prefixes, columns_ids


def _perform_get_dummies_block_operations(
block: blocks.Block,
level: pandas.Index,
column_label: str,
column_id: str,
dummy_na: bool,
) -> Tuple[blocks.Block, List[str]]:
intermediate_col_ids = []
for value in level:
new_column_label = f"{column_label}{value}"
if column_label == "":
new_column_label = value
new_block, new_id = block.project_expr(
operations.eq_op.as_expr(column_id, expression.const(value))
)
intermediate_col_ids.append(new_id)
block, _ = new_block.project_expr(
operations.fillna_op.as_expr(new_id, expression.const(False)),
label=new_column_label,
)
if dummy_na:
# dummy column name for na depends on the dtype
na_string = str(pandas.Index([None], dtype=level.dtype)[0])
new_column_label = f"{column_label}{na_string}"
block, _ = block.apply_unary_op(
column_id, operations.isnull_op, result_label=new_column_label
)
return block, intermediate_col_ids
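The new get_dummies mirrors the pandas API: it queries BigQuery for each column's distinct values (capped so the result stays under the _BQ_MAX_COLUMNS limit), then projects one boolean column per value. A hedged usage sketch; the sample data is invented, and the top-level bpd.get_dummies alias is assumed to re-export this function like the other reshape helpers in the api.py change above:

import bigframes.pandas as bpd

df = bpd.DataFrame({"color": ["red", "blue", "red"]})

# One boolean column per distinct value: color_blue, color_red
bpd.get_dummies(df, columns=["color"])

# As in pandas, drop_first=True drops the first sorted level, and
# dummy_na=True adds an explicit column for missing values.
bpd.get_dummies(df, columns=["color"], drop_first=True, dummy_na=True)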
41 changes: 37 additions & 4 deletions bigframes/dtypes.py
@@ -57,6 +57,28 @@
GEO_DTYPE = gpd.array.GeometryDtype()
# JSON
JSON_DTYPE = pd.ArrowDtype(pa.large_string())
OBJ_REF_DTYPE = pd.ArrowDtype(
pa.struct(
(
pa.field(
"uri",
pa.string(),
),
pa.field(
"version",
pa.string(),
),
pa.field(
"authorizer",
pa.string(),
),
pa.field(
"details",
pa.large_string(), # JSON
),
)
)
)

# Used when storing Null expressions
DEFAULT_DTYPE = FLOAT_DTYPE
@@ -253,6 +275,10 @@ def is_time_like(type_: ExpressionType) -> bool:
return type_ in (DATETIME_DTYPE, TIMESTAMP_DTYPE, TIME_DTYPE)


def is_geo_like(type_: ExpressionType) -> bool:
return type_ in (GEO_DTYPE,)


def is_binary_like(type_: ExpressionType) -> bool:
return type_ in (BOOL_DTYPE, BYTES_DTYPE, INT_DTYPE)

@@ -380,12 +406,19 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
        return pd.ArrowDtype(arrow_dtype)
    if pa.types.is_struct(arrow_dtype):
        return pd.ArrowDtype(arrow_dtype)
+
+    # BigFrames doesn't distinguish between string and large_string because the
+    # largest string (2 GB) is already larger than the largest BigQuery row.
+    if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):
+        return STRING_DTYPE
+
    if arrow_dtype == pa.null():
        return DEFAULT_DTYPE
-    else:
-        raise ValueError(
-            f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
-        )
+
+    # No other types matched.
+    raise ValueError(
+        f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
+    )


_BIGFRAMES_TO_ARROW = {
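A small sanity-check sketch of the dtype changes above, assuming bigframes.dtypes is importable as shown in this diff:

import pyarrow as pa

from bigframes import dtypes

# Both Arrow string types now fold into the single BigFrames string dtype;
# previously large_string fell through to the ValueError.
assert dtypes.arrow_dtype_to_bigframes_dtype(pa.string()) == dtypes.STRING_DTYPE
assert dtypes.arrow_dtype_to_bigframes_dtype(pa.large_string()) == dtypes.STRING_DTYPE

# The new OBJ_REF_DTYPE is an Arrow struct: uri, version, authorizer, and
# details (JSON stored as large_string).
print(dtypes.OBJ_REF_DTYPE)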
13 changes: 13 additions & 0 deletions bigframes/geopandas/geoseries.py
@@ -16,6 +16,7 @@
import bigframes_vendored.geopandas.geoseries as vendored_geoseries
import geopandas.array # type: ignore

+import bigframes.operations as ops
import bigframes.series


@@ -26,3 +27,15 @@ def __init__(self, data=None, index=None, **kwargs):
super().__init__(
data=data, index=index, dtype=geopandas.array.GeometryDtype(), **kwargs
)

@property
def x(self) -> bigframes.series.Series:
series = self._apply_unary_op(ops.geo_x_op)
series.name = None
return series

@property
def y(self) -> bigframes.series.Series:
series = self._apply_unary_op(ops.geo_y_op)
series.name = None
return series
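The new x and y properties delegate to ops.geo_x_op / ops.geo_y_op, which the compiler lowers to the ibis geospatial accessors added earlier in this commit (ST_X / ST_Y in BigQuery SQL). A hedged usage sketch; the sample geometry and an active BigQuery session are assumptions:

from shapely.geometry import Point

import bigframes.geopandas

s = bigframes.geopandas.GeoSeries([Point(1, 2), Point(3, 5)])
s.x  # Float64 Series: 1.0, 3.0 (name reset to None, per the code above)
s.y  # Float64 Series: 2.0, 5.0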
6 changes: 6 additions & 0 deletions bigframes/ml/core.py
@@ -123,6 +123,12 @@ def predict(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
self._model_manipulation_sql_generator.ml_predict,
)

def explain_predict(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
return self._apply_ml_tvf(
input_data,
self._model_manipulation_sql_generator.ml_explain_predict,
)

def transform(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
return self._apply_ml_tvf(
input_data,
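explain_predict reuses the same _apply_ml_tvf plumbing as predict, swapping in the ml_explain_predict SQL generator, which by its name wraps BigQuery's ML.EXPLAIN_PREDICT table-valued function. A hedged sketch of the call shape on this internal wrapper; model (a fitted BqmlModel) and df are assumed to exist:

predictions = model.predict(df)           # SELECT * FROM ML.PREDICT(MODEL ..., ...)
explanations = model.explain_predict(df)  # SELECT * FROM ML.EXPLAIN_PREDICT(MODEL ..., ...)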