Skip to content

Commit

Permalink
Fix two bugs about single tasks and executor
Browse files Browse the repository at this point in the history
1. When a single insert task times out, it may still cause statistical errors.
2. TaskManager.shutdown() ignored singleExecutor

Change-Id: I9a468241f369f4c7e73a95ba9a5772dac90e0ce1
  • Loading branch information
Linary committed Oct 18, 2018
1 parent ca5df87 commit 4afccad
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 68 deletions.
61 changes: 36 additions & 25 deletions src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.exception.ParseException;
import com.baidu.hugegraph.loader.executor.GroovyExecutor;
import com.baidu.hugegraph.loader.executor.HugeClients;
import com.baidu.hugegraph.loader.executor.LoadLogger;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.executor.LoadSummary;
Expand All @@ -46,6 +45,7 @@
import com.baidu.hugegraph.loader.source.GraphSource;
import com.baidu.hugegraph.loader.source.VertexSource;
import com.baidu.hugegraph.loader.task.TaskManager;
import com.baidu.hugegraph.loader.util.HugeClientWrapper;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.Vertex;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -100,43 +100,29 @@ private void parseAndCheckOptions(String[] args) {
}

private LoadSummary load() {
LoadOptions options = LoadOptions.instance();
// Create schema
this.createSchema();

LoadSummary summary = new LoadSummary();
// Prepare to load vertices
Instant begTime = Instant.now();
// Prepare to load vertices ...
System.out.print("Vertices has been imported: 0\b\b");
// Load vertices
this.loadVertices();
Instant endTime = Instant.now();
Duration duration = Duration.between(begTime, endTime);
summary.parseFailureVertices(this.parseFailureNum);
summary.insertFailureVertices(this.taskManager.failureNum());
summary.insertSuccessVertices(this.taskManager.successNum());
summary.vertexLoadTime(duration);
this.loadVertices(summary);
System.out.println(" " + summary.insertSuccessVertices());
// Reset counters
this.resetCounters();

// Prepare to load edges ...
begTime = Instant.now();
System.out.print("Edges has been imported: 0\b\b");
// Load edges
this.loadEdges();
endTime = Instant.now();
duration = Duration.between(begTime, endTime);
summary.parseFailureEdges(this.parseFailureNum);
summary.insertFailureEdges(this.taskManager.failureNum());
summary.insertSuccessEdges(this.taskManager.successNum());
summary.edgeLoadTime(duration);
this.loadEdges(summary);
System.out.println(" " + summary.insertSuccessEdges());
// Reset counters
this.resetCounters();

LoadOptions options = LoadOptions.instance();
// Shutdown task manager
this.taskManager.shutdown(options.shutdownTimeout);

return summary;
}

Expand All @@ -145,10 +131,14 @@ private void resetCounters() {
this.parseFailureNum = 0L;
}

/**
 * Shuts down the loader's task manager by delegating to
 * {@code TaskManager.shutdown(int)}.
 *
 * @param seconds value forwarded unchanged to the task manager; there it
 *                is used as the await-termination timeout, in seconds
 */
private void shutdown(int seconds) {
this.taskManager.shutdown(seconds);
}

