Skip to content

Commit

Permalink
Merge pull request #144 from qingfei1994/ISSUE-138
Browse files Browse the repository at this point in the history
[Feature]: Support Unit5 Test For Clickhouse Connector
  • Loading branch information
czy006 authored Aug 7, 2024
2 parents cf34ed6 + 2e8db17 commit 279b6ac
Show file tree
Hide file tree
Showing 6 changed files with 570 additions and 2 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
matrix:
java-version: [ 8, 11 ]
runs-on: [ubuntu-latest, macos-latest, windows-latest ]
runs-on: [ubuntu-latest]
runs-on: ${{ matrix.runs-on }}
steps:
- uses: actions/checkout@v2
Expand All @@ -34,5 +34,7 @@ jobs:
java-version: ${{ matrix.java-version }}
distribution: 'adopt'
cache: maven
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Build with Maven
run: ./mvnw verify
133 changes: 132 additions & 1 deletion flink-connector-clickhouse-e2e-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,137 @@ limitations under the License.
</properties>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<version>${testcontainer.version}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>${httpclient5.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>${httpcore5.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.core5/httpcore5-h2 -->
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-h2</artifactId>
<version>${httpcore5.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<phase>none</phase>
</execution>
<execution>
<id>integration-tests</id>
<phase>none</phase>
</execution>
<execution>
<id>end-to-end-tests</id>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<includes>
<include>**/*.*</include>
</includes>
<forkCount>1</forkCount>
<systemPropertyVariables>
<moduleDir>${project.basedir}</moduleDir>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-jars</id>
<phase>process-resources</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.1</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.2</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-h2</artifactId>
<version>5.2</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package org.apache.flink.connector.clickhouse;

import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** End-to-end test for Clickhouse. */
public class ClickhouseE2ECase extends FlinkContainerTestEnviroment {

private static final Logger logger = LoggerFactory.getLogger(ClickhouseE2ECase.class);

ClickhouseProxy proxy;

@Test
public void testSink() throws Exception {
String jdbcUrl = String.format("jdbc:clickhouse://%s:%s/default", "clickhouse", "8123");

proxy =
new ClickhouseProxy(
CLICKHOUSE_CONTAINER.getJdbcUrl(),
CLICKHOUSE_CONTAINER.getUsername(),
CLICKHOUSE_CONTAINER.getPassword());
proxy.execute(
"create table test (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory");
proxy.execute(
"create table test_insert (id Int32, name String, float32_column Float32, date_column Date,datetime_column DateTime, array_column Array(Int32)) engine = Memory; ");
proxy.execute(
"INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (1, 'Name1', 1.1, '2022-01-01', '2022-01-01 00:00:00', [1, 2, 3]);");
proxy.execute(
"INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (2, 'Name2', 2.2, '2022-01-02', '2022-01-02 01:00:00', [4, 5, 6]);");
proxy.execute(
"INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (3, 'Name3', 3.3, '2022-01-03', '2022-01-03 02:00:00', [7, 8, 9]);");
proxy.execute(
"INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (4, 'Name4', 4.4, '2022-01-04', '2022-01-04 03:00:00', [10, 11, 12]);");
proxy.execute(
"INSERT INTO test (id, name, float32_column, date_column, datetime_column, array_column) VALUES (5, 'Name5', 5.5, '2022-01-05', '2022-01-05 04:00:00', [13, 14, 15]);");
// proxy.execute("insert into test values (2, 'kiki');");
List<String> sqlLines = new ArrayList<>();
sqlLines.add(
"create table clickhouse_test (id int, name varchar,float32_column FLOAT,\n"
+ " datetime_column TIMESTAMP(3),\n"
+ " array_column ARRAY<INT>) with ('connector' = 'clickhouse',\n"
+ " 'url' = '"
+ jdbcUrl
+ "',\n"
+ " 'table-name' = 'test',\n"
+ " 'username'='test_username',\n"
+ " 'password'='test_password'\n"
+ ");");
sqlLines.add(
"create table test (id int, name varchar,float32_column FLOAT,\n"
+ " datetime_column TIMESTAMP(3),\n"
+ " array_column ARRAY<INT>) with ('connector' = 'clickhouse',\n"
+ " 'url' = '"
+ jdbcUrl
+ "',\n"
+ " 'table-name' = 'test_insert',\n"
+ " 'username'='test_username',\n"
+ " 'password'='test_password'\n"
+ ");");
sqlLines.add("insert into test select * from clickhouse_test;");

submitSQLJob(
sqlLines,
SQL_CONNECTOR_CLICKHOUSE_JAR,
CLICKHOUSE_JDBC_JAR,
HTTPCORE_JAR,
HTTPCLIENT_JAR,
HTTPCLIENT_H2_JAR);
waitUntilJobRunning(Duration.of(1, ChronoUnit.MINUTES));
List<String> expectedResult =
Arrays.asList(
"1,Name1,1.1,2022-01-01 00:00:00,[1,2,3]",
"2,Name2,2.2,2022-01-02 01:00:00,[4,5,6]",
"3,Name3,3.3,2022-01-03 02:00:00,[7,8,9]",
"4,Name4,4.4,2022-01-04 03:00:00,[10,11,12]",
"5,Name5,5.5,2022-01-05 04:00:00,[13,14,15]");
proxy.checkResultWithTimeout(
expectedResult,
"test_insert",
Arrays.asList("id", "name", "float32_column", "datetime_column", "array_column"),
60000);
}

@After
public void tearDown() throws SQLException {
CLICKHOUSE_CONTAINER.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package org.apache.flink.connector.clickhouse;

import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.clickhouse.jdbc.ClickHouseDriver;
import com.clickhouse.jdbc.ClickHouseStatement;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Array;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** A proxy for Clickhouse to execute SQLs and check results. */
public class ClickhouseProxy {
private String jdbcUrl;
private String username;
private String password;
private static final Logger logger = LoggerFactory.getLogger(ClickhouseProxy.class);
ClickHouseDriver driver;
ClickHouseStatement statement;
ClickHouseConnection connection;

ClickhouseProxy(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.driver = new ClickHouseDriver();
}

public void connect() {
try {
if (connection == null) {
Properties properties = new Properties();
properties.put("username", username);
properties.put("password", password);
ClickHouseDataSource clickHouseDataSource =
new ClickHouseDataSource(jdbcUrl, properties);
connection = clickHouseDataSource.getConnection(username, password);
statement = connection.createStatement();
}
} catch (Exception e) {
logger.error("Failed to connect to clickhouse", e);
}
}

public void execute(String sql) throws SQLException {
connect();
statement.execute(sql);
}

private void checkResult(List<String> expectedResult, String table, List<String> fields)
throws Exception {
connect();
List<String> results = new ArrayList<>();
ResultSet resultSet = statement.executeQuery("select * from " + table);
while (resultSet.next()) {
List<String> result = new ArrayList<>();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
if (!fields.contains(columnName)) {
continue;
}
String columnType = metaData.getColumnTypeName(i);
switch (columnType) {
case "Array":
Array array = resultSet.getArray(i);
result.add(array.toString());
break;
case "Timestamp":
Timestamp timestamp = resultSet.getTimestamp(i);
result.add(timestamp.toString());
break;
default:
String value = resultSet.getString(i);
result.add(value);
break;
}
}

results.add(result.stream().collect(Collectors.joining(",")));
}
Collections.sort(results);
Collections.sort(expectedResult);
Assert.assertArrayEquals(expectedResult.toArray(), results.toArray());
}

public void checkResultWithTimeout(
List<String> expectedResult, String table, List<String> fields, long timeout)
throws Exception {
long endTimeout = System.currentTimeMillis() + timeout;
boolean result = false;
while (System.currentTimeMillis() < endTimeout) {
try {
checkResult(expectedResult, table, fields);
result = true;
break;
} catch (AssertionError | SQLException throwable) {
Thread.sleep(1000L);
}
}
if (!result) {
checkResult(expectedResult, table, fields);
}
}
}
Loading

0 comments on commit 279b6ac

Please sign in to comment.