Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[common] Add Flink 1.15 support. #1504

Merged
merged 4 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flink-cdc-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -82,7 +82,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
11 changes: 11 additions & 0 deletions flink-cdc-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ under the License.
<properties>
<flink-1.13>1.13.6</flink-1.13>
<flink-1.14>1.14.4</flink-1.14>
<flink-1.15>1.15.2</flink-1.15>
<mysql.driver.version>8.0.27</mysql.driver.version>
</properties>

Expand Down Expand Up @@ -209,6 +210,16 @@ under the License.
</outputDirectory>
</artifactItem>

<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-1.15}</version>
<destFileName>jdbc-connector_${flink-1.15}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>

<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {

@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.13.6", "1.14.4");
return Arrays.asList("1.13.6", "1.14.4", "1.15.2");
}

@Before
Expand Down Expand Up @@ -258,6 +258,9 @@ private String copyAndGetContainerPath(GenericContainer<?> container, String fil
}

private String getFlinkDockerImageTag() {
if ("1.15.2".equals(flinkVersion)) {
return String.format("flink:%s-scala_2.12", flinkVersion);
}
return String.format("flink:%s-scala_2.11", flinkVersion);
}

Expand Down
2 changes: 1 addition & 1 deletion flink-connector-debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-mongodb-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -96,7 +96,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
6 changes: 3 additions & 3 deletions flink-connector-mysql-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -136,7 +136,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import com.esri.core.geometry.ogc.OGCGeometry;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -115,8 +114,7 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {

private static Optional<DeserializationRuntimeConverter> createArrayConverter(
ArrayType arrayType) {
if (LogicalTypeChecks.hasFamily(
arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
if (hasFamily(arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
tigrulya-exe marked this conversation as resolved.
Show resolved Hide resolved
// only map MySQL SET type to Flink ARRAY<STRING> type
return Optional.of(
new DeserializationRuntimeConverter() {
Expand Down Expand Up @@ -148,4 +146,8 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
return Optional.empty();
}
}

private static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
tigrulya-exe marked this conversation as resolved.
Show resolved Hide resolved
return logicalType.getTypeRoot().getFamilies().contains(family);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@

package com.ververica.cdc.connectors.mysql;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.connectors.utils.TestSourceContext;
Expand All @@ -43,6 +47,7 @@
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Utils to help test. */
public class MySqlTestUtils {
Expand Down Expand Up @@ -108,6 +113,53 @@ public static <T> List<T> drain(TestSourceContext<T> sourceContext, int expected
return allRecords;
}

public static void waitUntilCondition(
SupplierWithException<Boolean, Exception> condition,
Deadline timeout,
long retryIntervalMillis,
String errorMsg)
throws Exception {
while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
}

if (!timeout.hasTimeLeft()) {
throw new TimeoutException(errorMsg);
}
}

public static void waitForJobStatus(
JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
waitUntilCondition(
() -> {
JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
if (expectedStatus.contains(currentStatus)) {
return true;
} else if (currentStatus.isTerminalState()) {
try {
client.getJobExecutionResult().get();
} catch (Exception var4) {
throw new IllegalStateException(
String.format(
"Job has entered %s state, but expecting %s",
currentStatus, expectedStatus),
var4);
}

throw new IllegalStateException(
String.format(
"Job has entered a terminal state %s, but expecting %s",
currentStatus, expectedStatus));
} else {
return false;
}
},
deadline,
100L,
"Condition was not met in given timeout.");
}

private static Properties createDebeziumProperties(boolean useLegacyImplementation) {
Properties debeziumProps = new Properties();
if (useLegacyImplementation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
import java.util.stream.Stream;

import static com.ververica.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMySqlLatestOffset;
import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.waitForJobStatus;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
import static org.junit.Assert.assertEquals;

/** Integration tests for MySQL Table source. */
Expand All @@ -70,8 +70,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private static final MySqlContainer MYSQL8_CONTAINER =
(MySqlContainer) createMySqlContainer(MySqlVersion.V8_0).withExposedPorts(3307);
private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
tigrulya-exe marked this conversation as resolved.
Show resolved Hide resolved

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
Expand Down
18 changes: 15 additions & 3 deletions flink-connector-oceanbase-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,34 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -99,7 +111,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-oracle-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -102,7 +102,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-postgres-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -104,7 +104,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
4 changes: 2 additions & 2 deletions flink-connector-sqlserver-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -92,7 +92,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
2 changes: 1 addition & 1 deletion flink-connector-test-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
</dependency>

Expand Down
6 changes: 3 additions & 3 deletions flink-connector-tidb-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -116,7 +116,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
Loading