Skip to content

Commit

Permalink
ZOOKEEPER-3769: handling malformed Leader Election notification messages
Browse files Browse the repository at this point in the history
When running ZooKeeper with JDK 12.0.2 on CentOS 7, we saw a few times that, when the current leader was killed, some partial Leader Election notification (vote) messages were delivered to the other ZooKeeper servers. These malformed / partial messages caused various exceptions in the WorkerReceiver thread of FastLeaderElection that were not handled before. This led to the death of the WorkerReceiver thread, leaving the affected ZooKeeper server unable to receive leader election messages and unable to rejoin any quorum until it was restarted.

In the proposed fix, I added unit tests that simulate certain error cases involving partial leader election messages, and fixed the error handling in FastLeaderElection.

Author: Mate Szalay-Beko <[email protected]>

Reviewers: Enrico Olivelli <[email protected]>, Norbert Kalmar <[email protected]>

Closes apache#1300 from symat/ZOOKEEPER-3769-master
  • Loading branch information
symat authored and RokLenarcic committed Aug 31, 2022
1 parent ecd6219 commit c6fc824
Show file tree
Hide file tree
Showing 2 changed files with 313 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -237,19 +238,21 @@ public void run() {
continue;
}

final int capacity = response.buffer.capacity();

// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: {}", response.buffer.capacity());
if (capacity < 28) {
LOG.error("Got a short response from server {}: {}", response.sid, capacity);
continue;
}

// this is the backwardCompatibility mode in place before ZK-107
// It is for a version of the protocol in which we didn't send peer epoch
// With peer epoch and version the message became 40 bytes
boolean backCompatibility28 = (response.buffer.capacity() == 28);
boolean backCompatibility28 = (capacity == 28);

// this is the backwardCompatibility mode for no version information
boolean backCompatibility40 = (response.buffer.capacity() == 40);
boolean backCompatibility40 = (capacity == 40);

response.buffer.clear();

Expand All @@ -263,64 +266,74 @@ public void run() {
long rpeerepoch;

int version = 0x0;
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/
QuorumVerifier rqv = null;

version = response.buffer.getInt();
try {
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/

version = response.buffer.getInt();
} else {
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
}
} else {
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
}
} else {
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
}

QuorumVerifier rqv = null;
// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();

// we want to avoid errors caused by the allocation of a byte array with negative length
// (causing NegativeArraySizeException) or huge length (causing e.g. OutOfMemoryError)
if (configLength < 0 || configLength > capacity) {
throw new IOException(String.format("Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
response.sid, capacity, version, configLength));
}

// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();
byte[] b = new byte[configLength];

response.buffer.get(b);

synchronized (self) {
try {
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}",
self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();

break;
byte[] b = new byte[configLength];
response.buffer.get(b);

synchronized (self) {
try {
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}",
self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();

break;
}
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
}
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
}
} catch (IOException | ConfigException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
}
} catch (IOException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
} catch (ConfigException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
}
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
}
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
} catch (BufferUnderflowException | IOException e) {
LOG.warn("Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
response.sid, capacity, e);
continue;
}

/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
Expand Down
Loading

0 comments on commit c6fc824

Please sign in to comment.