Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27523 Add BulkLoad bandwidth throttling #4975

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
Expand Down Expand Up @@ -7140,7 +7141,7 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
// Convenience overload: forwards to the longer bulkLoadHFiles overload with fixed trailing
// arguments (copyFile=false, clusterIds=null, replicate=true).
// NOTE(review): this is a rendered diff hunk without +/- markers. The first return is
// presumably the removed 6-argument call and the second its 7-argument replacement, whose
// trailing null fills the new BulkLoadThrottler parameter (i.e. no throttling) -- confirm
// against the pull request.
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true);
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true, null);
}

/**
Expand Down Expand Up @@ -7185,7 +7186,8 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String c
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile,
List<String> clusterIds, boolean replicate) throws IOException {
List<String> clusterIds, boolean replicate, BulkLoadThrottler bulkLoadThrottler)
throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
Expand Down Expand Up @@ -7281,7 +7283,10 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
}
Pair<Path, Path> pair = null;
if (reqTmp) {
pair = store.preBulkLoadHFile(finalPath, seqId);
pair = store.preBulkLoadHFile(finalPath, seqId,
(bulkLoadThrottler != null && bulkLoadThrottler.isEnabled())
? bulkLoadThrottler
: null);
} else {
Path livePath = new Path(finalPath);
pair = new Pair<>(livePath, livePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
Expand Down Expand Up @@ -552,8 +553,8 @@ public void removeStoreFiles(String familyName, Collection<HStoreFile> storeFile
* @param seqNum Bulk Load sequence number
* @return The destination {@link Path} of the bulk loaded file
*/
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
throws IOException {
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum,
BulkLoadThrottler bulkLoadThrottler) throws IOException {
// Copy the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf);
srcPath = srcFs.resolvePath(srcPath);
Expand All @@ -567,7 +568,11 @@ Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long s
LOG.info("Bulk-load file " + srcPath + " is on different filesystem than "
+ "the destination store. Copying file over to destination filesystem.");
Path tmpPath = createTempName();
FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) {
bulkLoadThrottler.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
} else {
FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
}
LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
srcPath = tmpPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3402,6 +3402,17 @@ public void onConfigurationChange(Configuration newConf) {
LOG.info("Update region server coprocessors because the configuration has changed");
this.rsHost = new RegionServerCoprocessorHost(this, newConf);
}

long oldBandWidth = secureBulkLoadManager.getBulkLoadThrottler().getConf().getLong(
SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
long newBandWidth = newConf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
if (oldBandWidth != newBandWidth) {
LOG.info("ConfigurationChange bulkload oldBandWidth is {} " + "newBandWidth is {}",
oldBandWidth, newBandWidth);
this.secureBulkLoadManager.getBulkLoadThrottler().setConf(newConf);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
Expand Down Expand Up @@ -670,9 +671,11 @@ public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
* @param seqNum sequence Id associated with the HFile
*/
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum,
BulkLoadThrottler bulkLoadThrottler) throws IOException {
// NOTE(review): rendered diff hunk without +/- markers. The one-line signature and the
// three-argument bulkLoadStoreFile call are presumably the removed versions; the variants
// that carry bulkLoadThrottler are presumably their replacements -- confirm against the PR.
Path srcPath = new Path(srcPathStr);
// The throttler is handed through unchanged. bulkLoadStoreFile only uses it when it is
// non-null and enabled, and otherwise falls back to a plain FileUtil.copy, so passing null
// here means "do not throttle".
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum,
bulkLoadThrottler);
}

public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
Expand Down Expand Up @@ -106,6 +107,11 @@ public class SecureBulkLoadManager {
private UserProvider userProvider;
private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
private AsyncConnection conn;
/**
 * Bandwidth value read from {@link #HBASE_BULKLOAD_NODE_BANDWIDTH} when the manager is started;
 * it seeds the {@link BulkLoadThrottler}. Kept as a primitive so that reading it from the
 * configuration does not box.
 */
private long defaultBulkLoadBandwidth;
/** Configuration key for the region server wide bulk load bandwidth limit. */
public static final String HBASE_BULKLOAD_NODE_BANDWIDTH =
  "hbase.regionserver.bulkload.node.bandwidth";
/**
 * Value used when {@link #HBASE_BULKLOAD_NODE_BANDWIDTH} is not set.
 * NOTE(review): 0 presumably means "throttling disabled" -- confirm against BulkLoadThrottler.
 */
public static final long DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH = 0L;
/** Throttler shared by bulk loads handled by this manager; {@code null} until it is created. */
private BulkLoadThrottler bulkLoadThrottler;

SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
this.conf = conf;
Expand All @@ -130,6 +136,13 @@ public void start() throws IOException {
fs.setPermission(baseStagingDir, PERM_HIDDEN);
}
}
defaultBulkLoadBandwidth =
conf.getLong(HBASE_BULKLOAD_NODE_BANDWIDTH, DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
bulkLoadThrottler = new BulkLoadThrottler(defaultBulkLoadBandwidth, conf);
}

/**
 * Returns the bulk load throttler owned by this manager.
 * @return the throttler instance, or {@code null} if it has not been created yet (the field
 *         has no initializer and is only assigned during startup)
 */
public BulkLoadThrottler getBulkLoadThrottler() {
  final BulkLoadThrottler throttler = this.bulkLoadThrottler;
  return throttler;
}

public void stop() throws IOException {
Expand Down Expand Up @@ -286,8 +299,10 @@ public Map<byte[], List<Path>> run() {
// We call bulkLoadHFiles as requesting user
// To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds,
request.getReplicate());
(bulkLoadThrottler != null && bulkLoadThrottler.isEnabled())
? new SecureBulkLoadListener(fs, bulkToken, conf, bulkLoadThrottler)
: new SecureBulkLoadListener(fs, bulkToken, conf),
request.getCopyFile(), clusterIds, request.getReplicate(), null);
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
Expand Down Expand Up @@ -348,6 +363,16 @@ static class SecureBulkLoadListener implements BulkLoadListener {
private FileSystem srcFs = null;
private Map<String, FsPermission> origPermissions = null;
private Map<String, String> origSources = null;
private BulkLoadThrottler bulkLoadThrottler;

/**
 * Creates a listener that stages bulk load files and, when a throttler is supplied, uses it
 * for the copies into the staging directory.
 * @param fs                destination file system
 * @param stagingDir        staging directory the files are copied or moved into
 * @param conf              configuration used for file system comparisons and copies
 * @param bulkLoadThrottler throttler used for copies; may be {@code null}, in which case copies
 *                          are not throttled
 */
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf,
  BulkLoadThrottler bulkLoadThrottler) {
  this.fs = fs;
  this.stagingDir = stagingDir;
  this.conf = conf;
  this.origPermissions = new HashMap<>();
  // origSources is declared as null; without this a throttled listener would hit a
  // NullPointerException the first time the source-path bookkeeping map is used.
  this.origSources = new HashMap<>();
  this.bulkLoadThrottler = bulkLoadThrottler;
}

public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
this.fs = fs;
Expand Down Expand Up @@ -389,10 +414,18 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
+ "the destination filesystem. Copying file over to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) {
bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
} else {
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
}
} else if (copyFile) {
LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) {
bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
} else {
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
}
} else {
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
Expand Down
Loading