Skip to content

Commit

Permalink
fix algorithm can't stop caused by threads exception (#18)
Browse files Browse the repository at this point in the history
Change-Id: I546682b19fb5a84a65dc2a3bd77d62b386722bfa
  • Loading branch information
javeme authored and imbajin committed Nov 7, 2022
1 parent 2ee51c4 commit ff88f11
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,18 +354,20 @@ protected long traverse(String sourceLabel, String sourceCLabel,
Consumers<Vertex> consumers = new Consumers<>(this.executor,
consumer, done);
consumers.start();

long total = 0L;
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
total++;
Vertex v = vertices.next();
consumers.provide(v);
try {
long total = 0L;
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
total++;
Vertex v = vertices.next();
consumers.provide(v);
}
return total;
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
consumers.await();
}

consumers.await();

return total;
}

protected Iterator<Vertex> vertices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.slf4j.Logger;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;

Expand All @@ -50,6 +51,7 @@ public class Consumers<V> {
private final BlockingQueue<V> queue;

private volatile boolean ending = false;
private volatile Throwable exception = null;

public Consumers(ExecutorService executor, Consumer<V> consumer) {
this(executor, consumer, null);
Expand All @@ -72,6 +74,8 @@ public Consumers(ExecutorService executor,
}

public void start() {
this.ending = false;
this.exception = null;
if (this.executor == null) {
return;
}
Expand All @@ -81,11 +85,12 @@ public void start() {
this.executor.submit(() -> {
try {
this.run();
if (this.done != null) {
this.done.run();
}
this.done();
} catch (Throwable e) {
// Only the first exception of one thread can be stored
this.exception = e;
LOG.error("Error when running task", e);
this.done();
} finally {
this.latch.countDown();
}
Expand Down Expand Up @@ -120,10 +125,19 @@ private boolean consume() {
return true;
}

public void provide(V v) {
private void done() {
if (this.done != null) {
this.done.run();
}
}

public void provide(V v) throws Throwable {
if (this.executor == null) {
assert this.exception == null;
// do job directly if without thread pool
this.consumer.accept(v);
} else if (this.exception != null) {
throw this.exception;
} else {
try {
this.queue.put(v);
Expand All @@ -137,14 +151,12 @@ public void await() {
this.ending = true;
if (this.executor == null) {
// call done() directly if without thread pool
if (this.done != null) {
this.done.run();
}
this.done();
} else {
try {
this.latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);;
LOG.warn("Interrupted", e);
}
}
}
Expand All @@ -163,4 +175,12 @@ public static ExecutorService newThreadPool(String prefix, int workers) {
return ExecutorUtil.newFixedThreadPool(workers, name);
}
}

public static RuntimeException wrapException(Throwable e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new HugeException("Error when running task: %s",
HugeException.rootCause(e).getMessage(), e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,21 +421,25 @@ private double moveCommunities(int pass) {
moved.incrementAndGet();
}
});
consumers.start();

while (vertices.hasNext()) {
this.updateProgress(++this.progress);
Vertex v = vertices.next();
if (needSkipVertex(pass, v)) {
// skip the old intermediate data, or filter clabel
continue;
consumers.start();
try {
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
Vertex v = vertices.next();
if (needSkipVertex(pass, v)) {
// skip the old intermediate data, or filter clabel
continue;
}
total++;
consumers.provide(v);
}
total++;
consumers.provide(v);
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
consumers.await();
}

consumers.await();

// maybe always shocking when set degree limited
return total == 0L ? 0d : moved.doubleValue() / total;
}
Expand All @@ -455,19 +459,24 @@ private void mergeCommunities(int pass) {
// commit when finished
this.graph().tx().commit();
});
consumers.start();

for (Pair<Community, Set<Id>> pair : comms) {
Community c = pair.getLeft();
if (c.empty()) {
continue;
consumers.start();
try {
for (Pair<Community, Set<Id>> pair : comms) {
Community c = pair.getLeft();
if (c.empty()) {
continue;
}
this.progress += pair.getRight().size();
this.updateProgress(this.progress);
//this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
consumers.provide(pair);
}
this.progress += pair.getRight().size();
this.updateProgress(this.progress);
//this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
consumers.provide(pair);
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
consumers.await();
}
consumers.await();

this.graph().tx().commit();
assert this.allMembersExist(pass);
Expand Down

0 comments on commit ff88f11

Please sign in to comment.