From e5b0d8f5e0355c15d7b77eca4601de9bce316e26 Mon Sep 17 00:00:00 2001
From: Kexiang Wang
Date: Mon, 29 Jan 2024 22:57:25 -0500
Subject: [PATCH 1/5] feat(risingwave): add streaming DDLs

---
 compose.yaml                                  |   2 +-
 ibis/backends/risingwave/__init__.py          | 377 +++++++++++++++++-
 .../risingwave/tests/test_streaming.py        | 110 +++++
 ibis/backends/tests/test_array.py             |   4 +
 ibis/backends/tests/test_client.py            |  25 +-
 ibis/backends/tests/test_join.py              |   1 +
 ibis/backends/tests/test_temporal.py          |  30 --
 poetry.lock                                   |   1 -
 8 files changed, 493 insertions(+), 57 deletions(-)
 create mode 100644 ibis/backends/risingwave/tests/test_streaming.py

diff --git a/compose.yaml b/compose.yaml
index 0030add17c75..5720ecb577af 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -542,7 +542,7 @@ services:
       - impala
 
   risingwave:
-    image: ghcr.io/risingwavelabs/risingwave:nightly-20240122
+    image: ghcr.io/risingwavelabs/risingwave:nightly-20240204
     command: "standalone --meta-opts=\" \
       --advertise-addr 0.0.0.0:5690 \
       --backend mem \
diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py
index 9456b00688b9..a9855dafb6b9 100644
--- a/ibis/backends/risingwave/__init__.py
+++ b/ibis/backends/risingwave/__init__.py
@@ -25,6 +25,24 @@
     import pyarrow as pa
 
 
+def data_and_encode_format(data_format, encode_format, encode_properties):
+    res = ""
+    if data_format is not None:
+        res = res + " FORMAT " + data_format.upper()
+    if encode_format is not None:
+        res = res + " ENCODE " + encode_format.upper()
+    if encode_properties is not None:
+        res = res + " " + format_properties(encode_properties)
+    return res
+
+
+def format_properties(props):
+    tokens = []
+    for k, v in props.items():
+        tokens.append(f"{k}='{v}'")
+    return "( {} ) ".format(", ".join(tokens))
+
+
 class Backend(PostgresBackend):
     name = "risingwave"
     compiler = RisingwaveCompiler()
@@ -110,6 +128,11 @@ def create_table(
         database: str | None = None,
         temp: bool = False,
         overwrite: bool = False,
+        # TODO(Kexiang): add `append only`
+        connector_properties: dict | None = None,
+        data_format: str | None = None,
+        encode_format: str | None = None,
+        encode_properties: dict | None = None,
     ):
         """Create a table in Risingwave.
 
@@ -131,7 +154,20 @@ def create_table(
         overwrite
            If `True`, replace the table if it already exists, otherwise fail
            if the table exists
-
+        connector_properties
+            The properties of the source connector, providing the connector settings used to access the upstream data source.
+            Refer to https://docs.risingwave.com/docs/current/data-ingestion/ for the required properties of different data sources.
+        data_format
+            The data format for the new table, e.g., "PLAIN". data_format and encode_format must be specified at the same time.
+        encode_format
+            The encode format for the new table, e.g., "JSON". data_format and encode_format must be specified at the same time.
+        encode_properties
+            The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
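+
+            As a sketch of how these options combine, mirroring the ``datagen``
+            connector configuration exercised in the tests added by this patch
+            (names here are illustrative)::
+
+                con.create_table(
+                    "t",
+                    schema=ibis.schema([("v", "int32")]),
+                    connector_properties={
+                        "connector": "datagen",
+                        "fields.v.kind": "sequence",
+                        "fields.v.start": "1",
+                        "fields.v.end": "10",
+                    },
+                    data_format="PLAIN",
+                    encode_format="JSON",
+                )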
+
+        Returns
+        -------
+        Table
+            Table expression
+        """
         if obj is None and schema is None:
             raise ValueError("Either `obj` or `schema` must be specified")
 
@@ -143,10 +179,19 @@ def create_table(
         else:
             database = None
 
+        if connector_properties is not None and (
+            encode_format is None or data_format is None
+        ):
+            raise com.RelationError(
+                "When creating tables with connector, both encode_format and data_format are required"
+            )
+
         properties = []
 
         if temp:
-            properties.append(sge.TemporaryProperty())
+            raise com.UnsupportedOperationError(
+                f"Creating temp tables is not supported by {self.name}"
+            )
 
         if obj is not None:
             if not isinstance(obj, ir.Expr):
@@ -181,11 +226,23 @@ def create_table(
 
         table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
         target = sge.Schema(this=table, expressions=column_defs)
 
-        create_stmt = sge.Create(
-            kind="TABLE",
-            this=target,
-            properties=sge.Properties(expressions=properties),
-        )
+        if connector_properties is None:
+            create_stmt = sge.Create(
+                kind="TABLE",
+                this=target,
+                properties=sge.Properties(expressions=properties),
+            )
+        else:
+            create_stmt = sge.Create(
+                kind="TABLE",
+                this=target,
+                properties=sge.Properties(
+                    expressions=sge.Properties.from_dict(connector_properties)
+                ),
+            )
+            create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(
+                data_format, encode_format, encode_properties
+            )
 
         this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
         with self._safe_raw_sql(create_stmt) as cur:
@@ -194,9 +251,7 @@ def create_table(
                 cur.execute(insert_stmt)
 
             if overwrite:
-                cur.execute(
-                    sge.Drop(kind="TABLE", this=this, exists=True).sql(self.dialect)
-                )
+                self.drop_table(name, database=database, schema=schema, force=True)
                 cur.execute(
                     f"ALTER TABLE {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
                 )
@@ -268,3 +323,305 @@ def list_databases(
             databases = list(map(itemgetter(0), cur))
 
         return self._filter_with_like(databases, like)
+
+    def create_materialized_view(
+        self,
+        name: str,
+        obj: ir.Table,
+        *,
+        database: str | None = None,
+        schema: str | None = None,
+        overwrite: bool = False,
+    ) -> ir.Table:
+        """Creating a materialized view. The created materialized view can be accessed like a normal table.
+
+        Parameters
+        ----------
+        name
+            Materialized view name to create.
+        obj
+            The select statement to materialize.
+        database
+            Name of the database where the view exists, if not the default.
+        schema
+            Name of the schema where the view exists, if not the default
+        overwrite
+            Whether to overwrite the existing materialized view with the same name
+
+        Returns
+        -------
+        Table
+            Table expression
+        """
+        if database is not None and database != self.current_database:
+            raise com.UnsupportedOperationError(
+                f"Creating materialized views in other databases is not supported by {self.name}"
+            )
+        else:
+            database = None
+
+        if overwrite:
+            temp_name = util.gen_name(f"{self.name}_table")
+        else:
+            temp_name = name
+
+        table = sg.table(
+            temp_name, db=schema, catalog=database, quoted=self.compiler.quoted
+        )
+
+        create_stmt = sge.Create(
+            this=table,
+            kind="MATERIALIZED VIEW",
+            expression=self.compile(obj),
+        )
+        self._register_in_memory_tables(obj)
+
+        this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
+        with self._safe_raw_sql(create_stmt) as cur:
+            if overwrite:
+                self.drop_materialized_view(
+                    name, database=database, schema=schema, force=True
+                )
+                cur.execute(
+                    f"ALTER MATERIALIZED VIEW {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
+                )
+
+        return self.table(name, database=database)
+
+    def drop_materialized_view(
+        self,
+        name: str,
+        *,
+        database: str | None = None,
+        schema: str | None = None,
+        force: bool = False,
+    ) -> None:
+        """Drop a materialized view.
+
+        Parameters
+        ----------
+        name
+            Materialized view name to drop.
+        database
+            Name of the database where the view exists, if not the default.
+        schema
+            Name of the schema where the view exists, if not the default.
+        force
+            If `False`, an exception is raised if the view does not exist.
+        """
+        src = sge.Drop(
+            this=sg.table(
+                name, db=schema, catalog=database, quoted=self.compiler.quoted
+            ),
+            kind="MATERIALIZED VIEW",
+            exists=force,
+        )
+        with self._safe_raw_sql(src):
+            pass
+
+    def create_source(
+        self,
+        name: str,
+        schema: ibis.Schema,
+        *,
+        database: str | None = None,
+        connector_properties: dict,
+        data_format: str,
+        encode_format: str,
+        encode_properties: dict | None = None,
+    ) -> ir.Table:
+        """Create a source.
+
+        Parameters
+        ----------
+        name
+            Source name to create.
+        schema
+            The schema for the new source.
+        database
+            Name of the database where the source exists, if not the default.
+        connector_properties
+            The properties of the source connector, providing the connector settings used to access the upstream data source.
+            Refer to https://docs.risingwave.com/docs/current/data-ingestion/ for the required properties of different data sources.
+        data_format
+            The data format for the new source, e.g., "PLAIN". data_format and encode_format must be specified at the same time.
+        encode_format
+            The encode format for the new source, e.g., "JSON". data_format and encode_format must be specified at the same time.
+        encode_properties
+            The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
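+
+        As a sketch of a typical call, using the ``datagen`` connector from the
+        tests added by this patch (names here are illustrative)::
+
+            con.create_source(
+                "src",
+                ibis.schema([("v", "int32")]),
+                connector_properties={
+                    "connector": "datagen",
+                    "fields.v.kind": "sequence",
+                    "fields.v.start": "1",
+                    "fields.v.end": "10",
+                },
+                data_format="PLAIN",
+                encode_format="JSON",
+            )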
+
+        Returns
+        -------
+        Table
+            Table expression
+        """
+        if database is not None and database != self.current_database:
+            raise com.UnsupportedOperationError(
+                f"Creating sources in other databases is not supported by {self.name}"
+            )
+        else:
+            database = None
+
+        column_defs = [
+            sge.ColumnDef(
+                this=sg.to_identifier(colname, quoted=self.compiler.quoted),
+                kind=self.compiler.type_mapper.from_ibis(typ),
+                constraints=(
+                    None
+                    if typ.nullable
+                    else [sge.ColumnConstraint(kind=sge.NotNullColumnConstraint())]
+                ),
+            )
+            for colname, typ in schema.items()
+        ]
+
+        table = sg.table(name, catalog=database, quoted=self.compiler.quoted)
+        target = sge.Schema(this=table, expressions=column_defs)
+
+        create_stmt = sge.Create(
+            kind="SOURCE",
+            this=target,
+            properties=sge.Properties(
+                expressions=sge.Properties.from_dict(connector_properties)
+            ),
+        )
+
+        create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(
+            data_format, encode_format, encode_properties
+        )
+
+        with self._safe_raw_sql(create_stmt):
+            pass
+
+        return self.table(name, database=database)
+
+    def drop_source(
+        self,
+        name: str,
+        *,
+        database: str | None = None,
+        schema: str | None = None,
+        force: bool = False,
+    ) -> None:
+        """Drop a source.
+
+        Parameters
+        ----------
+        name
+            Source name to drop.
+        database
+            Name of the database where the source exists, if not the default.
+        schema
+            Name of the schema where the source exists, if not the default.
+        force
+            If `False`, an exception is raised if the source does not exist.
+        """
+        src = sge.Drop(
+            this=sg.table(
+                name, db=schema, catalog=database, quoted=self.compiler.quoted
+            ),
+            kind="SOURCE",
+            exists=force,
+        )
+        with self._safe_raw_sql(src):
+            pass
+
+    def create_sink(
+        self,
+        name: str,
+        sink_from: str | None = None,
+        connector_properties: dict | None = None,
+        *,
+        obj: ir.Table | None = None,
+        database: str | None = None,
+        schema: str | None = None,
+        data_format: str | None = None,
+        encode_format: str | None = None,
+        encode_properties: dict | None = None,
+    ) -> None:
+        """Create a sink.
+
+        Parameters
+        ----------
+        name
+            Sink name to create.
+        sink_from
+            The table or materialized view name to sink from. Only one of `sink_from` or `obj` can be
+            provided.
+        connector_properties
+            The properties of the sink connector, providing the connector settings to push to the downstream data sink.
+            Refer to https://docs.risingwave.com/docs/current/data-delivery/ for the required properties of different data sinks.
+        obj
+            An Ibis table expression or pandas table that will be used to extract the schema and the data of the new table. Only one of `sink_from` or `obj` can be provided.
+        database
+            Name of the database where the sink exists, if not the default.
+        schema
+            Name of the schema where the sink exists, if not the default.
+        data_format
+            The data format for the new sink, e.g., "PLAIN". data_format and encode_format must be specified at the same time.
+        encode_format
+            The encode format for the new sink, e.g., "JSON". data_format and encode_format must be specified at the same time.
+        encode_properties
+            The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
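+
+        As a sketch of the two call styles, using the ``blackhole`` connector
+        from the tests added by this patch (``t`` stands for any existing table
+        or materialized view)::
+
+            con.create_sink("sk1", "t", {"connector": "blackhole"})
+            con.create_sink(
+                "sk2",
+                None,
+                {"connector": "blackhole"},
+                obj=con.table("t").distinct(),
+            )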
+        """
+        if database is not None and database != self.current_database:
+            raise com.UnsupportedOperationError(
+                f"Creating sinks in other databases is not supported by {self.name}"
+            )
+        else:
+            database = None
+
+        table = sg.table(name, db=schema, catalog=database, quoted=self.compiler.quoted)
+
+        if encode_format is None != data_format is None:
+            raise com.RelationError(
+                "When creating sinks, both encode_format and data_format must be provided, or neither should be"
+            )
+
+        if sink_from is not None:
+            create_stmt = f"CREATE SINK {table.sql(self.dialect)} FROM {sink_from}"
+        else:
+            create_stmt = sge.Create(
+                this=table,
+                kind="SINK",
+                expression=self.compile(obj),
+            ).sql(self.dialect)
+        create_stmt = (
+            create_stmt
+            + " WITH "
+            + format_properties(connector_properties)
+            + data_and_encode_format(data_format, encode_format, encode_properties)
+        )
+        with self._safe_raw_sql(create_stmt):
+            pass
+
+    def drop_sink(
+        self,
+        name: str,
+        *,
+        database: str | None = None,
+        schema: str | None = None,
+        force: bool = False,
+    ) -> None:
+        """Drop a sink.
+
+        Parameters
+        ----------
+        name
+            Sink name to drop.
+        database
+            Name of the database where the sink exists, if not the default.
+        schema
+            Name of the schema where the sink exists, if not the default.
+        force
+            If `False`, an exception is raised if the sink does not exist.
+        """
+        src = sge.Drop(
+            this=sg.table(
+                name, db=schema, catalog=database, quoted=self.compiler.quoted
+            ),
+            kind="SINK",
+            exists=force,
+        )
+        with self._safe_raw_sql(src):
+            pass
diff --git a/ibis/backends/risingwave/tests/test_streaming.py b/ibis/backends/risingwave/tests/test_streaming.py
new file mode 100644
index 000000000000..344788c51e2f
--- /dev/null
+++ b/ibis/backends/risingwave/tests/test_streaming.py
@@ -0,0 +1,110 @@
+from __future__ import annotations
+
+import time
+
+import pandas as pd
+import pandas.testing as tm
+import pytest
+
+import ibis
+from ibis import util
+
+
+@pytest.mark.parametrize(
+    "column",
+    ["string_col", "double_col", "date_string_col", "timestamp_col"],
+)
+def test_simple_mv(con, alltypes, column):
+    expr = alltypes[[column]].distinct().order_by(column)
+    mv_name = util.gen_name("alltypes_mv")
+    mv = con.create_materialized_view(mv_name, expr, overwrite=True)
+    expected = expr.limit(5).execute()
+    result = mv.order_by(column).limit(5).execute()
+    tm.assert_frame_equal(result, expected)
+    con.drop_materialized_view(mv_name)
+
+
+def test_mv_on_simple_source(con):
+    sc_name = util.gen_name("simple_sc")
+    schema = ibis.schema([("v", "int32")])
+    # use Risingwave's internal data generator to imitate an upstream data source
+    connector_properties = {
+        "connector": "datagen",
+        "fields.v.kind": "sequence",
+        "fields.v.start": "1",
+        "fields.v.end": "10",
+        "datagen.rows.per.second": "10000",
+        "datagen.split.num": "1",
+    }
+    source = con.create_source(
+        sc_name,
+        schema,
+        connector_properties=connector_properties,
+        data_format="PLAIN",
+        encode_format="JSON",
+    )
+    expr = source["v"].sum()
+    mv_name = util.gen_name("simple_mv")
+    mv = con.create_materialized_view(mv_name, expr)
+    # sleep 3s to make sure the data has been generated by the source and consumed by the MV.
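+    # (For reference, given the DDL builders added above, the statements
+    # issued by this test should look roughly like the following sketch;
+    # exact quoting and aliasing depend on the sqlglot-based compiler:
+    #   CREATE SOURCE <name> ("v" INT)
+    #     WITH (connector='datagen', fields.v.kind='sequence', ...)
+    #     FORMAT PLAIN ENCODE JSON
+    #   CREATE MATERIALIZED VIEW <name> AS SELECT SUM("v") ...)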
+ time.sleep(3) + result = mv.execute() + expected = pd.DataFrame({"Sum(v)": [55]}) + tm.assert_frame_equal(result, expected) + con.drop_materialized_view(mv_name) + con.drop_source(sc_name) + + +def test_mv_on_table_with_connector(con): + tblc_name = util.gen_name("simple_table_with_connector") + schema = ibis.schema([("v", "int32")]) + # use Risingwave's internal data generator to imitate a upstream data source + connector_properties = { + "connector": "datagen", + "fields.v.kind": "sequence", + "fields.v.start": "1", + "fields.v.end": "10", + "datagen.rows.per.second": "10000", + "datagen.split.num": "1", + } + tblc = con.create_table( + name=tblc_name, + obj=None, + schema=schema, + connector_properties=connector_properties, + data_format="PLAIN", + encode_format="JSON", + ) + expr = tblc["v"].sum() + mv_name = util.gen_name("simple_mv") + mv = con.create_materialized_view(mv_name, expr) + # sleep 1 s to make sure the data has been generated by the source and consumed by the MV. + time.sleep(1) + + result_tblc = expr.execute() + assert result_tblc == 55 + + result_mv = mv.execute() + expected = pd.DataFrame({"Sum(v)": [55]}) + tm.assert_frame_equal(result_mv, expected) + con.drop_materialized_view(mv_name) + con.drop_table(tblc_name) + + +def test_sink_from(con, alltypes): + sk_name = util.gen_name("sk_from") + connector_properties = { + "connector": "blackhole", + } + con.create_sink(sk_name, "functional_alltypes", connector_properties) + con.drop_sink(sk_name) + + +def test_sink_as_select(con, alltypes): + sk_name = util.gen_name("sk_as_select") + expr = alltypes[["string_col"]].distinct().order_by("string_col") + connector_properties = { + "connector": "blackhole", + } + con.create_sink(sk_name, None, connector_properties, obj=expr) + con.drop_sink(sk_name) diff --git a/ibis/backends/tests/test_array.py b/ibis/backends/tests/test_array.py index c9768eff3d72..21064d9723e1 100644 --- a/ibis/backends/tests/test_array.py +++ b/ibis/backends/tests/test_array.py @@ -1320,6 +1320,10 @@ def test_repr_timestamp_array(con, monkeypatch): @pytest.mark.broken( ["dask"], raises=AssertionError, reason="DataFrame.index are different" ) +@pytest.mark.notimpl( + ["risingwave"], + reason="Only table-in-out functions can have subquery parameters", +) def test_unnest_range(con): expr = ibis.range(2).unnest().name("x").as_table().mutate({"y": 1.0}) result = con.execute(expr) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index cfb69d3f0736..79c84d7075af 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -302,10 +302,10 @@ def test_create_table_from_schema(con, new_schema, temp_table): reason="temporary tables not implemented", raises=NotImplementedError, ) -@pytest.mark.notyet( +@pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, - reason="truncate not supported upstream", + raises=com.UnsupportedOperationError, + reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) @pytest.mark.notimpl( ["flink"], @@ -1205,7 +1205,7 @@ def test_create_table_timestamp(con, temp_table): @mark.notimpl(["exasol"], reason="Exasol does not support temporary tables") @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) def test_persist_expression_ref_count(backend, con, alltypes): @@ -1230,7 +1230,7 @@ def test_persist_expression_ref_count(backend, con, alltypes): @mark.notimpl(["exasol"], 
reason="Exasol does not support temporary tables") @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) def test_persist_expression(backend, alltypes): @@ -1249,7 +1249,7 @@ def test_persist_expression(backend, alltypes): @mark.notimpl(["exasol"], reason="Exasol does not support temporary tables") @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) def test_persist_expression_contextmanager(backend, alltypes): @@ -1270,7 +1270,7 @@ def test_persist_expression_contextmanager(backend, alltypes): @mark.notimpl(["exasol"], reason="Exasol does not support temporary tables") @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) def test_persist_expression_contextmanager_ref_count(backend, con, alltypes): @@ -1293,7 +1293,7 @@ def test_persist_expression_contextmanager_ref_count(backend, con, alltypes): ) @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) @mark.notimpl(["exasol"], reason="Exasol does not support temporary tables") @@ -1335,7 +1335,7 @@ def test_persist_expression_multiple_refs(backend, con, alltypes): @mark.notimpl(["exasol"], reason="Exasol does not support temporary tables") @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) def test_persist_expression_repeated_cache(alltypes): @@ -1355,18 +1355,13 @@ def test_persist_expression_repeated_cache(alltypes): @mark.notimpl(["exasol"], reason="Exasol does not support temporary tables") @pytest.mark.never( ["risingwave"], - raises=PsycoPg2InternalError, + raises=com.UnsupportedOperationError, reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", ) @mark.notimpl( ["oracle"], reason="Oracle error message for a missing table/view doesn't include the name of the table", ) -@pytest.mark.never( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="Feature is not yet implemented: CREATE TEMPORARY TABLE", -) def test_persist_expression_release(con, alltypes): non_cached_table = alltypes.mutate( test_column="calculation", other_column="big calc 3" diff --git a/ibis/backends/tests/test_join.py b/ibis/backends/tests/test_join.py index c4e9b82ed937..a02233275b88 100644 --- a/ibis/backends/tests/test_join.py +++ b/ibis/backends/tests/test_join.py @@ -175,6 +175,7 @@ def test_mutate_then_join_no_column_overlap(batting, awards_players): @pytest.mark.notimpl(["druid"]) @pytest.mark.notyet(["dask"], reason="dask doesn't support descending order by") @pytest.mark.notyet(["flink"], reason="Flink doesn't support semi joins") +@pytest.mark.skip("risingwave") # TODO(Kexiang): Risingwave's bug, investigating @pytest.mark.parametrize( "func", [ diff --git a/ibis/backends/tests/test_temporal.py b/ibis/backends/tests/test_temporal.py index 19341a8c9b07..e25e242f59ca 100644 --- a/ibis/backends/tests/test_temporal.py +++ b/ibis/backends/tests/test_temporal.py @@ -1597,11 +1597,6 @@ def test_today_from_projection(alltypes): @pytest.mark.notimpl( ["oracle"], raises=OracleDatabaseError, reason="ORA-00936 missing expression" ) 
-@pytest.mark.notimpl( - ["risingwave"], - raises=com.OperationNotDefinedError, - reason="function make_date(integer, integer, integer) does not exist", -) def test_date_literal(con, backend): expr = ibis.date(2022, 2, 4) result = con.execute(expr) @@ -1631,11 +1626,6 @@ def test_date_literal(con, backend): raises=com.OperationNotDefinedError, ) @pytest.mark.notyet(["impala"], raises=com.OperationNotDefinedError) -@pytest.mark.notimpl( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="function make_timestamp(integer, integer, integer, integer, integer, integer) does not exist", -) def test_timestamp_literal(con, backend): expr = ibis.timestamp(2022, 2, 4, 16, 20, 0) result = con.execute(expr) @@ -1689,11 +1679,6 @@ def test_timestamp_literal(con, backend): ", , , )" ), ) -@pytest.mark.notimpl( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="function make_timestamp(integer, integer, integer, integer, integer, integer) does not exist", -) def test_timestamp_with_timezone_literal(con, timezone, expected): expr = ibis.timestamp(2022, 2, 4, 16, 20, 0).cast(dt.Timestamp(timezone=timezone)) result = con.execute(expr) @@ -1722,11 +1707,6 @@ def test_timestamp_with_timezone_literal(con, timezone, expected): ["clickhouse", "impala", "exasol"], raises=com.OperationNotDefinedError ) @pytest.mark.notimpl(["druid"], raises=com.OperationNotDefinedError) -@pytest.mark.notimpl( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="function make_time(integer, integer, integer) does not exist", -) def test_time_literal(con, backend): expr = ibis.time(16, 20, 0) result = con.execute(expr) @@ -1854,11 +1834,6 @@ def test_interval_literal(con, backend): @pytest.mark.broken( ["oracle"], raises=OracleDatabaseError, reason="ORA-00936: missing expression" ) -@pytest.mark.notimpl( - ["risingwave"], - raises=com.OperationNotDefinedError, - reason="function make_date(integer, integer, integer) does not exist", -) def test_date_column_from_ymd(backend, con, alltypes, df): c = alltypes.timestamp_col expr = ibis.date(c.year(), c.month(), c.day()) @@ -1879,11 +1854,6 @@ def test_date_column_from_ymd(backend, con, alltypes, df): reason="StringColumn' object has no attribute 'year'", ) @pytest.mark.notyet(["impala", "oracle"], raises=com.OperationNotDefinedError) -@pytest.mark.notimpl( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="function make_timestamp(smallint, smallint, smallint, smallint, smallint, smallint) does not exist", -) def test_timestamp_column_from_ymdhms(backend, con, alltypes, df): c = alltypes.timestamp_col expr = ibis.timestamp( diff --git a/poetry.lock b/poetry.lock index 68da7983f736..c40bb1bfbb29 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3420,7 +3420,6 @@ files = [ {file = "msgpack-1.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fbb160554e319f7b22ecf530a80a3ff496d38e8e07ae763b9e82fadfe96f273"}, {file = "msgpack-1.0.8-cp39-cp39-win32.whl", hash = "sha256:f9af38a89b6a5c04b7d18c492c8ccf2aee7048aff1ce8437c4683bb5a1df893d"}, {file = "msgpack-1.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:ed59dd52075f8fc91da6053b12e8c89e37aa043f8986efd89e61fae69dc1b011"}, - {file = "msgpack-1.0.8-py3-none-any.whl", hash = "sha256:24f727df1e20b9876fa6e95f840a2a2651e34c0ad147676356f4bf5fbb0206ca"}, {file = "msgpack-1.0.8.tar.gz", hash = "sha256:95c02b0e27e706e48d0e5426d1710ca78e0f0628d6e89d5b5a5b91a5f12274f3"}, ] From 7c467751c16064965d8e41ad3a0c3aafa7cfe851 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 29 Mar 2024 22:55:11 +0800 Subject: [PATCH 2/5] remove 
schema by Gil Forsyth --- ibis/backends/risingwave/__init__.py | 81 ++++++---------------------- ibis/backends/tests/test_array.py | 4 -- ibis/backends/tests/test_temporal.py | 8 ++- 3 files changed, 21 insertions(+), 72 deletions(-) diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py index a9855dafb6b9..9b22d5d4bc97 100644 --- a/ibis/backends/risingwave/__init__.py +++ b/ibis/backends/risingwave/__init__.py @@ -172,13 +172,6 @@ def create_table( if obj is None and schema is None: raise ValueError("Either `obj` or `schema` must be specified") - if database is not None and database != self.current_database: - raise com.UnsupportedOperationError( - f"Creating tables in other databases is not supported by {self.name}" - ) - else: - database = None - if connector_properties is not None and ( encode_format is None or data_format is None ): @@ -223,7 +216,7 @@ def create_table( else: temp_name = name - table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) + table = sg.table(temp_name, db=database, quoted=self.compiler.quoted) target = sge.Schema(this=table, expressions=column_defs) if connector_properties is None: @@ -244,14 +237,14 @@ def create_table( data_format, encode_format, encode_properties ) - this = sg.table(name, catalog=database, quoted=self.compiler.quoted) + this = sg.table(name, db=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect) cur.execute(insert_stmt) if overwrite: - self.drop_table(name, database=database, schema=schema, force=True) + self.drop_table(name, database=database, force=True) cur.execute( f"ALTER TABLE {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" ) @@ -330,7 +323,6 @@ def create_materialized_view( obj: ir.Table, *, database: str | None = None, - schema: str | None = None, overwrite: bool = False, ) -> ir.Table: """Creating a materialized view. The created materialized view can be accessed like a normal table. @@ -342,9 +334,7 @@ def create_materialized_view( obj The select statement to materialize. database - Name of the database where the view exists, if not the default. 
-        schema
-            Name of the schema where the view exists, if not the default
+            Name of the database where the view exists, if not the default
         overwrite
             Whether to overwrite the existing materialized view with the same name
 
@@ -353,21 +343,12 @@ def create_materialized_view(
         Table
             Table expression
         """
-        if database is not None and database != self.current_database:
-            raise com.UnsupportedOperationError(
-                f"Creating materialized views in other databases is not supported by {self.name}"
-            )
-        else:
-            database = None
-
         if overwrite:
             temp_name = util.gen_name(f"{self.name}_table")
         else:
             temp_name = name
 
-        table = sg.table(
-            temp_name, db=schema, catalog=database, quoted=self.compiler.quoted
-        )
+        table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
 
         create_stmt = sge.Create(
             this=table,
@@ -376,14 +357,14 @@ def create_materialized_view(
         )
         self._register_in_memory_tables(obj)
 
-        this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
         with self._safe_raw_sql(create_stmt) as cur:
             if overwrite:
-                self.drop_materialized_view(
-                    name, database=database, schema=schema, force=True
-                )
+                target = sg.table(name, db=database).sql(self.dialect)
+
+                self.drop_materialized_view(target, database=database, force=True)
+
                 cur.execute(
-                    f"ALTER MATERIALIZED VIEW {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
+                    f"ALTER MATERIALIZED VIEW {table.sql(self.dialect)} RENAME TO {target}"
                 )
 
         return self.table(name, database=database)
@@ -393,7 +374,6 @@ def drop_materialized_view(
         name: str,
         *,
         database: str | None = None,
-        schema: str | None = None,
         force: bool = False,
     ) -> None:
         """Drop a materialized view.
@@ -404,15 +384,11 @@ def drop_materialized_view(
             Materialized view name to drop.
         database
             Name of the database where the view exists, if not the default.
-        schema
-            Name of the schema where the view exists, if not the default.
         force
             If `False`, an exception is raised if the view does not exist.
         """
         src = sge.Drop(
-            this=sg.table(
-                name, db=schema, catalog=database, quoted=self.compiler.quoted
-            ),
+            this=sg.table(name, db=database, quoted=self.compiler.quoted),
             kind="MATERIALIZED VIEW",
             exists=force,
         )
         with self._safe_raw_sql(src):
             pass
@@ -455,13 +431,6 @@ def create_source(
         Table
             Table expression
         """
-        if database is not None and database != self.current_database:
-            raise com.UnsupportedOperationError(
-                f"Creating sources in other databases is not supported by {self.name}"
-            )
-        else:
-            database = None
-
         column_defs = [
             sge.ColumnDef(
                 this=sg.to_identifier(colname, quoted=self.compiler.quoted),
                 kind=self.compiler.type_mapper.from_ibis(typ),
                 constraints=(
                     None
                     if typ.nullable
                     else [sge.ColumnConstraint(kind=sge.NotNullColumnConstraint())]
                 ),
             )
             for colname, typ in schema.items()
         ]
 
-        table = sg.table(name, catalog=database, quoted=self.compiler.quoted)
+        table = sg.table(name, db=database, quoted=self.compiler.quoted)
         target = sge.Schema(this=table, expressions=column_defs)
 
         create_stmt = sge.Create(
             kind="SOURCE",
             this=target,
             properties=sge.Properties(
                 expressions=sge.Properties.from_dict(connector_properties)
             ),
         )
@@ -500,7 +469,6 @@ def drop_source(
         name: str,
         *,
         database: str | None = None,
-        schema: str | None = None,
         force: bool = False,
     ) -> None:
         """Drop a source.
@@ -511,15 +479,11 @@ def drop_source(
             Source name to drop.
         database
             Name of the database where the source exists, if not the default.
-        schema
-            Name of the schema where the source exists, if not the default.
         force
             If `False`, an exception is raised if the source does not exist.
""" src = sge.Drop( - this=sg.table( - name, db=schema, catalog=database, quoted=self.compiler.quoted - ), + this=sg.table(name, db=database, quoted=self.compiler.quoted), kind="SOURCE", exists=force, ) @@ -534,7 +498,6 @@ def create_sink( *, obj: ir.Table | None = None, database: str | None = None, - schema: str | None = None, data_format: str | None = None, encode_format: str | None = None, encode_properties: dict | None = None, @@ -555,8 +518,6 @@ def create_sink( An Ibis table expression or pandas table that will be used to extract the schema and the data of the new table. Only one of `sink_from` or `obj` can be provided. database Name of the database where the source exists, if not the default. - schema - Name of the schema where the view exists, if not the default. data_format The data format for the new source, e.g., "PLAIN". data_format and encode_format must be specified at the same time. encode_format @@ -564,14 +525,7 @@ def create_sink( encode_properties The properties of encode format, providing information like schema registry url. Refer https://docs.risingwave.com/docs/current/sql-create-source/ for more details. """ - if database is not None and database != self.current_database: - raise com.UnsupportedOperationError( - f"Creating sinks in other databases is not supported by {self.name}" - ) - else: - database = None - - table = sg.table(name, db=schema, catalog=database, quoted=self.compiler.quoted) + table = sg.table(name, db=database, quoted=self.compiler.quoted) if encode_format is None != data_format is None: raise com.RelationError( @@ -600,7 +554,6 @@ def drop_sink( name: str, *, database: str | None = None, - schema: str | None = None, force: bool = False, ) -> None: """Drop a Sink. @@ -611,15 +564,11 @@ def drop_sink( Sink name to drop. database Name of the database where the view exists, if not the default. - schema - Name of the schema where the view exists, if not the default. force If `False`, an exception is raised if the source does not exist. 
""" src = sge.Drop( - this=sg.table( - name, db=schema, catalog=database, quoted=self.compiler.quoted - ), + this=sg.table(name, db=database, quoted=self.compiler.quoted), kind="SINK", exists=force, ) diff --git a/ibis/backends/tests/test_array.py b/ibis/backends/tests/test_array.py index 21064d9723e1..c9768eff3d72 100644 --- a/ibis/backends/tests/test_array.py +++ b/ibis/backends/tests/test_array.py @@ -1320,10 +1320,6 @@ def test_repr_timestamp_array(con, monkeypatch): @pytest.mark.broken( ["dask"], raises=AssertionError, reason="DataFrame.index are different" ) -@pytest.mark.notimpl( - ["risingwave"], - reason="Only table-in-out functions can have subquery parameters", -) def test_unnest_range(con): expr = ibis.range(2).unnest().name("x").as_table().mutate({"y": 1.0}) result = con.execute(expr) diff --git a/ibis/backends/tests/test_temporal.py b/ibis/backends/tests/test_temporal.py index e25e242f59ca..011adadf568a 100644 --- a/ibis/backends/tests/test_temporal.py +++ b/ibis/backends/tests/test_temporal.py @@ -1590,7 +1590,9 @@ def test_today_from_projection(alltypes): } -@pytest.mark.notimpl(["pandas", "dask", "exasol"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl( + ["pandas", "dask", "exasol", "risingwave"], raises=com.OperationNotDefinedError +) @pytest.mark.notimpl( ["druid"], raises=PyDruidProgrammingError, reason="SQL parse failed" ) @@ -1825,7 +1827,9 @@ def test_interval_literal(con, backend): assert con.execute(expr.typeof()) == INTERVAL_BACKEND_TYPES[backend_name] -@pytest.mark.notimpl(["pandas", "dask", "exasol"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl( + ["pandas", "dask", "exasol", "risingwave"], raises=com.OperationNotDefinedError +) @pytest.mark.broken( ["druid"], raises=AttributeError, From 83cd36bc60b1424453572a30321e92720ce12ec5 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Tue, 2 Apr 2024 16:24:05 +0800 Subject: [PATCH 3/5] add experimental --- ibis/backends/risingwave/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py index 9b22d5d4bc97..0d90840f5aaf 100644 --- a/ibis/backends/risingwave/__init__.py +++ b/ibis/backends/risingwave/__init__.py @@ -19,6 +19,7 @@ from ibis import util from ibis.backends.postgres import Backend as PostgresBackend from ibis.backends.risingwave.compiler import RisingwaveCompiler +from ibis.util import experimental if TYPE_CHECKING: import pandas as pd @@ -317,6 +318,7 @@ def list_databases( return self._filter_with_like(databases, like) + @experimental def create_materialized_view( self, name: str, From b1a0836f128f0b6a64cad22a9204f49c88328334 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Wed, 3 Apr 2024 23:39:38 +0800 Subject: [PATCH 4/5] fix --- ibis/backends/risingwave/__init__.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py index 0d90840f5aaf..ba94f136fc4f 100644 --- a/ibis/backends/risingwave/__init__.py +++ b/ibis/backends/risingwave/__init__.py @@ -176,7 +176,7 @@ def create_table( if connector_properties is not None and ( encode_format is None or data_format is None ): - raise com.RelationError( + raise com.UnsupportedOperationError( "When creating tables with connector, both encode_format and data_format are required" ) @@ -327,7 +327,7 @@ def create_materialized_view( database: str | None = None, overwrite: bool = False, ) -> ir.Table: - """Creating a materialized view. 
+        """Create a materialized view. Materialized views can be accessed like a normal table.
 
         Parameters
         ----------
@@ -517,7 +517,7 @@ def create_sink(
             The properties of the sink connector, providing the connector settings to push to the downstream data sink.
             Refer to https://docs.risingwave.com/docs/current/data-delivery/ for the required properties of different data sinks.
         obj
-            An Ibis table expression or pandas table that will be used to extract the schema and the data of the new table. Only one of `sink_from` or `obj` can be provided.
+            An Ibis table expression that will be used to extract the schema and the data of the new table. Only one of `sink_from` or `obj` can be provided.
         database
             Name of the database where the sink exists, if not the default.
         data_format
@@ -528,9 +528,13 @@ def create_sink(
             The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
         """
         table = sg.table(name, db=database, quoted=self.compiler.quoted)
+        if sink_from is None and obj is None:
+            raise ValueError("Either `sink_from` or `obj` must be specified")
+        if sink_from is not None and obj is not None:
+            raise ValueError("Only one of `sink_from` or `obj` should be specified")
 
-        if encode_format is None != data_format is None:
-            raise com.RelationError(
+        if (encode_format is None) != (data_format is None):
+            raise com.UnsupportedArgumentError(
                 "When creating sinks, both encode_format and data_format must be provided, or neither should be"
             )

From a165546f4001e249822389d3b09d01791352242e Mon Sep 17 00:00:00 2001
From: Kexiang Wang
Date: Wed, 3 Apr 2024 23:43:29 +0800
Subject: [PATCH 5/5] fix discussions

---
 ibis/backends/risingwave/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py
index ba94f136fc4f..badce2233664 100644
--- a/ibis/backends/risingwave/__init__.py
+++ b/ibis/backends/risingwave/__init__.py
@@ -531,7 +531,7 @@ def create_sink(
         if sink_from is None and obj is None:
             raise ValueError("Either `sink_from` or `obj` must be specified")
         if sink_from is not None and obj is not None:
-            raise ValueError("Only one of `sink_from` or `obj` should be specified")
+            raise ValueError("Only one of `sink_from` or `obj` can be specified")
 
         if (encode_format is None) != (data_format is None):
             raise com.UnsupportedArgumentError(
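
A minimal end-to-end sketch of the streaming DDL API added by this series
(assumes a reachable RisingWave instance; the connection details and object
names are illustrative, and the connector settings mirror the tests above):

    import ibis

    con = ibis.risingwave.connect(
        host="localhost", port=4566, user="root", database="dev"
    )

    # ingest from RisingWave's built-in data generator
    src = con.create_source(
        "gen_src",
        ibis.schema([("v", "int32")]),
        connector_properties={
            "connector": "datagen",
            "fields.v.kind": "sequence",
            "fields.v.start": "1",
            "fields.v.end": "10",
        },
        data_format="PLAIN",
        encode_format="JSON",
    )

    # maintain a running aggregate over the stream, as in the tests
    mv = con.create_materialized_view("v_sum", src["v"].sum())

    # push the materialized results to a (here: no-op) downstream sink
    con.create_sink("v_sink", "v_sum", {"connector": "blackhole"})

    con.drop_sink("v_sink")
    con.drop_materialized_view("v_sum")
    con.drop_source("gen_src")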