diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index 43959ab4ede..8a36d8a5955 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.source.reader; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEmitter; @@ -136,6 +137,9 @@ public List snapshotState(long checkpointId) { .filter(split -> !finishedUnackedSplits.containsKey(split.splitId())) .collect(Collectors.toList()); + // add finished snapshot splits that did not receive ack yet + unfinishedSplits.addAll(finishedUnackedSplits.values()); + // add binlog splits who are uncompleted unfinishedSplits.addAll(uncompletedBinlogSplits.values()); @@ -500,4 +504,9 @@ private void logCurrentBinlogOffsets(List splits, long checkpointId) protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); } + + @VisibleForTesting + public Map getFinishedUnackedSplits() { + return finishedUnackedSplits; + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 9956402175d..1a9950c469b 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -36,6 +36,7 @@ import org.testcontainers.lifecycle.Startables; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -106,6 +107,14 @@ public static void assertEqualsInOrder(List expected, List actua assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); } + public static void assertMapEquals(Map expected, Map actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + for (String key : expected.keySet()) { + assertEquals(expected.get(key), actual.get(key)); + } + } + /** The type of failover. */ protected enum FailoverType { TM, diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 321ff5010ff..ab0d607e543 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -30,6 +30,8 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -93,6 +95,7 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent; import static java.lang.String.format; +import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE; import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -107,6 +110,107 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase { private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); + @Test + public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception { + customerDatabase.createAndInitialize(); + final MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}); + final DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING())); + List snapshotSplits; + try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { + Map tableSchemas = + TableDiscoveryUtils.discoverSchemaForCapturedTables( + new MySqlPartition( + sourceConfig.getMySqlConnectorConfig().getLogicalName()), + sourceConfig, + jdbc); + TableId tableId = new TableId(customerDatabase.getDatabaseName(), null, "customers"); + RowType splitType = + RowType.of( + new LogicalType[] {DataTypes.INT().getLogicalType()}, + new String[] {"id"}); + snapshotSplits = + Arrays.asList( + new MySqlSnapshotSplit( + tableId, + tableId + ":0", + splitType, + null, + new Integer[] {200}, + null, + tableSchemas), + new MySqlSnapshotSplit( + tableId, + tableId + ":1", + splitType, + new Integer[] {200}, + new Integer[] {1500}, + null, + tableSchemas), + new MySqlSnapshotSplit( + tableId, + tableId + ":2", + splitType, + new Integer[] {1500}, + null, + null, + tableSchemas)); + } + + // Step 1: start source reader and assign snapshot splits + MySqlSourceReader reader = createReader(sourceConfig, -1); + reader.start(); + reader.addSplits(snapshotSplits); + + String[] expectedRecords = + new String[] { + "+I[111, user_6, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[101, user_1, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + // Step 2: wait the snapshot splits finished reading + Thread.sleep(5000L); + List actualRecords = consumeRecords(reader, dataType); + assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords); + + // Step 3: snapshot reader's state + List splitsState = reader.snapshotState(1L); + + // Step 4: restart reader from a restored state + MySqlSourceReader restartReader = createReader(sourceConfig, -1); + restartReader.start(); + restartReader.addSplits(splitsState); + + // Step 5: check the finished unacked splits between original reader and restarted reader + assertEquals(3, reader.getFinishedUnackedSplits().size()); + assertMapEquals( + restartReader.getFinishedUnackedSplits(), reader.getFinishedUnackedSplits()); + reader.close(); + restartReader.close(); + } + @Test public void testBinlogReadFailoverCrossTransaction() throws Exception { customerDatabase.createAndInitialize(); @@ -411,8 +515,9 @@ private List consumeRecords( MySqlSourceReader sourceReader, DataType recordType) throws Exception { // Poll all the n records of the single split. final SimpleReaderOutput output = new SimpleReaderOutput(); - while (output.getResults().size() == 0) { - sourceReader.pollNext(output); + InputStatus status = MORE_AVAILABLE; + while (MORE_AVAILABLE == status || output.getResults().size() == 0) { + status = sourceReader.pollNext(output); } final RecordsFormatter formatter = new RecordsFormatter(recordType); return formatter.format(output.getResults());