-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HBASE-26703 Allow configuration of IPC queue balancer
- Loading branch information
1 parent
056e8fb
commit 630dcea
Showing
6 changed files
with
182 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
17 changes: 17 additions & 0 deletions
17
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package org.apache.hadoop.hbase.ipc; | ||
|
||
import org.apache.hadoop.hbase.HBaseInterfaceAudience; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import org.apache.yetus.audience.InterfaceStability; | ||
|
||
/** | ||
* Interface for balancing requests across IPC queues | ||
*/ | ||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) | ||
@InterfaceStability.Stable | ||
public interface QueueBalancer { | ||
/** | ||
* @return the index of the next queue to which a request should be inserted | ||
*/ | ||
int getNextQueue(CallRunner callRunner); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package org.apache.hadoop.hbase.ipc; | ||
|
||
import com.google.errorprone.annotations.RestrictedApi; | ||
import java.util.List; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import org.apache.yetus.audience.InterfaceStability; | ||
|
||
/** | ||
* Queue balancer that just randomly selects a queue in the range [0, num queues). | ||
*/ | ||
@InterfaceAudience.Private | ||
@InterfaceStability.Stable | ||
public class RandomQueueBalancer implements QueueBalancer { | ||
private final int queueSize; | ||
private final List<BlockingQueue<CallRunner>> queues; | ||
|
||
public RandomQueueBalancer(Configuration conf, List<BlockingQueue<CallRunner>> queues) { | ||
this.queueSize = queues.size(); | ||
this.queues = queues; | ||
} | ||
|
||
@Override public int getNextQueue(CallRunner callRunner) { | ||
return ThreadLocalRandom.current().nextInt(queueSize); | ||
} | ||
|
||
@RestrictedApi(explanation = "Should only be called in tests", link = "", | ||
allowedOnPath = ".*/src/test/.*") | ||
List<BlockingQueue<CallRunner>> getQueues() { | ||
return queues; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package org.apache.hadoop.hbase.ipc; | ||
|
||
import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY; | ||
import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY; | ||
import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.mockito.Mockito.*; | ||
import java.util.List; | ||
import java.util.concurrent.BlockingQueue; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.hbase.HBaseClassTestRule; | ||
import org.apache.hadoop.hbase.HBaseConfiguration; | ||
import org.apache.hadoop.hbase.testclassification.MediumTests; | ||
import org.apache.hadoop.hbase.testclassification.RPCTests; | ||
import org.junit.Before; | ||
import org.junit.ClassRule; | ||
import org.junit.Rule; | ||
import org.junit.Test; | ||
import org.junit.experimental.categories.Category; | ||
import org.junit.rules.TestName; | ||
|
||
@Category({ RPCTests.class, MediumTests.class}) | ||
public class TestRWQueueRpcExecutor { | ||
|
||
@ClassRule | ||
public static final HBaseClassTestRule CLASS_RULE = | ||
HBaseClassTestRule.forClass(TestRWQueueRpcExecutor.class); | ||
|
||
@Rule | ||
public TestName testName = new TestName(); | ||
|
||
private Configuration conf; | ||
|
||
@Before | ||
public void setUp() { | ||
conf = HBaseConfiguration.create(); | ||
conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); | ||
conf.setFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); | ||
conf.setFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); | ||
} | ||
|
||
@Test public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { | ||
PriorityFunction qosFunction = mock(PriorityFunction.class); | ||
RWQueueRpcExecutor executor = | ||
new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction, conf, null); | ||
|
||
QueueBalancer readBalancer = executor.getReadBalancer(); | ||
QueueBalancer writeBalancer = executor.getWriteBalancer(); | ||
QueueBalancer scanBalancer = executor.getScanBalancer(); | ||
|
||
assertTrue(readBalancer instanceof RandomQueueBalancer); | ||
assertTrue(writeBalancer instanceof RandomQueueBalancer); | ||
assertTrue(scanBalancer instanceof RandomQueueBalancer); | ||
|
||
List<BlockingQueue<CallRunner>> readQueues = ((RandomQueueBalancer) readBalancer).getQueues(); | ||
List<BlockingQueue<CallRunner>> writeQueues = ((RandomQueueBalancer) writeBalancer).getQueues(); | ||
List<BlockingQueue<CallRunner>> scanQueues = ((RandomQueueBalancer) scanBalancer).getQueues(); | ||
|
||
assertEquals(25, readQueues.size()); | ||
assertEquals(50, writeQueues.size()); | ||
assertEquals(25, scanQueues.size()); | ||
|
||
verifyDistinct(readQueues, writeQueues, scanQueues); | ||
verifyDistinct(writeQueues, readQueues, scanQueues); | ||
verifyDistinct(scanQueues, readQueues, writeQueues); | ||
|
||
} | ||
|
||
private void verifyDistinct(List<BlockingQueue<CallRunner>> queues, List<BlockingQueue<CallRunner>>... others) | ||
throws InterruptedException { | ||
CallRunner mock = mock(CallRunner.class); | ||
for (BlockingQueue<CallRunner> queue : queues) { | ||
queue.put(mock); | ||
assertEquals(1, queue.size()); | ||
} | ||
|
||
for (List<BlockingQueue<CallRunner>> other : others) { | ||
for (BlockingQueue<CallRunner> queue : other) { | ||
assertEquals(0, queue.size()); | ||
} | ||
} | ||
|
||
// clear them for next test | ||
for (BlockingQueue<CallRunner> queue : queues) { | ||
queue.clear(); | ||
} | ||
} | ||
} |