Skip to content

Commit

Permalink
add the number of pending tasks to metrics
Browse files Browse the repository at this point in the history
also refactor class name about task to remove prefix 'Huge'

Change-Id: Id21e9d91a0b83b807189fffbee0247219322f234
  • Loading branch information
javeme committed Nov 6, 2018
1 parent 3ec7146 commit 053007a
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.baidu.hugegraph.api.job;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -39,14 +40,14 @@

import org.slf4j.Logger;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.api.filter.StatusFilter.Status;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.task.HugeTaskScheduler;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.task.TaskStatus;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.codahale.metrics.annotation.Timed;
Expand All @@ -71,8 +72,7 @@ public Map<String, List<Object>> list(@Context GraphManager manager,
LOG.debug("Graph [{}] list tasks with status {}, limit {}",
graph, status, limit);

HugeGraph g = graph(manager, graph);
HugeTaskScheduler scheduler = g.taskScheduler();
TaskScheduler scheduler = graph(manager, graph).taskScheduler();

Iterator<HugeTask<Object>> itor;
if (status == null) {
Expand All @@ -97,8 +97,7 @@ public Map<String, Object> get(@Context GraphManager manager,
@PathParam("id") long id) {
LOG.debug("Graph [{}] get task: {}", graph, id);

HugeGraph g = graph(manager, graph);
HugeTaskScheduler scheduler = g.taskScheduler();
TaskScheduler scheduler = graph(manager, graph).taskScheduler();
return scheduler.task(IdGenerator.of(id)).asMap();
}

Expand All @@ -110,8 +109,7 @@ public void delete(@Context GraphManager manager,
@PathParam("id") long id) {
LOG.debug("Graph [{}] delete task: {}", graph, id);

HugeGraph g = graph(manager, graph);
HugeTaskScheduler scheduler = g.taskScheduler();
TaskScheduler scheduler = graph(manager, graph).taskScheduler();
HugeTask<?> task = scheduler.deleteTask(IdGenerator.of(id));
E.checkArgument(task != null, "There is no task with id '%s'", id);
}
Expand All @@ -132,11 +130,8 @@ public Map<String, Object> update(@Context GraphManager manager,
"Not support action '%s'", action));
}

HugeGraph g = graph(manager, graph);
HugeTaskScheduler scheduler = g.taskScheduler();

TaskScheduler scheduler = graph(manager, graph).taskScheduler();
HugeTask<?> task = scheduler.task(IdGenerator.of(id));

if (!task.completed()) {
scheduler.cancel(task);
} else {
Expand All @@ -146,14 +141,13 @@ public Map<String, Object> update(@Context GraphManager manager,
return ImmutableMap.of("cancelled", task.isCancelled());
}

private static com.baidu.hugegraph.task.Status parseStatus(String status) {
private static TaskStatus parseStatus(String status) {
try {
return com.baidu.hugegraph.task.Status.valueOf(status);
return TaskStatus.valueOf(status);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Status value must be in [UNKNOWN, NEW, QUEUED, " +
"RESTORING, RUNNING, SUCCESS, CANCELLED, FAILED], " +
"but got '%s'", status));
"Status value must be in %s, but got '%s'",
Arrays.asList(TaskStatus.values()), status));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.structure.HugeFeatures;
import com.baidu.hugegraph.task.HugeTaskScheduler;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;

Expand Down Expand Up @@ -178,7 +178,7 @@ public boolean restoring() {
return this.hugegraph.restoring();
}

public HugeTaskScheduler taskScheduler() {
public TaskScheduler taskScheduler() {
this.verifyPermission();
return this.hugegraph.taskScheduler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.baidu.hugegraph.serializer.JsonSerializer;
import com.baidu.hugegraph.serializer.Serializer;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.task.TaskManager;
import com.baidu.hugegraph.util.Log;
import com.codahale.metrics.MetricRegistry;

Expand Down Expand Up @@ -216,6 +217,14 @@ private void addMetrics(HugeConfig config) {
}
return count;
});

// Add metrics for task
MetricsUtil.registerGauge(TaskManager.class, "workers", () -> {
return TaskManager.instance().workerPoolSize();
});
MetricsUtil.registerGauge(TaskManager.class, "pending-tasks", () -> {
return TaskManager.instance().pendingTasks();
});
}

