Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed May 19, 2023
1 parent 79c00ea commit 3539c53
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ protected void asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
LOG.info("Remove left index: {}, query: {}", element, query);
RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
job.selfCommit(false);
this.params().submitEphemeralJob(job);
}

Expand Down Expand Up @@ -1723,20 +1722,13 @@ public static class RemoveLeftIndexJob extends EphemeralJob<Long>
private GraphIndexTransaction tx;
private Set<ConditionQuery.LeftIndex> leftIndexes;

private boolean selfCommit;

private RemoveLeftIndexJob(ConditionQuery query, HugeElement element) {
E.checkArgumentNotNull(query, "query");
E.checkArgumentNotNull(element, "element");
this.query = query;
this.element = element;
this.tx = null;
this.leftIndexes = query.getLeftIndexOfElement(element.id());
this.selfCommit = true;
}

public void selfCommit(boolean selfCommit) {
this.selfCommit = selfCommit;
}

@Override
Expand Down Expand Up @@ -1784,9 +1776,6 @@ protected long removeIndexLeft(ConditionQuery query,
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
if (this.selfCommit) {
this.tx.commit();
}
return rCount + sCount;
}

Expand Down Expand Up @@ -1814,9 +1803,6 @@ private long processRangeIndexLeft(ConditionQuery query,
}
// Remove LeftIndex after constructing remove job
this.query.removeElementLeftIndex(element.id());
if (this.selfCommit) {
this.tx.commit();
}
return count;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.tx.GraphIndexTransaction;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.util.Log;
Expand All @@ -36,7 +36,7 @@ public class EphemeralJobQueue {

private static final Logger LOG = Log.logger(EphemeralJobQueue.class);

private static final int CAPACITY = 2000;
private static final long CAPACITY = 100 * Query.COMMIT_BATCH;

private final BlockingQueue<EphemeralJob<?>> pendingQueue;

Expand All @@ -52,20 +52,22 @@ private enum State {
public EphemeralJobQueue(HugeGraphParams graph) {
this.state = new AtomicReference<>(State.INIT);
this.graph = graph;
this.pendingQueue = new ArrayBlockingQueue<>(CAPACITY);
this.pendingQueue = new ArrayBlockingQueue<>((int) CAPACITY);
}

public void add(EphemeralJob<?> job) {
public boolean add(EphemeralJob<?> job) {
if (job == null) {
return;
return false;
}

if (!this.pendingQueue.offer(job)) {
LOG.warn("The pending queue of EphemeralJobQueue is full, {} job " +
"will be ignored", job.type());
return false;
}

this.reScheduleIfNeeded();
return true;
}

protected HugeGraphParams params() {
Expand Down Expand Up @@ -110,7 +112,7 @@ public static class BatchEphemeralJob extends EphemeralJob<Object> {

private static final long PAGE_SIZE = Query.COMMIT_BATCH;
private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
private static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2;
private static final long MAX_CONSUME_COUNT = 2 * PAGE_SIZE;

private WeakReference<EphemeralJobQueue> queueWeakReference;

Expand Down Expand Up @@ -184,8 +186,8 @@ public Object execute() throws Exception {
}

private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prevResult) throws Exception {
GraphIndexTransaction graphTx = this.params().systemTransaction().indexTransaction();
GraphIndexTransaction systemTx = this.params().graphTransaction().indexTransaction();
GraphTransaction graphTx = this.params().systemTransaction();
GraphTransaction systemTx = this.params().graphTransaction();
Object result = prevResult;
for (EphemeralJob<?> job : jobs) {
this.initJob(job);
Expand Down

0 comments on commit 3539c53

Please sign in to comment.