Skip to content

Commit

Permalink
call resubmitTask() when re-schedule task
Browse files Browse the repository at this point in the history
Change-Id: I2efa9fd4979492c6901e1ae11fbaf2be451f0354
  • Loading branch information
javeme committed Feb 20, 2021
1 parent 5cd5bbf commit 9d81eda
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,12 @@ protected boolean checkDependenciesSuccess() {
if (this.dependencies == null || this.dependencies.isEmpty()) {
return true;
}
TaskScheduler scheduler = this.scheduler();
for (Id dependency : this.dependencies) {
HugeTask<?> task = this.scheduler().task(dependency);
HugeTask<?> task = scheduler.task(dependency);
if (!task.completed()) {
// Dependent task not completed, re-schedule self
this.scheduler().schedule(this);
scheduler.schedule(this);
return false;
} else if (task.status() == TaskStatus.CANCELLED) {
this.result(TaskStatus.CANCELLED, String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ private <V> Future<?> restore(HugeTask<V> task) {
public <V> Future<?> schedule(HugeTask<V> task) {
E.checkArgumentNotNull(task, "Task can't be null");

if (task.status() == TaskStatus.QUEUED) {
/*
* Just submit to queue if status=QUEUED (means re-schedule task)
* NOTE: schedule() method may be called multi times by
* HugeTask.checkDependenciesSuccess() method
*/
return this.resubmitTask(task);
}

if (task.callable() instanceof EphemeralJob) {
/*
* Due to EphemeralJob won't be serialized and deserialized through
Expand Down Expand Up @@ -252,6 +261,16 @@ private <V> Future<?> submitTask(HugeTask<V> task) {
return this.taskExecutor.submit(task);
}

private <V> Future<?> resubmitTask(HugeTask<V> task) {
E.checkArgument(task.status() == TaskStatus.QUEUED,
"Can't resubmit task '%s' with status %s",
task.id(), TaskStatus.QUEUED);
E.checkArgument(this.tasks.containsKey(task.id()),
"Can't resubmit task '%s' not been submitted before",
task.id());
return this.taskExecutor.submit(task);
}

public <V> void initTaskCallable(HugeTask<V> task) {
task.scheduler(this);

Expand Down

0 comments on commit 9d81eda

Please sign in to comment.