Skip to content

Commit

Permalink
remove left index in async mode
Browse files Browse the repository at this point in the history
implemented: #247

Change-Id: Ie1baacb93f18d66ed7169161f15f398701e955a3
  • Loading branch information
zhoney committed Jan 11, 2019
1 parent 3def353 commit 6317e48
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
Expand All @@ -42,9 +46,12 @@
import com.baidu.hugegraph.backend.query.ConditionQueryFlatten;
import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.serializer.AbstractSerializer;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.exception.NoIndexException;
import com.baidu.hugegraph.job.EphemeralJob;
import com.baidu.hugegraph.job.EphemeralJobBuilder;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
Expand All @@ -54,6 +61,7 @@
import com.baidu.hugegraph.structure.HugeIndex;
import com.baidu.hugegraph.structure.HugeProperty;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.type.define.IndexType;
Expand All @@ -79,110 +87,14 @@ public GraphIndexTransaction(HugeGraph graph, BackendStore store) {
assert this.textAnalyzer != null;
}

protected void removeIndexLeft(ConditionQuery query, HugeElement element) {
if (element.type() != HugeType.VERTEX &&
element.type() != HugeType.EDGE_OUT &&
element.type() != HugeType.EDGE_IN) {
throw new HugeException("Only accept element of type VERTEX and " +
"EDGE to remove left index, but got: '%s'",
element.type());
}

// TODO: remove left index in async thread
for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
// Process range index
this.processRangeIndexLeft(cq, element);
// Process secondary index or search index
this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.commit();
}

private void processRangeIndexLeft(ConditionQuery query,
HugeElement element) {
// Construct index ConditionQuery
Set<MatchedIndex> matchedIndexes = collectMatchedIndexes(query);
IndexQueries queries = null;
for (MatchedIndex index : matchedIndexes) {
if (index.schemaLabel().id().equals(element.schemaLabel().id())) {
queries = index.constructIndexQueries(query);
break;
}
}
E.checkState(queries != null,
"Can't construct left-index query for '%s'", query);

for (ConditionQuery q : queries.values()) {
if (!q.resultType().isRangeIndex()) {
continue;
}
// Query and delete index equals element id
for (Iterator<BackendEntry> it = super.query(q); it.hasNext();) {
BackendEntry entry = it.next();
HugeIndex index = this.serializer.readIndex(graph(), q, entry);
if (index.elementIds().contains(element.id())) {
index.resetElementIds();
index.elementIds(element.id());
this.doEliminate(this.serializer.writeIndex(index));
}
}
}
}

private void processSecondaryOrSearchIndexLeft(ConditionQuery query,
HugeElement element) {
HugeElement deletion = element.copyAsFresh();
Set<Id> propKeys = query.userpropKeys();
Set<Id> incorrectPKs = InsertionOrderUtil.newSet();
for (Id key : propKeys) {
Set<Object> conditionValues = query.userpropValues(key);
E.checkState(!conditionValues.isEmpty(),
"Expect user property values for key '%s', " +
"but got none", key);
if (conditionValues.size() > 1) {
// It's inside/between Query (processed in range index)
return;
}
Object propValue = deletion.getProperty(key).value();
Object conditionValue = conditionValues.iterator().next();
if (!propValue.equals(conditionValue)) {
PropertyKey pkey = this.graph().propertyKey(key);
deletion.addProperty(pkey, conditionValue);
incorrectPKs.add(key);
}
}

// Delete unused index
for (IndexLabel il : relatedIndexLabels(deletion)) {
if (!CollectionUtil.hasIntersection(il.indexFields(), incorrectPKs)) {
continue;
}
// Skip if search index is not wrong
if (il.indexType() == IndexType.SEARCH) {
Id field = il.indexField();
String cond = deletion.<String>getPropertyValue(field);
String actual = element.<String>getPropertyValue(field);
if (this.matchSearchIndexWords(actual, cond)) {
/*
* If query by two search index, one is correct but
* the other is wrong, we should not delete the correct
*/
continue;
}
}
// Delete index with error property
this.updateIndex(il.id(), deletion, true);
// Rebuild index if delete correct index part
if (il.indexType() == IndexType.SECONDARY) {
/*
* When it's a composite secondary index,
* if the suffix property is wrong and the prefix property
* is correct, the correct prefix part will be deleted,
* so rebuild the index again with the correct property.
*/
this.updateIndex(il.id(), element, false);
}
}
protected Id asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
RemoveLeftIndexJob callable = new RemoveLeftIndexJob(query, element);
HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
.name(element.name())
.job(callable)
.schedule();
return task.id();
}