private void createSchema() {
LoadOptions options = LoadOptions.instance();
File schemaFile = FileUtils.getFile(options.schema);
HugeClient client = HugeClients.get(options);
HugeClient client = HugeClientWrapper.get(options);
GroovyExecutor groovyExecutor = new GroovyExecutor();
groovyExecutor.bind("schema", client.schema());
String script;
Expand All @@ -161,7 +151,10 @@ private void createSchema() {
groovyExecutor.execute(script, client);
}

private void loadVertices() {
private void loadVertices(LoadSummary summary) {
Instant begTime = Instant.now();

// Execute loading tasks
LoadOptions options = LoadOptions.instance();
List<VertexSource> vertexSources = this.graphSource.vertexSources();
for (VertexSource source : vertexSources) {
Expand All @@ -174,8 +167,16 @@ private void loadVertices() {
LOG.warn("Failed to close parser for vertex source {}", source);
}
}

// Wait for async worker threads to finish
this.taskManager.waitFinished(options.timeout);
this.taskManager.waitFinished();

Instant endTime = Instant.now();
Duration duration = Duration.between(begTime, endTime);
summary.parseFailureVertices(this.parseFailureNum);
summary.insertFailureVertices(this.taskManager.failureNum());
summary.insertSuccessVertices(this.taskManager.successNum());
summary.vertexLoadTime(duration);
}

private void loadVertex(VertexParser parser) {
Expand Down Expand Up @@ -207,7 +208,9 @@ private void loadVertex(VertexParser parser) {
}
}

private void loadEdges() {
private void loadEdges(LoadSummary summary) {
Instant begTime = Instant.now();

LoadOptions options = LoadOptions.instance();
List<EdgeSource> edgeSources = this.graphSource.edgeSources();
for (EdgeSource source : edgeSources) {
Expand All @@ -220,8 +223,16 @@ private void loadEdges() {
LOG.warn("Failed to close parser for edge source {}", source);
}
}

// Wait for async worker threads to finish
this.taskManager.waitFinished(options.timeout);
this.taskManager.waitFinished();

Instant endTime = Instant.now();
Duration duration = Duration.between(begTime, endTime);
summary.parseFailureEdges(this.parseFailureNum);
summary.insertFailureEdges(this.taskManager.failureNum());
summary.insertSuccessEdges(this.taskManager.successNum());
summary.edgeLoadTime(duration);
}

private void loadEdge(EdgeParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private LoadOptions() {}

@Parameter(names = {"--timeout"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The timeout of inserting task in seconds")
public int timeout = 100;
description = "The timeout of HugeClient request")
public int timeout = 60;

@Parameter(names = {"--retry-times"}, arity = 1,
validateWith = {PositiveValidator.class},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.loader.exception.ParseException;
import com.baidu.hugegraph.loader.executor.HugeClients;
import com.baidu.hugegraph.loader.util.HugeClientWrapper;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.reader.InputReader;
import com.baidu.hugegraph.loader.source.ElementSource;
Expand Down Expand Up @@ -57,7 +57,7 @@ public abstract class ElementParser<GE extends GraphElement>

ElementParser(InputReader reader) {
this.reader = reader;
this.client = HugeClients.get(LoadOptions.instance());
this.client = HugeClientWrapper.get(LoadOptions.instance());
this.schemas = HashBasedTable.create();
this.reader.init();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.structure.graph.Edge;

public class InsertEdgeTask extends InsertTask<Edge> {
public class EdgeInsertTask extends InsertTask<Edge> {

InsertEdgeTask(List<Edge> batch) {
EdgeInsertTask(List<Edge> batch) {
super(batch);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.exception.ServerException;
import com.baidu.hugegraph.loader.executor.HugeClients;
import com.baidu.hugegraph.loader.util.HugeClientWrapper;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.rest.ClientException;
import com.baidu.hugegraph.structure.GraphElement;
Expand All @@ -40,7 +40,7 @@ public abstract class InsertTask<E extends GraphElement>

public InsertTask(List<E> batch) {
this.batch = batch;
this.client = HugeClients.get(LoadOptions.instance());
this.client = HugeClientWrapper.get(LoadOptions.instance());
}

public List<E> batch() {
Expand Down
69 changes: 46 additions & 23 deletions src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import com.baidu.hugegraph.driver.GraphManager;
import com.baidu.hugegraph.loader.exception.InsertException;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.executor.HugeClients;
import com.baidu.hugegraph.loader.executor.LoadLogger;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.util.HugeClientWrapper;
import com.baidu.hugegraph.loader.util.LoaderUtil;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.Vertex;
Expand Down Expand Up @@ -73,7 +73,7 @@ public TaskManager(LoadOptions options) {
this.available = new Semaphore(this.futureNum);
ExecutorService pool = Executors.newFixedThreadPool(this.futureNum);
this.batchService = MoreExecutors.listeningDecorator(pool);
this.singleExecutor = Executors.newFixedThreadPool(1);
this.singleExecutor = Executors.newFixedThreadPool(this.futureNum);
this.singleService = new ExecutorCompletionService<>(singleExecutor);
this.singleTasks = new LongAdder();
this.successNum = new LongAdder();
Expand All @@ -88,14 +88,17 @@ public long failureNum() {
return this.failureNum.longValue();
}

public boolean waitFinished(int timeout) {
public boolean waitFinished() {
try {
// Wait batch task finished
this.available.acquire(this.futureNum);
this.waitBatchTaskFinished();
// Wait single task finished
this.tryConsume(timeout);
this.waitSingleTaskFinished();

if (this.singleTasks.longValue() != 0L) {
return false;
throw new IllegalStateException(
"Some unexpected execption occured when " +
"waiting single tasks finished");
}
} catch (InterruptedException e) {
return false;
Expand All @@ -105,37 +108,60 @@ public boolean waitFinished(int timeout) {
return true;
}

/**
 * Blocks until every permit of the {@code available} semaphore can be
 * acquired at once.
 *
 * The semaphore is created with {@code futureNum} permits and each submitted
 * batch acquires one, so holding all of them means no batch is in flight.
 * NOTE(review): presumably each batch's completion callback releases its
 * permit - confirm in the callbacks, which are not visible here.
 * NOTE(review): the acquired permits are not released in this method; check
 * where they are handed back before further batches are submitted.
 *
 * @throws InterruptedException if the thread is interrupted while waiting
 */
private void waitBatchTaskFinished() throws InterruptedException {
this.available.acquire(this.futureNum);
}

/**
 * Waits for the single-mode tasks that were outstanding when this method
 * was entered.
 *
 * The outstanding count is read once from {@code singleTasks}; for each
 * counted task one completed future is taken from {@code singleService}
 * (blocking until one is available) and the counter is decremented.
 * The taken futures themselves are not inspected.
 *
 * @throws InterruptedException if the thread is interrupted while blocked
 *                              waiting for a task to complete
 */
private void waitSingleTaskFinished() throws InterruptedException {
    // Snapshot the counter up front: it is decremented inside the loop.
    long remaining = this.singleTasks.longValue();
    while (remaining > 0) {
        this.singleService.take();
        this.singleTasks.decrement();
        remaining--;
    }
}

/**
 * Resets the insert statistics kept by this manager, clearing both the
 * failure counter and the success counter.
 */
public void cleanup() {
    this.failureNum.reset();
    this.successNum.reset();
}

public void shutdown(int seconds) {
LOG.debug("Attempt to shutdown batch tasks executor");
try {
LOG.debug("Attempt to shutdown executor.");
this.batchService.shutdown();
this.batchService.awaitTermination(seconds, TimeUnit.SECONDS);
this.singleExecutor.shutdown();
this.singleExecutor.awaitTermination(seconds, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("Tasks is interrupted.");
LOG.error("The batch tasks are interrupted");
} finally {
if (!this.batchService.isTerminated()) {
LOG.error("Cancel unfinished tasks.");
LOG.error("Cancel unfinished batch tasks");
}
this.batchService.shutdownNow();
LOG.debug("Shutdown is completed.");
}

LOG.debug("Attempt to shutdown single tasks executor");
try {
this.singleExecutor.shutdown();
this.singleExecutor.awaitTermination(seconds, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("The single task are interrupted");
} finally {
if (!this.singleExecutor.isTerminated()) {
LOG.error("Cancel unfinished single tasks");
}
this.singleExecutor.shutdownNow();
}
}

public void submitVertexBatch(List<Vertex> batch) {
this.ensurePoolAvailable();

try {
this.available.acquire();
} catch (InterruptedException ignored) {
}

InsertVertexTask task = new InsertVertexTask(batch);
InsertTask<Vertex> task = new VertexInsertTask(batch);
ListenableFuture<Integer> future = this.batchService.submit(task);

Futures.addCallback(future, new FutureCallback<Integer>() {
Expand All @@ -157,12 +183,13 @@ public void onFailure(Throwable t) {

public void submitEdgeBatch(List<Edge> batch) {
this.ensurePoolAvailable();

try {
this.available.acquire();
} catch (InterruptedException ignored) {
}

InsertEdgeTask task = new InsertEdgeTask(batch);
InsertTask<Edge> task = new EdgeInsertTask(batch);
ListenableFuture<Integer> future = this.batchService.submit(task);

Futures.addCallback(future, new FutureCallback<Integer>() {
Expand All @@ -186,7 +213,7 @@ private void submitVerticesInSingleMode(List<Vertex> vertices) {
LoadOptions options = LoadOptions.instance();
int maxInsertErrors = options.maxInsertErrors;
int shutdownTimeout = options.shutdownTimeout;
GraphManager graph = HugeClients.get(options).graph();
GraphManager graph = HugeClientWrapper.get(options).graph();
this.singleService.submit(() -> {
for (Vertex vertex : vertices) {
try {
Expand Down Expand Up @@ -224,11 +251,11 @@ private void submitVerticesInSingleMode(List<Vertex> vertices) {
} catch (InterruptedException ignored) {}
}

private void submitEdgesInSingleMode(List<Edge> edges) {
public void submitEdgesInSingleMode(List<Edge> edges) {
LoadOptions options = LoadOptions.instance();
int maxInsertErrors = options.maxInsertErrors;
int shutdownTimeout = options.shutdownTimeout;
GraphManager graph = HugeClients.get(options).graph();
GraphManager graph = HugeClientWrapper.get(options).graph();
this.singleService.submit(() -> {
for (Edge edge : edges) {
try {
Expand Down Expand Up @@ -287,12 +314,8 @@ private void printProgress(String type, long frequency, int batchSize) {
}

private void ensurePoolAvailable() {
while (this.batchService.isShutdown()){
try {
Thread.sleep(100);
} catch (Exception ignored) {
// That's fine, just continue.
}
if (this.batchService.isShutdown()) {
throw new LoadException("Thread pool has been closed");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

import com.baidu.hugegraph.structure.graph.Vertex;

public class InsertVertexTask extends InsertTask<Vertex> {
public class VertexInsertTask extends InsertTask<Vertex> {

InsertVertexTask(List<Vertex> batch) {
VertexInsertTask(List<Vertex> batch) {
super(batch);
}

Expand Down
Loading

0 comments on commit 4afccad

Please sign in to comment.