Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed May 5, 2023
1 parent 574ddd5 commit 10d7a51
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@

public class EphemeralJobQueue {

protected static final Logger LOG = Log.logger(EphemeralJobQueue.class);
private static final Logger LOG = Log.logger(EphemeralJobQueue.class);

public static final int CAPACITY = 2000;
private static final int CAPACITY = 2000;

private final BlockingQueue<EphemeralJob<?>> pendingQueue =
new ArrayBlockingQueue<>(CAPACITY);
private final BlockingQueue<EphemeralJob<?>> pendingQueue;

private AtomicReference<State> state;

Expand All @@ -52,6 +51,7 @@ private enum State {
public EphemeralJobQueue(HugeGraph graph) {
this.state = new AtomicReference<>(State.INIT);
this.graph = graph;
this.pendingQueue = new ArrayBlockingQueue<>(CAPACITY);
}

public void add(EphemeralJob<?> job) {
Expand Down Expand Up @@ -103,9 +103,9 @@ public static class BatchEphemeralJob extends EphemeralJob<Object> {

private static final int PAGE_SIZE = 100;
private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
public static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2;
private static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2;

WeakReference<EphemeralJobQueue> queueWeakReference;
private WeakReference<EphemeralJobQueue> queueWeakReference;

public BatchEphemeralJob(EphemeralJobQueue queue) {
this.queueWeakReference = new WeakReference<>(queue);
Expand Down Expand Up @@ -136,7 +136,7 @@ public Object execute() throws Exception {
}

if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT ||
interruptedException != null) {
interruptedException != null) {
queue.consumeComplete();
stop = true;
if (!queue.isEmpty()) {
Expand Down Expand Up @@ -216,6 +216,6 @@ public Object call() throws Exception {
}

public interface Reduce<T> {
long reduce(long t1, T t2);
long reduce(T t1, T t2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,7 @@ private static Query parent(Collection<Query> queries) {
}
}
}

public static class RemoveLeftIndexJob extends EphemeralJob<Long>
implements EphemeralJobQueue.Reduce<Long> {

Expand Down Expand Up @@ -1961,7 +1962,7 @@ private HugeElement newestElement(HugeElement element) {
}

@Override
public long reduce(long t1, Long t2) {
public long reduce(Long t1, Long t2) {
return t1 + t2;
}
}
Expand Down

0 comments on commit 10d7a51

Please sign in to comment.