Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27962 Introduce an AdaptiveFastPathRWRpcExecutor to make the W/R/S separations fit various workloads #5316

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A subclass of {@link FastPathRWQueueRpcExecutor} that reserves a configurable fraction of the
 * write/read/scan handlers as a "shared" pool. A shared handler may serve any request type, which
 * lets the otherwise strict W/R/S handler separation adapt to skewed or shifting workloads.
 */
@InterfaceAudience.Private
public class AdaptiveFastPathRWQueueRpcExecutor extends FastPathRWQueueRpcExecutor {
  private static final Logger LOG =
    LoggerFactory.getLogger(AdaptiveFastPathRWQueueRpcExecutor.class);

  /** Global fraction of each handler group moved into the shared pool; valid range (0.0, 1.0). */
  private static final String FASTPATH_ADAPTIVE_RATIO =
    "hbase.ipc.server.callqueue.fastpath.adaptive.ratio";
  /** Default of 0 disables sharing entirely. */
  private static final float FASTPATH_ADAPTIVE_DEFAULT = 0;
  /** Optional per-group overrides of the global adaptive ratio. */
  private static final String FASTPATH_ADAPTIVE_RATIO_WRITE =
    "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.write";
  private static final String FASTPATH_ADAPTIVE_RATIO_READ =
    "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.read";
  private static final String FASTPATH_ADAPTIVE_RATIO_SCAN =
    "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.scan";

  // Number of handlers taken out of each dedicated group and parked on the shared stack.
  private final int writeSharedHandlers;
  private final int readSharedHandlers;
  private final int scanSharedHandlers;

  /** Idle handlers that may pick up write, read, or scan requests alike. */
  protected final Deque<FastPathRpcHandler> sharedHandlerStack = new ConcurrentLinkedDeque<>();

