diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java index 45ec879118..48133ef39c 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java @@ -75,10 +75,11 @@ public Map post(@Context GraphManager manager, HugeGraph g = graph(manager, graph); request.aliase(graph, "graph"); - JobBuilder builder = JobBuilder.of(g).name(request.name()) - .input(request.toJson()) - .job(new GremlinJob()); - return ImmutableMap.of("task_id", builder.schedule()); + JobBuilder builder = JobBuilder.of(g); + builder.name(request.name()) + .input(request.toJson()) + .job(new GremlinJob()); + return ImmutableMap.of("task_id", builder.schedule().id()); } public static class GremlinJob extends Job { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index cefe6ca613..6511e37220 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -550,6 +550,12 @@ public boolean closed() { return this.refs.get() == 0; } + public void commitIfGtSize(int size) { + // Only committing graph transaction data if reaching batch size + // is OK, bacause schema transaction is auto committed. + this.graphTransaction().commitIfGtSize(size); + } + @Override public void commit() { try { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java index 5b9626a34c..b3c066b1ec 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java @@ -23,6 +23,8 @@ public interface Transaction { public void commit() throws BackendException; + public void commitIfGtSize(int size) throws BackendException; + public void rollback() throws BackendException; public boolean autoCommit(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java index 87aa3ea81e..a16ed69a35 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java @@ -87,11 +87,9 @@ public Cache cache(String name) { } public Cache cache(String name, int capacity) { - Cache cache = this.caches.get(name); - if (cache == null) { - cache = new RamCache(capacity); - this.caches.put(name, cache); + if (!this.caches.containsKey(name)) { + this.caches.putIfAbsent(name, new RamCache(capacity)); } - return cache; + return this.caches.get(name); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java index d85612ec9c..0264fd438d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java @@ -170,6 +170,12 @@ public void commit() throws BackendException { } } + public void commitIfGtSize(int size) throws BackendException { + if (this.mutationSize() >= size) { + this.commit(); + } + } + @Watched(prefix = "tx") @Override public void rollback() throws BackendException { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java index 83fc527d4c..cb7a57b912 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java @@ -76,6 +76,7 @@ public class GraphIndexTransaction extends AbstractTransaction { private static final String INDEX_EMPTY_SYM = "\u0000"; private static final Query EMPTY_QUERY = new ConditionQuery(null); + private static final int REBUILD_COMMIT_BATCH = 1000; private final Analyzer textAnalyzer; @@ -1100,30 +1101,13 @@ public void rebuildIndex(HugeType type, Id label, * They have different id lead to it can't compare and optimize */ this.commit(); - if (type == HugeType.VERTEX_LABEL) { - ConditionQuery query = new ConditionQuery(HugeType.VERTEX); - query.eq(HugeKeys.LABEL, label); - query.capacity(Query.NO_CAPACITY); - for (Iterator itor = graphTx.queryVertices(query); - itor.hasNext();) { - HugeVertex vertex = (HugeVertex) itor.next(); - for (Id id : indexLabelIds) { - this.updateIndex(id, vertex, false); - } - } + this.rebuildIndex(HugeType.VERTEX, label, indexLabelIds, + graphTx::queryVerticesFromBackend); } else { assert type == HugeType.EDGE_LABEL; - ConditionQuery query = new ConditionQuery(HugeType.EDGE); - query.eq(HugeKeys.LABEL, label); - query.capacity(Query.NO_CAPACITY); - for (Iterator itor = graphTx.queryEdges(query); - itor.hasNext();) { - HugeEdge edge = (HugeEdge) itor.next(); - for (Id id : indexLabelIds) { - this.updateIndex(id, edge, false); - } - } + this.rebuildIndex(HugeType.EDGE, label, indexLabelIds, + graphTx::queryEdgesFromBackend); } this.commit(); @@ -1136,6 +1120,34 @@ public void rebuildIndex(HugeType type, Id label, } } + private void rebuildIndex(HugeType type, Id label, + Collection indexLabelIds, + Function> fetcher) { + assert type == HugeType.VERTEX || type == HugeType.EDGE; + ConditionQuery query = new ConditionQuery(type); + query.eq(HugeKeys.LABEL, label); + query.limit(Query.DEFAULT_CAPACITY); + + int pass = 0; + int counter; + do { + counter = 0; + query.offset(pass++ * Query.DEFAULT_CAPACITY); + Iterator itor = fetcher.apply(query); + while (itor.hasNext()) { + HugeElement element = (HugeElement) itor.next(); + for (Id id : indexLabelIds) { + this.updateIndex(id, element, false); + // Commit per small batch to avoid too much data + // in single commit, especially for Cassandra backend. + this.commitIfGtSize(REBUILD_COMMIT_BATCH); + } + ++counter; + } + assert counter <= Query.DEFAULT_CAPACITY; + } while (counter == Query.DEFAULT_CAPACITY); + } + private static class MatchedLabel { private SchemaLabel schemaLabel; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index 71987baefe..2bf63c68f4 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -446,7 +446,7 @@ public Iterator queryVertices(Query query) { return r; } - private Iterator queryVerticesFromBackend(Query query) { + protected Iterator queryVerticesFromBackend(Query query) { assert query.resultType().isVertex(); Iterator entries = this.query(query); @@ -562,7 +562,7 @@ public Iterator queryEdges(Query query) { return r; } - private Iterator queryEdgesFromBackend(Query query) { + protected Iterator queryEdgesFromBackend(Query query) { assert query.resultType().isEdge(); Iterator entries = this.query(query); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 1245654ac5..e7c1d5c055 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Set; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; @@ -33,7 +32,6 @@ import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.config.CoreOptions; -import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.NotAllowException; import com.baidu.hugegraph.job.JobBuilder; import com.baidu.hugegraph.job.schema.EdgeLabelRemoveCallable; @@ -48,12 +46,12 @@ import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.schema.SchemaLabel; import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.task.HugeTask; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.type.define.SchemaStatus; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.LockUtil; -import com.google.common.collect.ImmutableSet; public class SchemaTransaction extends IndexableTransaction { @@ -162,58 +160,9 @@ public VertexLabel getVertexLabel(String name) { } public Id removeVertexLabel(Id id) { - HugeConfig config = this.graph().configuration(); - if (config.get(CoreOptions.SCHEMA_SYNC_DELETION)) { - removeVertexLabelSync(this.graph(), id); - return null; - } else { - SchemaCallable callable = new VertexLabelRemoveCallable(); - return asyncRun(this.graph(), HugeType.VERTEX_LABEL, id, callable); - } - } - - public static void removeVertexLabelSync(HugeGraph graph, Id id) { LOG.debug("SchemaTransaction remove vertex label '{}'", id); - GraphTransaction graphTx = graph.graphTransaction(); - SchemaTransaction schemaTx = graph.schemaTransaction(); - VertexLabel vertexLabel = schemaTx.getVertexLabel(id); - // If the vertex label does not exist, return directly - if (vertexLabel == null) { - return; - } - - List edgeLabels = schemaTx.getEdgeLabels(); - for (EdgeLabel edgeLabel : edgeLabels) { - if (edgeLabel.linkWithLabel(id)) { - throw new HugeException( - "Not allowed to remove vertex label '%s' " + - "because the edge label '%s' still link with it", - vertexLabel.name(), edgeLabel.name()); - } - } - - /* - * Copy index label ids because removeIndexLabel will mutate - * vertexLabel.indexLabels() - */ - Set indexLabelIds = ImmutableSet.copyOf(vertexLabel.indexLabels()); - LockUtil.Locks locks = new LockUtil.Locks(); - try { - locks.lockWrites(LockUtil.VERTEX_LABEL_DELETE, id); - schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.DELETING); - for (Id indexLabelId : indexLabelIds) { - removeIndexLabelSync(graph, indexLabelId); - } - - // TODO: use event to replace direct call - // Deleting a vertex will automatically deletes the held edge - graphTx.removeVertices(vertexLabel); - schemaTx.removeSchema(vertexLabel); - // Should commit changes to backend store before release delete lock - graph.tx().commit(); - } finally { - locks.unlock(); - } + SchemaCallable callable = new VertexLabelRemoveCallable(); + return asyncRun(this.graph(), HugeType.VERTEX_LABEL, id, callable); } public void addEdgeLabel(EdgeLabel edgeLabel) { @@ -233,43 +182,9 @@ public EdgeLabel getEdgeLabel(String name) { } public Id removeEdgeLabel(Id id) { - HugeConfig config = this.graph().configuration(); - if (config.get(CoreOptions.SCHEMA_SYNC_DELETION)) { - removeEdgeLabelSync(this.graph(), id); - return null; - } else { - SchemaCallable callable = new EdgeLabelRemoveCallable(); - return asyncRun(this.graph(), HugeType.EDGE_LABEL, id, callable); - } - } - - public static void removeEdgeLabelSync(HugeGraph graph, Id id) { LOG.debug("SchemaTransaction remove edge label '{}'", id); - GraphTransaction graphTx = graph.graphTransaction(); - SchemaTransaction schemaTx = graph.schemaTransaction(); - EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id); - // If the edge label does not exist, return directly - if (edgeLabel == null) { - return; - } - // TODO: use event to replace direct call - // Remove index related data(include schema) of this edge label - Set indexIds = ImmutableSet.copyOf(edgeLabel.indexLabels()); - LockUtil.Locks locks = new LockUtil.Locks(); - try { - locks.lockWrites(LockUtil.EDGE_LABEL_DELETE, id); - schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.DELETING); - for (Id indexId : indexIds) { - removeIndexLabelSync(graph, indexId); - } - // Remove all edges which has matched label - graphTx.removeEdges(edgeLabel); - schemaTx.removeSchema(edgeLabel); - // Should commit changes to backend store before release delete lock - graph.tx().commit(); - } finally { - locks.unlock(); - } + SchemaCallable callable = new EdgeLabelRemoveCallable(); + return asyncRun(this.graph(), HugeType.EDGE_LABEL, id, callable); } public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { @@ -296,55 +211,16 @@ public IndexLabel getIndexLabel(String name) { } public Id removeIndexLabel(Id id) { - HugeConfig config = this.graph().configuration(); - if (config.get(CoreOptions.SCHEMA_SYNC_DELETION)) { - removeIndexLabelSync(this.graph(), id); - return null; - } else { - SchemaCallable callable = new IndexLabelRemoveCallable(); - return asyncRun(this.graph(), HugeType.INDEX_LABEL, id, callable); - } - } - - public static void removeIndexLabelSync(HugeGraph graph, Id id) { LOG.debug("SchemaTransaction remove index label '{}'", id); - GraphTransaction graphTx = graph.graphTransaction(); - SchemaTransaction schemaTx = graph.schemaTransaction(); - IndexLabel indexLabel = schemaTx.getIndexLabel(id); - // If the index label does not exist, return directly - if (indexLabel == null) { - return; - } - LockUtil.Locks locks = new LockUtil.Locks(); - try { - locks.lockWrites(LockUtil.INDEX_LABEL_DELETE, id); - // TODO add update lock - // Set index label to "deleting" status - schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.DELETING); - // Remove index data - // TODO: use event to replace direct call - graphTx.removeIndex(indexLabel); - // Remove label from indexLabels of vertex or edge label - schemaTx.removeIndexLabelFromBaseLabel(indexLabel); - schemaTx.removeSchema(indexLabel); - // Should commit changes to backend store before release delete lock - graph.tx().commit(); - } finally { - locks.unlock(); - } + SchemaCallable callable = new IndexLabelRemoveCallable(); + return asyncRun(this.graph(), HugeType.INDEX_LABEL, id, callable); } public Id rebuildIndex(SchemaElement schema) { LOG.debug("SchemaTransaction rebuild index for {} with id '{}'", schema.type(), schema.id()); - HugeGraph graph = this.graph(); - if (graph.configuration().get(CoreOptions.SCHEMA_SYNC_DELETION)) { - graph.graphTransaction().rebuildIndex(schema); - return null; - } else { - SchemaCallable callable = new RebuildIndexCallable(); - return asyncRun(this.graph(), schema.type(), schema.id(), callable); - } + SchemaCallable callable = new RebuildIndexCallable(); + return asyncRun(this.graph(), schema.type(), schema.id(), callable); } public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) { @@ -436,20 +312,6 @@ protected void removeSchema(SchemaElement schema) { } } - protected void removeIndexLabelFromBaseLabel(IndexLabel label) { - HugeType baseType = label.baseType(); - Id baseValue = label.baseValue(); - SchemaLabel schemaLabel; - if (baseType == HugeType.VERTEX_LABEL) { - schemaLabel = this.getVertexLabel(baseValue); - } else { - assert baseType == HugeType.EDGE_LABEL; - schemaLabel = this.getEdgeLabel(baseValue); - } - schemaLabel.removeIndexLabel(label.id()); - this.updateSchema(schemaLabel); - } - public Id getNextId(HugeType type) { LOG.debug("SchemaTransaction get next id for {}", type); return this.store().nextId(type); @@ -491,7 +353,21 @@ private T deserialize(BackendEntry entry, HugeType type) { private static Id asyncRun(HugeGraph graph, HugeType schemaType, Id schemaId, SchemaCallable callable) { String name = SchemaCallable.formatTaskName(schemaType, schemaId); - JobBuilder builder = JobBuilder.of(graph).name(name).job(callable); - return builder.schedule(); + + JobBuilder builder = JobBuilder.of(graph).name(name) + .job(callable); + HugeTask task = builder.schedule(); + + // If SCHEMA_SYNC_DELETION is true, wait async thread done before + // continue. This is used when running tests. + if (graph.configuration().get(CoreOptions.SCHEMA_SYNC_DELETION)) { + try { + task.get(); + assert task.completed(); + } catch (Exception e) { + throw new HugeException("Async task failed", e); + } + } + return task.id(); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java index 67c914bf7f..bcbb6f9917 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java @@ -26,42 +26,42 @@ import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.util.E; -public class JobBuilder { +public class JobBuilder { private final HugeGraph graph; private String name; private String input; - private Job job; + private Job job; - public static JobBuilder of(final HugeGraph graph) { - return new JobBuilder(graph); + public static JobBuilder of(final HugeGraph graph) { + return new JobBuilder<>(graph); } public JobBuilder(final HugeGraph graph) { this.graph = graph; } - public JobBuilder name(String name) { + public JobBuilder name(String name) { this.name = name; return this; } - public JobBuilder input(String input) { + public JobBuilder input(String input) { this.input = input; return this; } - public JobBuilder job(Job job) { + public JobBuilder job(Job job) { this.job = job; return this; } - public Id schedule() { + public HugeTask schedule() { E.checkArgumentNotNull(this.name, "Job name can't be null"); E.checkArgumentNotNull(this.job, "Job can't be null"); - HugeTask task = new HugeTask<>(this.genTaskId(), null, this.job); + HugeTask task = new HugeTask<>(this.genTaskId(), null, this.job); task.type(this.job.type()); task.name(this.name); if (this.input != null) { @@ -72,7 +72,7 @@ public Id schedule() { scheduler.schedule(task); scheduler.save(task); - return task.id(); + return task; } private Id genTaskId() { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java index cdd144c161..a81f14c4f7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java @@ -19,7 +19,16 @@ package com.baidu.hugegraph.job.schema; +import java.util.Set; + +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.EdgeLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; +import com.google.common.collect.ImmutableSet; public class EdgeLabelRemoveCallable extends SchemaCallable { @@ -30,7 +39,35 @@ public String type() { @Override public Object execute() { - SchemaTransaction.removeEdgeLabelSync(this.graph(), this.schemaId()); + removeEdgeLabel(this.graph(), this.schemaId()); return null; } -} \ No newline at end of file + + protected static void removeEdgeLabel(HugeGraph graph, Id id) { + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id); + // If the edge label does not exist, return directly + if (edgeLabel == null) { + return; + } + // TODO: use event to replace direct call + // Remove index related data(include schema) of this edge label + Set indexIds = ImmutableSet.copyOf(edgeLabel.indexLabels()); + LockUtil.Locks locks = new LockUtil.Locks(); + try { + locks.lockWrites(LockUtil.EDGE_LABEL_DELETE, id); + schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.DELETING); + for (Id indexId : indexIds) { + IndexLabelRemoveCallable.removeIndexLabel(graph, indexId); + } + // Remove all edges which has matched label + graphTx.removeEdges(edgeLabel); + removeSchema(schemaTx, edgeLabel); + // Should commit changes to backend store before release delete lock + graph.tx().commit(); + } finally { + locks.unlock(); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java index b9e0624e27..ebbf47577f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java @@ -19,7 +19,13 @@ package com.baidu.hugegraph.job.schema; +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.IndexLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; public class IndexLabelRemoveCallable extends SchemaCallable { @@ -30,7 +36,34 @@ public String type() { @Override public Object execute() { - SchemaTransaction.removeIndexLabelSync(this.graph(), this.schemaId()); + removeIndexLabel(this.graph(), this.schemaId()); return null; } -} \ No newline at end of file + + protected static void removeIndexLabel(HugeGraph graph, Id id) { + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + IndexLabel indexLabel = schemaTx.getIndexLabel(id); + // If the index label does not exist, return directly + if (indexLabel == null) { + return; + } + LockUtil.Locks locks = new LockUtil.Locks(); + try { + locks.lockWrites(LockUtil.INDEX_LABEL_DELETE, id); + // TODO add update lock + // Set index label to "deleting" status + schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.DELETING); + // Remove index data + // TODO: use event to replace direct call + graphTx.removeIndex(indexLabel); + // Remove label from indexLabels of vertex or edge label + removeIndexLabelFromBaseLabel(schemaTx, indexLabel); + removeSchema(schemaTx, indexLabel); + // Should commit changes to backend store before release delete lock + graph.tx().commit(); + } finally { + locks.unlock(); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java index 3e13ac5f92..3196ae3261 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java @@ -1,8 +1,15 @@ package com.baidu.hugegraph.job.schema; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.id.IdGenerator; +import com.baidu.hugegraph.backend.tx.SchemaTransaction; import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.schema.IndexLabel; +import com.baidu.hugegraph.schema.SchemaElement; +import com.baidu.hugegraph.schema.SchemaLabel; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.util.E; @@ -37,4 +44,58 @@ public static String formatTaskName(HugeType schemaType, Id schemaId) { E.checkNotNull(schemaId, "schema id"); return String.join(SPLITOR, schemaType.toString(), schemaId.toString()); } -} \ No newline at end of file + + protected static void removeIndexLabelFromBaseLabel(SchemaTransaction tx, + IndexLabel label) { + HugeType baseType = label.baseType(); + Id baseValue = label.baseValue(); + SchemaLabel schemaLabel; + if (baseType == HugeType.VERTEX_LABEL) { + schemaLabel = tx.getVertexLabel(baseValue); + } else { + assert baseType == HugeType.EDGE_LABEL; + schemaLabel = tx.getEdgeLabel(baseValue); + } + schemaLabel.removeIndexLabel(label.id()); + updateSchema(tx, schemaLabel); + } + + /** + * Use reflection to call SchemaTransaction.removeSchema(), + * which is protected + */ + protected static void removeSchema(SchemaTransaction tx, + SchemaElement schema) { + try { + Method method = SchemaTransaction.class + .getDeclaredMethod("removeSchema", + SchemaElement.class); + method.setAccessible(true); + method.invoke(tx, schema); + } catch (NoSuchMethodException | IllegalAccessException | + InvocationTargetException e) { + throw new AssertionError( + "Can't call SchemaTransaction.removeSchema()", e); + } + + } + + /** + * Use reflection to call SchemaTransaction.updateSchema(), + * which is protected + */ + protected static void updateSchema(SchemaTransaction tx, + SchemaElement schema) { + try { + Method method = SchemaTransaction.class + .getDeclaredMethod("updateSchema", + SchemaElement.class); + method.setAccessible(true); + method.invoke(tx, schema); + } catch (NoSuchMethodException | IllegalAccessException | + InvocationTargetException e) { + throw new AssertionError( + "Can't call SchemaTransaction.updateSchema()", e); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java index 0485e8727d..c6e09c5cc0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java @@ -19,7 +19,19 @@ package com.baidu.hugegraph.job.schema; +import java.util.List; +import java.util.Set; + +import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.EdgeLabel; +import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; +import com.google.common.collect.ImmutableSet; public class VertexLabelRemoveCallable extends SchemaCallable { @@ -30,7 +42,50 @@ public String type() { @Override public Object execute() { - SchemaTransaction.removeVertexLabelSync(this.graph(), this.schemaId()); + removeVertexLabel(this.graph(), this.schemaId()); return null; } -} \ No newline at end of file + + protected static void removeVertexLabel(HugeGraph graph, Id id) { + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + VertexLabel vertexLabel = schemaTx.getVertexLabel(id); + // If the vertex label does not exist, return directly + if (vertexLabel == null) { + return; + } + + List edgeLabels = schemaTx.getEdgeLabels(); + for (EdgeLabel edgeLabel : edgeLabels) { + if (edgeLabel.linkWithLabel(id)) { + throw new HugeException( + "Not allowed to remove vertex label '%s' " + + "because the edge label '%s' still link with it", + vertexLabel.name(), edgeLabel.name()); + } + } + + /* + * Copy index label ids because removeIndexLabel will mutate + * vertexLabel.indexLabels() + */ + Set indexLabelIds = ImmutableSet.copyOf(vertexLabel.indexLabels()); + LockUtil.Locks locks = new LockUtil.Locks(); + try { + locks.lockWrites(LockUtil.VERTEX_LABEL_DELETE, id); + schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.DELETING); + for (Id indexLabelId : indexLabelIds) { + IndexLabelRemoveCallable.removeIndexLabel(graph, indexLabelId); + } + + // TODO: use event to replace direct call + // Deleting a vertex will automatically deletes the held edge + graphTx.removeVertices(vertexLabel); + removeSchema(schemaTx, vertexLabel); + // Should commit changes to backend store before release delete lock + graph.tx().commit(); + } finally { + locks.unlock(); + } + } +}