diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md b/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md new file mode 100644 index 00000000000..adc547778bb --- /dev/null +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md @@ -0,0 +1,202 @@ + + +# Introduction to Oracle Quorum +The introduction to Oracle Quorum increases the availability of a cluster of 2 ZooKeeper instances with a failure detector as known as the Oracle. + The Oracle is designed to grant the permission to the instance which is the only remaining instance +in a 2-instance configuration when the other instance is identified as faulty by the fail detector, the Oracle. + +## The implementation of the Oracle +Every instance shall access to a file which contains either 0 or 1 to indicate whether this instance is authorized by the Oracle. +However, this design can be changed since the fail detector algorithms vary from each other. Therefore, ones can override the method of _askOracle()_ in _QuorumOracleMaj_ to adapt the preferred way of deciphering the message from the Oracle. + +## The deployment cotexts +The Oracle is designed to increase the availability of a cluster of 2 ZooKeeper instances; thus, the size of the voting member is **2**. +In other words, the Oracle solves the consensus problem of a possibility of faulty instance in a two-instance ensemble. + +In the case that the size of the voting members exceeds 2, the expected way to make the Oracle work correctly is to reconfigure the size of the cluster when a faulty machine is identified. +For example, with a configuration of 5 instances, when a faulty machine breaks the connection with the Leader, it is expected to have a _reconfig_ client request to the cluster, which makes the cluster to re-form as the configuration of 4 instances. +Therefore, once the size of the voting member equals to 2, the configuration falls into the problem domain which the Oracle is designed to address. + +## How to deploy the Oracle in _zoo.cfg_ +Regardless of the size of the cluster, the _oraclePath_ must be configured at the time of the initialization, which is like other static parameters. +The below shows the correct way to specify and enable the Oracle. + + oraclePath=/to/some/file + +#### An example of zoo.cfg: + + dataDir=/data + dataLogDir=/datalog + tickTime=2000 + initLimit=5 + syncLimit=2 + autopurge.snapRetainCount=3 + autopurge.purgeInterval=0 + maxClientCnxns=60 + standaloneEnabled=true + admin.enableServer=true + oraclePath=/chassis/mastership + server.1=0.0.0.0:2888:3888;2181 + server.2=hw1:2888:3888;2181 + +The QuorumOracleMaj is designed to read the result of a failure detector, which is written on a text file, the oracle file. +The configuration in the zoo.cfg like the following: + + oraclePath=/to/some/file + +Suppose you have the result of the failure detector written on /some/path/result.txt, and then the correct configuration is the following: + + oraclePath=/some/path/result.txt + +So, what is the correct content of the provided file? An example file can be created with the following command from the terminal: + + $echo 1 > /some/path/result.txt + +Any equivalent files are suitable for the current implementation of QuorumOracleMaj. +The number of oracle files should be equal to the number of ZooKeeper instances configured to enable the Oracle. +In other words, each ZooKeeper instance should have its oracle file, and the files shall not be shared; otherwise, the issues in the next section will arise. + +## What differs after the deployment of the Oracle enabled +The _QuorumPeerConfig_ will create an instance of _QuorumOracleMaj_ instead of the default QuorumVerifier, _QuorumMaj_ when it reads the _zoo.cfg_ contains _oraclePath_. +QuorumOracleMaj inheritances from QuorumMaj, and differs from its superclass by overriding the method of _containsQuorum()_. +QuorumOracleMaj is designed to execute its version of _containsQuorum_ when the Leader loses all of its followers, and fails to maintain the quorum. +In other cases, _QuorumOracleMaj_ shall execute as _QuorumMaj_. + +## What we should pay attention to the Oracle +We consider an asynchronous distributed system which consists of **2** ZooKeeper instances and an Oracle. + +### Liveness Issue: +When we consider the oracle satisfies the following property introduced by [CT]: + + Strong Completeness: There is a time after which every process that crashes is permanently suspected by every correct processes + +The liveness of the system is ensured by the Oracle. +However, when the introduced oracle fails to maintain this property, the lost of the liveness is expected as the following example, + +Suppose we have a Leader and a Follower, which are running in the broadcasting state, +The system will lose its liveness when: + + 1. The Leader fails, but the Oracle does not detect the faulty Leader, which means the Oracle will not authorize the Follower to become a new Leader. + 2. When a Follower fails, but the Oracle does not detect the faulty follower, which means the Oracle will authorize the Leader to move system forward. + +### Safety Issue: +#### Lost of Progress +The progress can lost when multiple failures occurs in the system at different time as the following example, + +Suppose we have a Leader(Ben) and a Follower(John) in the broadcasting state, + + At T1 with zxid(0x1_1): L-Ben fails, and the F-John takes over the system under the authorization from the Oracle. + At T2 with zxid(0x2_1): The F-John becomes a new Leader, L-John, and starts a new epoch. + At T3 with zxid(0x2_A): L-John fails + At T4 with zxid(0x2_A): Ben recovers up and starts its leader election. + At T5 with zxid(0x3_1): Ben becomes the new leader, L-Ben, under the authorization from the Oracle. + +In this case, the system loses its progress after the L-Ben failed. + + +However, the lost of progress can be prevented by making the Oracle is capable of referring the latest zxid. +When the Oracle could refer to the latest zxid, + + At T5 with zxid(0x2_A): Ben will not end his leader election because the Oracle would not authorize although John is down. + +Nevertheless, we exchange the liveness for the safety. +#### Split Brain Issue +We consider the Oracle satisfies the following desired property introduced by [CT], + + Accuracy: There is a time after which some correct processes is never suspected by any processes + +Nevertheless, the decisions which the Oracle gives out should be mutual exclusive. + +In other words, + +Suppose we have a Leader(Ben) and a Follower(John) in the broadcasting state, + + - At any time, the Oracle will not authorize both Ben and John even though the failure detectors think each other is faulty. + Or + - At any time, for any two values in any two Oracle files respectively, the values are not both equal to 1. + +The split brain is expected when the Oracle fails to maintain this property during the leader election phase of + + 1. Start of the system + 2. A failed instance recovers from failures. + +## Examples of Concepts for Implementation of a Failure Detector +One should consider that the failure detector's outcome is to authorize the querying ZooKeeper instance whether it has the right to move the system forward without waiting for the faulty instance, which is identified by the failure detector. + +### An Implementation of Hardware +Suppose two dedicated pieces of hardware, hw1 and hw2, can host ZooKeeper instances, zk1 and zk2, respectively, and form a cluster. +A hardware device is attached to both of the hardware, and it is capable of determining whether the hardware is power on or not. +So, when hw1 is not power on, the zk1 is undoubtedly faulty. +Therefore, the hardware device updates the oracle file on hw2 to 1, which indicates that zk1 is faulty and authorizes zk2 to move the system forwards. + +### An Implementation of Software +Suppose two dedicated pieces of hardware, hw1 and hw2, can host ZooKeeper instances, zk1 and zk2, respectively, and form a cluster. +One can have two more services, o1 and o2, on hw1 and hw2, respectively. The job of o1 and o2 are detecting the other hardware is alive or not. +For example, o1 can constantly ping hw2 to determine if hw2 is power on or not. +When o1 cannot ping hw2, o1 identifies that hw2 is faulty and then update the oracle file of zk1 to 1, which indicates that zk2 is faulty and authorizes zk1 to move the system forwards. + +### Use USB devices as Oracle to Maintain Progress +In macOS,10.15.7 (19H2), the external storage devices are mounted under `/Volumes`. +Thus, we can insert a USB device which contains the required information as the oracle. +When the device is connected, the oracle authorizes the leader to move system forward, which also means the other instance fails. +There are **SIX** steps to reproduce this stimulation. + +* Firstly, insert a USB device named `Oracle`, and then we can expect that `/Volumes/Oracle` is accessible. +* Secondly, we create a file contains `1` under `/Volumes/Oracle` named `mastership`. +Now we can access `/Volumes/Oracle/mastership`, and so does the zookeeper instances to see whether it has the right to move the system forward. +The file can easily be generated by the following command: + + + $echo 1 > mastership + +* Thirdly, you shall have a `zoo.cfg` like the example below: + + + dataDir=/data + dataLogDir=/datalog + tickTime=2000 + initLimit=5 + syncLimit=2 + autopurge.snapRetainCount=3 + autopurge.purgeInterval=0 + maxClientCnxns=60 + standaloneEnabled=true + admin.enableServer=true + oraclePath=/Volumes/Oracle/mastership + server.1=0.0.0.0:2888:3888;2181 + server.2=hw1:2888:3888;2181 + +_(NOTE) The split brain issues will not occur because there is only a SINGLE USB device in this stimulation._ +_Additionally, `mastership` should not be shared by multiple instances._ +_Thus, only one ZooKeeper instance is configured with Oracle._ +_For more, please refer to Section Safety Issue._ + +* Fourthly, start the cluster, and it is expected it forms a quorum normally. +* Fifthly, terminate the instance either without attaching to a USB device or `mastership` contains 0. +There are two scenarios to expect: + 1. A leader failure occurs, and the remained instance finishes the leader election on its own due to the oracle. + 2. The quorum is still maintained due to the oracle. + +* Lastly, when the USB device is removed, `/Volumes/Oracle/mastership` becomes unavailable. +Therefore, according to the current implementation, whenever the Leader queries the oracle, the oracle throws an exception and return `FALSE`. +Repeat the fifth step, and then it is expected that either the system cannot recover from a leader failure ,or the leader loses the quorum. +In either case, the service is interrupted. + +With these steps, we can show and practice how the oracle works with two-instance systems with ease. + +##REFERENCE +[CT] Tushar Deepak Chandra and Sam Toueg. 1991. Unreliable failure detectors for asynchronous systems (preliminary version). In Proceedings of the tenth annual ACM symposium on Principles of distributed computing (PODC '91). Association for Computing Machinery, New York, NY, USA, 325–340. DOI:https://doi.org/10.1145/112600.112627 \ No newline at end of file diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml index 90ee82760ee..2636f5cd969 100755 --- a/zookeeper-server/pom.xml +++ b/zookeeper-server/pom.xml @@ -172,6 +172,12 @@ snappy-java provided + + commons-io + commons-io + 2.6 + compile + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java index 8afc14bb517..ce490ad8039 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java @@ -130,7 +130,7 @@ public CliCommand parse(String[] cmdArgs) throws CliParseException { //check that membership makes sense; leader will make these checks again //don't check for leader election ports since //client doesn't know what leader election alg is used - members = QuorumPeerConfig.parseDynamicConfig(dynamicCfg, 0, true, false).toString(); + members = QuorumPeerConfig.parseDynamicConfig(dynamicCfg, 0, true, false, null).toString(); } catch (Exception e) { throw new CliParseException("Error processing " + cl.getOptionValue("file") + e.getMessage()); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index e71828d262b..11b5ccbc8d3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -65,6 +65,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.txn.CheckVersionTxn; import org.apache.zookeeper.txn.CloseSessionTxn; @@ -452,7 +453,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, try { Properties props = new Properties(); props.load(new StringReader(newMembers)); - request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false); + request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false, lastSeenQV.getOraclePath()); request.qv.setVersion(request.getHdr().getZxid()); } catch (IOException | ConfigException e) { throw new KeeperException.BadArgumentsException(e.getMessage()); @@ -472,7 +473,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, leavingServers = StringUtils.split(leavingServersString, ","); } - if (!(lastSeenQV instanceof QuorumMaj)) { + if (!(lastSeenQV instanceof QuorumMaj) && !(lastSeenQV instanceof QuorumOracleMaj)) { String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system"; LOG.warn(msg); throw new KeeperException.BadArgumentsException(msg); @@ -514,7 +515,13 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, } catch (ConfigException e) { throw new KeeperException.BadArgumentsException("Reconfiguration failed"); } - request.qv = new QuorumMaj(nextServers); + + if (lastSeenQV instanceof QuorumMaj) { + request.qv = new QuorumMaj(nextServers); + } else { + request.qv = new QuorumOracleMaj(nextServers, lastSeenQV.getOraclePath()); + } + request.qv.setVersion(request.getHdr().getZxid()); } if (QuorumPeerConfig.isStandaloneEnabled() && request.qv.getVotingMembers().size() < 2) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java index 0950c6d4c2e..9fc9d148c39 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java @@ -34,6 +34,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.ZxidUtils; import org.slf4j.Logger; @@ -948,7 +949,7 @@ public Vote lookForLeader() throws InterruptedException { Long.toHexString(proposedZxid)); sendNotifications(); - SyncedLearnerTracker voteSet; + SyncedLearnerTracker voteSet = null; /* * Loop in which we exchange notifications until we find a leader @@ -977,7 +978,24 @@ public Vote lookForLeader() throws InterruptedException { */ int tmpTimeOut = notTimeout * 2; notTimeout = Math.min(tmpTimeOut, maxNotificationInterval); + + /* + * When a leader failure happens on a master, the backup will be supposed to receive the honour from + * Oracle and become a leader, but the honour is likely to be delay. We do a re-check once timeout happens + * + * The leader election algorithm does not provide the ability of electing a leader from a single instance + * which is in a configuration of 2 instances. + * */ + self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval); + if (self.getQuorumVerifier() instanceof QuorumOracleMaj && voteSet != null && voteSet.hasAllQuorums() && notTimeout != minNotificationInterval) { + setPeerState(proposedLeader, voteSet); + Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); + leaveInstance(endVote); + return endVote; + } + LOG.info("Notification time out: {} ms", notTimeout); + } else if (validVoter(n.sid) && validVoter(n.leader)) { /* * Only proceed if the vote comes from a replica in the current or next @@ -1051,43 +1069,53 @@ public Vote lookForLeader() throws InterruptedException { case OBSERVING: LOG.debug("Notification from observer: {}", n.sid); break; + + /* + * In ZOOKEEPER-3922, we separate the behaviors of FOLLOWING and LEADING. + * To avoid the duplication of codes, we create a method called followingBehavior which was used to + * shared by FOLLOWING and LEADING. This method returns a Vote. When the returned Vote is null, it follows + * the original idea to break swtich statement; otherwise, a valid returned Vote indicates, a leader + * is generated. + * + * The reason why we need to separate these behaviors is to make the algorithm runnable for 2-node + * setting. An extra condition for generating leader is needed. Due to the majority rule, only when + * there is a majority in the voteset, a leader will be generated. However, in a configuration of 2 nodes, + * the number to achieve the majority remains 2, which means a recovered node cannot generate a leader which is + * the existed leader. Therefore, we need the Oracle to kick in this situation. In a two-node configuration, the Oracle + * only grants the permission to maintain the progress to one node. The oracle either grants the permission to the + * remained node and makes it a new leader when there is a faulty machine, which is the case to maintain the progress. + * Otherwise, the oracle does not grant the permission to the remained node, which further causes a service down. + * + * In the former case, when a failed server recovers and participate in the leader election, it would not locate a + * new leader because there does not exist a majority in the voteset. It fails on the containAllQuorum() infinitely due to + * two facts. First one is the fact that it does do not have a majority in the voteset. The other fact is the fact that + * the oracle would not give the permission since the oracle already gave the permission to the existed leader, the healthy machine. + * Logically, when the oracle replies with negative, it implies the existed leader which is LEADING notification comes from is a valid leader. + * To threat this negative replies as a permission to generate the leader is the purpose to separate these two behaviors. + * + * + * */ case FOLLOWING: - case LEADING: /* - * Consider all notifications from the same epoch - * together. - */ - if (n.electionEpoch == logicalclock.get()) { - recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); - voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); - if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) { - setPeerState(n.leader, voteSet); - Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); - leaveInstance(endVote); - return endVote; - } + * To avoid duplicate codes + * */ + Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n); + if (resultFN == null) { + break; + } else { + return resultFN; } - + case LEADING: /* - * Before joining an established ensemble, verify that - * a majority are following the same leader. - * - * Note that the outofelection map also stores votes from the current leader election. - * See ZOOKEEPER-1732 for more information. - */ - outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); - voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); - - if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) { - synchronized (this) { - logicalclock.set(n.electionEpoch); - setPeerState(n.leader, voteSet); - } - Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); - leaveInstance(endVote); - return endVote; + * In leadingBehavior(), it performs followingBehvior() first. When followingBehavior() returns + * a null pointer, ask Oracle whether to follow this leader. + * */ + Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n); + if (resultLN == null) { + break; + } else { + return resultLN; } - break; default: LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid); break; @@ -1115,6 +1143,74 @@ public Vote lookForLeader() throws InterruptedException { } } + private Vote receivedFollowingNotification(Map recvset, Map outofelection, SyncedLearnerTracker voteSet, Notification n) { + /* + * Consider all notifications from the same epoch + * together. + */ + if (n.electionEpoch == logicalclock.get()) { + recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); + voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); + if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) { + setPeerState(n.leader, voteSet); + Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); + leaveInstance(endVote); + return endVote; + } + } + + /* + * Before joining an established ensemble, verify that + * a majority are following the same leader. + * + * Note that the outofelection map also stores votes from the current leader election. + * See ZOOKEEPER-1732 for more information. + */ + outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); + voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); + + if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) { + synchronized (this) { + logicalclock.set(n.electionEpoch); + setPeerState(n.leader, voteSet); + } + Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); + leaveInstance(endVote); + return endVote; + } + + return null; + } + + private Vote receivedLeadingNotification(Map recvset, Map outofelection, SyncedLearnerTracker voteSet, Notification n) { + /* + * + * In a two-node configuration, a recovery nodes cannot locate a leader because of the lack of the majority in the voteset. + * Therefore, it is the time for Oracle to take place as a tight breaker. + * + * */ + Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n); + if (result == null) { + /* + * Ask Oracle to see if it is okay to follow this leader. + * + * We don't need the CheckLeader() because itself cannot be a leader candidate + * */ + if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) { + LOG.info("Oracle indicates to follow"); + setPeerState(n.leader, voteSet); + Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); + leaveInstance(endVote); + return endVote; + } else { + LOG.info("Oracle indicates not to follow"); + return null; + } + } else { + return result; + } + } + /** * Check if a given sid is represented in either the current or * the next voting view diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 2de2ceeb8b0..ce8f7999c45 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -174,6 +174,10 @@ public List getNonVotingFollowers() { void addForwardingFollower(LearnerHandler lh) { synchronized (forwardingFollowers) { forwardingFollowers.add(lh); + /* + * Any changes on forwardiongFollowers could possible affect the need of Oracle. + * */ + self.getQuorumVerifier().updateNeedOracle(new ArrayList<>(forwardingFollowers)); } } @@ -757,7 +761,27 @@ void lead() throws IOException, InterruptedException { break; } - if (!tickSkip && !syncedAckSet.hasAllQuorums()) { + /* + * + * We will need to re-validate the outstandingProposal to maintain the progress of ZooKeeper. + * It is likely a proposal is waiting for enough ACKs to be committed. The proposals are sent out, but the + * only follower goes away which makes the proposals will not be committed until the follower recovers back. + * An earlier proposal which is not committed will block any further proposals. So, We need to re-validate those + * outstanding proposal with the help from Oracle. A key point in the process of re-validation is that the proposals + * need to be processed in order. + * + * We make the whole method blocking to avoid any possible race condition on outstandingProposal and lastCommitted + * as well as to avoid nested synchronization. + * + * As a more generic approach, we pass the object of forwardingFollowers to QuorumOracleMaj to determine if we need + * the help from Oracle. + * + * + * the size of outstandingProposals can be 1. The only one outstanding proposal is the one waiting for the ACK from + * the leader itself. + * */ + if (!tickSkip && !syncedAckSet.hasAllQuorums() + && !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) && self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList<>(outstandingProposals.values()), lastCommitted))) { // Lost quorum of last committed and/or last proposed // config, set shutdown flag shutdownMessage = "Not sufficient followers synced, only synced with sids: [ " @@ -909,10 +933,10 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol // commit proposals in order if (zxid != lastCommitted + 1) { LOG.warn( - "Commiting zxid 0x{} from {} noy first!", + "Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr); - LOG.warn("First is {}", (lastCommitted + 1)); + LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1)); } outstandingProposals.remove(zxid); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 3102c6379ed..19aae086027 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -78,6 +78,7 @@ import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner; import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.ConfigUtils; import org.apache.zookeeper.server.util.JvmPauseMonitor; @@ -1254,6 +1255,22 @@ public QuorumPeer(Map quorumPeers, File snapDir, File logDir new QuorumMaj(quorumPeers)); } + public QuorumPeer(Map quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException { + this( + quorumPeers, + snapDir, + logDir, + electionAlg, + myid, + tickTime, + initLimit, + syncLimit, + connectToLearnerMasterLimit, + false, + ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), + new QuorumOracleMaj(quorumPeers, oraclePath)); + } + /** * This constructor is only used by the existing unit test code. * It defaults to FileLogProvider persistence provider. @@ -1808,7 +1825,7 @@ public int getTick() { public QuorumVerifier configFromString(String s) throws IOException, ConfigException { Properties props = new Properties(); props.load(new StringReader(s)); - return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false); + return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath()); } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index c56204f1f76..1b37f291f51 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -52,6 +52,7 @@ import org.apache.zookeeper.server.quorum.auth.QuorumAuth; import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.JvmPauseMonitor; import org.apache.zookeeper.server.util.VerifyingFileFactory; @@ -130,6 +131,8 @@ public class QuorumPeerConfig { Integer.parseInt(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS, String.valueOf(MultipleAddresses.DEFAULT_TIMEOUT.toMillis()))); + protected String oraclePath; + /** * Minimum snapshot retain count. * @see org.apache.zookeeper.server.PurgeTxnLog#purge(File, File, int) @@ -380,6 +383,8 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(value); } else if (key.equals("multiAddress.reachabilityCheckEnabled")) { multiAddressReachabilityCheckEnabled = parseBoolean(key, value); + } else if (key.equals("oraclePath")) { + oraclePath = value; } else { System.setProperty("zookeeper." + key, value); } @@ -629,6 +634,15 @@ public static void deleteFile(String filename) { } } + + private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException { + if (oraclePath == null) { + return createQuorumVerifier(dynamicConfigProp, isHierarchical); + } else { + return new QuorumOracleMaj(dynamicConfigProp, oraclePath); + } + } + private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException { if (isHierarchical) { return new QuorumHierarchical(dynamicConfigProp); @@ -642,7 +656,7 @@ private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, } void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { - quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode); + quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath); setupMyId(); setupClientPort(); setupPeerType(); @@ -656,7 +670,7 @@ void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityM * @throws IOException * @throws ConfigException */ - public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { + public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException { boolean isHierarchical = false; for (Entry entry : dynamicConfigProp.entrySet()) { String key = entry.getKey().toString().trim(); @@ -668,7 +682,7 @@ public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, in } } - QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical); + QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath); int numParticipators = qv.getVotingMembers().size(); int numObservers = qv.getObservingMembers().size(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java index ced966f58c2..5376ae6c520 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java @@ -120,6 +120,7 @@ public boolean equals(Object o) { } return true; } + /** * This constructor requires the quorum configuration * to be declared in a separate file, and it takes the diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java index 6e6f1c2887d..ed38533ec42 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java @@ -26,6 +26,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class implements a validator for majority quorums. The implementation is @@ -34,11 +36,13 @@ */ public class QuorumMaj implements QuorumVerifier { + private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class); + private Map allMembers = new HashMap(); private Map votingMembers = new HashMap(); private Map observingMembers = new HashMap(); private long version = 0; - private int half; + protected int half; public int hashCode() { assert false : "hashCode not designed"; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java new file mode 100644 index 00000000000..0845b802979 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum.flexible; + +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.FilenameUtils; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LearnerHandler; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.SyncedLearnerTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * + * QuorumOracleMaj is a subclass of QuorumMaj. + * + * QuorumOracleMaj is designed to be functional in a 2-nodes configuration. The only method that this class overrides super + * class' method is containsQuorum(). Besides the check of oracle, it also checks the number of voting member. Whenever the + * number of voting members is greater than 2. QuorumOracleMaj shall function as hook to its super class. + * */ +public class QuorumOracleMaj extends QuorumMaj { + private static final Logger LOG = LoggerFactory.getLogger(QuorumOracleMaj.class); + + private String oracle = null; + + private final AtomicBoolean needOracle = new AtomicBoolean(true); + + public QuorumOracleMaj(Map allMembers, String oraclePath) { + super(allMembers); + setOracle(oraclePath); + } + + public QuorumOracleMaj(Properties props, String oraclePath) throws QuorumPeerConfig.ConfigException { + super(props); + setOracle(oraclePath); + } + + private void setOracle(String path) { + if (oracle == null) { + oracle = path; + LOG.info("Oracle is set to {}", path); + } else { + LOG.warn("Oracle is already set. Ignore:{}", path); + } + } + + @Override + public boolean updateNeedOracle(List forwardingFollowers) { + // Do we have the quorum + needOracle.set(forwardingFollowers.isEmpty() && super.getVotingMembers().size() == 2); + return needOracle.get(); + } + + @Override + public boolean askOracle() { + FileReader fr = null; + try { + int read; + fr = new FileReader(FilenameUtils.getFullPath(oracle) + FilenameUtils.getName(oracle)); + read = fr.read(); + LOG.debug("Oracle says:{}", (char) read); + fr.close(); + return (char) read == '1'; + } catch (Exception e) { + e.printStackTrace(); + if (oracle == null) { + LOG.error("Oracle is not set, return false"); + } + return false; + } finally { + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public boolean getNeedOracle() { + return needOracle.get(); + } + + @Override + public String getOraclePath() { + return oracle; + } + + @Override + public boolean overrideQuorumDecision(List forwardingFollowers) { + if (updateNeedOracle(forwardingFollowers) && askOracle()) { + return true; + } else { + return false; + } + } + + @Override + public boolean revalidateOutstandingProp(Leader self, ArrayList outstandingProposal, long lastCommitted) { + LOG.debug("Start Revalidation outstandingProposals"); + try { + while (outstandingProposal.size() >= 1) { + outstandingProposal.sort((o1, o2) -> (int) (o1.packet.getZxid() - o2.packet.getZxid())); + + Leader.Proposal p; + int i = 0; + while (i < outstandingProposal.size()) { + p = outstandingProposal.get(i); + if (p.request.zxid > lastCommitted) { + LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.request.zxid), outstandingProposal.size(), Long.toHexString(lastCommitted)); + if (!self.tryToCommit(p, p.request.zxid, null)) { + break; + } else { + lastCommitted = p.request.zxid; + outstandingProposal.remove(p); + } + } + } + } + } catch (Exception e) { + e.printStackTrace(); + return false; + } + + LOG.debug("Finish Revalidation outstandingProposals"); + return true; + } + + @Override + public boolean revalidateVoteset(SyncedLearnerTracker voteSet, boolean timeout) { + return voteSet != null && voteSet.hasAllQuorums() && timeout; + } + + @Override + public boolean containsQuorum(Set ackSet) { + if (oracle == null || getVotingMembers().size() > 2) { + return super.containsQuorum(ackSet); + } else if (!super.containsQuorum(ackSet)) { + if (getNeedOracle()) { + LOG.debug("We lose the quorum, but we do not have any valid followers Oracle:{}", askOracle()); + return askOracle(); + } else { + return false; + } + } else { + return true; + } + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + QuorumOracleMaj qm = (QuorumOracleMaj) o; + if (qm.getVersion() == super.getVersion()) { + return true; + } + if (super.getAllMembers().size() != qm.getAllMembers().size()) { + return false; + } + for (QuorumPeer.QuorumServer qs : super.getAllMembers().values()) { + QuorumPeer.QuorumServer qso = qm.getAllMembers().get(qs.id); + if (qso == null || !qs.equals(qso)) { + return false; + } + } + return true; + } + + @Override + public int hashCode() { + assert false : "hashCode not designed"; + return 43; // any arbitrary constant will do + } +} + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java index 12d84890f06..7362313552c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java @@ -18,9 +18,14 @@ package org.apache.zookeeper.server.quorum.flexible; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LearnerHandler; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.SyncedLearnerTracker; /** * All quorum validators have to implement a method called @@ -39,6 +44,37 @@ public interface QuorumVerifier { Map getVotingMembers(); Map getObservingMembers(); boolean equals(Object o); + /* + * Only QuorumOracleMaj will implement these methods. Other class will raise warning if the methods are called and + * return false always. + * */ + default boolean updateNeedOracle(List forwardingFollowers) { + return false; + } + default boolean getNeedOracle() { + return false; + } + + default boolean askOracle() { + return false; + } + + default boolean overrideQuorumDecision(List forwardingFollowers) { + return false; + } + + default boolean revalidateOutstandingProp(Leader self, ArrayList outstandingProposal, long lastCommitted) { + return false; + } + + default boolean revalidateVoteset(SyncedLearnerTracker voteSet, boolean timeout) { + return false; + } + + default String getOraclePath() { + return null; + }; + String toString(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java index d37516bd3bf..a27d5cf9f0f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java @@ -72,7 +72,7 @@ public void setUp(ServerState serverState, boolean checkEnabled) throws Exceptio ensureCheck(checkEnabled); CountdownWatcher clientWatch = new CountdownWatcher(); CountdownWatcher clientWatchB = new CountdownWatcher(); - super.setUp(true); + super.setUp(true, true); String hostPort = getPeersMatching(serverState).split(",")[0]; int clientPort = Integer.parseInt(hostPort.split(":")[1]); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java index b2c350e3edd..407a9d1a58b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java @@ -203,6 +203,7 @@ public void testParseBoolean() throws IOException, ConfigException { private Properties getDefaultZKProperties() { Properties zkProp = new Properties(); zkProp.setProperty("dataDir", new File("myDataDir").getAbsolutePath()); + zkProp.setProperty("oraclePath", new File("mastership").getAbsolutePath()); return zkProp; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java index 0888d6fb505..c395a62e6be 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java @@ -69,7 +69,7 @@ public void setUp() { public void setUp(ServerState serverState) throws Exception { CountdownWatcher clientWatch = new CountdownWatcher(); - super.setUp(true); + super.setUp(true, true); zkClient = createClient(clientWatch, getPeersMatching(serverState)); zkClient.addAuthInfo(AUTH_PROVIDER, AUTH); clientWatch.waitForConnected(CONNECTION_TIMEOUT); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java index 2276354d15e..108b21c0f09 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java @@ -47,7 +47,7 @@ public class AsyncHammerTest extends ZKTestCase implements StringCallback, VoidC private volatile boolean bang; public void setUp(boolean withObservers) throws Exception { - qb.setUp(withObservers); + qb.setUp(withObservers, false); } protected void restart() throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java index 014ccb4df0c..67aa5959263 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java @@ -35,7 +35,7 @@ public class ObserverLETest extends ZKTestCase { @BeforeEach public void establishThreeParticipantOneObserverEnsemble() throws Exception { - qb.setUp(true); + qb.setUp(true, false); ct.hostPort = qb.hostPort; ct.setUpAll(); qb.s5.shutdown(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java index 00953f792f3..45e37f2da04 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java @@ -28,7 +28,7 @@ public class ObserverQuorumHammerTest extends QuorumHammerTest { @BeforeEach @Override public void setUp() throws Exception { - qb.setUp(true); + qb.setUp(true, false); cht.hostPort = qb.hostPort; cht.setUpAll(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java index 02e5e0a034c..c2396a6403d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -48,6 +49,13 @@ public class QuorumBase extends ClientBase { private static final String LOCALADDR = "127.0.0.1"; + private static final String oraclePath_0 = "./tmp/oraclePath/0/mastership/"; + private static final String oraclePath_1 = "./tmp/oraclePath/1/mastership/"; + private static final String oraclePath_2 = "./tmp/oraclePath/0/mastership/"; + private static final String oraclePath_3 = "./tmp/oraclePath/1/mastership/"; + private static final String oraclePath_4 = "./tmp/oraclePath/0/mastership/"; + private static final String mastership = "value"; + File s1dir, s2dir, s3dir, s4dir, s5dir; QuorumPeer s1, s2, s3, s4, s5; protected int port1; @@ -71,13 +79,14 @@ public class QuorumBase extends ClientBase { protected boolean localSessionsEnabled = false; protected boolean localSessionsUpgradingEnabled = false; + @BeforeEach @Override public void setUp() throws Exception { - setUp(false); + setUp(false, true); } - protected void setUp(boolean withObservers) throws Exception { + protected void setUp(boolean withObservers, boolean withOracle) throws Exception { LOG.info("QuorumBase.setup {}", getTestName()); setupTestEnv(); @@ -121,21 +130,54 @@ protected void setUp(boolean withObservers) throws Exception { s4dir = ClientBase.createTmpDir(); s5dir = ClientBase.createTmpDir(); - startServers(withObservers); + startServers(withObservers, withOracle); OSMXBean osMbean = new OSMXBean(); if (osMbean.getUnix()) { LOG.info("Initial fdcount is: {}", osMbean.getOpenFileDescriptorCount()); } + if (withOracle) { + File directory = new File(oraclePath_0); + directory.mkdirs(); + FileWriter fw = new FileWriter(oraclePath_0 + mastership); + fw.write("1"); + fw.close(); + + directory = new File(oraclePath_1); + directory.mkdirs(); + fw = new FileWriter(oraclePath_1 + mastership); + fw.write("0"); + fw.close(); + + directory = new File(oraclePath_2); + directory.mkdirs(); + fw = new FileWriter(oraclePath_2 + mastership); + fw.write("0"); + fw.close(); + + directory = new File(oraclePath_3); + directory.mkdirs(); + fw = new FileWriter(oraclePath_3 + mastership); + fw.write("1"); + fw.close(); + + directory = new File(oraclePath_4); + directory.mkdirs(); + fw = new FileWriter(oraclePath_4 + mastership); + fw.write("0"); + fw.close(); + } + + LOG.info("Setup finished"); } void startServers() throws Exception { - startServers(false); + startServers(false, true); } - void startServers(boolean withObservers) throws Exception { + void startServers(boolean withObservers, boolean withOracle) throws Exception { int tickTime = 2000; int initLimit = 3; int syncLimit = 3; @@ -152,21 +194,39 @@ void startServers(boolean withObservers) throws Exception { peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER; } - LOG.info("creating QuorumPeer 1 port {}", portClient1); - s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); - assertEquals(portClient1, s1.getClientPort()); - LOG.info("creating QuorumPeer 2 port {}", portClient2); - s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); - assertEquals(portClient2, s2.getClientPort()); - LOG.info("creating QuorumPeer 3 port {}", portClient3); - s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); - assertEquals(portClient3, s3.getClientPort()); - LOG.info("creating QuorumPeer 4 port {}", portClient4); - s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); - assertEquals(portClient4, s4.getClientPort()); - LOG.info("creating QuorumPeer 5 port {}", portClient5); - s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); - assertEquals(portClient5, s5.getClientPort()); + if (!withOracle) { + LOG.info("creating QuorumPeer 1 port {}", portClient1); + s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient1, s1.getClientPort()); + LOG.info("creating QuorumPeer 2 port {}", portClient2); + s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient2, s2.getClientPort()); + LOG.info("creating QuorumPeer 3 port {}", portClient3); + s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient3, s3.getClientPort()); + LOG.info("creating QuorumPeer 4 port {}", portClient4); + s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient4, s4.getClientPort()); + LOG.info("creating QuorumPeer 5 port {}", portClient5); + s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient5, s5.getClientPort()); + } else { + LOG.info("creating QuorumPeer 1 port {}", portClient1); + s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_0 + mastership); + assertEquals(portClient1, s1.getClientPort()); + LOG.info("creating QuorumPeer 2 port {}", portClient2); + s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_1 + mastership); + assertEquals(portClient2, s2.getClientPort()); + LOG.info("creating QuorumPeer 3 port {}", portClient3); + s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_2 + mastership); + assertEquals(portClient3, s3.getClientPort()); + LOG.info("creating QuorumPeer 4 port {}", portClient4); + s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_3 + mastership); + assertEquals(portClient4, s4.getClientPort()); + LOG.info("creating QuorumPeer 5 port {}", portClient5); + s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_4 + mastership); + assertEquals(portClient5, s5.getClientPort()); + } if (withObservers) { s4.setLearnerType(LearnerType.OBSERVER); @@ -230,18 +290,18 @@ void startServers(boolean withObservers) throws Exception { } public int getLeaderIndex() { - if (s1.getPeerState() == ServerState.LEADING) { - return 0; - } else if (s2.getPeerState() == ServerState.LEADING) { - return 1; - } else if (s3.getPeerState() == ServerState.LEADING) { - return 2; - } else if (s4.getPeerState() == ServerState.LEADING) { - return 3; - } else if (s5.getPeerState() == ServerState.LEADING) { - return 4; - } - return -1; + if (s1.getPeerState() == ServerState.LEADING) { + return 0; + } else if (s2.getPeerState() == ServerState.LEADING) { + return 1; + } else if (s3.getPeerState() == ServerState.LEADING) { + return 2; + } else if (s4.getPeerState() == ServerState.LEADING) { + return 3; + } else if (s5.getPeerState() == ServerState.LEADING) { + return 4; + } + return -1; } public int getLeaderClientPort() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBaseOracle_2Nodes.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBaseOracle_2Nodes.java new file mode 100644 index 00000000000..482027d1a9a --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBaseOracle_2Nodes.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.TestableZooKeeper; +import org.apache.zookeeper.server.quorum.Election; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.util.OSMXBean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class QuorumBaseOracle_2Nodes extends ClientBase{ + + private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class); + + private static final String LOCALADDR = "127.0.0.1"; + + private static final String oraclePath_0 = "./tmp/oraclePath/0/mastership/"; + private static final String oraclePath_1 = "./tmp/oraclePath/1/mastership/"; + + private static final String mastership = "value"; + + File s1dir, s2dir; + QuorumPeer s1, s2; + protected int port1; + protected int port2; + + protected int portLE1; + protected int portLE2; + + protected int portClient1; + protected int portClient2; + + protected boolean localSessionsEnabled = false; + protected boolean localSessionsUpgradingEnabled = false; + + + + @BeforeEach + @Override + public void setUp() throws Exception { + LOG.info("QuorumBase.setup {}", getTestName()); + setupTestEnv(); + + JMXEnv.setUp(); + + setUpAll(); + + port1 = PortAssignment.unique(); + port2 = PortAssignment.unique(); + + portLE1 = PortAssignment.unique(); + portLE2 = PortAssignment.unique(); + + portClient1 = PortAssignment.unique(); + portClient2 = PortAssignment.unique(); + + hostPort = "127.0.0.1:" + + portClient1 + + ",127.0.0.1:" + + portClient2; + LOG.info("Ports are: {}", hostPort); + + s1dir = ClientBase.createTmpDir(); + s2dir = ClientBase.createTmpDir(); + + startServers(); + + OSMXBean osMbean = new OSMXBean(); + if (osMbean.getUnix()) { + LOG.info("Initial fdcount is: {}", osMbean.getOpenFileDescriptorCount()); + } + + File directory = new File(oraclePath_0); + directory.mkdirs(); + FileWriter fw = new FileWriter(oraclePath_0 + mastership); + fw.write("0"); + fw.close(); + + directory = new File(oraclePath_1); + directory.mkdirs(); + fw = new FileWriter(oraclePath_1 + mastership); + fw.write("1"); + fw.close(); + + + LOG.info("Setup finished"); + } + + void startServers() throws Exception { + int tickTime = 2000; + int initLimit = 3; + int syncLimit = 3; + int connectToLearnerMasterLimit = 3; + Map peers = new HashMap(); + peers.put(Long.valueOf(1), new QuorumPeer.QuorumServer(1, new InetSocketAddress(LOCALADDR, port1), new InetSocketAddress(LOCALADDR, portLE1), new InetSocketAddress(LOCALADDR, portClient1), QuorumPeer.LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(2), new QuorumPeer.QuorumServer(2, new InetSocketAddress(LOCALADDR, port2), new InetSocketAddress(LOCALADDR, portLE2), new InetSocketAddress(LOCALADDR, portClient2), QuorumPeer.LearnerType.PARTICIPANT)); + + LOG.info("creating QuorumPeer 1 port {}", portClient1); + s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_0 + mastership); + assertEquals(portClient1, s1.getClientPort()); + LOG.info("creating QuorumPeer 2 port {}", portClient2); + s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_1 + mastership); + assertEquals(portClient2, s2.getClientPort()); + + + LOG.info("QuorumPeer 1 voting view: {}", s1.getVotingView()); + LOG.info("QuorumPeer 2 voting view: {}", s2.getVotingView()); + + s1.enableLocalSessions(localSessionsEnabled); + s2.enableLocalSessions(localSessionsEnabled); + s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + + LOG.info("start QuorumPeer 1"); + s1.start(); + LOG.info("start QuorumPeer 2"); + s2.start(); + + LOG.info("Checking ports {}", hostPort); + for (String hp : hostPort.split(",")) { + assertTrue(ClientBase.waitForServerUp(hp, CONNECTION_TIMEOUT), "waiting for server up"); + LOG.info("{} is accepting client connections", hp); + } + + // interesting to see what's there... + JMXEnv.dump(); + // make sure we have these 5 servers listed + Set ensureNames = new LinkedHashSet(); + for (int i = 1; i <= 2; i++) { + ensureNames.add("InMemoryDataTree"); + } + for (int i = 1; i <= 2; i++) { + ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2="); + } + for (int i = 1; i <= 2; i++) { + for (int j = 1; j <= 2; j++) { + ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j); + } + } + for (int i = 1; i <= 2; i++) { + ensureNames.add("name0=ReplicatedServer_id" + i); + } + JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); + } + + public int getLeaderIndex() { + if (s1.getPeerState() == QuorumPeer.ServerState.LEADING) { + return 0; + } else if (s2.getPeerState() == QuorumPeer.ServerState.LEADING) { + return 1; + } + return -1; + } + + public int getLeaderClientPort() { + if (s1.getPeerState() == QuorumPeer.ServerState.LEADING) { + return portClient1; + } else if (s2.getPeerState() == QuorumPeer.ServerState.LEADING) { + return portClient2; + } + return -1; + } + + public QuorumPeer getLeaderQuorumPeer() { + if (s1.getPeerState() == QuorumPeer.ServerState.LEADING) { + return s1; + } else if (s2.getPeerState() == QuorumPeer.ServerState.LEADING) { + return s2; + } + return null; + } + + public QuorumPeer getFirstObserver() { + if (s1.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) { + return s1; + } else if (s2.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) { + return s2; + } + return null; + } + + public int getFirstObserverClientPort() { + if (s1.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) { + return portClient1; + } else if (s2.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) { + return portClient2; + } + return -1; + } + + public String getPeersMatching(QuorumPeer.ServerState state) { + StringBuilder hosts = new StringBuilder(); + for (QuorumPeer p : getPeerList()) { + if (p.getPeerState() == state) { + hosts.append(String.format("%s:%d,", LOCALADDR, p.getClientAddress().getPort())); + } + } + LOG.info("getPeersMatching ports are {}", hosts); + return hosts.toString(); + } + + public ArrayList getPeerList() { + ArrayList peers = new ArrayList(); + peers.add(s1); + peers.add(s2); + return peers; + } + + public QuorumPeer getPeerByClientPort(int clientPort) { + for (QuorumPeer p : getPeerList()) { + if (p.getClientAddress().getPort() == clientPort) { + return p; + } + } + return null; + } + + public void setupServers() throws IOException { + setupServer(1); + setupServer(2); + } + + Map peers = null; + + public void setupServer(int i) throws IOException { + int tickTime = 2000; + int initLimit = 3; + int syncLimit = 3; + int connectToLearnerMasterLimit = 3; + + if (peers == null) { + peers = new HashMap(); + + peers.put(Long.valueOf(1), new QuorumPeer.QuorumServer(1, new InetSocketAddress(LOCALADDR, port1), new InetSocketAddress(LOCALADDR, portLE1), new InetSocketAddress(LOCALADDR, portClient1), QuorumPeer.LearnerType.PARTICIPANT)); + peers.put(Long.valueOf(2), new QuorumPeer.QuorumServer(2, new InetSocketAddress(LOCALADDR, port2), new InetSocketAddress(LOCALADDR, portLE2), new InetSocketAddress(LOCALADDR, portClient2), QuorumPeer.LearnerType.PARTICIPANT)); + } + + switch (i) { + case 1: + LOG.info("creating QuorumPeer 1 port {}", portClient1); + s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient1, s1.getClientPort()); + break; + case 2: + LOG.info("creating QuorumPeer 2 port {}", portClient2); + s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + assertEquals(portClient2, s2.getClientPort()); + break; + } + } + + @AfterEach + @Override + public void tearDown() throws Exception { + LOG.info("TearDown started"); + + OSMXBean osMbean = new OSMXBean(); + if (osMbean.getUnix()) { + LOG.info("fdcount after test is: {}", osMbean.getOpenFileDescriptorCount()); + } + + shutdownServers(); + + for (String hp : hostPort.split(",")) { + assertTrue(ClientBase.waitForServerDown(hp, ClientBase.CONNECTION_TIMEOUT), "waiting for server down"); + LOG.info("{} is no longer accepting client connections", hp); + } + + JMXEnv.tearDown(); + } + public void shutdownServers() { + shutdown(s1); + shutdown(s2); + } + + public static void shutdown(QuorumPeer qp) { + if (qp == null) { + return; + } + try { + LOG.info("Shutting down quorum peer {}", qp.getName()); + qp.shutdown(); + Election e = qp.getElectionAlg(); + if (e != null) { + LOG.info("Shutting down leader election {}", qp.getName()); + e.shutdown(); + } else { + LOG.info("No election available to shutdown {}", qp.getName()); + } + LOG.info("Waiting for {} to exit thread", qp.getName()); + long readTimeout = qp.getTickTime() * qp.getInitLimit(); + long connectTimeout = qp.getTickTime() * qp.getSyncLimit(); + long maxTimeout = Math.max(readTimeout, connectTimeout); + maxTimeout = Math.max(maxTimeout, ClientBase.CONNECTION_TIMEOUT); + qp.join(maxTimeout * 2); + if (qp.isAlive()) { + fail("QP failed to shutdown in " + (maxTimeout * 2) + " seconds: " + qp.getName()); + } + } catch (InterruptedException e) { + LOG.debug("QP interrupted: {}", qp.getName(), e); + } + } + + protected TestableZooKeeper createClient() throws IOException, InterruptedException { + return createClient(hostPort); + } + + protected TestableZooKeeper createClient(String hp) throws IOException, InterruptedException { + ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher(); + return createClient(watcher, hp); + } + + protected TestableZooKeeper createClient(ClientBase.CountdownWatcher watcher, QuorumPeer.ServerState state) throws IOException, InterruptedException { + return createClient(watcher, getPeersMatching(state)); + } + +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java index deaeb68d8ad..4761596736c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java @@ -57,7 +57,7 @@ public void testMajQuorums() throws Throwable { } //setup servers 1-5 to be followers - setUp(false); + setUp(false, true); Proposal p = new Proposal(); @@ -77,7 +77,7 @@ public void testMajQuorums() throws Throwable { assertEquals(true, p.hasAllQuorums()); //setup servers 1-3 to be followers and 4 and 5 to be observers - setUp(true); + setUp(true, true); p = new Proposal(); p.addQuorumVerifier(s1.getQuorumVerifier()); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumOracleMajTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumOracleMajTest.java new file mode 100644 index 00000000000..1b1fb3125ba --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumOracleMajTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LearnerHandler; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class QuorumOracleMajTest extends QuorumBaseOracle_2Nodes { + + protected static final Logger LOG = LoggerFactory.getLogger(QuorumMajorityTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + /***************************************************************/ + /* Test that the majority quorum verifier only counts votes from */ + /* followers in its view */ + /***************************************************************/ + @Test + public void testMajQuorums() throws Throwable { + LOG.info("Verify QuorumPeer#electionTimeTaken jmx bean attribute"); + + ArrayList peers = getPeerList(); + for (int i = 1; i <= peers.size(); i++) { + QuorumPeer qp = peers.get(i - 1); + Long electionTimeTaken = -1L; + String bean = ""; + if (qp.getPeerState() == QuorumPeer.ServerState.FOLLOWING) { + bean = String.format("%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Follower", MBeanRegistry.DOMAIN, i, i); + } else if (qp.getPeerState() == QuorumPeer.ServerState.LEADING) { + bean = String.format("%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Leader", MBeanRegistry.DOMAIN, i, i); + } + electionTimeTaken = (Long) JMXEnv.ensureBeanAttribute(bean, "ElectionTimeTaken"); + assertTrue(electionTimeTaken >= 0, "Wrong electionTimeTaken value!"); + } + + //setup servers 1-2 to be followers + // id=1, oracle is false; id=2, oracle is true + setUp(); + + QuorumPeer s; + int leader; + if ((leader = getLeaderIndex()) == 1) { + s = s1; + } else { + s = s2; + } + + noDropConectionTest(s); + + dropConnectionTest(s, leader); + + } + + private void noDropConectionTest(QuorumPeer s) { + Leader.Proposal p = new Leader.Proposal(); + + + p.addQuorumVerifier(s.getQuorumVerifier()); + + // 1 followers out of 2 is not a majority + p.addAck(Long.valueOf(1)); + assertEquals(false, p.hasAllQuorums()); + + // 6 is not in the view - its vote shouldn't count + p.addAck(Long.valueOf(6)); + assertEquals(false, p.hasAllQuorums()); + + // 2 followers out of 2 is good + p.addAck(Long.valueOf(2)); + assertEquals(true, p.hasAllQuorums()); + + } + + + private void dropConnectionTest(QuorumPeer s, int leader) { + Leader.Proposal p = new Leader.Proposal(); + p.addQuorumVerifier(s.getQuorumVerifier()); + + ArrayList fake = new ArrayList<>(); + + LearnerHandler f = null; + fake.add(f); + + s.getQuorumVerifier().updateNeedOracle(fake); + // still have valid followers, the oracle should not take place + assertEquals(false, s.getQuorumVerifier().getNeedOracle()); + + fake.remove(0); + s.getQuorumVerifier().updateNeedOracle(fake); + // lose all of followers, the oracle should take place + assertEquals(true, s.getQuorumVerifier().getNeedOracle()); + + + // when leader is 1, we expect false. + // when leader is 2, we expect true. + assertEquals(leader != 1, p.hasAllQuorums()); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java index b3e1d47ee7d..52d2bd80f5b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java @@ -35,7 +35,7 @@ public static void applyMockUps() { @BeforeEach @Override public void setUp() throws Exception { - super.setUp(true /* withObservers */); + super.setUp(true /* withObservers */, false); } @Test diff --git a/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f b/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f index 26dc5f66521..1b563b6ed6d 100644 Binary files a/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f and b/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f differ