Skip to content

Commit

Permalink
Fix two bugs about single tasks and executor
Browse files Browse the repository at this point in the history
1. When a single insert task times out, it may still cause statistical errors.
2. TaskManager.shutdown() ignored singleExecutor

Fix #11

Change-Id: I9a468241f369f4c7e73a95ba9a5772dac90e0ce1
  • Loading branch information
Linary committed Oct 30, 2018
1 parent ca5df87 commit a2ba0c9
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 332 deletions.
194 changes: 106 additions & 88 deletions src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.exception.ParseException;
import com.baidu.hugegraph.loader.executor.GroovyExecutor;
import com.baidu.hugegraph.loader.executor.HugeClients;
import com.baidu.hugegraph.loader.executor.LoadLogger;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.executor.LoadSummary;
Expand All @@ -46,6 +45,8 @@
import com.baidu.hugegraph.loader.source.GraphSource;
import com.baidu.hugegraph.loader.source.VertexSource;
import com.baidu.hugegraph.loader.task.TaskManager;
import com.baidu.hugegraph.loader.util.HugeClientWrapper;
import com.baidu.hugegraph.loader.util.LoaderUtil;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.Vertex;
import com.baidu.hugegraph.util.E;
Expand All @@ -57,143 +58,155 @@ public class HugeGraphLoader {
private static final Logger LOG = Log.logger(HugeGraphLoader.class);
private static final LoadLogger LOG_PARSE = LoadLogger.logger("parseError");

private final JCommander commander;
private final LoadOptions options;
private final TaskManager taskManager;
private final GraphSource graphSource;

private long parseFailureNum = 0L;

public static void main(String[] args) {
HugeGraphLoader loader = new HugeGraphLoader(args);
LoadSummary summary = loader.load();
summary.print();
loader.load();
}

private HugeGraphLoader(String[] args) {
LoadOptions options = LoadOptions.instance();
this.commander = JCommander.newBuilder().addObject(options).build();
this.options = new LoadOptions();
this.parseAndCheckOptions(args);
this.taskManager = new TaskManager(options);
this.graphSource = GraphSource.of(options.file);
this.taskManager = new TaskManager(this.options);
this.graphSource = GraphSource.of(this.options.file);
}

private void parseAndCheckOptions(String[] args) {
this.commander.parse(args);
JCommander commander = JCommander.newBuilder()
.addObject(this.options)
.build();
commander.parse(args);
// Print usage and exit
if (this.options.help) {
LoaderUtil.exitWithUsage(commander, 0);
}
// Check options
LoadOptions options = LoadOptions.instance();
// Check option "-f"
E.checkArgument(!StringUtils.isEmpty(options.file),
E.checkArgument(!StringUtils.isEmpty(this.options.file),
"Must specified entrance groovy file");
File scriptFile = new File(options.file);
File scriptFile = new File(this.options.file);
if (!scriptFile.canRead()) {
LOG.error("Script file must be readable: '{}'",
scriptFile.getAbsolutePath());
this.exitWithUsage(-1);
LoaderUtil.exitWithUsage(commander, -1);
}
// Check option "-g"
E.checkArgument(!StringUtils.isEmpty(options.graph),
E.checkArgument(!StringUtils.isEmpty(this.options.graph),
"Must specified a graph");
// Check option "-h"
if (!options.host.startsWith("http://")) {
options.host = "http://" + options.host;
if (!this.options.host.startsWith("http://")) {
this.options.host = "http://" + this.options.host;
}
}

private LoadSummary load() {
private void load() {
// Create schema
this.createSchema();

LoadSummary summary = new LoadSummary();
// Prepare to load vertices
Instant begTime = Instant.now();
System.out.print("Vertices has been imported: 0\b\b");
// Load vertices
this.loadVertices();
Instant endTime = Instant.now();
Duration duration = Duration.between(begTime, endTime);
summary.parseFailureVertices(this.parseFailureNum);
summary.insertFailureVertices(this.taskManager.failureNum());
summary.insertSuccessVertices(this.taskManager.successNum());
summary.vertexLoadTime(duration);
System.out.println(" " + summary.insertSuccessVertices());
System.out.print("Vertices has been imported: 0\b");
LoadSummary vertexSummary = this.loadVertices();
System.out.println(vertexSummary.insertSuccess());
// Reset counters
this.resetCounters();

// Prepare to load edges ...
begTime = Instant.now();
System.out.print("Edges has been imported: 0\b\b");
// Load edges
this.loadEdges();
endTime = Instant.now();
duration = Duration.between(begTime, endTime);
summary.parseFailureEdges(this.parseFailureNum);
summary.insertFailureEdges(this.taskManager.failureNum());
summary.insertSuccessEdges(this.taskManager.successNum());
summary.edgeLoadTime(duration);
System.out.println(" " + summary.insertSuccessEdges());
System.out.print("Edges has been imported: 0\b");
LoadSummary edgeSummary = this.loadEdges();
System.out.println(edgeSummary.insertSuccess());
// Reset counters
this.resetCounters();

LoadOptions options = LoadOptions.instance();
// Print load summary
LoaderUtil.printSummary(vertexSummary, edgeSummary);

// Shutdown task manager
this.taskManager.shutdown(options.shutdownTimeout);
return summary;
this.shutdown(this.options.shutdownTimeout);
}

/**
 * Resets the loader's counters: invokes {@code cleanup()} on the task
 * manager and sets the parse-failure count back to zero.
 * NOTE(review): what {@code TaskManager.cleanup()} clears is not visible
 * here; presumably its success/failure counters, confirm in TaskManager.
 */
private void resetCounters() {
this.taskManager.cleanup();
this.parseFailureNum = 0L;
}

/**
 * Shuts down the task manager by delegating to its {@code shutdown} method.
 *
 * @param seconds value forwarded unchanged to the task manager;
 *                presumably a timeout in seconds, confirm in TaskManager
 */
private void shutdown(int seconds) {
this.taskManager.shutdown(seconds);
}

private void createSchema() {
LoadOptions options = LoadOptions.instance();
File schemaFile = FileUtils.getFile(options.schema);
HugeClient client = HugeClients.get(options);
File schemaFile = FileUtils.getFile(this.options.schema);
HugeClient client = HugeClientWrapper.get(this.options);
GroovyExecutor groovyExecutor = new GroovyExecutor();
groovyExecutor.bind("schema", client.schema());
String script;
try {
script = FileUtils.readFileToString(schemaFile, "UTF-8");
} catch (IOException e) {
throw new LoadException("Read schema file '%s' error",
e, options.schema);
e, this.options.schema);
}
groovyExecutor.execute(script, client);
}

private void loadVertices() {
LoadOptions options = LoadOptions.instance();
private LoadSummary loadVertices() {
Instant beginTime = Instant.now();

// Execute loading tasks
List<VertexSource> vertexSources = this.graphSource.vertexSources();
for (VertexSource source : vertexSources) {
LOG.info("Loading vertex source '{}'", source.label());
InputReader reader = InputReaderFactory.create(source.input());
VertexParser parser = new VertexParser(source, reader);
this.loadVertex(parser);
try {
parser.close();
} catch (Exception e) {
LOG.warn("Failed to close parser for vertex source {}", source);
VertexParser parser = new VertexParser(source, reader,
this.options);
this.loadVertex(parser);
} finally {
try {
reader.close();
} catch (Throwable e) {
LOG.warn("Failed to close reader for vertex source {}",
source, e);
}
}
}
// Waiting async worker threads finish
this.taskManager.waitFinished(options.timeout);
this.taskManager.waitFinished("vertices");

Instant endTime = Instant.now();

LoadSummary summary = new LoadSummary("vertices");
Duration duration = Duration.between(beginTime, endTime);
summary.parseFailure(this.parseFailureNum);
summary.insertFailure(this.taskManager.failureNum());
summary.insertSuccess(this.taskManager.successNum());
summary.loadTime(duration);

return summary;
}

private void loadVertex(VertexParser parser) {
LoadOptions options = LoadOptions.instance();
int batchSize = options.batchSize;
int batchSize = this.options.batchSize;
List<Vertex> batch = new ArrayList<>(batchSize);
while (parser.hasNext()) {
try {
Vertex vertex = parser.next();
batch.add(vertex);
} catch (ParseException e) {
if (options.testMode) {
if (this.options.testMode) {
throw e;
}
LOG.error("Vertex parse error", e);
LOG_PARSE.error(e);
if (++this.parseFailureNum >= options.maxParseErrors) {
exitWithInfo("vertices", options.maxParseErrors);
if (++this.parseFailureNum >= this.options.maxParseErrors) {
LoaderUtil.printError("Error: More than %s vertices " +
"parsing error ... Stopping",
this.options.maxParseErrors);
System.exit(-1);
}
continue;
}
Expand All @@ -207,39 +220,59 @@ private void loadVertex(VertexParser parser) {
}
}

private void loadEdges() {
LoadOptions options = LoadOptions.instance();
private LoadSummary loadEdges() {
Instant beginTime = Instant.now();

List<EdgeSource> edgeSources = this.graphSource.edgeSources();
for (EdgeSource source : edgeSources) {
LOG.info("Loading edge source '{}'", source.label());
InputReader reader = InputReaderFactory.create(source.input());
EdgeParser parser = new EdgeParser(source, reader);
this.loadEdge(parser);
try {
parser.close();
} catch (Exception e) {
LOG.warn("Failed to close parser for edge source {}", source);
EdgeParser parser = new EdgeParser(source, reader,
this.options);
this.loadEdge(parser);
} finally {
try {
reader.close();
} catch (Throwable e) {
LOG.warn("Failed to close reader for edge source {}",
source, e);
}
}
}
// Waiting async worker threads finish
this.taskManager.waitFinished(options.timeout);
this.taskManager.waitFinished("edges");

Instant endTime = Instant.now();

LoadSummary summary = new LoadSummary("edges");
Duration duration = Duration.between(beginTime, endTime);
summary.parseFailure(this.parseFailureNum);
summary.insertFailure(this.taskManager.failureNum());
summary.insertSuccess(this.taskManager.successNum());
summary.loadTime(duration);

return summary;
}

private void loadEdge(EdgeParser parser) {
LoadOptions options = LoadOptions.instance();
int batchSize = options.batchSize;
int batchSize = this.options.batchSize;
List<Edge> batch = new ArrayList<>(batchSize);
while (parser.hasNext()) {
try {
Edge edge = parser.next();
batch.add(edge);
} catch (ParseException e) {
if (options.testMode) {
if (this.options.testMode) {
throw e;
}
LOG.error("Edge parse error", e);
LOG_PARSE.error(e);
if (++this.parseFailureNum >= options.maxParseErrors) {
exitWithInfo("edges", options.maxParseErrors);
if (++this.parseFailureNum >= this.options.maxParseErrors) {
LoaderUtil.printError("Error: More than %s edges " +
"parsing error ... Stopping",
this.options.maxParseErrors);
System.exit(-1);
}
continue;
}
Expand All @@ -252,19 +285,4 @@ private void loadEdge(EdgeParser parser) {
this.taskManager.submitEdgeBatch(batch);
}
}

/**
 * Prints the command-line usage via the JCommander instance and then
 * terminates the JVM.
 *
 * @param status exit status passed to {@code System.exit}
 */
private void exitWithUsage(int status) {
this.commander.usage();
System.exit(status);
}

/**
 * Reports that too many parse errors occurred and terminates the JVM.
 * Logs an error, prints a blank line and the error message to stdout,
 * then exits.
 *
 * @param type        name of the element kind used in the messages
 *                    (callers pass "vertices" or "edges")
 * @param parseErrors number shown in the message (callers pass the
 *                    configured maximum number of parse errors)
 */
private static void exitWithInfo(String type, int parseErrors) {
LOG.error("Too many {} parse error ... Stopping", type);
// Print an empty line.
System.out.println();
System.out.println(String.format(
"Error: More than %s %s parsing error ... Stopping",
parseErrors, type));
// NOTE(review): exits with status 0 although this is an error path
System.exit(0);
}
}
25 changes: 8 additions & 17 deletions src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,12 @@

import java.io.File;

import org.slf4j.Logger;

import com.baidu.hugegraph.util.Log;
import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;

public class LoadOptions {

private Logger LOG = Log.logger(LoadOptions.class);

private static final LoadOptions instance = new LoadOptions();

public static LoadOptions instance() {
return instance;
}

private LoadOptions() {}


@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
validateWith = {FileValidator.class},
description = "The path of the data source description file")
Expand Down Expand Up @@ -67,7 +54,7 @@ private LoadOptions() {}
@Parameter(names = {"--num-threads"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The number of threads to use")
public int numThreads = Runtime.getRuntime().availableProcessors() * 2 - 1;
public int numThreads = Runtime.getRuntime().availableProcessors();

@Parameter(names = {"--batch-size"}, arity = 1,
validateWith = {PositiveValidator.class},
Expand Down Expand Up @@ -97,8 +84,8 @@ private LoadOptions() {}

@Parameter(names = {"--timeout"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The timeout of inserting task in seconds")
public int timeout = 100;
description = "The timeout of HugeClient request")
public int timeout = 60;

@Parameter(names = {"--retry-times"}, arity = 1,
validateWith = {PositiveValidator.class},
Expand All @@ -114,6 +101,10 @@ private LoadOptions() {}
description = "Whether the hugegraph-loader work in test mode")
public boolean testMode = false;

@Parameter(names = {"--help"}, help = true,
description = "Print usage of HugeGraphLoader")
public boolean help;

public static class UrlValidator implements IParameterValidator {

@Override
Expand Down
Loading

0 comments on commit a2ba0c9

Please sign in to comment.