From dceecbc0c594f5aec76ea7a5aac19b8af27e0114 Mon Sep 17 00:00:00 2001 From: zhangyi51 Date: Mon, 17 Dec 2018 22:11:19 +0800 Subject: [PATCH] remove left index in async mode implemented: #247 Change-Id: Ie1baacb93f18d66ed7169161f15f398701e955a3 --- .../backend/tx/GraphIndexTransaction.java | 160 +++++++++++++++--- .../backend/tx/GraphTransaction.java | 4 +- .../com/baidu/hugegraph/job/EphemeralJob.java | 34 ++++ .../hugegraph/job/EphemeralJobBuilder.java | 82 +++++++++ 4 files changed, 255 insertions(+), 25 deletions(-) create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java index ede4b829ec..563639c56a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java @@ -29,6 +29,11 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Vertex; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; @@ -45,6 +50,8 @@ import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.exception.NoIndexException; +import com.baidu.hugegraph.job.EphemeralJob; +import com.baidu.hugegraph.job.EphemeralJobBuilder; import com.baidu.hugegraph.perf.PerfUtil.Watched; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -54,6 +61,7 @@ import com.baidu.hugegraph.structure.HugeIndex; import com.baidu.hugegraph.structure.HugeProperty; import com.baidu.hugegraph.structure.HugeVertex; +import com.baidu.hugegraph.task.HugeTask; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.type.define.IndexType; @@ -79,6 +87,18 @@ public GraphIndexTransaction(HugeGraph graph, BackendStore store) { assert this.textAnalyzer != null; } + protected Id asyncRemoveIndexLeft(ConditionQuery query, + HugeElement element) { + RemoveLeftIndexCallable callable = new RemoveLeftIndexCallable(query, + element); + EphemeralJobBuilder builder = + EphemeralJobBuilder.of(this.graph()) + .name(element.name()) + .job(callable); + HugeTask task = builder.schedule(); + return task.id(); + } + protected void removeIndexLeft(ConditionQuery query, HugeElement element) { if (element.type() != HugeType.VERTEX && element.type() != HugeType.EDGE_OUT && @@ -88,7 +108,6 @@ protected void removeIndexLeft(ConditionQuery query, HugeElement element) { element.type()); } - // TODO: remove left index in async thread for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) { // Process range index this.processRangeIndexLeft(cq, element); @@ -124,6 +143,12 @@ private void processRangeIndexLeft(ConditionQuery query, index.resetElementIds(); index.elementIds(element.id()); this.doEliminate(this.serializer.writeIndex(index)); + this.commit(); + // If deleted by error, re-add deleted index again + if (this.deletedByError(q, element)) { + this.doAppend(this.serializer.writeIndex(index)); + this.commit(); + } } } } @@ -131,30 +156,21 @@ private void processRangeIndexLeft(ConditionQuery query, private void processSecondaryOrSearchIndexLeft(ConditionQuery query, HugeElement element) { - HugeElement deletion = element.copyAsFresh(); - Set propKeys = query.userpropKeys(); - Set incorrectPKs = InsertionOrderUtil.newSet(); - for (Id key : propKeys) { - Set conditionValues = query.userpropValues(key); - E.checkState(!conditionValues.isEmpty(), - "Expect user property values for key '%s', " + - "but got none", key); - if (conditionValues.size() > 1) { - // It's inside/between Query (processed in range index) - return; - } - Object propValue = deletion.getProperty(key).value(); - Object conditionValue = conditionValues.iterator().next(); - if (!propValue.equals(conditionValue)) { - PropertyKey pkey = this.graph().propertyKey(key); - deletion.addProperty(pkey, conditionValue); - incorrectPKs.add(key); - } + Map incorrectPKs = InsertionOrderUtil.newMap(); + HugeElement deletion = this.constructErrorElem(query, element, + incorrectPKs); + if (deletion == null) { + return; } - // Delete unused index for (IndexLabel il : relatedIndexLabels(deletion)) { - if (!CollectionUtil.hasIntersection(il.indexFields(), incorrectPKs)) { + Set incorrectPKIds = incorrectPKs.keySet().stream() + .map(PropertyKey::id) + .collect(Collectors.toSet()); + @SuppressWarnings("unchecked") + Collection pks = CollectionUtils.intersection(il.indexFields(), + incorrectPKIds); + if (pks.isEmpty()) { continue; } // Skip if search index is not wrong @@ -182,6 +198,11 @@ private void processSecondaryOrSearchIndexLeft(ConditionQuery query, */ this.updateIndex(il.id(), element, false); } + this.commit(); + if (this.deletedByError(element, pks, incorrectPKs)) { + this.updateIndex(il.id(), deletion, false); + } + this.commit(); } } @@ -305,6 +326,70 @@ private void updateIndex(IndexLabel indexLabel, Object propValue, } } + private HugeElement constructErrorElem( + ConditionQuery query, HugeElement element, + Map incorrectPKs) { + HugeElement errorElem = element.copyAsFresh(); + Set propKeys = query.userpropKeys(); + for (Id key : propKeys) { + Set conditionValues = query.userpropValues(key); + E.checkState(!conditionValues.isEmpty(), + "Expect user property values for key '%s', " + + "but got none", key); + if (conditionValues.size() > 1) { + // It's inside/between Query (processed in range index) + return null; + } + Object propValue = errorElem.getProperty(key).value(); + Object conditionValue = conditionValues.iterator().next(); + if (!propValue.equals(conditionValue)) { + PropertyKey pkey = this.graph().propertyKey(key); + errorElem.addProperty(pkey, conditionValue); + incorrectPKs.put(pkey, conditionValue); + } + } + return errorElem; + } + + private boolean deletedByError(ConditionQuery query, HugeElement element) { + HugeElement elem = this.newestElement(element); + if (elem == null) { + return false; + } + return query.test(elem); + } + + private boolean deletedByError(HugeElement element, Collection ilFields, + Map incorrectPKs) { + HugeElement elem = this.newestElement(element); + for (Map.Entry e : incorrectPKs.entrySet()) { + PropertyKey pk = e.getKey(); + Object value = e.getValue(); + if (ilFields.contains(pk.id()) && + value.equals(elem.value(pk.name()))) { + return true; + } + } + return false; + } + + private HugeElement newestElement(HugeElement element) { + boolean isVertex = element instanceof HugeVertex; + if (isVertex) { + Iterator iterV = this.graph().vertices(element.id()); + if (iterV.hasNext()) { + return (HugeVertex) iterV.next(); + } + } else { + assert element instanceof HugeEdge; + Iterator iterE = this.graph().edges(element.id()); + if (iterE.hasNext()) { + return (HugeEdge) iterE.next(); + } + } + return null; + } + /** * Composite index, an index involving multiple columns. * Single index, an index involving only one column. @@ -1090,10 +1175,39 @@ public static IndexQueries of(IndexLabel il, ConditionQuery query) { } } - public static enum OptimizedType { + public enum OptimizedType { NONE, PRIMARY_KEY, SORT_KEY, INDEX } + + public static class RemoveLeftIndexCallable extends EphemeralJob { + + private static final String REMOVE_LEFT_INDEX = "remove_left_index"; + + private ConditionQuery query; + private HugeElement element; + + private RemoveLeftIndexCallable(ConditionQuery query, + HugeElement element) { + E.checkArgumentNotNull(query, "query"); + E.checkArgumentNotNull(element, "element"); + this.query = query; + this.element = element; + } + + @Override + public String type() { + return REMOVE_LEFT_INDEX; + } + + @Override + public Object execute() { + GraphIndexTransaction tx = this.graph().graphTransaction() + .indexTransaction(); + tx.removeIndexLeft(this.query, this.element); + return null; + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index e81d7946b5..d9ed0a5e33 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -151,7 +151,7 @@ protected void reset() { } @Override - protected AbstractTransaction indexTransaction() { + protected GraphIndexTransaction indexTransaction() { return this.indexTx; } @@ -1184,7 +1184,7 @@ private boolean filterResultFromIndexQuery(Query query, HugeElement elem) { if (cq.optimized() == OptimizedType.INDEX.ordinal()) { LOG.info("Remove left index: {}, query: {}", elem, cq); - this.indexTx.removeIndexLeft(cq, elem); + this.indexTx.asyncRemoveIndexLeft(cq, elem); } return false; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java new file mode 100644 index 0000000000..181c220b14 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJob.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job; + +import com.baidu.hugegraph.task.TaskCallable; + +public abstract class EphemeralJob extends TaskCallable { + + public abstract String type(); + + public abstract T execute() throws Exception; + + @Override + public T call() throws Exception { + return this.execute(); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java new file mode 100644 index 0000000000..ddbe2b5d1a --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/EphemeralJobBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job; + +import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.id.IdGenerator; +import com.baidu.hugegraph.task.HugeTask; +import com.baidu.hugegraph.task.TaskScheduler; +import com.baidu.hugegraph.util.E; + +public class EphemeralJobBuilder { + + private final HugeGraph graph; + + private String name; + private String input; + private EphemeralJob job; + + private static int tempTaskId = 0; + + public static EphemeralJobBuilder of(final HugeGraph graph) { + return new EphemeralJobBuilder<>(graph); + } + + public EphemeralJobBuilder(final HugeGraph graph) { + this.graph = graph; + } + + public EphemeralJobBuilder name(String name) { + this.name = name; + return this; + } + + public EphemeralJobBuilder input(String input) { + this.input = input; + return this; + } + + public EphemeralJobBuilder job(EphemeralJob job) { + this.job = job; + return this; + } + + public HugeTask schedule() { + E.checkArgumentNotNull(this.name, "Job name can't be null"); + E.checkArgumentNotNull(this.job, "Job can't be null"); + + HugeTask task = new HugeTask<>(this.genTaskId(), null, this.job); + task.type(this.job.type()); + task.name(this.name); + if (this.input != null) { + task.input(this.input); + } + + TaskScheduler scheduler = this.graph.taskScheduler(); + scheduler.schedule(task); + + return task; + } + + private Id genTaskId() { + return IdGenerator.of(--tempTaskId); + } +}