From 797004216b1335d1a1ecd5011fe35560cee2028f Mon Sep 17 00:00:00 2001 From: zhengsicheng Date: Tue, 17 Jan 2023 00:03:39 +0800 Subject: [PATCH] HBASE-27523 Add BulkLoad bandwidth throttling --- .../hadoop/hbase/regionserver/HRegion.java | 9 +- .../hbase/regionserver/HRegionFileSystem.java | 10 +- .../hbase/regionserver/HRegionServer.java | 11 + .../hadoop/hbase/regionserver/HStore.java | 6 +- .../regionserver/SecureBulkLoadManager.java | 41 ++- .../throttle/BulkLoadThrottler.java | 333 ++++++++++++++++++ 6 files changed, 399 insertions(+), 11 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 08987ea6b194..f22091161c0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -149,6 +149,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker; import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink; +import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; @@ -7140,7 +7141,7 @@ private static boolean hasMultipleColumnFamilies(Collection */ public Map> bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { - return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true); + return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true, null); } /** @@ -7185,7 +7186,8 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String c */ public Map> bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile, - List clusterIds, boolean replicate) throws IOException { + List clusterIds, boolean replicate, + BulkLoadThrottler bulkLoadThrottler) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR); Map storeFilesSizes = new HashMap<>(); @@ -7281,7 +7283,8 @@ public Map> bulkLoadHFiles(Collection> f } Pair pair = null; if (reqTmp) { - pair = store.preBulkLoadHFile(finalPath, seqId); + pair = store.preBulkLoadHFile(finalPath, seqId, + (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) ? bulkLoadThrottler : null); } else { Path livePath = new Path(finalPath); pair = new Pair<>(livePath, livePath); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 48afdc59f86f..a01a4e130c33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; @@ -552,7 +553,8 @@ public void removeStoreFiles(String familyName, Collection storeFile * @param seqNum Bulk Load sequence number * @return The destination {@link Path} of the bulk loaded file */ - Pair bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) + Pair bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum, + BulkLoadThrottler bulkLoadThrottler) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); @@ -567,7 +569,11 @@ Pair bulkLoadStoreFile(final String familyName, Path srcPath, long s LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination store. Copying file over to destination filesystem."); Path tmpPath = createTempName(); - FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf); + if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) { + bulkLoadThrottler.copy(realSrcFs, srcPath, fs, tmpPath, false, conf); + } else { + FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf); + } LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath); srcPath = tmpPath; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5240df5c62a1..01af79bd138f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3402,6 +3402,17 @@ public void onConfigurationChange(Configuration newConf) { LOG.info("Update region server coprocessors because the configuration has changed"); this.rsHost = new RegionServerCoprocessorHost(this, newConf); } + + long oldBandWidth = secureBulkLoadManager.getBulkLoadThrottler().getConf() + .getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH, + SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH); + long newBandWidth = newConf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH, + SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH); + if (oldBandWidth != newBandWidth) { + LOG.info("ConfigurationChange bulkload oldBandWidth is {} " + "newBandWidth is {}", + oldBandWidth, newBandWidth); + this.secureBulkLoadManager.getBulkLoadThrottler().setConf(newConf); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 5a2a74a61a46..df58dbd84f13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; @@ -670,9 +671,10 @@ public void assertBulkLoadHFileOk(Path srcPath) throws IOException { * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this) * @param seqNum sequence Id associated with the HFile */ - public Pair preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { + public Pair preBulkLoadHFile(String srcPathStr, long seqNum, + BulkLoadThrottler bulkLoadThrottler) throws IOException { Path srcPath = new Path(srcPathStr); - return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); + return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum, bulkLoadThrottler); } public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index a6a67fb91390..e8dc1b15b675 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener; +import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; @@ -106,6 +107,11 @@ public class SecureBulkLoadManager { private UserProvider userProvider; private ConcurrentHashMap ugiReferenceCounter; private AsyncConnection conn; + private Long defaultBulkLoadBandwidth; + public final static String HBASE_BULKLOAD_NODE_BANDWIDTH = + "hbase.regionserver.bulkload.node.bandwidth"; + public final static Long DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH = 0L; + private BulkLoadThrottler bulkLoadThrottler; SecureBulkLoadManager(Configuration conf, AsyncConnection conn) { this.conf = conf; @@ -130,6 +136,13 @@ public void start() throws IOException { fs.setPermission(baseStagingDir, PERM_HIDDEN); } } + defaultBulkLoadBandwidth = + conf.getLong(HBASE_BULKLOAD_NODE_BANDWIDTH, DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH); + bulkLoadThrottler = new BulkLoadThrottler(defaultBulkLoadBandwidth, conf); + } + + public BulkLoadThrottler getBulkLoadThrottler() { + return bulkLoadThrottler; } public void stop() throws IOException { @@ -286,8 +299,10 @@ public Map> run() { // We call bulkLoadHFiles as requesting user // To enable access prior to staging return region.bulkLoadHFiles(familyPaths, true, - new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds, - request.getReplicate()); + (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) ? + new SecureBulkLoadListener(fs, bulkToken, conf, bulkLoadThrottler) : + new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds, + request.getReplicate(), null); } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } @@ -348,6 +363,16 @@ static class SecureBulkLoadListener implements BulkLoadListener { private FileSystem srcFs = null; private Map origPermissions = null; private Map origSources = null; + private BulkLoadThrottler bulkLoadThrottler; + + public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf, + BulkLoadThrottler bulkLoadThrottler) { + this.fs = fs; + this.stagingDir = stagingDir; + this.conf = conf; + this.origPermissions = new HashMap<>(); + this.bulkLoadThrottler = bulkLoadThrottler; + } public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) { this.fs = fs; @@ -389,10 +414,18 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean if (!FSUtils.isSameHdfs(conf, srcFs, fs)) { LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination filesystem. Copying file over to destination staging dir."); - FileUtil.copy(srcFs, p, fs, stageP, false, conf); + if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) { + bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf); + } else { + FileUtil.copy(srcFs, p, fs, stageP, false, conf); + } } else if (copyFile) { LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir."); - FileUtil.copy(srcFs, p, fs, stageP, false, conf); + if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) { + bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf); + } else { + FileUtil.copy(srcFs, p, fs, stageP, false, conf); + } } else { LOG.debug("Moving " + p + " to " + stageP); FileStatus origFileStatus = fs.getFileStatus(p); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java new file mode 100644 index 000000000000..b6db1a909107 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/BulkLoadThrottler.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.IOUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +import static org.apache.hadoop.io.IOUtils.closeStream; + +/** + * Bulkload throttling controller for RS: enabled if + * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed + * to peer within each cycle won't exceed 'bandwidth' bytes + */ +@InterfaceAudience.Private public class BulkLoadThrottler { + private static final Logger LOG = LoggerFactory.getLogger(BulkLoadThrottler.class); + private Configuration conf; + private boolean enabled; + private double bandwidth; + private long cyclePushSize; + private long cycleStartTick; + // a cycle = 100ms + private static final Long CYCLE = 100L; + private Long currentBulkLoadBandwidth; + + /** + * BulkLoadThrottler constructor + * If bandwidth less than 1, throttling is disabled + * + * @param bandwidth bandwidth cycle(100ms) + */ + public BulkLoadThrottler(final double bandwidth, Configuration conf) { + this.conf = conf; + this.bandwidth = bandwidth; + LOG.info("Init bulkload bandwidth is {}", bandwidth); + this.enabled = this.bandwidth > 0; + if (this.enabled) { + this.cyclePushSize = 0; + this.cycleStartTick = EnvironmentEdgeManager.currentTime(); + } + currentBulkLoadBandwidth = conf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH, + SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH); + } + + public void setConf(Configuration newConf) { + this.conf = newConf; + } + + public Configuration getConf() { + return conf; + } + + /** + * If bulkload throttling is enabled + * + * @return true if bulkload throttling is enabled + */ + public boolean isEnabled() { + return this.enabled; + } + + /** + * Get how long the caller should sleep according to the current size and + * current cycle's total push size and start tick, return the sleep interval + * for throttling control. + * + * @param size is the size of edits to be pushed + * @return sleep interval for throttling control + */ + public long getNextSleepInterval(final int size) { + if (!this.enabled) { + return 0; + } + + long sleepTicks = 0; + long now = EnvironmentEdgeManager.currentTime(); + // 1. if cyclePushSize exceeds bandwidth, we need to sleep some + // following cycles to amortize, this case can occur when a single push + // exceeds the bandwidth + if ((double) this.cyclePushSize > bandwidth) { + double cycles = Math.ceil((double) this.cyclePushSize / bandwidth); + long shouldTillTo = this.cycleStartTick + (long) (cycles * CYCLE); + if (shouldTillTo > now) { + sleepTicks = shouldTillTo - now; + } else { + // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here! + this.cycleStartTick = now; + } + this.cyclePushSize = 0; + } else { + long nextCycleTick = this.cycleStartTick + CYCLE; //a cycle is 100ms + if (now >= nextCycleTick) { + // 2. switch to next cycle if the current cycle has passed + this.cycleStartTick = now; + this.cyclePushSize = 0; + } else if (this.cyclePushSize > 0 && (double) (this.cyclePushSize + size) >= bandwidth) { + // 3. delay the push to next cycle if exceeds throttling bandwidth. + // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case + // where a cycle's first push size(currentSize) > bandwidth + sleepTicks = nextCycleTick - now; + this.cyclePushSize = 0; + } + } + return sleepTicks; + } + + /** + * Add current size to the current cycle's total push size + * + * @param size is the current size added to the current cycle's total push size + */ + public void addPushSize(final int size) { + if (this.enabled) { + this.cyclePushSize += size; + } + } + + /** + * Reset the cycle start tick to NOW + */ + public void resetStartTick() { + if (this.enabled) { + this.cycleStartTick = EnvironmentEdgeManager.currentTime(); + } + } + + /** + * Set bulkload Bandwidth throttling + * + * @param newBandwidth set bandwidth size + */ + + public void setBandwidth(double newBandwidth) { + LOG.info("Bandwidth change {} to {}", this.bandwidth, newBandwidth); + this.bandwidth = newBandwidth; + this.enabled = this.bandwidth > 0; + } + + /** + * Get bulkload Bandwidth throttling + * + * @return get bandwidth size + */ + public double getBandwidth() { + return bandwidth; + } + + public void tryThrottle(int batchSize) throws InterruptedException { + checkBulkLoadBandwidthChangeAndResetThrottler(); + if (isEnabled()) { + long sleepTicks = getNextSleepInterval(batchSize); + if (sleepTicks > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("To sleep {}ms for bulkload throttling control ", sleepTicks); + } + Thread.sleep(sleepTicks); + // reset throttler's cycle start tick when sleep for throttling occurs + resetStartTick(); + } + } + } + + private void checkBulkLoadBandwidthChangeAndResetThrottler() { + long Bandwidth = getCurrentBandwidth(); + if (Bandwidth != currentBulkLoadBandwidth) { + LOG.info("Bulkload node bandwidth throttling changed, {} to {}", currentBulkLoadBandwidth, + Bandwidth); + currentBulkLoadBandwidth = Bandwidth; + setBandwidth(currentBulkLoadBandwidth); + + } + } + + private long getCurrentBandwidth() { + long tableBandwidth = conf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH, + SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH); + return tableBandwidth >= 0 ? tableBandwidth : 0; + } + + public boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, + Configuration conf) throws IOException { + return copy(srcFS, src, dstFS, dst, deleteSource, true, conf); + } + + private boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, + boolean overwrite, Configuration conf) throws IOException { + FileStatus fileStatus = srcFS.getFileStatus(src); + return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf); + } + + /** + * Copy files between FileSystems. + */ + private boolean copy(FileSystem srcFS, FileStatus srcStatus, FileSystem dstFS, Path dst, + boolean deleteSource, boolean overwrite, Configuration conf) throws IOException { + Path src = srcStatus.getPath(); + dst = checkDest(src.getName(), dstFS, dst, overwrite); + if (srcStatus.isDirectory()) { + checkDependencies(srcFS, src, dstFS, dst); + if (!dstFS.mkdirs(dst)) { + return false; + } + FileStatus contents[] = srcFS.listStatus(src); + for (int i = 0; i < contents.length; i++) { + copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getPath().getName()), + deleteSource, overwrite, conf); + } + } else { + InputStream in = null; + OutputStream out = null; + try { + in = srcFS.open(src); + out = dstFS.create(dst, overwrite); + copyBytes(in, out, conf, true); + } catch (IOException e) { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + throw e; + } + } + if (deleteSource) { + return srcFS.delete(src, true); + } else { + return true; + } + + } + + private Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) + throws IOException { + if (dstFS.exists(dst)) { + FileStatus sdst = dstFS.getFileStatus(dst); + if (sdst.isDirectory()) { + if (null == srcName) { + throw new IOException("Target " + dst + " is a directory"); + } + return checkDest(null, dstFS, new Path(dst, srcName), overwrite); + } else if (!overwrite) { + throw new IOException("Target " + dst + " already exists"); + } + } + return dst; + } + + // + // If the destination is a subdirectory of the source, then + // generate exception + // + private void checkDependencies(FileSystem srcFS, Path src, FileSystem dstFS, Path dst) + throws IOException { + if (srcFS == dstFS) { + String srcq = src.makeQualified(srcFS).toString() + Path.SEPARATOR; + String dstq = dst.makeQualified(dstFS).toString() + Path.SEPARATOR; + if (dstq.startsWith(srcq)) { + if (srcq.length() == dstq.length()) { + throw new IOException("Cannot copy " + src + " to itself."); + } else { + throw new IOException("Cannot copy " + src + " to its subdirectory " + dst); + } + } + } + } + + private void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close) + throws IOException { + copyBytes(in, out, conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), close); + } + + private void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) + throws IOException { + try { + copyBytes(in, out, buffSize); + if (close) { + out.close(); + out = null; + in.close(); + in = null; + } + } finally { + if (close) { + closeStream(out); + closeStream(in); + } + } + } + + private void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException { + PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null; + byte[] buf = new byte[buffSize]; + int bytesRead = in.read(buf); + while (bytesRead >= 0) { + try { + tryThrottle(bytesRead); + } catch (InterruptedException e) { + LOG.error( + "Interrupted while sleeping for throttling control, " + "currentByteSize is " + bytesRead, + e); + } + out.write(buf, 0, bytesRead); + if ((ps != null) && ps.checkError()) { + throw new IOException("Unable to write to output stream."); + } + addPushSize(bytesRead); + bytesRead = in.read(buf); + } + } +}