diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 7ceaaec362d0..e1a49491dc2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -324,34 +324,10 @@ public boolean progress() { } /** - * This function calculates how many splitters this RS should create based on expected average - * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks total number of split tasks available - * @return number of tasks this RS can grab - */ - private int getNumExpectedTasksPerRS(int numTasks) { - // at lease one RS(itself) available - int availableRSs = 1; - try { - List regionServers = - ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode); - availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); - } catch (KeeperException e) { - // do nothing - LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); - } - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - return Math.max(1, expectedTasksPerRS); // at least be one - } - - /** - * @param expectedTasksPerRS Average number of tasks to be handled by each RS * @return true if more splitters are available, otherwise false. */ - private boolean areSplittersAvailable(int expectedTasksPerRS) { - return (Math.min(expectedTasksPerRS, maxConcurrentTasks) - - this.tasksInProgress.get()) > 0; + private boolean areSplittersAvailable() { + return maxConcurrentTasks - tasksInProgress.get() > 0; } /** @@ -432,13 +408,14 @@ public void taskLoop() throws InterruptedException { } } int numTasks = paths.size(); - int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks); boolean taskGrabbed = false; for (int i = 0; i < numTasks; i++) { while (!shouldStop) { - if (this.areSplittersAvailable(expectedTasksPerRS)) { - LOG.debug("Current region server " + server.getServerName() + if (this.areSplittersAvailable()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Current region server " + server.getServerName() + " is ready to take more tasks, will get task list and try grab tasks again."); + } int idx = (i + offset) % paths.size(); // don't call ZKSplitLog.getNodeName() because that will lead to // double encoding of the path name @@ -446,8 +423,10 @@ public void taskLoop() throws InterruptedException { watcher.getZNodePaths().splitLogZNode, paths.get(idx))); break; } else { - LOG.debug("Current region server " + server.getServerName() + " has " + if (LOG.isTraceEnabled()) { + LOG.trace("Current region server " + server.getServerName() + " has " + this.tasksInProgress.get() + " tasks in progress and can't take more."); + } Thread.sleep(100); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index b52bf1977e16..2c52bc0c2105 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -471,53 +471,6 @@ public void testAcquireMultiTasks() throws Exception { } } - /** - * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per - * RS - * @throws Exception - */ - @Test - public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception { - LOG.info("testAcquireMultiTasks"); - SplitLogCounters.resetCounters(); - final String TATAS = "tatas"; - final ServerName RS = ServerName.valueOf("rs,1,1"); - final ServerName RS2 = ServerName.valueOf("rs,1,2"); - final int maxTasks = 3; - Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks); - RegionServerServices mockedRS = getRegionServer(RS); - - // create two RS nodes - String rsPath = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, RS.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - rsPath = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, RS2.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - - for (int i = 0; i < maxTasks; i++) { - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); - slw.start(); - try { - int acquiredTasks = 0; - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME); - for (int i = 0; i < maxTasks; i++) { - byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); - SplitLogTask slt = SplitLogTask.parseFrom(bytes); - if (slt.isOwned(RS)) { - acquiredTasks++; - } - } - assertEquals(2, acquiredTasks); - } finally { - stopSplitLogWorker(slw); - } - } - /** * Create a mocked region server service instance */