Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unexpected task status #1767

Merged
merged 4 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Map<String, Object> update(@Context GraphManager manager,

private static TaskStatus parseStatus(String status) {
try {
return TaskStatus.valueOf(status);
return TaskStatus.valueOf(status.toUpperCase());
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Status value must be in %s, but got '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,19 @@ public String toString() {
@Override
public void run() {
if (this.cancelled()) {
// Scheduled task is running after cancelled
// A task is running after cancelled which scheduled/queued before
return;
}

TaskManager.setContext(this.context());
try {
assert this.status.code() < TaskStatus.RUNNING.code() : this.status;
if (this.checkDependenciesSuccess()) {
/*
* FIXME: worker node may reset status to RUNNING here, and the
* status in DB is CANCELLING that set by master node,
* it will lead to cancel() operation not to take effect.
*/
this.status(TaskStatus.RUNNING);
super.run();
}
Expand All @@ -308,7 +313,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
// Callback for saving status to store
this.callable.cancelled();
} else {
// Maybe the worker is still running then set status SUCCESS
// Maybe worker node is still running then set status SUCCESS
cancelled = false;
}
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ public synchronized void initServerInfo(Id server, NodeRole role) {
} while (page != null);
}

HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);

LOG.info("Init server info: {}", serverInfo);
this.saveServerInfo(this.selfServerId, this.selfServerRole);
}

public Id selfServerId() {
Expand All @@ -186,8 +182,10 @@ public boolean onlySingleNode() {

public void heartbeat() {
HugeServerInfo serverInfo = this.selfServerInfo();
if (serverInfo == null) {
return;
if (serverInfo == null && this.selfServerId != null &&
this.selfServerRole != NodeRole.MASTER) {
serverInfo = this.saveServerInfo(this.selfServerId,
this.selfServerRole);
}
serverInfo.updateTime(DateUtil.now());
this.save(serverInfo);
Expand Down Expand Up @@ -239,7 +237,11 @@ protected synchronized HugeServerInfo pickWorkerNode(
}
}

this.onlySingleNode = !hasWorkerNode;
boolean singleNode = !hasWorkerNode;
if (singleNode != this.onlySingleNode) {
LOG.info("Switch only_single_node to {}", singleNode);
this.onlySingleNode = singleNode;
}

// Only schedule to master if there is no workers and master is suitable
if (!hasWorkerNode) {
Expand All @@ -260,26 +262,35 @@ private GraphTransaction tx() {
return this.graph.systemTransaction();
}

private Id save(HugeServerInfo server) {
private HugeServerInfo saveServerInfo(Id server, NodeRole role) {
HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);

LOG.info("Init server info: {}", serverInfo);
return serverInfo;
}

private Id save(HugeServerInfo serverInfo) {
return this.call(() -> {
// Construct vertex from server info
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, server);
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
server.asArray());
serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
});
}

private int save(Collection<HugeServerInfo> servers) {
private int save(Collection<HugeServerInfo> serverInfos) {
return this.call(() -> {
if (servers.isEmpty()) {
return servers.size();
if (serverInfos.isEmpty()) {
return serverInfos.size();
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
Expand All @@ -289,7 +300,7 @@ private int save(Collection<HugeServerInfo> servers) {
// Save server info in batch
GraphTransaction tx = this.tx();
int updated = 0;
for (HugeServerInfo server : servers) {
for (HugeServerInfo server : serverInfos) {
if (!server.updated()) {
continue;
}
Expand Down Expand Up @@ -319,7 +330,11 @@ private <V> V call(Callable<V> callable) {
}

private HugeServerInfo selfServerInfo() {
return this.serverInfo(this.selfServerId);
HugeServerInfo selfServerInfo = this.serverInfo(this.selfServerId);
if (selfServerInfo == null) {
LOG.warn("ServerInfo is missing: {}", this.selfServerId);
}
return selfServerInfo;
}

private HugeServerInfo serverInfo(Id server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public <V> Future<?> schedule(HugeTask<V> task) {
/*
* Due to EphemeralJob won't be serialized and deserialized through
* shared storage, submit EphemeralJob immediately on master
* NOTE: don't need to save EphemeralJob task
*/
task.status(TaskStatus.QUEUED);
return this.submitTask(task);
Expand All @@ -228,16 +229,16 @@ public <V> Future<?> schedule(HugeTask<V> task) {

if (this.serverManager().onlySingleNode() && !task.computer()) {
/*
* Speed up for single node, submit task immediately
* this can be removed without affecting logic
* Speed up for single node, submit task immediately,
* this code can be removed without affecting logic
*/
task.status(TaskStatus.QUEUED);
task.server(this.serverManager().selfServerId());
this.save(task);
return this.submitTask(task);
} else {
/*
* Just set SCHEDULING status and save task
* Just set SCHEDULING status and save task,
* it will be scheduled by periodic scheduler worker
*/
task.status(TaskStatus.SCHEDULING);
Expand Down Expand Up @@ -394,6 +395,7 @@ protected void executeTasksOnWorker(Id server) {
}
if (taskServer.equals(server)) {
task.status(TaskStatus.QUEUED);
this.save(task);
this.submitTask(task);
}
}
Expand Down Expand Up @@ -467,6 +469,7 @@ protected void remove(HugeTask<?> task) {

@Override
public <V> void save(HugeTask<V> task) {
LOG.debug("Saving task: {}", task);
task.scheduler(this);
E.checkArgumentNotNull(task, "Task can't be null");
this.call(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class TaskManager {
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";

protected static final int SCHEDULE_PERIOD = 1; // Unit second
protected static final long SCHEDULE_PERIOD = 1000L; // unit ms

private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);
Expand Down Expand Up @@ -79,10 +79,11 @@ private TaskManager(int pool) {
// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10s waiting for HugeGraphServer startup
// Start after 10x period time waiting for HugeGraphServer startup
this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob,
10L, SCHEDULE_PERIOD,
TimeUnit.SECONDS);
10 * SCHEDULE_PERIOD,
SCHEDULE_PERIOD,
TimeUnit.MILLISECONDS);
}

public void addScheduler(HugeGraphParams graph) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@
import org.junit.BeforeClass;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.util.CollectionUtil;
import com.baidu.hugegraph.util.JsonUtil;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;

public class BaseApiTest {
Expand Down Expand Up @@ -294,7 +296,7 @@ protected static void initEdgeLabel() {
+ "}");
}

protected static void initIndexLabel() {
protected static int initIndexLabel() {
String path = URL_PREFIX + SCHEMA_ILS;

Response r = client.post(path, "{\n"
Expand All @@ -303,11 +305,13 @@ protected static void initIndexLabel() {
+ "\"base_value\": \"person\",\n"
+ "\"index_type\": \"SECONDARY\",\n"
+ "\"check_exist\": false,\n"
+ "\"rebuild\": false,\n"
+ "\"fields\": [\n"
+ "\"city\"\n"
+ "]\n"
+ "}");
assertResponseStatus(202, r);
String content = assertResponseStatus(202, r);
return assertJsonContains(content, "task_id");
}

protected static void initEdge() {
Expand Down Expand Up @@ -519,6 +523,8 @@ protected static void clearSchema() {
List<Map> list = readList(content, type, Map.class);
List<Object> names = list.stream().map(e -> e.get("name"))
.collect(Collectors.toList());
Assert.assertTrue("Expect all names are unique: " + names,
CollectionUtil.allUnique(names));
Set<Integer> tasks = new HashSet<>();
names.forEach(name -> {
Response response = client.delete(path, (String) name);
Expand All @@ -540,13 +546,30 @@ protected static void clearSchema() {
}

protected static void waitTaskSuccess(int task) {
waitTaskStatus(task, ImmutableSet.of("success"));
}

protected static void waitTaskCompleted(int task) {
Set<String> completed = ImmutableSet.of("success",
"cancelled",
"failed");
waitTaskStatus(task, completed);
}

protected static void waitTaskStatus(int task, Set<String> expectedStatus) {
String status;
int times = 0;
int maxTimes = 100000;
do {
Response r = client.get("/graphs/hugegraph/tasks/",
String.valueOf(task));
String content = assertResponseStatus(200, r);
status = assertJsonContains(content, "task_status");
} while (!"success".equals(status));
if (times++ > maxTimes) {
Assert.fail(String.format("Failed to wait for task %s " +
"due to timeout", task));
}
} while (!expectedStatus.contains(status));
}

protected static String parseId(String content) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void prepareSchema() {

@Test
public void testList() {
// create a task
int taskId = this.rebuild();

Response r = client().get(path, ImmutableMap.of("limit", -1));
Expand All @@ -52,6 +53,12 @@ public void testList() {
assertArrayContains(tasks, "id", taskId);

waitTaskSuccess(taskId);

r = client().get(path, String.valueOf(taskId));
content = assertResponseStatus(200, r);
String status = assertJsonContains(content, "task_status");
Assert.assertEquals("success", status);

/*
* FIXME: sometimes may get results of RUNNING tasks after the task
* status is SUCCESS, which is stored in DB if there are worker
Expand All @@ -62,23 +69,36 @@ public void testList() {
r = client().get(path, ImmutableMap.of("status", "RUNNING"));
content = assertResponseStatus(200, r);
tasks = assertJsonContains(content, "tasks");
Assert.assertTrue(tasks.toString(), tasks.isEmpty());
String message = String.format("Expect none RUNNING tasks(%d), " +
"but got %s", taskId, tasks);
Assert.assertTrue(message, tasks.isEmpty());
}

@Test
public void testGet() {
// create a task
int taskId = this.rebuild();

Response r = client().get(path, String.valueOf(taskId));
String content = assertResponseStatus(200, r);
assertJsonContains(content, "id");

waitTaskSuccess(taskId);

r = client().get(path, String.valueOf(taskId));
content = assertResponseStatus(200, r);
String status = assertJsonContains(content, "task_status");
Assert.assertEquals("success", status);
}

@Test
public void testCancel() {
// create a task
int taskId = this.gremlinJob();

sleepAWhile();

// cancel task
Map<String, Object> params = ImmutableMap.of("action", "cancel");
Response r = client().put(path, String.valueOf(taskId), "", params);
String content = r.readEntity(String.class);
Expand All @@ -88,6 +108,12 @@ public void testCancel() {
String status = assertJsonContains(content, "task_status");
Assert.assertTrue(status, status.equals("cancelling") ||
status.equals("cancelled"));
/*
* NOTE: should be waitTaskStatus(taskId, "cancelled"), but worker
* node may ignore the CANCELLING status due to now we can't atomic
* update task status, and then the task is running to SUCCESS.
*/
waitTaskCompleted(taskId);
} else {
assert r.getStatus() == 400;
String error = String.format(
Expand All @@ -103,14 +129,17 @@ public void testCancel() {

@Test
public void testDelete() {
// create a task
int taskId = this.rebuild();

waitTaskSuccess(taskId);
// delete task
Response r = client().delete(path, String.valueOf(taskId));
assertResponseStatus(204, r);
}

private int rebuild() {
// create a rebuild_index task
String rebuildPath = "/graphs/hugegraph/jobs/rebuild/indexlabels";
String personByCity = "personByCity";
Map<String, Object> params = ImmutableMap.of();
Expand Down