From 788a948c8ed49bc0f172298b1cd23d583e8c9247 Mon Sep 17 00:00:00 2001 From: vaughn Date: Fri, 5 May 2023 16:18:20 +0800 Subject: [PATCH] improve --- .../backend/tx/EphemeralJobQueue.java | 34 +++++++++---------- .../backend/tx/GraphIndexTransaction.java | 11 +++++- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java index 42b16a54cb..134409f529 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java @@ -33,14 +33,13 @@ public class EphemeralJobQueue { - protected static final Logger LOG = Log.logger(EphemeralJobQueue.class); + private static final Logger LOG = Log.logger(EphemeralJobQueue.class); - public static final int CAPACITY = 2000; + private static final int CAPACITY = 2000; - private final BlockingQueue> pendingQueue = - new ArrayBlockingQueue<>(CAPACITY); + private final BlockingQueue> pendingQueue; - private AtomicReference state; + private final AtomicReference state; private final HugeGraph graph; @@ -52,6 +51,7 @@ private enum State { public EphemeralJobQueue(HugeGraph graph) { this.state = new AtomicReference<>(State.INIT); this.graph = graph; + this.pendingQueue = new ArrayBlockingQueue<>(CAPACITY); } public void add(EphemeralJob job) { @@ -103,9 +103,9 @@ public static class BatchEphemeralJob extends EphemeralJob { private static final int PAGE_SIZE = 100; private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job"; - public static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2; + private static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2; - WeakReference queueWeakReference; + private WeakReference queueWeakReference; public BatchEphemeralJob(EphemeralJobQueue queue) { this.queueWeakReference = new WeakReference<>(queue); @@ -119,7 +119,7 @@ public String type() { @Override public Object execute() throws Exception { boolean stop = false; - long count = 0; + Object ret = null; int consumeCount = 0; InterruptedException interruptedException = null; EphemeralJobQueue queue; @@ -136,7 +136,7 @@ public Object execute() throws Exception { } if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT || - interruptedException != null) { + interruptedException != null) { queue.consumeComplete(); stop = true; if (!queue.isEmpty()) { @@ -156,7 +156,7 @@ public Object execute() throws Exception { continue; } - this.executeBatchJob(batchJobs); + ret = this.executeBatchJob(batchJobs, ret); } catch (InterruptedException e) { interruptedException = e; @@ -170,26 +170,24 @@ public Object execute() throws Exception { throw interruptedException; } - return count; + return ret; } - private long executeBatchJob(List> jobs) throws Exception { + private Object executeBatchJob(List> jobs, Object prev) throws Exception { GraphIndexTransaction graphTx = this.params().systemTransaction().indexTransaction(); GraphIndexTransaction systemTx = this.params().graphTransaction().indexTransaction(); - long count = 0; + Object ret = prev; for (EphemeralJob job : jobs) { Object obj = job.call(); if (job instanceof Reduce) { - count = ((Reduce) job).reduce(count, obj); - } else { - count ++; + ret = ((Reduce) job).reduce(ret, obj); } } graphTx.commit(); systemTx.commit(); - return count; + return ret; } @Override @@ -216,6 +214,6 @@ public Object call() throws Exception { } public interface Reduce { - long reduce(long t1, T t2); + T reduce(T t1, T t2); } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java index c25076f8f2..f7d1038427 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java @@ -1711,6 +1711,7 @@ private static Query parent(Collection queries) { } } } + public static class RemoveLeftIndexJob extends EphemeralJob implements EphemeralJobQueue.Reduce { @@ -1961,7 +1962,15 @@ private HugeElement newestElement(HugeElement element) { } @Override - public long reduce(long t1, Long t2) { + public Long reduce(Long t1, Long t2) { + if (t1 == null) { + return t2; + } + + if (t2 == null) { + return t1; + } + return t1 + t2; } }