Skip to content

Commit

Permalink
make create index label return async job if exists
Browse files Browse the repository at this point in the history
fixed #32

Change-Id: I8e23bd0dfcbe65d8f580f957143fcc1802c879f6
  • Loading branch information
zhoney authored and zhoney committed Nov 16, 2018
1 parent 44277ab commit 30fbc72
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 51 deletions.
2 changes: 1 addition & 1 deletion hugegraph-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
</addDefaultSpecificationEntries>
</manifest>
<manifestEntries>
<Implementation-Version>0.29.0.0</Implementation-Version>
<Implementation-Version>0.30.0.0</Implementation-Version>
</manifestEntries>
</archive>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.inject.Singleton;
import javax.ws.rs.BadRequestException;
Expand All @@ -42,6 +43,7 @@

import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.api.filter.StatusFilter.Status;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.server.RestServer;
Expand All @@ -58,6 +60,7 @@
public class TaskAPI extends API {

private static final Logger LOG = Log.logger(RestServer.class);
private static final long NO_LIMIT = -1L;

public static final String ACTION_CANCEL = "cancel";

Expand All @@ -67,6 +70,7 @@ public class TaskAPI extends API {
public Map<String, List<Object>> list(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("status") String status,
@QueryParam("ids") List<Long> ids,
@QueryParam("limit")
@DefaultValue("100") long limit) {
LOG.debug("Graph [{}] list tasks with status {}, limit {}",
Expand All @@ -75,16 +79,35 @@ public Map<String, List<Object>> list(@Context GraphManager manager,
TaskScheduler scheduler = graph(manager, graph).taskScheduler();

Iterator<HugeTask<Object>> itor;
if (status == null) {
itor = scheduler.findAllTask(limit);

if (!ids.isEmpty()) {
LOG.debug("Graph [{}] list tasks with ids {}, limit {}",
graph, ids, limit);
E.checkArgument(status == null,
"Not support status when query task by ids, " +
"but got status='%s'", status);
// Set limit to NO_LIMIT to ignore limit when query task by ids
limit = NO_LIMIT;
List<Id> idList = ids.stream().map(IdGenerator::of)
.collect(Collectors.toList());
itor = scheduler.tasks(idList);
} else {
itor = scheduler.findTask(parseStatus(status), limit);
LOG.debug("Graph [{}] list tasks with status {}, limit {}",
graph, status, limit);
if (status == null) {
itor = scheduler.findAllTask(limit);
} else {
itor = scheduler.findTask(parseStatus(status), limit);
}
}

List<Object> tasks = new ArrayList<>();
while (itor.hasNext()) {
tasks.add(itor.next().asMap(false));
}
if (limit != NO_LIMIT && tasks.size() > limit) {
tasks = tasks.subList(0, (int) limit);
}
return ImmutableMap.of("tasks", tasks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public String create(@Context GraphManager manager,

HugeGraph g = graph(manager, graph);
IndexLabel.Builder builder = jsonIndexLabel.convert2Builder(g);
IndexLabel indexLabel = builder.create();
return manager.serializer(g).writeIndexlabel(indexLabel);
IndexLabel.CreatedIndexLabel il = builder.createWithTask();
return manager.serializer(g).writeCreatedIndexLabel(il);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ public String writeIndexlabels(List<IndexLabel> indexLabels) {
return writeList("indexlabels", indexLabels);
}

@Override
public String writeCreatedIndexLabel(IndexLabel.CreatedIndexLabel cil) {
StringBuilder builder = new StringBuilder();
long id = cil.task() == null ? 0L : cil.task().asLong();
return builder.append("{\"index_label\": ")
.append(this.writeIndexlabel(cil.indexLabel()))
.append(", \"task_id\": ")
.append(id)
.append("}")
.toString();
}

@Override
public String writeVertex(Vertex vertex) {
return writeObject(vertex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface Serializer {

public String writeIndexlabels(List<IndexLabel> indexLabels);

public String writeCreatedIndexLabel(IndexLabel.CreatedIndexLabel cil);

public String writeVertex(Vertex v);

public String writeVertices(Iterator<Vertex> vertices, boolean paging);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ public final class ApiVersion {
* version 0.8:
* [0.28] Issue-153: Add task-cancel API
* [0.29] Issue-39: Add rays and rings RESTful API
* [0.30] Issue-32: Change index create API to return indexLabel and task id
*/

// The second parameter of Version.of() is for IDE running without JAR
public static final Version VERSION = Version.of(ApiVersion.class, "0.29");
public static final Version VERSION = Version.of(ApiVersion.class, "0.30");

public static final void check() {
// Check version of hugegraph-core. Firstly do check from version 0.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
Expand Down Expand Up @@ -52,6 +53,7 @@
import com.baidu.hugegraph.type.define.SchemaStatus;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class SchemaTransaction extends IndexableTransaction {

Expand Down Expand Up @@ -217,10 +219,15 @@ public Id removeIndexLabel(Id id) {
}

public Id rebuildIndex(SchemaElement schema) {
return this.rebuildIndex(schema, ImmutableSet.of());
}

public Id rebuildIndex(SchemaElement schema, Set<Id> dependencies) {
LOG.debug("SchemaTransaction rebuild index for {} with id '{}'",
schema.type(), schema.id());
SchemaCallable callable = new RebuildIndexCallable();
return asyncRun(this.graph(), schema.type(), schema.id(), callable);
return asyncRun(this.graph(), schema.type(), schema.id(), callable,
dependencies);
}

public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) {
Expand Down Expand Up @@ -352,10 +359,21 @@ private <T> T deserialize(BackendEntry entry, HugeType type) {

private static Id asyncRun(HugeGraph graph, HugeType schemaType,
Id schemaId, SchemaCallable callable) {
String name = SchemaCallable.formatTaskName(schemaType, schemaId);
return asyncRun(graph, schemaType, schemaId,
callable, ImmutableSet.of());
}

private static Id asyncRun(HugeGraph graph, HugeType schemaType,
Id schemaId, SchemaCallable callable,
Set<Id> dependencies) {
String schemaName = graph.schemaTransaction()
.getSchema(schemaType, schemaId).name();
String name = SchemaCallable.formatTaskName(schemaType, schemaId,
schemaName);

JobBuilder<Object> builder = JobBuilder.of(graph).name(name)
.job(callable);
.job(callable)
.dependencies(dependencies);
HugeTask<?> task = builder.schedule();

// If SCHEMA_SYNC_DELETION is true, wait async thread done before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.baidu.hugegraph.job;

import java.util.Set;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.task.HugeTask;
Expand All @@ -33,6 +35,7 @@ public class JobBuilder<T> {
private String name;
private String input;
private Job<T> job;
private Set<Id> dependencies;

public static <T> JobBuilder<T> of(final HugeGraph graph) {
return new JobBuilder<>(graph);
Expand All @@ -57,6 +60,11 @@ public JobBuilder<T> job(Job<T> job) {
return this;
}

public JobBuilder<T> dependencies(Set<Id> dependencies) {
this.dependencies = dependencies;
return this;
}

public HugeTask<T> schedule() {
E.checkArgumentNotNull(this.name, "Job name can't be null");
E.checkArgumentNotNull(this.job, "Job can't be null");
Expand All @@ -67,6 +75,11 @@ public HugeTask<T> schedule() {
if (this.input != null) {
task.input(this.input);
}
if (this.dependencies != null && !this.dependencies.isEmpty()) {
for (Id depend : this.dependencies) {
task.depends(depend);
}
}

TaskScheduler scheduler = this.graph.taskScheduler();
scheduler.schedule(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,34 @@ public abstract class SchemaCallable extends Job<Object> {

public static final String REMOVE_SCHEMA = "remove_schema";
public static final String REBUILD_INDEX = "rebuild_index";
public static final String CREATE_INDEX = "create_index";

private static final String SPLITOR = ":";

protected HugeType schemaType() {
String name = this.task().name();
String[] parts = name.split(SPLITOR);
E.checkState(parts.length == 2 && parts[0] != null,
"Task name should be formatted to String 'TYPE:ID', " +
"but got '%s'", name);
String[] parts = name.split(SPLITOR, 3);
E.checkState(parts.length == 3 && parts[0] != null,
"Task name should be formatted to String " +
"'TYPE:ID:NAME', but got '%s'", name);

return HugeType.valueOf(parts[0]);
}

protected Id schemaId() {
String name = this.task().name();
String[] parts = name.split(SPLITOR);
E.checkState(parts.length == 2 && parts[1] != null,
"Task name should be formatted to String 'TYPE:ID', " +
"but got '%s'", name);
String[] parts = name.split(SPLITOR, 3);
E.checkState(parts.length == 3 && parts[1] != null,
"Task name should be formatted to String " +
"'TYPE:ID:NAME', but got '%s'", name);
return IdGenerator.of(Long.valueOf(parts[1]));
}

public static String formatTaskName(HugeType schemaType, Id schemaId) {
E.checkNotNull(schemaType, "schema type");
E.checkNotNull(schemaId, "schema id");
return String.join(SPLITOR, schemaType.toString(), schemaId.toString());
public static String formatTaskName(HugeType type, Id id, String name) {
E.checkNotNull(type, "schema type");
E.checkNotNull(id, "schema id");
E.checkNotNull(name, "schema name");
return String.join(SPLITOR, type.toString(), id.asString(), name);
}

protected static void removeIndexLabelFromBaseLabel(SchemaTransaction tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public static IndexLabel label(HugeGraph graph, Id id) {

public interface Builder extends SchemaBuilder<IndexLabel> {

CreatedIndexLabel createWithTask();

Id rebuild();

Builder onV(String baseValue);
Expand All @@ -202,4 +204,33 @@ public interface Builder extends SchemaBuilder<IndexLabel> {

Builder indexType(IndexType indexType);
}

public static class CreatedIndexLabel {

private IndexLabel indexLabel;
private Id task;

public CreatedIndexLabel(IndexLabel indexLabel, Id task) {
E.checkNotNull(indexLabel, "index label");
this.indexLabel = indexLabel;
this.task = task;
}

public void indexLabel(IndexLabel indexLabel) {
E.checkNotNull(indexLabel, "index label");
this.indexLabel = indexLabel;
}

public IndexLabel indexLabel() {
return this.indexLabel;
}

public void task(Id task) {
this.task = task;
}

public Id task() {
return this.task;
}
}
}
Loading

0 comments on commit 30fbc72

Please sign in to comment.