Skip to content

Commit

Permalink
ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and l…
Browse files Browse the repository at this point in the history
…earner connectivity issues.

We want better insight into the state of the world when learners lose their connection to the leader, so we need to capture more information when that happens. We capture this information through MessageTracker, which records the last few sent and received messages at various protocol stages; this information is then dumped to the log files for further analysis.

Author: Michael Han <[email protected]>
Author: Michael Han <[email protected]>

Reviewers: Enrico Olivelli <[email protected]>, Fangmin Lyu <[email protected]>

Closes apache#1007 from hanm/twitter/2765eb0629d2f63f07d112270b582e8e931f734f
  • Loading branch information
hanm authored and eolivelli committed Aug 23, 2019
1 parent 942213d commit b5817fb
Show file tree
Hide file tree
Showing 9 changed files with 667 additions and 2 deletions.
20 changes: 20 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,26 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
minute. This prevents herding during container deletion.
Default is "10000".

<a name="sc_debug_observability_config"></a>

#### Debug Observability Configurations

**New in 3.6.0:** The following options are introduced to make zookeeper easier to debug.

* *zookeeper.messageTracker.BufferSize* :
(Java system property only)
Controls the maximum number of messages stored in **MessageTracker**. The value should be a positive
integer. The default value is 10. **MessageTracker** is introduced in **3.6.0** to record the
last set of messages between a server (follower or observer) and a leader when the server
disconnects from the leader. This set of messages is then dumped to ZooKeeper's log file;
it helps reconstruct the state of the servers at the time of the disconnection and
is useful for debugging purposes.

* *zookeeper.messageTracker.Enabled* :
(Java system property only)
When set to "true", enables **MessageTracker** to track and record messages. The default value
is "false".

<a name="sc_adminserver_config"></a>

#### AdminServer configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,16 @@ void followLeader() throws InterruptedException {
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

long connectionTime = 0;
boolean completedSync = false;

