Skip to content

Commit

Permalink
add BOTH direction support for triangle_count/cluster_coeffcient (#24)
Browse files Browse the repository at this point in the history
change log:
1. add BOTH direction support for triangle_count/cluster_coeffcient.
2. fix extra triangle count with multiple edges between two adjacent vertices.
3. set default value of direction to BOTH for degree_centrality and cluster_coeffcient.
4. add workers for triangle_count and cluster_coeffcient.
5. fix closeness: multiple shortest paths caused illogical results.
6. rename rings_detect to rings, rename limit to each_limit which means limit number of rings of each source vertex, and don't do dedup if passed each_limit > 0.
7. unify top for 4 centrality algos: sorted results when top = -1, unsorted results when top = 0.
8. fusiform: rename top to top_similars (expected >= 0).
9. fusiform/rings: add limit param which means max number of results, and remove capacity param and hardcode to 100000000.

Change-Id: I9ddf8553e6d86b99adbff8b972890d69d623fa1a
  • Loading branch information
javeme committed Oct 19, 2022
1 parent d1f4177 commit 0501b41
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.backend.id.Id;
Expand All @@ -44,6 +45,7 @@
import com.baidu.hugegraph.iterator.FilterIterator;
import com.baidu.hugegraph.iterator.FlatMapperIterator;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.type.HugeType;
Expand All @@ -61,6 +63,7 @@ public abstract class AbstractAlgorithm implements Algorithm {

public static final long MAX_RESULT_SIZE = 100L * Bytes.MB;
public static final long MAX_QUERY_LIMIT = 100000000L; // about 100GB
public static final long MAX_CAPACITY = MAX_QUERY_LIMIT;
public static final int BATCH = 500;

public static final String CATEGORY_AGGR = "aggregate";
Expand All @@ -87,11 +90,13 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final String KEY_CLEAR = "clear";
public static final String KEY_CAPACITY = "capacity";
public static final String KEY_LIMIT = "limit";
public static final String KEY_EACH_LIMIT = "each_limit";
public static final String KEY_ALPHA = "alpha";
public static final String KEY_WORKERS = "workers";

public static final long DEFAULT_CAPACITY = 10000000L;
public static final long DEFAULT_LIMIT = 100L;
public static final long DEFAULT_EACH_LIMIT = 1L;
public static final long DEFAULT_DEGREE = 100L;
public static final long DEFAULT_SAMPLE = 1L;
public static final long DEFAULT_TIMES = 20L;
Expand Down Expand Up @@ -131,6 +136,14 @@ protected static Directions direction(Map<String, Object> parameters) {
return parseDirection(direction);
}

/**
 * Reads the direction from the algorithm parameters, defaulting to
 * {@code Directions.OUT} when {@code KEY_DIRECTION} is not supplied.
 *
 * @param parameters the algorithm parameters
 * @return the parsed direction, or {@code Directions.OUT} if absent
 */
protected static Directions direction4Out(Map<String, Object> parameters) {
    if (parameters.containsKey(KEY_DIRECTION)) {
        return parseDirection(parameter(parameters, KEY_DIRECTION));
    }
    return Directions.OUT;
}

protected static Directions directionOutIn(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_DIRECTION)) {
return Directions.OUT;
Expand All @@ -148,24 +161,18 @@ protected static double alpha(Map<String, Object> parameters) {
return DEFAULT_ALPHA;
}
double alpha = parameterDouble(parameters, KEY_ALPHA);
checkAlpha(alpha);
return alpha;
}

public static void checkAlpha(double alpha) {
E.checkArgument(alpha > 0 && alpha <= 1.0,
E.checkArgument(alpha > 0.0 && alpha <= 1.0,
"The value of %s must be in range (0, 1], but got %s",
KEY_ALPHA, alpha);
return alpha;
}

/**
 * Reads the {@code top} parameter.
 *
 * Returns 0 when the parameter is absent. Otherwise the value must be
 * non-negative or the no-limit marker; the older {@code top >= 0} check
 * is intentionally not applied because it rejected the no-limit value,
 * which callers use to request sorted results without truncation.
 *
 * @param parameters the algorithm parameters
 * @return the validated {@code top} value, or 0 if not supplied
 */
protected static long top(Map<String, Object> parameters) {
    if (!parameters.containsKey(KEY_TOP)) {
        return 0L;
    }
    long top = parameterLong(parameters, KEY_TOP);
    // Accepts values >= 0 as well as the no-limit marker
    HugeTraverser.checkNonNegativeOrNoLimit(top, KEY_TOP);
    return top;
}

Expand Down Expand Up @@ -196,6 +203,15 @@ protected static long limit(Map<String, Object> parameters) {
return limit;
}

/**
 * Reads the {@code each_limit} parameter, falling back to
 * {@code DEFAULT_EACH_LIMIT} when it is not supplied.
 *
 * @param parameters the algorithm parameters
 * @return the validated per-source limit
 */
protected static long eachLimit(Map<String, Object> parameters) {
    if (parameters.containsKey(KEY_EACH_LIMIT)) {
        long eachLimit = parameterLong(parameters, KEY_EACH_LIMIT);
        // Must be positive or the no-limit marker
        HugeTraverser.checkPositiveOrNoLimit(eachLimit, KEY_EACH_LIMIT);
        return eachLimit;
    }
    return DEFAULT_EACH_LIMIT;
}

protected static long sample(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SAMPLE)) {
return DEFAULT_SAMPLE;
Expand Down Expand Up @@ -355,21 +371,24 @@ protected long traverse(String sourceLabel, String sourceCLabel,

Consumers<Vertex> consumers = new Consumers<>(this.executor,
consumer, done);
consumers.start();
consumers.start("task-" + this.job.task().id());
long total = 0L;
try {
long total = 0L;
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
total++;
Vertex v = vertices.next();
consumers.provide(v);
}
return total;
} catch (StopExecution e) {
// pass
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
consumers.await();
CloseableIterator.closeIterator(vertices);
}
return total;
}

protected Iterator<Vertex> vertices() {
Expand Down Expand Up @@ -520,9 +539,11 @@ public void add(K key, long value) {
}

public void put(K key, long value) {
assert this.topN != 0L;
this.tops.put(key, new MutableLong(value));
// keep 2x buffer
if (this.tops.size() > this.topN * 2) {
if (this.tops.size() > this.topN * 2 &&
this.topN != HugeTraverser.NO_LIMIT) {
this.shrinkIfNeeded(this.topN);
}
}
Expand All @@ -537,7 +558,10 @@ public Set<Map.Entry<K, MutableLong>> entrySet() {
}

private void shrinkIfNeeded(long limit) {
if (this.tops.size() >= limit && limit != HugeTraverser.NO_LIMIT) {
assert limit != 0L;
if (this.tops.size() >= limit &&
(limit > 0L || limit == HugeTraverser.NO_LIMIT)) {
// Just do sort if limit=NO_LIMIT, else do sort and shrink
this.tops = HugeTraverser.topN(this.tops, true, limit);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,25 @@ public Consumers(ExecutorService executor,
this.queue = new ArrayBlockingQueue<>(this.queueSize);
}

public void start() {
public void start(String name) {
this.ending = false;
this.exception = null;
if (this.executor == null) {
return;
}
LOG.info("Starting {} workers with queue size {}...",
this.workers, this.queueSize);
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
this.executor.submit(() -> {
try {
this.run();
this.done();
} catch (Throwable e) {
// Only the first exception of one thread can be stored
this.exception = e;
LOG.error("Error when running task", e);
this.exception = e;
if (!(e instanceof StopExecution)) {
LOG.error("Error when running task", e);
}
this.done();
} finally {
this.latch.countDown();
Expand Down Expand Up @@ -183,4 +185,17 @@ public static RuntimeException wrapException(Throwable e) {
throw new HugeException("Error when running task: %s",
HugeException.rootCause(e).getMessage(), e);
}

/**
 * Exception type that callers in this change treat as a stop signal
 * rather than a failure: the traverse loop catches it and continues to
 * its cleanup, and worker threads do not log it as an error.
 */
public static class StopExecution extends HugeException {

    private static final long serialVersionUID = -371829356182454517L;

    /**
     * @param message the message format string
     * @param args    the arguments passed on with the message
     */
    public StopExecution(String message, Object... args) {
        super(message, args);
    }

    /**
     * @param message the plain message
     */
    public StopExecution(String message) {
        super(message);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
Expand Down Expand Up @@ -123,6 +124,7 @@ private static class Traverser extends AlgoTraverser {
"depth", 10L,
"degree", -1L,
"sample", -1L,
"top", -1L /* sorted */,
"workers", 0);

public Traverser(Job<Object> job) {
Expand Down Expand Up @@ -158,6 +160,8 @@ public Object subgraphStat(Job<Object> job) {
parameters = ImmutableMap.<String, Object>builder()
.putAll(PARAMS)
.put("count_only", true)
.put("each_limit", NO_LIMIT)
.put("limit", NO_LIMIT)
.build();
results.put("rings", algo.call(job, parameters));

Expand All @@ -175,6 +179,7 @@ private Map<Object, Double> pageRanks(Job<Object> job) {
Vertex vertex = vertices.next();
ranks.put(vertex.id(), vertex.value(R_RANK));
}
ranks = HugeTraverser.topN(ranks, true, NO_LIMIT);
return ranks;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.Pop;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Vertex;

Expand Down Expand Up @@ -130,10 +133,10 @@ protected GraphTraversal<Vertex, Vertex> constructPathUnit(
return unit;
}

protected GraphTraversal<Vertex, Vertex> filterNonShortestPath(
GraphTraversal<Vertex, Vertex>
t) {
long size = this.graph().traversal().V().limit(MAX_QUERY_LIMIT)
protected <V> GraphTraversal<V, V> filterNonShortestPath(
GraphTraversal<V, V> t,
boolean keepOneShortestPath) {
long size = this.graph().traversal().V().limit(100000L)
.count().next();
Map<Pair<Id, Id>, Integer> triples = new HashMap<>((int) size);
return t.filter(it -> {
Expand All @@ -142,15 +145,32 @@ protected GraphTraversal<Vertex, Vertex> filterNonShortestPath(
int len = it.<List<?>>path(Pop.all, "v").size();
Pair<Id, Id> key = Pair.of(start, end);
Integer shortest = triples.get(key);
if (shortest != null && shortest != len) {
if (shortest != null && len > shortest) {
// ignore non shortest path
return false;
}
if (shortest == null) {
triples.put(key, len);
} else {
assert len == shortest;
if (keepOneShortestPath) {
return false;
}
}
return true;
});
}

/**
 * Applies ordering and an optional limit to a map-producing traversal.
 *
 * When {@code topN} is positive or equals {@code NO_LIMIT}, the local
 * map is ordered by value, descending; a positive {@code topN} also
 * limits the local result to that many entries. For any other value the
 * traversal is returned untouched.
 *
 * @param t    the traversal to decorate
 * @param topN the number of entries to keep, or {@code NO_LIMIT}
 * @return the decorated traversal, or {@code t} itself
 */
protected GraphTraversal<Vertex, ?> topN(GraphTraversal<Vertex, ?> t,
                                         long topN) {
    boolean needSort = topN > 0L || topN == NO_LIMIT;
    if (!needSort) {
        return t;
    }
    GraphTraversal<Vertex, ?> ordered = t.order(Scope.local)
                                         .by(Column.values, Order.desc);
    if (topN > 0L) {
        assert topN != NO_LIMIT;
        return ordered.limit(Scope.local, topN);
    }
    return ordered;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@

import java.util.Map;

import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Pop;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import com.baidu.hugegraph.job.Job;
Expand Down Expand Up @@ -72,22 +69,19 @@ public Object betweenessCentrality(Directions direction,
long topN) {
assert depth > 0;
assert degree > 0L || degree == NO_LIMIT;
assert topN >= 0L;
assert topN >= 0L || topN == NO_LIMIT;

GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
sourceSample,
sourceCLabel);
t = constructPath(t, direction, label, degree, sample,
sourceLabel, sourceCLabel);
t = t.emit().until(__.loops().is(P.gte(depth)));
t = filterNonShortestPath(t);
t = filterNonShortestPath(t, false);

GraphTraversal<Vertex, ?> tg = t.select(Pop.all, "v")
.unfold().id()
.groupCount().order(Scope.local)
.by(Column.values, Order.desc);
GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
tg.limit(Scope.local, topN);
.unfold().id().groupCount();
GraphTraversal<Vertex, ?> tLimit = topN(tg, topN);

return this.execute(tLimit, () -> tLimit.next());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import java.util.Map;

import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Pop;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import com.baidu.hugegraph.job.Job;
Expand Down Expand Up @@ -81,23 +79,21 @@ public Object closenessCentrality(Directions direction,
long topN) {
assert depth > 0;
assert degree > 0L || degree == NO_LIMIT;
assert topN >= 0L;
assert topN >= 0L || topN == NO_LIMIT;

GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
sourceSample,
sourceCLabel);
t = constructPath(t, direction, label, degree, sample,
sourceLabel, sourceCLabel);
t = t.emit().until(__.loops().is(P.gte(depth)));
t = filterNonShortestPath(t);
t = filterNonShortestPath(t, true);

GraphTraversal<Vertex, ?> tg;
tg = t.group().by(__.select(Pop.first, "v").id())
.by(__.select(Pop.all, "v").count(Scope.local)
.sack(Operator.div).sack().sum())
.order(Scope.local).by(Column.values, Order.desc);
GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
tg.limit(Scope.local, topN);
.sack(Operator.div).sack().sum());
GraphTraversal<Vertex, ?> tLimit = topN(tg, topN);

return this.execute(tLimit, () -> tLimit.next());
}
Expand Down
Loading

0 comments on commit 0501b41

Please sign in to comment.