diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java index 4b91527691..cc625c4784 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java @@ -29,6 +29,10 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Vertex; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; @@ -42,9 +46,12 @@ import com.baidu.hugegraph.backend.query.ConditionQueryFlatten; import com.baidu.hugegraph.backend.query.IdQuery; import com.baidu.hugegraph.backend.query.Query; +import com.baidu.hugegraph.backend.serializer.AbstractSerializer; import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.exception.NoIndexException; +import com.baidu.hugegraph.job.EphemeralJob; +import com.baidu.hugegraph.job.EphemeralJobBuilder; import com.baidu.hugegraph.perf.PerfUtil.Watched; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -54,6 +61,7 @@ import com.baidu.hugegraph.structure.HugeIndex; import com.baidu.hugegraph.structure.HugeProperty; import com.baidu.hugegraph.structure.HugeVertex; +import com.baidu.hugegraph.task.HugeTask; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.type.define.IndexType; @@ -79,110 +87,14 @@ public GraphIndexTransaction(HugeGraph graph, BackendStore store) { assert this.textAnalyzer != null; } - protected void removeIndexLeft(ConditionQuery query, HugeElement element) { - if (element.type() != HugeType.VERTEX && - element.type() != HugeType.EDGE_OUT && - element.type() != HugeType.EDGE_IN) { - throw new HugeException("Only accept element of type VERTEX and " + - "EDGE to remove left index, but got: '%s'", - element.type()); - } - - // TODO: remove left index in async thread - for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) { - // Process range index - this.processRangeIndexLeft(cq, element); - // Process secondary index or search index - this.processSecondaryOrSearchIndexLeft(cq, element); - } - this.commit(); - } - - private void processRangeIndexLeft(ConditionQuery query, - HugeElement element) { - // Construct index ConditionQuery - Set matchedIndexes = collectMatchedIndexes(query); - IndexQueries queries = null; - for (MatchedIndex index : matchedIndexes) { - if (index.schemaLabel().id().equals(element.schemaLabel().id())) { - queries = index.constructIndexQueries(query); - break; - } - } - E.checkState(queries != null, - "Can't construct left-index query for '%s'", query); - - for (ConditionQuery q : queries.values()) { - if (!q.resultType().isRangeIndex()) { - continue; - } - // Query and delete index equals element id - for (Iterator it = super.query(q); it.hasNext();) { - BackendEntry entry = it.next(); - HugeIndex index = this.serializer.readIndex(graph(), q, entry); - if (index.elementIds().contains(element.id())) { - index.resetElementIds(); - index.elementIds(element.id()); - this.doEliminate(this.serializer.writeIndex(index)); - } - } - } - } - - private void processSecondaryOrSearchIndexLeft(ConditionQuery query, - HugeElement element) { - HugeElement deletion = element.copyAsFresh(); - Set propKeys = query.userpropKeys(); - Set incorrectPKs = InsertionOrderUtil.newSet(); - for (Id key : propKeys) { - Set conditionValues = query.userpropValues(key); - E.checkState(!conditionValues.isEmpty(), - "Expect user property values for key '%s', " + - "but got none", key); - if (conditionValues.size() > 1) { - // It's inside/between Query (processed in range index) - return; - } - Object propValue = deletion.getProperty(key).value(); - Object conditionValue = conditionValues.iterator().next(); - if (!propValue.equals(conditionValue)) { - PropertyKey pkey = this.graph().propertyKey(key); - deletion.addProperty(pkey, conditionValue); - incorrectPKs.add(key); - } - } - - // Delete unused index - for (IndexLabel il : relatedIndexLabels(deletion)) { - if (!CollectionUtil.hasIntersection(il.indexFields(), incorrectPKs)) { - continue; - } - // Skip if search index is not wrong - if (il.indexType() == IndexType.SEARCH) { - Id field = il.indexField(); - String cond = deletion.getPropertyValue(field); - String actual = element.getPropertyValue(field); - if (this.matchSearchIndexWords(actual, cond)) { - /* - * If query by two search index, one is correct but - * the other is wrong, we should not delete the correct - */ - continue; - } - } - // Delete index with error property - this.updateIndex(il.id(), deletion, true); - // Rebuild index if delete correct index part - if (il.indexType() == IndexType.SECONDARY) { - /* - * When it's a composite secondary index, - * if the suffix property is wrong and the prefix property - * is correct, the correct prefix part will be deleted, - * so rebuild the index again with the correct property. - */ - this.updateIndex(il.id(), element, false); - } - } + protected Id asyncRemoveIndexLeft(ConditionQuery query, + HugeElement element) { + RemoveLeftIndexJob callable = new RemoveLeftIndexJob(query, element); + HugeTask task = EphemeralJobBuilder.of(this.graph()) + .name(element.name()) + .job(callable) + .schedule(); + return task.id(); } @Watched(prefix = "index") @@ -314,7 +226,7 @@ private void updateIndex(IndexLabel indexLabel, Object propValue, * @return converted id query */ @Watched(prefix = "index") - public Query query(ConditionQuery query) { + public Query indexQuery(ConditionQuery query) { // Index query must have been flattened in Graph tx query.checkFlattened(); @@ -1091,10 +1003,228 @@ public static IndexQueries of(IndexLabel il, ConditionQuery query) { } } - public static enum OptimizedType { + public enum OptimizedType { NONE, PRIMARY_KEY, SORT_KEYS, INDEX } + + public static class RemoveLeftIndexJob extends EphemeralJob { + + private static final String REMOVE_LEFT_INDEX = "remove_left_index"; + + private ConditionQuery query; + private HugeElement element; + private GraphIndexTransaction tx; + + private RemoveLeftIndexJob(ConditionQuery query, + HugeElement element) { + E.checkArgumentNotNull(query, "query"); + E.checkArgumentNotNull(element, "element"); + this.query = query; + this.element = element; + } + + @Override + public String type() { + return REMOVE_LEFT_INDEX; + } + + @Override + public Object execute() { + this.tx = this.graph().graphTransaction().indexTransaction(); + return this.removeIndexLeft(this.query, this.element); + } + + protected int removeIndexLeft(ConditionQuery query, + HugeElement element) { + if (element.type() != HugeType.VERTEX && + element.type() != HugeType.EDGE_OUT && + element.type() != HugeType.EDGE_IN) { + throw new HugeException("Only accept element of type VERTEX " + + "and EDGE to remove left index, " + + "but got: '%s'", element.type()); + } + int rCount = 0; + int sCount = 0; + for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) { + // Process range index + rCount += this.processRangeIndexLeft(cq, element); + // Process secondary index or search index + sCount += this.processSecondaryOrSearchIndexLeft(cq, element); + } + this.tx.commit(); + return rCount + sCount; + } + + private int processRangeIndexLeft(ConditionQuery query, + HugeElement element) { + GraphIndexTransaction tx = this.tx; + AbstractSerializer serializer = tx.serializer; + int count = 0; + // Construct index ConditionQuery + Set matchedIndexes = tx.collectMatchedIndexes(query); + IndexQueries queries = null; + Id elementLabelId = element.schemaLabel().id(); + for (MatchedIndex index : matchedIndexes) { + if (index.schemaLabel().id().equals(elementLabelId)) { + queries = index.constructIndexQueries(query); + break; + } + } + E.checkState(queries != null, + "Can't construct left-index query for '%s'", query); + + for (ConditionQuery q : queries.values()) { + if (!q.resultType().isRangeIndex()) { + continue; + } + // Query and delete index equals element id + for (Iterator it = tx.query(q); it.hasNext();) { + BackendEntry entry = it.next(); + HugeIndex index = serializer.readIndex(graph(), q, entry); + if (index.elementIds().contains(element.id())) { + index.resetElementIds(); + index.elementIds(element.id()); + tx.doEliminate(serializer.writeIndex(index)); + tx.commit(); + // If deleted by error, re-add deleted index again + if (this.deletedByError(q, element)) { + tx.doAppend(serializer.writeIndex(index)); + tx.commit(); + } else { + count++; + } + } + } + } + return count; + } + + private int processSecondaryOrSearchIndexLeft(ConditionQuery query, + HugeElement element) { + Map incorrectPKs = InsertionOrderUtil.newMap(); + HugeElement deletion = this.constructErrorElem(query, element, + incorrectPKs); + if (deletion == null) { + return 0; + } + + // Delete unused index + int count = 0; + Set incorrectPkIds; + for (IndexLabel il : relatedIndexLabels(deletion)) { + incorrectPkIds = incorrectPKs.keySet().stream() + .map(PropertyKey::id) + .collect(Collectors.toSet()); + Collection incorrectIndexFields = CollectionUtil.intersect( + il.indexFields(), + incorrectPkIds); + if (incorrectIndexFields.isEmpty()) { + continue; + } + // Skip if search index is not wrong + if (il.indexType() == IndexType.SEARCH) { + Id field = il.indexField(); + String cond = deletion.getPropertyValue(field); + String actual = element.getPropertyValue(field); + if (this.tx.matchSearchIndexWords(actual, cond)) { + /* + * If query by two search index, one is correct but + * the other is wrong, we should not delete the correct + */ + continue; + } + } + // Delete index with error property + this.tx.updateIndex(il.id(), deletion, true); + // Rebuild index if delete correct index part + if (il.indexType() == IndexType.SECONDARY) { + /* + * When it's a composite secondary index, + * if the suffix property is wrong and the prefix property + * is correct, the correct prefix part will be deleted, + * so rebuild the index again with the correct property. + */ + this.tx.updateIndex(il.id(), element, false); + } + this.tx.commit(); + if (this.deletedByError(element, incorrectIndexFields, + incorrectPKs)) { + this.tx.updateIndex(il.id(), deletion, false); + this.tx.commit(); + } else { + count++; + } + } + return count; + } + + private HugeElement constructErrorElem( + ConditionQuery query, HugeElement element, + Map incorrectPKs) { + HugeElement errorElem = element.copyAsFresh(); + Set propKeys = query.userpropKeys(); + for (Id key : propKeys) { + Set conditionValues = query.userpropValues(key); + E.checkState(!conditionValues.isEmpty(), + "Expect user property values for key '%s', " + + "but got none", key); + if (conditionValues.size() > 1) { + // It's inside/between Query (processed in range index) + return null; + } + Object propValue = errorElem.getProperty(key).value(); + Object conditionValue = conditionValues.iterator().next(); + if (!propValue.equals(conditionValue)) { + PropertyKey pkey = this.graph().propertyKey(key); + errorElem.addProperty(pkey, conditionValue); + incorrectPKs.put(pkey, conditionValue); + } + } + return errorElem; + } + + private boolean deletedByError(ConditionQuery query, + HugeElement element) { + HugeElement elem = this.newestElement(element); + if (elem == null) { + return false; + } + return query.test(elem); + } + + private boolean deletedByError(HugeElement element, + Collection ilFields, + Map incorrectPKs) { + HugeElement elem = this.newestElement(element); + for (Map.Entry e : incorrectPKs.entrySet()) { + PropertyKey pk = e.getKey(); + Object value = e.getValue(); + if (ilFields.contains(pk.id()) && + value.equals(elem.getPropertyValue(pk.id()))) { + return true; + } + } + return false; + } + + private HugeElement newestElement(HugeElement element) { + boolean isVertex = element instanceof HugeVertex; + if (isVertex) { + Iterator iterV = this.graph().vertices(element.id()); + if (iterV.hasNext()) { + return (HugeVertex) iterV.next(); + } + } else { + assert element instanceof HugeEdge; + Iterator iterE = this.graph().edges(element.id()); + if (iterE.hasNext()) { + return (HugeEdge) iterE.next(); + } + } + return null; + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index b91e114dae..f4b3e4c173 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -151,7 +151,7 @@ protected void reset() { } @Override - protected AbstractTransaction indexTransaction() { + protected GraphIndexTransaction indexTransaction() { return this.indexTx; } @@ -1026,7 +1026,7 @@ protected Query optimizeQuery(ConditionQuery query) { */ this.beforeRead(); try { - return this.indexTx.query(query); + return this.indexTx.indexQuery(query); } finally { this.afterRead(); } @@ -1206,7 +1206,7 @@ private boolean filterResultFromIndexQuery(Query query, HugeElement elem) { if (cq.optimized() == OptimizedType.INDEX.ordinal()) { LOG.info("Remove left index: {}, query: {}", elem, cq); - this.indexTx.removeIndexLeft(cq, elem); + this.indexTx.asyncRemoveIndexLeft(cq, elem); } return false; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java new file mode 100644 index 0000000000..181c220b14 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job; + +import com.baidu.hugegraph.task.TaskCallable; + +public abstract class EphemeralJob extends TaskCallable { + + public abstract String type(); + + public abstract T execute() throws Exception; + + @Override + public T call() throws Exception { + return this.execute(); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java new file mode 100644 index 0000000000..173aa33fbe --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job; + +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.id.IdGenerator; +import com.baidu.hugegraph.task.HugeTask; +import com.baidu.hugegraph.task.TaskScheduler; +import com.baidu.hugegraph.util.E; + +public class EphemeralJobBuilder { + + private final HugeGraph graph; + + private String name; + private String input; + private EphemeralJob job; + + // Use negative task id for ephemeral task + private static int ephemeralTaskId = -1; + + public static EphemeralJobBuilder of(final HugeGraph graph) { + return new EphemeralJobBuilder<>(graph); + } + + public EphemeralJobBuilder(final HugeGraph graph) { + this.graph = graph; + } + + public EphemeralJobBuilder name(String name) { + this.name = name; + return this; + } + + public EphemeralJobBuilder input(String input) { + this.input = input; + return this; + } + + public EphemeralJobBuilder job(EphemeralJob job) { + this.job = job; + return this; + } + + public HugeTask schedule() { + E.checkArgumentNotNull(this.name, "Job name can't be null"); + E.checkArgumentNotNull(this.job, "Job can't be null"); + + HugeTask task = new HugeTask<>(this.genTaskId(), null, this.job); + task.type(this.job.type()); + task.name(this.name); + if (this.input != null) { + task.input(this.input); + } + + TaskScheduler scheduler = this.graph.taskScheduler(); + scheduler.schedule(task); + + return task; + } + + private Id genTaskId() { + if (ephemeralTaskId >= 0) { + ephemeralTaskId = -1; + } + return IdGenerator.of(ephemeralTaskId--); + } +}