diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index 1db5408841ea..5d9e07a8d117 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -41,7 +41,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { /* * Stack of Handlers waiting for work. */ - private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); + private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, @@ -56,10 +56,12 @@ public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCoun } @Override - protected Handler getHandler(String name, double handlerFailureThreshhold, - BlockingQueue q, AtomicInteger activeHandlerCount) { - return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount, - fastPathHandlerStack); + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, + activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack); } @Override @@ -69,62 +71,14 @@ public boolean dispatch(CallRunner callTask) throws InterruptedException { if (currentQueueLimit == 0){ return false; } - FastPathHandler handler = popReadyHandler(); + FastPathRpcHandler handler = popReadyHandler(); return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); } /** * @return Pop a Handler instance if one available ready-to-go or else return null. */ - private FastPathHandler popReadyHandler() { + private FastPathRpcHandler popReadyHandler() { return this.fastPathHandlerStack.poll(); } - - class FastPathHandler extends Handler { - // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque - // if an empty queue of CallRunners so we are available for direct handoff when one comes in. - final Deque fastPathHandlerStack; - // Semaphore to coordinate loading of fastpathed loadedTask and our running it. - // UNFAIR synchronization. - private Semaphore semaphore = new Semaphore(0); - // The task we get when fast-pathing. - private CallRunner loadedCallRunner; - - FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue q, - final AtomicInteger activeHandlerCount, - final Deque fastPathHandlerStack) { - super(name, handlerFailureThreshhold, q, activeHandlerCount); - this.fastPathHandlerStack = fastPathHandlerStack; - } - - @Override - protected CallRunner getCallRunner() throws InterruptedException { - // Get a callrunner if one in the Q. - CallRunner cr = this.q.poll(); - if (cr == null) { - // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for - // the fastpath handoff done via fastPathHandlerStack. - if (this.fastPathHandlerStack != null) { - this.fastPathHandlerStack.push(this); - this.semaphore.acquire(); - cr = this.loadedCallRunner; - this.loadedCallRunner = null; - } else { - // No fastpath available. Block until a task comes available. - cr = super.getCallRunner(); - } - } - return cr; - } - - /** - * @param cr Task gotten via fastpath. - * @return True if we successfully loaded our task - */ - boolean loadCallRunner(final CallRunner cr) { - this.loadedCallRunner = cr; - this.semaphore.release(); - return true; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java new file mode 100644 index 000000000000..c335b024c212 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -0,0 +1,72 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in + * {@link FastPathBalancedQueueRpcExecutor}. + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { + private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class); + + private final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); + private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); + private final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); + + public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, + PriorityFunction priority, Configuration conf, Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + } + + @Override + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + Deque handlerStack = name.contains("read") ? readHandlerStack : + name.contains("write") ? writeHandlerStack : scanHandlerStack; + return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, + activeHandlerCount, failedHandlerCount, abortable, handlerStack); + } + + @Override + public boolean dispatch(final CallRunner callTask) throws InterruptedException { + RpcCall call = callTask.getRpcCall(); + boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam()); + boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); + FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() : + shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll(); + return handler != null ? handler.loadCallRunner(callTask) : + dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java new file mode 100644 index 000000000000..3064c7aa324d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.Abortable; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class FastPathRpcHandler extends RpcHandler { + // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque + // if an empty queue of CallRunners so we are available for direct handoff when one comes in. + final Deque fastPathHandlerStack; + // Semaphore to coordinate loading of fastpathed loadedTask and our running it. + // UNFAIR synchronization. + private Semaphore semaphore = new Semaphore(0); + // The task we get when fast-pathing. + private CallRunner loadedCallRunner; + + FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount, + BlockingQueue q, AtomicInteger activeHandlerCount, + AtomicInteger failedHandlerCount, final Abortable abortable, + final Deque fastPathHandlerStack) { + super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount, + abortable); + this.fastPathHandlerStack = fastPathHandlerStack; + } + + @Override + protected CallRunner getCallRunner() throws InterruptedException { + // Get a callrunner if one in the Q. + CallRunner cr = this.q.poll(); + if (cr == null) { + // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for + // the fastpath handoff done via fastPathHandlerStack. + if (this.fastPathHandlerStack != null) { + this.fastPathHandlerStack.push(this); + this.semaphore.acquire(); + cr = this.loadedCallRunner; + this.loadedCallRunner = null; + } else { + // No fastpath available. Block until a task comes available. + cr = super.getCallRunner(); + } + } + return cr; + } + + /** + * @param cr Task gotten via fastpath. + * @return True if we successfully loaded our task + */ + boolean loadCallRunner(final CallRunner cr) { + this.loadedCallRunner = cr; + this.semaphore.release(); + return true; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 5e7e2f8d3351..43c6ce468d73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -130,16 +131,21 @@ protected void startHandlers(final int port) { @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { RpcCall call = callTask.getRpcCall(); + return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()), + shouldDispatchToScanQueue(callTask), callTask); + } + + protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue, + final CallRunner callTask) { int queueIndex; - if (isWriteRequest(call.getHeader(), call.getParam())) { + if (toWriteQueue) { queueIndex = writeBalancer.getNextQueue(); - } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) { + } else if (toScanQueue) { queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(); } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(); } - - BlockingQueue queue = queues.get(queueIndex); + Queue queue = queues.get(queueIndex); if (queue.size() >= currentQueueLimit) { return false; } @@ -232,6 +238,11 @@ private boolean isScanRequest(final RequestHeader header, final Message param) { return param instanceof ScanRequest; } + protected boolean shouldDispatchToScanQueue(final CallRunner task) { + RpcCall call = task.getRpcCall(); + return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam()); + } + protected float getReadShare(final Configuration conf) { return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index db512340fdb1..5775fc19d0fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -98,12 +98,11 @@ public abstract class RpcExecutor { protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); - private final List handlers; + private final List handlers; private final int handlerCount; private final AtomicInteger failedHandlerCount = new AtomicInteger(0); private String name; - private boolean running; private Configuration conf = null; private Abortable abortable = null; @@ -239,13 +238,12 @@ protected void initializeQueues(final int numQueues) { } public void start(final int port) { - running = true; startHandlers(port); } public void stop() { - running = false; - for (Thread handler : handlers) { + for (RpcHandler handler : handlers) { + handler.stopRunning(); handler.interrupt(); } } @@ -266,9 +264,12 @@ protected void startHandlers(final int port) { /** * Override if providing alternate Handler implementation. */ - protected Handler getHandler(final String name, final double handlerFailureThreshhold, - final BlockingQueue q, final AtomicInteger activeHandlerCount) { - return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount); + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, + failedHandlerCount, abortable); } /** @@ -285,8 +286,8 @@ protected void startHandlers(final String nameSuffix, final int numHandlers, final int index = qindex + (i % qsize); String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index + ",port=" + port; - Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index), - activeHandlerCount); + RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount, + callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable); handler.start(); handlers.add(handler); } @@ -294,90 +295,6 @@ protected void startHandlers(final String nameSuffix, final int numHandlers, handlers.size(), threadPrefix, qsize, port); } - /** - * Handler thread run the {@link CallRunner#run()} in. - */ - protected class Handler extends Thread { - /** - * Q to find CallRunners to run in. - */ - final BlockingQueue q; - - final double handlerFailureThreshhold; - - // metrics (shared with other handlers) - final AtomicInteger activeHandlerCount; - - Handler(final String name, final double handlerFailureThreshhold, - final BlockingQueue q, final AtomicInteger activeHandlerCount) { - super(name); - setDaemon(true); - this.q = q; - this.handlerFailureThreshhold = handlerFailureThreshhold; - this.activeHandlerCount = activeHandlerCount; - } - - /** - * @return A {@link CallRunner} - * @throws InterruptedException - */ - protected CallRunner getCallRunner() throws InterruptedException { - return this.q.take(); - } - - @Override - public void run() { - boolean interrupted = false; - try { - while (running) { - try { - run(getCallRunner()); - } catch (InterruptedException e) { - interrupted = true; - } - } - } catch (Exception e) { - LOG.warn(e.toString(), e); - throw e; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - private void run(CallRunner cr) { - MonitoredRPCHandler status = RpcServer.getStatus(); - cr.setStatus(status); - try { - this.activeHandlerCount.incrementAndGet(); - cr.run(); - } catch (Throwable e) { - if (e instanceof Error) { - int failedCount = failedHandlerCount.incrementAndGet(); - if (this.handlerFailureThreshhold >= 0 - && failedCount > handlerCount * this.handlerFailureThreshhold) { - String message = "Number of failed RpcServer handler runs exceeded threshhold " - + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); - if (abortable != null) { - abortable.abort(message, e); - } else { - LOG.error("Error but can't abort because abortable is null: " - + StringUtils.stringifyException(e)); - throw e; - } - } else { - LOG.warn("Handler errors " + StringUtils.stringifyException(e)); - } - } else { - LOG.warn("Handler exception " + StringUtils.stringifyException(e)); - } - } finally { - this.activeHandlerCount.decrementAndGet(); - } - } - } - public static abstract class QueueBalancer { /** * @return the index of the next queue to which a request should be inserted diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java new file mode 100644 index 000000000000..f46dcfcc08eb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread to handle rpc call. + * Should only be used in {@link RpcExecutor} and its sub-classes. + */ +@InterfaceAudience.Private +public class RpcHandler extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class); + + /** + * Q to find CallRunners to run in. + */ + final BlockingQueue q; + + final int handlerCount; + final double handlerFailureThreshhold; + + // metrics (shared with other handlers) + final AtomicInteger activeHandlerCount; + final AtomicInteger failedHandlerCount; + + // The up-level RpcServer. + final Abortable abortable; + + private boolean running; + + RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount, + final BlockingQueue q, final AtomicInteger activeHandlerCount, + final AtomicInteger failedHandlerCount, final Abortable abortable) { + super(name); + setDaemon(true); + this.q = q; + this.handlerFailureThreshhold = handlerFailureThreshhold; + this.activeHandlerCount = activeHandlerCount; + this.failedHandlerCount = failedHandlerCount; + this.handlerCount = handlerCount; + this.abortable = abortable; + } + + /** + * @return A {@link CallRunner} + * @throws InterruptedException + */ + protected CallRunner getCallRunner() throws InterruptedException { + return this.q.take(); + } + + public void stopRunning() { + running = false; + } + + @Override + public void run() { + boolean interrupted = false; + running = true; + try { + while (running) { + try { + run(getCallRunner()); + } catch (InterruptedException e) { + interrupted = true; + } + } + } catch (Exception e) { + LOG.warn(e.toString(), e); + throw e; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + private void run(CallRunner cr) { + MonitoredRPCHandler status = RpcServer.getStatus(); + cr.setStatus(status); + try { + this.activeHandlerCount.incrementAndGet(); + cr.run(); + } catch (Throwable e) { + if (e instanceof Error) { + int failedCount = failedHandlerCount.incrementAndGet(); + if (this.handlerFailureThreshhold >= 0 + && failedCount > handlerCount * this.handlerFailureThreshhold) { + String message = "Number of failed RpcServer handler runs exceeded threshhold " + + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); + if (abortable != null) { + abortable.abort(message, e); + } else { + LOG.error("Error but can't abort because abortable is null: " + + StringUtils.stringifyException(e)); + throw e; + } + } else { + LOG.warn("Handler errors " + StringUtils.stringifyException(e)); + } + } else { + LOG.warn("Handler exception " + StringUtils.stringifyException(e)); + } + } finally { + this.activeHandlerCount.decrementAndGet(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 3b7c0bb1a24e..fa5e7ddaeb89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -88,7 +88,7 @@ public SimpleRpcScheduler( if (callqReadShare > 0) { // at least 1 read handler and 1 write handler - callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount), + callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), maxQueueLength, priority, conf, server); } else { if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 7d32f35fa44c..286094c5bcc3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -662,7 +662,7 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except assertFalse(executor.dispatch(task)); //make sure we never internally get a handler, which would skip the queue validation Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(), - Mockito.any(), Mockito.any()); + Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); } @Test