Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix task: atomic update/get fields and re-schedule #1361

Merged
merged 2 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,13 @@ public String result() {
return this.result;
}

private void result(String result) {
private synchronized boolean result(TaskStatus status, String result) {
checkPropertySize(result, P.RESULT);
this.result = result;
if (this.status(status)) {
this.result = result;
return true;
}
return false;
}

public void server(Id server) {
Expand Down Expand Up @@ -319,18 +323,17 @@ public boolean fail(Throwable e) {
LOG.warn("An exception occurred when running task: {}",
this.id(), e);
// Update status to FAILED if exception occurred(not interrupted)
if (this.status(TaskStatus.FAILED)) {
this.result(e.toString());
if (this.result(TaskStatus.FAILED, e.toString())) {
return true;
}
}
return false;
}

public void failSave(Throwable e) {
public void failToSave(Throwable e) {
if (!this.fail(e)) {
// Can't update status, just set result to error message
this.result(e.toString());
this.result = e.toString();
}
}

Expand All @@ -352,9 +355,7 @@ protected void done() {
protected void set(V v) {
String result = JsonUtil.toJson(v);
checkPropertySize(result, P.RESULT);
if (this.status(TaskStatus.SUCCESS)) {
this.result = result;
} else {
if (!this.result(TaskStatus.SUCCESS, result)) {
assert this.completed();
}
// Will call done() and may cause to save to store
Expand All @@ -381,22 +382,21 @@ protected boolean checkDependenciesSuccess() {
if (this.dependencies == null || this.dependencies.isEmpty()) {
return true;
}
TaskScheduler scheduler = this.scheduler();
for (Id dependency : this.dependencies) {
HugeTask<?> task = this.scheduler().task(dependency);
HugeTask<?> task = scheduler.task(dependency);
if (!task.completed()) {
// Dependent task not completed, re-schedule self
this.scheduler().schedule(this);
scheduler.schedule(this);
return false;
} else if (task.status() == TaskStatus.CANCELLED) {
this.status(TaskStatus.CANCELLED);
this.result(String.format(
this.result(TaskStatus.CANCELLED, String.format(
"Cancelled due to dependent task '%s' cancelled",
dependency));
this.done();
return false;
} else if (task.status() == TaskStatus.FAILED) {
this.status(TaskStatus.FAILED);
this.result(String.format(
this.result(TaskStatus.FAILED, String.format(
"Failed due to dependent task '%s' failed",
dependency));
this.done();
Expand Down Expand Up @@ -483,7 +483,7 @@ protected void property(String key, Object value) {
}
}

protected Object[] asArray() {
protected synchronized Object[] asArray() {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down Expand Up @@ -563,7 +563,7 @@ public Map<String, Object> asMap() {
return this.asMap(true);
}

public Map<String, Object> asMap(boolean withDetails) {
public synchronized Map<String, Object> asMap(boolean withDetails) {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ private <V> Future<?> restore(HugeTask<V> task) {
public <V> Future<?> schedule(HugeTask<V> task) {
E.checkArgumentNotNull(task, "Task can't be null");

if (task.status() == TaskStatus.QUEUED) {
/*
* Just submit to queue if status=QUEUED (means re-schedule task)
* NOTE: schedule() method may be called multi times by
* HugeTask.checkDependenciesSuccess() method
*/
return this.resubmitTask(task);
}

if (task.callable() instanceof EphemeralJob) {
/*
* Due to EphemeralJob won't be serialized and deserialized through
Expand Down Expand Up @@ -252,6 +261,16 @@ private <V> Future<?> submitTask(HugeTask<V> task) {
return this.taskExecutor.submit(task);
}

private <V> Future<?> resubmitTask(HugeTask<V> task) {
E.checkArgument(task.status() == TaskStatus.QUEUED,
"Can't resubmit task '%s' with status %s",
task.id(), TaskStatus.QUEUED);
E.checkArgument(this.tasks.containsKey(task.id()),
"Can't resubmit task '%s' not been submitted before",
task.id());
return this.taskExecutor.submit(task);
}

public <V> void initTaskCallable(HugeTask<V> task) {
task.scheduler(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected void save() {
e, task.asMap(false));
String message = e.getMessage();
if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) {
task.failSave(e);
task.failToSave(e);
this.graph().taskScheduler().save(task);
return;
}
Expand Down