Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallelism for the distributedLoad command #9895

Merged
merged 6 commits into from
Sep 20, 2019
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 117 additions & 21 deletions shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@
import alluxio.exception.AlluxioException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.job.load.LoadConfig;
import alluxio.util.CommonUtils;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -35,7 +43,31 @@
*/
@ThreadSafe
public final class DistributedLoadCommand extends AbstractFileSystemCommand {
private static final String REPLICATION = "replication";
private static final Option REPLICATION_OPTION =
Option.builder()
.longOpt("replication")
.required(false)
.hasArg(true)
.numberOfArgs(1)
.type(Number.class)
.argName("replicas")
.desc("number of replicas to have for each block of the loaded file, default value is 1")
.build();
private static final Option THREADS_OPTION =
Option.builder()
.longOpt("thread")
.required(false)
.hasArg(true)
.numberOfArgs(1)
.type(Number.class)
.argName("threads")
.desc("Number of threads used to load files in parallel, default value is CPU cores * 2")
.build();

private ThreadPoolExecutor mDistributedLoadServicePool;
private CompletionService<AlluxioURI> mDistributedLoadService;
private ArrayList<Future<AlluxioURI>> mFutures = new ArrayList<>();
liuhongtong marked this conversation as resolved.
Show resolved Hide resolved
private int mThreads = Runtime.getRuntime().availableProcessors() * 2;

/**
* Constructs a new instance to load a file or directory in Alluxio space.
Expand All @@ -53,12 +85,8 @@ public String getCommandName() {

@Override
public Options getOptions() {
return new Options().addOption(Option.builder()
.longOpt(REPLICATION)
.required(false)
.hasArg(true)
.desc("number of replicas to have for each block of the loaded file")
.build());
return new Options().addOption(REPLICATION_OPTION)
.addOption(THREADS_OPTION);
}

@Override
Expand All @@ -71,18 +99,83 @@ public int run(CommandLine cl) throws AlluxioException, IOException {
String[] args = cl.getArgs();
AlluxioURI path = new AlluxioURI(args[0]);
int replication = 1;
if (cl.hasOption(REPLICATION)) {
replication = Integer.parseInt(cl.getOptionValue(REPLICATION));
if (cl.hasOption(REPLICATION_OPTION.getLongOpt())) {
replication = Integer.parseInt(cl.getOptionValue(REPLICATION_OPTION.getLongOpt()));
}
if (cl.hasOption(THREADS_OPTION.getLongOpt())) {
mThreads = Integer.parseInt(cl.getOptionValue(THREADS_OPTION.getLongOpt()));
}
mDistributedLoadServicePool = new ThreadPoolExecutor(mThreads, mThreads, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
mDistributedLoadServicePool.allowCoreThreadTimeOut(true);
mDistributedLoadService =
new ExecutorCompletionService<>(mDistributedLoadServicePool);
try {
load(path, replication);
distributedLoad(path, replication);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
return 0;
}

/**
* Create a new job to load a file in Alluxio space, makes it resident in memory.
*
* @param filePath The {@link AlluxioURI} path to load into Alluxio memory
* @param replication The replication of file to load into Alluxio memory
*/
private Callable<AlluxioURI> newJob(AlluxioURI filePath, int replication) {
return new Callable<AlluxioURI>() {
@Override
public AlluxioURI call() throws Exception {
JobGrpcClientUtils.run(new LoadConfig(filePath.getPath(), replication), 3,
mFsContext.getPathConf(filePath));
return filePath;
}
};
}
liuhongtong marked this conversation as resolved.
Show resolved Hide resolved

/**
* Wait one job to complete.
*/
private void waitJob() {
while (true) {
Future<AlluxioURI> future = null;
try {
// Take one completed job.
future = mDistributedLoadService.take();
if (future != null) {
AlluxioURI uri = future.get();
System.out.println(uri + " loaded");
mFutures.remove(future);
return;
}
} catch (ExecutionException e) {
e.printStackTrace();
liuhongtong marked this conversation as resolved.
Show resolved Hide resolved
mFutures.remove(future);
return;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* Distributed loads a file or directory in Alluxio space, makes it resident in memory.
*
* @param filePath The {@link AlluxioURI} path to load into Alluxio memory
* @param replication The replication of file to load into Alluxio memory
*/
private void distributedLoad(AlluxioURI filePath, int replication)
throws AlluxioException, IOException, InterruptedException {
load(filePath, replication);
// Wait remaining jobs to complete.
while (!mFutures.isEmpty()) {
waitJob();
}
}

/**
* Loads a file or directory in Alluxio space, makes it resident in memory.
*
Expand All @@ -91,7 +184,7 @@ public int run(CommandLine cl) throws AlluxioException, IOException {
* @throws IOException when non-Alluxio exception occurs
*/
private void load(AlluxioURI filePath, int replication)
throws AlluxioException, IOException, InterruptedException {
throws IOException, AlluxioException {
URIStatus status = mFileSystem.getStatus(filePath);
if (status.isFolder()) {
List<URIStatus> statuses = mFileSystem.listStatus(filePath);
Expand All @@ -100,21 +193,24 @@ private void load(AlluxioURI filePath, int replication)
load(newPath, replication);
}
} else {
Thread thread = CommonUtils.createProgressThread(System.out);
thread.start();
try {
JobGrpcClientUtils.run(new LoadConfig(filePath.getPath(), replication), 3,
mFsContext.getPathConf(filePath));
} finally {
thread.interrupt();
if (status.getInAlluxioPercentage() == 100) {
// The file has already been fully loaded into Alluxio.
System.out.println(filePath + " is already fully loaded in Alluxio");
return;
}
if (mFutures.size() >= mThreads) {
// Wait one job to complete.
waitJob();
liuhongtong marked this conversation as resolved.
Show resolved Hide resolved
}
Callable<AlluxioURI> call = newJob(filePath, replication);
mFutures.add(mDistributedLoadService.submit(call));
System.out.println(filePath + " loading");
}
System.out.println(filePath + " loaded");
}

@Override
public String getUsage() {
return "distributedLoad [--replication <num>] <path>";
return "distributedLoad [--replication <num>] [--thread <threads>] <path>";
}

@Override
Expand Down