Skip to content

Commit

Permalink
ZOOKEEPER-2959: ignore accepted epoch and LEADERINFO ack from observers
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Shraer committed May 10, 2018
1 parent 43f117e commit 088dfdf
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 146 deletions.
30 changes: 20 additions & 10 deletions src/java/main/org/apache/zookeeper/server/quorum/Leader.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public String toString() {

final QuorumPeer self;

private boolean quorumFormed = false;
// VisibleForTesting
protected boolean quorumFormed = false;

// the follower acceptor thread
volatile LearnerCnxAcceptor cnxAcceptor = null;
Expand Down Expand Up @@ -358,7 +359,8 @@ public boolean isQuorumSynced(QuorumVerifier qv) {

private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();

private final Proposal newLeaderProposal = new Proposal();
// VisibleForTesting
protected final Proposal newLeaderProposal = new Proposal();

class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private volatile boolean stop = false;
Expand Down Expand Up @@ -507,7 +509,7 @@ void lead() throws IOException, InterruptedException {
self.setCurrentEpoch(epoch);

try {
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ newLeaderProposal.ackSetsToString() + " ]");
Expand Down Expand Up @@ -1163,7 +1165,8 @@ synchronized public long startForwarding(LearnerHandler handler,

return lastProposed;
}
private final HashSet<Long> connectingFollowers = new HashSet<Long>();
// VisibleForTesting
protected final Set<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
if (!waitingForNewEpoch) {
Expand All @@ -1172,7 +1175,9 @@ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws Interrupt
if (lastAcceptedEpoch >= epoch) {
epoch = lastAcceptedEpoch+1;
}
connectingFollowers.add(sid);
if (isParticipant(sid)) {
connectingFollowers.add(sid);
}
QuorumVerifier verifier = self.getQuorumVerifier();
if (connectingFollowers.contains(self.getId()) &&
verifier.containsQuorum(connectingFollowers)) {
Expand All @@ -1195,8 +1200,10 @@ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws Interrupt
}
}

private final HashSet<Long> electingFollowers = new HashSet<Long>();
private boolean electionFinished = false;
// VisibleForTesting
protected final Set<Long> electingFollowers = new HashSet<Long>();
// VisibleForTesting
protected boolean electionFinished = false;
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
synchronized(electingFollowers) {
if (electionFinished) {
Expand All @@ -1210,7 +1217,7 @@ public void waitForEpochAck(long id, StateSummary ss) throws IOException, Interr
+ leaderStateSummary.getLastZxid()
+ " (last zxid)");
}
if (ss.getLastZxid() != -1) {
if (ss.getLastZxid() != -1 && isParticipant(id)) {
electingFollowers.add(id);
}
}
Expand Down Expand Up @@ -1294,10 +1301,9 @@ private synchronized void startZkServer() {
* sufficient acks.
*
* @param sid
* @param learnerType
* @throws InterruptedException
*/
public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
public void waitForNewLeaderAck(long sid, long zxid)
throws InterruptedException {

synchronized (newLeaderProposal.qvAcksetPairs) {
Expand Down Expand Up @@ -1393,4 +1399,8 @@ public static String getPacketType(int packetType) {
private boolean isRunning() {
return self.isRunning() && zk.isRunning();
}

private boolean isParticipant(long sid) {
return self.getQuorumVerifier().getVotingMembers().containsKey(sid);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public void run() {
if(LOG.isDebugEnabled()){
LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
leader.waitForNewLeaderAck(getSid(), qp.getZxid());

syncLimitCheck.start();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.test.ClientBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;

import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;

public class LeaderWithObserverTest {

QuorumPeer peer;
Leader leader;
File tmpDir;
long participantId;
long observerId;

@Before
public void setUp() throws Exception {
tmpDir = ClientBase.createTmpDir();
peer = createQuorumPeer(tmpDir);
participantId = 1;
Map<Long, QuorumPeer.QuorumServer> peers = peer.getQuorumVerifier().getAllMembers();
observerId = peers.size();
leader = createLeader(tmpDir, peer);
peer.leader = leader;
peers.put(observerId, new QuorumPeer.QuorumServer(
observerId, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
QuorumPeer.LearnerType.OBSERVER));

// these tests are serial, we can speed up InterruptedException
peer.tickTime = 1;
}

@After
public void tearDown(){
leader.shutdown("end of test");
tmpDir.delete();
}

@Test
public void testGetEpochToPropose() throws Exception {
long lastAcceptedEpoch = 5;
peer.setAcceptedEpoch(5);

Assert.assertEquals("Unexpected vote in connectingFollowers", 0, leader.connectingFollowers.size());
Assert.assertTrue(leader.waitingForNewEpoch);
try {
// Leader asks for epoch (mocking Leader.lead behavior)
// First add to connectingFollowers
leader.getEpochToPropose(peer.getId(), lastAcceptedEpoch);
} catch (InterruptedException e) {
// ignore timeout
}

Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
Assert.assertEquals("Leader shouldn't set new epoch until quorum of participants is in connectingFollowers",
lastAcceptedEpoch, peer.getAcceptedEpoch());
Assert.assertTrue(leader.waitingForNewEpoch);
try {
// Observer asks for epoch (mocking LearnerHandler behavior)
leader.getEpochToPropose(observerId, lastAcceptedEpoch);
} catch (InterruptedException e) {
// ignore timeout
}

Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
Assert.assertEquals("Leader shouldn't set new epoch after observer asks for epoch",
lastAcceptedEpoch, peer.getAcceptedEpoch());
Assert.assertTrue(leader.waitingForNewEpoch);
try {
// Now participant asks for epoch (mocking LearnerHandler behavior). Second add to connectingFollowers.
// Triggers verifier.containsQuorum = true
leader.getEpochToPropose(participantId, lastAcceptedEpoch);
} catch (Exception e) {
Assert.fail("Timed out in getEpochToPropose");
}

Assert.assertEquals("Unexpected vote in connectingFollowers", 2, leader.connectingFollowers.size());
Assert.assertEquals("Leader should record next epoch", lastAcceptedEpoch + 1, peer.getAcceptedEpoch());
Assert.assertFalse(leader.waitingForNewEpoch);
}

@Test
public void testWaitForEpochAck() throws Exception {
// things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());

Assert.assertEquals("Unexpected vote in electingFollowers", 0, leader.electingFollowers.size());
Assert.assertFalse(leader.electionFinished);
try {
// leader calls waitForEpochAck, first add to electingFollowers
leader.waitForEpochAck(peer.getId(), new StateSummary(0, 0));
} catch (InterruptedException e) {
// ignore timeout
}

Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
Assert.assertFalse(leader.electionFinished);
try {
// observer calls waitForEpochAck, should fail verifier.containsQuorum
leader.waitForEpochAck(observerId, new StateSummary(0, 0));
} catch (InterruptedException e) {
// ignore timeout
}

Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
Assert.assertFalse(leader.electionFinished);
try {
// second add to electingFollowers, verifier.containsQuorum=true, waitForEpochAck returns without exceptions
leader.waitForEpochAck(participantId, new StateSummary(0, 0));
Assert.assertEquals("Unexpected vote in electingFollowers", 2, leader.electingFollowers.size());
Assert.assertTrue(leader.electionFinished);
} catch (Exception e) {
Assert.fail("Timed out in waitForEpochAck");
}
}

@Test
public void testWaitForNewLeaderAck() throws Exception {
long zxid = leader.zk.getZxid();

// things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());

Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
Assert.assertEquals("Unexpected vote in ackSet", 0, ackSet.size());
Assert.assertFalse(leader.quorumFormed);
try {
// leader calls waitForNewLeaderAck, first add to ackSet
leader.waitForNewLeaderAck(peer.getId(), zxid);
} catch (InterruptedException e) {
// ignore timeout
}

Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
Assert.assertFalse(leader.quorumFormed);
try {
// observer calls waitForNewLeaderAck, should fail verifier.containsQuorum
leader.waitForNewLeaderAck(observerId, zxid);
} catch (InterruptedException e) {
// ignore timeout
}

Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
Assert.assertFalse(leader.quorumFormed);
try {
// second add to ackSet, verifier.containsQuorum=true, waitForNewLeaderAck returns without exceptions
leader.waitForNewLeaderAck(participantId, zxid);
Assert.assertEquals("Unexpected vote in ackSet", 2, ackSet.size());
Assert.assertTrue(leader.quorumFormed);
} catch (Exception e) {
Assert.fail("Timed out in waitForEpochAck");
}
}
}
Loading

0 comments on commit 088dfdf

Please sign in to comment.