Skip to content

Commit

Permalink
[common] Add Flink 1.15.2 support. Add Maven profile for Flink 1.14
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed Aug 25, 2022
1 parent 9f97e06 commit d4979f3
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 34 deletions.
6 changes: 3 additions & 3 deletions flink-cdc-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -82,7 +82,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
2 changes: 1 addition & 1 deletion flink-connector-debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-mongodb-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -96,7 +96,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
6 changes: 3 additions & 3 deletions flink-connector-mysql-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -136,7 +136,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import com.esri.core.geometry.ogc.OGCGeometry;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -115,8 +114,7 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {

private static Optional<DeserializationRuntimeConverter> createArrayConverter(
ArrayType arrayType) {
if (LogicalTypeChecks.hasFamily(
arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
if (hasFamily(arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
// only map MySQL SET type to Flink ARRAY<STRING> type
return Optional.of(
new DeserializationRuntimeConverter() {
Expand Down Expand Up @@ -148,4 +146,8 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
return Optional.empty();
}
}

private static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
return logicalType.getTypeRoot().getFamilies().contains(family);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private static final String TEST_PASSWORD = "mysqlpw";

private static final MySqlContainer MYSQL8_CONTAINER =
(MySqlContainer) createMySqlContainer(MySqlVersion.V8_0).withExposedPorts(3307);
createMySqlContainer(MySqlVersion.V8_0).withDatabasePort(3307);

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
public class MySqlContainer extends JdbcDatabaseContainer {

public static final String IMAGE = "mysql";
public static final Integer MYSQL_PORT = 3306;

private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
Expand All @@ -41,23 +40,24 @@ public class MySqlContainer extends JdbcDatabaseContainer {
private String databaseName = "test";
private String username = "test";
private String password = "test";
private int databasePort = 3306;

public MySqlContainer() {
this(MySqlVersion.V5_7);
}

public MySqlContainer(MySqlVersion version) {
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
addExposedPort(MYSQL_PORT);
}

@Override
protected Set<Integer> getLivenessCheckPorts() {
return new HashSet<>(getMappedPort(MYSQL_PORT));
return new HashSet<>(getMappedPort(databasePort));
}

@Override
protected void configure() {
addExposedPort(databasePort);
optionallyMapResourceParameterAsVolume(
MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");

Expand Down Expand Up @@ -107,7 +107,7 @@ public String getJdbcUrl() {
}

public int getDatabasePort() {
return getMappedPort(MYSQL_PORT);
return getMappedPort(databasePort);
}

@Override
Expand Down Expand Up @@ -175,4 +175,9 @@ public MySqlContainer withPassword(final String password) {
this.password = password;
return this;
}

public MySqlContainer withDatabasePort(final int port) {
this.databasePort = port;
return this;
}
}
18 changes: 15 additions & 3 deletions flink-connector-oceanbase-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,34 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -99,7 +111,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-oracle-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -102,7 +102,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-postgres-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -92,7 +92,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-sqlserver-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -92,7 +92,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
2 changes: 1 addition & 1 deletion flink-connector-test-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
</dependency>

Expand Down
6 changes: 3 additions & 3 deletions flink-connector-tidb-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -116,7 +116,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
23 changes: 19 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ under the License.
</distributionManagement>

<properties>
<flink.version>1.14.4</flink.version>
<flink.version>1.15.2</flink.version>
<flink.submodule.postfix></flink.submodule.postfix>
<debezium.version>1.6.4.Final</debezium.version>
<tikv.version>3.2.0</tikv.version>
<geometry.version>2.2.0</geometry.version>
Expand All @@ -81,7 +82,7 @@ under the License.
See more https://github.com/testcontainers/testcontainers-java/issues/4297. -->
<testcontainers.version>1.15.3</testcontainers.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<hamcrest.version>1.3</hamcrest.version>
Expand All @@ -107,7 +108,13 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand All @@ -119,7 +126,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java${flink.submodule.postfix}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand Down Expand Up @@ -467,6 +474,14 @@ under the License.
</build>

<profiles>
<profile>
<id>flink-1.14</id>
<properties>
<flink.version>1.14.4</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.submodule.postfix>_${scala.binary.version}</flink.submodule.postfix>
</properties>
</profile>
<profile>
<id>release</id>
<build>
Expand Down

0 comments on commit d4979f3

Please sign in to comment.