Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor #3929

Merged
merged 6 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
/*
* Stack of Handlers waiting for work.
*/
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();

public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
Expand All @@ -56,10 +56,12 @@ public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCoun
}

@Override
protected Handler getHandler(String name, double handlerFailureThreshhold,
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
fastPathHandlerStack);
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
}

@Override
Expand All @@ -69,62 +71,14 @@ public boolean dispatch(CallRunner callTask) throws InterruptedException {
if (currentQueueLimit == 0){
return false;
}
FastPathHandler handler = popReadyHandler();
FastPathRpcHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
}

/**
* @return Pop a Handler instance if one available ready-to-go or else return null.
*/
private FastPathHandler popReadyHandler() {
private FastPathRpcHandler popReadyHandler() {
return this.fastPathHandlerStack.poll();
}

class FastPathHandler extends Handler {
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
// UNFAIR synchronization.
private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;

FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount,
final Deque<FastPathHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, q, activeHandlerCount);
this.fastPathHandlerStack = fastPathHandlerStack;
}

@Override
protected CallRunner getCallRunner() throws InterruptedException {
// Get a callrunner if one in the Q.
CallRunner cr = this.q.poll();
if (cr == null) {
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
// the fastpath handoff done via fastPathHandlerStack.
if (this.fastPathHandlerStack != null) {
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
}
}
return cr;
}

/**
* @param cr Task gotten via fastpath.
* @return True if we successfully loaded our task
*/
boolean loadCallRunner(final CallRunner cr) {
this.loadedCallRunner = cr;
this.semaphore.release();
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
* {@link FastPathBalancedQueueRpcExecutor}.
*/
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);

private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();

public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
PriorityFunction priority, Configuration conf, Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
}

@Override
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
name.contains("write") ? writeHandlerStack : scanHandlerStack;
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
boolean isWriteRequest = isWriteRequest(call.getHeader(), call.getParam());
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here it uses isWriteRequest, can you rename it isScanRequest as well? Naming alias.

Copy link
Contributor Author

@YutSean YutSean Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about change the isWriteRequest to shouldDispatchToWriteQueue. While scan requests can also be handled as read requests when the scan ratio is 0.0.

FastPathRpcHandler handler = isWriteRequest ? writeHandlerStack.poll() :
shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
return handler != null ? handler.loadCallRunner(callTask) :
dispatchTo(isWriteRequest, shouldDispatchToScanQueue, callTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class FastPathRpcHandler extends RpcHandler {
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathRpcHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
// UNFAIR synchronization.
private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;

FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
AtomicInteger failedHandlerCount, final Abortable abortable,
final Deque<FastPathRpcHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
abortable);
this.fastPathHandlerStack = fastPathHandlerStack;
}

@Override
protected CallRunner getCallRunner() throws InterruptedException {
// Get a callrunner if one in the Q.
CallRunner cr = this.q.poll();
if (cr == null) {
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
// the fastpath handoff done via fastPathHandlerStack.
if (this.fastPathHandlerStack != null) {
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
}
}
return cr;
}

/**
* @param cr Task gotten via fastpath.
* @return True if we successfully loaded our task
*/
boolean loadCallRunner(final CallRunner cr) {
this.loadedCallRunner = cr;
this.semaphore.release();
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hadoop.hbase.ipc;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -130,16 +131,21 @@ protected void startHandlers(final int port) {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
shouldDispatchToScanQueue(callTask), callTask);
}

protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
final CallRunner callTask) {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.getParam())) {
if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue();
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
} else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}

BlockingQueue<CallRunner> queue = queues.get(queueIndex);
Queue<CallRunner> queue = queues.get(queueIndex);
if (queue.size() >= currentQueueLimit) {
return false;
}
Expand Down Expand Up @@ -232,6 +238,11 @@ private boolean isScanRequest(final RequestHeader header, final Message param) {
return param instanceof ScanRequest;
}

protected boolean shouldDispatchToScanQueue(final CallRunner task) {
RpcCall call = task.getRpcCall();
return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
}

protected float getReadShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
}
Expand Down
Loading