Skip to content

Commit

Permalink
chore(flink): update pytest markers and run linting
Browse files Browse the repository at this point in the history
  • Loading branch information
chloeh13q committed Mar 27, 2024
1 parent fffe268 commit 6a618c8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 16 deletions.
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/test_memtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
def test_create_memtable(con, data, schema, expected):
t = ibis.memtable(data, schema=ibis.schema(schema))
# cannot use con.execute(t) directly because of some behavioral discrepancy between
# `TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`
# `TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`; this doesn't
# seem to be an issue if we don't execute memtable directly
result = con.raw_sql(con.compile(t))
# raw_sql() returns a `TableResult` object and doesn't natively convert to pandas
assert list(result.collect()) == expected
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/sql/dialects.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ class Generator(Hive.Generator):
sge.ArrayConcat: rename_func("array_concat"),
sge.ArraySize: rename_func("cardinality"),
sge.Length: rename_func("char_length"),
sge.TryCast: lambda self, e: f"TRY_CAST({e.this.sql(self.dialect)} AS {e.to.sql(self.dialect)})",
sge.TryCast: lambda self,
e: f"TRY_CAST({e.this.sql(self.dialect)} AS {e.to.sql(self.dialect)})",
sge.DayOfYear: rename_func("dayofyear"),
sge.DayOfWeek: rename_func("dayofweek"),
sge.DayOfMonth: rename_func("dayofmonth"),
Expand Down
46 changes: 36 additions & 10 deletions ibis/backends/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,12 +807,6 @@ def test_array_intersect(con, data):
@pytest.mark.broken(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.broken(
["flink"],
raises=Py4JJavaError,
reason="flink throws exception on named struct with a single field",
# also cannot do array<struct<>> in flink; needs to be written as row<row<struct<>>>
)
def test_unnest_struct(con):
data = {"value": [[{"a": 1}, {"a": 2}], [{"a": 3}, {"a": 4}]]}
t = ibis.memtable(data, schema=ibis.schema({"value": "!array<!struct<a: !int>>"}))
Expand All @@ -824,6 +818,39 @@ def test_unnest_struct(con):
tm.assert_series_equal(result, expected)


@builtin_array
@pytest.mark.notimpl(
    ["clickhouse"],
    raises=ClickHouseDatabaseError,
    reason="ClickHouse won't accept dicts for struct type values",
)
@pytest.mark.notimpl(["postgres"], raises=PsycoPg2SyntaxError)
@pytest.mark.notimpl(["risingwave"], raises=PsycoPg2InternalError)
@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError)
@pytest.mark.broken(
    ["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.broken(
    ["flink"], reason="flink unnests a and b as separate columns", raises=Py4JJavaError
)
def test_unnest_struct_with_multiple_fields(con):
    """Unnest an array of two-field structs and compare with pandas ``explode``.

    The memtable has a single ``value`` column whose rows are arrays of
    ``{a, b}`` structs; unnesting it should yield one struct per output row.
    """
    rows = [
        [{"a": 1, "b": "banana"}, {"a": 2, "b": "apple"}],
        [{"a": 3, "b": "coconut"}, {"a": 4, "b": "orange"}],
    ]
    records = {"value": rows}
    schema = ibis.schema({"value": "!array<!struct<a: !int, b: !string>>"})
    table = ibis.memtable(records, schema=schema)

    actual = con.execute(table.value.unnest())

    # Build the reference result with pandas: explode the arrays, keep the
    # only column as a Series, and renumber the index from zero.
    exploded = pd.DataFrame(records).explode("value")
    expected = exploded.iloc[:, 0].reset_index(drop=True)
    tm.assert_series_equal(actual, expected)


array_zip_notimpl = pytest.mark.notimpl(
[
"dask",
Expand Down Expand Up @@ -909,11 +936,10 @@ def test_zip_null(con, fn):
@pytest.mark.broken(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.broken(
@pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="flink throws exception on named struct with a single field",
# also cannot do array<struct<>> in flink; needs to be written as row<row<struct<>>>
reason="does not seem to support field selection on unnest",
)
def test_array_of_struct_unnest(con):
jobs = ibis.memtable(
Expand Down Expand Up @@ -1095,7 +1121,7 @@ def test_range_start_stop_step_zero(con, start, stop):
raises=com.OperationNotDefinedError,
reason="backend doesn't support unnest",
)
@pytest.mark.broken(
@pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="SQL validation failed; Flink does not support ARRAY[]", # https://issues.apache.org/jira/browse/FLINK-20578
Expand Down
17 changes: 13 additions & 4 deletions ibis/formats/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from ibis import util
from ibis.common.numeric import normalize_decimal
from ibis.common.temporal import normalize_timezone
from ibis.formats import DataMapper, SchemaMapper, TableProxy
Expand Down Expand Up @@ -178,9 +179,13 @@ def convert_GeoSpatial(cls, s, dtype, pandas_type):
return gpd.GeoSeries(s)
return gpd.GeoSeries.from_wkb(s)

convert_Point = convert_LineString = convert_Polygon = convert_MultiLineString = (
convert_MultiPoint
) = convert_MultiPolygon = convert_GeoSpatial
convert_Point = (
convert_LineString
) = (
convert_Polygon
) = (
convert_MultiLineString
) = convert_MultiPoint = convert_MultiPolygon = convert_GeoSpatial

@classmethod
def convert_default(cls, s, dtype, pandas_type):
Expand Down Expand Up @@ -305,7 +310,11 @@ def convert(values, names=dtype.names, converters=converters):
if values is None:
return values

items = values.items() if isinstance(values, dict) else zip(names, values)
items = (
values.items()
if isinstance(values, dict)
else zip(names, util.promote_list(values))
)
return {
k: converter(v) if v is not None else v
for converter, (k, v) in zip(converters, items)
Expand Down

0 comments on commit 6a618c8

Please sign in to comment.