Skip to content

Commit

Permalink
ZOOKEEPER-3683: Discard requests that are delayed longer than a confi…
Browse files Browse the repository at this point in the history
…gured threshold

Author: Jie Huang <[email protected]>
Author: Ivailo Nedelchev <[email protected]>

Reviewers: Michael Han <[email protected]>, Allan Lyu <[email protected]>, Damien Diederen <[email protected]>

Closes apache#1211 from jhuan31/ZOOKEEPER-3683
  • Loading branch information
Jie Huang authored and RokLenarcic committed Sep 3, 2022
1 parent 49e8a87 commit c42d168
Show file tree
Hide file tree
Showing 24 changed files with 695 additions and 24 deletions.
3 changes: 2 additions & 1 deletion zookeeper-client/zookeeper-client-c/include/zookeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ enum ZOO_ERRORS {
ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
ZNOWATCHER = -121, /*!< The watcher couldn't be found */
ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
ZSESSIONCLOSEDREQUIRESASLAUTH = -124 /*!< The session has been closed by server because server requires client to do SASL authentication, but client is not configured with SASL authentication or configuted with SASL but failed (i.e. wrong credential used.). */
ZSESSIONCLOSEDREQUIRESASLAUTH = -124, /*!< The session has been closed by server because server requires client to do SASL authentication, but client is not configured with SASL authentication or configuted with SASL but failed (i.e. wrong credential used.). */
ZTHROTTLEDOP = -127 /*!< Operation was throttled and not executed at all. please, retry! */
};

#ifdef __cplusplus
Expand Down
2 changes: 2 additions & 0 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -4902,6 +4902,8 @@ const char* zerror(int c)
return "the watcher couldn't be found";
case ZRECONFIGDISABLED:
return "attempts to perform a reconfiguration operation when reconfiguration feature is disable";
case ZTHROTTLEDOP:
return "Operation was throttled due to high load";
}
if (c > 0) {
return strerror(c);
Expand Down
9 changes: 9 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,15 @@ property, when available, is noted below.
effect due to TLS handshake timeout when there are too many in-flight TLS
handshakes. Setting it to something like 250 is good enough to avoid the herd effect.

* *throttledOpWaitTime*
(Java system property: **zookeeper.throttled_op_wait_time**)
The maximum time, in milliseconds, that a request may wait in the RequestThrottler queue before it is marked as throttled.
A throttled request is not processed; it is only fed down the pipeline of the server it belongs to
in order to preserve the order of all requests.
The FinalRequestProcessor will issue an error response (new error code: ZTHROTTLEDOP) for these undigested requests.
The intent is for the clients not to retry them immediately.
When set to 0, no requests will be throttled. The default is 0.

<a name="sc_clusterOptions"></a>

#### Cluster Options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public static KeeperException create(Code code) {
return new SessionClosedRequireAuthException();
case REQUESTTIMEOUT:
return new RequestTimeoutException();
case THROTTLEDOP:
return new ThrottledOpException();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code");
Expand Down Expand Up @@ -404,7 +406,11 @@ public enum Code implements CodeDeprecated {
/** The session has been closed by server because server requires client to do SASL authentication,
* but client is not configured with SASL authentication or configuted with SASL but failed
* (i.e. wrong credential used.). */
SESSIONCLOSEDREQUIRESASLAUTH(-124);
SESSIONCLOSEDREQUIRESASLAUTH(-124),
/** Operation was throttled and not executed at all. This error code indicates that zookeeper server
* is under heavy load and can't process incoming requests at full speed; please retry with back off.
*/
THROTTLEDOP (-127);

private static final Map<Integer, Code> lookup = new HashMap<Integer, Code>();

Expand Down Expand Up @@ -495,6 +501,8 @@ static String getCodeMessage(Code code) {
return "Reconfig is disabled";
case SESSIONCLOSEDREQUIRESASLAUTH:
return "Session closed because client failed to authenticate";
case THROTTLEDOP:
return "Op throttled due to high load";
default:
return "Unknown error " + code;
}
Expand Down Expand Up @@ -940,4 +948,12 @@ public RequestTimeoutException() {

}

/**
 * Exception corresponding to {@link Code#THROTTLEDOP}: the operation was throttled by the
 * server and was not executed at all. Callers are expected to retry with back off.
 *
 * @see Code#THROTTLEDOP
 */
public static class ThrottledOpException extends KeeperException {
/** Creates the exception with the fixed error code {@link Code#THROTTLEDOP}. */
public ThrottledOpException() {
super(Code.THROTTLEDOP);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,7 @@ public FinalRequestProcessor(ZooKeeperServer zks) {
this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
}

public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request);

// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}

private ProcessTxnResult applyRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request);

// ZOOKEEPER-558:
Expand All @@ -131,7 +120,7 @@ public void processRequest(Request request) {
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
return rc;
}
}

Expand All @@ -150,6 +139,24 @@ public void processRequest(Request request) {
}
}

return rc;
}

public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request);

// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
ProcessTxnResult rc = null;
if (!request.isThrottled()) {
rc = applyRequest(request);
}
if (request.cnxn == null) {
return;
}
Expand Down Expand Up @@ -195,7 +202,13 @@ public void processRequest(Request request) {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
}

if (request.isThrottled()) {
throw KeeperException.create(Code.THROTTLEDOP);
}

AuditHelper.addAuditLog(request, rc);

switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,21 @@ protected void pRequest(Request request) throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);

if (!request.isThrottled()) {
pRequestHelper(request);
}

request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
nextProcessor.processRequest(request);
}

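/**
 * Helper for {@code pRequest} that performs the preparation specific to the request's
 * operation type. {@code pRequest} skips this step for requests that are marked as throttled.
 *
 * @param request the request to prepare
 */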
private void pRequestHelper(Request request) throws RequestProcessorException {
try {
switch (request.type) {
case OpCode.createContainer:
Expand Down Expand Up @@ -939,9 +954,6 @@ protected void pRequest(Request request) throws RequestProcessorException {
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
nextProcessor.processRequest(request);
}

private static List<ACL> removeDuplicates(final List<ACL> acls) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon

public long syncQueueStartTime;

public long requestThrottleQueueTime;

private Object owner;

private KeeperException e;
Expand All @@ -108,6 +110,22 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon

private TxnDigest txnDigest;

private boolean isThrottledFlag = false;

/**
 * Returns whether this request has been marked as throttled.
 *
 * @return the current value of the throttled flag ({@code false} unless it was set)
 */
public boolean isThrottled() {
return isThrottledFlag;
}

/**
 * Marks or unmarks this request as throttled.
 *
 * @param val the new value of the throttled flag
 */
public void setIsThrottled(boolean val) {
isThrottledFlag = val;
}

/**
 * Tells whether this request is of a kind that may be throttled.
 *
 * @return {@code false} for ping, closeSession and createSession requests,
 *         {@code true} for every other operation type
 */
public boolean isThrottlable() {
    switch (this.type) {
        case OpCode.ping:
        case OpCode.closeSession:
        case OpCode.createSession:
            // these operation types are never throttled
            return false;
        default:
            return true;
    }
}

/**
* If this is a create or close request for a local-only session.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -97,6 +98,13 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
*/
private static volatile boolean dropStaleRequests = Boolean.parseBoolean(System.getProperty("zookeeper.request_throttle_drop_stale", "true"));

/**
 * Decides whether a request that has just left the throttler queue should be marked as throttled.
 *
 * <p>A request is throttled only when it is throttlable, the throttled-op wait time is
 * enabled (greater than 0), and the request waited in the queue longer than that wait time.
 *
 * @param request the request taken from the queue
 * @param elapsedTime how long the request waited in the queue, in the same unit as the
 *                    throttled-op wait time
 * @return {@code true} if the request should be marked as throttled
 */
protected boolean shouldThrottleOp(Request request, long elapsedTime) {
    // The wait time is a volatile static that can be changed while the server is running.
    // Read it once so both comparisons see the same value; otherwise a concurrent change
    // to 0 (disabled) between the two reads could still throttle this request.
    final int waitTime = zks.getThrottledOpWaitTime();
    return request.isThrottlable()
            && waitTime > 0
            && elapsedTime > waitTime;
}


public RequestThrottler(ZooKeeperServer zks) {
super("RequestThrottler", zks.getZooKeeperServerListener());
this.zks = zks;
Expand Down Expand Up @@ -171,6 +179,12 @@ public void run() {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
if (shouldThrottleOp(request, elapsedTime)) {
request.setIsThrottled(true);
ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
}
zks.submitRequestNow(request);
}
}
Expand Down Expand Up @@ -230,6 +244,7 @@ public void submitRequest(Request request) {
LOG.debug("Shutdown in progress. Request cannot be processed");
dropRequest(request);
} else {
request.requestThrottleQueueTime = Time.currentElapsedTime();
submittedRequests.add(request);
}
}
Expand All @@ -238,7 +253,6 @@ public int getInflight() {
return submittedRequests.size();
}

@SuppressFBWarnings("DM_EXIT")
public void shutdown() {
// Try to shutdown gracefully
LOG.info("Shutting down");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ private ServerMetrics(MetricsProvider metricsProvider) {
READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);

THROTTLED_OPS = metricsContext.getCounter("throttled_ops");

/**
* Time spent by a read request in the commit processor.
*/
Expand Down Expand Up @@ -223,6 +225,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
STALE_REQUESTS = metricsContext.getCounter("stale_requests");
STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
STALE_REPLIES = metricsContext.getCounter("stale_replies");
REQUEST_THROTTLE_QUEUE_TIME = metricsContext.getSummary("request_throttle_queue_time_ms", DetailLevel.ADVANCED);
REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
LARGE_REQUESTS_REJECTED = metricsContext.getCounter("large_requests_rejected");

Expand Down Expand Up @@ -381,6 +384,9 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Summary READS_ISSUED_IN_COMMIT_PROC;
public final Summary WRITES_ISSUED_IN_COMMIT_PROC;

// Request op throttling related
public final Counter THROTTLED_OPS;

/**
* Time spent by a read request in the commit processor.
*/
Expand Down Expand Up @@ -435,6 +441,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter STALE_REQUESTS;
public final Counter STALE_REQUESTS_DROPPED;
public final Counter STALE_REPLIES;
public final Summary REQUEST_THROTTLE_QUEUE_TIME;
public final Counter REQUEST_THROTTLE_WAIT_COUNT;
public final Counter LARGE_REQUESTS_REJECTED;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void run() {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
Expand All @@ -202,9 +202,8 @@ public void run() {
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public static void setCloseSessionTxnEnabled(boolean enabled) {

public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
/** Default throttled-op wait time; 0 means the feature is disabled. */
public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
/**
 * Current throttled-op wait time, initialised from the system property
 * {@code zookeeper.throttled_op_wait_time} and falling back to
 * {@link #DEFAULT_THROTTLED_OP_WAIT_TIME} when the property is not set.
 */
protected static volatile int throttledOpWaitTime =
Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;
/** value of -1 indicates unset, use default */
Expand Down Expand Up @@ -1237,6 +1240,15 @@ public void setTickTime(int tickTime) {
this.tickTime = tickTime;
}

/**
 * Returns the current throttled-op wait time.
 *
 * @return the value of the static {@code throttledOpWaitTime} field
 */
public static int getThrottledOpWaitTime() {
return throttledOpWaitTime;
}

/**
 * Sets the throttled-op wait time and logs the new value at INFO level.
 *
 * @param time the new wait time; the value is stored as given, without validation
 */
public static void setThrottledOpWaitTime(int time) {
LOG.info("throttledOpWaitTime set to {}", time);
throttledOpWaitTime = time;
}

public int getMinSessionTimeout() {
return minSessionTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,16 @@ public void setFlushDelay(long delay) {
// Request throttling settings
///////////////////////////////////////////////////////////////////////////

/**
 * Returns the throttled-op wait time by delegating to {@code zks.getThrottledOpWaitTime()}.
 *
 * @return the current throttled-op wait time
 */
public int getThrottledOpWaitTime() {
return zks.getThrottledOpWaitTime();
}

/**
 * Updates the throttled-op wait time by delegating to {@code zks.setThrottledOpWaitTime(val)}.
 *
 * @param val the new throttled-op wait time
 */
public void setThrottledOpWaitTime(int val) {
zks.setThrottledOpWaitTime(val);
}

///////////////////////////////////////////////////////////////////////////

public int getRequestThrottleLimit() {
return RequestThrottler.getMaxRequests();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public interface ZooKeeperServerMXBean {
boolean getRequestThrottleDropStale();
void setRequestThrottleDropStale(boolean drop);

/** @return the current throttled-op wait time */
int getThrottledOpWaitTime();
/** @param val the new throttled-op wait time */
void setThrottledOpWaitTime(int val);

boolean getRequestStaleLatencyCheck();
void setRequestStaleLatencyCheck(boolean check);

Expand Down
Loading

0 comments on commit c42d168

Please sign in to comment.