@Watched(prefix = "index")
Expand Down Expand Up @@ -314,7 +226,7 @@ private void updateIndex(IndexLabel indexLabel, Object propValue,
* @return converted id query
*/
@Watched(prefix = "index")
public Query query(ConditionQuery query) {
public Query indexQuery(ConditionQuery query) {
// Index query must have been flattened in Graph tx
query.checkFlattened();

Expand Down Expand Up @@ -1091,10 +1003,228 @@ public static IndexQueries of(IndexLabel il, ConditionQuery query) {
}
}

public static enum OptimizedType {
public enum OptimizedType {
NONE,
PRIMARY_KEY,
SORT_KEYS,
INDEX
}

public static class RemoveLeftIndexJob extends EphemeralJob<Object> {

private static final String REMOVE_LEFT_INDEX = "remove_left_index";

private ConditionQuery query;
private HugeElement element;
private GraphIndexTransaction tx;

private RemoveLeftIndexJob(ConditionQuery query,
HugeElement element) {
E.checkArgumentNotNull(query, "query");
E.checkArgumentNotNull(element, "element");
this.query = query;
this.element = element;
}

@Override
public String type() {
return REMOVE_LEFT_INDEX;
}

@Override
public Object execute() {
this.tx = this.graph().graphTransaction().indexTransaction();
return this.removeIndexLeft(this.query, this.element);
}

protected long removeIndexLeft(ConditionQuery query,
HugeElement element) {
if (element.type() != HugeType.VERTEX &&
element.type() != HugeType.EDGE_OUT &&
element.type() != HugeType.EDGE_IN) {
throw new HugeException("Only accept element of type VERTEX " +
"and EDGE to remove left index, " +
"but got: '%s'", element.type());
}
long rCount = 0;
long sCount = 0;
for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
// Process range index
rCount += this.processRangeIndexLeft(cq, element);
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.tx.commit();
return rCount + sCount;
}

private long processRangeIndexLeft(ConditionQuery query,
HugeElement element) {
GraphIndexTransaction tx = this.tx;
AbstractSerializer serializer = tx.serializer;
long count = 0;
// Construct index ConditionQuery
Set<MatchedIndex> matchedIndexes = tx.collectMatchedIndexes(query);
IndexQueries queries = null;
Id elementLabelId = element.schemaLabel().id();
for (MatchedIndex index : matchedIndexes) {
if (index.schemaLabel().id().equals(elementLabelId)) {
queries = index.constructIndexQueries(query);
break;
}
}
E.checkState(queries != null,
"Can't construct left-index query for '%s'", query);

for (ConditionQuery q : queries.values()) {
if (!q.resultType().isRangeIndex()) {
continue;
}
// Query and delete index equals element id
for (Iterator<BackendEntry> it = tx.query(q); it.hasNext();) {
BackendEntry entry = it.next();
HugeIndex index = serializer.readIndex(graph(), q, entry);
if (index.elementIds().contains(element.id())) {
index.resetElementIds();
index.elementIds(element.id());
tx.doEliminate(serializer.writeIndex(index));
tx.commit();
// If deleted by error, re-add deleted index again
if (this.deletedByError(q, element)) {
tx.doAppend(serializer.writeIndex(index));
tx.commit();
} else {
count++;
}
}
}
}
return count;
}

private long processSecondaryOrSearchIndexLeft(ConditionQuery query,
HugeElement element) {
Map<PropertyKey, Object> incorrectPKs = InsertionOrderUtil.newMap();
HugeElement deletion = this.constructErrorElem(query, element,
incorrectPKs);
if (deletion == null) {
return 0;
}

// Delete unused index
long count = 0;
Set<Id> incorrectPkIds;
for (IndexLabel il : relatedIndexLabels(deletion)) {
incorrectPkIds = incorrectPKs.keySet().stream()
.map(PropertyKey::id)
.collect(Collectors.toSet());
Collection<Id> incorrectIndexFields = CollectionUtil.intersect(
il.indexFields(),
incorrectPkIds);
if (incorrectIndexFields.isEmpty()) {
continue;
}
// Skip if search index is not wrong
if (il.indexType() == IndexType.SEARCH) {
Id field = il.indexField();
String cond = deletion.<String>getPropertyValue(field);
String actual = element.<String>getPropertyValue(field);
if (this.tx.matchSearchIndexWords(actual, cond)) {
/*
* If query by two search index, one is correct but
* the other is wrong, we should not delete the correct
*/
continue;
}
}
// Delete index with error property
this.tx.updateIndex(il.id(), deletion, true);
// Rebuild index if delete correct index part
if (il.indexType() == IndexType.SECONDARY) {
/*
* When it's a composite secondary index,
* if the suffix property is wrong and the prefix property
* is correct, the correct prefix part will be deleted,
* so rebuild the index again with the correct property.
*/
this.tx.updateIndex(il.id(), element, false);
}
this.tx.commit();
if (this.deletedByError(element, incorrectIndexFields,
incorrectPKs)) {
this.tx.updateIndex(il.id(), deletion, false);
this.tx.commit();
} else {
count++;
}
}
return count;
}

private HugeElement constructErrorElem(
ConditionQuery query, HugeElement element,
Map<PropertyKey, Object> incorrectPKs) {
HugeElement errorElem = element.copyAsFresh();
Set<Id> propKeys = query.userpropKeys();
for (Id key : propKeys) {
Set<Object> conditionValues = query.userpropValues(key);
E.checkState(!conditionValues.isEmpty(),
"Expect user property values for key '%s', " +
"but got none", key);
if (conditionValues.size() > 1) {
// It's inside/between Query (processed in range index)
return null;
}
Object propValue = errorElem.getProperty(key).value();
Object conditionValue = conditionValues.iterator().next();
if (!propValue.equals(conditionValue)) {
PropertyKey pkey = this.graph().propertyKey(key);
errorElem.addProperty(pkey, conditionValue);
incorrectPKs.put(pkey, conditionValue);
}
}
return errorElem;
}

private boolean deletedByError(ConditionQuery query,
HugeElement element) {
HugeElement elem = this.newestElement(element);
if (elem == null) {
return false;
}
return query.test(elem);
}

private boolean deletedByError(HugeElement element,
Collection<Id> ilFields,
Map<PropertyKey, Object> incorrectPKs) {
HugeElement elem = this.newestElement(element);
for (Map.Entry<PropertyKey, Object> e : incorrectPKs.entrySet()) {
PropertyKey pk = e.getKey();
Object value = e.getValue();
if (ilFields.contains(pk.id()) &&
value.equals(elem.getPropertyValue(pk.id()))) {
return true;
}
}
return false;
}

private HugeElement newestElement(HugeElement element) {
boolean isVertex = element instanceof HugeVertex;
if (isVertex) {
Iterator<Vertex> iterV = this.graph().vertices(element.id());
if (iterV.hasNext()) {
return (HugeVertex) iterV.next();
}
} else {
assert element instanceof HugeEdge;
Iterator<Edge> iterE = this.graph().edges(element.id());
if (iterE.hasNext()) {
return (HugeEdge) iterE.next();
}
}
return null;
}
}
}
Loading

0 comments on commit 6317e48

Please sign in to comment.