Skip to content

Commit

Permalink
[ZOOKEEPER-3690] Improving leader efficiency via not processing learn…
Browse files Browse the repository at this point in the history
…er forwarded requests in commit processor

Author: Fangmin Lyu <[email protected]>

Reviewers: Enrico Olivelli <[email protected]>, Michael Han <[email protected]>

Closes apache#1223 from lvfangmin/ZOOKEEPER-3690
  • Loading branch information
lvfangmin authored and RokLenarcic committed Aug 31, 2022
1 parent 09cd333 commit 1857e98
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
8 changes: 8 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,14 @@ property, when available, is noted below.
(Jave system property only: **learner.closeSocketAsync**)
When enabled, a learner will close the quorum socket asynchronously. This is useful for TLS connections where closing a socket might take a long time, block the shutdown process, potentially delay a new leader election, and leave the quorum unavailabe. Closing the socket asynchronously avoids blocking the shutdown process despite the long socket closing time and a new leader election can be started while the socket being closed. The default is false.

* *forward_learner_requests_to_commit_processor_disabled*
(Jave system property: **zookeeper.forward_learner_requests_to_commit_processor_disabled**)
When this property is set, the requests from learners won't be enqueued to
CommitProcessor queue, which will help save the resources and GC time on
leader.

The default value is false.


<a name="sc_clusterOptions"></a>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ private ServerMetrics(MetricsProvider metricsProvider) {

SOCKET_CLOSING_TIME = metricsContext.getSummary("socket_closing_time", DetailLevel.BASIC);

REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR = metricsContext.getCounter(
"requests_not_forwarded_to_commit_processor");
}

/**
Expand Down Expand Up @@ -465,6 +467,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {

public final Summary SOCKET_CLOSING_TIME;

public final Counter REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR;

private final MetricsProvider metricsProvider;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
import org.slf4j.Logger;
Expand All @@ -39,11 +40,22 @@ public class ProposalRequestProcessor implements RequestProcessor {

SyncRequestProcessor syncProcessor;

// If this property is set, requests from Learners won't be forwarded
// to the CommitProcessor in order to save resources
public static final String FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED =
"zookeeper.forward_learner_requests_to_commit_processor_disabled";
private final boolean forwardLearnerRequestsToCommitProcessorDisabled;

public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
this.zks = zks;
this.nextProcessor = nextProcessor;
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);

forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
LOG.info("{} = {}", FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED,
forwardLearnerRequestsToCommitProcessorDisabled);
}

/**
Expand All @@ -70,7 +82,9 @@ public void processRequest(Request request) throws RequestProcessorException {
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
nextProcessor.processRequest(request);
if (shouldForwardToNextProcessor(request)) {
nextProcessor.processRequest(request);
}
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
Expand All @@ -89,4 +103,14 @@ public void shutdown() {
syncProcessor.shutdown();
}

private boolean shouldForwardToNextProcessor(Request request) {
if (!forwardLearnerRequestsToCommitProcessorDisabled) {
return true;
}
if (request.getOwner() instanceof LearnerHandler) {
ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.add(1);
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider;
import org.apache.zookeeper.metrics.impl.NullMetricsProvider;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
Expand Down Expand Up @@ -1620,6 +1621,40 @@ public void testFaultyMetricsProviderOnConfigure() throws Exception {
assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
}

/**
* Test the behavior to skip processing the learner forwarded requests in
* Leader's CommitProcessor.
*/
@Test
public void testLearnerRequestForwardBehavior() throws Exception {
System.setProperty(ProposalRequestProcessor.FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED, "true");

try {
// 1. set up an ensemble with 3 servers
final int numServers = 3;
servers = LaunchServers(numServers);
int leaderId = servers.findLeader();

int followerA = (leaderId + 1) % numServers;
waitForOne(servers.zk[followerA], States.CONNECTED);

// 2. reset all metrics
ServerMetrics.getMetrics().resetAll();

// 3. issue a request
final String node = "/testLearnerRequestForwardBehavior";
servers.zk[followerA].create(node, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

assertNotNull("node " + node + " should exist",
servers.zk[followerA].exists("/testLearnerRequestForwardBehavior", false));

assertEquals(1L, ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.get());
} finally {
//clean up
System.setProperty(ProposalRequestProcessor.FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED, "false");
}
}

static class Context {

boolean quitFollowing = false;
Expand Down

0 comments on commit 1857e98

Please sign in to comment.