Skip to content

Commit

Permalink
chore: single task and batch consume remove left index task
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Apr 23, 2023
1 parent 267ff6d commit 8441b2a
Showing 1 changed file with 193 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,23 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import groovy.lang.Tuple2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.page.PageIds;
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.util.Events;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
Expand Down Expand Up @@ -81,7 +89,6 @@
import org.apache.hugegraph.structure.HugeIndex.IdWithExpiredTime;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.task.HugeTask;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.HugeKeys;
Expand All @@ -104,6 +111,8 @@ public class GraphIndexTransaction extends AbstractTransaction {
private final Analyzer textAnalyzer;
private final int indexIntersectThresh;

private RemoveLeftIndexJobHelper removeLeftIndexJobHelper;

public GraphIndexTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);

Expand All @@ -113,17 +122,13 @@ public GraphIndexTransaction(HugeGraphParams graph, BackendStore store) {
final HugeConfig conf = graph.configuration();
this.indexIntersectThresh =
conf.get(CoreOptions.QUERY_INDEX_INTERSECT_THRESHOLD);
this.removeLeftIndexJobHelper = RemoveLeftIndexJobHelper.instance(graph);
}

protected Id asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
protected void asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
LOG.info("Remove left index: {}, query: {}", element, query);
RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
.name(element.id().asString())
.job(job)
.schedule();
return task.id();
this.removeLeftIndexJobHelper.add(query, element);
}

@Watched(prefix = "index")
Expand Down Expand Up @@ -1717,22 +1722,104 @@ private static Query parent(Collection<Query> queries) {
}
}

/**
 * Per-graph helper that buffers "remove left index" requests in a bounded
 * queue and schedules a single {@link RemoveLeftIndexJob} to drain them.
 *
 * <p>The {@code state} reference acts as a two-state latch:
 * {@link #reSchedule()} only schedules a job when it wins the
 * {@code INIT -> EXECUTE} transition, and the job hands the latch back
 * through {@link #consumeComplete()} ({@code EXECUTE -> INIT}).
 */
public static class RemoveLeftIndexJobHelper {

    /** Maximum number of pending (query, element) pairs kept per graph. */
    public static final int CAPACITY = 2000;

    /** One helper per graph; an entry is removed on the GRAPH_DROP event. */
    private static final Map<HugeGraph, RemoveLeftIndexJobHelper> graph2JobHelpers =
            new ConcurrentHashMap<>();

    private final BlockingQueue<Tuple2<ConditionQuery, HugeElement>> pendingQueue =
            new ArrayBlockingQueue<>(CAPACITY);

    private final AtomicReference<State> state;

    private final HugeGraph graph;

    enum State {
        INIT,
        EXECUTE,
    }

    private RemoveLeftIndexJobHelper(HugeGraph graph) {
        this.state = new AtomicReference<>(State.INIT);
        this.graph = graph;
    }

    /**
     * Returns the helper bound to {@code params.graph()}, creating it on
     * first use. On creation a GRAPH_DROP listener is registered that
     * removes the dropped graph's helper from the registry.
     *
     * @param params the graph params providing the graph and its event hub
     * @return the helper for the graph
     */
    public static RemoveLeftIndexJobHelper instance(HugeGraphParams params) {
        return graph2JobHelpers.computeIfAbsent(params.graph(), g -> {
            params.graphEventHub().listen(Events.GRAPH_DROP, event -> {
                LOG.debug("RemoveLeftIndexJobHelper accepts event '{}'", event.name());
                event.checkArgs(HugeGraph.class);
                HugeGraph graph = (HugeGraph) event.args()[0];
                graph2JobHelpers.remove(graph);
                return null;
            });

            return new RemoveLeftIndexJobHelper(g);
        });
    }

    /**
     * Enqueues a (query, element) pair and makes sure a consumer job is
     * scheduled. A null query or element is ignored. When the queue is full
     * the pair is dropped with a warning, but a job is still requested so
     * that the backlog gets drained.
     *
     * @param query   the query the left index was detected for
     * @param element the element the left index refers to
     */
    public void add(ConditionQuery query, HugeElement element) {
        if (query == null || element == null) {
            return;
        }

        if (!this.pendingQueue.offer(new Tuple2<>(query, element))) {
            LOG.warn("The pending queue of RemoveLeftIndexJob is full");
        }

        // Requested in both the accepted and the rejected case
        this.reSchedule();
    }

