From 4afccad2b60d4d894bd7ca5eacf34c44aa4644cc Mon Sep 17 00:00:00 2001 From: liningrui Date: Wed, 17 Oct 2018 17:43:23 +0800 Subject: [PATCH] Fix two bug about single tasks and executor 1. When a single insert task times out, it may still cause statistical errors. 2. TashManager.shutdown() ignored singleExecutor Change-Id: I9a468241f369f4c7e73a95ba9a5772dac90e0ce1 --- .../hugegraph/loader/HugeGraphLoader.java | 61 +++++++++------- .../loader/executor/LoadOptions.java | 4 +- .../loader/parser/ElementParser.java | 4 +- ...nsertEdgeTask.java => EdgeInsertTask.java} | 4 +- .../hugegraph/loader/task/InsertTask.java | 4 +- .../hugegraph/loader/task/TaskManager.java | 69 ++++++++++++------- ...tVertexTask.java => VertexInsertTask.java} | 4 +- .../HugeClientWrapper.java} | 21 +++--- 8 files changed, 103 insertions(+), 68 deletions(-) rename src/main/java/com/baidu/hugegraph/loader/task/{InsertEdgeTask.java => EdgeInsertTask.java} (92%) rename src/main/java/com/baidu/hugegraph/loader/task/{InsertVertexTask.java => VertexInsertTask.java} (91%) rename src/main/java/com/baidu/hugegraph/loader/{executor/HugeClients.java => util/HugeClientWrapper.java} (74%) diff --git a/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java b/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java index 9c61e89b6..ce3eb0cac 100644 --- a/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java +++ b/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java @@ -34,7 +34,6 @@ import com.baidu.hugegraph.loader.exception.LoadException; import com.baidu.hugegraph.loader.exception.ParseException; import com.baidu.hugegraph.loader.executor.GroovyExecutor; -import com.baidu.hugegraph.loader.executor.HugeClients; import com.baidu.hugegraph.loader.executor.LoadLogger; import com.baidu.hugegraph.loader.executor.LoadOptions; import com.baidu.hugegraph.loader.executor.LoadSummary; @@ -46,6 +45,7 @@ import com.baidu.hugegraph.loader.source.GraphSource; import com.baidu.hugegraph.loader.source.VertexSource; import com.baidu.hugegraph.loader.task.TaskManager; +import com.baidu.hugegraph.loader.util.HugeClientWrapper; import com.baidu.hugegraph.structure.graph.Edge; import com.baidu.hugegraph.structure.graph.Vertex; import com.baidu.hugegraph.util.E; @@ -100,43 +100,29 @@ private void parseAndCheckOptions(String[] args) { } private LoadSummary load() { + LoadOptions options = LoadOptions.instance(); // Create schema this.createSchema(); LoadSummary summary = new LoadSummary(); - // Prepare to load vertices - Instant begTime = Instant.now(); + // Prepare to load vertices ... System.out.print("Vertices has been imported: 0\b\b"); // Load vertices - this.loadVertices(); - Instant endTime = Instant.now(); - Duration duration = Duration.between(begTime, endTime); - summary.parseFailureVertices(this.parseFailureNum); - summary.insertFailureVertices(this.taskManager.failureNum()); - summary.insertSuccessVertices(this.taskManager.successNum()); - summary.vertexLoadTime(duration); + this.loadVertices(summary); System.out.println(" " + summary.insertSuccessVertices()); // Reset counters this.resetCounters(); // Prepare to load edges ... - begTime = Instant.now(); System.out.print("Edges has been imported: 0\b\b"); // Load edges - this.loadEdges(); - endTime = Instant.now(); - duration = Duration.between(begTime, endTime); - summary.parseFailureEdges(this.parseFailureNum); - summary.insertFailureEdges(this.taskManager.failureNum()); - summary.insertSuccessEdges(this.taskManager.successNum()); - summary.edgeLoadTime(duration); + this.loadEdges(summary); System.out.println(" " + summary.insertSuccessEdges()); // Reset counters this.resetCounters(); - - LoadOptions options = LoadOptions.instance(); // Shutdown task manager this.taskManager.shutdown(options.shutdownTimeout); + return summary; } @@ -145,10 +131,14 @@ private void resetCounters() { this.parseFailureNum = 0L; } + private void shutdown(int seconds) { + this.taskManager.shutdown(seconds); + } + private void createSchema() { LoadOptions options = LoadOptions.instance(); File schemaFile = FileUtils.getFile(options.schema); - HugeClient client = HugeClients.get(options); + HugeClient client = HugeClientWrapper.get(options); GroovyExecutor groovyExecutor = new GroovyExecutor(); groovyExecutor.bind("schema", client.schema()); String script; @@ -161,7 +151,10 @@ private void createSchema() { groovyExecutor.execute(script, client); } - private void loadVertices() { + private void loadVertices(LoadSummary summary) { + Instant begTime = Instant.now(); + + // Execute loading tasks LoadOptions options = LoadOptions.instance(); List vertexSources = this.graphSource.vertexSources(); for (VertexSource source : vertexSources) { @@ -174,8 +167,16 @@ private void loadVertices() { LOG.warn("Failed to close parser for vertex source {}", source); } } + // Waiting async worker threads finish - this.taskManager.waitFinished(options.timeout); + this.taskManager.waitFinished(); + + Instant endTime = Instant.now(); + Duration duration = Duration.between(begTime, endTime); + summary.parseFailureVertices(this.parseFailureNum); + summary.insertFailureVertices(this.taskManager.failureNum()); + summary.insertSuccessVertices(this.taskManager.successNum()); + summary.vertexLoadTime(duration); } private void loadVertex(VertexParser parser) { @@ -207,7 +208,9 @@ private void loadVertex(VertexParser parser) { } } - private void loadEdges() { + private void loadEdges(LoadSummary summary) { + Instant begTime = Instant.now(); + LoadOptions options = LoadOptions.instance(); List edgeSources = this.graphSource.edgeSources(); for (EdgeSource source : edgeSources) { @@ -220,8 +223,16 @@ private void loadEdges() { LOG.warn("Failed to close parser for edge source {}", source); } } + // Waiting async worker threads finish - this.taskManager.waitFinished(options.timeout); + this.taskManager.waitFinished(); + + Instant endTime = Instant.now(); + Duration duration = Duration.between(begTime, endTime); + summary.parseFailureEdges(this.parseFailureNum); + summary.insertFailureEdges(this.taskManager.failureNum()); + summary.insertSuccessEdges(this.taskManager.successNum()); + summary.edgeLoadTime(duration); } private void loadEdge(EdgeParser parser) { diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java b/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java index 7e52702f3..6684c4de7 100644 --- a/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java +++ b/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java @@ -97,8 +97,8 @@ private LoadOptions() {} @Parameter(names = {"--timeout"}, arity = 1, validateWith = {PositiveValidator.class}, - description = "The timeout of inserting task in seconds") - public int timeout = 100; + description = "The timeout of HugeClient request") + public int timeout = 60; @Parameter(names = {"--retry-times"}, arity = 1, validateWith = {PositiveValidator.class}, diff --git a/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java b/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java index 79066a044..fafe3c6d8 100644 --- a/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java +++ b/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java @@ -27,7 +27,7 @@ import com.baidu.hugegraph.driver.HugeClient; import com.baidu.hugegraph.loader.exception.ParseException; -import com.baidu.hugegraph.loader.executor.HugeClients; +import com.baidu.hugegraph.loader.util.HugeClientWrapper; import com.baidu.hugegraph.loader.executor.LoadOptions; import com.baidu.hugegraph.loader.reader.InputReader; import com.baidu.hugegraph.loader.source.ElementSource; @@ -57,7 +57,7 @@ public abstract class ElementParser ElementParser(InputReader reader) { this.reader = reader; - this.client = HugeClients.get(LoadOptions.instance()); + this.client = HugeClientWrapper.get(LoadOptions.instance()); this.schemas = HashBasedTable.create(); this.reader.init(); } diff --git a/src/main/java/com/baidu/hugegraph/loader/task/InsertEdgeTask.java b/src/main/java/com/baidu/hugegraph/loader/task/EdgeInsertTask.java similarity index 92% rename from src/main/java/com/baidu/hugegraph/loader/task/InsertEdgeTask.java rename to src/main/java/com/baidu/hugegraph/loader/task/EdgeInsertTask.java index be6131eec..47f84da2c 100644 --- a/src/main/java/com/baidu/hugegraph/loader/task/InsertEdgeTask.java +++ b/src/main/java/com/baidu/hugegraph/loader/task/EdgeInsertTask.java @@ -24,9 +24,9 @@ import com.baidu.hugegraph.loader.executor.LoadOptions; import com.baidu.hugegraph.structure.graph.Edge; -public class InsertEdgeTask extends InsertTask { +public class EdgeInsertTask extends InsertTask { - InsertEdgeTask(List batch) { + EdgeInsertTask(List batch) { super(batch); } diff --git a/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java b/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java index 1da77a1c4..f201e36ce 100644 --- a/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java +++ b/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java @@ -24,7 +24,7 @@ import com.baidu.hugegraph.driver.HugeClient; import com.baidu.hugegraph.exception.ServerException; -import com.baidu.hugegraph.loader.executor.HugeClients; +import com.baidu.hugegraph.loader.util.HugeClientWrapper; import com.baidu.hugegraph.loader.executor.LoadOptions; import com.baidu.hugegraph.rest.ClientException; import com.baidu.hugegraph.structure.GraphElement; @@ -40,7 +40,7 @@ public abstract class InsertTask public InsertTask(List batch) { this.batch = batch; - this.client = HugeClients.get(LoadOptions.instance()); + this.client = HugeClientWrapper.get(LoadOptions.instance()); } public List batch() { diff --git a/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java b/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java index 35d5bd097..76a7db2e8 100644 --- a/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java +++ b/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java @@ -34,9 +34,9 @@ import com.baidu.hugegraph.driver.GraphManager; import com.baidu.hugegraph.loader.exception.InsertException; import com.baidu.hugegraph.loader.exception.LoadException; -import com.baidu.hugegraph.loader.executor.HugeClients; import com.baidu.hugegraph.loader.executor.LoadLogger; import com.baidu.hugegraph.loader.executor.LoadOptions; +import com.baidu.hugegraph.loader.util.HugeClientWrapper; import com.baidu.hugegraph.loader.util.LoaderUtil; import com.baidu.hugegraph.structure.graph.Edge; import com.baidu.hugegraph.structure.graph.Vertex; @@ -73,7 +73,7 @@ public TaskManager(LoadOptions options) { this.available = new Semaphore(this.futureNum); ExecutorService pool = Executors.newFixedThreadPool(this.futureNum); this.batchService = MoreExecutors.listeningDecorator(pool); - this.singleExecutor = Executors.newFixedThreadPool(1); + this.singleExecutor = Executors.newFixedThreadPool(this.futureNum); this.singleService = new ExecutorCompletionService<>(singleExecutor); this.singleTasks = new LongAdder(); this.successNum = new LongAdder(); @@ -88,14 +88,17 @@ public long failureNum() { return this.failureNum.longValue(); } - public boolean waitFinished(int timeout) { + public boolean waitFinished() { try { // Wait batch task finished - this.available.acquire(this.futureNum); + this.waitBatchTaskFinished(); // Wait single task finished - this.tryConsume(timeout); + this.waitSingleTaskFinished(); + if (this.singleTasks.longValue() != 0L) { - return false; + throw new IllegalStateException( + "Some unexpected execption occured when " + + "waiting single tasks finished"); } } catch (InterruptedException e) { return false; @@ -105,37 +108,60 @@ public boolean waitFinished(int timeout) { return true; } + private void waitBatchTaskFinished() throws InterruptedException { + this.available.acquire(this.futureNum); + } + + private void waitSingleTaskFinished() throws InterruptedException { + long total = this.singleTasks.longValue(); + for (long i = 0; i < total; i++) { + this.singleService.take(); + this.singleTasks.decrement(); + } + } + public void cleanup() { this.successNum.reset(); this.failureNum.reset(); } public void shutdown(int seconds) { + LOG.debug("Attempt to shutdown batch tasks executor"); try { - LOG.debug("Attempt to shutdown executor."); this.batchService.shutdown(); this.batchService.awaitTermination(seconds, TimeUnit.SECONDS); - this.singleExecutor.shutdown(); - this.singleExecutor.awaitTermination(seconds, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOG.error("Tasks is interrupted."); + LOG.error("The batch tasks are interrupted"); } finally { if (!this.batchService.isTerminated()) { - LOG.error("Cancel unfinished tasks."); + LOG.error("Cancel unfinished batch tasks"); } this.batchService.shutdownNow(); - LOG.debug("Shutdown is completed."); + } + + LOG.debug("Attempt to shutdown single tasks executor"); + try { + this.singleExecutor.shutdown(); + this.singleExecutor.awaitTermination(seconds, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("The single task are interrupted"); + } finally { + if (!this.singleExecutor.isTerminated()) { + LOG.error("Cancel unfinished single tasks"); + } + this.singleExecutor.shutdownNow(); } } public void submitVertexBatch(List batch) { this.ensurePoolAvailable(); + try { this.available.acquire(); } catch (InterruptedException ignored) { } - InsertVertexTask task = new InsertVertexTask(batch); + InsertTask task = new VertexInsertTask(batch); ListenableFuture future = this.batchService.submit(task); Futures.addCallback(future, new FutureCallback() { @@ -157,12 +183,13 @@ public void onFailure(Throwable t) { public void submitEdgeBatch(List batch) { this.ensurePoolAvailable(); + try { this.available.acquire(); } catch (InterruptedException ignored) { } - InsertEdgeTask task = new InsertEdgeTask(batch); + InsertTask task = new EdgeInsertTask(batch); ListenableFuture future = this.batchService.submit(task); Futures.addCallback(future, new FutureCallback() { @@ -186,7 +213,7 @@ private void submitVerticesInSingleMode(List vertices) { LoadOptions options = LoadOptions.instance(); int maxInsertErrors = options.maxInsertErrors; int shutdownTimeout = options.shutdownTimeout; - GraphManager graph = HugeClients.get(options).graph(); + GraphManager graph = HugeClientWrapper.get(options).graph(); this.singleService.submit(() -> { for (Vertex vertex : vertices) { try { @@ -224,11 +251,11 @@ private void submitVerticesInSingleMode(List vertices) { } catch (InterruptedException ignored) {} } - private void submitEdgesInSingleMode(List edges) { + public void submitEdgesInSingleMode(List edges) { LoadOptions options = LoadOptions.instance(); int maxInsertErrors = options.maxInsertErrors; int shutdownTimeout = options.shutdownTimeout; - GraphManager graph = HugeClients.get(options).graph(); + GraphManager graph = HugeClientWrapper.get(options).graph(); this.singleService.submit(() -> { for (Edge edge : edges) { try { @@ -287,12 +314,8 @@ private void printProgress(String type, long frequency, int batchSize) { } private void ensurePoolAvailable() { - while (this.batchService.isShutdown()){ - try { - Thread.sleep(100); - } catch (Exception ignored) { - // That's fine, just continue. - } + if (this.batchService.isShutdown()) { + throw new LoadException("Thread pool has been closed"); } } } diff --git a/src/main/java/com/baidu/hugegraph/loader/task/InsertVertexTask.java b/src/main/java/com/baidu/hugegraph/loader/task/VertexInsertTask.java similarity index 91% rename from src/main/java/com/baidu/hugegraph/loader/task/InsertVertexTask.java rename to src/main/java/com/baidu/hugegraph/loader/task/VertexInsertTask.java index 7b5aeff66..6cf5fb6bc 100644 --- a/src/main/java/com/baidu/hugegraph/loader/task/InsertVertexTask.java +++ b/src/main/java/com/baidu/hugegraph/loader/task/VertexInsertTask.java @@ -23,9 +23,9 @@ import com.baidu.hugegraph.structure.graph.Vertex; -public class InsertVertexTask extends InsertTask { +public class VertexInsertTask extends InsertTask { - InsertVertexTask(List batch) { + VertexInsertTask(List batch) { super(batch); } diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/HugeClients.java b/src/main/java/com/baidu/hugegraph/loader/util/HugeClientWrapper.java similarity index 74% rename from src/main/java/com/baidu/hugegraph/loader/executor/HugeClients.java rename to src/main/java/com/baidu/hugegraph/loader/util/HugeClientWrapper.java index 5a2cced39..b7d6739f7 100644 --- a/src/main/java/com/baidu/hugegraph/loader/executor/HugeClients.java +++ b/src/main/java/com/baidu/hugegraph/loader/util/HugeClientWrapper.java @@ -17,26 +17,27 @@ * under the License. */ -package com.baidu.hugegraph.loader.executor; +package com.baidu.hugegraph.loader.util; import com.baidu.hugegraph.driver.HugeClient; import com.baidu.hugegraph.loader.executor.LoadOptions; -public class HugeClients { +public final class HugeClientWrapper { - // TODO: seems no need to use ThreadLocal, reuse HugeClient is ok - private static final ThreadLocal instance = new ThreadLocal<>(); + private static volatile HugeClient instance; public static HugeClient get(LoadOptions options) { - HugeClient client = instance.get(); - if (client == null) { - client = newHugeClient(options); - instance.set(client); + if (instance == null) { + synchronized(HugeClientWrapper.class) { + if (instance == null) { + instance = newHugeClient(options); + } + } } - return client; + return instance; } - private HugeClients() {} + private HugeClientWrapper() {} private static HugeClient newHugeClient(LoadOptions options) { String address = options.host + ":" + options.port;