try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
Expand All @@ -99,6 +104,7 @@ void followLeader() throws InterruptedException {
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
Expand Down Expand Up @@ -129,6 +135,14 @@ void followLeader() throws InterruptedException {
om.stop();
}
zk.unregisterJMX(this);

if (connectionTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectionTime;
LOG.info("Disconnected from leader (with address: {}). "
+ "Was connected for {}ms. Sync state: {}",
leaderAddr, connectionDuration, completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.SetDataTxn;
Expand Down Expand Up @@ -74,6 +75,7 @@ static class PacketInFlight {
protected BufferedOutputStream bufferedOutput;

protected Socket sock;
protected InetSocketAddress leaderAddr;

/**
* Socket getter
Expand All @@ -88,6 +90,9 @@ public Socket getSocket() {
/** the protocol version of the leader */
protected int leaderProtocolVersion = 0x01;

private static final int BUFFERED_MESSAGE_SIZE = 10;
protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);

protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);

/**
Expand Down Expand Up @@ -146,6 +151,7 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
messageTracker.trackSent(pp.getType());
leaderOs.writeRecord(pp, "packet");
}
if (flush) {
Expand All @@ -164,6 +170,7 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
void readPacket(QuorumPacket pp) throws IOException {
synchronized (leaderIs) {
leaderIs.readRecord(pp, "packet");
messageTracker.trackReceived(pp.getType());
}
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (pp.getType() == Leader.PING) {
Expand Down Expand Up @@ -250,6 +257,7 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr
*/
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
this.leaderAddr = addr;

// leader connection timeout defaults to tickTime * initLimit
int connectTimeout = self.tickTime * self.initLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;
Expand Down Expand Up @@ -220,6 +221,8 @@ public boolean equals(Object o) {
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;

protected final MessageTracker messageTracker;

// for test only
protected void setOutputArchive(BinaryOutputArchive oa) {
this.oa = oa;
Expand Down Expand Up @@ -280,6 +283,8 @@ protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
}
throw new SaslException("Authentication failure: " + e.getMessage());
}

this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
}

@Override
Expand Down Expand Up @@ -349,6 +354,7 @@ private void sendPackets() throws InterruptedException {
}
oa.writeRecord(p, "packet");
packetsSent.incrementAndGet();
messageTracker.trackSent(p.getType());
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at " + this, e);
Expand Down Expand Up @@ -464,8 +470,11 @@ public void run() {

QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");

messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");

return;
}

Expand Down Expand Up @@ -526,9 +535,11 @@ public void run() {
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
return;
Expand All @@ -554,6 +565,7 @@ public void run() {
try {
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();

LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
Expand Down Expand Up @@ -600,6 +612,8 @@ public void run() {
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");

messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK," + " but received packet: {}", packetToString(qp));
return;
Expand Down Expand Up @@ -632,6 +646,7 @@ public void run() {
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());

long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
Expand Down Expand Up @@ -716,7 +731,9 @@ public void run() {
syncThrottler.endSync();
syncThrottler = null;
}
LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE {} ********", remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,14 @@ public String toString() {
*/
void observeLeader() throws Exception {
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);

long connectTime = 0;
boolean completedSync = false;
try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer master = findLearnerMaster();
try {
connectToLeader(master.addr, master.hostname);
connectTime = System.currentTimeMillis();
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
Expand All @@ -112,6 +114,7 @@ void observeLeader() throws Exception {
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newLeaderZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
QuorumPacket qp = new QuorumPacket();
while (this.isRunning() && nextLearnerMaster.get() == null) {
readPacket(qp);
Expand All @@ -127,6 +130,14 @@ void observeLeader() throws Exception {
} finally {
currentLearnerMaster = null;
zk.unregisterJMX(this);
if (connectTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectTime;

LOG.info("Disconnected from leader (with address: {}). "
+ "Was connected for {}ms. Sync state: {}",
leaderAddr, connectionDuration, completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.util;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread safe FIFO CircularBuffer implementation.
 * When the buffer is full write operation overwrites the oldest element.
 *
 * <p>{@code write}, {@code take}, {@code peek} and {@code reset} are synchronized on this
 * instance. {@code size}, {@code isEmpty} and {@code isFull} do not take the lock; they read
 * an atomic counter that is only ever set to a value in {@code [0, capacity]}, so they return
 * a point-in-time snapshot that may already be stale when the caller uses it.
 *
 * <p>TODO: make this lock free as this is called on every quorum message.
 *
 * @param <T> type of the elements held by the buffer
 */
public class CircularBuffer<T> {

    private final T[] buffer;
    private final int capacity;
    // Index of the oldest element; only accessed while holding the instance lock.
    private int oldest;
    // Only modified while holding the instance lock, and only via set() with a final,
    // in-range value, so lock-free readers never see a transient capacity + 1 or -1.
    private final AtomicInteger numberOfElements = new AtomicInteger();

    /**
     * Creates an empty buffer.
     *
     * @param clazz element class, used to allocate the typed backing array
     * @param capacity maximum number of elements retained
     * @throws IllegalArgumentException if {@code capacity} is not greater than 0
     */
    @SuppressWarnings("unchecked")
    public CircularBuffer(Class<T> clazz, int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("CircularBuffer capacity should be greater than 0");
        }
        // Array.newInstance returns Object; the cast is safe because clazz is Class<T>.
        this.buffer = (T[]) Array.newInstance(clazz, capacity);
        this.capacity = capacity;
    }

    /**
     * Puts elements in the next available index in the array.
     * If the array is full the oldest element is replaced with
     * the new value.
     *
     * @param element the element to store
     */
    public synchronized void write(T element) {
        int count = numberOfElements.get();
        if (count == capacity) {
            // Full: overwrite the oldest slot and advance; the element count is unchanged.
            buffer[oldest] = element;
            oldest = (oldest + 1) % capacity;
        } else {
            buffer[(oldest + count) % capacity] = element;
            numberOfElements.set(count + 1);
        }
    }

    /**
     * Reads from the buffer in a FIFO manner.
     *
     * @return the oldest element in the buffer, or {@code null} if the buffer is empty
     */
    public synchronized T take() {
        int count = numberOfElements.get();
        if (count == 0) {
            return null;
        }
        T polled = buffer[oldest];
        // Drop the reference so the buffer does not keep a removed element reachable.
        buffer[oldest] = null;
        oldest = (oldest + 1) % capacity;
        numberOfElements.set(count - 1);
        return polled;
    }

    /**
     * Returns the oldest element without removing it.
     *
     * @return the oldest element in the buffer, or {@code null} if the buffer is empty
     */
    public synchronized T peek() {
        if (numberOfElements.get() == 0) {
            return null;
        }
        return buffer[oldest];
    }

    /**
     * @return the number of elements currently stored, between 0 and the capacity inclusive
     */
    public int size() {
        return numberOfElements.get();
    }

    /**
     * @return {@code true} if the buffer holds no elements
     */
    public boolean isEmpty() {
        return numberOfElements.get() == 0;
    }

    /**
     * @return {@code true} if the buffer holds {@code capacity} elements, i.e. the next
     *         {@code write} will overwrite the oldest element
     */
    public boolean isFull() {
        return numberOfElements.get() == capacity;
    }

    /**
     * Removes all elements from the buffer and releases the references to them.
     */
    public synchronized void reset() {
        Arrays.fill(buffer, null);
        oldest = 0;
        numberOfElements.set(0);
    }
}
Loading

0 comments on commit b5817fb

Please sign in to comment.