private void registerCacheMetrics() {
Expand Down
14 changes: 7 additions & 7 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.structure.HugeFeatures;
import com.baidu.hugegraph.task.HugeTaskManager;
import com.baidu.hugegraph.task.HugeTaskScheduler;
import com.baidu.hugegraph.task.TaskManager;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.traversal.optimize.HugeGraphStepStrategy;
import com.baidu.hugegraph.traversal.optimize.HugeVertexStepStrategy;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -104,7 +104,7 @@ public class HugeGraph implements Graph {
private final EventHub schemaEventHub;
private final EventHub indexEventHub;
private final RateLimiter rateLimiter;
private final HugeTaskManager taskManager;
private final TaskManager taskManager;

private final HugeFeatures features;

Expand All @@ -122,7 +122,7 @@ public HugeGraph(HugeConfig configuration) {
final int limit = configuration.get(CoreOptions.RATE_LIMIT);
this.rateLimiter = limit > 0 ? RateLimiter.create(limit) : null;

this.taskManager = HugeTaskManager.instance();
this.taskManager = TaskManager.instance();

this.features = new HugeFeatures(this, true);

Expand Down Expand Up @@ -294,8 +294,8 @@ public Analyzer analyzer() {
return AnalyzerFactory.analyzer(name, mode);
}

public HugeTaskScheduler taskScheduler() {
HugeTaskScheduler scheduler = this.taskManager.getScheduler(this);
public TaskScheduler taskScheduler() {
TaskScheduler scheduler = this.taskManager.getScheduler(this);
E.checkState(scheduler != null,
"Can't find task scheduler for graph '%s'", this);
return scheduler;
Expand Down Expand Up @@ -522,7 +522,7 @@ public Id[] mapVlName2Id(String[] vertexLabels) {
*/
public static void shutdown(long timout) throws InterruptedException {
EventHub.destroy(timout);
HugeTaskManager.instance().shutdown(timout);
TaskManager.instance().shutdown(timout);
}

private class TinkerpopTransaction extends AbstractThreadLocalTransaction {
Expand Down
4 changes: 2 additions & 2 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.util.Date;

import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.task.HugeTaskCallable;
import com.baidu.hugegraph.task.TaskCallable;

public abstract class Job<T> extends HugeTaskCallable<T> {
public abstract class Job<T> extends TaskCallable<T> {

public abstract String type();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.task.HugeTaskScheduler;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.E;

Expand Down Expand Up @@ -68,7 +68,7 @@ public HugeTask<T> schedule() {
task.input(this.input);
}

HugeTaskScheduler scheduler = this.graph.taskScheduler();
TaskScheduler scheduler = this.graph.taskScheduler();
scheduler.schedule(task);
scheduler.save(task);

Expand Down
45 changes: 25 additions & 20 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@
public class HugeTask<V> extends FutureTask<V> {

private static final Logger LOG = Log.logger(HugeTask.class);
private static final Set<Status> COMPLETED_STATUSES =
ImmutableSet.of(Status.SUCCESS, Status.CANCELLED, Status.FAILED);
private static final Set<TaskStatus> COMPLETED_STATUSES;

private final HugeTaskCallable<V> callable;
static {
COMPLETED_STATUSES = ImmutableSet.of(TaskStatus.SUCCESS,
TaskStatus.CANCELLED,
TaskStatus.FAILED);
}

private final TaskCallable<V> callable;

private String type;
private String name;
Expand All @@ -56,19 +61,19 @@ public class HugeTask<V> extends FutureTask<V> {
private List<Id> children;
private String description;
private Date create;
private volatile Status status;
private volatile TaskStatus status;
private volatile int progress;
private volatile Date update;
private volatile int retries;
private volatile String input;
private volatile String result;

public HugeTask(Id id, Id parent, String callable, String input) {
this(id, parent, HugeTaskCallable.fromClass(callable));
this(id, parent, TaskCallable.fromClass(callable));
this.input = input;
}

public HugeTask(Id id, Id parent, HugeTaskCallable<V> callable) {
public HugeTask(Id id, Id parent, TaskCallable<V> callable) {
super(callable);

E.checkArgumentNotNull(id, "Task id can't be null");
Expand All @@ -82,7 +87,7 @@ public HugeTask(Id id, Id parent, HugeTaskCallable<V> callable) {
this.parent = parent;
this.children = null;
this.description = null;
this.status = Status.NEW;
this.status = TaskStatus.NEW;
this.progress = 0;
this.create = new Date();
this.update = null;
Expand Down Expand Up @@ -110,7 +115,7 @@ public void child(Id id) {
this.children.add(id);
}

public Status status() {
public TaskStatus status() {
return this.status;
}

Expand Down Expand Up @@ -193,8 +198,8 @@ public String toString() {

@Override
public void run() {
assert this.status.code() < Status.RUNNING.code();
this.status(Status.RUNNING);
assert this.status.code() < TaskStatus.RUNNING.code();
this.status(TaskStatus.RUNNING);
super.run();
}

Expand All @@ -203,7 +208,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
try {
return super.cancel(mayInterruptIfRunning);
} finally {
this.status(Status.CANCELLED);
this.status(TaskStatus.CANCELLED);
try {
this.callable.cancelled();
} catch (Throwable e) {
Expand All @@ -225,7 +230,7 @@ protected void done() {

@Override
protected void set(V v) {
this.status(Status.SUCCESS);
this.status(TaskStatus.SUCCESS);
if (v != null) {
this.result = v.toString();
}
Expand All @@ -234,22 +239,22 @@ protected void set(V v) {

@Override
protected void setException(Throwable e) {
if (!(this.status == Status.CANCELLED &&
if (!(this.status == TaskStatus.CANCELLED &&
e instanceof InterruptedException)) {
LOG.warn("An exception occurred when running task: {}",
this.id(), e);
// Update status to FAILED if exception occurred(not interrupted)
this.status(Status.FAILED);
this.status(TaskStatus.FAILED);
this.result = e.toString();
}
super.setException(e);
}

protected HugeTaskCallable<V> callable() {
protected TaskCallable<V> callable() {
return this.callable;
}

protected void status(Status status) {
protected void status(TaskStatus status) {
this.status = status;
}

Expand All @@ -266,7 +271,7 @@ protected void property(String key, Object value) {
this.description = (String) value;
break;
case P.STATUS:
this.status(SerialEnum.fromCode(Status.class, (byte) value));
this.status(SerialEnum.fromCode(TaskStatus.class, (byte) value));
break;
case P.PROGRESS:
this.progress = (int) value;
Expand Down Expand Up @@ -388,11 +393,11 @@ public Map<String, Object> asMap(boolean withDetails) {

public static <V> HugeTask<V> fromVertex(Vertex vertex) {
String callableName = vertex.value(P.CALLABLE);
HugeTaskCallable<V> callable;
TaskCallable<V> callable;
try {
callable = HugeTaskCallable.fromClass(callableName);
callable = TaskCallable.fromClass(callableName);
} catch (Exception e) {
callable = HugeTaskCallable.empty(e);
callable = TaskCallable.empty(e);
}

HugeTask<V> task = new HugeTask<>((Id) vertex.id(), null, callable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.util.E;

public abstract class HugeTaskCallable<V> implements Callable<V> {
public abstract class TaskCallable<V> implements Callable<V> {

private HugeTaskScheduler scheduler = null;
private TaskScheduler scheduler = null;
private HugeTask<V> task = null;

public HugeTaskCallable() {
public TaskCallable() {
// pass
}

Expand All @@ -46,11 +46,11 @@ public HugeGraph graph() {
return this.scheduler().graph();
}

protected void scheduler(HugeTaskScheduler scheduler) {
protected void scheduler(TaskScheduler scheduler) {
this.scheduler = scheduler;
}

public HugeTaskScheduler scheduler() {
public TaskScheduler scheduler() {
E.checkState(this.scheduler != null,
"Can't call scheduler() before scheduling task");
return this.scheduler;
Expand All @@ -67,17 +67,17 @@ public HugeTask<V> task() {
}

@SuppressWarnings("unchecked")
public static <V> HugeTaskCallable<V> fromClass(String className) {
public static <V> TaskCallable<V> fromClass(String className) {
try {
Class<?> clazz = Class.forName(className);
return (HugeTaskCallable<V>) clazz.newInstance();
return (TaskCallable<V>) clazz.newInstance();
} catch (Exception e) {
throw new HugeException("Failed to load task: %s", e, className);
}
}

public static <V> HugeTaskCallable<V> empty(Exception e) {
return new HugeTaskCallable<V>() {
public static <V> TaskCallable<V> empty(Exception e) {
return new TaskCallable<V>() {
@Override
public V call() throws Exception {
throw e;
Expand Down
Loading

0 comments on commit 053007a

Please sign in to comment.