From 04d8359a6891e2cd758fbd665e70a32415359265 Mon Sep 17 00:00:00 2001 From: zhangyi51 Date: Tue, 30 Oct 2018 15:22:14 +0800 Subject: [PATCH] Commit index updates in batch 1. To avoid commit all together during rebuilding index, especially for Cassandra backend, which has batch limit 65535 2. Also move async codes to com.baidu.hugegraph.job package 3. fix bug that CacheManager might create more than one cache with same name fixed: #144 implemented: #82 Change-Id: I88ff4bc878bc24122f0bb6ecf9964246a083b9ab --- .../baidu/hugegraph/api/job/GremlinAPI.java | 9 +- .../java/com/baidu/hugegraph/HugeGraph.java | 6 + .../baidu/hugegraph/backend/Transaction.java | 2 + .../hugegraph/backend/cache/CacheManager.java | 8 +- .../backend/tx/AbstractTransaction.java | 6 + .../backend/tx/GraphIndexTransaction.java | 54 +++--- .../backend/tx/GraphTransaction.java | 4 +- .../backend/tx/SchemaTransaction.java | 174 +++--------------- .../com/baidu/hugegraph/job/JobBuilder.java | 20 +- .../job/schema/EdgeLabelRemoveCallable.java | 41 ++++- .../job/schema/IndexLabelRemoveCallable.java | 37 +++- .../hugegraph/job/schema/SchemaCallable.java | 63 ++++++- .../job/schema/VertexLabelRemoveCallable.java | 59 +++++- 13 files changed, 285 insertions(+), 198 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java index 45ec879118..48133ef39c 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/GremlinAPI.java @@ -75,10 +75,11 @@ public Map post(@Context GraphManager manager, HugeGraph g = graph(manager, graph); request.aliase(graph, "graph"); - JobBuilder builder = JobBuilder.of(g).name(request.name()) - .input(request.toJson()) - .job(new GremlinJob()); - return ImmutableMap.of("task_id", builder.schedule()); + JobBuilder builder = JobBuilder.of(g); + builder.name(request.name()) + .input(request.toJson()) + .job(new GremlinJob()); + return ImmutableMap.of("task_id", builder.schedule().id()); } public static class GremlinJob extends Job { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index cefe6ca613..6511e37220 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -550,6 +550,12 @@ public boolean closed() { return this.refs.get() == 0; } + public void commitIfGtSize(int size) { + // Only committing graph transaction data if reaching batch size + // is OK, bacause schema transaction is auto committed. + this.graphTransaction().commitIfGtSize(size); + } + @Override public void commit() { try { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java index 5b9626a34c..b3c066b1ec 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/Transaction.java @@ -23,6 +23,8 @@ public interface Transaction { public void commit() throws BackendException; + public void commitIfGtSize(int size) throws BackendException; + public void rollback() throws BackendException; public boolean autoCommit(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java index 87aa3ea81e..a16ed69a35 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheManager.java @@ -87,11 +87,9 @@ public Cache cache(String name) { } public Cache cache(String name, int capacity) { - Cache cache = this.caches.get(name); - if (cache == null) { - cache = new RamCache(capacity); - this.caches.put(name, cache); + if (!this.caches.containsKey(name)) { + this.caches.putIfAbsent(name, new RamCache(capacity)); } - return cache; + return this.caches.get(name); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java index d85612ec9c..0264fd438d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java @@ -170,6 +170,12 @@ public void commit() throws BackendException { } } + public void commitIfGtSize(int size) throws BackendException { + if (this.mutationSize() >= size) { + this.commit(); + } + } + @Watched(prefix = "tx") @Override public void rollback() throws BackendException { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java index 83fc527d4c..cb7a57b912 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java @@ -76,6 +76,7 @@ public class GraphIndexTransaction extends AbstractTransaction { private static final String INDEX_EMPTY_SYM = "\u0000"; private static final Query EMPTY_QUERY = new ConditionQuery(null); + private static final int REBUILD_COMMIT_BATCH = 1000; private final Analyzer textAnalyzer; @@ -1100,30 +1101,13 @@ public void rebuildIndex(HugeType type, Id label, * They have different id lead to it can't compare and optimize */ this.commit(); - if (type == HugeType.VERTEX_LABEL) { - ConditionQuery query = new ConditionQuery(HugeType.VERTEX); - query.eq(HugeKeys.LABEL, label); - query.capacity(Query.NO_CAPACITY); - for (Iterator itor = graphTx.queryVertices(query); - itor.hasNext();) { - HugeVertex vertex = (HugeVertex) itor.next(); - for (Id id : indexLabelIds) { - this.updateIndex(id, vertex, false); - } - } + this.rebuildIndex(HugeType.VERTEX, label, indexLabelIds, + graphTx::queryVerticesFromBackend); } else { assert type == HugeType.EDGE_LABEL; - ConditionQuery query = new ConditionQuery(HugeType.EDGE); - query.eq(HugeKeys.LABEL, label); - query.capacity(Query.NO_CAPACITY); - for (Iterator itor = graphTx.queryEdges(query); - itor.hasNext();) { - HugeEdge edge = (HugeEdge) itor.next(); - for (Id id : indexLabelIds) { - this.updateIndex(id, edge, false); - } - } + this.rebuildIndex(HugeType.EDGE, label, indexLabelIds, + graphTx::queryEdgesFromBackend); } this.commit(); @@ -1136,6 +1120,34 @@ public void rebuildIndex(HugeType type, Id label, } } + private void rebuildIndex(HugeType type, Id label, + Collection indexLabelIds, + Function> fetcher) { + assert type == HugeType.VERTEX || type == HugeType.EDGE; + ConditionQuery query = new ConditionQuery(type); + query.eq(HugeKeys.LABEL, label); + query.limit(Query.DEFAULT_CAPACITY); + + int pass = 0; + int counter; + do { + counter = 0; + query.offset(pass++ * Query.DEFAULT_CAPACITY); + Iterator itor = fetcher.apply(query); + while (itor.hasNext()) { + HugeElement element = (HugeElement) itor.next(); + for (Id id : indexLabelIds) { + this.updateIndex(id, element, false); + // Commit per small batch to avoid too much data + // in single commit, especially for Cassandra backend. + this.commitIfGtSize(REBUILD_COMMIT_BATCH); + } + ++counter; + } + assert counter <= Query.DEFAULT_CAPACITY; + } while (counter == Query.DEFAULT_CAPACITY); + } + private static class MatchedLabel { private SchemaLabel schemaLabel; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index 71987baefe..2bf63c68f4 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -446,7 +446,7 @@ public Iterator queryVertices(Query query) { return r; } - private Iterator queryVerticesFromBackend(Query query) { + protected Iterator queryVerticesFromBackend(Query query) { assert query.resultType().isVertex(); Iterator entries = this.query(query); @@ -562,7 +562,7 @@ public Iterator queryEdges(Query query) { return r; } - private Iterator queryEdgesFromBackend(Query query) { + protected Iterator queryEdgesFromBackend(Query query) { assert query.resultType().isEdge(); Iterator entries = this.query(query); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 1245654ac5..e7c1d5c055 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Set; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; @@ -33,7 +32,6 @@ import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.config.CoreOptions; -import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.NotAllowException; import com.baidu.hugegraph.job.JobBuilder; import com.baidu.hugegraph.job.schema.EdgeLabelRemoveCallable; @@ -48,12 +46,12 @@ import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.schema.SchemaLabel; import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.task.HugeTask; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.type.define.SchemaStatus; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.LockUtil; -import com.google.common.collect.ImmutableSet; public class SchemaTransaction extends IndexableTransaction { @@ -162,58 +160,9 @@ public VertexLabel getVertexLabel(String name) { } public Id removeVertexLabel(Id id) { - HugeConfig config = this.graph().configuration(); - if (config.get(CoreOptions.SCHEMA_SYNC_DELETION)) { - removeVertexLabelSync(this.graph(), id); - return null; - } else { - SchemaCallable callable = new VertexLabelRemoveCallable(); - return asyncRun(this.graph(), HugeType.VERTEX_LABEL, id, callable); - } - } - - public static void removeVertexLabelSync(HugeGraph graph, Id id) { LOG.debug("SchemaTransaction remove vertex label '{}'", id); - GraphTransaction graphTx = graph.graphTransaction(); - SchemaTransaction schemaTx = graph.schemaTransaction(); - VertexLabel vertexLabel = schemaTx.getVertexLabel(id); - // If the vertex label does not exist, return directly - if (vertexLabel == null) { - return; - } - - List edgeLabels = schemaTx.getEdgeLabels(); - for (EdgeLabel edgeLabel : edgeLabels) { - if (edgeLabel.linkWithLabel(id)) { - throw new HugeException( - "Not allowed to remove vertex label '%s' " + - "because the edge label '%s' still link with it", - vertexLabel.name(), edgeLabel.name()); - } - } - - /* - * Copy index label ids because removeIndexLabel will mutate - * vertexLabel.indexLabels() - */ - Set indexLabelIds = ImmutableSet.copyOf(vertexLabel.indexLabels()); - LockUtil.Locks locks = new LockUtil.Locks(); - try { - locks.lockWrites(LockUtil.VERTEX_LABEL_DELETE, id); - schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.DELETING); - for (Id indexLabelId : indexLabelIds) { - removeIndexLabelSync(graph, indexLabelId); - } - - // TODO: use event to replace direct call - // Deleting a vertex will automatically deletes the held edge - graphTx.removeVertices(vertexLabel); - schemaTx.removeSchema(vertexLabel); - // Should commit changes to backend store before release delete lock - graph.tx().commit(); - } finally { - locks.unlock(); - } + SchemaCallable callable = new VertexLabelRemoveCallable(); + return asyncRun(this.graph(), HugeType.VERTEX_LABEL, id, callable); } public void addEdgeLabel(EdgeLabel edgeLabel) { @@ -233,43 +182,9 @@ public EdgeLabel getEdgeLabel(String name) { } public Id removeEdgeLabel(Id id) { - HugeConfig config = this.graph().configuration(); - if (config.get(CoreOptions.SCHEMA_SYNC_DELETION)) { - removeEdgeLabelSync(this.graph(), id); - return null; - } else { - SchemaCallable callable = new EdgeLabelRemoveCallable(); - return asyncRun(this.graph(), HugeType.EDGE_LABEL, id, callable); - } - } - - public static void removeEdgeLabelSync(HugeGraph graph, Id id) { LOG.debug("SchemaTransaction remove edge label '{}'", id); - GraphTransaction graphTx = graph.graphTransaction(); - SchemaTransaction schemaTx = graph.schemaTransaction(); - EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id); - // If the edge label does not exist, return directly - if (edgeLabel == null) { - return; - } - // TODO: use event to replace direct call - // Remove index related data(include schema) of this edge label - Set indexIds = ImmutableSet.copyOf(edgeLabel.indexLabels()); - LockUtil.Locks locks = new LockUtil.Locks(); - try { - locks.lockWrites(LockUtil.EDGE_LABEL_DELETE, id); - schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.DELETING); - for (Id indexId : indexIds) { - removeIndexLabelSync(graph, indexId); - } - // Remove all edges which has matched label - graphTx.removeEdges(edgeLabel); - schemaTx.removeSchema(edgeLabel); - // Should commit changes to backend store before release delete lock - graph.tx().commit(); - } finally { - locks.unlock(); - } + SchemaCallable callable = new EdgeLabelRemoveCallable(); + return asyncRun(this.graph(), HugeType.EDGE_LABEL, id, callable); } public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { @@ -296,55 +211,16 @@ public IndexLabel getIndexLabel(String name) { } public Id removeIndexLabel(Id id) { - HugeConfig config = this.graph().configuration(); - if (config.get(CoreOptions.SCHEMA_SYNC_DELETION)) { - removeIndexLabelSync(this.graph(), id); - return null; - } else { - SchemaCallable callable = new IndexLabelRemoveCallable(); - return asyncRun(this.graph(), HugeType.INDEX_LABEL, id, callable); - } - } - - public static void removeIndexLabelSync(HugeGraph graph, Id id) { LOG.debug("SchemaTransaction remove index label '{}'", id); - GraphTransaction graphTx = graph.graphTransaction(); - SchemaTransaction schemaTx = graph.schemaTransaction(); - IndexLabel indexLabel = schemaTx.getIndexLabel(id); - // If the index label does not exist, return directly - if (indexLabel == null) { - return; - } - LockUtil.Locks locks = new LockUtil.Locks(); - try { - locks.lockWrites(LockUtil.INDEX_LABEL_DELETE, id); - // TODO add update lock - // Set index label to "deleting" status - schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.DELETING); - // Remove index data - // TODO: use event to replace direct call - graphTx.removeIndex(indexLabel); - // Remove label from indexLabels of vertex or edge label - schemaTx.removeIndexLabelFromBaseLabel(indexLabel); - schemaTx.removeSchema(indexLabel); - // Should commit changes to backend store before release delete lock - graph.tx().commit(); - } finally { - locks.unlock(); - } + SchemaCallable callable = new IndexLabelRemoveCallable(); + return asyncRun(this.graph(), HugeType.INDEX_LABEL, id, callable); } public Id rebuildIndex(SchemaElement schema) { LOG.debug("SchemaTransaction rebuild index for {} with id '{}'", schema.type(), schema.id()); - HugeGraph graph = this.graph(); - if (graph.configuration().get(CoreOptions.SCHEMA_SYNC_DELETION)) { - graph.graphTransaction().rebuildIndex(schema); - return null; - } else { - SchemaCallable callable = new RebuildIndexCallable(); - return asyncRun(this.graph(), schema.type(), schema.id(), callable); - } + SchemaCallable callable = new RebuildIndexCallable(); + return asyncRun(this.graph(), schema.type(), schema.id(), callable); } public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) { @@ -436,20 +312,6 @@ protected void removeSchema(SchemaElement schema) { } } - protected void removeIndexLabelFromBaseLabel(IndexLabel label) { - HugeType baseType = label.baseType(); - Id baseValue = label.baseValue(); - SchemaLabel schemaLabel; - if (baseType == HugeType.VERTEX_LABEL) { - schemaLabel = this.getVertexLabel(baseValue); - } else { - assert baseType == HugeType.EDGE_LABEL; - schemaLabel = this.getEdgeLabel(baseValue); - } - schemaLabel.removeIndexLabel(label.id()); - this.updateSchema(schemaLabel); - } - public Id getNextId(HugeType type) { LOG.debug("SchemaTransaction get next id for {}", type); return this.store().nextId(type); @@ -491,7 +353,21 @@ private T deserialize(BackendEntry entry, HugeType type) { private static Id asyncRun(HugeGraph graph, HugeType schemaType, Id schemaId, SchemaCallable callable) { String name = SchemaCallable.formatTaskName(schemaType, schemaId); - JobBuilder builder = JobBuilder.of(graph).name(name).job(callable); - return builder.schedule(); + + JobBuilder builder = JobBuilder.of(graph).name(name) + .job(callable); + HugeTask task = builder.schedule(); + + // If SCHEMA_SYNC_DELETION is true, wait async thread done before + // continue. This is used when running tests. + if (graph.configuration().get(CoreOptions.SCHEMA_SYNC_DELETION)) { + try { + task.get(); + assert task.completed(); + } catch (Exception e) { + throw new HugeException("Async task failed", e); + } + } + return task.id(); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java index 67c914bf7f..bcbb6f9917 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/JobBuilder.java @@ -26,42 +26,42 @@ import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.util.E; -public class JobBuilder { +public class JobBuilder { private final HugeGraph graph; private String name; private String input; - private Job job; + private Job job; - public static JobBuilder of(final HugeGraph graph) { - return new JobBuilder(graph); + public static JobBuilder of(final HugeGraph graph) { + return new JobBuilder<>(graph); } public JobBuilder(final HugeGraph graph) { this.graph = graph; } - public JobBuilder name(String name) { + public JobBuilder name(String name) { this.name = name; return this; } - public JobBuilder input(String input) { + public JobBuilder input(String input) { this.input = input; return this; } - public JobBuilder job(Job job) { + public JobBuilder job(Job job) { this.job = job; return this; } - public Id schedule() { + public HugeTask schedule() { E.checkArgumentNotNull(this.name, "Job name can't be null"); E.checkArgumentNotNull(this.job, "Job can't be null"); - HugeTask task = new HugeTask<>(this.genTaskId(), null, this.job); + HugeTask task = new HugeTask<>(this.genTaskId(), null, this.job); task.type(this.job.type()); task.name(this.name); if (this.input != null) { @@ -72,7 +72,7 @@ public Id schedule() { scheduler.schedule(task); scheduler.save(task); - return task.id(); + return task; } private Id genTaskId() { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java index cdd144c161..a81f14c4f7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java @@ -19,7 +19,16 @@ package com.baidu.hugegraph.job.schema; +import java.util.Set; + +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.EdgeLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; +import com.google.common.collect.ImmutableSet; public class EdgeLabelRemoveCallable extends SchemaCallable { @@ -30,7 +39,35 @@ public String type() { @Override public Object execute() { - SchemaTransaction.removeEdgeLabelSync(this.graph(), this.schemaId()); + removeEdgeLabel(this.graph(), this.schemaId()); return null; } -} \ No newline at end of file + + protected static void removeEdgeLabel(HugeGraph graph, Id id) { + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id); + // If the edge label does not exist, return directly + if (edgeLabel == null) { + return; + } + // TODO: use event to replace direct call + // Remove index related data(include schema) of this edge label + Set indexIds = ImmutableSet.copyOf(edgeLabel.indexLabels()); + LockUtil.Locks locks = new LockUtil.Locks(); + try { + locks.lockWrites(LockUtil.EDGE_LABEL_DELETE, id); + schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.DELETING); + for (Id indexId : indexIds) { + IndexLabelRemoveCallable.removeIndexLabel(graph, indexId); + } + // Remove all edges which has matched label + graphTx.removeEdges(edgeLabel); + removeSchema(schemaTx, edgeLabel); + // Should commit changes to backend store before release delete lock + graph.tx().commit(); + } finally { + locks.unlock(); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java index b9e0624e27..ebbf47577f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java @@ -19,7 +19,13 @@ package com.baidu.hugegraph.job.schema; +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.IndexLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; public class IndexLabelRemoveCallable extends SchemaCallable { @@ -30,7 +36,34 @@ public String type() { @Override public Object execute() { - SchemaTransaction.removeIndexLabelSync(this.graph(), this.schemaId()); + removeIndexLabel(this.graph(), this.schemaId()); return null; } -} \ No newline at end of file + + protected static void removeIndexLabel(HugeGraph graph, Id id) { + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + IndexLabel indexLabel = schemaTx.getIndexLabel(id); + // If the index label does not exist, return directly + if (indexLabel == null) { + return; + } + LockUtil.Locks locks = new LockUtil.Locks(); + try { + locks.lockWrites(LockUtil.INDEX_LABEL_DELETE, id); + // TODO add update lock + // Set index label to "deleting" status + schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.DELETING); + // Remove index data + // TODO: use event to replace direct call + graphTx.removeIndex(indexLabel); + // Remove label from indexLabels of vertex or edge label + removeIndexLabelFromBaseLabel(schemaTx, indexLabel); + removeSchema(schemaTx, indexLabel); + // Should commit changes to backend store before release delete lock + graph.tx().commit(); + } finally { + locks.unlock(); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java index 3e13ac5f92..3196ae3261 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java @@ -1,8 +1,15 @@ package com.baidu.hugegraph.job.schema; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.id.IdGenerator; +import com.baidu.hugegraph.backend.tx.SchemaTransaction; import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.schema.IndexLabel; +import com.baidu.hugegraph.schema.SchemaElement; +import com.baidu.hugegraph.schema.SchemaLabel; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.util.E; @@ -37,4 +44,58 @@ public static String formatTaskName(HugeType schemaType, Id schemaId) { E.checkNotNull(schemaId, "schema id"); return String.join(SPLITOR, schemaType.toString(), schemaId.toString()); } -} \ No newline at end of file + + protected static void removeIndexLabelFromBaseLabel(SchemaTransaction tx, + IndexLabel label) { + HugeType baseType = label.baseType(); + Id baseValue = label.baseValue(); + SchemaLabel schemaLabel; + if (baseType == HugeType.VERTEX_LABEL) { + schemaLabel = tx.getVertexLabel(baseValue); + } else { + assert baseType == HugeType.EDGE_LABEL; + schemaLabel = tx.getEdgeLabel(baseValue); + } + schemaLabel.removeIndexLabel(label.id()); + updateSchema(tx, schemaLabel); + } + + /** + * Use reflection to call SchemaTransaction.removeSchema(), + * which is protected + */ + protected static void removeSchema(SchemaTransaction tx, + SchemaElement schema) { + try { + Method method = SchemaTransaction.class + .getDeclaredMethod("removeSchema", + SchemaElement.class); + method.setAccessible(true); + method.invoke(tx, schema); + } catch (NoSuchMethodException | IllegalAccessException | + InvocationTargetException e) { + throw new AssertionError( + "Can't call SchemaTransaction.removeSchema()", e); + } + + } + + /** + * Use reflection to call SchemaTransaction.updateSchema(), + * which is protected + */ + protected static void updateSchema(SchemaTransaction tx, + SchemaElement schema) { + try { + Method method = SchemaTransaction.class + .getDeclaredMethod("updateSchema", + SchemaElement.class); + method.setAccessible(true); + method.invoke(tx, schema); + } catch (NoSuchMethodException | IllegalAccessException | + InvocationTargetException e) { + throw new AssertionError( + "Can't call SchemaTransaction.updateSchema()", e); + } + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java index 0485e8727d..c6e09c5cc0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java @@ -19,7 +19,19 @@ package com.baidu.hugegraph.job.schema; +import java.util.List; +import java.util.Set; + +import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.EdgeLabel; +import com.baidu.hugegraph.schema.VertexLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; +import com.google.common.collect.ImmutableSet; public class VertexLabelRemoveCallable extends SchemaCallable { @@ -30,7 +42,50 @@ public String type() { @Override public Object execute() { - SchemaTransaction.removeVertexLabelSync(this.graph(), this.schemaId()); + removeVertexLabel(this.graph(), this.schemaId()); return null; } -} \ No newline at end of file + + protected static void removeVertexLabel(HugeGraph graph, Id id) { + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + VertexLabel vertexLabel = schemaTx.getVertexLabel(id); + // If the vertex label does not exist, return directly + if (vertexLabel == null) { + return; + } + + List edgeLabels = schemaTx.getEdgeLabels(); + for (EdgeLabel edgeLabel : edgeLabels) { + if (edgeLabel.linkWithLabel(id)) { + throw new HugeException( + "Not allowed to remove vertex label '%s' " + + "because the edge label '%s' still link with it", + vertexLabel.name(), edgeLabel.name()); + } + } + + /* + * Copy index label ids because removeIndexLabel will mutate + * vertexLabel.indexLabels() + */ + Set indexLabelIds = ImmutableSet.copyOf(vertexLabel.indexLabels()); + LockUtil.Locks locks = new LockUtil.Locks(); + try { + locks.lockWrites(LockUtil.VERTEX_LABEL_DELETE, id); + schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.DELETING); + for (Id indexLabelId : indexLabelIds) { + IndexLabelRemoveCallable.removeIndexLabel(graph, indexLabelId); + } + + // TODO: use event to replace direct call + // Deleting a vertex will automatically deletes the held edge + graphTx.removeVertices(vertexLabel); + removeSchema(schemaTx, vertexLabel); + // Should commit changes to backend store before release delete lock + graph.tx().commit(); + } finally { + locks.unlock(); + } + } +}