diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 7ca60be5bf2..0cb1195fd8f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -48,7 +48,6 @@ import java.io.IOException; import java.sql.SQLException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -244,33 +243,23 @@ private static Map querySystemVariables( public static BinlogOffset findBinlogOffset( long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { - MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); - BinaryLogClient client = - new BinaryLogClient( - config.hostname(), config.port(), config.username(), config.password()); + List binlogFiles = connection.availableBinlogFiles(); + LOG.info("Available binlog files: {}", binlogFiles); + + if (binlogFiles.isEmpty()) { + return BinlogOffset.ofBinlogFilePosition("", 0); + } + if (binlogFiles.size() == 1) { + return BinlogOffset.ofBinlogFilePosition(binlogFiles.get(0), 0); + } + + BinaryLogClient client = createBinaryClient(mySqlSourceConfig.getDbzConfiguration()); if (mySqlSourceConfig.getServerIdRange() != null) { client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); } - List binlogFiles = new ArrayList<>(); - JdbcConnection.ResultSetConsumer rsc = - rs -> { - while (rs.next()) { - String fileName = rs.getString(1); - long fileSize = rs.getLong(2); - if (fileSize > 0) { - binlogFiles.add(fileName); - } - } - }; + LOG.info("Start querying binlog files for timestamp {}", targetMs); try { - connection.query("SHOW BINARY LOGS", rsc); - LOG.info("Total search binlog: {}", binlogFiles); - - if (binlogFiles.isEmpty()) { - return BinlogOffset.ofBinlogFilePosition("", 0); - } - String binlogName = searchBinlogName(client, targetMs, binlogFiles); return BinlogOffset.ofBinlogFilePosition(binlogName, 0); } catch (Exception e) { @@ -279,14 +268,18 @@ public static BinlogOffset findBinlogOffset( } private static String searchBinlogName( - BinaryLogClient client, long targetMs, List binlogFiles) - throws IOException, InterruptedException { + BinaryLogClient client, long targetMs, List binlogFiles) throws IOException { int startIdx = 0; int endIdx = binlogFiles.size() - 1; while (startIdx <= endIdx) { int mid = startIdx + (endIdx - startIdx) / 2; long midTs = getBinlogTimestamp(client, binlogFiles.get(mid)); + if (midTs < 0) { + binlogFiles.remove(mid); + endIdx--; + continue; + } if (midTs < targetMs) { startIdx = mid + 1; } else if (targetMs < midTs) { @@ -296,11 +289,13 @@ private static String searchBinlogName( } } - return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx); + return binlogFiles.isEmpty() + ? "" + : endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx); } private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile) - throws IOException, InterruptedException { + throws IOException { ArrayBlockingQueue binlogTimestamps = new ArrayBlockingQueue<>(1); BinaryLogClient.EventListener eventListener = @@ -324,16 +319,34 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile } }; - try { - client.registerEventListener(eventListener); - client.setBinlogFilename(binlogFile); - client.setBinlogPosition(0); + ArrayBlockingQueue exceptions = new ArrayBlockingQueue<>(1); + BinaryLogClient.LifecycleListener lifecycleListener = + new BinaryLogClient.AbstractLifecycleListener() { + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception e) { + exceptions.add(e); + } + }; - LOG.info("begin parse binlog: {}", binlogFile); + client.registerEventListener(eventListener); + client.registerLifecycleListener(lifecycleListener); + client.setBinlogFilename(binlogFile); + client.setBinlogPosition(0); + + LOG.info("Start parsing binlog: {}", binlogFile); + try { client.connect(); } finally { + client.disconnect(); + client.unregisterLifecycleListener(lifecycleListener); client.unregisterEventListener(eventListener); } - return binlogTimestamps.take(); + + Exception exception = exceptions.peek(); + if (exception != null) { + throw new RuntimeException(exception); + } + Long ts = binlogTimestamps.peek(); + return ts == null ? -1L : ts; } }