Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BinlogConnectorReplicator: add heartbeat detection [MELINF-2251] #1643

Merged
merged 6 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BinlogConnectorEventListener implements BinaryLogClient.EventListener {
private final BinaryLogClient client;
private final MaxwellOutputConfig outputConfig;
private long replicationLag;
private long lastEventSeenAt;
private String gtid;

public BinlogConnectorEventListener(
Expand All @@ -36,6 +37,7 @@ public BinlogConnectorEventListener(
this.queue = q;
this.queueTimer = metrics.getRegistry().timer(metrics.metricName("replication", "queue", "time"));
this.outputConfig = outputConfig;
this.lastEventSeenAt = System.currentTimeMillis();

final BinlogConnectorEventListener self = this;
metrics.register(metrics.metricName("replication", "lag"), (Gauge<Long>) () -> self.replicationLag);
Expand All @@ -45,9 +47,14 @@ public void stop() {
mustStop.set(true);
}

public long getLastEventSeenAt() {
return this.lastEventSeenAt;
}

@Override
public void onEvent(Event event) {
long eventSeenAt = 0;
long eventSeenAt = System.currentTimeMillis();
this.lastEventSeenAt = eventSeenAt;
boolean trackMetrics = false;

if (event.getHeader().getEventType() == EventType.GTID) {
Expand All @@ -58,7 +65,6 @@ public void onEvent(Event event) {

if (ep.isCommitEvent()) {
trackMetrics = true;
eventSeenAt = System.currentTimeMillis();
replicationLag = eventSeenAt - event.getHeader().getTimestamp();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica
private static final long MAX_TX_ELEMENTS = 10000;
public static final int BAD_BINLOG_ERROR_CODE = 1236;
public static final int ACCESS_DENIED_ERROR_CODE = 1227;
public static final int HEARTBEAT_FAILURE_ERROR_CODE = 1623;

private final String clientID;
private final String maxwellSchemaDatabaseName;

protected final BinaryLogClient client;
private final long heartbeatInterval = 5000L; // TODO configure
private final float heartbeatIntervalAllowance = 3.0f; // TODO configure, find a better name?
osheroff marked this conversation as resolved.
Show resolved Hide resolved
private BinlogConnectorEventListener binlogEventListener;
private final LinkedBlockingDeque<BinlogConnectorEvent> queue = new LinkedBlockingDeque<>(20);
private final TableCache tableCache;
Expand Down Expand Up @@ -147,11 +150,8 @@ public BinlogConnectorReplicator(
which triggers mysql to jump ahead a binlog.
At some point I presume shyko will fix it and we can remove this.
*/

/*
the logic for this must be applied to all scenarios, as otherwise we will have competing keepalive logic
*/
this.client.setKeepAlive(false);
this.client.setHeartbeatInterval(this.heartbeatInterval);

EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
Expand Down Expand Up @@ -249,6 +249,16 @@ private void checkCommErrors() throws ServerException {
if (lastCommError != null) {
throw lastCommError;
}
long lastEventSeenAt = this.binlogEventListener.getLastEventSeenAt();
long lastEventAge = System.currentTimeMillis() - lastEventSeenAt;
long maxAllowedEventAge = Math.round(this.heartbeatInterval * this.heartbeatIntervalAllowance);
if (lastEventAge > this.heartbeatInterval * this.heartbeatIntervalAllowance) {
throw new ServerException(
"Last binlog event seen " + lastEventAge + "ms ago, exceeding " + maxAllowedEventAge + "ms allowance " +
"(" + this.heartbeatInterval + " * " + this.heartbeatIntervalAllowance + ")",
HEARTBEAT_FAILURE_ERROR_CODE,
"HY000");
}
osheroff marked this conversation as resolved.
Show resolved Hide resolved
}

private boolean shouldSkipRow(RowMap row) throws IOException {
Expand Down