From 9b5ed4ddcc958219271edb36839edbdb6f29e862 Mon Sep 17 00:00:00 2001 From: liuhongtong Date: Mon, 16 Sep 2019 15:54:09 +0800 Subject: [PATCH 1/6] Optimize distributedLoad #9791 Signed-off-by: liuhongtong --- .../fs/command/DistributedLoadCommand.java | 137 +++++++++++++++--- 1 file changed, 116 insertions(+), 21 deletions(-) diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java index 9443cd9ee99d..8c24045d0871 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java @@ -19,14 +19,22 @@ import alluxio.exception.AlluxioException; import alluxio.exception.status.InvalidArgumentException; import alluxio.job.load.LoadConfig; -import alluxio.util.CommonUtils; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.ThreadSafe; @@ -35,7 +43,28 @@ */ @ThreadSafe public final class DistributedLoadCommand extends AbstractFileSystemCommand { - private static final String REPLICATION = "replication"; + public static final Option REPLICATION = + Option.builder() + .longOpt("replication") + .required(false) + .hasArg(true) + .desc("number of replicas to have for each block of the loaded file") + .build(); + public static final Option THREAD_OPTION = + Option.builder() + .longOpt("thread") + .required(false) + .hasArg(true) + .numberOfArgs(1) + .argName("threads") + .type(Number.class) + .desc("Number of mThreads used to load files in parallel, default value is CPU cores * 2") + .build(); + + private ThreadPoolExecutor mDistributedLoadServicePool; + private CompletionService mDistributedLoadService; + private ArrayList> mFutures = new ArrayList<>(); + private int mThreads = Runtime.getRuntime().availableProcessors() * 2; /** * Constructs a new instance to load a file or directory in Alluxio space. @@ -53,12 +82,8 @@ public String getCommandName() { @Override public Options getOptions() { - return new Options().addOption(Option.builder() - .longOpt(REPLICATION) - .required(false) - .hasArg(true) - .desc("number of replicas to have for each block of the loaded file") - .build()); + return new Options().addOption(REPLICATION) + .addOption(THREAD_OPTION); } @Override @@ -71,11 +96,19 @@ public int run(CommandLine cl) throws AlluxioException, IOException { String[] args = cl.getArgs(); AlluxioURI path = new AlluxioURI(args[0]); int replication = 1; - if (cl.hasOption(REPLICATION)) { - replication = Integer.parseInt(cl.getOptionValue(REPLICATION)); + if (cl.hasOption(REPLICATION.getLongOpt())) { + replication = Integer.parseInt(cl.getOptionValue(REPLICATION.getLongOpt())); + } + if (cl.hasOption(THREAD_OPTION.getLongOpt())) { + mThreads = Integer.parseInt(cl.getOptionValue(THREAD_OPTION.getLongOpt())); } + mDistributedLoadServicePool = new ThreadPoolExecutor(mThreads, mThreads, 60, + TimeUnit.SECONDS, new LinkedBlockingQueue()); + mDistributedLoadServicePool.allowCoreThreadTimeOut(true); + mDistributedLoadService = + new ExecutorCompletionService<>(mDistributedLoadServicePool); try { - load(path, replication); + distributedLoad(path, replication); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return -1; @@ -83,6 +116,65 @@ public int run(CommandLine cl) throws AlluxioException, IOException { return 0; } + /** + * Create a new job to load a file in Alluxio space, makes it resident in memory. + * + * @param filePath The {@link AlluxioURI} path to load into Alluxio memory + * @param replication The replication of file to load into Alluxio memory + */ + private Callable newJob(AlluxioURI filePath, int replication) { + return new Callable() { + @Override + public AlluxioURI call() throws Exception { + JobGrpcClientUtils.run(new LoadConfig(filePath.getPath(), replication), 3, + mFsContext.getPathConf(filePath)); + return filePath; + } + }; + } + + /** + * Wait one job to complete. + */ + private void waitJob() { + while (true) { + Future future = null; + try { + // Take one completed job. + future = mDistributedLoadService.take(); + if (future != null) { + AlluxioURI uri = future.get(); + System.out.println(uri + " loaded"); + mFutures.remove(future); + return; + } + } catch (ExecutionException e) { + e.printStackTrace(); + mFutures.remove(future); + return; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Distributed loads a file or directory in Alluxio space, makes it resident in memory. + * + * @param filePath The {@link AlluxioURI} path to load into Alluxio memory + * @param replication The replication of file to load into Alluxio memory + * @throws AlluxioException when Alluxio exception occurs + * @throws IOException when non-Alluxio exception occurs + */ + private void distributedLoad(AlluxioURI filePath, int replication) + throws AlluxioException, IOException, InterruptedException { + load(filePath, replication); + // Wait remaining jobs to complete. + while (!mFutures.isEmpty()) { + waitJob(); + } + } + /** * Loads a file or directory in Alluxio space, makes it resident in memory. * @@ -91,7 +183,7 @@ public int run(CommandLine cl) throws AlluxioException, IOException { * @throws IOException when non-Alluxio exception occurs */ private void load(AlluxioURI filePath, int replication) - throws AlluxioException, IOException, InterruptedException { + throws IOException, AlluxioException { URIStatus status = mFileSystem.getStatus(filePath); if (status.isFolder()) { List statuses = mFileSystem.listStatus(filePath); @@ -100,21 +192,24 @@ private void load(AlluxioURI filePath, int replication) load(newPath, replication); } } else { - Thread thread = CommonUtils.createProgressThread(System.out); - thread.start(); - try { - JobGrpcClientUtils.run(new LoadConfig(filePath.getPath(), replication), 3, - mFsContext.getPathConf(filePath)); - } finally { - thread.interrupt(); + if (status.getInAlluxioPercentage() == 100) { + // The file has already been fully loaded into Alluxio. + System.out.println(filePath + " already in Alluxio fully"); + return; + } + if (mFutures.size() >= mThreads) { + // Wait one job to complete. + waitJob(); } + Callable call = newJob(filePath, replication); + mFutures.add(mDistributedLoadService.submit(call)); + System.out.println(filePath + " loading"); } - System.out.println(filePath + " loaded"); } @Override public String getUsage() { - return "distributedLoad [--replication ] "; + return "distributedLoad [--replication ] [--thread ] "; } @Override From 84faeb423ef777704c9157b762f3426967eca796 Mon Sep 17 00:00:00 2001 From: liuhongtong Date: Tue, 17 Sep 2019 16:14:22 +0800 Subject: [PATCH 2/6] update after first review Signed-off-by: liuhongtong --- .../fs/command/DistributedLoadCommand.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java index 8c24045d0871..616e54d67e5b 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java @@ -43,22 +43,25 @@ */ @ThreadSafe public final class DistributedLoadCommand extends AbstractFileSystemCommand { - public static final Option REPLICATION = + private static final Option REPLICATION_OPTION = Option.builder() .longOpt("replication") .required(false) .hasArg(true) - .desc("number of replicas to have for each block of the loaded file") + .numberOfArgs(1) + .type(Number.class) + .argName("replicas") + .desc("number of replicas to have for each block of the loaded file, default value is 1") .build(); - public static final Option THREAD_OPTION = + private static final Option THREADS_OPTION = Option.builder() .longOpt("thread") .required(false) .hasArg(true) .numberOfArgs(1) - .argName("threads") .type(Number.class) - .desc("Number of mThreads used to load files in parallel, default value is CPU cores * 2") + .argName("threads") + .desc("Number of threads used to load files in parallel, default value is CPU cores * 2") .build(); private ThreadPoolExecutor mDistributedLoadServicePool; @@ -82,8 +85,8 @@ public String getCommandName() { @Override public Options getOptions() { - return new Options().addOption(REPLICATION) - .addOption(THREAD_OPTION); + return new Options().addOption(REPLICATION_OPTION) + .addOption(THREADS_OPTION); } @Override @@ -96,11 +99,11 @@ public int run(CommandLine cl) throws AlluxioException, IOException { String[] args = cl.getArgs(); AlluxioURI path = new AlluxioURI(args[0]); int replication = 1; - if (cl.hasOption(REPLICATION.getLongOpt())) { - replication = Integer.parseInt(cl.getOptionValue(REPLICATION.getLongOpt())); + if (cl.hasOption(REPLICATION_OPTION.getLongOpt())) { + replication = Integer.parseInt(cl.getOptionValue(REPLICATION_OPTION.getLongOpt())); } - if (cl.hasOption(THREAD_OPTION.getLongOpt())) { - mThreads = Integer.parseInt(cl.getOptionValue(THREAD_OPTION.getLongOpt())); + if (cl.hasOption(THREADS_OPTION.getLongOpt())) { + mThreads = Integer.parseInt(cl.getOptionValue(THREADS_OPTION.getLongOpt())); } mDistributedLoadServicePool = new ThreadPoolExecutor(mThreads, mThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()); @@ -163,8 +166,6 @@ private void waitJob() { * * @param filePath The {@link AlluxioURI} path to load into Alluxio memory * @param replication The replication of file to load into Alluxio memory - * @throws AlluxioException when Alluxio exception occurs - * @throws IOException when non-Alluxio exception occurs */ private void distributedLoad(AlluxioURI filePath, int replication) throws AlluxioException, IOException, InterruptedException { @@ -194,7 +195,7 @@ private void load(AlluxioURI filePath, int replication) } else { if (status.getInAlluxioPercentage() == 100) { // The file has already been fully loaded into Alluxio. - System.out.println(filePath + " already in Alluxio fully"); + System.out.println(filePath + " is already fully loaded in Alluxio"); return; } if (mFutures.size() >= mThreads) { From e80f59479539a16cc241c69f60218bccf2573944 Mon Sep 17 00:00:00 2001 From: liuhongtong Date: Wed, 18 Sep 2019 11:11:28 +0800 Subject: [PATCH 3/6] modify error output Signed-off-by: liuhongtong --- .../fs/command/DistributedLoadCommand.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java index 616e54d67e5b..53dcacfdffe9 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java @@ -23,6 +23,9 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -43,6 +46,7 @@ */ @ThreadSafe public final class DistributedLoadCommand extends AbstractFileSystemCommand { + private static final Logger LOG = LoggerFactory.getLogger(DistributedLoadCommand.class); private static final Option REPLICATION_OPTION = Option.builder() .longOpt("replication") @@ -115,6 +119,10 @@ public int run(CommandLine cl) throws AlluxioException, IOException { } catch (InterruptedException e) { Thread.currentThread().interrupt(); return -1; + } catch (ExecutionException e) { + System.out.println(e.getMessage()); + LOG.error("Error running " + StringUtils.join(args, " "), e); + return -1; } return 0; } @@ -139,7 +147,7 @@ public AlluxioURI call() throws Exception { /** * Wait one job to complete. */ - private void waitJob() { + private void waitJob() throws ExecutionException, InterruptedException { while (true) { Future future = null; try { @@ -152,11 +160,9 @@ private void waitJob() { return; } } catch (ExecutionException e) { - e.printStackTrace(); - mFutures.remove(future); - return; + throw e; } catch (InterruptedException e) { - e.printStackTrace(); + throw e; } } } @@ -168,7 +174,7 @@ private void waitJob() { * @param replication The replication of file to load into Alluxio memory */ private void distributedLoad(AlluxioURI filePath, int replication) - throws AlluxioException, IOException, InterruptedException { + throws AlluxioException, IOException, InterruptedException, ExecutionException { load(filePath, replication); // Wait remaining jobs to complete. while (!mFutures.isEmpty()) { @@ -184,7 +190,7 @@ private void distributedLoad(AlluxioURI filePath, int replication) * @throws IOException when non-Alluxio exception occurs */ private void load(AlluxioURI filePath, int replication) - throws IOException, AlluxioException { + throws IOException, AlluxioException, ExecutionException, InterruptedException { URIStatus status = mFileSystem.getStatus(filePath); if (status.isFolder()) { List statuses = mFileSystem.listStatus(filePath); From 9fc9081a31fd777ff54f1120340e22b0bdf07834 Mon Sep 17 00:00:00 2001 From: liuhongtong Date: Wed, 18 Sep 2019 15:12:38 +0800 Subject: [PATCH 4/6] decrease getStatus Signed-off-by: liuhongtong --- .../fs/command/DistributedLoadCommand.java | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java index 53dcacfdffe9..dc1fc8824fc1 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java @@ -167,6 +167,26 @@ private void waitJob() throws ExecutionException, InterruptedException { } } + /** + * Add one job. + */ + private void addJob(URIStatus status, int replication) + throws ExecutionException, InterruptedException { + AlluxioURI filePath = new AlluxioURI(status.getPath()); + if (status.getInAlluxioPercentage() == 100) { + // The file has already been fully loaded into Alluxio. + System.out.println(filePath + " is already fully loaded in Alluxio"); + return; + } + if (mFutures.size() >= mThreads) { + // Wait one job to complete. + waitJob(); + } + Callable call = newJob(filePath, replication); + mFutures.add(mDistributedLoadService.submit(call)); + System.out.println(filePath + " loading"); + } + /** * Distributed loads a file or directory in Alluxio space, makes it resident in memory. * @@ -195,22 +215,15 @@ private void load(AlluxioURI filePath, int replication) if (status.isFolder()) { List statuses = mFileSystem.listStatus(filePath); for (URIStatus uriStatus : statuses) { - AlluxioURI newPath = new AlluxioURI(uriStatus.getPath()); - load(newPath, replication); + if (uriStatus.isFolder()) { + AlluxioURI subPath = new AlluxioURI(uriStatus.getPath()); + load(subPath, replication); + } else { + addJob(uriStatus, replication); + } } } else { - if (status.getInAlluxioPercentage() == 100) { - // The file has already been fully loaded into Alluxio. - System.out.println(filePath + " is already fully loaded in Alluxio"); - return; - } - if (mFutures.size() >= mThreads) { - // Wait one job to complete. - waitJob(); - } - Callable call = newJob(filePath, replication); - mFutures.add(mDistributedLoadService.submit(call)); - System.out.println(filePath + " loading"); + addJob(status, replication); } } From 800669b665b4581f6282302632a071b4fc8a0a1b Mon Sep 17 00:00:00 2001 From: Bin Fan Date: Wed, 18 Sep 2019 10:37:13 -0700 Subject: [PATCH 5/6] Update DistributedLoadCommand.java --- .../fs/command/DistributedLoadCommand.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java index dc1fc8824fc1..7408b19bce10 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java @@ -55,7 +55,7 @@ public final class DistributedLoadCommand extends AbstractFileSystemCommand { .numberOfArgs(1) .type(Number.class) .argName("replicas") - .desc("number of replicas to have for each block of the loaded file, default value is 1") + .desc("Number of block replicas of each loaded file, default value is 1") .build(); private static final Option THREADS_OPTION = Option.builder() @@ -128,7 +128,7 @@ public int run(CommandLine cl) throws AlluxioException, IOException { } /** - * Create a new job to load a file in Alluxio space, makes it resident in memory. + * Creates a new job to load a file in Alluxio space, makes it resident in memory. * * @param filePath The {@link AlluxioURI} path to load into Alluxio memory * @param replication The replication of file to load into Alluxio memory @@ -145,26 +145,19 @@ public AlluxioURI call() throws Exception { } /** - * Wait one job to complete. + * Waits one job to complete. */ private void waitJob() throws ExecutionException, InterruptedException { while (true) { Future future = null; - try { - // Take one completed job. - future = mDistributedLoadService.take(); - if (future != null) { - AlluxioURI uri = future.get(); - System.out.println(uri + " loaded"); - mFutures.remove(future); - return; - } - } catch (ExecutionException e) { - throw e; - } catch (InterruptedException e) { - throw e; + // Take one completed job. + future = mDistributedLoadService.take(); + if (future != null) { + AlluxioURI uri = future.get(); + System.out.println(uri + " is loaded"); + mFutures.remove(future); + return; } - } } /** From cc68732499e59c782dc8f1edecfadcdcb26d5e11 Mon Sep 17 00:00:00 2001 From: liuhongtong Date: Thu, 19 Sep 2019 10:22:24 +0800 Subject: [PATCH 6/6] Update DistributedLoadCommand.java --- .../main/java/alluxio/cli/fs/command/DistributedLoadCommand.java | 1 + 1 file changed, 1 insertion(+) diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java index 7408b19bce10..2220794a8d45 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadCommand.java @@ -158,6 +158,7 @@ private void waitJob() throws ExecutionException, InterruptedException { mFutures.remove(future); return; } + } } /**