From d60ef5c474f8f7b34f9ebe740b57cbc7d3be4428 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Thu, 27 Jun 2024 16:05:02 +0800 Subject: [PATCH 1/9] [Feature]: Support Unit5 Test For Clickhouse Connector --- flink-connector-clickhouse-e2e-test/pom.xml | 11 ++ .../clickhouse/ClickhouseE2ECase.java | 121 ++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java diff --git a/flink-connector-clickhouse-e2e-test/pom.xml b/flink-connector-clickhouse-e2e-test/pom.xml index 860d69a..58ee8f2 100644 --- a/flink-connector-clickhouse-e2e-test/pom.xml +++ b/flink-connector-clickhouse-e2e-test/pom.xml @@ -35,6 +35,17 @@ limitations under the License. + + org.apache.flink + flink-connector-test-utils + ${flink.version} + + + org.testcontainers + clickhouse + 1.19.8 + + diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java new file mode 100644 index 0000000..5e5426d --- /dev/null +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java @@ -0,0 +1,121 @@ +package org.apache.flink.connector.clickhouse; + +import com.clickhouse.jdbc.ClickHouseConnection; + +import com.clickhouse.jdbc.ClickHouseDriver; + +import com.clickhouse.jdbc.ClickHouseStatement; + +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.apache.flink.test.util.SQLJobSubmission; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + + +public class ClickhouseE2ECase { + + private static final Logger logger = LoggerFactory.getLogger(ClickhouseE2ECase.class); + + static final ClickHouseContainer CLICKHOUSE_CONTAINER = + new ClickHouseContainer("clickhouse/clickhouse-server:latest") + .withLogConsumer(new Slf4jLogConsumer(logger)); + + public static final Network NETWORK = Network.newNetwork(); + + private static final TestcontainersSettings TESTCONTAINERS_SETTINGS = + TestcontainersSettings.builder() + .logger(logger) + .network(NETWORK) + .dependsOn(CLICKHOUSE_CONTAINER) + .build(); + // .baseImage("flink:1.19.0-scala_2.12") + public static final FlinkContainers FLINK = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder() + .numTaskManagers(1) + .setConfigOption(JobManagerOptions.ADDRESS,"jobmanager").baseImage("flink:1.19.0-scala_2.12").build()) + .withTestcontainersSettings( + TESTCONTAINERS_SETTINGS) + .build(); + + ClickHouseConnection connection; + + @Before + public void setUp() throws Exception { + String properties = "jobmanager.rpc.address: jobmanager"; + FLINK.getJobManager().withLabel("com.testcontainers.allow-filesystem-access","true"); + FLINK.getJobManager().withNetworkAliases("jobmanager") + .withExposedPorts(8081).withEnv("FLINK_PROPERTIES", properties) + .withExtraHost("host.docker.internal", "host-gateway"); + + FLINK.getTaskManagers().forEach(tm -> {tm.withLabel("com.testcontainers.allow-filesystem-access","true"); + tm.withNetworkAliases("taskmanager"); + tm.withEnv("FLINK_PROPERTIES", properties); + tm.dependsOn(FLINK.getJobManager()); + tm.withExtraHost("host.docker.internal", "host-gateway");}); + logger.info("starting containers"); + FLINK.start(); + // FLINK.getTaskManagers().forEach(tm -> tm.withLabel("com.testcontainers.allow-filesystem-access","true")); + ClickHouseDriver driver = new ClickHouseDriver(); + connection = driver.connect(CLICKHOUSE_CONTAINER.getJdbcUrl(), null); + + } + + @Test + public void testSink() throws Exception { + ClickHouseStatement statement = connection.createStatement(); + statement.execute("create table test (id Int32, name String) engine = Memory"); + statement.execute("create table test_insert (id Int32, name String) engine = Memory"); + + statement.execute("insert into test values (1, 'test')"); + List sqlLines = new ArrayList<>(); + sqlLines.add("create table clickhouse_test (id int, name varchar) with ('connector' = 'clickhouse',\n" + + " 'uri' = 'jdbc:clickhouse://clickhouse:8030',\n" + + " 'table' = 'test');"); + sqlLines.add("create table test (id int, name varchar) with ('connector' = 'clickhouse',\n" + + " 'uri' = 'jdbc:clickhouse://clickhouse:8030',\n" + + " 'table' = 'test_insert');"); + sqlLines.add("insert into clickhouse_test select * from test;"); + executeSqlStatements(sqlLines); + ResultSet resultSet = statement.executeQuery("select * from test_insert"); + + while (resultSet.next()) { + int id = resultSet.getInt("id"); + String name = resultSet.getString("name"); + //TODO + } + } + + private static void executeSqlStatements(final List sqlLines) throws Exception { + + FLINK.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) + .build()); + } + + @After + public void tearDown() throws SQLException { + CLICKHOUSE_CONTAINER.stop(); + if(connection != null) { + connection.close(); + } + } + +} From 49eaa423ccc2bc0126f495795a243b2bab3f8006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Fri, 19 Jul 2024 09:44:59 +0800 Subject: [PATCH 2/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- flink-connector-clickhouse-e2e-test/pom.xml | 12 + .../clickhouse/ClickhouseE2ECase.java | 125 ++++----- .../connector/clickhouse/ClickhouseProxy.java | 94 +++++++ .../FlinkContainerTestEnviroment.java | 241 ++++++++++++++++++ 4 files changed, 400 insertions(+), 72 deletions(-) create mode 100644 flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java create mode 100644 flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java diff --git a/flink-connector-clickhouse-e2e-test/pom.xml b/flink-connector-clickhouse-e2e-test/pom.xml index 58ee8f2..bc8f00b 100644 --- a/flink-connector-clickhouse-e2e-test/pom.xml +++ b/flink-connector-clickhouse-e2e-test/pom.xml @@ -45,6 +45,18 @@ limitations under the License. clickhouse 1.19.8 + + com.clickhouse + clickhouse-jdbc + ${clickhouse-jdbc.version} + test + + + org.apache.flink + flink-connector-clickhouse + ${project.version} + test + diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java index 5e5426d..544e4c1 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java @@ -6,116 +6,97 @@ import com.clickhouse.jdbc.ClickHouseStatement; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.connector.testframe.container.FlinkContainers; import org.apache.flink.connector.testframe.container.FlinkContainersSettings; import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.test.util.TestUtils; + import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; - +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -public class ClickhouseE2ECase { +public class ClickhouseE2ECase extends FlinkContainerTestEnviroment { private static final Logger logger = LoggerFactory.getLogger(ClickhouseE2ECase.class); - static final ClickHouseContainer CLICKHOUSE_CONTAINER = - new ClickHouseContainer("clickhouse/clickhouse-server:latest") - .withLogConsumer(new Slf4jLogConsumer(logger)); - - public static final Network NETWORK = Network.newNetwork(); - - private static final TestcontainersSettings TESTCONTAINERS_SETTINGS = - TestcontainersSettings.builder() - .logger(logger) - .network(NETWORK) - .dependsOn(CLICKHOUSE_CONTAINER) - .build(); - // .baseImage("flink:1.19.0-scala_2.12") - public static final FlinkContainers FLINK = - FlinkContainers.builder() - .withFlinkContainersSettings( - FlinkContainersSettings.builder() - .numTaskManagers(1) - .setConfigOption(JobManagerOptions.ADDRESS,"jobmanager").baseImage("flink:1.19.0-scala_2.12").build()) - .withTestcontainersSettings( - TESTCONTAINERS_SETTINGS) - .build(); - - ClickHouseConnection connection; - - @Before - public void setUp() throws Exception { - String properties = "jobmanager.rpc.address: jobmanager"; - FLINK.getJobManager().withLabel("com.testcontainers.allow-filesystem-access","true"); - FLINK.getJobManager().withNetworkAliases("jobmanager") - .withExposedPorts(8081).withEnv("FLINK_PROPERTIES", properties) - .withExtraHost("host.docker.internal", "host-gateway"); - - FLINK.getTaskManagers().forEach(tm -> {tm.withLabel("com.testcontainers.allow-filesystem-access","true"); - tm.withNetworkAliases("taskmanager"); - tm.withEnv("FLINK_PROPERTIES", properties); - tm.dependsOn(FLINK.getJobManager()); - tm.withExtraHost("host.docker.internal", "host-gateway");}); - logger.info("starting containers"); - FLINK.start(); - // FLINK.getTaskManagers().forEach(tm -> tm.withLabel("com.testcontainers.allow-filesystem-access","true")); - ClickHouseDriver driver = new ClickHouseDriver(); - connection = driver.connect(CLICKHOUSE_CONTAINER.getJdbcUrl(), null); + ClickhouseProxy proxy; - } @Test public void testSink() throws Exception { - ClickHouseStatement statement = connection.createStatement(); - statement.execute("create table test (id Int32, name String) engine = Memory"); - statement.execute("create table test_insert (id Int32, name String) engine = Memory"); - - statement.execute("insert into test values (1, 'test')"); + String jdbcUrl = String.format("jdbc:clickhouse://%s:%s/default", "clickhouse", + "8123"); + + proxy = new ClickhouseProxy(CLICKHOUSE_CONTAINER.getJdbcUrl(), CLICKHOUSE_CONTAINER.getUsername(),CLICKHOUSE_CONTAINER.getPassword()); + proxy.execute( + "create table test (id Int32, name String) engine = Memory"); + proxy.execute("create table test_insert (id Int32, name String) engine = Memory"); + proxy.execute("insert into test values (1, 'test');"); + proxy.execute("insert into test values (2, 'kiki');"); List sqlLines = new ArrayList<>(); sqlLines.add("create table clickhouse_test (id int, name varchar) with ('connector' = 'clickhouse',\n" - + " 'uri' = 'jdbc:clickhouse://clickhouse:8030',\n" - + " 'table' = 'test');"); + + " 'url' = '" + jdbcUrl + "',\n" + + " 'table-name' = 'test',\n" + + " 'username'='test_username',\n" + + " 'password'='test_password'\n" + + ");"); sqlLines.add("create table test (id int, name varchar) with ('connector' = 'clickhouse',\n" - + " 'uri' = 'jdbc:clickhouse://clickhouse:8030',\n" - + " 'table' = 'test_insert');"); - sqlLines.add("insert into clickhouse_test select * from test;"); - executeSqlStatements(sqlLines); - ResultSet resultSet = statement.executeQuery("select * from test_insert"); - - while (resultSet.next()) { - int id = resultSet.getInt("id"); - String name = resultSet.getString("name"); - //TODO - } + + " 'url' = '" + jdbcUrl + "',\n" + + " 'username'='test_username',\n" + + " 'password'='test_password',\n" + + " 'table-name' = 'test_insert');"); + sqlLines.add("insert into test select * from clickhouse_test;"); + + submitSQLJob(sqlLines, SQL_CONNECTOR_CLICKHOUSE_JAR, CLICKHOUSE_JDBC_JAR, HTTPCORE_JAR, HTTPCLIENT_JAR, HTTPCLIENT_H2_JAR); + waitUntilJobRunning(Duration.of(1, ChronoUnit.MINUTES)); + List expectedResult = Arrays.asList("1,test","2,kiki"); + proxy.checkResultWithTimeout(expectedResult, "test_insert", Arrays.asList("id", "name"), 60000); } - private static void executeSqlStatements(final List sqlLines) throws Exception { - FLINK.submitSQLJob( - new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) - .build()); - } + @After public void tearDown() throws SQLException { CLICKHOUSE_CONTAINER.stop(); - if(connection != null) { - connection.close(); - } } } diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java new file mode 100644 index 0000000..000da83 --- /dev/null +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java @@ -0,0 +1,94 @@ +package org.apache.flink.connector.clickhouse; + +import com.clickhouse.jdbc.ClickHouseConnection; +import com.clickhouse.jdbc.ClickHouseDataSource; +import com.clickhouse.jdbc.ClickHouseDriver; +import com.clickhouse.jdbc.ClickHouseStatement; +import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +public class ClickhouseProxy { + private String jdbcUrl; + private String username; + private String password; + private static final Logger logger = LoggerFactory.getLogger(ClickhouseProxy.class); + ClickHouseDriver driver; + ClickHouseStatement statement; + ClickHouseConnection connection; + ClickhouseProxy(String jdbcUrl, String username, String password) { + this.jdbcUrl = jdbcUrl; + this.username = username; + this.password = password; + this.driver = new ClickHouseDriver(); + + } + + public void connect() { + try { + if (connection == null) { + Properties properties = new Properties(); + properties.put("username", username); + properties.put("password", password); + ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(jdbcUrl, properties); + connection = clickHouseDataSource.getConnection(username,password); + statement = connection.createStatement(); + } + } catch (Exception e) { + logger.error("Failed to connect to clickhouse", e); + } + } + + public void execute(String sql) throws SQLException { + connect(); + statement.execute(sql); + } + + private void checkResult(List expectedResult, String table, List fields) throws Exception { + connect(); + List results = new ArrayList<>(); + ResultSet resultSet = statement.executeQuery("select * from " + table); + while (resultSet.next()) { + List result = new ArrayList<>(); + for (String field : fields) { + Object value = resultSet.getObject(field); + if (value == null) { + result.add("null"); + } else { + result.add(value.toString()); + } + } + results.add(StringUtils.join(result)); + + } + Collections.sort(results); + Collections.sort(expectedResult); + Assert.assertArrayEquals(expectedResult.toArray(), results.toArray()); + } + + public void checkResultWithTimeout(List expectedResult, String table, List fields, long timeout) throws Exception { + long endTimeout = System.currentTimeMillis() + timeout; + boolean result = false; + while (System.currentTimeMillis() < endTimeout) { + try { + checkResult(expectedResult, table, fields); + result = true; + break; + } catch (AssertionError | SQLException throwable) { + Thread.sleep(1000L); + } + } + if (!result) { + checkResult(expectedResult, table, fields); + } + + } +} diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java new file mode 100644 index 0000000..da139d0 --- /dev/null +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java @@ -0,0 +1,241 @@ +package org.apache.flink.connector.clickhouse; + +import com.clickhouse.jdbc.ClickHouseDriver; + +import com.clickhouse.jdbc.ClickHouseStatement; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.test.resources.ResourceTestUtils; + +import org.apache.flink.test.util.SQLJobSubmission; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.assertj.core.util.Preconditions.checkState; + +public class FlinkContainerTestEnviroment { + + private static final Logger logger = LoggerFactory.getLogger(FlinkContainerTestEnviroment.class); + public static final Network NETWORK = Network.newNetwork(); + + static final ClickHouseContainer CLICKHOUSE_CONTAINER = + new ClickHouseContainer("clickhouse/clickhouse-server:latest") + .withNetwork(NETWORK) + .withNetworkAliases("clickhouse") + .withExposedPorts(8123, 9000) + .withUsername("test_username") + .withPassword("test_password") + .withLogConsumer(new Slf4jLogConsumer(logger)); + + + private static final TestcontainersSettings TESTCONTAINERS_SETTINGS = + TestcontainersSettings.builder() + .logger(logger) + .network(NETWORK) + .dependsOn(CLICKHOUSE_CONTAINER) + .build(); + + public static final FlinkContainers FLINK = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings + .builder() + .numTaskManagers(1) + .setConfigOption(JobManagerOptions.ADDRESS,"jobmanager").baseImage("flink:1.19.0-scala_2.12").build()) + .withTestcontainersSettings( + TESTCONTAINERS_SETTINGS) + .build(); + + public static Path SQL_CONNECTOR_CLICKHOUSE_JAR = ResourceTestUtils + .getResource("flink-connector-clickhouse-1.0.0-SNAPSHOT.jar"); + public static Path CLICKHOUSE_JDBC_JAR = ResourceTestUtils + .getResource("clickhouse-jdbc-0.6.1.jar"); + public static Path HTTPCORE_JAR = ResourceTestUtils + .getResource("httpcore5-5.2.jar"); + public static Path HTTPCLIENT_JAR = ResourceTestUtils.getResource("httpclient5-5.2.1.jar"); + public static Path HTTPCLIENT_H2_JAR = ResourceTestUtils.getResource("httpcore5-h2-5.2.jar"); + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private GenericContainer jobManager; + private GenericContainer taskManager; + protected RestClusterClient restClusterClient; + + @Before + public void setUp() throws Exception { + CLICKHOUSE_CONTAINER.start(); + + /*connection = driver.connect(CLICKHOUSE_CONTAINER.getJdbcUrl(), null); + logger.info("Clickhouse connection is established, url: {}, clickhouse port:{}", CLICKHOUSE_CONTAINER.getJdbcUrl(), + CLICKHOUSE_CONTAINER.getMappedPort(8123)); + logger.info("executing clickhouse sql statements"); + statement = connection.createStatement(); + boolean execute = statement.execute( + "create table test (id Int32, name String) engine = Memory"); + execute = statement.execute("create table test_insert (id Int32, name String) engine = Memory"); + execute = statement.execute("insert into test values (1, 'test');");*/ + String properties = String.join( + "\n", + Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "heartbeat.timeout: 60000","parallelism.default: 1")); + jobManager = + new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12")) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withExtraHost("host.docker.internal", "host-gateway") + .withNetworkAliases("jobmanager") + .withExposedPorts(8081,6123) + .dependsOn(CLICKHOUSE_CONTAINER) + .withLabel("com.testcontainers.allow-filesystem-access","true") + .withEnv("FLINK_PROPERTIES", properties) + .withLogConsumer(new Slf4jLogConsumer(logger)); + taskManager = + new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12")) + .withCommand("taskmanager") + .withExtraHost("host.docker.internal", "host-gateway") + .withNetwork(NETWORK) + .withNetworkAliases("taskmanager") + .withEnv("FLINK_PROPERTIES", properties) + .dependsOn(jobManager) + .withLabel("com.testcontainers.allow-filesystem-access","true") + .withLogConsumer(new Slf4jLogConsumer(logger)); + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + Thread.sleep(5000); + logger.info("Containers are started."); + + + + } + + public RestClusterClient getRestClusterClient() { + if (restClusterClient != null) { + return restClusterClient; + } + checkState( + jobManager.isRunning(), + "Cluster client should only be retrieved for a running cluster"); + try { + final Configuration clientConfiguration = new Configuration(); + clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); + clientConfiguration.set( + RestOptions.PORT, jobManager.getMappedPort(8081)); + this.restClusterClient = + new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to create client for Flink container cluster", e); + } + return restClusterClient; + } + + /** + * Submits a SQL job to the running cluster. + * + *

NOTE: You should not use {@code '\t'}. + */ + public void submitSQLJob(List sqlLines, Path... jars) + throws IOException, InterruptedException { + logger.info("submitting flink sql task"); + + SQLJobSubmission job = + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines).addJars(jars).build(); + final List commands = new ArrayList<>(); + Path script = temporaryFolder.newFile().toPath(); + Files.write(script, job.getSqlLines()); + jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql"); + commands.add("cat /tmp/script.sql | "); + commands.add("bin/sql-client.sh"); + for (String jar : job.getJars()) { + commands.add("--jar"); + String containerPath = copyAndGetContainerPath(jobManager, jar); + commands.add(containerPath); + } + + Container.ExecResult execResult = + jobManager.execInContainer("bash", "-c", String.join(" ", commands)); + logger.info("execute result:" + execResult.getStdout()); + logger.error("execute error:" + execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the SQL job."); + } + } + + private String copyAndGetContainerPath(GenericContainer container, String filePath) { + Path path = Paths.get(filePath); + String containerPath = "/tmp/" + path.getFileName(); + container.copyFileToContainer(MountableFile.forHostPath(path), containerPath); + return containerPath; + } + + + private static List readSqlFile(final String resourceName) throws Exception { + return Files.readAllLines( + Paths.get(ClickhouseE2ECase.class.getResource("/" + resourceName).toURI())); + } + + public void waitUntilJobRunning(Duration timeout) { + RestClusterClient clusterClient = getRestClusterClient(); + Deadline deadline = Deadline.fromNow(timeout); + while (deadline.hasTimeLeft()) { + Collection jobStatusMessages; + try { + jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + logger.warn("Error when fetching job status.", e); + continue; + } + if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { + JobStatusMessage message = jobStatusMessages.iterator().next(); + JobStatus jobStatus = message.getJobState(); + if (jobStatus.isTerminalState()) { + throw new ValidationException( + String.format( + "Job has been terminated! JobName: %s, JobID: %s, Status: %s", + message.getJobName(), + message.getJobId(), + message.getJobState())); + } else if (jobStatus == JobStatus.RUNNING) { + return; + } + } + } + } +} From 1ecbf9c9a92c222a0496c614be069075877e949e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Tue, 23 Jul 2024 20:53:18 +0800 Subject: [PATCH 3/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- flink-connector-clickhouse-e2e-test/pom.xml | 21 ++++++++++ .../clickhouse/ClickhouseE2ECase.java | 36 +++++++++++++----- .../connector/clickhouse/ClickhouseProxy.java | 38 ++++++++++++++----- 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/flink-connector-clickhouse-e2e-test/pom.xml b/flink-connector-clickhouse-e2e-test/pom.xml index bc8f00b..10f1b91 100644 --- a/flink-connector-clickhouse-e2e-test/pom.xml +++ b/flink-connector-clickhouse-e2e-test/pom.xml @@ -57,6 +57,27 @@ limitations under the License. ${project.version} test + + org.apache.httpcomponents.client5 + httpclient5 + 5.2.1 + test + + + org.apache.httpcomponents.core5 + httpcore5 + 5.2 + test + + + + org.apache.httpcomponents.core5 + httpcore5-h2 + 5.2 + test + + + diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java index 544e4c1..bc87854 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java @@ -67,28 +67,44 @@ public void testSink() throws Exception { proxy = new ClickhouseProxy(CLICKHOUSE_CONTAINER.getJdbcUrl(), CLICKHOUSE_CONTAINER.getUsername(),CLICKHOUSE_CONTAINER.getPassword()); proxy.execute( - "create table test (id Int32, name String) engine = Memory"); - proxy.execute("create table test_insert (id Int32, name String) engine = Memory"); - proxy.execute("insert into test values (1, 'test');"); - proxy.execute("insert into test values (2, 'kiki');"); + "create table test (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory"); + proxy.execute("create table test_insert (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory; "); + proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (1, 'Name1', 1.1, '2022-01-01', '2022-01-01 00:00:00', [1, 2, 3]);"); + proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (2, 'Name2', 2.2, '2022-01-02', '2022-01-02 01:00:00', [4, 5, 6]);"); + proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (3, 'Name3', 3.3, '2022-01-03', '2022-01-03 02:00:00', [7, 8, 9]);"); + proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (4, 'Name4', 4.4, '2022-01-04', '2022-01-04 03:00:00', [10, 11, 12]);"); + proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (5, 'Name5', 5.5, '2022-01-05', '2022-01-05 04:00:00', [13, 14, 15]);"); + // proxy.execute("insert into test values (2, 'kiki');"); List sqlLines = new ArrayList<>(); - sqlLines.add("create table clickhouse_test (id int, name varchar) with ('connector' = 'clickhouse',\n" + sqlLines.add("create table clickhouse_test (id int, name varchar,float32_column FLOAT,\n" + // + " date_column DATE,\n" + + " datetime_column TIMESTAMP(3),\n" + + " array_column ARRAY) with ('connector' = 'clickhouse',\n" + " 'url' = '" + jdbcUrl + "',\n" + " 'table-name' = 'test',\n" + " 'username'='test_username',\n" + " 'password'='test_password'\n" + ");"); - sqlLines.add("create table test (id int, name varchar) with ('connector' = 'clickhouse',\n" + sqlLines.add("create table test (id int, name varchar,float32_column FLOAT,\n" + // + " date_column DATE,\n" + + " datetime_column TIMESTAMP(3),\n" + + " array_column ARRAY) with ('connector' = 'clickhouse',\n" + " 'url' = '" + jdbcUrl + "',\n" + + " 'table-name' = 'test_insert',\n" + " 'username'='test_username',\n" - + " 'password'='test_password',\n" - + " 'table-name' = 'test_insert');"); + + " 'password'='test_password'\n" + + ");"); sqlLines.add("insert into test select * from clickhouse_test;"); submitSQLJob(sqlLines, SQL_CONNECTOR_CLICKHOUSE_JAR, CLICKHOUSE_JDBC_JAR, HTTPCORE_JAR, HTTPCLIENT_JAR, HTTPCLIENT_H2_JAR); waitUntilJobRunning(Duration.of(1, ChronoUnit.MINUTES)); - List expectedResult = Arrays.asList("1,test","2,kiki"); - proxy.checkResultWithTimeout(expectedResult, "test_insert", Arrays.asList("id", "name"), 60000); + List expectedResult = Arrays.asList( + "1,Name1,1.1,2022-01-01 00:00:00,[1,2,3]", + "2,Name2,2.2,2022-01-02 01:00:00,[4,5,6]", + "3,Name3,3.3,2022-01-03 02:00:00,[7,8,9]", + "4,Name4,4.4,2022-01-04 03:00:00,[10,11,12]", + "5,Name5,5.5,2022-01-05 04:00:00,[13,14,15]"); + proxy.checkResultWithTimeout(expectedResult, "test_insert", Arrays.asList("id", "name","float32_column","datetime_column","array_column"), 60000); } diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java index 000da83..38f35d2 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java @@ -4,17 +4,22 @@ import com.clickhouse.jdbc.ClickHouseDataSource; import com.clickhouse.jdbc.ClickHouseDriver; import com.clickhouse.jdbc.ClickHouseStatement; -import org.apache.commons.lang3.StringUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; +import java.sql.Array; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.stream.Collectors; public class ClickhouseProxy { private String jdbcUrl; @@ -58,16 +63,31 @@ private void checkResult(List expectedResult, String table, List ResultSet resultSet = statement.executeQuery("select * from " + table); while (resultSet.next()) { List result = new ArrayList<>(); - for (String field : fields) { - Object value = resultSet.getObject(field); - if (value == null) { - result.add("null"); - } else { - result.add(value.toString()); + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + for (int i = 1; i <= columnCount; i++) { + String columnName = metaData.getColumnName(i); + if (!fields.contains(columnName)) { + continue; + } + String columnType = metaData.getColumnTypeName(i); + switch (columnType) { + case "Array": + Array array = resultSet.getArray(i); + result.add(array.toString()); + break; + case "Timestamp": + Timestamp timestamp = resultSet.getTimestamp(i); + result.add(timestamp.toString()); + break; + default: + String value = resultSet.getString(i); + result.add(value); + break; + } } - } - results.add(StringUtils.join(result)); + results.add(result.stream().collect(Collectors.joining(","))); } Collections.sort(results); Collections.sort(expectedResult); From 640d7580fc7bc55f8f44fd7b0540716a6e24a118 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Thu, 25 Jul 2024 19:21:21 +0800 Subject: [PATCH 4/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- .../clickhouse/ClickhouseE2ECase.java | 141 ++++++++---------- .../connector/clickhouse/ClickhouseProxy.java | 88 +++++------ .../FlinkContainerTestEnviroment.java | 88 ++++------- 3 files changed, 138 insertions(+), 179 deletions(-) diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java index bc87854..0f5072f 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseE2ECase.java @@ -1,50 +1,10 @@ package org.apache.flink.connector.clickhouse; -import com.clickhouse.jdbc.ClickHouseConnection; - -import com.clickhouse.jdbc.ClickHouseDriver; - -import com.clickhouse.jdbc.ClickHouseStatement; - -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.connector.testframe.container.FlinkContainers; -import org.apache.flink.connector.testframe.container.FlinkContainersSettings; -import org.apache.flink.connector.testframe.container.TestcontainersSettings; - -import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.test.resources.ResourceTestUtils; -import org.apache.flink.test.util.SQLJobSubmission; - -import org.apache.flink.test.util.TestUtils; - import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.clickhouse.ClickHouseContainer; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; import java.time.temporal.ChronoUnit; @@ -52,67 +12,86 @@ import java.util.Arrays; import java.util.List; - +/** End-to-end test for Clickhouse. */ public class ClickhouseE2ECase extends FlinkContainerTestEnviroment { private static final Logger logger = LoggerFactory.getLogger(ClickhouseE2ECase.class); ClickhouseProxy proxy; - @Test public void testSink() throws Exception { - String jdbcUrl = String.format("jdbc:clickhouse://%s:%s/default", "clickhouse", - "8123"); + String jdbcUrl = String.format("jdbc:clickhouse://%s:%s/default", "clickhouse", "8123"); - proxy = new ClickhouseProxy(CLICKHOUSE_CONTAINER.getJdbcUrl(), CLICKHOUSE_CONTAINER.getUsername(),CLICKHOUSE_CONTAINER.getPassword()); + proxy = + new ClickhouseProxy( + CLICKHOUSE_CONTAINER.getJdbcUrl(), + CLICKHOUSE_CONTAINER.getUsername(), + CLICKHOUSE_CONTAINER.getPassword()); proxy.execute( "create table test (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory"); - proxy.execute("create table test_insert (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory; "); - proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (1, 'Name1', 1.1, '2022-01-01', '2022-01-01 00:00:00', [1, 2, 3]);"); - proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (2, 'Name2', 2.2, '2022-01-02', '2022-01-02 01:00:00', [4, 5, 6]);"); - proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (3, 'Name3', 3.3, '2022-01-03', '2022-01-03 02:00:00', [7, 8, 9]);"); - proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (4, 'Name4', 4.4, '2022-01-04', '2022-01-04 03:00:00', [10, 11, 12]);"); - proxy.execute("INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (5, 'Name5', 5.5, '2022-01-05', '2022-01-05 04:00:00', [13, 14, 15]);"); + proxy.execute( + "create table test_insert (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory; "); + proxy.execute( + "INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (1, 'Name1', 1.1, '2022-01-01', '2022-01-01 00:00:00', [1, 2, 3]);"); + proxy.execute( + "INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (2, 'Name2', 2.2, '2022-01-02', '2022-01-02 01:00:00', [4, 5, 6]);"); + proxy.execute( + "INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (3, 'Name3', 3.3, '2022-01-03', '2022-01-03 02:00:00', [7, 8, 9]);"); + proxy.execute( + "INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (4, 'Name4', 4.4, '2022-01-04', '2022-01-04 03:00:00', [10, 11, 12]);"); + proxy.execute( + "INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (5, 'Name5', 5.5, '2022-01-05', '2022-01-05 04:00:00', [13, 14, 15]);"); // proxy.execute("insert into test values (2, 'kiki');"); List sqlLines = new ArrayList<>(); - sqlLines.add("create table clickhouse_test (id int, name varchar,float32_column FLOAT,\n" - // + " date_column DATE,\n" - + " datetime_column TIMESTAMP(3),\n" - + " array_column ARRAY) with ('connector' = 'clickhouse',\n" - + " 'url' = '" + jdbcUrl + "',\n" - + " 'table-name' = 'test',\n" - + " 'username'='test_username',\n" - + " 'password'='test_password'\n" - + ");"); - sqlLines.add("create table test (id int, name varchar,float32_column FLOAT,\n" - // + " date_column DATE,\n" - + " datetime_column TIMESTAMP(3),\n" - + " array_column ARRAY) with ('connector' = 'clickhouse',\n" - + " 'url' = '" + jdbcUrl + "',\n" - + " 'table-name' = 'test_insert',\n" - + " 'username'='test_username',\n" - + " 'password'='test_password'\n" - + ");"); + sqlLines.add( + "create table clickhouse_test (id int, name varchar,float32_column FLOAT,\n" + + " datetime_column TIMESTAMP(3),\n" + + " array_column ARRAY) with ('connector' = 'clickhouse',\n" + + " 'url' = '" + + jdbcUrl + + "',\n" + + " 'table-name' = 'test',\n" + + " 'username'='test_username',\n" + + " 'password'='test_password'\n" + + ");"); + sqlLines.add( + "create table test (id int, name varchar,float32_column FLOAT,\n" + + " datetime_column TIMESTAMP(3),\n" + + " array_column ARRAY) with ('connector' = 'clickhouse',\n" + + " 'url' = '" + + jdbcUrl + + "',\n" + + " 'table-name' = 'test_insert',\n" + + " 'username'='test_username',\n" + + " 'password'='test_password'\n" + + ");"); sqlLines.add("insert into test select * from clickhouse_test;"); - submitSQLJob(sqlLines, SQL_CONNECTOR_CLICKHOUSE_JAR, CLICKHOUSE_JDBC_JAR, HTTPCORE_JAR, HTTPCLIENT_JAR, HTTPCLIENT_H2_JAR); + submitSQLJob( + sqlLines, + SQL_CONNECTOR_CLICKHOUSE_JAR, + CLICKHOUSE_JDBC_JAR, + HTTPCORE_JAR, + HTTPCLIENT_JAR, + HTTPCLIENT_H2_JAR); waitUntilJobRunning(Duration.of(1, ChronoUnit.MINUTES)); - List expectedResult = Arrays.asList( - "1,Name1,1.1,2022-01-01 00:00:00,[1,2,3]", - "2,Name2,2.2,2022-01-02 01:00:00,[4,5,6]", - "3,Name3,3.3,2022-01-03 02:00:00,[7,8,9]", - "4,Name4,4.4,2022-01-04 03:00:00,[10,11,12]", - "5,Name5,5.5,2022-01-05 04:00:00,[13,14,15]"); - proxy.checkResultWithTimeout(expectedResult, "test_insert", Arrays.asList("id", "name","float32_column","datetime_column","array_column"), 60000); + List expectedResult = + Arrays.asList( + "1,Name1,1.1,2022-01-01 00:00:00,[1,2,3]", + "2,Name2,2.2,2022-01-02 01:00:00,[4,5,6]", + "3,Name3,3.3,2022-01-03 02:00:00,[7,8,9]", + "4,Name4,4.4,2022-01-04 03:00:00,[10,11,12]", + "5,Name5,5.5,2022-01-05 04:00:00,[13,14,15]"); + proxy.checkResultWithTimeout( + expectedResult, + "test_insert", + Arrays.asList("id", "name", "float32_column", "datetime_column", "array_column"), + 60000); } - - - @After public void tearDown() throws SQLException { CLICKHOUSE_CONTAINER.stop(); } - } diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java index 38f35d2..6bb70ae 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/ClickhouseProxy.java @@ -7,7 +7,6 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import java.sql.Array; import java.sql.ResultSet; @@ -15,12 +14,12 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; +/** A proxy for Clickhouse to execute SQLs and check results. */ public class ClickhouseProxy { private String jdbcUrl; private String username; @@ -29,12 +28,12 @@ public class ClickhouseProxy { ClickHouseDriver driver; ClickHouseStatement statement; ClickHouseConnection connection; + ClickhouseProxy(String jdbcUrl, String username, String password) { this.jdbcUrl = jdbcUrl; this.username = username; this.password = password; - this.driver = new ClickHouseDriver(); - + this.driver = new ClickHouseDriver(); } public void connect() { @@ -43,9 +42,10 @@ public void connect() { Properties properties = new Properties(); properties.put("username", username); properties.put("password", password); - ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(jdbcUrl, properties); - connection = clickHouseDataSource.getConnection(username,password); - statement = connection.createStatement(); + ClickHouseDataSource clickHouseDataSource = + new ClickHouseDataSource(jdbcUrl, properties); + connection = clickHouseDataSource.getConnection(username, password); + statement = connection.createStatement(); } } catch (Exception e) { logger.error("Failed to connect to clickhouse", e); @@ -57,7 +57,8 @@ public void execute(String sql) throws SQLException { statement.execute(sql); } - private void checkResult(List expectedResult, String table, List fields) throws Exception { + private void checkResult(List expectedResult, String table, List fields) + throws Exception { connect(); List results = new ArrayList<>(); ResultSet resultSet = statement.executeQuery("select * from " + table); @@ -66,26 +67,26 @@ private void checkResult(List expectedResult, String table, List ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); for (int i = 1; i <= columnCount; i++) { - String columnName = metaData.getColumnName(i); - if (!fields.contains(columnName)) { - continue; - } - String columnType = metaData.getColumnTypeName(i); - switch (columnType) { - case "Array": - Array array = resultSet.getArray(i); - result.add(array.toString()); - break; - case "Timestamp": - Timestamp timestamp = resultSet.getTimestamp(i); - result.add(timestamp.toString()); - break; - default: - String value = resultSet.getString(i); - result.add(value); - break; - } + String columnName = metaData.getColumnName(i); + if (!fields.contains(columnName)) { + continue; } + String columnType = metaData.getColumnTypeName(i); + switch (columnType) { + case "Array": + Array array = resultSet.getArray(i); + result.add(array.toString()); + break; + case "Timestamp": + Timestamp timestamp = resultSet.getTimestamp(i); + result.add(timestamp.toString()); + break; + default: + String value = resultSet.getString(i); + result.add(value); + break; + } + } results.add(result.stream().collect(Collectors.joining(","))); } @@ -94,21 +95,22 @@ private void checkResult(List expectedResult, String table, List Assert.assertArrayEquals(expectedResult.toArray(), results.toArray()); } - public void checkResultWithTimeout(List expectedResult, String table, List fields, long timeout) throws Exception { - long endTimeout = System.currentTimeMillis() + timeout; - boolean result = false; - while (System.currentTimeMillis() < endTimeout) { - try { - checkResult(expectedResult, table, fields); - result = true; - break; - } catch (AssertionError | SQLException throwable) { - Thread.sleep(1000L); - } - } - if (!result) { - checkResult(expectedResult, table, fields); - } - + public void checkResultWithTimeout( + List expectedResult, String table, List fields, long timeout) + throws Exception { + long endTimeout = System.currentTimeMillis() + timeout; + boolean result = false; + while (System.currentTimeMillis() < endTimeout) { + try { + checkResult(expectedResult, table, fields); + result = true; + break; + } catch (AssertionError | SQLException throwable) { + Thread.sleep(1000L); + } + } + if (!result) { + checkResult(expectedResult, table, fields); + } } } diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java index da139d0..16296c0 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java @@ -1,24 +1,15 @@ package org.apache.flink.connector.clickhouse; -import com.clickhouse.jdbc.ClickHouseDriver; - -import com.clickhouse.jdbc.ClickHouseStatement; - import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.connector.testframe.container.FlinkContainers; -import org.apache.flink.connector.testframe.container.FlinkContainersSettings; import org.apache.flink.connector.testframe.container.TestcontainersSettings; - import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.table.api.ValidationException; import org.apache.flink.test.resources.ResourceTestUtils; - import org.apache.flink.test.util.SQLJobSubmission; import org.junit.Before; @@ -49,9 +40,11 @@ import static org.assertj.core.util.Preconditions.checkState; +/** Test environment running job on Flink containers. */ public class FlinkContainerTestEnviroment { - private static final Logger logger = LoggerFactory.getLogger(FlinkContainerTestEnviroment.class); + private static final Logger logger = + LoggerFactory.getLogger(FlinkContainerTestEnviroment.class); public static final Network NETWORK = Network.newNetwork(); static final ClickHouseContainer CLICKHOUSE_CONTAINER = @@ -63,7 +56,6 @@ public class FlinkContainerTestEnviroment { .withPassword("test_password") .withLogConsumer(new Slf4jLogConsumer(logger)); - private static final TestcontainersSettings TESTCONTAINERS_SETTINGS = TestcontainersSettings.builder() .logger(logger) @@ -71,27 +63,16 @@ public class FlinkContainerTestEnviroment { .dependsOn(CLICKHOUSE_CONTAINER) .build(); - public static final FlinkContainers FLINK = - FlinkContainers.builder() - .withFlinkContainersSettings( - FlinkContainersSettings - .builder() - .numTaskManagers(1) - .setConfigOption(JobManagerOptions.ADDRESS,"jobmanager").baseImage("flink:1.19.0-scala_2.12").build()) - .withTestcontainersSettings( - TESTCONTAINERS_SETTINGS) - .build(); - - public static Path SQL_CONNECTOR_CLICKHOUSE_JAR = ResourceTestUtils - .getResource("flink-connector-clickhouse-1.0.0-SNAPSHOT.jar"); - public static Path CLICKHOUSE_JDBC_JAR = ResourceTestUtils - .getResource("clickhouse-jdbc-0.6.1.jar"); - public static Path HTTPCORE_JAR = ResourceTestUtils - .getResource("httpcore5-5.2.jar"); - public static Path HTTPCLIENT_JAR = ResourceTestUtils.getResource("httpclient5-5.2.1.jar"); - public static Path HTTPCLIENT_H2_JAR = ResourceTestUtils.getResource("httpcore5-h2-5.2.jar"); - @Rule - public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + public static final Path SQL_CONNECTOR_CLICKHOUSE_JAR = + ResourceTestUtils.getResource("flink-connector-clickhouse-1.0.0-SNAPSHOT.jar"); + public static final Path CLICKHOUSE_JDBC_JAR = + ResourceTestUtils.getResource("clickhouse-jdbc-0.6.1.jar"); + public static final Path HTTPCORE_JAR = ResourceTestUtils.getResource("httpcore5-5.2.jar"); + public static final Path HTTPCLIENT_JAR = + ResourceTestUtils.getResource("httpclient5-5.2.1.jar"); + public static final Path HTTPCLIENT_H2_JAR = + ResourceTestUtils.getResource("httpcore5-h2-5.2.jar"); + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); private GenericContainer jobManager; private GenericContainer taskManager; @@ -101,29 +82,22 @@ public class FlinkContainerTestEnviroment { public void setUp() throws Exception { CLICKHOUSE_CONTAINER.start(); - /*connection = driver.connect(CLICKHOUSE_CONTAINER.getJdbcUrl(), null); - logger.info("Clickhouse connection is established, url: {}, clickhouse port:{}", CLICKHOUSE_CONTAINER.getJdbcUrl(), - CLICKHOUSE_CONTAINER.getMappedPort(8123)); - logger.info("executing clickhouse sql statements"); - statement = connection.createStatement(); - boolean execute = statement.execute( - "create table test (id Int32, name String) engine = Memory"); - execute = statement.execute("create table test_insert (id Int32, name String) engine = Memory"); - execute = statement.execute("insert into test values (1, 'test');");*/ - String properties = String.join( - "\n", - Arrays.asList( - "jobmanager.rpc.address: jobmanager", - "heartbeat.timeout: 60000","parallelism.default: 1")); + String properties = + String.join( + "\n", + Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "heartbeat.timeout: 60000", + "parallelism.default: 1")); jobManager = new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12")) .withCommand("jobmanager") .withNetwork(NETWORK) .withExtraHost("host.docker.internal", "host-gateway") .withNetworkAliases("jobmanager") - .withExposedPorts(8081,6123) + .withExposedPorts(8081, 6123) .dependsOn(CLICKHOUSE_CONTAINER) - .withLabel("com.testcontainers.allow-filesystem-access","true") + .withLabel("com.testcontainers.allow-filesystem-access", "true") .withEnv("FLINK_PROPERTIES", properties) .withLogConsumer(new Slf4jLogConsumer(logger)); taskManager = @@ -134,17 +108,20 @@ public void setUp() throws Exception { .withNetworkAliases("taskmanager") .withEnv("FLINK_PROPERTIES", properties) .dependsOn(jobManager) - .withLabel("com.testcontainers.allow-filesystem-access","true") + .withLabel("com.testcontainers.allow-filesystem-access", "true") .withLogConsumer(new Slf4jLogConsumer(logger)); Startables.deepStart(Stream.of(jobManager)).join(); Startables.deepStart(Stream.of(taskManager)).join(); Thread.sleep(5000); logger.info("Containers are started."); - - - } + /** + * Returns the {@link RestClusterClient} for the running cluster. + * + *

NOTE: The client is created lazily and should only be retrieved after the cluster + * is running. + */ public RestClusterClient getRestClusterClient() { if (restClusterClient != null) { return restClusterClient; @@ -155,8 +132,7 @@ public RestClusterClient getRestClusterClient() { try { final Configuration clientConfiguration = new Configuration(); clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); - clientConfiguration.set( - RestOptions.PORT, jobManager.getMappedPort(8081)); + clientConfiguration.set(RestOptions.PORT, jobManager.getMappedPort(8081)); this.restClusterClient = new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); } catch (Exception e) { @@ -198,6 +174,9 @@ public void submitSQLJob(List sqlLines, Path... jars) } } + /* + * Copy the file to the container and return the container path. + */ private String copyAndGetContainerPath(GenericContainer container, String filePath) { Path path = Paths.get(filePath); String containerPath = "/tmp/" + path.getFileName(); @@ -205,7 +184,6 @@ private String copyAndGetContainerPath(GenericContainer container, String fil return containerPath; } - private static List readSqlFile(final String resourceName) throws Exception { return Files.readAllLines( Paths.get(ClickhouseE2ECase.class.getResource("/" + resourceName).toURI())); From 8db098b9c11fa3ed5f33757e11717296f637b1d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Wed, 31 Jul 2024 19:38:02 +0800 Subject: [PATCH 5/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- flink-connector-clickhouse-e2e-test/pom.xml | 86 +++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/flink-connector-clickhouse-e2e-test/pom.xml b/flink-connector-clickhouse-e2e-test/pom.xml index 10f1b91..f2457d8 100644 --- a/flink-connector-clickhouse-e2e-test/pom.xml +++ b/flink-connector-clickhouse-e2e-test/pom.xml @@ -81,4 +81,90 @@ limitations under the License. + + + + org.apache.maven.plugins + maven-surefire-plugin + + + default-test + none + + + integration-tests + none + + + end-to-end-tests + integration-test + + test + + + + **/*.* + + 1 + + ${project.basedir} + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-jars + process-resources + + copy + + + + + + + org.apache.flink + flink-connector-clickhouse + ${project.version} + jar + true + ${project.build.directory}/dependencies + + + org.apache.httpcomponents.client5 + httpclient5 + 5.2.1 + jar + true + ${project.build.directory}/dependencies + + + org.apache.httpcomponents.core5 + httpcore5 + 5.2 + jar + true + ${project.build.directory}/dependencies + + + org.apache.httpcomponents.core5 + httpcore5-h2 + 5.2 + jar + true + ${project.build.directory}/dependencies + + + + + + + + + From 05ede51c1dcade58aa6692a7471866925c10ec8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Wed, 31 Jul 2024 20:14:59 +0800 Subject: [PATCH 6/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- flink-connector-clickhouse-e2e-test/pom.xml | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-connector-clickhouse-e2e-test/pom.xml b/flink-connector-clickhouse-e2e-test/pom.xml index f2457d8..7c14db5 100644 --- a/flink-connector-clickhouse-e2e-test/pom.xml +++ b/flink-connector-clickhouse-e2e-test/pom.xml @@ -133,7 +133,7 @@ limitations under the License. ${project.version} jar true - ${project.build.directory}/dependencies + ${project.build.directory} org.apache.httpcomponents.client5 @@ -141,7 +141,7 @@ limitations under the License. 5.2.1 jar true - ${project.build.directory}/dependencies + ${project.build.directory} org.apache.httpcomponents.core5 @@ -149,7 +149,7 @@ limitations under the License. 5.2 jar true - ${project.build.directory}/dependencies + ${project.build.directory} org.apache.httpcomponents.core5 @@ -157,12 +157,17 @@ limitations under the License. 5.2 jar true - ${project.build.directory}/dependencies + ${project.build.directory} + + + com.clickhouse + clickhouse-jdbc + ${clickhouse-jdbc.version} + jar + true + ${project.build.directory} - - - From eb9226de0844cf16af10968c90a9207a7f28e84d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Thu, 1 Aug 2024 19:37:34 +0800 Subject: [PATCH 7/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- .../connector/clickhouse/FlinkContainerTestEnviroment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java index 16296c0..13d09ee 100644 --- a/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java +++ b/flink-connector-clickhouse-e2e-test/src/test/java/org/apache/flink/connector/clickhouse/FlinkContainerTestEnviroment.java @@ -203,7 +203,7 @@ public void waitUntilJobRunning(Duration timeout) { if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { JobStatusMessage message = jobStatusMessages.iterator().next(); JobStatus jobStatus = message.getJobState(); - if (jobStatus.isTerminalState()) { + if (jobStatus.isTerminalState() && message.getJobState() == JobStatus.FAILED) { throw new ValidationException( String.format( "Job has been terminated! JobName: %s, JobID: %s, Status: %s", From 8dfb647af32871bd5bfb3f9074685348e5475130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Tue, 6 Aug 2024 20:09:59 +0800 Subject: [PATCH 8/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- .github/workflows/ci.yml | 2 ++ flink-connector-clickhouse-e2e-test/pom.xml | 12 ++++-------- pom.xml | 3 +++ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86757dd..224a316 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,5 +34,7 @@ jobs: java-version: ${{ matrix.java-version }} distribution: 'adopt' cache: maven + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 - name: Build with Maven run: ./mvnw verify diff --git a/flink-connector-clickhouse-e2e-test/pom.xml b/flink-connector-clickhouse-e2e-test/pom.xml index 7c14db5..d2285ed 100644 --- a/flink-connector-clickhouse-e2e-test/pom.xml +++ b/flink-connector-clickhouse-e2e-test/pom.xml @@ -43,7 +43,7 @@ limitations under the License. org.testcontainers clickhouse - 1.19.8 + ${testcontainer.version} com.clickhouse @@ -60,26 +60,22 @@ limitations under the License. org.apache.httpcomponents.client5 httpclient5 - 5.2.1 + ${httpclient5.version} test org.apache.httpcomponents.core5 httpcore5 - 5.2 + ${httpcore5.version} test org.apache.httpcomponents.core5 httpcore5-h2 - 5.2 + ${httpcore5.version} test - - - - diff --git a/pom.xml b/pom.xml index 7881d18..c1aeee3 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,9 @@ limitations under the License. 32.1.3-jre 2.12.4 3.13.0 + 1.19.8 + 5.2.1 + 5.2 org.apache.flink.shaded.clickhouse flink-connector-clickhouse-parent 3.8.1 From 2e8db1763770149eb0c9757104e0100fe029db25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B5=B7=E6=99=B4=28jennyhq=2Echen=29?= Date: Tue, 6 Aug 2024 20:20:26 +0800 Subject: [PATCH 9/9] [ISSUE-138][Feature]: Support Unit5 Test For Clickhouse Connector --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 224a316..13672cb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,7 @@ jobs: strategy: matrix: java-version: [ 8, 11 ] - runs-on: [ubuntu-latest, macos-latest, windows-latest ] + runs-on: [ubuntu-latest] runs-on: ${{ matrix.runs-on }} steps: - uses: actions/checkout@v2