Skip to content

Commit

Permalink
ZOOKEEPER-2678: Discovery and Sync can take a very long time on large…
Browse files Browse the repository at this point in the history
… DBs (master)

This is the master version of apache#157

Author: Robert (Bobby) Evans <[email protected]>

Reviewers: Flavio Junqueira <[email protected]>, Edward Ribeiro <[email protected]>, Abraham Fine <[email protected]>, Michael Han <[email protected]>

Closes apache#159 from revans2/ZOOKEEPER-2678-master and squashes the following commits:

69fbe19 [Robert (Bobby) Evans] ZOOKEEPER-2678: Addressed review comments
a432642 [Robert (Bobby) Evans] ZOOKEEPER-2678:  Improved test to verify snapshot times
742367e [Robert (Bobby) Evans] Addressed review comments
f4c5b0e [Robert (Bobby) Evans] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs
  • Loading branch information
Robert (Bobby) Evans authored and hanm committed Feb 16, 2017
1 parent f07a836 commit c164ee4
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 17 deletions.
18 changes: 16 additions & 2 deletions src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,15 @@ public boolean isRunning() {
return state == State.RUNNING;
}

public synchronized void shutdown() {
public void shutdown() {
shutdown(false);
}

/**
* Shut down the server instance
* @param fullyShutDown true if another server using the same database will not replace this one in the same process
*/
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
Expand All @@ -543,9 +551,15 @@ public synchronized void shutdown() {
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if (zkDb != null) {

if (fullyShutDown && zkDb != null) {
zkDb.clear();
}
// else there is no need to clear the database
// * When a new quorum is established we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot

unregisterJMX();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void runFromConfig(ServerConfig config)
secureCnxnFactory.join();
}
if (zkServer.canShutdown()) {
zkServer.shutdown();
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
Expand Down
37 changes: 24 additions & 13 deletions src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,16 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{

QuorumVerifier newLeaderQV = null;

readPacket(qp);
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader");
Expand Down Expand Up @@ -400,10 +404,13 @@ else if (qp.getType() == Leader.SNAP) {

long lastQueued = 0;

// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean snapshotTaken = false;
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
Expand Down Expand Up @@ -440,7 +447,7 @@ else if (qp.getType() == Leader.SNAP) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) {
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
Expand Down Expand Up @@ -479,8 +486,7 @@ else if (qp.getType() == Leader.SNAP) {
}
lastQueued = packet.hdr.getZxid();
}

if (!snapshotTaken) {
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
Expand All @@ -498,14 +504,15 @@ else if (qp.getType() == Leader.SNAP) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) { // true for the pre v1.0 case
zk.takeSnapshot();
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
LOG.info("Learner received NEWLEADER message");
if (qp.getData()!=null && qp.getData().length > 1) {
try {
Expand All @@ -516,10 +523,14 @@ else if (qp.getType() == Leader.SNAP) {
e.printStackTrace();
}
}

if (snapshotNeeded) {
zk.takeSnapshot();
}

zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
snapshotTaken = true;
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
Expand Down
29 changes: 28 additions & 1 deletion src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.zookeeper.server.quorum;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -651,6 +654,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
tmpDir.mkdir();
File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
//Spy on ZK so we can check if a snapshot happened or not.
f.zk = spy(f.zk);
try {
Assert.assertEquals(0, f.self.getAcceptedEpoch());
Assert.assertEquals(0, f.self.getCurrentEpoch());
Expand Down Expand Up @@ -693,6 +698,10 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
oa.writeRecord(qp, null);
zkDb.serializeSnapshot(oa);
oa.writeString("BenWasHere", null);
Thread.sleep(10); //Give it some time to process the snap
//No Snapshot taken yet, the SNAP was applied in memory
verify(f.zk, never()).takeSnapshot();

qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
oa.writeRecord(qp, null);
Expand All @@ -703,7 +712,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());

//Make sure that we did take the snapshot now
verify(f.zk).takeSnapshot();
Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
Expand Down Expand Up @@ -779,6 +789,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
tmpDir.mkdir();
File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
//Spy on ZK so we can check if a snapshot happened or not.
f.zk = spy(f.zk);
try {
Assert.assertEquals(0, f.self.getAcceptedEpoch());
Assert.assertEquals(0, f.self.getCurrentEpoch());
Expand Down Expand Up @@ -846,13 +858,28 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
long start = System.currentTimeMillis();
while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
}

Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
start = System.currentTimeMillis();
zkDb2.loadDataBase();
while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
zkDb2.loadDataBase();
}
LOG.info("zkdb2 sessions:" + zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts());
Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
//Snapshot was never taken during very simple sync
verify(f.zk, never()).takeSnapshot();
} finally {
TestUtils.deleteFileRecursively(tmpDir);
}
Expand Down

0 comments on commit c164ee4

Please sign in to comment.