Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove left index in async mode #285

Merged
merged 1 commit into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
Expand All @@ -42,9 +46,12 @@
import com.baidu.hugegraph.backend.query.ConditionQueryFlatten;
import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.serializer.AbstractSerializer;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.exception.NoIndexException;
import com.baidu.hugegraph.job.EphemeralJob;
import com.baidu.hugegraph.job.EphemeralJobBuilder;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
Expand All @@ -54,6 +61,7 @@
import com.baidu.hugegraph.structure.HugeIndex;
import com.baidu.hugegraph.structure.HugeProperty;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.type.define.IndexType;
Expand All @@ -79,110 +87,14 @@ public GraphIndexTransaction(HugeGraph graph, BackendStore store) {
assert this.textAnalyzer != null;
}

protected void removeIndexLeft(ConditionQuery query, HugeElement element) {
if (element.type() != HugeType.VERTEX &&
element.type() != HugeType.EDGE_OUT &&
element.type() != HugeType.EDGE_IN) {
throw new HugeException("Only accept element of type VERTEX and " +
"EDGE to remove left index, but got: '%s'",
element.type());
}

// TODO: remove left index in async thread
for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
// Process range index
this.processRangeIndexLeft(cq, element);
// Process secondary index or search index
this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.commit();
}

private void processRangeIndexLeft(ConditionQuery query,
HugeElement element) {
// Construct index ConditionQuery
Set<MatchedIndex> matchedIndexes = collectMatchedIndexes(query);
IndexQueries queries = null;
for (MatchedIndex index : matchedIndexes) {
if (index.schemaLabel().id().equals(element.schemaLabel().id())) {
queries = index.constructIndexQueries(query);
break;
}
}
E.checkState(queries != null,
"Can't construct left-index query for '%s'", query);

for (ConditionQuery q : queries.values()) {
if (!q.resultType().isRangeIndex()) {
continue;
}
// Query and delete index equals element id
for (Iterator<BackendEntry> it = super.query(q); it.hasNext();) {
BackendEntry entry = it.next();
HugeIndex index = this.serializer.readIndex(graph(), q, entry);
if (index.elementIds().contains(element.id())) {
index.resetElementIds();
index.elementIds(element.id());
this.doEliminate(this.serializer.writeIndex(index));
}
}
}
}

private void processSecondaryOrSearchIndexLeft(ConditionQuery query,
HugeElement element) {
HugeElement deletion = element.copyAsFresh();
Set<Id> propKeys = query.userpropKeys();
Set<Id> incorrectPKs = InsertionOrderUtil.newSet();
for (Id key : propKeys) {
Set<Object> conditionValues = query.userpropValues(key);
E.checkState(!conditionValues.isEmpty(),
"Expect user property values for key '%s', " +
"but got none", key);
if (conditionValues.size() > 1) {
// It's inside/between Query (processed in range index)
return;
}
Object propValue = deletion.getProperty(key).value();
Object conditionValue = conditionValues.iterator().next();
if (!propValue.equals(conditionValue)) {
PropertyKey pkey = this.graph().propertyKey(key);
deletion.addProperty(pkey, conditionValue);
incorrectPKs.add(key);
}
}

// Delete unused index
for (IndexLabel il : relatedIndexLabels(deletion)) {
if (!CollectionUtil.hasIntersection(il.indexFields(), incorrectPKs)) {
continue;
}
// Skip if search index is not wrong
if (il.indexType() == IndexType.SEARCH) {
Id field = il.indexField();
String cond = deletion.<String>getPropertyValue(field);
String actual = element.<String>getPropertyValue(field);
if (this.matchSearchIndexWords(actual, cond)) {
/*
* If query by two search index, one is correct but
* the other is wrong, we should not delete the correct
*/
continue;
}
}
// Delete index with error property
this.updateIndex(il.id(), deletion, true);
// Rebuild index if delete correct index part
if (il.indexType() == IndexType.SECONDARY) {
/*
* When it's a composite secondary index,
* if the suffix property is wrong and the prefix property
* is correct, the correct prefix part will be deleted,
* so rebuild the index again with the correct property.
*/
this.updateIndex(il.id(), element, false);
}
}
protected Id asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
RemoveLeftIndexJob callable = new RemoveLeftIndexJob(query, element);
HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
.name(element.name())
.job(callable)
.schedule();
return task.id();
}

