Skip to content

Commit

Permalink
refactor(flink): port to sqlglot
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Feb 12, 2024
1 parent f76b21e commit 75d6e02
Show file tree
Hide file tree
Showing 82 changed files with 1,512 additions and 1,847 deletions.
108 changes: 50 additions & 58 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,28 @@ jobs:
- oracle
services:
- oracle
# - name: flink
# title: Flink
# serial: true
# extras:
# - flink
# additional_deps:
# - apache-flink
# - pytest-split
# services:
# - flink
# - name: risingwave
# title: Risingwave
# services:
# - risingwave
# extras:
# - risingwave
- name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
services:
- flink
include:
- os: ubuntu-latest
python-version: "3.10"
backend:
name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
services:
- flink
exclude:
- os: windows-latest
backend:
Expand Down Expand Up @@ -296,32 +302,29 @@ jobs:
- oracle
services:
- oracle
# - os: windows-latest
# backend:
# name: flink
# title: Flink
# serial: true
# extras:
# - flink
# services:
# - flink
# - python-version: "3.11"
# backend:
# name: flink
# title: Flink
# serial: true
# extras:
# - flink
# services:
# - flink
# - os: windows-latest
# backend:
# name: risingwave
# title: Risingwave
# services:
# - risingwave
# extras:
# - risingwave
- os: windows-latest
backend:
name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
services:
- flink
- os: ubuntu-latest
python-version: "3.11"
backend:
name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
services:
- flink
- os: windows-latest
backend:
name: exasol
Expand Down Expand Up @@ -390,29 +393,18 @@ jobs:
IBIS_TEST_IMPALA_PORT: 21050
IBIS_EXAMPLES_DATA: ${{ runner.temp }}/examples-${{ matrix.backend.name }}-${{ matrix.os }}-${{ steps.install_python.outputs.python-version }}

# FIXME(deepyaman): If some backend-specific test, in test_ddl.py,
# executes before common tests, they will fail with:
# org.apache.flink.table.api.ValidationException: Table `default_catalog`.`default_database`.`functional_alltypes` was not found.
# Therefore, we run backend-specific tests second to avoid this.
# - name: "run serial tests: ${{ matrix.backend.name }}"
# if: matrix.backend.serial && matrix.backend.name == 'flink'
# run: |
# just ci-check -m ${{ matrix.backend.name }} ibis/backends/tests
# just ci-check -m ${{ matrix.backend.name }} ibis/backends/flink/tests
# env:
# IBIS_EXAMPLES_DATA: ${{ runner.temp }}/examples-${{ matrix.backend.name }}-${{ matrix.os }}-${{ steps.install_python.outputs.python-version }}
# FLINK_REMOTE_CLUSTER_ADDR: localhost
# FLINK_REMOTE_CLUSTER_PORT: "8081"
#
- name: "run serial tests: ${{ matrix.backend.name }}"
if: matrix.backend.serial # && matrix.backend.name != 'flink'
if: matrix.backend.serial
run: just ci-check -m ${{ matrix.backend.name }}
env:
FLINK_REMOTE_CLUSTER_ADDR: localhost
FLINK_REMOTE_CLUSTER_PORT: "8081"
IBIS_EXAMPLES_DATA: ${{ runner.temp }}/examples-${{ matrix.backend.name }}-${{ matrix.os }}-${{ steps.install_python.outputs.python-version }}

- name: check that no untracked files were produced
shell: bash
run: git checkout poetry.lock pyproject.toml && ! git status --porcelain | tee /dev/stderr | grep .
run: |
! git status --porcelain | tee /dev/stderr | grep .
- name: upload code coverage
if: success()
Expand Down
9 changes: 7 additions & 2 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,9 +1277,14 @@ def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str:


@functools.cache
def _get_backend_names() -> frozenset[str]:
def _get_backend_names(*, exclude: tuple[str] = ()) -> frozenset[str]:
"""Return the set of known backend names.
Parameters
----------
exclude
Exclude these backend names from the result
Notes
-----
This function returns a frozenset to prevent cache pollution.
Expand All @@ -1293,7 +1298,7 @@ def _get_backend_names() -> frozenset[str]:
entrypoints = importlib.metadata.entry_points()["ibis.backends"]
else:
entrypoints = importlib.metadata.entry_points(group="ibis.backends")
return frozenset(ep.name for ep in entrypoints)
return frozenset(ep.name for ep in entrypoints).difference(exclude)


def connect(resource: Path | str, **kwargs: Any) -> BaseBackend:
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/base/sqlglot/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def __getitem__(self, key: str) -> sge.Column:

def paren(expr):
"""Wrap a sqlglot expression in parentheses."""
return sge.Paren(this=expr)
return sge.Paren(this=sge.convert(expr))


def parenthesize(op, arg):
Expand Down
10 changes: 10 additions & 0 deletions ibis/backends/base/sqlglot/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,3 +1029,13 @@ def _from_ibis_Map(cls, dtype: dt.Map) -> sge.DataType:
key_type = cls.from_ibis(dtype.key_type.copy(nullable=False))
value_type = cls.from_ibis(dtype.value_type)
return sge.DataType(this=typecode.MAP, expressions=[key_type, value_type])


class FlinkType(SqlglotType):
dialect = "flink"
default_decimal_precision = 38
default_decimal_scale = 18

@classmethod
def _from_ibis_Binary(cls, dtype: dt.Binary) -> sge.DataType:
return sge.DataType(this=sge.DataType.Type.VARBINARY)

Check warning on line 1041 in ibis/backends/base/sqlglot/datatypes.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/base/sqlglot/datatypes.py#L1041

Added line #L1041 was not covered by tests
17 changes: 17 additions & 0 deletions ibis/backends/base/sqlglot/dialects.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class Generator(Postgres.Generator):

class Flink(Hive):
class Generator(Hive.Generator):
TYPE_MAPPING = Hive.Generator.TYPE_MAPPING.copy() | {
sge.DataType.Type.TIME: "TIME",
}

TRANSFORMS = Hive.Generator.TRANSFORMS.copy() | {
sge.Stddev: rename_func("stddev_samp"),
sge.StddevPop: rename_func("stddev_pop"),
Expand All @@ -82,8 +86,21 @@ class Generator(Hive.Generator):
),
sge.ArrayConcat: rename_func("array_concat"),
sge.Length: rename_func("char_length"),
sge.TryCast: lambda self,
e: f"TRY_CAST({e.this.sql(self.dialect)} AS {e.to.sql(self.dialect)})",
sge.DayOfYear: rename_func("dayofyear"),
sge.DayOfWeek: rename_func("dayofweek"),
sge.DayOfMonth: rename_func("dayofmonth"),
}

class Tokenizer(Hive.Tokenizer):
# In Flink, embedded single quotes are escaped like most other SQL
# dialects: doubling up the single quote
#
# We override it here because we inherit from Hive's dialect and Hive
# uses a backslash to escape single quotes
STRING_ESCAPES = ["'"]


class Impala(Hive):
class Generator(Hive.Generator):
Expand Down
Loading

0 comments on commit 75d6e02

Please sign in to comment.