diff --git a/hugegraph-api/pom.xml b/hugegraph-api/pom.xml index a78ffe3263..36b837946e 100644 --- a/hugegraph-api/pom.xml +++ b/hugegraph-api/pom.xml @@ -101,7 +101,7 @@ - 0.29.0.0 + 0.30.0.0 diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java index 56b85a42f1..f55eb4e113 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.inject.Singleton; import javax.ws.rs.BadRequestException; @@ -42,6 +43,7 @@ import com.baidu.hugegraph.api.API; import com.baidu.hugegraph.api.filter.StatusFilter.Status; +import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.id.IdGenerator; import com.baidu.hugegraph.core.GraphManager; import com.baidu.hugegraph.server.RestServer; @@ -58,6 +60,7 @@ public class TaskAPI extends API { private static final Logger LOG = Log.logger(RestServer.class); + private static final long NO_LIMIT = -1L; public static final String ACTION_CANCEL = "cancel"; @@ -67,6 +70,7 @@ public class TaskAPI extends API { public Map> list(@Context GraphManager manager, @PathParam("graph") String graph, @QueryParam("status") String status, + @QueryParam("ids") List ids, @QueryParam("limit") @DefaultValue("100") long limit) { LOG.debug("Graph [{}] list tasks with status {}, limit {}", @@ -75,16 +79,35 @@ public Map> list(@Context GraphManager manager, TaskScheduler scheduler = graph(manager, graph).taskScheduler(); Iterator> itor; - if (status == null) { - itor = scheduler.findAllTask(limit); + + if (!ids.isEmpty()) { + LOG.debug("Graph [{}] list tasks with ids {}, limit {}", + graph, ids, limit); + E.checkArgument(status == null, + "Not support status when query task by ids, " + + "but got status='%s'", status); + // Set limit to NO_LIMIT to ignore limit when query task by ids + limit = NO_LIMIT; + List idList = ids.stream().map(IdGenerator::of) + .collect(Collectors.toList()); + itor = scheduler.tasks(idList); } else { - itor = scheduler.findTask(parseStatus(status), limit); + LOG.debug("Graph [{}] list tasks with status {}, limit {}", + graph, status, limit); + if (status == null) { + itor = scheduler.findAllTask(limit); + } else { + itor = scheduler.findTask(parseStatus(status), limit); + } } List tasks = new ArrayList<>(); while (itor.hasNext()) { tasks.add(itor.next().asMap(false)); } + if (limit != NO_LIMIT && tasks.size() > limit) { + tasks = tasks.subList(0, (int) limit); + } return ImmutableMap.of("tasks", tasks); } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/IndexLabelAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/IndexLabelAPI.java index 2649bd1782..19ffcf5a30 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/IndexLabelAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/IndexLabelAPI.java @@ -67,8 +67,8 @@ public String create(@Context GraphManager manager, HugeGraph g = graph(manager, graph); IndexLabel.Builder builder = jsonIndexLabel.convert2Builder(g); - IndexLabel indexLabel = builder.create(); - return manager.serializer(g).writeIndexlabel(indexLabel); + IndexLabel.CreatedIndexLabel il = builder.createWithTask(); + return manager.serializer(g).writeCreatedIndexLabel(il); } @GET diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java index 78a69d8add..3bdb339948 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java @@ -159,6 +159,18 @@ public String writeIndexlabels(List indexLabels) { return writeList("indexlabels", indexLabels); } + @Override + public String writeCreatedIndexLabel(IndexLabel.CreatedIndexLabel cil) { + StringBuilder builder = new StringBuilder(); + long id = cil.task() == null ? 0L : cil.task().asLong(); + return builder.append("{\"index_label\": ") + .append(this.writeIndexlabel(cil.indexLabel())) + .append(", \"task_id\": ") + .append(id) + .append("}") + .toString(); + } + @Override public String writeVertex(Vertex vertex) { return writeObject(vertex); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java index 352182655a..4e210e5c0e 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java @@ -52,6 +52,8 @@ public interface Serializer { public String writeIndexlabels(List indexLabels); + public String writeCreatedIndexLabel(IndexLabel.CreatedIndexLabel cil); + public String writeVertex(Vertex v); public String writeVertices(Iterator vertices, boolean paging); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java index a768246da9..a3bbf34aa2 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/version/ApiVersion.java @@ -73,10 +73,11 @@ public final class ApiVersion { * version 0.8: * [0.28] Issue-153: Add task-cancel API * [0.29] Issue-39: Add rays and rings RESTful API + * [0.30] Issue-32: Change index create API to return indexLabel and task id */ // The second parameter of Version.of() is for IDE running without JAR - public static final Version VERSION = Version.of(ApiVersion.class, "0.29"); + public static final Version VERSION = Version.of(ApiVersion.class, "0.30"); public static final void check() { // Check version of hugegraph-core. Firstly do check from version 0.3 diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index e7c1d5c055..bc94858855 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Set; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; @@ -52,6 +53,7 @@ import com.baidu.hugegraph.type.define.SchemaStatus; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.LockUtil; +import com.google.common.collect.ImmutableSet; public class SchemaTransaction extends IndexableTransaction { @@ -217,10 +219,15 @@ public Id removeIndexLabel(Id id) { } public Id rebuildIndex(SchemaElement schema) { + return this.rebuildIndex(schema, ImmutableSet.of()); + } + + public Id rebuildIndex(SchemaElement schema, Set dependencies) { LOG.debug("SchemaTransaction rebuild index for {} with id '{}'", schema.type(), schema.id()); SchemaCallable callable = new RebuildIndexCallable(); - return asyncRun(this.graph(), schema.type(), schema.id(), callable); + return asyncRun(this.graph(), schema.type(), schema.id(), callable, + dependencies); } public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) { @@ -352,10 +359,21 @@ private T deserialize(BackendEntry entry, HugeType type) { private static Id asyncRun(HugeGraph graph, HugeType schemaType, Id schemaId, SchemaCallable callable) { - String name = SchemaCallable.formatTaskName(schemaType, schemaId); + return asyncRun(graph, schemaType, schemaId, + callable, ImmutableSet.of()); + } + + private static Id asyncRun(HugeGraph graph, HugeType schemaType, + Id schemaId, SchemaCallable callable, + Set dependencies) { + String schemaName = graph.schemaTransaction() + .getSchema(schemaType, schemaId).name(); + String name = SchemaCallable.formatTaskName(schemaType, schemaId, + schemaName); JobBuilder builder = JobBuilder.of(graph).name(name) - .job(callable); + .job(callable) + .dependencies(dependencies); HugeTask task = builder.schedule(); // If SCHEMA_SYNC_DELETION is true, wait async thread done before diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java index 79ba363661..e5b66a6051 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.job; +import java.util.Set; + import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.task.HugeTask; @@ -33,6 +35,7 @@ public class JobBuilder { private String name; private String input; private Job job; + private Set dependencies; public static JobBuilder of(final HugeGraph graph) { return new JobBuilder<>(graph); @@ -57,6 +60,11 @@ public JobBuilder job(Job job) { return this; } + public JobBuilder dependencies(Set dependencies) { + this.dependencies = dependencies; + return this; + } + public HugeTask schedule() { E.checkArgumentNotNull(this.name, "Job name can't be null"); E.checkArgumentNotNull(this.job, "Job can't be null"); @@ -67,6 +75,11 @@ public HugeTask schedule() { if (this.input != null) { task.input(this.input); } + if (this.dependencies != null && !this.dependencies.isEmpty()) { + for (Id depend : this.dependencies) { + task.depends(depend); + } + } TaskScheduler scheduler = this.graph.taskScheduler(); scheduler.schedule(task); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java index 3196ae3261..70e8398afe 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java @@ -17,32 +17,34 @@ public abstract class SchemaCallable extends Job { public static final String REMOVE_SCHEMA = "remove_schema"; public static final String REBUILD_INDEX = "rebuild_index"; + public static final String CREATE_INDEX = "create_index"; private static final String SPLITOR = ":"; protected HugeType schemaType() { String name = this.task().name(); - String[] parts = name.split(SPLITOR); - E.checkState(parts.length == 2 && parts[0] != null, - "Task name should be formatted to String 'TYPE:ID', " + - "but got '%s'", name); + String[] parts = name.split(SPLITOR, 3); + E.checkState(parts.length == 3 && parts[0] != null, + "Task name should be formatted to String " + + "'TYPE:ID:NAME', but got '%s'", name); return HugeType.valueOf(parts[0]); } protected Id schemaId() { String name = this.task().name(); - String[] parts = name.split(SPLITOR); - E.checkState(parts.length == 2 && parts[1] != null, - "Task name should be formatted to String 'TYPE:ID', " + - "but got '%s'", name); + String[] parts = name.split(SPLITOR, 3); + E.checkState(parts.length == 3 && parts[1] != null, + "Task name should be formatted to String " + + "'TYPE:ID:NAME', but got '%s'", name); return IdGenerator.of(Long.valueOf(parts[1])); } - public static String formatTaskName(HugeType schemaType, Id schemaId) { - E.checkNotNull(schemaType, "schema type"); - E.checkNotNull(schemaId, "schema id"); - return String.join(SPLITOR, schemaType.toString(), schemaId.toString()); + public static String formatTaskName(HugeType type, Id id, String name) { + E.checkNotNull(type, "schema type"); + E.checkNotNull(id, "schema id"); + E.checkNotNull(name, "schema name"); + return String.join(SPLITOR, type.toString(), id.asString(), name); } protected static void removeIndexLabelFromBaseLabel(SchemaTransaction tx, diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java index d50a0fba5b..517903d28e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java @@ -184,6 +184,8 @@ public static IndexLabel label(HugeGraph graph, Id id) { public interface Builder extends SchemaBuilder { + CreatedIndexLabel createWithTask(); + Id rebuild(); Builder onV(String baseValue); @@ -202,4 +204,33 @@ public interface Builder extends SchemaBuilder { Builder indexType(IndexType indexType); } + + public static class CreatedIndexLabel { + + private IndexLabel indexLabel; + private Id task; + + public CreatedIndexLabel(IndexLabel indexLabel, Id task) { + E.checkNotNull(indexLabel, "index label"); + this.indexLabel = indexLabel; + this.task = task; + } + + public void indexLabel(IndexLabel indexLabel) { + E.checkNotNull(indexLabel, "index label"); + this.indexLabel = indexLabel; + } + + public IndexLabel indexLabel() { + return this.indexLabel; + } + + public void task(Id task) { + this.task = task; + } + + public Id task() { + return this.task; + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java index a237a810ea..b97c6521f8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java @@ -21,14 +21,17 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeoutException; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.config.CoreOptions; +import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.ExistedException; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.schema.EdgeLabel; @@ -37,6 +40,7 @@ import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.schema.SchemaLabel; import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.task.TaskScheduler; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.Cardinality; import com.baidu.hugegraph.type.define.DataType; @@ -44,6 +48,7 @@ import com.baidu.hugegraph.type.define.SchemaStatus; import com.baidu.hugegraph.util.CollectionUtil; import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.InsertionOrderUtil; public class IndexLabelBuilder implements IndexLabel.Builder { @@ -83,7 +88,7 @@ public IndexLabel build() { } @Override - public IndexLabel create() { + public IndexLabel.CreatedIndexLabel createWithTask() { SchemaElement.checkName(this.name, this.transaction.graph().configuration()); IndexLabel indexLabel = this.transaction.getIndexLabel(this.name); @@ -91,7 +96,7 @@ public IndexLabel create() { if (this.checkExist) { throw new ExistedException("index label", this.name); } - return indexLabel; + return new IndexLabel.CreatedIndexLabel(indexLabel, null); } SchemaLabel schemaLabel = this.loadElement(); @@ -105,7 +110,7 @@ public IndexLabel create() { // Delete index label which is prefix of the new index label // TODO: use event to replace direct call - this.removeSubIndex(schemaLabel); + Set removeTasks = this.removeSubIndex(schemaLabel); // Create index label indexLabel = this.build(); @@ -113,8 +118,29 @@ public IndexLabel create() { this.transaction.addIndexLabel(schemaLabel, indexLabel); // TODO: use event to replace direct call - this.rebuildIndexIfNeeded(schemaLabel, indexLabel); + Id rebuildTask = this.rebuildIndexIfNeeded(schemaLabel, indexLabel, + removeTasks); + return new IndexLabel.CreatedIndexLabel(indexLabel, rebuildTask); + } + + @Override + public IndexLabel create() { + IndexLabel.CreatedIndexLabel createdIndexLabel = this.createWithTask(); + Id task = createdIndexLabel.task(); + IndexLabel indexLabel = createdIndexLabel.indexLabel(); + if (task == null) { + E.checkNotNull(indexLabel, "index label"); + return indexLabel; + } + TaskScheduler scheduler = this.transaction.graph().taskScheduler(); + try { + // TODO: read timeout from config file + scheduler.waitUntilTaskCompleted(task, 30L); + } catch (TimeoutException e) { + throw new HugeException( + "Failed to wait index-creating task completed", e); + } return indexLabel; } @@ -316,8 +342,8 @@ private void checkRepeatIndex(SchemaLabel schemaLabel) { } } - private void removeSubIndex(SchemaLabel schemaLabel) { - HashSet overrideIndexLabelIds = new HashSet<>(); + private Set removeSubIndex(SchemaLabel schemaLabel) { + Set overrideIndexLabelIds = InsertionOrderUtil.newSet(); for (Id id : schemaLabel.indexLabels()) { IndexLabel old = this.transaction.getIndexLabel(id); if (this.indexType != old.indexType()) { @@ -334,14 +360,19 @@ private void removeSubIndex(SchemaLabel schemaLabel) { overrideIndexLabelIds.add(id); } } + Set tasks = InsertionOrderUtil.newSet(); for (Id id : overrideIndexLabelIds) { schemaLabel.removeIndexLabel(id); - this.transaction.removeIndexLabel(id); + Id task = this.transaction.removeIndexLabel(id); + E.checkNotNull(task, "remove sub index label task"); + tasks.add(task); } + return tasks; } - private void rebuildIndexIfNeeded(SchemaLabel schemaLabel, - IndexLabel indexLabel) { + private Id rebuildIndexIfNeeded(SchemaLabel schemaLabel, + IndexLabel indexLabel, + Set dependencies) { GraphTransaction tx = this.transaction.graph().graphTransaction(); boolean needRebuild; if (this.baseType == HugeType.VERTEX_LABEL) { @@ -352,12 +383,14 @@ private void rebuildIndexIfNeeded(SchemaLabel schemaLabel, needRebuild = tx.queryEdgesByLabel((EdgeLabel) schemaLabel, 1L) .hasNext(); } + Id task = null; if (needRebuild) { // rebuildIndex() will set status to CREATED after REBUILDING - this.transaction.rebuildIndex(indexLabel); + task = this.transaction.rebuildIndex(indexLabel, dependencies); } else { this.transaction.updateSchemaStatus(indexLabel, SchemaStatus.CREATED); } + return task; } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java index a20f44326a..d5a7319061 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java @@ -26,7 +26,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.FutureTask; +import java.util.stream.Collector; +import java.util.stream.Collectors; import org.apache.tinkerpop.gremlin.structure.Graph.Hidden; import org.apache.tinkerpop.gremlin.structure.T; @@ -35,8 +38,10 @@ import org.slf4j.Logger; import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.id.IdGenerator; import com.baidu.hugegraph.type.define.SerialEnum; import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.InsertionOrderUtil; import com.baidu.hugegraph.util.Log; public class HugeTask extends FutureTask { @@ -49,7 +54,7 @@ public class HugeTask extends FutureTask { private String name; private final Id id; private final Id parent; - private List children; + private Set dependencies; private String description; private Date create; private volatile TaskStatus status; @@ -76,7 +81,7 @@ public HugeTask(Id id, Id parent, TaskCallable callable) { this.name = null; this.id = id; this.parent = parent; - this.children = null; + this.dependencies = null; this.description = null; this.status = TaskStatus.NEW; this.progress = 0; @@ -95,15 +100,17 @@ public Id parent() { return this.parent; } - public List children() { - return Collections.unmodifiableList(this.children); + public Set dependencies() { + return Collections.unmodifiableSet(this.dependencies); } - public void child(Id id) { - if (this.children == null) { - this.children = new ArrayList<>(); + public void depends(Id id) { + E.checkState(this.status == TaskStatus.NEW, + "Can't add dependency in status '%s'", this.status); + if (this.dependencies == null) { + this.dependencies = InsertionOrderUtil.newSet(); } - this.children.add(id); + this.dependencies.add(id); } public TaskStatus status() { @@ -190,8 +197,10 @@ public String toString() { @Override public void run() { assert this.status.code() < TaskStatus.RUNNING.code(); - this.status(TaskStatus.RUNNING); - super.run(); + if (this.checkDependenciesSuccess()) { + this.status(TaskStatus.RUNNING); + super.run(); + } } @Override @@ -241,6 +250,35 @@ protected void setException(Throwable e) { super.setException(e); } + protected boolean checkDependenciesSuccess() { + if (this.dependencies == null || this.dependencies.isEmpty()) { + return true; + } + for (Id dependency : this.dependencies) { + HugeTask task = this.callable.scheduler().task(dependency); + if (!task.completed()) { + // Dependent task not completed, re-schedule self + this.callable.scheduler().schedule(this); + return false; + } else if (task.status() == TaskStatus.CANCELLED) { + this.status(TaskStatus.CANCELLED); + this.result = String.format( + "Cancelled due to dependent task '%s' cancelled", + dependency); + this.done(); + return false; + } else if (task.status() == TaskStatus.FAILED) { + this.status(TaskStatus.FAILED); + this.result = String.format( + "Failed due to dependent task '%s' failed", + dependency); + this.done(); + return false; + } + } + return true; + } + protected TaskCallable callable() { return this.callable; } @@ -258,8 +296,8 @@ protected void property(String key, Object value) { case P.NAME: this.name = (String) value; break; - case P.DESCRIPTION: - this.description = (String) value; + case P.CALLABLE: + // pass break; case P.STATUS: this.status(SerialEnum.fromCode(TaskStatus.class, (byte) value)); @@ -270,11 +308,20 @@ protected void property(String key, Object value) { case P.CREATE: this.create = (Date) value; break; + case P.RETRIES: + this.retries = (int) value; + break; + case P.DESCRIPTION: + this.description = (String) value; + break; case P.UPDATE: this.update = (Date) value; break; - case P.RETRIES: - this.retries = (int) value; + case P.DEPENDENCIES: + @SuppressWarnings("unchecked") + Set values = (Set) value; + this.dependencies = values.stream().map(IdGenerator::of) + .collect(toOrderSet()); break; case P.INPUT: this.input = (String) value; @@ -282,9 +329,6 @@ protected void property(String key, Object value) { case P.RESULT: this.result = (String) value; break; - case P.CALLABLE: - // pass - break; default: throw new AssertionError("Unsupported key: " + key); } @@ -333,6 +377,12 @@ protected Object[] asArray() { list.add(this.update); } + if (this.dependencies != null) { + list.add(P.DEPENDENCIES); + list.add(this.dependencies.stream().map(Id::asLong) + .collect(toOrderSet())); + } + if (this.input != null) { list.add(P.INPUT); list.add(this.input); @@ -372,6 +422,11 @@ public Map asMap(boolean withDetails) { if (this.update != null) { map.put(Hidden.unHide(P.UPDATE), this.update); } + if (this.dependencies != null) { + Set value = this.dependencies.stream().map(Id::asLong) + .collect(toOrderSet()); + map.put(Hidden.unHide(P.DEPENDENCIES), value); + } if (withDetails && this.input != null) { map.put(Hidden.unHide(P.INPUT), this.input); } @@ -400,6 +455,10 @@ public static HugeTask fromVertex(Vertex vertex) { return task; } + private static Collector> toOrderSet() { + return Collectors.toCollection(InsertionOrderUtil::newSet); + } + public static final class P { public static final String TASK = Hidden.hide("task"); @@ -418,6 +477,7 @@ public static final class P { public static final String RETRIES = "~task_retries"; public static final String INPUT = "~task_input"; public static final String RESULT = "~task_result"; + public static final String DEPENDENCIES = "~task_dependencies"; //public static final String PARENT = hide("parent"); //public static final String CHILDREN = hide("children"); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java index bf099fbea8..2be5c46165 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskScheduler.java @@ -41,6 +41,7 @@ import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.exception.NotFoundException; +import com.baidu.hugegraph.iterator.ExtendableIterator; import com.baidu.hugegraph.iterator.MapperIterator; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -203,6 +204,29 @@ public HugeTask task(Id id) { return this.findTask(id); } + public Iterator> tasks(List ids) { + List taskIdsNotInMem = new ArrayList<>(); + List> taskInMem = new ArrayList<>(); + for (Id id : ids) { + @SuppressWarnings("unchecked") + HugeTask task = (HugeTask) this.tasks.get(id); + if (task != null) { + taskInMem.add(task); + } else { + taskIdsNotInMem.add(id); + } + } + @SuppressWarnings("unchecked") + ExtendableIterator> iterator; + if (taskInMem.isEmpty()) { + iterator = new ExtendableIterator<>(); + } else { + iterator = new ExtendableIterator<>(taskInMem.iterator()); + } + iterator.extend(this.findTasks(taskIdsNotInMem)); + return iterator; + } + public HugeTask findTask(Id id) { HugeTask result = this.submit(() -> { HugeTask task = null; @@ -219,6 +243,14 @@ public HugeTask findTask(Id id) { return result; } + public Iterator> findTasks(List ids) { + Object[] idArray = ids.toArray(new Id[ids.size()]); + return this.submit(() -> { + Iterator vertices = this.tx().queryVertices(idArray); + return new MapperIterator<>(vertices, HugeTask::fromVertex); + }); + } + public Iterator> findAllTask(long limit) { return this.queryTask(ImmutableMap.of(), limit); } @@ -352,7 +384,7 @@ protected void initSchema() { .properties(properties) .useCustomizeNumberId() .nullableKeys(P.DESCRIPTION, P.UPDATE, - P.INPUT, P.RESULT) + P.INPUT, P.RESULT, P.DEPENDENCIES) .enableLabelIndex(true) .build(); graph.schemaTransaction().addVertexLabel(label); @@ -375,6 +407,8 @@ private String[] initProperties() { props.add(createPropertyKey(P.RETRIES, DataType.INT)); props.add(createPropertyKey(P.INPUT)); props.add(createPropertyKey(P.RESULT)); + props.add(createPropertyKey(P.DEPENDENCIES, DataType.LONG, + Cardinality.SET)); return props.toArray(new String[0]); }