fix: json in struct destination type #1187

Merged · 16 commits · Dec 12, 2024
Changes from 15 commits
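
For context: the bug being fixed (b/381148539) involves JSON values nested inside STRUCT columns, which were mapped to the wrong destination type. A minimal repro, mirroring the system test added at the end of this PR (assumes a configured BigQuery session):

    import bigframes.pandas as bpd

    # A STRUCT column containing a JSON field and an INT64 field.
    df = bpd.read_gbq(
        "SELECT STRUCT(JSON '{\"a\": 1}' AS data, 1 AS number) AS struct_col"
    )
    # After this fix, the nested JSON field reads back as its serialized text.
    print(df["struct_col"].struct.field("data")[0])  # '{"a":1}'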

2 changes: 1 addition & 1 deletion bigframes/bigquery/_operations/json.py

@@ -47,7 +47,7 @@ def json_set(
         >>> s = bpd.read_gbq("SELECT JSON '{\\\"a\\\": 1}' AS data")["data"]
         >>> bbq.json_set(s, json_path_value_pairs=[("$.a", 100), ("$.b", "hi")])
         0    {"a":100,"b":"hi"}
-        Name: data, dtype: string
+        Name: data, dtype: large_string[pyarrow]

     Args:
         input (bigframes.series.Series):

2 changes: 1 addition & 1 deletion bigframes/core/__init__.py

@@ -107,7 +107,7 @@ def from_table(
             raise ValueError("must set at most one of 'offests', 'primary_key'")
         if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
             warnings.warn(
-                "Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
+                "Interpreting JSON column(s) as StringDtype and pyarrow.large_string. This behavior may change in future versions.",
                 bigframes.exceptions.PreviewWarning,
             )
         # define data source only for needed columns, this makes row-hashing cheaper

Contributor (review comment on the new warning text): Shouldn't we just say "... as pyarrow.large_string"? StringDtype here is not relevant anymore, no?
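
For users who want to act on this warning while JSON support is in preview, it can be filtered like any other Python warning. A hypothetical snippet (the message prefix below matches the new text added here):

    import warnings
    import bigframes.exceptions

    # Silence only the JSON-interpretation preview warning.
    warnings.filterwarnings(
        "ignore",
        category=bigframes.exceptions.PreviewWarning,
        message="Interpreting JSON column",
    )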

4 changes: 2 additions & 2 deletions bigframes/core/compile/compiled.py

@@ -464,7 +464,7 @@ def _set_or_replace_by_id(
         builder.columns = [*self.columns, new_value.name(id)]
         return builder.build()

-    def _select(self, values: typing.Tuple[ibis_types.Value]) -> UnorderedIR:
+    def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> UnorderedIR:
         builder = self.builder()
         builder.columns = values
         return builder.build()

@@ -1129,7 +1129,7 @@ def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> Ordered
         builder.columns = [*self.columns, new_value.name(id)]
         return builder.build()

-    def _select(self, values: typing.Tuple[ibis_types.Value]) -> OrderedIR:
+    def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> OrderedIR:
         """Safely assign by id while maintaining ordering integrity."""
         # TODO: Split into explicit set and replace methods
         ordering_col_ids = set(
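
The annotation change above is a real fix, not cosmetic: in Python typing, Tuple[X] means a tuple of exactly one element, while Tuple[X, ...] means a homogeneous tuple of any length. A standalone sketch of the distinction:

    from typing import Tuple

    def takes_one(values: Tuple[int]) -> int:
        # Type checkers only accept a 1-tuple here, e.g. (1,)
        return values[0]

    def takes_many(values: Tuple[int, ...]) -> int:
        # Accepts (), (1,), (1, 2, 3), ... which is what _select needs
        return sum(values)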

8 changes: 4 additions & 4 deletions bigframes/core/compile/compiler.py

@@ -59,11 +59,11 @@ def compile_sql(
         node = self.set_output_names(node, output_ids)
         if ordered:
             node, limit = rewrites.pullup_limit_from_slice(node)
-            return self.compile_ordered_ir(self._preprocess(node)).to_sql(
-                ordered=True, limit=limit
-            )
+            ir = self.compile_ordered_ir(self._preprocess(node))
+            return ir.to_sql(ordered=True, limit=limit)
         else:
-            return self.compile_unordered_ir(self._preprocess(node)).to_sql()
+            ir = self.compile_unordered_ir(self._preprocess(node))  # type: ignore
+            return ir.to_sql()

     def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
         return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)

11 changes: 3 additions & 8 deletions bigframes/core/compile/ibis_types.py

@@ -24,7 +24,6 @@
 from bigframes_vendored.ibis.expr.datatypes.core import (
     dtype as python_type_to_bigquery_type,
 )
-import bigframes_vendored.ibis.expr.operations as ibis_ops
 import bigframes_vendored.ibis.expr.types as ibis_types
 import geopandas as gpd  # type: ignore
 import google.cloud.bigquery as bigquery

@@ -46,6 +45,7 @@
     ibis_dtypes.Binary,
     ibis_dtypes.Decimal,
     ibis_dtypes.GeoSpatial,
+    ibis_dtypes.JSON,
 ]

@@ -74,6 +74,7 @@
         ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True),
         gpd.array.GeometryDtype(),
     ),
+    (ibis_dtypes.json, pd.ArrowDtype(pa.large_string())),
 )

@@ -219,12 +220,6 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
     """
     ibis_type = value.type()
     name = value.get_name()
-    if ibis_type.is_json():
-        value = ibis_ops.ToJsonString(value).to_expr()  # type: ignore
-        value = (
-            value.case().when("null", bigframes_vendored.ibis.null()).else_(value).end()
-        )
-        return value.name(name)
     # Allow REQUIRED fields to be joined with NULLABLE fields.
     nullable_type = ibis_type.copy(nullable=True)
     return value.cast(nullable_type).name(name)

@@ -314,7 +309,7 @@ def ibis_dtype_to_bigframes_dtype(
             "Interpreting JSON as string. This behavior may change in future versions.",
             bigframes.exceptions.PreviewWarning,
         )
-        return bigframes.dtypes.STRING_DTYPE
+        return bigframes.dtypes.JSON_DTYPE

     if ibis_dtype in IBIS_TO_BIGFRAMES:
         return IBIS_TO_BIGFRAMES[ibis_dtype]

16 changes: 13 additions & 3 deletions bigframes/core/compile/scalar_op_compiler.py

@@ -1181,7 +1181,10 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):

 @scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True)
 def json_extract_op_impl(x: ibis_types.Value, op: ops.JSONExtract):
-    return json_extract(json_obj=x, json_path=op.json_path)
+    if x.type().is_json():
+        return json_extract(json_obj=x, json_path=op.json_path)
+    # json string
+    return json_extract_string(json_obj=x, json_path=op.json_path)


 @scalar_op_compiler.register_unary_op(ops.JSONExtractArray, pass_op=True)

@@ -1845,7 +1848,7 @@ def float_ceil(a: float) -> float:

 @ibis_udf.scalar.builtin(name="parse_json")
-def parse_json(a: str) -> ibis_dtypes.JSON:  # type: ignore[empty-body]
+def parse_json(json_str: str) -> ibis_dtypes.JSON:  # type: ignore[empty-body]
     """Converts a JSON-formatted STRING value to a JSON value."""

@@ -1860,7 +1863,14 @@ def json_set(  # type: ignore[empty-body]
 def json_extract(  # type: ignore[empty-body]
     json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.String
 ) -> ibis_dtypes.JSON:
-    """Extracts a JSON value and converts it to a SQL JSON-formatted STRING or JSON value."""
+    """Extracts a JSON value and converts it to a JSON value."""
+
+
+@ibis_udf.scalar.builtin(name="json_extract")
+def json_extract_string(  # type: ignore[empty-body]
+    json_obj: ibis_dtypes.String, json_path: ibis_dtypes.String
+) -> ibis_dtypes.String:
+    """Extracts a JSON STRING value and converts it to a SQL JSON-formatted STRING."""


 @ibis_udf.scalar.builtin(name="json_extract_array")
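
The dispatch above exists because BigQuery's JSON_EXTRACT function is overloaded: given a JSON value it returns JSON, and given a STRING it returns a JSON-formatted STRING, so each overload needs its own ibis UDF signature. A hedged sketch of the user-facing effect (assumes a configured session; bbq.json_extract is the public wrapper that routes through this compiler code):

    import bigframes.bigquery as bbq
    import bigframes.pandas as bpd

    # JSON-typed input: compiled to the JSON overload, stays in the JSON domain.
    json_s = bpd.read_gbq("SELECT JSON '{\"a\": {\"b\": 1}}' AS data")["data"]
    bbq.json_extract(json_s, json_path="$.a.b")

    # Plain-string input: compiled to the STRING overload instead.
    str_s = bpd.Series(['{"a": {"b": 1}}'])
    bbq.json_extract(str_s, json_path="$.a.b")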

28 changes: 16 additions & 12 deletions bigframes/dtypes.py

@@ -55,6 +55,8 @@
 BIGNUMERIC_DTYPE = pd.ArrowDtype(pa.decimal256(76, 38))
 # No arrow equivalent
 GEO_DTYPE = gpd.array.GeometryDtype()
+# JSON
+JSON_DTYPE = pd.ArrowDtype(pa.large_string())

 # Used when storing Null expressions
 DEFAULT_DTYPE = FLOAT_DTYPE

@@ -132,6 +134,13 @@ class SimpleDtypeInfo:
         orderable=True,
         clusterable=True,
     ),
+    SimpleDtypeInfo(
+        dtype=JSON_DTYPE,
+        arrow_dtype=pa.large_string(),
+        type_kind=("JSON",),
+        orderable=False,
+        clusterable=False,
+    ),
     SimpleDtypeInfo(
         dtype=DATE_DTYPE,
         arrow_dtype=pa.date32(),

@@ -281,7 +290,7 @@ def is_struct_like(type_: ExpressionType) -> bool:

 def is_json_like(type_: ExpressionType) -> bool:
     # TODO: Add JSON type support
-    return type_ == STRING_DTYPE
+    return type_ == JSON_DTYPE or type_ == STRING_DTYPE  # Including JSON string


 def is_json_encoding_type(type_: ExpressionType) -> bool:

@@ -455,8 +464,6 @@ def infer_literal_arrow_type(literal) -> typing.Optional[pa.DataType]:
     return bigframes_dtype_to_arrow_dtype(infer_literal_type(literal))


-# Don't have dtype for json, so just end up interpreting as STRING
-_REMAPPED_TYPEKINDS = {"JSON": "STRING"}
 _TK_TO_BIGFRAMES = {
     type_kind: mapping.dtype
     for mapping in SIMPLE_TYPES

@@ -480,16 +487,13 @@ def convert_schema_field(
         pa_struct = pa.struct(fields)
         pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
         return field.name, pd.ArrowDtype(pa_type)
-    elif (
-        field.field_type in _TK_TO_BIGFRAMES or field.field_type in _REMAPPED_TYPEKINDS
-    ):
-        singular_type = _TK_TO_BIGFRAMES[
-            _REMAPPED_TYPEKINDS.get(field.field_type, field.field_type)
-        ]
+    elif field.field_type in _TK_TO_BIGFRAMES:
         if is_repeated:
-            pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(singular_type))
+            pa_type = pa.list_(
+                bigframes_dtype_to_arrow_dtype(_TK_TO_BIGFRAMES[field.field_type])
+            )
             return field.name, pd.ArrowDtype(pa_type)
-        return field.name, singular_type
+        return field.name, _TK_TO_BIGFRAMES[field.field_type]
     else:
         raise ValueError(f"Cannot handle type: {field.field_type}")

@@ -639,7 +643,7 @@ def can_coerce(source_type: ExpressionType, target_type: ExpressionType) -> bool:
         return True  # None can be coerced to any supported type
     else:
         return (source_type == STRING_DTYPE) and (
-            target_type in TEMPORAL_BIGFRAMES_TYPES
+            target_type in TEMPORAL_BIGFRAMES_TYPES + [JSON_DTYPE]
         )
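
A quick illustration of what the new JSON_DTYPE means downstream: it is a pandas extension dtype backed by pyarrow, distinct from pd.StringDtype(storage="pyarrow"), which is why the tests below update their expected dtypes. A minimal local sketch:

    import pandas as pd
    import pyarrow as pa

    JSON_DTYPE = pd.ArrowDtype(pa.large_string())
    s = pd.Series(['{"a": 1}', '{"b": 2}'], dtype=JSON_DTYPE)
    print(s.dtype)                                       # large_string[pyarrow]
    print(s.dtype == pd.StringDtype(storage="pyarrow"))  # False: a different dtype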

6 changes: 4 additions & 2 deletions bigframes/session/executor.py

@@ -393,6 +393,7 @@ def peek(
     def head(
         self, array_value: bigframes.core.ArrayValue, n_rows: int
     ) -> ExecuteResult:
+
         maybe_row_count = self._local_get_row_count(array_value)
         if (maybe_row_count is not None) and (maybe_row_count <= n_rows):
             return self.execute(array_value, ordered=True)

@@ -452,7 +453,7 @@ def cached(
         # use a heuristic for whether something needs to be cached
         if (not force) and self._is_trivially_executable(array_value):
             return
-        elif use_session:
+        if use_session:
             self._cache_with_session_awareness(array_value)
         else:
             self._cache_with_cluster_cols(array_value, cluster_cols=cluster_cols)

@@ -656,7 +657,7 @@ def _sql_as_cached_temp_table(
     def _validate_result_schema(
         self,
         array_value: bigframes.core.ArrayValue,
-        bq_schema: list[bigquery.schema.SchemaField],
+        bq_schema: list[bigquery.SchemaField],
     ):
         actual_schema = tuple(bq_schema)
         ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema(

@@ -665,6 +666,7 @@
         internal_schema = array_value.schema
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
+
         if internal_schema.to_bigquery() != actual_schema:
             raise ValueError(
                 f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"

9 changes: 9 additions & 0 deletions tests/system/small/bigquery/test_json.py

@@ -134,6 +134,7 @@ def test_json_extract_from_string():
         actual.to_pandas(),
         expected.to_pandas(),
         check_names=False,
+        check_dtype=False,  # json_extract returns string type, while _get_series_from_json gives a JSON series (pa.large_string).
     )

@@ -200,3 +201,11 @@ def test_json_extract_string_array_as_float_array_from_array_strings():
 def test_json_extract_string_array_w_invalid_series_type():
     with pytest.raises(TypeError):
         bbq.json_extract_string_array(bpd.Series([1, 2]))
+
+
+# b/381148539
+def test_json_in_struct():
+    df = bpd.read_gbq(
+        "SELECT STRUCT(JSON '{\\\"a\\\": 1}' AS data, 1 AS number) as struct_col"
+    )
+    assert df["struct_col"].struct.field("data")[0] == '{"a":1}'

2 changes: 1 addition & 1 deletion tests/system/small/test_dataframe_io.py

@@ -259,7 +259,7 @@ def test_load_json(session):
         {
             "json_column": ['{"bar":true,"foo":10}'],
         },
-        dtype=pd.StringDtype(storage="pyarrow"),
+        dtype=pd.ArrowDtype(pa.large_string()),
     )
     expected.index = expected.index.astype("Int64")
     pd.testing.assert_series_equal(result.dtypes, expected.dtypes)

2 changes: 1 addition & 1 deletion tests/system/small/test_series.py

@@ -281,7 +281,7 @@ def test_get_column(scalars_dfs, col_name, expected_dtype):
 def test_get_column_w_json(json_df, json_pandas_df):
     series = json_df["json_col"]
     series_pandas = series.to_pandas()
-    assert series.dtype == pd.StringDtype(storage="pyarrow")
+    assert series.dtype == pd.ArrowDtype(pa.large_string())
     assert series_pandas.shape[0] == json_pandas_df.shape[0]