Skip to content

Commit

Permalink
test(flink): enable turning on local test env (ibis-project#8544)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas authored Mar 21, 2024
1 parent 0a00a05 commit bc8bc4a
Showing 1 changed file with 47 additions and 26 deletions.
73 changes: 47 additions & 26 deletions ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import TYPE_CHECKING, Any

import pandas as pd
import pytest
Expand All @@ -10,48 +10,74 @@
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, json_types, struct_types, topk, win

if TYPE_CHECKING:
from pyflink.table import StreamTableEnvironment

class TestConf(BackendTest):
force_sort = True
stateful = False
supports_map = True
deps = "pandas", "pyflink"

@staticmethod
def connect(*, tmpdir, worker_id, **kw: Any):
"""Flink backend is created in batch mode by default. This is to
comply with the assumption that the tests under ibis/ibis/backends/tests/
are for batch (storage or processing) backends.
"""
def get_table_env(
    local_env: bool,
    streaming_mode: bool,
) -> StreamTableEnvironment:
    """Create the Flink table environment used by the backend tests.

    Parameters
    ----------
    local_env
        If true, build the environment in-process with
        ``TableEnvironment.create``. Otherwise connect to a remote Flink
        cluster whose address is read from the ``FLINK_REMOTE_CLUSTER_ADDR``
        (default ``"localhost"``) and ``FLINK_REMOTE_CLUSTER_PORT``
        (default ``"8081"``) environment variables.
    streaming_mode
        If true, use streaming-mode environment settings; otherwise use
        batch-mode settings.

    Returns
    -------
    StreamTableEnvironment
        The table environment with ``table.local-time-zone`` set to ``UTC``.
        NOTE(review): the local branch returns whatever
        ``TableEnvironment.create`` produces -- confirm that this matches
        the annotated return type.
    """
    if local_env:
        from pyflink.table import EnvironmentSettings, TableEnvironment

        env_settings = (
            EnvironmentSettings.in_streaming_mode()
            if streaming_mode
            else EnvironmentSettings.in_batch_mode()
        )
        table_env = TableEnvironment.create(env_settings)

    else:
        import os

        from pyflink.java_gateway import get_gateway
        from pyflink.table import StreamTableEnvironment
        from pyflink.table.table_environment import StreamExecutionEnvironment

        # Connect with Flink remote cluster to run the unit tests
        gateway = get_gateway()
        # Empty Java String[] passed as the last argument of
        # createRemoteEnvironment below.
        string_class = gateway.jvm.String
        string_array = gateway.new_array(string_class, 0)
        env_settings = (
            gateway.jvm.org.apache.flink.table.api.EnvironmentSettings.inStreamingMode()
            if streaming_mode
            else gateway.jvm.org.apache.flink.table.api.EnvironmentSettings.inBatchMode()
        )
        stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
        flink_cluster_addr = os.environ.get("FLINK_REMOTE_CLUSTER_ADDR", "localhost")
        flink_cluster_port = int(os.environ.get("FLINK_REMOTE_CLUSTER_PORT", "8081"))
        j_execution_environment = stream_env.createRemoteEnvironment(
            flink_cluster_addr,
            flink_cluster_port,
            env_settings.getConfiguration(),
            string_array,
        )

        # Wrap the Java execution environment in its Python counterpart
        # before creating the table environment on top of it.
        env = StreamExecutionEnvironment(j_execution_environment)
        table_env = StreamTableEnvironment.create(env)

    # Applied to both the local and the remote environment.
    table_config = table_env.get_config()
    table_config.set("table.local-time-zone", "UTC")

    return table_env

return ibis.flink.connect(stream_table_env, **kw)

class TestConf(BackendTest):
force_sort = True
stateful = False
supports_map = True
deps = "pandas", "pyflink"

@staticmethod
def connect(*, tmpdir, worker_id, **kw: Any):
    """Return an ibis Flink connection backed by a batch-mode environment.

    Batch mode is used because the shared tests under
    ibis/ibis/backends/tests/ assume a batch (storage or processing)
    backend. The table environment is requested from ``get_table_env``
    with ``local_env=False`` and ``streaming_mode=False``.
    """
    return ibis.flink.connect(
        get_table_env(local_env=False, streaming_mode=False),
        **kw,
    )

def _load_data(self, **_: Any) -> None:
con = self.connection
Expand Down Expand Up @@ -85,13 +111,8 @@ def connect(*, tmpdir, worker_id, **kw: Any):
in the tests under ibis/ibis/backends/flink/tests/.
We only use mini cluster here for simplicity.
"""
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_config = table_env.get_config()
table_config.set("table.local-time-zone", "UTC")

table_env = get_table_env(local_env=True, streaming_mode=True)
return ibis.flink.connect(table_env, **kw)


Expand Down

0 comments on commit bc8bc4a

Please sign in to comment.