From ff88f116c44e84556377b51038e2af0afa535460 Mon Sep 17 00:00:00 2001 From: Jermy Li Date: Wed, 3 Jun 2020 22:15:33 +0800 Subject: [PATCH] fix algorithm can't stop caused by threads exception (#18) Change-Id: I546682b19fb5a84a65dc2a3bd77d62b386722bfa --- .../job/algorithm/AbstractAlgorithm.java | 24 +++++---- .../hugegraph/job/algorithm/Consumers.java | 36 ++++++++++--- .../job/algorithm/comm/LouvainTraverser.java | 51 +++++++++++-------- 3 files changed, 71 insertions(+), 40 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index d3311772a6..327905ad38 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -354,18 +354,20 @@ protected long traverse(String sourceLabel, String sourceCLabel, Consumers consumers = new Consumers<>(this.executor, consumer, done); consumers.start(); - - long total = 0L; - while (vertices.hasNext()) { - this.updateProgress(++this.progress); - total++; - Vertex v = vertices.next(); - consumers.provide(v); + try { + long total = 0L; + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + total++; + Vertex v = vertices.next(); + consumers.provide(v); + } + return total; + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + consumers.await(); } - - consumers.await(); - - return total; } protected Iterator vertices() { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java index 526419c46c..f5d01d9803 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.util.ExecutorUtil; import com.baidu.hugegraph.util.Log; @@ -50,6 +51,7 @@ public class Consumers { private final BlockingQueue queue; private volatile boolean ending = false; + private volatile Throwable exception = null; public Consumers(ExecutorService executor, Consumer consumer) { this(executor, consumer, null); @@ -72,6 +74,8 @@ public Consumers(ExecutorService executor, } public void start() { + this.ending = false; + this.exception = null; if (this.executor == null) { return; } @@ -81,11 +85,12 @@ public void start() { this.executor.submit(() -> { try { this.run(); - if (this.done != null) { - this.done.run(); - } + this.done(); } catch (Throwable e) { + // Only the first exception of one thread can be stored + this.exception = e; LOG.error("Error when running task", e); + this.done(); } finally { this.latch.countDown(); } @@ -120,10 +125,19 @@ private boolean consume() { return true; } - public void provide(V v) { + private void done() { + if (this.done != null) { + this.done.run(); + } + } + + public void provide(V v) throws Throwable { if (this.executor == null) { + assert this.exception == null; // do job directly if without thread pool this.consumer.accept(v); + } else if (this.exception != null) { + throw this.exception; } else { try { this.queue.put(v); @@ -137,14 +151,12 @@ public void await() { this.ending = true; if (this.executor == null) { // call done() directly if without thread pool - if (this.done != null) { - this.done.run(); - } + this.done(); } else { try { this.latch.await(); } catch (InterruptedException e) { - LOG.warn("Interrupted", e);; + LOG.warn("Interrupted", e); } } } @@ -163,4 +175,12 @@ public static ExecutorService newThreadPool(String prefix, int workers) { return ExecutorUtil.newFixedThreadPool(workers, name); } } + + public static RuntimeException wrapException(Throwable e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new HugeException("Error when running task: %s", + HugeException.rootCause(e).getMessage(), e); + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java index e55152b10b..5d7548aa3b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java @@ -421,21 +421,25 @@ private double moveCommunities(int pass) { moved.incrementAndGet(); } }); - consumers.start(); - while (vertices.hasNext()) { - this.updateProgress(++this.progress); - Vertex v = vertices.next(); - if (needSkipVertex(pass, v)) { - // skip the old intermediate data, or filter clabel - continue; + consumers.start(); + try { + while (vertices.hasNext()) { + this.updateProgress(++this.progress); + Vertex v = vertices.next(); + if (needSkipVertex(pass, v)) { + // skip the old intermediate data, or filter clabel + continue; + } + total++; + consumers.provide(v); } - total++; - consumers.provide(v); + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + consumers.await(); } - consumers.await(); - // maybe always shocking when set degree limited return total == 0L ? 0d : moved.doubleValue() / total; } @@ -455,19 +459,24 @@ private void mergeCommunities(int pass) { // commit when finished this.graph().tx().commit(); }); - consumers.start(); - for (Pair> pair : comms) { - Community c = pair.getLeft(); - if (c.empty()) { - continue; + consumers.start(); + try { + for (Pair> pair : comms) { + Community c = pair.getLeft(); + if (c.empty()) { + continue; + } + this.progress += pair.getRight().size(); + this.updateProgress(this.progress); + //this.mergeCommunity(pass, pair.getLeft(), pair.getRight()); + consumers.provide(pair); } - this.progress += pair.getRight().size(); - this.updateProgress(this.progress); - //this.mergeCommunity(pass, pair.getLeft(), pair.getRight()); - consumers.provide(pair); + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + consumers.await(); } - consumers.await(); this.graph().tx().commit(); assert this.allMembersExist(pass);