    /** Marks the running job as finished: {@code EXECUTE -> INIT}. */
    public void consumeComplete() {
        this.state.compareAndSet(State.EXECUTE, State.INIT);
    }

    /**
     * Schedules a new {@link RemoveLeftIndexJob} unless the state is already
     * {@code EXECUTE}. If scheduling throws, the failure is logged, the
     * pending queue is cleared and the state is reset to {@code INIT}.
     */
    public void reSchedule() {
        if (!this.state.compareAndSet(State.INIT, State.EXECUTE)) {
            // State was not INIT, nothing to schedule
            return;
        }
        try {
            RemoveLeftIndexJob job = new RemoveLeftIndexJob(this.pendingQueue,
                                                            this::consumeComplete,
                                                            this::reSchedule);
            EphemeralJobBuilder.of(this.graph)
                               .name("batch-remove-left-index")
                               .job(job)
                               .schedule();
        } catch (Throwable e) {
            // Maybe if it fails, consider clearing all the data in the pendingQueue,
            // or start a scheduled retry task to retry until success.
            LOG.warn("Failed to schedule RemoveLeftIndexJob", e);
            this.pendingQueue.clear();
            this.state.compareAndSet(State.EXECUTE, State.INIT);
        }
    }
}

public static class RemoveLeftIndexJob extends EphemeralJob<Object> {

private static final String REMOVE_LEFT_INDEX = "remove_left_index";
public static final int MAX_CONSUME_COUNT = 1000;

private final ConditionQuery query;
private final HugeElement element;
private GraphIndexTransaction tx;
private Set<ConditionQuery.LeftIndex> leftIndexes;

private RemoveLeftIndexJob(ConditionQuery query, HugeElement element) {
E.checkArgumentNotNull(query, "query");
E.checkArgumentNotNull(element, "element");
this.query = query;
this.element = element;
this.tx = null;
this.leftIndexes = query.getLeftIndexOfElement(element.id());
// Source of pending (query, element) pairs consumed by execute()
private final Queue<Tuple2<ConditionQuery, HugeElement>> queue;
// Run when this job stops consuming
private final Runnable completeCallback;
// Run when this job stops while the queue still holds items
private final Runnable scheduler;

/**
 * Creates a batch job that drains {@code queue}.
 *
 * @param queue            the queue of pending (query, element) pairs,
 *                         must not be null
 * @param completeCallback invoked when the job stops consuming,
 *                         must not be null
 * @param scheduler        invoked after {@code completeCallback} if the
 *                         queue is not empty, must not be null
 */
private RemoveLeftIndexJob(Queue<Tuple2<ConditionQuery, HugeElement>> queue,
                           Runnable completeCallback,
                           Runnable scheduler) {
    E.checkArgumentNotNull(queue, "The queue can't be null");
    E.checkArgumentNotNull(completeCallback, "The callback can't be null");
    E.checkArgumentNotNull(scheduler, "The scheduler can't be null");
    this.queue = queue;
    this.completeCallback = completeCallback;
    this.scheduler = scheduler;
}

@Override
Expand All @@ -1741,11 +1828,86 @@ public String type() {
}

@Override
public Object execute() {
this.tx = this.element.schemaLabel().system() ?
this.params().systemTransaction().indexTransaction() :
this.params().graphTransaction().indexTransaction();
return this.removeIndexLeft(this.query, this.element);
/**
 * Drains the pending queue in pages of up to 100 pairs, stopping once
 * {@code MAX_CONSUME_COUNT} pairs have been taken, the queue is empty or
 * the thread is interrupted. Each page is split by
 * {@code schemaLabel().system()} and handed to
 * {@link #removeIndexLefts} with the system or the graph index
 * transaction respectively.
 *
 * <p>{@code completeCallback} always runs when consumption stops, also
 * when an unexpected exception escapes, and {@code scheduler} runs
 * afterwards if the queue still holds items.
 *
 * @return the accumulated total returned by {@code removeIndexLefts},
 *         boxed as a {@code Long}
 * @throws InterruptedException if the thread was interrupted; the
 *         interrupt flag is set again before throwing
 */
public Object execute() throws InterruptedException {
    final int pageSize = 100;
    List<Tuple2<ConditionQuery, HugeElement>> systemElements = new ArrayList<>();
    List<Tuple2<ConditionQuery, HugeElement>> graphElements = new ArrayList<>();
    // removeIndexLefts() returns long, so accumulate without narrowing
    long count = 0L;
    int consumeCount = 0;
    InterruptedException interruptedException = null;

    try {
        while (interruptedException == null && consumeCount < MAX_CONSUME_COUNT) {
            if (Thread.currentThread().isInterrupted()) {
                interruptedException = new InterruptedException();
                break;
            }

            // poll() returns null on an empty queue, so test its result directly
            Tuple2<ConditionQuery, HugeElement> query2Element;
            while (systemElements.size() + graphElements.size() < pageSize &&
                   (query2Element = this.queue.poll()) != null) {
                if (query2Element.getSecond().schemaLabel().system()) {
                    systemElements.add(query2Element);
                } else {
                    graphElements.add(query2Element);
                }
                consumeCount++;
            }

            if (systemElements.isEmpty() && graphElements.isEmpty()) {
                // Queue drained
                break;
            }

            try {
                if (!systemElements.isEmpty()) {
                    count += this.removeIndexLefts(
                             this.params().systemTransaction().indexTransaction(),
                             systemElements);
                }

                if (!graphElements.isEmpty()) {
                    count += this.removeIndexLefts(
                             this.params().graphTransaction().indexTransaction(),
                             graphElements);
                }
            } catch (InterruptedException e) {
                interruptedException = e;
            } finally {
                graphElements.clear();
                systemElements.clear();
            }
        }
    } finally {
        // Always run the completion callback, otherwise no further job is scheduled
        this.completeCallback.run();
        if (!this.queue.isEmpty()) {
            this.scheduler.run();
        }
    }

    if (interruptedException != null) {
        Thread.currentThread().interrupt();
        throw interruptedException;
    }

    return count;
}

/**
 * Removes the left indexes of every (query, element) pair through the
 * given index transaction and commits once after the whole batch.
 *
 * <p>The interrupt flag is checked after each pair. Any other failure is
 * logged as a warning and the count accumulated so far is returned.
 *
 * @param tx             the index transaction used for this batch; it is
 *                       stored in {@code this.tx}
 * @param query2Elements the pairs to process
 * @return the sum of the values returned by {@code removeIndexLeft}
 * @throws InterruptedException if the current thread is interrupted
 */
private long removeIndexLefts(GraphIndexTransaction tx,
                              List<Tuple2<ConditionQuery, HugeElement>> query2Elements)
                              throws InterruptedException {
    this.tx = tx;
    long removed = 0;
    try {
        for (Tuple2<ConditionQuery, HugeElement> pair : query2Elements) {
            removed += this.removeIndexLeft(pair.getFirst(), pair.getSecond());
            boolean interrupted = Thread.currentThread().isInterrupted();
            if (interrupted) {
                throw new InterruptedException();
            }
        }
        this.tx.commit();
    } catch (InterruptedException e) {
        // Interruption is passed on to the caller unchanged
        throw e;
    } catch (Throwable e) {
        LOG.warn("Failed to remove left index for graph element", e);
    }
    return removed;
}

protected long removeIndexLeft(ConditionQuery query,
Expand Down Expand Up @@ -1776,24 +1938,25 @@ protected long removeIndexLeft(ConditionQuery query,
long sCount = 0;
for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
// Process range index
rCount += this.processRangeIndexLeft(cq, element);
rCount += this.processRangeIndexLeft(query, cq, element);
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.tx.commit();
return rCount + sCount;
}

private long processRangeIndexLeft(ConditionQuery query,
ConditionQuery flattenQuery,
HugeElement element) {
long count = 0;
if (this.leftIndexes == null) {
Set<ConditionQuery.LeftIndex> leftIndexes = query.getLeftIndexOfElement(element.id());
if (CollectionUtils.isEmpty(leftIndexes)) {
return count;
}

for (ConditionQuery.LeftIndex leftIndex : this.leftIndexes) {
for (ConditionQuery.LeftIndex leftIndex : leftIndexes) {
Set<Object> indexValues = leftIndex.indexFieldValues();
IndexLabel indexLabel = this.findMatchedIndexLabel(query,
IndexLabel indexLabel = this.findMatchedIndexLabel(flattenQuery,
leftIndex);
assert indexLabel != null;

Expand All @@ -1807,7 +1970,7 @@ private long processRangeIndexLeft(ConditionQuery query,
}
}
// Remove LeftIndex after constructing remove job
this.query.removeElementLeftIndex(element.id());
query.removeElementLeftIndex(element.id());
this.tx.commit();
return count;
}
Expand Down

0 comments on commit 8441b2a

Please sign in to comment.