Skip to content

Commit

Permalink
[mysql] Use the gtid set firstly when compare the binlog offset (apac…
Browse files Browse the repository at this point in the history
…he#761)

Co-authored-by: camelusluo <[email protected]>
  • Loading branch information
camelus0211 and camelusluo authored Apr 22, 2022
1 parent 7efdc67 commit c0aca57
Showing 1 changed file with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
Expand Down Expand Up @@ -191,6 +192,59 @@ private MySqlOffsetContext loadStartingOffsetState(
}

private boolean isBinlogAvailable(MySqlOffsetContext offset) {
String gtidStr = offset.gtidSet();
if (gtidStr != null) {
return checkGtidSet(offset);
}

return checkBinlogFilename(offset);
}

private boolean checkGtidSet(MySqlOffsetContext offset) {
String gtidStr = offset.gtidSet();

if (gtidStr.trim().isEmpty()) {
return true; // start at beginning ...
}

String availableGtidStr = connection.knownGtidSet();
if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
// Last offsets had GTIDs but the server does not use them ...
LOG.warn(
"Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
return false;
}
// GTIDs are enabled
GtidSet gtidSet = new GtidSet(gtidStr);
// Get the GTID set that is available in the server ...
GtidSet availableGtidSet = new GtidSet(availableGtidStr);
if (gtidSet.isContainedWithin(availableGtidSet)) {
LOG.info(
"MySQL current GTID set {} does contain the GTID set {} required by the connector.",
availableGtidSet,
gtidSet);
// The replication is concept of mysql master-slave replication protocol ...
final GtidSet gtidSetToReplicate =
connection.subtractGtidSet(availableGtidSet, gtidSet);
final GtidSet purgedGtidSet = connection.purgedGtidSet();
LOG.info("Server has already purged {} GTIDs", purgedGtidSet);
final GtidSet nonPurgedGtidSetToReplicate =
connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
LOG.info(
"GTID set {} known by the server but not processed yet, for replication are available only GTID set {}",
gtidSetToReplicate,
nonPurgedGtidSetToReplicate);
if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
LOG.warn("Some of the GTIDs needed to replicate have been already purged");
return false;
}
return true;
}
LOG.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet);
return false;
}

private boolean checkBinlogFilename(MySqlOffsetContext offset) {
String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
if (binlogFilename == null) {
return true; // start at current position
Expand Down

0 comments on commit c0aca57

Please sign in to comment.