  public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
    PriorityFunction priority, Configuration conf, Abortable abortable) {
    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
    float adaptiveRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT);
    if (!checkAdaptiveRatioRationality(conf)) {
      LOG.warn("The adaptive ratio should be in (0.0, 1.0) but get {}, using the default ratio: {}",
        adaptiveRatio, FASTPATH_ADAPTIVE_DEFAULT);
      adaptiveRatio = FASTPATH_ADAPTIVE_DEFAULT;
    }

    // A per-group ratio, when itself valid, overrides the global adaptive ratio for that group.
    float writeRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO_WRITE, FASTPATH_ADAPTIVE_DEFAULT);
    float readRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO_READ, FASTPATH_ADAPTIVE_DEFAULT);
    float scanRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO_SCAN, FASTPATH_ADAPTIVE_DEFAULT);

    writeSharedHandlers = checkRatioRationality(writeRatio)
      ? (int) (writeRatio * writeHandlersCount)
      : (int) (adaptiveRatio * writeHandlersCount);
    readSharedHandlers = checkRatioRationality(readRatio)
      ? (int) (readRatio * readHandlersCount)
      : (int) (adaptiveRatio * readHandlersCount);
    scanSharedHandlers = checkRatioRationality(scanRatio)
      ? (int) (scanRatio * scanHandlersCount)
      : (int) (adaptiveRatio * scanHandlersCount);
  }

  /**
   * Prefer a dedicated fast-path handler for the request's type; if none is idle, fall back to a
   * shared handler, and only then to enqueueing the call on the matching queue.
   */
  @Override
  public boolean dispatch(CallRunner callTask) {
    RpcCall call = callTask.getRpcCall();
    boolean isWriteRequest = isWriteRequest(call.getHeader(), call.getParam());
    boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
    FastPathRpcHandler handler = isWriteRequest ? writeHandlerStack.poll()
      : shouldDispatchToScanQueue ? scanHandlerStack.poll()
      : readHandlerStack.poll();
    if (handler == null) {
      handler = sharedHandlerStack.poll();
    }

    return handler != null
      ? handler.loadCallRunner(callTask)
      : dispatchTo(isWriteRequest, shouldDispatchToScanQueue, callTask);
  }

  @Override
  protected void startHandlers(final int port) {
    // Shared handlers drain the same queues as their dedicated counterparts; the "shared" marker
    // in the name routes them onto sharedHandlerStack in getHandler() below.
    startHandlers(".shared_write", writeSharedHandlers, queues, 0, numWriteQueues, port,
      activeWriteHandlerCount);

    startHandlers(".write", writeHandlersCount - writeSharedHandlers, queues, 0, numWriteQueues,
      port, activeWriteHandlerCount);

    startHandlers(".shared_read", readSharedHandlers, queues, numWriteQueues, numReadQueues, port,
      activeReadHandlerCount);
    startHandlers(".read", readHandlersCount - readSharedHandlers, queues, numWriteQueues,
      numReadQueues, port, activeReadHandlerCount);

    if (numScanQueues > 0) {
      startHandlers(".shared_scan", scanSharedHandlers, queues, numWriteQueues + numReadQueues,
        numScanQueues, port, activeScanHandlerCount);
      startHandlers(".scan", scanHandlersCount - scanSharedHandlers, queues,
        numWriteQueues + numReadQueues, numScanQueues, port, activeScanHandlerCount);
    }
  }

  @Override
  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
    final int handlerCount, final BlockingQueue<CallRunner> q,
    final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
    final Abortable abortable) {
    // "shared" must be checked first: shared handler names also contain "write"/"read"/"scan".
    Deque<FastPathRpcHandler> handlerStack = name.contains("shared") ? sharedHandlerStack
      : name.contains("read") ? readHandlerStack
      : name.contains("write") ? writeHandlerStack
      : scanHandlerStack;
    return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
      activeHandlerCount, failedHandlerCount, abortable, handlerStack);
  }

  // Package-private stack-depth accessors for tests.
  int getWriteStackLength() {
    return writeHandlerStack.size();
  }

  int getReadStackLength() {
    return readHandlerStack.size();
  }

  int getScanStackLength() {
    return scanHandlerStack.size();
  }

  int getSharedStackLength() {
    return sharedHandlerStack.size();
  }

  /**
   * @return true iff the configured global adaptive ratio lies strictly inside (0.0, 1.0); used by
   *         SimpleRpcScheduler to decide whether this executor should be used at all.
   */
  static boolean checkAdaptiveRatioRationality(Configuration conf) {
    float ratio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT);
    return checkRatioRationality(ratio);
  }

  // Strictly between 0 and 1; written with negations so NaN is also rejected.
  private static boolean checkRatioRationality(float ratio) {
    return !(ratio <= 0) && !(ratio >= 1.0f);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {

private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
protected final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
protected final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
protected final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();

public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
PriorityFunction priority, Configuration conf, Abortable abortable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final QueueBalancer writeBalancer;
private final QueueBalancer readBalancer;
private final QueueBalancer scanBalancer;
private final int writeHandlersCount;
private final int readHandlersCount;
private final int scanHandlersCount;
private final int numWriteQueues;
private final int numReadQueues;
private final int numScanQueues;

private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
protected final int writeHandlersCount;
protected final int readHandlersCount;
protected final int scanHandlersCount;
protected final int numWriteQueues;
protected final int numReadQueues;
protected final int numScanQueues;

protected final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
protected final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
protected final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);

public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand

if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
callExecutor = AdaptiveFastPathRWQueueRpcExecutor.checkAdaptiveRatioRationality(conf)
? new AdaptiveFastPathRWQueueRpcExecutor("default.AFPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server)
: new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), maxQueueLength,
priority, conf, server);
} else {
if (
RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

@Category(SmallTests.class)
public class TestAdaptiveFastPathRWQueueRpcExecutor {

  /**
   * Dummy handlers block on this semaphore (never released), pinning them as "active" so the
   * active-handler counters can be asserted. The executor threads are daemon-like test threads
   * that die with the JVM.
   */
  private static final Semaphore blocker = new Semaphore(0);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAdaptiveFastPathRWQueueRpcExecutor.class);

  @Test
  public void testInitialization() throws InterruptedException {
    Configuration conf = new Configuration();
    conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f);
    conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.1f);

    AdaptiveFastPathRWQueueRpcExecutor executor =
      new AdaptiveFastPathRWQueueRpcExecutor("testInitialization", 100, 250, null, conf, null);
    executor.start(0);
    // Give the freshly started handler threads time to park on their stacks.
    Thread.sleep(1000);

    // When the adaptive ratio is 0.1, then the shared stack should contain
    // 25 * 0.1 + 25 * 0.1 + 50 * 0.1 = 9 handlers (int truncation: 2 + 2 + 5).
    Assert.assertEquals(23, executor.getReadStackLength());
    Assert.assertEquals(23, executor.getScanStackLength());
    Assert.assertEquals(45, executor.getWriteStackLength());
    Assert.assertEquals(9, executor.getSharedStackLength());
  }

  @Test
  public void testInvalidRatio() throws InterruptedException {
    Configuration conf = new Configuration();
    conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f);
    conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", -0.5f);

    AdaptiveFastPathRWQueueRpcExecutor executor =
      new AdaptiveFastPathRWQueueRpcExecutor("testInvalidRatio", 100, 250, null, conf, null);
    executor.start(0);
    Thread.sleep(1000);

    // If the adaptive ratio is invalid, we just use the default ratio 0, with which the shared
    // stack should be empty.
    Assert.assertEquals(25, executor.getReadStackLength());
    Assert.assertEquals(25, executor.getScanStackLength());
    Assert.assertEquals(50, executor.getWriteStackLength());
    Assert.assertEquals(0, executor.getSharedStackLength());
  }

  @Test
  public void testCustomRatio() throws InterruptedException {
    Configuration conf = new Configuration();
    conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f);
    conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.2f);

    AdaptiveFastPathRWQueueRpcExecutor executor =
      new AdaptiveFastPathRWQueueRpcExecutor("testCustomRatio", 100, 250, null, conf, null);
    executor.start(0);
    Thread.sleep(1000);

    // When the adaptive ratio is 0.2, then the shared stack should contain
    // 25 * 0.2 + 25 * 0.2 + 50 * 0.2 = 20 handlers.
    Assert.assertEquals(20, executor.getReadStackLength());
    Assert.assertEquals(20, executor.getScanStackLength());
    Assert.assertEquals(40, executor.getWriteStackLength());
    Assert.assertEquals(20, executor.getSharedStackLength());
  }

  @Test
  public void testActiveHandlerCount() throws InterruptedException {
    Configuration conf = new Configuration();
    conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f);
    conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f);
    conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.2f);

    AdaptiveFastPathRWQueueRpcExecutor executor =
      new AdaptiveFastPathRWQueueRpcExecutor("testActiveHandlerCount", 100, 250, null, conf, null);
    executor.start(0);
    Thread.sleep(1000);

    RpcCall call = Mockito.mock(RpcCall.class);
    ClientProtos.ScanRequest scanRequest = ClientProtos.ScanRequest.getDefaultInstance();

    Mockito.when(call.getParam()).thenReturn(scanRequest);

    // When the adaptive ratio is 0.2, then the shared stack should contain
    // 25 * 0.2 + 25 * 0.2 + 50 * 0.2 = 20 handlers.
    // We send 30 CallRunner with mocked scan requests here: the 20 dedicated scan handlers plus
    // 10 shared handlers should pick them up.

    for (int i = 0; i < 30; i++) {
      CallRunner temp = new DummyCallRunner(null, call);
      executor.dispatch(temp);
    }
    // Wait for the dummy CallRunner being executed.
    Thread.sleep(2000);

    Assert.assertEquals(0, executor.getScanStackLength());
    Assert.assertEquals(10, executor.getSharedStackLength());
    Assert.assertEquals(40, executor.getWriteStackLength());
    Assert.assertEquals(20, executor.getReadStackLength());
    Assert.assertEquals(30, executor.getActiveScanHandlerCount()
      + executor.getActiveWriteHandlerCount() + executor.getActiveReadHandlerCount());
  }

  /** A CallRunner that blocks its handler thread forever, keeping the handler "active". */
  static class DummyCallRunner extends CallRunner {

    DummyCallRunner(RpcServerInterface rpcServer, RpcCall call) {
      super(rpcServer, call);
    }

    @Override
    public void run() {
      try {
        blocker.acquire();
      } catch (InterruptedException e) {
        // Restore the interrupt status so the owning handler thread can observe it and exit.
        Thread.currentThread().interrupt();
      }
    }
  }
}