From eaaebb8a323075a5ccf74539ec8c23c8ed758284 Mon Sep 17 00:00:00 2001 From: Chloe He Date: Tue, 12 Sep 2023 14:02:33 -0700 Subject: [PATCH] feat: introduce watermarks in ibis api --- ibis/backends/flink/__init__.py | 5 ++ ibis/backends/flink/ddl.py | 66 +++++++++++++++++++++- ibis/backends/flink/tests/test_ddl.py | 61 ++++++++++++++++---- ibis/backends/flink/tests/test_literals.py | 1 + ibis/backends/flink/utils.py | 3 +- ibis/expr/api.py | 24 +++++++- ibis/expr/streaming.py | 9 +++ 7 files changed, 152 insertions(+), 17 deletions(-) create mode 100644 ibis/expr/streaming.py diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py index e70f410e020b..a40693c09017 100644 --- a/ibis/backends/flink/__init__.py +++ b/ibis/backends/flink/__init__.py @@ -28,6 +28,7 @@ from pyflink.table import TableEnvironment import ibis.expr.types as ir + from ibis.expr.streaming import Watermark class Backend(BaseBackend, CanCreateDatabase): @@ -255,6 +256,7 @@ def create_table( database: str | None = None, catalog: str | None = None, tbl_properties: dict | None = None, + watermark: Watermark | None = None, temp: bool = False, overwrite: bool = False, ) -> ir.Table: @@ -289,6 +291,8 @@ def create_table( Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector. Accepts dictionary of key-value pairs (key1=val1, key2=val2, ...). + watermark + Watermark strategy for the table, only applicable on sources. temp Whether a table is temporary or not overwrite @@ -331,6 +335,7 @@ def create_table( table_name=name, schema=schema, tbl_properties=tbl_properties, + watermark=watermark, temp=temp, database=database, catalog=catalog, diff --git a/ibis/backends/flink/ddl.py b/ibis/backends/flink/ddl.py index 69f02adb5d12..30c5a5bf5044 100644 --- a/ibis/backends/flink/ddl.py +++ b/ibis/backends/flink/ddl.py @@ -4,6 +4,7 @@ import sqlglot as sg +import ibis.expr.schema as sch from ibis.backends.base.sql.ddl import ( CreateTableWithSchema, DropObject, @@ -12,10 +13,48 @@ _is_quoted, is_fully_qualified, ) -from ibis.backends.base.sql.registry import quote_identifier +from ibis.backends.base.sql.registry import quote_identifier, type_to_sql_string +from ibis.backends.flink.utils import translate_literal if TYPE_CHECKING: - import ibis.expr.schema as sch + from ibis.expr.streaming import Watermark + + +def format_schema(schema): + elements = [ + _format_schema_element(name, t) for name, t in zip(schema.names, schema.types) + ] + return "({})".format(",\n ".join(elements)) + + +def _format_schema_element(name, t): + return f"{quote_identifier(name, force=True)} {type_to_flink_sql_string(t)}" + + +def type_to_flink_sql_string(tval): + if tval.is_timestamp(): + return f"timestamp({tval.scale})" + else: + return type_to_sql_string(tval) + + +def _format_watermark_strategy(watermark: Watermark) -> str: + if watermark.allowed_delay is None: + return watermark.time_col + return f"{watermark.time_col} - {translate_literal(watermark.allowed_delay.op())}" + + +def format_schema_with_watermark( + schema: sch.Schema, watermark: Watermark | None = None +) -> str: + elements = [ + _format_schema_element(name, t) for name, t in zip(schema.names, schema.types) + ] + if watermark is not None: + elements.append( + f"WATERMARK FOR {watermark.time_col} AS {_format_watermark_strategy(watermark)}" + ) + return "({})".format(",\n ".join(elements)) class _CatalogAwareBaseQualifiedSQLStatement: @@ -39,6 +78,7 @@ def __init__( table_name: str, schema: sch.Schema, tbl_properties: dict, + watermark: Watermark | None = None, database: str | None = None, catalog: str | None = None, temp: bool = False, @@ -56,6 +96,7 @@ def __init__( ) self.catalog = catalog self.temp = temp + self.watermark = watermark def _storage(self) -> str: return f"STORED AS {self.format}" if self.format else None @@ -77,7 +118,26 @@ def _create_line(self) -> str: @property def _pieces(self): - yield from super()._pieces + if self.partition is not None: + main_schema = self.schema + part_schema = self.partition + if not isinstance(part_schema, sch.Schema): + part_fields = {name: self.schema[name] for name in part_schema} + part_schema = sch.Schema(part_fields) + + to_delete = {name for name in self.partition if name in self.schema} + fields = { + name: dtype + for name, dtype in main_schema.items() + if name not in to_delete + } + main_schema = sch.Schema(fields) + + yield format_schema_with_watermark(main_schema, self.watermark) + yield f"PARTITIONED BY {format_schema(part_schema)}" + else: + yield format_schema_with_watermark(self.schema, self.watermark) + yield self._format_tbl_properties() diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py index b87fe2872b01..0a42f65ed1a5 100644 --- a/ibis/backends/flink/tests/test_ddl.py +++ b/ibis/backends/flink/tests/test_ddl.py @@ -3,6 +3,7 @@ import pytest from py4j.protocol import Py4JJavaError +import ibis import ibis.expr.datatypes as dt import ibis.expr.schema as sch @@ -22,13 +23,36 @@ def awards_players_schema(): @pytest.fixture -def awards_players_csv_connector_configs(): - return { - "connector": "filesystem", - "path": "ci/ibis-testing-data/csv/awards_players.csv", - "format": "csv", - "csv.ignore-parse-errors": "true", - } +def functiona_alltypes_schema(): + return sch.Schema( + { + "id": dt.int32, + "bool_col": dt.bool, + "smallint_col": dt.int16, + "int_col": dt.int32, + "bigint_col": dt.int64, + "float_col": dt.float32, + "double_col": dt.float64, + "date_string_col": dt.string, + "string_col": dt.string, + "timestamp_col": dt.timestamp(scale=3), + "year": dt.int32, + "month": dt.int32, + } + ) + + +@pytest.fixture +def csv_connector_configs(): + def generate_csv_configs(csv_file): + return { + "connector": "filesystem", + "path": f"ci/ibis-testing-data/csv/{csv_file}.csv", + "format": "csv", + "csv.ignore-parse-errors": "true", + } + + return generate_csv_configs def test_list_tables(con): @@ -37,12 +61,12 @@ def test_list_tables(con): def test_create_table_from_schema( - con, awards_players_schema, temp_table, awards_players_csv_connector_configs + con, awards_players_schema, temp_table, csv_connector_configs ): new_table = con.create_table( temp_table, schema=awards_players_schema, - tbl_properties=awards_players_csv_connector_configs, + tbl_properties=csv_connector_configs("awards_players"), ) assert temp_table in con.list_tables() assert new_table.schema() == awards_players_schema @@ -50,12 +74,12 @@ def test_create_table_from_schema( @pytest.mark.parametrize("temp", [True, False]) def test_create_table( - con, awards_players_schema, temp_table, awards_players_csv_connector_configs, temp + con, awards_players_schema, temp_table, csv_connector_configs, temp ): con.create_table( temp_table, schema=awards_players_schema, - tbl_properties=awards_players_csv_connector_configs, + tbl_properties=csv_connector_configs("awards_players"), temp=temp, ) assert temp_table in con.list_tables() @@ -67,3 +91,18 @@ def test_create_table( con.drop_table(temp_table, temp=temp) assert temp_table not in con.list_tables() + + +def test_create_source_table_with_watermark( + con, functiona_alltypes_schema, temp_table, csv_connector_configs +): + new_table = con.create_table( + temp_table, + schema=functiona_alltypes_schema, + tbl_properties=csv_connector_configs("functional_alltypes"), + watermark=ibis.watermark( + time_col="timestamp_col", allowed_delay=ibis.interval(seconds=15) + ), + ) + assert temp_table in con.list_tables() + assert new_table.schema() == functiona_alltypes_schema diff --git a/ibis/backends/flink/tests/test_literals.py b/ibis/backends/flink/tests/test_literals.py index ab85ded90b7d..ce6cd134ed01 100644 --- a/ibis/backends/flink/tests/test_literals.py +++ b/ibis/backends/flink/tests/test_literals.py @@ -51,6 +51,7 @@ def test_string_literals(value, expected): param( ibis.interval(months=50), "INTERVAL '04-02' YEAR TO MONTH", id="50months" ), + param(ibis.interval(seconds=5), "INTERVAL '5' SECOND", id="5seconds"), ], ) def test_translate_interval_literal(value, expected): diff --git a/ibis/backends/flink/utils.py b/ibis/backends/flink/utils.py index 34bd25e0f2fd..7a05f1561c10 100644 --- a/ibis/backends/flink/utils.py +++ b/ibis/backends/flink/utils.py @@ -239,7 +239,8 @@ def _translate_interval(value, dtype): if len(nonzero_interval_segments) == 1: unit = next(iter(nonzero_interval_segments)) value = nonzero_interval_segments[unit] - return f"'{value}' {unit.value}{format_precision(value, unit)}" + precision = _calculate_precision(value) + return f"'{value}' {unit.name}{format_precision(precision, unit)}" # YEAR TO MONTH, DAY TO SECOND return interval.format_as_string() diff --git a/ibis/expr/api.py b/ibis/expr/api.py index c097fac74ab3..5b054d5739dc 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -22,6 +22,7 @@ from ibis.expr.deferred import Deferred from ibis.expr.schema import Schema from ibis.expr.sql import parse_sql, show_sql, to_sql +from ibis.expr.streaming import Watermark from ibis.expr.types import ( DateValue, Expr, @@ -157,6 +158,7 @@ "trailing_range_window", "trailing_window", "union", + "watermark", "where", "window", "preceding", @@ -1496,9 +1498,9 @@ def difference(table: ir.Table, *rest: ir.Table, distinct: bool = True): Parameters ---------- - table: + table A table expression - *rest: + *rest Additional table expressions distinct Only diff distinct rows not occurring in the calling table @@ -1544,6 +1546,24 @@ def difference(table: ir.Table, *rest: ir.Table, distinct: bool = True): return table.difference(*rest, distinct=distinct) if rest else table +def watermark(time_col: str, allowed_delay: ir.IntervalScalar) -> Watermark: + """Return a watermark object. + + Parameters + ---------- + time_col + The timestamp column that will be used to generate watermarks in event time processing. + allowed_delay + Length of time that events are allowed to be late. + + Returns + ------- + Watermark + A watermark object. + """ + return Watermark(time_col=time_col, allowed_delay=allowed_delay) + + e = ops.E().to_expr() pi = ops.Pi().to_expr() diff --git a/ibis/expr/streaming.py b/ibis/expr/streaming.py new file mode 100644 index 000000000000..a479f2a57255 --- /dev/null +++ b/ibis/expr/streaming.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +import ibis.expr.types as ir # noqa: TCH001 +from ibis.common.grounds import Concrete + + +class Watermark(Concrete): + time_col: str + allowed_delay: ir.IntervalScalar