Skip to content

Commit

Permalink
feat(flink): add primary key support
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas authored and jcrist committed Jan 4, 2024
1 parent 6e3fcd7 commit da04679
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 8 deletions.
7 changes: 7 additions & 0 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def create_table(
catalog: str | None = None,
tbl_properties: dict | None = None,
watermark: Watermark | None = None,
primary_key: str | list[str] | None = None,
temp: bool = False,
overwrite: bool = False,
) -> ir.Table:
Expand Down Expand Up @@ -349,6 +350,11 @@ def create_table(
dictionary of key-value pairs (key1=val1, key2=val2, ...).
watermark
Watermark strategy for the table, only applicable on sources.
primary_key
A single column name or a list of column names to use as the primary key.
Raises an error if the column(s) in `primary_key` are not a subset of the
columns in `schema`. Primary keys must be non-nullable in Flink, so the
columns listed as the primary key will be designated as non-nullable.
temp
Whether a table is temporary or not.
overwrite
Expand Down Expand Up @@ -441,6 +447,7 @@ def create_table(
schema=schema,
tbl_properties=tbl_properties,
watermark=watermark,
primary_key=primary_key,
temporary=temp,
database=database,
catalog=catalog,
Expand Down
44 changes: 40 additions & 4 deletions ibis/backends/flink/ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import sqlglot as sg

import ibis.common.exceptions as exc
import ibis.expr.schema as sch
from ibis.backends.base.sql.ddl import (
CreateTable,
Expand All @@ -19,15 +20,19 @@
)
from ibis.backends.base.sql.registry import quote_identifier
from ibis.backends.flink.registry import type_to_sql_string
from ibis.util import promote_list

if TYPE_CHECKING:
from collections.abc import Sequence

from ibis.api import Watermark


def format_schema(schema: sch.Schema) -> str:
    """Render the columns of ``schema`` as a parenthesized column list.

    Parameters
    ----------
    schema
        Schema whose ``names`` and ``types`` are paired up positionally.

    Returns
    -------
    str
        The formatted column elements, separated by a comma and a newline,
        wrapped in a single pair of parentheses. An empty schema yields
        ``"()"``.
    """
    elements = [
        _format_schema_element(name, t) for name, t in zip(schema.names, schema.types)
    ]

    return "({})".format(",\n ".join(elements))


Expand All @@ -54,15 +59,31 @@ def _format_watermark_strategy(watermark: Watermark) -> str:


def format_schema_with_watermark(
    schema: sch.Schema,
    watermark: Watermark | None = None,
    primary_keys: str | Sequence[str] | None = None,
) -> str:
    """Render the column list of a ``CREATE TABLE`` statement.

    Parameters
    ----------
    schema
        Schema whose ``names`` and ``types`` are paired up positionally.
    watermark
        When given, a ``WATERMARK FOR <time_col> AS <strategy>`` element is
        appended after the columns.
    primary_keys
        A single column name or a sequence of column names. When non-empty, a
        ``PRIMARY KEY (...) NOT ENFORCED`` element is appended last. ``None``
        or an empty sequence adds nothing.

    Returns
    -------
    str
        All elements, separated by a comma and a newline, wrapped in a single
        pair of parentheses.
    """
    elements = [
        _format_schema_element(name, t) for name, t in zip(schema.names, schema.types)
    ]

    if watermark is not None:
        elements.append(
            f"WATERMARK FOR {watermark.time_col} AS {_format_watermark_strategy(watermark)}"
        )

    # A bare string is one key; iterating it would yield one "key" per character.
    if isinstance(primary_keys, str):
        primary_keys = [primary_keys]

    if primary_keys:
        # NOTE(mehmet): Only "NOT ENFORCED" is supported. The Flink docs explain
        # why: "SQL standard specifies that a constraint can either be ENFORCED
        # or NOT ENFORCED. This controls if the constraint checks are performed
        # on the incoming/outgoing data. Flink does not own the data therefore
        # the only mode we want to support is the NOT ENFORCED mode. It is up to
        # the user to ensure that the query enforces key integrity."
        # Ref: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/create/#primary-key
        comma_separated_keys = ", ".join(f"`{key}`" for key in primary_keys)
        elements.append(f"PRIMARY KEY ({comma_separated_keys}) NOT ENFORCED")

    return "({})".format(",\n ".join(elements))


Expand All @@ -88,6 +109,7 @@ def __init__(
schema: sch.Schema,
tbl_properties: dict,
watermark: Watermark | None = None,
primary_key: str | Sequence[str] | None = None,
database: str | None = None,
catalog: str | None = None,
temporary: bool = False,
Expand All @@ -107,6 +129,16 @@ def __init__(
self.temporary = temporary
self.watermark = watermark

self.primary_keys = promote_list(primary_key)

# Check if `primary_keys` is a subset of the columns in `schema`.
if self.primary_keys and not set(self.primary_keys) <= set(schema.names):
raise exc.IbisError(
"`primary_key` must be a subset of the columns in `schema`. \n"
f"\t primary_key= {primary_key} \n"
f"\t schema.names= {schema.names}"
)

def _storage(self) -> str | None:
    """Return the ``STORED AS`` clause, or ``None`` when ``self.format`` is falsy."""
    return f"STORED AS {self.format}" if self.format else None

Expand Down Expand Up @@ -142,10 +174,14 @@ def _pieces(self):
}
main_schema = sch.Schema(fields)

yield format_schema_with_watermark(main_schema, self.watermark)
yield format_schema_with_watermark(
main_schema, self.watermark, self.primary_keys
)
yield f"PARTITIONED BY {format_schema(part_schema)}"
else:
yield format_schema_with_watermark(self.schema, self.watermark)
yield format_schema_with_watermark(
self.schema, self.watermark, self.primary_keys
)

yield self._format_tbl_properties()

Expand Down
70 changes: 66 additions & 4 deletions ibis/backends/flink/tests/test_ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

import ibis
import ibis.common.exceptions as exc
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from ibis.backends.conftest import TEST_TABLES
Expand Down Expand Up @@ -258,19 +259,80 @@ def test_force_recreate_in_mem_table(
assert new_table.schema() == schema


def test_create_source_table_with_watermark(
con, functional_alltypes_schema, temp_table, csv_source_configs
@pytest.fixture
def functional_alltypes_schema_w_nonnullable_columns():
    """Schema whose columns are all non-nullable except ``timestamp_col``."""
    required_columns = {
        "id": dt.int32,
        "bool_col": dt.bool,
        "smallint_col": dt.int16,
        "int_col": dt.int32,
        "bigint_col": dt.int64,
        "float_col": dt.float32,
        "double_col": dt.float64,
        "date_string_col": dt.string,
        "string_col": dt.string,
        "year": dt.int32,
        "month": dt.int32,
    }
    columns = {
        name: dtype(nullable=False) for name, dtype in required_columns.items()
    }
    # Unlike the columns above, this one is not built with ``nullable=False``.
    columns["timestamp_col"] = dt.timestamp(scale=3)
    return sch.Schema(columns)


@pytest.mark.parametrize(
    "primary_key",
    [
        None,
        "id",
        ["id"],
        ["month"],
        ["id", "string_col"],
        ["id", "string_col", "year"],
    ],
)
def test_create_source_table_with_watermark_and_primary_key(
    con,
    temp_table,
    functional_alltypes_schema_w_nonnullable_columns,
    csv_source_configs,
    primary_key,
):
    """Create a table with a watermark and each parametrized ``primary_key``.

    The table must then be listed by the connection and report the same
    schema that was used to create it.
    """
    new_table = con.create_table(
        temp_table,
        schema=functional_alltypes_schema_w_nonnullable_columns,
        tbl_properties=csv_source_configs("functional_alltypes"),
        watermark=ibis.watermark(
            time_col="timestamp_col", allowed_delay=ibis.interval(seconds=15)
        ),
        primary_key=primary_key,
    )
    assert temp_table in con.list_tables()
    assert new_table.schema() == functional_alltypes_schema_w_nonnullable_columns


@pytest.mark.parametrize(
    "primary_key",
    [
        "nonexistent_column",
        ["nonexistent_column"],
        ["id", "nonexistent_column"],
    ],
)
def test_create_table_failure_with_invalid_primary_keys(
    con,
    temp_table,
    functional_alltypes_schema_w_nonnullable_columns,
    csv_source_configs,
    primary_key,
):
    """Check that a ``primary_key`` naming a column absent from the schema is rejected.

    ``create_table`` must raise ``IbisError`` and the table must not be listed
    afterwards.
    """
    with pytest.raises(exc.IbisError):
        con.create_table(
            temp_table,
            schema=functional_alltypes_schema_w_nonnullable_columns,
            tbl_properties=csv_source_configs("functional_alltypes"),
            primary_key=primary_key,
        )
    # Runs after the expected error has been caught by the ``with`` block.
    assert temp_table not in con.list_tables()


@pytest.mark.parametrize("temp", [True, False])
Expand Down

0 comments on commit da04679

Please sign in to comment.