diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md b/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md
new file mode 100644
index 00000000000..adc547778bb
--- /dev/null
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md
@@ -0,0 +1,202 @@
+
+
+# Introduction to Oracle Quorum
+The introduction to Oracle Quorum increases the availability of a cluster of 2 ZooKeeper instances with a failure detector as known as the Oracle.
+ The Oracle is designed to grant the permission to the instance which is the only remaining instance
+in a 2-instance configuration when the other instance is identified as faulty by the fail detector, the Oracle.
+
+## The implementation of the Oracle
+Every instance shall access to a file which contains either 0 or 1 to indicate whether this instance is authorized by the Oracle.
+However, this design can be changed since the fail detector algorithms vary from each other. Therefore, ones can override the method of _askOracle()_ in _QuorumOracleMaj_ to adapt the preferred way of deciphering the message from the Oracle.
+
+## The deployment cotexts
+The Oracle is designed to increase the availability of a cluster of 2 ZooKeeper instances; thus, the size of the voting member is **2**.
+In other words, the Oracle solves the consensus problem of a possibility of faulty instance in a two-instance ensemble.
+
+In the case that the size of the voting members exceeds 2, the expected way to make the Oracle work correctly is to reconfigure the size of the cluster when a faulty machine is identified.
+For example, with a configuration of 5 instances, when a faulty machine breaks the connection with the Leader, it is expected to have a _reconfig_ client request to the cluster, which makes the cluster to re-form as the configuration of 4 instances.
+Therefore, once the size of the voting member equals to 2, the configuration falls into the problem domain which the Oracle is designed to address.
+
+## How to deploy the Oracle in _zoo.cfg_
+Regardless of the size of the cluster, the _oraclePath_ must be configured at the time of the initialization, which is like other static parameters.
+The below shows the correct way to specify and enable the Oracle.
+
+ oraclePath=/to/some/file
+
+#### An example of zoo.cfg:
+
+ dataDir=/data
+ dataLogDir=/datalog
+ tickTime=2000
+ initLimit=5
+ syncLimit=2
+ autopurge.snapRetainCount=3
+ autopurge.purgeInterval=0
+ maxClientCnxns=60
+ standaloneEnabled=true
+ admin.enableServer=true
+ oraclePath=/chassis/mastership
+ server.1=0.0.0.0:2888:3888;2181
+ server.2=hw1:2888:3888;2181
+
+The QuorumOracleMaj is designed to read the result of a failure detector, which is written on a text file, the oracle file.
+The configuration in the zoo.cfg like the following:
+
+ oraclePath=/to/some/file
+
+Suppose you have the result of the failure detector written on /some/path/result.txt, and then the correct configuration is the following:
+
+ oraclePath=/some/path/result.txt
+
+So, what is the correct content of the provided file? An example file can be created with the following command from the terminal:
+
+ $echo 1 > /some/path/result.txt
+
+Any equivalent files are suitable for the current implementation of QuorumOracleMaj.
+The number of oracle files should be equal to the number of ZooKeeper instances configured to enable the Oracle.
+In other words, each ZooKeeper instance should have its oracle file, and the files shall not be shared; otherwise, the issues in the next section will arise.
+
+## What differs after the deployment of the Oracle enabled
+The _QuorumPeerConfig_ will create an instance of _QuorumOracleMaj_ instead of the default QuorumVerifier, _QuorumMaj_ when it reads the _zoo.cfg_ contains _oraclePath_.
+QuorumOracleMaj inheritances from QuorumMaj, and differs from its superclass by overriding the method of _containsQuorum()_.
+QuorumOracleMaj is designed to execute its version of _containsQuorum_ when the Leader loses all of its followers, and fails to maintain the quorum.
+In other cases, _QuorumOracleMaj_ shall execute as _QuorumMaj_.
+
+## What we should pay attention to the Oracle
+We consider an asynchronous distributed system which consists of **2** ZooKeeper instances and an Oracle.
+
+### Liveness Issue:
+When we consider the oracle satisfies the following property introduced by [CT]:
+
+ Strong Completeness: There is a time after which every process that crashes is permanently suspected by every correct processes
+
+The liveness of the system is ensured by the Oracle.
+However, when the introduced oracle fails to maintain this property, the lost of the liveness is expected as the following example,
+
+Suppose we have a Leader and a Follower, which are running in the broadcasting state,
+The system will lose its liveness when:
+
+ 1. The Leader fails, but the Oracle does not detect the faulty Leader, which means the Oracle will not authorize the Follower to become a new Leader.
+ 2. When a Follower fails, but the Oracle does not detect the faulty follower, which means the Oracle will authorize the Leader to move system forward.
+
+### Safety Issue:
+#### Lost of Progress
+The progress can lost when multiple failures occurs in the system at different time as the following example,
+
+Suppose we have a Leader(Ben) and a Follower(John) in the broadcasting state,
+
+ At T1 with zxid(0x1_1): L-Ben fails, and the F-John takes over the system under the authorization from the Oracle.
+ At T2 with zxid(0x2_1): The F-John becomes a new Leader, L-John, and starts a new epoch.
+ At T3 with zxid(0x2_A): L-John fails
+ At T4 with zxid(0x2_A): Ben recovers up and starts its leader election.
+ At T5 with zxid(0x3_1): Ben becomes the new leader, L-Ben, under the authorization from the Oracle.
+
+In this case, the system loses its progress after the L-Ben failed.
+
+
+However, the lost of progress can be prevented by making the Oracle is capable of referring the latest zxid.
+When the Oracle could refer to the latest zxid,
+
+ At T5 with zxid(0x2_A): Ben will not end his leader election because the Oracle would not authorize although John is down.
+
+Nevertheless, we exchange the liveness for the safety.
+#### Split Brain Issue
+We consider the Oracle satisfies the following desired property introduced by [CT],
+
+ Accuracy: There is a time after which some correct processes is never suspected by any processes
+
+Nevertheless, the decisions which the Oracle gives out should be mutual exclusive.
+
+In other words,
+
+Suppose we have a Leader(Ben) and a Follower(John) in the broadcasting state,
+
+ - At any time, the Oracle will not authorize both Ben and John even though the failure detectors think each other is faulty.
+ Or
+ - At any time, for any two values in any two Oracle files respectively, the values are not both equal to 1.
+
+The split brain is expected when the Oracle fails to maintain this property during the leader election phase of
+
+ 1. Start of the system
+ 2. A failed instance recovers from failures.
+
+## Examples of Concepts for Implementation of a Failure Detector
+One should consider that the failure detector's outcome is to authorize the querying ZooKeeper instance whether it has the right to move the system forward without waiting for the faulty instance, which is identified by the failure detector.
+
+### An Implementation of Hardware
+Suppose two dedicated pieces of hardware, hw1 and hw2, can host ZooKeeper instances, zk1 and zk2, respectively, and form a cluster.
+A hardware device is attached to both of the hardware, and it is capable of determining whether the hardware is power on or not.
+So, when hw1 is not power on, the zk1 is undoubtedly faulty.
+Therefore, the hardware device updates the oracle file on hw2 to 1, which indicates that zk1 is faulty and authorizes zk2 to move the system forwards.
+
+### An Implementation of Software
+Suppose two dedicated pieces of hardware, hw1 and hw2, can host ZooKeeper instances, zk1 and zk2, respectively, and form a cluster.
+One can have two more services, o1 and o2, on hw1 and hw2, respectively. The job of o1 and o2 are detecting the other hardware is alive or not.
+For example, o1 can constantly ping hw2 to determine if hw2 is power on or not.
+When o1 cannot ping hw2, o1 identifies that hw2 is faulty and then update the oracle file of zk1 to 1, which indicates that zk2 is faulty and authorizes zk1 to move the system forwards.
+
+### Use USB devices as Oracle to Maintain Progress
+In macOS,10.15.7 (19H2), the external storage devices are mounted under `/Volumes`.
+Thus, we can insert a USB device which contains the required information as the oracle.
+When the device is connected, the oracle authorizes the leader to move system forward, which also means the other instance fails.
+There are **SIX** steps to reproduce this stimulation.
+
+* Firstly, insert a USB device named `Oracle`, and then we can expect that `/Volumes/Oracle` is accessible.
+* Secondly, we create a file contains `1` under `/Volumes/Oracle` named `mastership`.
+Now we can access `/Volumes/Oracle/mastership`, and so does the zookeeper instances to see whether it has the right to move the system forward.
+The file can easily be generated by the following command:
+
+
+ $echo 1 > mastership
+
+* Thirdly, you shall have a `zoo.cfg` like the example below:
+
+
+ dataDir=/data
+ dataLogDir=/datalog
+ tickTime=2000
+ initLimit=5
+ syncLimit=2
+ autopurge.snapRetainCount=3
+ autopurge.purgeInterval=0
+ maxClientCnxns=60
+ standaloneEnabled=true
+ admin.enableServer=true
+ oraclePath=/Volumes/Oracle/mastership
+ server.1=0.0.0.0:2888:3888;2181
+ server.2=hw1:2888:3888;2181
+
+_(NOTE) The split brain issues will not occur because there is only a SINGLE USB device in this stimulation._
+_Additionally, `mastership` should not be shared by multiple instances._
+_Thus, only one ZooKeeper instance is configured with Oracle._
+_For more, please refer to Section Safety Issue._
+
+* Fourthly, start the cluster, and it is expected it forms a quorum normally.
+* Fifthly, terminate the instance either without attaching to a USB device or `mastership` contains 0.
+There are two scenarios to expect:
+ 1. A leader failure occurs, and the remained instance finishes the leader election on its own due to the oracle.
+ 2. The quorum is still maintained due to the oracle.
+
+* Lastly, when the USB device is removed, `/Volumes/Oracle/mastership` becomes unavailable.
+Therefore, according to the current implementation, whenever the Leader queries the oracle, the oracle throws an exception and return `FALSE`.
+Repeat the fifth step, and then it is expected that either the system cannot recover from a leader failure ,or the leader loses the quorum.
+In either case, the service is interrupted.
+
+With these steps, we can show and practice how the oracle works with two-instance systems with ease.
+
+##REFERENCE
+[CT] Tushar Deepak Chandra and Sam Toueg. 1991. Unreliable failure detectors for asynchronous systems (preliminary version). In Proceedings of the tenth annual ACM symposium on Principles of distributed computing (PODC '91). Association for Computing Machinery, New York, NY, USA, 325–340. DOI:https://doi.org/10.1145/112600.112627
\ No newline at end of file
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 72211dfe14d..32177dc6fad 100755
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -172,6 +172,12 @@
snappy-javaprovided
+
+ commons-io
+ commons-io
+ 2.6
+ compile
+
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java
index 8afc14bb517..ce490ad8039 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java
@@ -130,7 +130,7 @@ public CliCommand parse(String[] cmdArgs) throws CliParseException {
//check that membership makes sense; leader will make these checks again
//don't check for leader election ports since
//client doesn't know what leader election alg is used
- members = QuorumPeerConfig.parseDynamicConfig(dynamicCfg, 0, true, false).toString();
+ members = QuorumPeerConfig.parseDynamicConfig(dynamicCfg, 0, true, false, null).toString();
} catch (Exception e) {
throw new CliParseException("Error processing " + cl.getOptionValue("file") + e.getMessage());
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index e71828d262b..11b5ccbc8d3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -65,6 +65,7 @@
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CloseSessionTxn;
@@ -452,7 +453,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record,
try {
Properties props = new Properties();
props.load(new StringReader(newMembers));
- request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false);
+ request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false, lastSeenQV.getOraclePath());
request.qv.setVersion(request.getHdr().getZxid());
} catch (IOException | ConfigException e) {
throw new KeeperException.BadArgumentsException(e.getMessage());
@@ -472,7 +473,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record,
leavingServers = StringUtils.split(leavingServersString, ",");
}
- if (!(lastSeenQV instanceof QuorumMaj)) {
+ if (!(lastSeenQV instanceof QuorumMaj) && !(lastSeenQV instanceof QuorumOracleMaj)) {
String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
LOG.warn(msg);
throw new KeeperException.BadArgumentsException(msg);
@@ -514,7 +515,13 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record,
} catch (ConfigException e) {
throw new KeeperException.BadArgumentsException("Reconfiguration failed");
}
- request.qv = new QuorumMaj(nextServers);
+
+ if (lastSeenQV instanceof QuorumMaj) {
+ request.qv = new QuorumMaj(nextServers);
+ } else {
+ request.qv = new QuorumOracleMaj(nextServers, lastSeenQV.getOraclePath());
+ }
+
request.qv.setVersion(request.getHdr().getZxid());
}
if (QuorumPeerConfig.isStandaloneEnabled() && request.qv.getVotingMembers().size() < 2) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index 0950c6d4c2e..9fc9d148c39 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -34,6 +34,7 @@
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
@@ -948,7 +949,7 @@ public Vote lookForLeader() throws InterruptedException {
Long.toHexString(proposedZxid));
sendNotifications();
- SyncedLearnerTracker voteSet;
+ SyncedLearnerTracker voteSet = null;
/*
* Loop in which we exchange notifications until we find a leader
@@ -977,7 +978,24 @@ public Vote lookForLeader() throws InterruptedException {
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
+
+ /*
+ * When a leader failure happens on a master, the backup will be supposed to receive the honour from
+ * Oracle and become a leader, but the honour is likely to be delay. We do a re-check once timeout happens
+ *
+ * The leader election algorithm does not provide the ability of electing a leader from a single instance
+ * which is in a configuration of 2 instances.
+ * */
+ self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval);
+ if (self.getQuorumVerifier() instanceof QuorumOracleMaj && voteSet != null && voteSet.hasAllQuorums() && notTimeout != minNotificationInterval) {
+ setPeerState(proposedLeader, voteSet);
+ Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ }
+
LOG.info("Notification time out: {} ms", notTimeout);
+
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
@@ -1051,43 +1069,53 @@ public Vote lookForLeader() throws InterruptedException {
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
+
+ /*
+ * In ZOOKEEPER-3922, we separate the behaviors of FOLLOWING and LEADING.
+ * To avoid the duplication of codes, we create a method called followingBehavior which was used to
+ * shared by FOLLOWING and LEADING. This method returns a Vote. When the returned Vote is null, it follows
+ * the original idea to break swtich statement; otherwise, a valid returned Vote indicates, a leader
+ * is generated.
+ *
+ * The reason why we need to separate these behaviors is to make the algorithm runnable for 2-node
+ * setting. An extra condition for generating leader is needed. Due to the majority rule, only when
+ * there is a majority in the voteset, a leader will be generated. However, in a configuration of 2 nodes,
+ * the number to achieve the majority remains 2, which means a recovered node cannot generate a leader which is
+ * the existed leader. Therefore, we need the Oracle to kick in this situation. In a two-node configuration, the Oracle
+ * only grants the permission to maintain the progress to one node. The oracle either grants the permission to the
+ * remained node and makes it a new leader when there is a faulty machine, which is the case to maintain the progress.
+ * Otherwise, the oracle does not grant the permission to the remained node, which further causes a service down.
+ *
+ * In the former case, when a failed server recovers and participate in the leader election, it would not locate a
+ * new leader because there does not exist a majority in the voteset. It fails on the containAllQuorum() infinitely due to
+ * two facts. First one is the fact that it does do not have a majority in the voteset. The other fact is the fact that
+ * the oracle would not give the permission since the oracle already gave the permission to the existed leader, the healthy machine.
+ * Logically, when the oracle replies with negative, it implies the existed leader which is LEADING notification comes from is a valid leader.
+ * To threat this negative replies as a permission to generate the leader is the purpose to separate these two behaviors.
+ *
+ *
+ * */
case FOLLOWING:
- case LEADING:
/*
- * Consider all notifications from the same epoch
- * together.
- */
- if (n.electionEpoch == logicalclock.get()) {
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
- voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
- if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
- setPeerState(n.leader, voteSet);
- Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
- leaveInstance(endVote);
- return endVote;
- }
+ * To avoid duplicate codes
+ * */
+ Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
+ if (resultFN == null) {
+ break;
+ } else {
+ return resultFN;
}
-
+ case LEADING:
/*
- * Before joining an established ensemble, verify that
- * a majority are following the same leader.
- *
- * Note that the outofelection map also stores votes from the current leader election.
- * See ZOOKEEPER-1732 for more information.
- */
- outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
- voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
-
- if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
- synchronized (this) {
- logicalclock.set(n.electionEpoch);
- setPeerState(n.leader, voteSet);
- }
- Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
- leaveInstance(endVote);
- return endVote;
+ * In leadingBehavior(), it performs followingBehvior() first. When followingBehavior() returns
+ * a null pointer, ask Oracle whether to follow this leader.
+ * */
+ Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
+ if (resultLN == null) {
+ break;
+ } else {
+ return resultLN;
}
- break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
@@ -1115,6 +1143,74 @@ public Vote lookForLeader() throws InterruptedException {
}
}
+ private Vote receivedFollowingNotification(Map recvset, Map outofelection, SyncedLearnerTracker voteSet, Notification n) {
+ /*
+ * Consider all notifications from the same epoch
+ * together.
+ */
+ if (n.electionEpoch == logicalclock.get()) {
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+ voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+ if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
+ setPeerState(n.leader, voteSet);
+ Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ }
+ }
+
+ /*
+ * Before joining an established ensemble, verify that
+ * a majority are following the same leader.
+ *
+ * Note that the outofelection map also stores votes from the current leader election.
+ * See ZOOKEEPER-1732 for more information.
+ */
+ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+ voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+
+ if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
+ synchronized (this) {
+ logicalclock.set(n.electionEpoch);
+ setPeerState(n.leader, voteSet);
+ }
+ Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ }
+
+ return null;
+ }
+
+ private Vote receivedLeadingNotification(Map recvset, Map outofelection, SyncedLearnerTracker voteSet, Notification n) {
+ /*
+ *
+ * In a two-node configuration, a recovery nodes cannot locate a leader because of the lack of the majority in the voteset.
+ * Therefore, it is the time for Oracle to take place as a tight breaker.
+ *
+ * */
+ Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
+ if (result == null) {
+ /*
+ * Ask Oracle to see if it is okay to follow this leader.
+ *
+ * We don't need the CheckLeader() because itself cannot be a leader candidate
+ * */
+ if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
+ LOG.info("Oracle indicates to follow");
+ setPeerState(n.leader, voteSet);
+ Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ } else {
+ LOG.info("Oracle indicates not to follow");
+ return null;
+ }
+ } else {
+ return result;
+ }
+ }
+
/**
* Check if a given sid is represented in either the current or
* the next voting view
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 2de2ceeb8b0..ce8f7999c45 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -174,6 +174,10 @@ public List getNonVotingFollowers() {
void addForwardingFollower(LearnerHandler lh) {
synchronized (forwardingFollowers) {
forwardingFollowers.add(lh);
+ /*
+ * Any changes on forwardiongFollowers could possible affect the need of Oracle.
+ * */
+ self.getQuorumVerifier().updateNeedOracle(new ArrayList<>(forwardingFollowers));
}
}
@@ -757,7 +761,27 @@ void lead() throws IOException, InterruptedException {
break;
}
- if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
+ /*
+ *
+ * We will need to re-validate the outstandingProposal to maintain the progress of ZooKeeper.
+ * It is likely a proposal is waiting for enough ACKs to be committed. The proposals are sent out, but the
+ * only follower goes away which makes the proposals will not be committed until the follower recovers back.
+ * An earlier proposal which is not committed will block any further proposals. So, We need to re-validate those
+ * outstanding proposal with the help from Oracle. A key point in the process of re-validation is that the proposals
+ * need to be processed in order.
+ *
+ * We make the whole method blocking to avoid any possible race condition on outstandingProposal and lastCommitted
+ * as well as to avoid nested synchronization.
+ *
+ * As a more generic approach, we pass the object of forwardingFollowers to QuorumOracleMaj to determine if we need
+ * the help from Oracle.
+ *
+ *
+ * the size of outstandingProposals can be 1. The only one outstanding proposal is the one waiting for the ACK from
+ * the leader itself.
+ * */
+ if (!tickSkip && !syncedAckSet.hasAllQuorums()
+ && !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) && self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList<>(outstandingProposals.values()), lastCommitted))) {
// Lost quorum of last committed and/or last proposed
// config, set shutdown flag
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
@@ -909,10 +933,10 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol
// commit proposals in order
if (zxid != lastCommitted + 1) {
LOG.warn(
- "Commiting zxid 0x{} from {} noy first!",
+ "Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid),
followerAddr);
- LOG.warn("First is {}", (lastCommitted + 1));
+ LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 3102c6379ed..19aae086027 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -78,6 +78,7 @@
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
@@ -1254,6 +1255,22 @@ public QuorumPeer(Map quorumPeers, File snapDir, File logDir
new QuorumMaj(quorumPeers));
}
+ public QuorumPeer(Map quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException {
+ this(
+ quorumPeers,
+ snapDir,
+ logDir,
+ electionAlg,
+ myid,
+ tickTime,
+ initLimit,
+ syncLimit,
+ connectToLearnerMasterLimit,
+ false,
+ ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
+ new QuorumOracleMaj(quorumPeers, oraclePath));
+ }
+
/**
* This constructor is only used by the existing unit test code.
* It defaults to FileLogProvider persistence provider.
@@ -1808,7 +1825,7 @@ public int getTick() {
public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
Properties props = new Properties();
props.load(new StringReader(s));
- return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false);
+ return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath());
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index c56204f1f76..1b37f291f51 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -52,6 +52,7 @@
import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.VerifyingFileFactory;
@@ -130,6 +131,8 @@ public class QuorumPeerConfig {
Integer.parseInt(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS,
String.valueOf(MultipleAddresses.DEFAULT_TIMEOUT.toMillis())));
+ protected String oraclePath;
+
/**
* Minimum snapshot retain count.
* @see org.apache.zookeeper.server.PurgeTxnLog#purge(File, File, int)
@@ -380,6 +383,8 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti
multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(value);
} else if (key.equals("multiAddress.reachabilityCheckEnabled")) {
multiAddressReachabilityCheckEnabled = parseBoolean(key, value);
+ } else if (key.equals("oraclePath")) {
+ oraclePath = value;
} else {
System.setProperty("zookeeper." + key, value);
}
@@ -629,6 +634,15 @@ public static void deleteFile(String filename) {
}
}
+
+ private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException {
+ if (oraclePath == null) {
+ return createQuorumVerifier(dynamicConfigProp, isHierarchical);
+ } else {
+ return new QuorumOracleMaj(dynamicConfigProp, oraclePath);
+ }
+ }
+
private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
if (isHierarchical) {
return new QuorumHierarchical(dynamicConfigProp);
@@ -642,7 +656,7 @@ private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp,
}
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
- quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
+ quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
setupMyId();
setupClientPort();
setupPeerType();
@@ -656,7 +670,7 @@ void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityM
* @throws IOException
* @throws ConfigException
*/
- public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
+ public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException {
boolean isHierarchical = false;
for (Entry