diff --git a/docs/docs/config.md b/docs/docs/config.md
index 8027d3725..fdc549d51 100644
--- a/docs/docs/config.md
+++ b/docs/docs/config.md
@@ -30,6 +30,7 @@ master_recovery | BOOLEAN | enable experimental mast
 gtid_mode | BOOLEAN | enable GTID-based replication | false
 recapture_schema | BOOLEAN | recapture the latest schema. Not available in config.properties. | false
 max_schemas | LONG | how many schema deltas to keep before triggering compaction operation | unlimited
+binlog_heartbeat | BOOLEAN | enable binlog heartbeats to detect stale connections | false
 
 replication_host | STRING | server to replicate from. See [split server roles](#split-server-roles) | *schema-store host*
 replication_password | STRING | password on replication server | (none)
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
index 633abfbe3..780181039 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
@@ -193,6 +193,8 @@ protected MaxwellOptionParser buildOptionParser() {
 				.withRequiredArg();
 		parser.accepts( "password", "password for host" )
 				.withRequiredArg();
+		parser.accepts( "binlog_heartbeat", "enable binlog replication heartbeats, default false" )
+				.withOptionalArg().ofType(Boolean.class);
 
 		parser.section("mysql");
 
@@ -892,7 +894,8 @@ public void validate() {
 				null,
 				this.maxwellMysql.user,
 				this.maxwellMysql.password,
-				this.maxwellMysql.sslMode
+				this.maxwellMysql.sslMode,
+				this.maxwellMysql.enableHeartbeat
 			);
 
 			this.replicationMysql.jdbcOptions = this.maxwellMysql.jdbcOptions;
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellMysqlConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellMysqlConfig.java
index 3f3539f73..c3a275cc7 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellMysqlConfig.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellMysqlConfig.java
@@ -23,6 +23,7 @@ public class MaxwellMysqlConfig {
 	public String user;
 	public String password;
 	public SSLMode sslMode;
+	public boolean enableHeartbeat;
 	public Map jdbcOptions = new HashMap<>();
 	public Integer connectTimeoutMS = 5000;
 
@@ -33,6 +34,7 @@ public MaxwellMysqlConfig() {
 		this.user = null;
 		this.password = null;
 		this.sslMode = null;
+		this.enableHeartbeat = false;
 
 		this.jdbcOptions = new HashMap<>();
 		this.jdbcOptions.put("zeroDateTimeBehavior", "convertToNull");
@@ -41,7 +43,7 @@ public MaxwellMysqlConfig() {
 	}
 
 	public MaxwellMysqlConfig(String host, Integer port, String database, String user, String password,
-			SSLMode sslMode) {
+			SSLMode sslMode, boolean enableHeartbeat) {
 		this();
 		this.host = host;
 		this.port = port;
@@ -49,6 +51,7 @@ public MaxwellMysqlConfig(String host, Integer port, String database, String use
 		this.user = user;
 		this.password = password;
 		this.sslMode = sslMode;
+		this.enableHeartbeat = enableHeartbeat;
 	}
 
 	public MaxwellMysqlConfig(MaxwellMysqlConfig c) {
diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorLivenessMonitor.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorLivenessMonitor.java
new file mode 100644
index 000000000..fe51b954d
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorLivenessMonitor.java
@@ -0,0 +1,79 @@
+package com.zendesk.maxwell.replication;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Detects stale binlog connections by tracking how long ago the last binlog
+ * event (or successful connect) was observed on the client.
+ *
+ * The timestamp is volatile because listener callbacks may run on a different
+ * thread than the one polling {@link #isAlive()}.
+ */
+public class BinlogConnectorLivenessMonitor implements BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {
+	private static final Logger LOGGER = LoggerFactory.getLogger(BinlogConnectorLivenessMonitor.class);
+
+	/** Heartbeat interval configured on the binlog client, in milliseconds. */
+	private static final long HEARTBEAT_INTERVAL_MS = 5000L;
+	/** Multiple of the heartbeat interval that may elapse without an event before the connection counts as stale. */
+	private static final float HEARTBEAT_INTERVAL_ALLOWANCE = 5.0f;
+
+	private final BinaryLogClient client;
+	/** {@link System#nanoTime()} reading taken when the last event or connect was observed. */
+	private volatile long lastEventSeenAtNanos;
+
+	public BinlogConnectorLivenessMonitor(BinaryLogClient client) {
+		this.client = client;
+		this.client.setHeartbeatInterval(HEARTBEAT_INTERVAL_MS);
+		this.reset();
+	}
+
+	private void reset() {
+		// nanoTime is monotonic, so wall-clock adjustments can neither fake nor hide staleness
+		this.lastEventSeenAtNanos = System.nanoTime();
+	}
+
+	/**
+	 * @return true if an event or connect was seen within the allowed window; logs a warning and returns false otherwise
+	 */
+	public boolean isAlive() {
+		long lastEventAge = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastEventSeenAtNanos);
+		long maxAllowedEventAge = Math.round(HEARTBEAT_INTERVAL_MS * HEARTBEAT_INTERVAL_ALLOWANCE);
+		boolean alive = lastEventAge <= maxAllowedEventAge;
+		if (!alive) {
+			LOGGER.warn(
+				"Last binlog event seen {}ms ago, exceeding {}ms allowance ({} * {})",
+				lastEventAge, maxAllowedEventAge, HEARTBEAT_INTERVAL_MS, HEARTBEAT_INTERVAL_ALLOWANCE);
+		}
+		return alive;
+	}
+
+	@Override
+	public void onEvent(Event event) {
+		this.reset();
+	}
+
+	@Override
+	public void onConnect(BinaryLogClient binaryLogClient) {
+		this.reset();
+	}
+
+	@Override
+	public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception e) {
+		// intentionally empty: liveness is derived from event age only
+	}
+
+	@Override
+	public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception e) {
+		// intentionally empty: liveness is derived from event age only
+	}
+
+	@Override
+	public void onDisconnect(BinaryLogClient binaryLogClient) {
+		// intentionally empty: liveness is derived from event age only
+	}
+}
diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java
index ffc87a3b1..47ff2539c 100644
--- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java
+++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java
@@ -47,6 +47,7 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica
 
 	protected final BinaryLogClient client;
 	private BinlogConnectorEventListener binlogEventListener;
+	private BinlogConnectorLivenessMonitor binlogLivenessMonitor;
 	private final LinkedBlockingDeque queue = new LinkedBlockingDeque<>(20);
 	private final TableCache tableCache;
 	private final Scripting scripting;
@@ -147,11 +148,12 @@ public BinlogConnectorReplicator(
 			which triggers mysql to jump ahead a binlog.
 			At some point I presume shyko will fix it and we can remove this.
 		 */
-
-		/*
-			the logic for this must be applied to all scenarios, as otherwise we will have competing keepalive logic
-		 */
 		this.client.setKeepAlive(false);
+		if (mysqlConfig.enableHeartbeat) {
+			this.binlogLivenessMonitor = new BinlogConnectorLivenessMonitor(client);
+			this.client.registerLifecycleListener(this.binlogLivenessMonitor);
+			this.client.registerEventListener(this.binlogLivenessMonitor);
+		}
 
 		EventDeserializer eventDeserializer = new EventDeserializer();
 		eventDeserializer.setCompatibilityMode(
@@ -251,6 +253,18 @@ private void checkCommErrors() throws ServerException {
 		}
 	}
 
+	/**
+	 * Returns true if connected with a recent event.
+	 * If binlog heartbeats are disabled, just returns
+	 * whether there is a connection.
+	 */
+	private boolean isConnectionAlive() {
+		if (!isConnected) {
+			return false;
+		}
+		return this.binlogLivenessMonitor == null || binlogLivenessMonitor.isAlive();
+	}
+
 	private boolean shouldSkipRow(RowMap row) throws IOException {
 		if ( isMaxwellRow(row) && !isBootstrapInsert(row))
 			return true;
@@ -407,7 +421,12 @@ private boolean isBootstrapInsert(RowMap row) {
 	private void ensureReplicatorThread() throws Exception {
 		checkCommErrors();
 
-		if ( !this.isConnected && !stopOnEOF ) {
+		if (!this.isConnected && stopOnEOF) {
+			// reached EOF, nothing to do
+			return;
+		}
+		if (!this.isConnectionAlive()) {
+			client.disconnect();
 			if (this.gtidPositioning) {
 				// When using gtid positioning, reconnecting should take us to the top
 				// of the gtid event.  We throw away any binlog position we have
diff --git a/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java b/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java
index 909c048da..2dcf9c085 100644
--- a/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java
+++ b/src/main/java/com/zendesk/maxwell/util/AbstractConfig.java
@@ -186,6 +186,9 @@ protected MaxwellMysqlConfig parseMysqlConfig(String prefix, OptionSet options,
 		config.sslMode = this.getSslModeFromString(fetchStringOption(prefix + "ssl", options, properties, null));
 		config.setJDBCOptions(
 			fetchStringOption(prefix + "jdbc_options", options, properties, null));
+
+		// binlog_heartbeat isn't prefixed, as it only affects replication
+		config.enableHeartbeat = fetchBooleanOption("binlog_heartbeat", options, properties, config.enableHeartbeat);
 		return config;
 	}
 