Skip to content

Commit

Permalink
[mysql] Add finished unack splits to state for the MysqlSourceReader (#…
Browse files Browse the repository at this point in the history
…2399)

(cherry picked from commit 84e97df)
  • Loading branch information
ruanhang1993 authored and leonardBang committed Oct 31, 2023
1 parent 4ed7206 commit 2ec183a
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.mysql.source.reader;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
Expand Down Expand Up @@ -136,6 +137,9 @@ public List<MySqlSplit> snapshotState(long checkpointId) {
.filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
.collect(Collectors.toList());

// add finished snapshot splits that did not receive ack yet
unfinishedSplits.addAll(finishedUnackedSplits.values());

// add binlog splits who are uncompleted
unfinishedSplits.addAll(uncompletedBinlogSplits.values());

Expand Down Expand Up @@ -500,4 +504,9 @@ private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId)
protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) {
return splitState.toMySqlSplit();
}

@VisibleForTesting
public Map<String, MySqlSnapshotSplit> getFinishedUnackedSplits() {
return finishedUnackedSplits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.testcontainers.lifecycle.Startables;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -106,6 +107,14 @@ public static void assertEqualsInOrder(List<String> expected, List<String> actua
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}

public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
for (String key : expected.keySet()) {
assertEquals(expected.get(key), actual.get(key));
}
}

/** The type of failover. */
protected enum FailoverType {
TM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -93,6 +95,7 @@
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static java.lang.String.format;
import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -107,6 +110,107 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");

@Test
public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception {
customerDatabase.createAndInitialize();
final MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> snapshotSplits;
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverSchemaForCapturedTables(
new MySqlPartition(
sourceConfig.getMySqlConnectorConfig().getLogicalName()),
sourceConfig,
jdbc);
TableId tableId = new TableId(customerDatabase.getDatabaseName(), null, "customers");
RowType splitType =
RowType.of(
new LogicalType[] {DataTypes.INT().getLogicalType()},
new String[] {"id"});
snapshotSplits =
Arrays.asList(
new MySqlSnapshotSplit(
tableId,
tableId + ":0",
splitType,
null,
new Integer[] {200},
null,
tableSchemas),
new MySqlSnapshotSplit(
tableId,
tableId + ":1",
splitType,
new Integer[] {200},
new Integer[] {1500},
null,
tableSchemas),
new MySqlSnapshotSplit(
tableId,
tableId + ":2",
splitType,
new Integer[] {1500},
null,
null,
tableSchemas));
}

// Step 1: start source reader and assign snapshot splits
MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig, -1);
reader.start();
reader.addSplits(snapshotSplits);

String[] expectedRecords =
new String[] {
"+I[111, user_6, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[101, user_1, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
// Step 2: wait the snapshot splits finished reading
Thread.sleep(5000L);
List<String> actualRecords = consumeRecords(reader, dataType);
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);

// Step 3: snapshot reader's state
List<MySqlSplit> splitsState = reader.snapshotState(1L);

// Step 4: restart reader from a restored state
MySqlSourceReader<SourceRecord> restartReader = createReader(sourceConfig, -1);
restartReader.start();
restartReader.addSplits(splitsState);

// Step 5: check the finished unacked splits between original reader and restarted reader
assertEquals(3, reader.getFinishedUnackedSplits().size());
assertMapEquals(
restartReader.getFinishedUnackedSplits(), reader.getFinishedUnackedSplits());
reader.close();
restartReader.close();
}

@Test
public void testBinlogReadFailoverCrossTransaction() throws Exception {
customerDatabase.createAndInitialize();
Expand Down Expand Up @@ -411,8 +515,9 @@ private List<String> consumeRecords(
MySqlSourceReader<SourceRecord> sourceReader, DataType recordType) throws Exception {
// Poll all the n records of the single split.
final SimpleReaderOutput output = new SimpleReaderOutput();
while (output.getResults().size() == 0) {
sourceReader.pollNext(output);
InputStatus status = MORE_AVAILABLE;
while (MORE_AVAILABLE == status || output.getResults().size() == 0) {
status = sourceReader.pollNext(output);
}
final RecordsFormatter formatter = new RecordsFormatter(recordType);
return formatter.format(output.getResults());
Expand Down

0 comments on commit 2ec183a

Please sign in to comment.