Skip to content

Commit

Permalink
HBASE-27523 Add BulkLoad bandwidth throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengsicheng committed Jan 16, 2023
1 parent 7ed2cb9 commit 7970042
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
Expand Down Expand Up @@ -7140,7 +7141,7 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true);
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true, null);
}

/**
Expand Down Expand Up @@ -7185,7 +7186,8 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String c
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile,
List<String> clusterIds, boolean replicate) throws IOException {
List<String> clusterIds, boolean replicate,
BulkLoadThrottler bulkLoadThrottler) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
Expand Down Expand Up @@ -7281,7 +7283,8 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
}
Pair<Path, Path> pair = null;
if (reqTmp) {
pair = store.preBulkLoadHFile(finalPath, seqId);
pair = store.preBulkLoadHFile(finalPath, seqId,
(bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) ? bulkLoadThrottler : null);
} else {
Path livePath = new Path(finalPath);
pair = new Pair<>(livePath, livePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
Expand Down Expand Up @@ -552,7 +553,8 @@ public void removeStoreFiles(String familyName, Collection<HStoreFile> storeFile
* @param seqNum Bulk Load sequence number
* @return The destination {@link Path} of the bulk loaded file
*/
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum,
BulkLoadThrottler bulkLoadThrottler)
throws IOException {
// Copy the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf);
Expand All @@ -567,7 +569,11 @@ Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long s
LOG.info("Bulk-load file " + srcPath + " is on different filesystem than "
+ "the destination store. Copying file over to destination filesystem.");
Path tmpPath = createTempName();
FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) {
bulkLoadThrottler.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
} else {
FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
}
LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
srcPath = tmpPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3402,6 +3402,17 @@ public void onConfigurationChange(Configuration newConf) {
LOG.info("Update region server coprocessors because the configuration has changed");
this.rsHost = new RegionServerCoprocessorHost(this, newConf);
}

long oldBandWidth = secureBulkLoadManager.getBulkLoadThrottler().getConf()
.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
long newBandWidth = newConf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
if (oldBandWidth != newBandWidth) {
LOG.info("ConfigurationChange bulkload oldBandWidth is {} " + "newBandWidth is {}",
oldBandWidth, newBandWidth);
this.secureBulkLoadManager.getBulkLoadThrottler().setConf(newConf);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
Expand Down Expand Up @@ -670,9 +671,10 @@ public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
* @param seqNum sequence Id associated with the HFile
*/
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum,
BulkLoadThrottler bulkLoadThrottler) throws IOException {
Path srcPath = new Path(srcPathStr);
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum, bulkLoadThrottler);
}

public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
Expand Down Expand Up @@ -106,6 +107,11 @@ public class SecureBulkLoadManager {
private UserProvider userProvider;
private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
private AsyncConnection conn;
private Long defaultBulkLoadBandwidth;
public final static String HBASE_BULKLOAD_NODE_BANDWIDTH =
"hbase.regionserver.bulkload.node.bandwidth";
public final static Long DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH = 0L;
private BulkLoadThrottler bulkLoadThrottler;

SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
this.conf = conf;
Expand All @@ -130,6 +136,13 @@ public void start() throws IOException {
fs.setPermission(baseStagingDir, PERM_HIDDEN);
}
}
defaultBulkLoadBandwidth =
conf.getLong(HBASE_BULKLOAD_NODE_BANDWIDTH, DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
bulkLoadThrottler = new BulkLoadThrottler(defaultBulkLoadBandwidth, conf);
}

public BulkLoadThrottler getBulkLoadThrottler() {
return bulkLoadThrottler;
}

public void stop() throws IOException {
Expand Down Expand Up @@ -286,8 +299,10 @@ public Map<byte[], List<Path>> run() {
// We call bulkLoadHFiles as requesting user
// To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds,
request.getReplicate());
(bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) ?
new SecureBulkLoadListener(fs, bulkToken, conf, bulkLoadThrottler) :
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds,
request.getReplicate(), null);
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
Expand Down Expand Up @@ -348,6 +363,16 @@ static class SecureBulkLoadListener implements BulkLoadListener {
private FileSystem srcFs = null;
private Map<String, FsPermission> origPermissions = null;
private Map<String, String> origSources = null;
private BulkLoadThrottler bulkLoadThrottler;

public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf,
BulkLoadThrottler bulkLoadThrottler) {
this.fs = fs;
this.stagingDir = stagingDir;
this.conf = conf;
this.origPermissions = new HashMap<>();
this.bulkLoadThrottler = bulkLoadThrottler;
}

public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
this.fs = fs;
Expand Down Expand Up @@ -389,10 +414,18 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
+ "the destination filesystem. Copying file over to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) {
bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
} else {
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
}
} else if (copyFile) {
LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) {
bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
} else {
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
}
} else {
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
Expand Down
Loading

0 comments on commit 7970042

Please sign in to comment.