Skip to content

Commit

Permalink
Implement parallelism for the distributedLoad command
Browse files Browse the repository at this point in the history
distributedLoad traverses the specified path, distributes a load job of
a file to job master and waits the job completed one by one, that is a
serial process. So the performance may be not acceptable.
Now distributedLoad traverses the specified path, distributes a batch
load job of files to job master and distributes a new job if one job
completed, that is a intercurrent process.

Signed-off-by: liuhongtong <[email protected]>

Fix Alluxio#9791

pr-link: Alluxio#9895
change-id: cid-f7ece37fda7dd3bd2a6c38783b77edc410e22fab
  • Loading branch information
liuhongtong authored and apc999 committed Oct 2, 2019
1 parent 59b9969 commit 5ab45e3
Showing 1 changed file with 133 additions and 24 deletions.
157 changes: 133 additions & 24 deletions shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,25 @@
import alluxio.exception.AlluxioException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.job.load.LoadConfig;
import alluxio.util.CommonUtils;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -35,7 +46,32 @@
*/
@ThreadSafe
public final class DistributedLoadCommand extends AbstractFileSystemCommand {
private static final String REPLICATION = "replication";
private static final Logger LOG = LoggerFactory.getLogger(DistributedLoadCommand.class);
private static final Option REPLICATION_OPTION =
Option.builder()
.longOpt("replication")
.required(false)
.hasArg(true)
.numberOfArgs(1)
.type(Number.class)
.argName("replicas")
.desc("Number of block replicas of each loaded file, default value is 1")
.build();
private static final Option THREADS_OPTION =
Option.builder()
.longOpt("thread")
.required(false)
.hasArg(true)
.numberOfArgs(1)
.type(Number.class)
.argName("threads")
.desc("Number of threads used to load files in parallel, default value is CPU cores * 2")
.build();

private ThreadPoolExecutor mDistributedLoadServicePool;
private CompletionService<AlluxioURI> mDistributedLoadService;
private ArrayList<Future<AlluxioURI>> mFutures = new ArrayList<>();
private int mThreads = Runtime.getRuntime().availableProcessors() * 2;

/**
* Constructs a new instance to load a file or directory in Alluxio space.
Expand All @@ -53,12 +89,8 @@ public String getCommandName() {

@Override
public Options getOptions() {
return new Options().addOption(Option.builder()
.longOpt(REPLICATION)
.required(false)
.hasArg(true)
.desc("number of replicas to have for each block of the loaded file")
.build());
return new Options().addOption(REPLICATION_OPTION)
.addOption(THREADS_OPTION);
}

@Override
Expand All @@ -71,18 +103,99 @@ public int run(CommandLine cl) throws AlluxioException, IOException {
String[] args = cl.getArgs();
AlluxioURI path = new AlluxioURI(args[0]);
int replication = 1;
if (cl.hasOption(REPLICATION)) {
replication = Integer.parseInt(cl.getOptionValue(REPLICATION));
if (cl.hasOption(REPLICATION_OPTION.getLongOpt())) {
replication = Integer.parseInt(cl.getOptionValue(REPLICATION_OPTION.getLongOpt()));
}
if (cl.hasOption(THREADS_OPTION.getLongOpt())) {
mThreads = Integer.parseInt(cl.getOptionValue(THREADS_OPTION.getLongOpt()));
}
mDistributedLoadServicePool = new ThreadPoolExecutor(mThreads, mThreads, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
mDistributedLoadServicePool.allowCoreThreadTimeOut(true);
mDistributedLoadService =
new ExecutorCompletionService<>(mDistributedLoadServicePool);
try {
load(path, replication);
distributedLoad(path, replication);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
} catch (ExecutionException e) {
System.out.println(e.getMessage());
LOG.error("Error running " + StringUtils.join(args, " "), e);
return -1;
}
return 0;
}

/**
* Creates a new job to load a file in Alluxio space, makes it resident in memory.
*
* @param filePath The {@link AlluxioURI} path to load into Alluxio memory
* @param replication The replication of file to load into Alluxio memory
*/
private Callable<AlluxioURI> newJob(AlluxioURI filePath, int replication) {
return new Callable<AlluxioURI>() {
@Override
public AlluxioURI call() throws Exception {
JobGrpcClientUtils.run(new LoadConfig(filePath.getPath(), replication), 3,
mFsContext.getPathConf(filePath));
return filePath;
}
};
}

/**
* Waits one job to complete.
*/
private void waitJob() throws ExecutionException, InterruptedException {
while (true) {
Future<AlluxioURI> future = null;
// Take one completed job.
future = mDistributedLoadService.take();
if (future != null) {
AlluxioURI uri = future.get();
System.out.println(uri + " is loaded");
mFutures.remove(future);
return;
}
}
}

/**
* Add one job.
*/
private void addJob(URIStatus status, int replication)
throws ExecutionException, InterruptedException {
AlluxioURI filePath = new AlluxioURI(status.getPath());
if (status.getInAlluxioPercentage() == 100) {
// The file has already been fully loaded into Alluxio.
System.out.println(filePath + " is already fully loaded in Alluxio");
return;
}
if (mFutures.size() >= mThreads) {
// Wait one job to complete.
waitJob();
}
Callable<AlluxioURI> call = newJob(filePath, replication);
mFutures.add(mDistributedLoadService.submit(call));
System.out.println(filePath + " loading");
}

/**
* Distributed loads a file or directory in Alluxio space, makes it resident in memory.
*
* @param filePath The {@link AlluxioURI} path to load into Alluxio memory
* @param replication The replication of file to load into Alluxio memory
*/
private void distributedLoad(AlluxioURI filePath, int replication)
throws AlluxioException, IOException, InterruptedException, ExecutionException {
load(filePath, replication);
// Wait remaining jobs to complete.
while (!mFutures.isEmpty()) {
waitJob();
}
}

/**
* Loads a file or directory in Alluxio space, makes it resident in memory.
*
Expand All @@ -91,30 +204,26 @@ public int run(CommandLine cl) throws AlluxioException, IOException {
* @throws IOException when non-Alluxio exception occurs
*/
private void load(AlluxioURI filePath, int replication)
throws AlluxioException, IOException, InterruptedException {
throws IOException, AlluxioException, ExecutionException, InterruptedException {
URIStatus status = mFileSystem.getStatus(filePath);
if (status.isFolder()) {
List<URIStatus> statuses = mFileSystem.listStatus(filePath);
for (URIStatus uriStatus : statuses) {
AlluxioURI newPath = new AlluxioURI(uriStatus.getPath());
load(newPath, replication);
if (uriStatus.isFolder()) {
AlluxioURI subPath = new AlluxioURI(uriStatus.getPath());
load(subPath, replication);
} else {
addJob(uriStatus, replication);
}
}
} else {
Thread thread = CommonUtils.createProgressThread(System.out);
thread.start();
try {
JobGrpcClientUtils.run(new LoadConfig(filePath.getPath(), replication), 3,
mFsContext.getPathConf(filePath));
} finally {
thread.interrupt();
}
addJob(status, replication);
}
System.out.println(filePath + " loaded");
}

@Override
public String getUsage() {
return "distributedLoad [--replication <num>] <path>";
return "distributedLoad [--replication <num>] [--thread <threads>] <path>";
}

@Override
Expand Down

0 comments on commit 5ab45e3

Please sign in to comment.