-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix two bug about single tasks and executor #12
Conversation
@@ -110,6 +110,10 @@ private LoadOptions() {} | |||
description = "Setting the interval time before retrying") | |||
public int retryInterval = 10; | |||
|
|||
@Parameter(names = {"--continue-when-single-task-timeout"}, arity = 1, | |||
description = "Setting the interval time before retrying") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to add this argument. Loader will wait until all elements inserted except insert errors element.
@@ -286,7 +151,7 @@ private void printProgress(String type, long frequency, int batchSize) { | |||
} | |||
} | |||
|
|||
private void ensurePoolAvailable() { | |||
protected void ensurePoolAvailable() { | |||
while (this.batchService.isShutdown()){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread pool can be used immediately after created. no need to wait here.
@Parameter(names = {"--continue-when-single-task-timeout"}, arity = 1, | ||
description = "Setting the interval time before retrying") | ||
public boolean continueWhenSingleTaskTimeout = true; | ||
|
||
@Parameter(names = {"--test-mode"}, arity = 1, | ||
description = "Whether the hugegraph-loader work in test mode") | ||
public boolean testMode = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also add “--help” subcommand to print usage information
protected final int futureNum; | ||
protected final Semaphore available; | ||
protected final ListeningExecutorService batchService; | ||
protected final ExecutorService singleExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use multi-thread to improve single-insertion performance
94048a1
to
4afccad
Compare
@@ -161,7 +151,10 @@ private void createSchema() { | |||
groovyExecutor.execute(script, client); | |||
} | |||
|
|||
private void loadVertices() { | |||
private void loadVertices(LoadSummary summary) { | |||
Instant begTime = Instant.now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
beginTime
if (this.singleTasks.longValue() != 0L) { | ||
return false; | ||
throw new IllegalStateException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
E.checkState
|
||
InsertEdgeTask(List<Edge> batch) { | ||
EdgeInsertTask(List<Edge> batch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public
@@ -24,9 +24,9 @@ | |||
import com.baidu.hugegraph.loader.executor.LoadOptions; | |||
import com.baidu.hugegraph.structure.graph.Edge; | |||
|
|||
public class InsertEdgeTask extends InsertTask<Edge> { | |||
public class EdgeInsertTask extends InsertTask<Edge> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InsertionTask
this.available.acquire(this.futureNum); | ||
} | ||
|
||
private void waitSingleTaskFinished() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waitSingleTasksFinished
|
||
private void waitSingleTaskFinished() throws InterruptedException { | ||
long total = this.singleTasks.longValue(); | ||
for (long i = 0; i < total; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i < this.singleTasks.longValue()
4afccad
to
4e800e3
Compare
Already edited |
try { | ||
// Wait batch task finished | ||
this.available.acquire(this.futureNum); | ||
this.batchSemaphore.acquire(options.numThreads); | ||
LOG.info("Batch tasks of {} finished", type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch-mode tasks
try { | ||
reader.close(); | ||
} catch (Throwable e) { | ||
LOG.warn("Failed to close parser for edge source {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failed to close reader
if (++this.parseFailureNum >= options.maxParseErrors) { | ||
exitWithInfo("vertices", options.maxParseErrors); | ||
if (++this.parseFailureNum >= this.options.maxParseErrors) { | ||
LOG.error("Too many vertices parse error ... Stopping"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete it since LOG.error() is called in printError()
if (++this.parseFailureNum >= options.maxParseErrors) { | ||
exitWithInfo("edges", options.maxParseErrors); | ||
if (++this.parseFailureNum >= this.options.maxParseErrors) { | ||
LOG.error("Too many edges parse error ... Stopping"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
return false; | ||
} | ||
this.singleSemaphore.acquire(options.numThreads); | ||
LOG.info("Single tasks of {} finished", type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single-mode tasks
} catch (InterruptedException e) { | ||
LOG.error("Tasks is interrupted."); | ||
LOG.error("The batch tasks are interrupted"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, check somewhere else
public void onFailure(Throwable t) { | ||
singleSemaphore.release(); | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move line 282 to LoadUtil
@@ -100,43 +107,29 @@ private void parseAndCheckOptions(String[] args) { | |||
} | |||
|
|||
private LoadSummary load() { | |||
LoadOptions options = LoadOptions.instance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove it, and remove instance() method from class LoadOptions too
487255e
to
40e96b3
Compare
1. When a single insert task times out, it may still cause statistical errors. 2. TaskManager.shutdown() ignored singleExecutor Fix #12 Change-Id: I9a468241f369f4c7e73a95ba9a5772dac90e0ce1
40e96b3
to
5b59b0b
Compare
already edited |
GroovyExecutor groovyExecutor = new GroovyExecutor(); | ||
groovyExecutor.bind("schema", client.schema()); | ||
String script; | ||
try { | ||
script = FileUtils.readFileToString(schemaFile, "UTF-8"); | ||
} catch (IOException e) { | ||
throw new LoadException("Read schema file '%s' error", | ||
e, options.schema); | ||
e, this.options.schema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move '%s' to the end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be modified in the issue #15
public static String backward(long word) { | ||
StringBuilder backward = new StringBuilder(); | ||
for (int i = 0, len = String.valueOf(word).length(); i <= len; i++) { | ||
backward.append("\b"); | ||
} | ||
return backward.toString(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move backward() to the bottom of file
this.batchSemaphore = new Semaphore(options.numThreads); | ||
this.singleSemaphore = new Semaphore(options.numThreads); | ||
this.batchService = MoreExecutors.listeningDecorator( | ||
Executors.newFixedThreadPool(options.numThreads)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutorUtil.newFixedThreadPool
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add TODO
// That's fine, just continue. | ||
} | ||
if (this.batchService.isShutdown()) { | ||
throw new LoadException("Thread pool has been closed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add singleService check too
5b59b0b
to
84c5868
Compare
this.batchSemaphore = new Semaphore(options.numThreads); | ||
this.singleSemaphore = new Semaphore(options.numThreads); | ||
this.batchService = MoreExecutors.listeningDecorator( | ||
Executors.newFixedThreadPool(options.numThreads)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add TODO
} | ||
|
||
public static void printInBackward(long count) { | ||
System.out.print(String.format(" %d%s", count, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the first space and let backward() return the size without space
remember update loadVertices and loadEdges
84c5868
to
a2ba0c9
Compare
reader.close(); | ||
} catch (Throwable e) { | ||
LOG.warn("Failed to close reader for vertex source {}", | ||
source, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG.warn("Failed to close reader for vertex source {} with exception {}",source, e)
try { | ||
reader.close(); | ||
} catch (Throwable e) { | ||
LOG.warn("Failed to close reader for edge source {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
a2ba0c9
to
ed1d263
Compare
1. When a single insert task times out, it may still cause statistical errors. 2. TaskManager.shutdown() ignored singleExecutor Fix #11 Change-Id: I9a468241f369f4c7e73a95ba9a5772dac90e0ce1
ed1d263
to
8421f70
Compare
Fix #12 Change-Id: I2ae4e4e6e3d1749981155e477887a8a4269a26ba
Fix #11
Change-Id: I9a468241f369f4c7e73a95ba9a5772dac90e0ce1