Skip to content

Commit

Permalink
[FLINK-36024] Handle BinaryLogClient exceptions when searching for th…
Browse files Browse the repository at this point in the history
…e starting binlog offset
  • Loading branch information
whhe committed Dec 2, 2024
1 parent 3fd4cb6 commit 4fa4fb6
Showing 1 changed file with 46 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -244,33 +243,23 @@ private static Map<String, String> querySystemVariables(

public static BinlogOffset findBinlogOffset(
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
List<String> binlogFiles = connection.availableBinlogFiles();
LOG.info("Available binlog files: {}", binlogFiles);

if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
if (binlogFiles.size() == 1) {
return BinlogOffset.ofBinlogFilePosition(binlogFiles.get(0), 0);
}

BinaryLogClient client = createBinaryClient(mySqlSourceConfig.getDbzConfiguration());
if (mySqlSourceConfig.getServerIdRange() != null) {
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
}
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};

LOG.info("Start querying binlog files for timestamp {}", targetMs);
try {
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);

if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}

String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
Expand All @@ -279,14 +268,18 @@ public static BinlogOffset findBinlogOffset(
}

private static String searchBinlogName(
BinaryLogClient client, long targetMs, List<String> binlogFiles)
throws IOException, InterruptedException {
BinaryLogClient client, long targetMs, List<String> binlogFiles) throws IOException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;

while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < 0) {
binlogFiles.remove(mid);
endIdx--;
continue;
}
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
Expand All @@ -296,11 +289,13 @@ private static String searchBinlogName(
}
}

return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
return binlogFiles.isEmpty()
? ""
: endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}

private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
throws IOException, InterruptedException {
throws IOException {

ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
BinaryLogClient.EventListener eventListener =
Expand All @@ -324,16 +319,34 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile
}
};

try {
client.registerEventListener(eventListener);
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);
ArrayBlockingQueue<Exception> exceptions = new ArrayBlockingQueue<>(1);
BinaryLogClient.LifecycleListener lifecycleListener =
new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onCommunicationFailure(BinaryLogClient client, Exception e) {
exceptions.add(e);
}
};

LOG.info("begin parse binlog: {}", binlogFile);
client.registerEventListener(eventListener);
client.registerLifecycleListener(lifecycleListener);
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);

LOG.info("Start parsing binlog: {}", binlogFile);
try {
client.connect();
} finally {
client.disconnect();
client.unregisterLifecycleListener(lifecycleListener);
client.unregisterEventListener(eventListener);
}
return binlogTimestamps.take();

Exception exception = exceptions.peek();
if (exception != null) {
throw new RuntimeException(exception);
}
Long ts = binlogTimestamps.peek();
return ts == null ? -1L : ts;
}
}

0 comments on commit 4fa4fb6

Please sign in to comment.