Skip to content

Commit

Permalink
HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#3929)
Browse files Browse the repository at this point in the history
Signed-off-by: Reid Chan <[email protected]>
  • Loading branch information
YutSean authored Jan 13, 2022
1 parent 1388ca3 commit 3a14cfc
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
/*
* Stack of Handlers waiting for work.
*/
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();

public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
Expand All @@ -56,10 +56,12 @@ public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCoun
}

@Override
protected Handler getHandler(String name, double handlerFailureThreshhold,
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
fastPathHandlerStack);
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
}

@Override
Expand All @@ -69,62 +71,14 @@ public boolean dispatch(CallRunner callTask) throws InterruptedException {
if (currentQueueLimit == 0){
return false;
}
FastPathHandler handler = popReadyHandler();
FastPathRpcHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
}

/**
* @return Pop a Handler instance if one available ready-to-go or else return null.
*/
private FastPathHandler popReadyHandler() {
private FastPathRpcHandler popReadyHandler() {
return this.fastPathHandlerStack.poll();
}

class FastPathHandler extends Handler {
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
// UNFAIR synchronization.
private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;

FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount,
final Deque<FastPathHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, q, activeHandlerCount);
this.fastPathHandlerStack = fastPathHandlerStack;
}

@Override
protected CallRunner getCallRunner() throws InterruptedException {
// Get a callrunner if one in the Q.
CallRunner cr = this.q.poll();
if (cr == null) {
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
// the fastpath handoff done via fastPathHandlerStack.
if (this.fastPathHandlerStack != null) {
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
}
}
return cr;
}

/**
* @param cr Task gotten via fastpath.
* @return True if we successfully loaded our task
*/
boolean loadCallRunner(final CallRunner cr) {
this.loadedCallRunner = cr;
this.semaphore.release();
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
* {@link FastPathBalancedQueueRpcExecutor}.
*/
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);

private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();

public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
PriorityFunction priority, Configuration conf, Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
}

@Override
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
name.contains("write") ? writeHandlerStack : scanHandlerStack;
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
return handler != null ? handler.loadCallRunner(callTask) :
dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class FastPathRpcHandler extends RpcHandler {
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathRpcHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
// UNFAIR synchronization.
private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;

FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
AtomicInteger failedHandlerCount, final Abortable abortable,
final Deque<FastPathRpcHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
abortable);
this.fastPathHandlerStack = fastPathHandlerStack;
}

@Override
protected CallRunner getCallRunner() throws InterruptedException {
// Get a callrunner if one in the Q.
CallRunner cr = this.q.poll();
if (cr == null) {
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
// the fastpath handoff done via fastPathHandlerStack.
if (this.fastPathHandlerStack != null) {
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
}
}
return cr;
}

/**
* @param cr Task gotten via fastpath.
* @return True if we successfully loaded our task
*/
boolean loadCallRunner(final CallRunner cr) {
this.loadedCallRunner = cr;
this.semaphore.release();
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hadoop.hbase.ipc;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -130,16 +131,21 @@ protected void startHandlers(final int port) {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
shouldDispatchToScanQueue(callTask), callTask);
}

protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
final CallRunner callTask) {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.getParam())) {
if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue();
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
} else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}

BlockingQueue<CallRunner> queue = queues.get(queueIndex);
Queue<CallRunner> queue = queues.get(queueIndex);
if (queue.size() >= currentQueueLimit) {
return false;
}
Expand Down Expand Up @@ -232,6 +238,11 @@ private boolean isScanRequest(final RequestHeader header, final Message param) {
return param instanceof ScanRequest;
}

protected boolean shouldDispatchToScanQueue(final CallRunner task) {
RpcCall call = task.getRpcCall();
return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
}

protected float getReadShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
}
Expand Down
Loading

0 comments on commit 3a14cfc

Please sign in to comment.