Skip to content

Commit

Permalink
HBASE-26371 Prioritize meta region move over other region moves in re…
Browse files Browse the repository at this point in the history
…gion_mover (#3769) (#3767)

Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
virajjasani committed Oct 20, 2021
1 parent 5589744 commit 75f47f7
Showing 1 changed file with 69 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -244,21 +246,51 @@ public RegionMover build() throws IOException {
*/
public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService loadPool = Executors.newFixedThreadPool(1);
Future<Boolean> loadTask = loadPool.submit(() -> {
Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
if (!isMetaMoved) {
return false;
}
loadPool = Executors.newFixedThreadPool(1);
loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
return waitTaskToFinish(loadPool, loadTask, "loading");
}

private Callable<Boolean> getMetaRegionMovePlan() {
return getRegionsMovePlan(true);
}

private Callable<Boolean> getNonMetaRegionsMovePlan() {
return getRegionsMovePlan(false);
}

private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
return () -> {
try {
List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
if (regionsToMove.isEmpty()) {
LOG.info("No regions to load.Exiting");
return true;
}
loadRegions(regionsToMove);
Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
if (moveMetaRegion) {
if (metaRegion.isPresent()) {
loadRegions(Collections.singletonList(metaRegion.get()));
}
} else {
metaRegion.ifPresent(regionsToMove::remove);
loadRegions(regionsToMove);
}
} catch (Exception e) {
LOG.error("Error while loading regions to " + hostname, e);
return false;
}
return true;
});
return waitTaskToFinish(loadPool, loadTask, "loading");
};
}

private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
}

private void loadRegions(List<RegionInfo> regionsToMove)
Expand Down Expand Up @@ -369,30 +401,43 @@ private void unloadRegions(ServerName server, List<ServerName> regionServers,
}
LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
List<Future<Boolean>> taskList = new ArrayList<>();
int serverIndex = 0;
for (RegionInfo regionToMove : regionsToMove) {
if (ack) {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
} else {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
}
serverIndex = (serverIndex + 1) % regionServers.size();

Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
if (metaRegion.isPresent()) {
RegionInfo meta = metaRegion.get();
submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
Collections.singletonList(meta));
regionsToMove.remove(meta);
}
moveRegionsPool.shutdown();
long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
.getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
}
}

private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
List<Future<Boolean>> taskList = new ArrayList<>();
int serverIndex = 0;
for (RegionInfo regionToMove : regionsToMove) {
if (ack) {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
} else {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
}
serverIndex = (serverIndex + 1) % regionServers.size();
}
moveRegionsPool.shutdown();
long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
.getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
}

private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
throws TimeoutException, InterruptedException, ExecutionException {
pool.shutdown();
Expand Down

0 comments on commit 75f47f7

Please sign in to comment.