zhoney marked this conversation as resolved.
Show resolved Hide resolved
@Watched(prefix = "index")
Expand Down Expand Up @@ -314,7 +226,7 @@ private void updateIndex(IndexLabel indexLabel, Object propValue,
* @return converted id query
*/
@Watched(prefix = "index")
public Query query(ConditionQuery query) {
public Query indexQuery(ConditionQuery query) {
// Index query must have been flattened in Graph tx
query.checkFlattened();

Expand Down Expand Up @@ -1091,10 +1003,228 @@ public static IndexQueries of(IndexLabel il, ConditionQuery query) {
}
}

public static enum OptimizedType {
public enum OptimizedType {
NONE,
PRIMARY_KEY,
SORT_KEYS,
INDEX
}

public static class RemoveLeftIndexJob extends EphemeralJob<Object> {

private static final String REMOVE_LEFT_INDEX = "remove_left_index";

private ConditionQuery query;
private HugeElement element;
private GraphIndexTransaction tx;

private RemoveLeftIndexJob(ConditionQuery query,
HugeElement element) {
E.checkArgumentNotNull(query, "query");
E.checkArgumentNotNull(element, "element");
this.query = query;
this.element = element;
}

@Override
public String type() {
return REMOVE_LEFT_INDEX;
}

@Override
public Object execute() {
this.tx = this.graph().graphTransaction().indexTransaction();
return this.removeIndexLeft(this.query, this.element);
}

protected long removeIndexLeft(ConditionQuery query,
HugeElement element) {
if (element.type() != HugeType.VERTEX &&
element.type() != HugeType.EDGE_OUT &&
element.type() != HugeType.EDGE_IN) {
throw new HugeException("Only accept element of type VERTEX " +
"and EDGE to remove left index, " +
"but got: '%s'", element.type());
}
long rCount = 0;
long sCount = 0;
for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
// Process range index
rCount += this.processRangeIndexLeft(cq, element);
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.tx.commit();
return rCount + sCount;
}

private long processRangeIndexLeft(ConditionQuery query,
HugeElement element) {
GraphIndexTransaction tx = this.tx;
AbstractSerializer serializer = tx.serializer;
long count = 0;
// Construct index ConditionQuery
Set<MatchedIndex> matchedIndexes = tx.collectMatchedIndexes(query);
IndexQueries queries = null;
Id elementLabelId = element.schemaLabel().id();
for (MatchedIndex index : matchedIndexes) {
if (index.schemaLabel().id().equals(elementLabelId)) {
queries = index.constructIndexQueries(query);
break;
}
}
E.checkState(queries != null,
"Can't construct left-index query for '%s'", query);

for (ConditionQuery q : queries.values()) {
if (!q.resultType().isRangeIndex()) {
continue;
}
// Query and delete index equals element id
for (Iterator<BackendEntry> it = tx.query(q); it.hasNext();) {
BackendEntry entry = it.next();
HugeIndex index = serializer.readIndex(graph(), q, entry);
if (index.elementIds().contains(element.id())) {
index.resetElementIds();
index.elementIds(element.id());
tx.doEliminate(serializer.writeIndex(index));
tx.commit();
// If deleted by error, re-add deleted index again
if (this.deletedByError(q, element)) {
tx.doAppend(serializer.writeIndex(index));
tx.commit();
} else {
count++;
}
}
}
}
return count;
}

private long processSecondaryOrSearchIndexLeft(ConditionQuery query,
HugeElement element) {
Map<PropertyKey, Object> incorrectPKs = InsertionOrderUtil.newMap();
HugeElement deletion = this.constructErrorElem(query, element,
incorrectPKs);
if (deletion == null) {
return 0;
}

// Delete unused index
long count = 0;
Set<Id> incorrectPkIds;
for (IndexLabel il : relatedIndexLabels(deletion)) {
incorrectPkIds = incorrectPKs.keySet().stream()
.map(PropertyKey::id)
.collect(Collectors.toSet());
Collection<Id> incorrectIndexFields = CollectionUtil.intersect(
il.indexFields(),
incorrectPkIds);
if (incorrectIndexFields.isEmpty()) {
continue;
}
// Skip if search index is not wrong
if (il.indexType() == IndexType.SEARCH) {
Id field = il.indexField();
String cond = deletion.<String>getPropertyValue(field);
String actual = element.<String>getPropertyValue(field);
if (this.tx.matchSearchIndexWords(actual, cond)) {
/*
* If query by two search index, one is correct but
* the other is wrong, we should not delete the correct
*/
continue;
}
}
// Delete index with error property
this.tx.updateIndex(il.id(), deletion, true);
// Rebuild index if delete correct index part
if (il.indexType() == IndexType.SECONDARY) {
/*
* When it's a composite secondary index,
* if the suffix property is wrong and the prefix property
* is correct, the correct prefix part will be deleted,
* so rebuild the index again with the correct property.
*/
this.tx.updateIndex(il.id(), element, false);
}
this.tx.commit();
if (this.deletedByError(element, incorrectIndexFields,
incorrectPKs)) {
this.tx.updateIndex(il.id(), deletion, false);
this.tx.commit();
} else {
count++;
}
}
return count;
}

private HugeElement constructErrorElem(
ConditionQuery query, HugeElement element,
Map<PropertyKey, Object> incorrectPKs) {
HugeElement errorElem = element.copyAsFresh();
Set<Id> propKeys = query.userpropKeys();
for (Id key : propKeys) {
Set<Object> conditionValues = query.userpropValues(key);
E.checkState(!conditionValues.isEmpty(),
"Expect user property values for key '%s', " +
"but got none", key);
if (conditionValues.size() > 1) {
// It's inside/between Query (processed in range index)
return null;
}
Object propValue = errorElem.getProperty(key).value();
Object conditionValue = conditionValues.iterator().next();
if (!propValue.equals(conditionValue)) {
PropertyKey pkey = this.graph().propertyKey(key);
errorElem.addProperty(pkey, conditionValue);
incorrectPKs.put(pkey, conditionValue);
}
}
return errorElem;
}

private boolean deletedByError(ConditionQuery query,
HugeElement element) {
HugeElement elem = this.newestElement(element);
if (elem == null) {
return false;
}
return query.test(elem);
}

private boolean deletedByError(HugeElement element,
Collection<Id> ilFields,
Map<PropertyKey, Object> incorrectPKs) {
HugeElement elem = this.newestElement(element);
for (Map.Entry<PropertyKey, Object> e : incorrectPKs.entrySet()) {
PropertyKey pk = e.getKey();
Object value = e.getValue();
if (ilFields.contains(pk.id()) &&
value.equals(elem.getPropertyValue(pk.id()))) {
return true;
}
}
return false;
}

private HugeElement newestElement(HugeElement element) {
boolean isVertex = element instanceof HugeVertex;
if (isVertex) {
Iterator<Vertex> iterV = this.graph().vertices(element.id());
if (iterV.hasNext()) {
return (HugeVertex) iterV.next();
}
} else {
assert element instanceof HugeEdge;
Iterator<Edge> iterE = this.graph().edges(element.id());
if (iterE.hasNext()) {
return (HugeEdge) iterE.next();
}
}
return null;
}
}
}
Loading