Skip to content

Commit

Permalink
HBASE-26782 Minor code cleanup in and around RpcExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Bharath Vissapragada <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
ndimiduk committed Mar 7, 2022
1 parent 591f781 commit f3f2aa9
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -52,7 +52,7 @@ public BalancedQueueRpcExecutor(final String name, final int handlerCount,
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
int queueIndex = balancer.getNextQueue(callTask);
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
// that means we can overflow by at most <num reader> size (5), that's ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,9 +20,7 @@
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -65,7 +63,7 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh
}

@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
public boolean dispatch(CallRunner callTask) {
//FastPathHandlers don't check queue limits, so if we're completely shut down
//we have to prevent ourselves from using the handler in the first place
if (currentQueueLimit == 0){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -27,8 +26,6 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
Expand All @@ -37,7 +34,6 @@
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);

private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
Expand All @@ -60,7 +56,7 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void startHandlers(final int port) {
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
shouldDispatchToScanQueue(callTask), callTask);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -19,8 +19,8 @@
package org.apache.hadoop.hbase.ipc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -29,20 +29,21 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;

/**
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
Expand All @@ -53,14 +54,16 @@ public abstract class RpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);

protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";

/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
/** max delay in msec used to bound the de-prioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
"hbase.ipc.server.queue.max.call.delay";

/**
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
* queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
* throughput.
*/
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
Expand All @@ -70,14 +73,18 @@ public abstract class RpcExecutor {
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;

public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
"hbase.ipc.server.callqueue.balancer.class";
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;


// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
"hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL =
"hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";

public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
Expand All @@ -88,16 +95,14 @@ public abstract class RpcExecutor {
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";

private LongAdder numGeneralCallsDropped = new LongAdder();
private LongAdder numLifoModeSwitches = new LongAdder();
private final LongAdder numGeneralCallsDropped = new LongAdder();
private final LongAdder numLifoModeSwitches = new LongAdder();

protected final int numCallQueues;
protected final List<BlockingQueue<CallRunner>> queues;
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;

private final PriorityFunction priority;

protected volatile int currentQueueLimit;

private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
Expand All @@ -107,8 +112,8 @@ public abstract class RpcExecutor {

private String name;

private Configuration conf = null;
private Abortable abortable = null;
private final Configuration conf;
private final Abortable abortable;

public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
Expand Down Expand Up @@ -144,12 +149,10 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
this.handlers = new ArrayList<>(this.handlerCount);

this.priority = priority;

if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs = new Object[] { maxQueueLength,
new CallPriorityComparator(conf, this.priority) };
new CallPriorityComparator(conf, priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
Expand All @@ -159,16 +162,17 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
} else if (isPluggableQueueType(callQueueType)) {
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = getPluggableQueueClass();
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
getPluggableQueueClass();

if (!pluggableQueueClass.isPresent()) {
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
+ " queue type required");
} else {
this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf };
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
this.queueClass = pluggableQueueClass.get();
}
} else {
Expand All @@ -186,50 +190,41 @@ protected int computeNumCallQueues(final int handlerCount, final float callQueue
return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
}

public Map<String, Long> getCallQueueCountsSummary() {
HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();

for(BlockingQueue<CallRunner> queue: queues) {
for (CallRunner cr:queue) {
RpcCall rpcCall = cr.getRpcCall();

String method;

if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
method = "Unknown";
}
/**
* Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
*/
private static String getMethodName(final CallRunner callRunner) {
return Optional.ofNullable(callRunner)
.map(CallRunner::getRpcCall)
.map(RpcCall::getMethod)
.map(Descriptors.MethodDescriptor::getName)
.orElse("Unknown");
}

callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
}
}
/**
* Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
*/
private static long getRpcCallSize(final CallRunner callRunner) {
return Optional.ofNullable(callRunner)
.map(CallRunner::getRpcCall)
.map(RpcCall::getSize)
.orElse(0L);
}

return callQueueMethodTotalCount;
public Map<String, Long> getCallQueueCountsSummary() {
return queues.stream()
.flatMap(Collection::stream)
.map(RpcExecutor::getMethodName)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}

public Map<String, Long> getCallQueueSizeSummary() {
HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();

for(BlockingQueue<CallRunner> queue: queues) {
for (CallRunner cr:queue) {
RpcCall rpcCall = cr.getRpcCall();
String method;

if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
method = "Unknown";
}

long size = rpcCall.getSize();

callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
}
}

return callQueueMethodTotalSize;
return queues.stream()
.flatMap(Collection::stream)
.map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
}


protected void initializeQueues(final int numQueues) {
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
Expand All @@ -252,7 +247,7 @@ public void stop() {
}

/** Add the request to the executor queue */
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
public abstract boolean dispatch(final CallRunner callTask);

/** Returns the list of request queues */
protected List<BlockingQueue<CallRunner>> getQueues() {
Expand Down Expand Up @@ -298,26 +293,26 @@ protected void startHandlers(final String nameSuffix, final int numHandlers,
handlers.size(), threadPrefix, qsize, port);
}

public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
/**
* All requests go to the first queue, at index 0
*/
private static final QueueBalancer ONE_QUEUE = val -> 0;

public static QueueBalancer getBalancer(
final String executorName,
final Configuration conf,
final List<BlockingQueue<CallRunner>> queues
) {
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
if (queues.size() == 1) {
return ONE_QUEUE;
} else {
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
Class<?> balancerClass = conf.getClass(
CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
}
}

/**
* All requests go to the first queue, at index 0
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue(CallRunner callRunner) {
return 0;
}
};

/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
Expand Down Expand Up @@ -455,7 +450,8 @@ public void resizeQueues(Configuration conf) {
configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
}
}
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
final int queueLimit = currentQueueLimit;
currentQueueLimit = conf.getInt(configKey, queueLimit);
}

public void onConfigurationChange(Configuration conf) {
Expand Down

0 comments on commit f3f2aa9

Please sign in to comment.