diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 08987ea6b194..cf74f805afeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -149,6 +149,7 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
 import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
+import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -7140,7 +7141,7 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>>
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
     boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true, null);
   }
 
   /**
@@ -7185,7 +7186,8 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String c
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
     boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile,
-    List<String> clusterIds, boolean replicate) throws IOException {
+    List<String> clusterIds, boolean replicate, BulkLoadThrottler bulkLoadThrottler)
+    throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -7281,7 +7283,10 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
         }
         Pair<Path, Path> pair = null;
         if (reqTmp) {
-          pair = store.preBulkLoadHFile(finalPath, seqId);
+          pair = store.preBulkLoadHFile(finalPath, seqId,
+            (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled())
+              ? bulkLoadThrottler
+              : null);
         } else {
           Path livePath = new Path(finalPath);
           pair = new Pair<>(livePath, livePath);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 48afdc59f86f..879edc0095b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -552,8 +553,8 @@ public void removeStoreFiles(String familyName, Collection<HStoreFile> storeFile
    * @param seqNum Bulk Load sequence number
    * @return The destination {@link Path} of the bulk loaded file
    */
-  Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
-    throws IOException {
+  Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum,
+    BulkLoadThrottler bulkLoadThrottler) throws IOException {
     // Copy the file if it's on another filesystem
     FileSystem srcFs = srcPath.getFileSystem(conf);
     srcPath = srcFs.resolvePath(srcPath);
@@ -567,7 +568,11 @@ Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long s
       LOG.info("Bulk-load file " + srcPath + " is on different filesystem than "
         + "the destination store. Copying file over to destination filesystem.");
       Path tmpPath = createTempName();
-      FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
+      if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) {
+        bulkLoadThrottler.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
+      } else {
+        FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
+      }
       LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
       srcPath = tmpPath;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5240df5c62a1..3e7b2dfc4e4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3402,6 +3402,17 @@ public void onConfigurationChange(Configuration newConf) {
       LOG.info("Update region server coprocessors because the configuration has changed");
       this.rsHost = new RegionServerCoprocessorHost(this, newConf);
     }
+
+    long oldBandWidth = secureBulkLoadManager.getBulkLoadThrottler().getConf().getLong(
+      SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
+      SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
+    long newBandWidth = newConf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
+      SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
+    if (oldBandWidth != newBandWidth) {
+      LOG.info("Bulk load bandwidth changed on configuration update: {} to {}", oldBandWidth,
+        newBandWidth);
+      this.secureBulkLoadManager.getBulkLoadThrottler().setConf(newConf);
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5a2a74a61a46..e2fbcdafb830 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -85,6 +85,7 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
+import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
@@ -670,9 +671,11 @@ public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
    * @param seqNum sequence Id associated with the HFile
    */
-  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum,
+    BulkLoadThrottler bulkLoadThrottler) throws IOException {
     Path srcPath = new Path(srcPathStr);
-    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum,
+      bulkLoadThrottler);
   }
 
   public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index a6a67fb91390..1bcfda062905 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
+import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
@@ -106,6 +107,11 @@ public class SecureBulkLoadManager {
   private UserProvider userProvider;
   private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
   private AsyncConnection conn;
+  private Long defaultBulkLoadBandwidth;
+  public final static String HBASE_BULKLOAD_NODE_BANDWIDTH =
+    "hbase.regionserver.bulkload.node.bandwidth";
+  public final static Long DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH = 0L;
+  private BulkLoadThrottler bulkLoadThrottler;
 
   SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
     this.conf = conf;
@@ -130,6 +136,13 @@ public void start() throws IOException {
         fs.setPermission(baseStagingDir, PERM_HIDDEN);
       }
     }
+    defaultBulkLoadBandwidth =
+      conf.getLong(HBASE_BULKLOAD_NODE_BANDWIDTH, DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
+    bulkLoadThrottler = new BulkLoadThrottler(defaultBulkLoadBandwidth, conf);
+  }
+
+  public BulkLoadThrottler getBulkLoadThrottler() {
+    return bulkLoadThrottler;
   }
 
   public void stop() throws IOException {
@@ -286,8 +299,10 @@ public Map<byte[], List<Path>> run() {
           // We call bulkLoadHFiles as requesting user
           // To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
-            new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds,
-            request.getReplicate());
+            (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled())
+              ? new SecureBulkLoadListener(fs, bulkToken, conf, bulkLoadThrottler)
+              : new SecureBulkLoadListener(fs, bulkToken, conf),
+            request.getCopyFile(), clusterIds, request.getReplicate(), null);
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
@@ -348,6 +363,16 @@ static class SecureBulkLoadListener implements BulkLoadListener {
     private FileSystem srcFs = null;
     private Map<String, FsPermission> origPermissions = null;
     private Map<String, String> origSources = null;
+    private BulkLoadThrottler bulkLoadThrottler;
+
+    public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf,
+      BulkLoadThrottler bulkLoadThrottler) {
+      this.fs = fs;
+      this.stagingDir = stagingDir;
+      this.conf = conf;
+      this.origPermissions = new HashMap<>();
+      this.bulkLoadThrottler = bulkLoadThrottler;
+    }
 
     public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
       this.fs = fs;
@@ -389,10 +414,18 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
       if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
         LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
           + "the destination filesystem. Copying file over to destination staging dir.");
-        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+        if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) {
+          bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
+        } else {
+          FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+        }
       } else if (copyFile) {
         LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
-        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+        if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) {
+          bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
+        } else {
+          FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+        }
       } else {
         LOG.debug("Moving " + p + " to " + stageP);
         FileStatus origFileStatus = fs.getFileStatus(p);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java
new file mode 100644
index 000000000000..13f5a27e412a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.io.IOUtils.closeStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bulk load throttling controller for a region server: enabled if bandwidth > 0. A cycle is
+ * 1000ms; by throttling we guarantee that the bytes copied within each cycle won't exceed
+ * 'bandwidth' bytes.
+ */
+@InterfaceAudience.Private
+public class BulkLoadThrottler {
+  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadThrottler.class);
+  private Configuration conf;
+  private boolean enabled;
+  private long bandwidth;
+  private long cyclePushSize;
+  private long speedLastDownloadSize;
+  private long cycleStartTick;
+  // a cycle = 1000ms
+  private static final Long CYCLE = 1000L;
+  private Long currentBulkLoadBandwidth;
+
+  /**
+   * BulkLoadThrottler constructor. If bandwidth is less than 1, throttling is disabled.
+   * @param bandwidth bytes allowed per cycle (1000ms)
+   */
+  public BulkLoadThrottler(final long bandwidth, Configuration conf) {
+    this.conf = conf;
+    this.bandwidth = bandwidth;
+    LOG.info("Init bulk load bandwidth is {}", bandwidth);
+    this.enabled = this.bandwidth > 0;
+    if (this.enabled) {
+      this.cyclePushSize = 0;
+      this.cycleStartTick = EnvironmentEdgeManager.currentTime();
+    }
+    currentBulkLoadBandwidth = conf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
+      SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
+  }
+
+  public void setConf(Configuration newConf) {
+    this.conf = newConf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Whether bulk load throttling is enabled
+   * @return true if bulk load throttling is enabled
+   */
+  public boolean isEnabled() {
+    return this.enabled;
+  }
+
+  public synchronized void limitNextBytes(final int size) {
+    if (!this.enabled) {
+      return;
+    }
+    addPushSize(size);
+    long now = EnvironmentEdgeManager.currentTime();
+    if (cyclePushSize - speedLastDownloadSize >= bandwidth) {
+      long interval = now - cycleStartTick;
+      if (interval < CYCLE) {
+        long sleepTicks = CYCLE - interval;
+        if (sleepTicks > 0) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("To sleep {}ms for bulk load throttling control", sleepTicks);
+          }
+          try {
+            Thread.sleep(sleepTicks);
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted while sleeping for throttling control, currentByteSize is {}",
+              size, e);
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+      speedLastDownloadSize = cyclePushSize;
+      resetStartTick();
+    } else {
+      long nextCycleTick = this.cycleStartTick + CYCLE;
+      if (now > nextCycleTick) {
+        this.cycleStartTick = now;
+      }
+    }
+  }
+
+  /**
+   * Add the current chunk size to the current cycle's total push size
+   * @param size the current chunk size, in bytes
+   */
+  private void addPushSize(final int size) {
+    if (this.enabled) {
+      this.cyclePushSize += size;
+    }
+  }
+
+  /**
+   * Reset the cycle start tick to NOW
+   */
+  public void resetStartTick() {
+    if (this.enabled) {
+      this.cycleStartTick = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  /**
+   * Set the bulk load bandwidth used for throttling
+   * @param newBandwidth the new bandwidth, in bytes per cycle
+   */
+  public void setBandwidth(long newBandwidth) {
+    LOG.info("Bandwidth change {}Byte/sec to {}Byte/sec", this.bandwidth, newBandwidth);
+    this.bandwidth = newBandwidth;
+    this.enabled = this.bandwidth > 0;
+  }
+
+  /**
+   * Get the bulk load bandwidth used for throttling
+   * @return the bandwidth, in bytes per cycle
+   */
+  public long getBandwidth() {
+    return bandwidth;
+  }
+
+  private void tryThrottle(int batchSize) {
+    checkBulkLoadBandwidthChangeAndResetThrottler();
+    if (isEnabled()) {
+      limitNextBytes(batchSize);
+    }
+  }
+
+  public void checkBulkLoadBandwidthChangeAndResetThrottler() {
+    long newBandwidth = getCurrentBandwidth();
+    if (newBandwidth != currentBulkLoadBandwidth) {
+      LOG.info("Bulk load node bandwidth throttling changed, {}Byte/sec to {}Byte/sec",
+        currentBulkLoadBandwidth, newBandwidth);
+      currentBulkLoadBandwidth = newBandwidth;
+      setBandwidth(currentBulkLoadBandwidth);
+    }
+  }
+
+  private long getCurrentBandwidth() {
+    long nodeBandwidth = conf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
+      SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
+    return nodeBandwidth >= 0 ? nodeBandwidth : 0;
+  }
+
+  public boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource,
+    Configuration conf) throws IOException {
+    return copy(srcFS, srcFS.getFileStatus(src), dstFS, dst, deleteSource, true, conf);
+  }
+
+  /**
+   * Copy files between FileSystems, throttling each chunk via {@link #limitNextBytes(int)}.
+   */
+  private boolean copy(FileSystem srcFS, FileStatus srcStatus, FileSystem dstFS, Path dst,
+    boolean deleteSource, boolean overwrite, Configuration conf) throws IOException {
+    Path src = srcStatus.getPath();
+    dst = checkDest(src.getName(), dstFS, dst, overwrite);
+    if (srcStatus.isDirectory()) {
+      checkDependencies(srcFS, src, dstFS, dst);
+      if (!dstFS.mkdirs(dst)) {
+        return false;
+      }
+      FileStatus[] contents = srcFS.listStatus(src);
+      for (int i = 0; i < contents.length; i++) {
+        copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getPath().getName()),
+          deleteSource, overwrite, conf);
+      }
+    } else {
+      InputStream in = null;
+      OutputStream out = null;
+      try {
+        in = srcFS.open(src);
+        out = dstFS.create(dst, overwrite);
+        copyBytes(in, out, conf, true);
+      } catch (IOException e) {
+        IOUtils.closeStream(out);
+        IOUtils.closeStream(in);
+        throw e;
+      }
+    }
+    if (deleteSource) {
+      return srcFS.delete(src, true);
+    } else {
+      return true;
+    }
+  }
+
+  private Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite)
+    throws IOException {
+    if (dstFS.exists(dst)) {
+      FileStatus sdst = dstFS.getFileStatus(dst);
+      if (sdst.isDirectory()) {
+        if (null == srcName) {
+          throw new IOException("Target " + dst + " is a directory");
+        }
+        return checkDest(null, dstFS, new Path(dst, srcName), overwrite);
+      } else if (!overwrite) {
+        throw new IOException("Target " + dst + " already exists");
+      }
+    }
+    return dst;
+  }
+
+  // If the destination is a subdirectory of the source, throw an exception
+  private void checkDependencies(FileSystem srcFS, Path src, FileSystem dstFS, Path dst)
+    throws IOException {
+    if (srcFS == dstFS) {
+      String srcq = src.makeQualified(srcFS).toString() + Path.SEPARATOR;
+      String dstq = dst.makeQualified(dstFS).toString() + Path.SEPARATOR;
+      if (dstq.startsWith(srcq)) {
+        if (srcq.length() == dstq.length()) {
+          throw new IOException("Cannot copy " + src + " to itself.");
+        } else {
+          throw new IOException("Cannot copy " + src + " to its subdirectory " + dst);
+        }
+      }
+    }
+  }
+
+  private void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
+    throws IOException {
+    copyBytes(in, out, conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), close);
+  }
+
+  private void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
+    throws IOException {
+    try {
+      copyBytes(in, out, buffSize);
+      if (close) {
+        out.close();
+        out = null;
+        in.close();
+        in = null;
+      }
+    } finally {
+      if (close) {
+        closeStream(out);
+        closeStream(in);
+      }
+    }
+  }
+
+  private void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException {
+    PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
+    byte[] buf = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      tryThrottle(bytesRead);
+      out.write(buf, 0, bytesRead);
+      if ((ps != null) && ps.checkError()) {
+        throw new IOException("Unable to write to output stream.");
+      }
+      bytesRead = in.read(buf);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestBulkLoadThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestBulkLoadThrottler.java
new file mode 100644
index 000000000000..0f5cf2b3f76a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestBulkLoadThrottler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, SmallTests.class })
+public class TestBulkLoadThrottler {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadThrottler.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadThrottler.class);
+
+  /**
+   * Unit test for throttling
+   */
+  @Test
+  public void testThrottling() {
+    LOG.info("testBulkLoadThrottling");
+    Configuration conf1 = new Configuration();
+    Configuration conf2 = new Configuration();
+
+    // throttle bandwidth is 10 and 5 bytes/cycle respectively
+    BulkLoadThrottler throttler1 = new BulkLoadThrottler(10, conf1);
+    BulkLoadThrottler throttler2 = new BulkLoadThrottler(5, conf2);
+
+    long throttler1StartTime = EnvironmentEdgeManager.currentTime();
+    for (int i = 1; i < 21; i++) {
+      throttler1.limitNextBytes(1);
+    }
+    long throttler1EndTime = EnvironmentEdgeManager.currentTime();
+    long throttler1Interval = throttler1EndTime - throttler1StartTime;
+    // throttled twice
+    assertTrue(throttler1Interval >= 1900 && throttler1Interval < 2100);
+
+    long throttler2StartTime = EnvironmentEdgeManager.currentTime();
+    for (int i = 1; i < 21; i++) {
+      throttler2.limitNextBytes(1);
+    }
+    long throttler2EndTime = EnvironmentEdgeManager.currentTime();
+    long throttler2Interval = throttler2EndTime - throttler2StartTime;
+    // throttled four times
+    assertTrue(throttler2Interval >= 3900 && throttler2Interval < 4100);
+
+    assertEquals(10, throttler1.getBandwidth());
+    // simulate update_all_config or update_config
+    conf1.set("hbase.regionserver.bulkload.node.bandwidth", "15");
+    throttler1.checkBulkLoadBandwidthChangeAndResetThrottler();
+    assertEquals(15, throttler1.getBandwidth());
+
+    long throttler3StartTime = EnvironmentEdgeManager.currentTime();
+    for (int i = 1; i < 21; i++) {
+      throttler1.limitNextBytes(1);
+    }
+    long throttler3EndTime = EnvironmentEdgeManager.currentTime();
+    long throttler3Interval = throttler3EndTime - throttler3StartTime;
+    // throttled once
+    assertTrue(throttler3Interval >= 900 && throttler3Interval < 1100);
+
+    assertEquals(5, throttler2.getBandwidth());
+    // simulate update_all_config or update_config
+    conf2.set("hbase.regionserver.bulkload.node.bandwidth", "10");
+    throttler2.checkBulkLoadBandwidthChangeAndResetThrottler();
+    assertEquals(10, throttler2.getBandwidth());
+
+    long throttler4StartTime = EnvironmentEdgeManager.currentTime();
+    for (int i = 1; i < 21; i++) {
+      throttler2.limitNextBytes(1);
+    }
+    long throttler4EndTime = EnvironmentEdgeManager.currentTime();
+    long throttler4Interval = throttler4EndTime - throttler4StartTime;
+    // throttled twice
+    assertTrue(throttler4Interval >= 1900 && throttler4Interval < 2100);
+  }
+}
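
Usage sketch (editor's note, not part of the patch): the throttle is driven entirely by "hbase.regionserver.bulkload.node.bandwidth", which SecureBulkLoadManager.start() reads when it builds the region server's single BulkLoadThrottler; the default of 0 leaves throttling off. The snippet below is a minimal, self-contained illustration of the copy path under an assumed budget of 1 MiB per cycle; the local paths and the standalone construction are illustrative assumptions only, since in a live region server the instance is created for you and reached via SecureBulkLoadManager#getBulkLoadThrottler().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;

public class BulkLoadThrottlerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Bytes allowed per 1000 ms cycle; 0 or a negative value disables the throttle.
    conf.setLong("hbase.regionserver.bulkload.node.bandwidth", 1024L * 1024L);

    BulkLoadThrottler throttler = new BulkLoadThrottler(
      conf.getLong("hbase.regionserver.bulkload.node.bandwidth", 0L), conf);

    // copy() reads the source in io.file.buffer.size chunks and, once a cycle's
    // byte budget is spent, sleeps out the remainder of that cycle before writing
    // the next chunk. The paths below are hypothetical.
    FileSystem fs = FileSystem.getLocal(conf);
    throttler.copy(fs, new Path("/tmp/src.hfile"), fs, new Path("/tmp/dst.hfile"), false, conf);
  }
}

Because tryThrottle() re-reads the configured bandwidth on every chunk via checkBulkLoadBandwidthChangeAndResetThrottler(), an operator can raise or lower the limit at runtime (e.g. through the update_config shell command, which triggers onConfigurationChange above); TestBulkLoadThrottler simulates exactly that by mutating the Configuration between assertions.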