Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed May 5, 2023
1 parent 574ddd5 commit 788a948
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@

public class EphemeralJobQueue {

protected static final Logger LOG = Log.logger(EphemeralJobQueue.class);
private static final Logger LOG = Log.logger(EphemeralJobQueue.class);

public static final int CAPACITY = 2000;
private static final int CAPACITY = 2000;

private final BlockingQueue<EphemeralJob<?>> pendingQueue =
new ArrayBlockingQueue<>(CAPACITY);
private final BlockingQueue<EphemeralJob<?>> pendingQueue;

private AtomicReference<State> state;
private final AtomicReference<State> state;

private final HugeGraph graph;

Expand All @@ -52,6 +51,7 @@ private enum State {
public EphemeralJobQueue(HugeGraph graph) {
this.state = new AtomicReference<>(State.INIT);
this.graph = graph;
this.pendingQueue = new ArrayBlockingQueue<>(CAPACITY);
}

public void add(EphemeralJob<?> job) {
Expand Down Expand Up @@ -103,9 +103,9 @@ public static class BatchEphemeralJob extends EphemeralJob<Object> {

private static final int PAGE_SIZE = 100;
private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
public static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2;
private static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2;

WeakReference<EphemeralJobQueue> queueWeakReference;
private WeakReference<EphemeralJobQueue> queueWeakReference;

public BatchEphemeralJob(EphemeralJobQueue queue) {
this.queueWeakReference = new WeakReference<>(queue);
Expand All @@ -119,7 +119,7 @@ public String type() {
@Override
public Object execute() throws Exception {
boolean stop = false;
long count = 0;
Object ret = null;
int consumeCount = 0;
InterruptedException interruptedException = null;
EphemeralJobQueue queue;
Expand All @@ -136,7 +136,7 @@ public Object execute() throws Exception {
}

if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT ||
interruptedException != null) {
interruptedException != null) {
queue.consumeComplete();
stop = true;
if (!queue.isEmpty()) {
Expand All @@ -156,7 +156,7 @@ public Object execute() throws Exception {
continue;
}

this.executeBatchJob(batchJobs);
ret = this.executeBatchJob(batchJobs, ret);

} catch (InterruptedException e) {
interruptedException = e;
Expand All @@ -170,26 +170,24 @@ public Object execute() throws Exception {
throw interruptedException;
}

return count;
return ret;
}

private long executeBatchJob(List<EphemeralJob<?>> jobs) throws Exception {
private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prev) throws Exception {
GraphIndexTransaction graphTx = this.params().systemTransaction().indexTransaction();
GraphIndexTransaction systemTx = this.params().graphTransaction().indexTransaction();
long count = 0;
Object ret = prev;
for (EphemeralJob<?> job : jobs) {
Object obj = job.call();
if (job instanceof Reduce) {
count = ((Reduce) job).reduce(count, obj);
} else {
count ++;
ret = ((Reduce) job).reduce(ret, obj);
}
}

graphTx.commit();
systemTx.commit();

return count;
return ret;
}

@Override
Expand All @@ -216,6 +214,6 @@ public Object call() throws Exception {
}

public interface Reduce<T> {
long reduce(long t1, T t2);
T reduce(T t1, T t2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,7 @@ private static Query parent(Collection<Query> queries) {
}
}
}

public static class RemoveLeftIndexJob extends EphemeralJob<Long>
implements EphemeralJobQueue.Reduce<Long> {

Expand Down Expand Up @@ -1961,7 +1962,15 @@ private HugeElement newestElement(HugeElement element) {
}

@Override
public long reduce(long t1, Long t2) {
public Long reduce(Long t1, Long t2) {
if (t1 == null) {
return t2;
}

if (t2 == null) {
return t1;
}

return t1 + t2;
}
}
Expand Down

0 comments on commit 788a948

Please sign in to comment.