From 84aa90f0d7edf2ef43b844a61be7243eaa70a4d5 Mon Sep 17 00:00:00 2001 From: zhangyi51 Date: Mon, 12 Nov 2018 15:23:21 +0800 Subject: [PATCH] Resume uncompleted tasks if the service is restarted implement: #137 Change-Id: I8b90631b862449f9dac47bf4990089b014ae9830 --- .../java/com/baidu/hugegraph/core/GraphManager.java | 9 +++++++++ .../main/java/com/baidu/hugegraph/task/HugeTask.java | 11 +---------- .../java/com/baidu/hugegraph/task/TaskScheduler.java | 11 +++++++++++ .../java/com/baidu/hugegraph/task/TaskStatus.java | 12 ++++++++++++ 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java index e52c82e06a..442ede6a15 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java @@ -67,6 +67,7 @@ public GraphManager(HugeConfig conf) { this.loadGraphs(conf.getMap(ServerOptions.GRAPHS)); this.checkBackendVersionOrExit(); + this.restoreUncompletedTasks(); this.addMetrics(conf); } @@ -193,6 +194,14 @@ private void checkBackendVersionOrExit() { } } + private void restoreUncompletedTasks() { + for (String graph : this.graphs()) { + HugeGraph hugegraph = this.graph(graph); + assert hugegraph != null; + hugegraph.taskScheduler().restoreTasks(); + } + } + private void addMetrics(HugeConfig config) { final MetricManager metric = MetricManager.INSTANCE; // Force to add server reporter diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java index 88fe7c914a..a20f44326a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.FutureTask; import org.apache.tinkerpop.gremlin.structure.Graph.Hidden; @@ -39,18 +38,10 @@ import com.baidu.hugegraph.type.define.SerialEnum; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Log; -import com.google.common.collect.ImmutableSet; public class HugeTask extends FutureTask { private static final Logger LOG = Log.logger(HugeTask.class); - private static final Set COMPLETED_STATUSES; - - static { - COMPLETED_STATUSES = ImmutableSet.of(TaskStatus.SUCCESS, - TaskStatus.CANCELLED, - TaskStatus.FAILED); - } private final TaskCallable callable; @@ -188,7 +179,7 @@ public String result() { } public boolean completed() { - return COMPLETED_STATUSES.contains(this.status); + return TaskStatus.COMPLETED_STATUSES.contains(this.status); } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java index 7f369d51c6..bf099fbea8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java @@ -119,6 +119,17 @@ private void listenChanges() { }); } + public void restoreTasks() { + // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order. + for (TaskStatus status : TaskStatus.PENDING_STATUSES) { + Iterator> iter = this.findTask(status, NO_LIMIT); + while (iter.hasNext()) { + HugeTask task = iter.next(); + this.restore(task); + } + } + } + public Future restore(HugeTask task) { E.checkArgumentNotNull(task, "Task can't be null"); E.checkState(!task.isDone(), "No need to restore task '%s', " + diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskStatus.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskStatus.java index b39844751d..8de9ea702e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskStatus.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskStatus.java @@ -19,7 +19,12 @@ package com.baidu.hugegraph.task; +import java.util.List; +import java.util.Set; + import com.baidu.hugegraph.type.define.SerialEnum; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; public enum TaskStatus implements SerialEnum { @@ -33,6 +38,13 @@ public enum TaskStatus implements SerialEnum { CANCELLED(6, "cancelled"), FAILED(7, "failed"); + // NOTE: order is important(RESTORING > RUNNING > QUEUED) when restoring + public static final List PENDING_STATUSES = ImmutableList.of( + TaskStatus.RESTORING, TaskStatus.RUNNING, TaskStatus.QUEUED); + + public static final Set COMPLETED_STATUSES = ImmutableSet.of( + TaskStatus.SUCCESS, TaskStatus.CANCELLED, TaskStatus.FAILED); + private byte status = 0; private String name;