diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index 67d508a470..e30ce1e165 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.backend.id.Id; @@ -44,6 +45,7 @@ import com.baidu.hugegraph.iterator.FilterIterator; import com.baidu.hugegraph.iterator.FlatMapperIterator; import com.baidu.hugegraph.job.Job; +import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution; import com.baidu.hugegraph.testutil.Whitebox; import com.baidu.hugegraph.traversal.algorithm.HugeTraverser; import com.baidu.hugegraph.type.HugeType; @@ -61,6 +63,7 @@ public abstract class AbstractAlgorithm implements Algorithm { public static final long MAX_RESULT_SIZE = 100L * Bytes.MB; public static final long MAX_QUERY_LIMIT = 100000000L; // about 100GB + public static final long MAX_CAPACITY = MAX_QUERY_LIMIT; public static final int BATCH = 500; public static final String CATEGORY_AGGR = "aggregate"; @@ -87,11 +90,13 @@ public abstract class AbstractAlgorithm implements Algorithm { public static final String KEY_CLEAR = "clear"; public static final String KEY_CAPACITY = "capacity"; public static final String KEY_LIMIT = "limit"; + public static final String KEY_EACH_LIMIT = "each_limit"; public static final String KEY_ALPHA = "alpha"; public static final String KEY_WORKERS = "workers"; public static final long DEFAULT_CAPACITY = 10000000L; public static final long DEFAULT_LIMIT = 100L; + public static final long DEFAULT_EACH_LIMIT = 1L; public static final long DEFAULT_DEGREE = 100L; public static final long DEFAULT_SAMPLE = 1L; public static final long DEFAULT_TIMES = 20L; @@ -131,6 +136,14 @@ protected static Directions direction(Map parameters) { return parseDirection(direction); } + protected static Directions direction4Out(Map parameters) { + if (!parameters.containsKey(KEY_DIRECTION)) { + return Directions.OUT; + } + Object direction = parameter(parameters, KEY_DIRECTION); + return parseDirection(direction); + } + protected static Directions directionOutIn(Map parameters) { if (!parameters.containsKey(KEY_DIRECTION)) { return Directions.OUT; @@ -148,14 +161,10 @@ protected static double alpha(Map parameters) { return DEFAULT_ALPHA; } double alpha = parameterDouble(parameters, KEY_ALPHA); - checkAlpha(alpha); - return alpha; - } - - public static void checkAlpha(double alpha) { - E.checkArgument(alpha > 0 && alpha <= 1.0, + E.checkArgument(alpha > 0.0 && alpha <= 1.0, "The value of %s must be in range (0, 1], but got %s", KEY_ALPHA, alpha); + return alpha; } protected static long top(Map parameters) { @@ -163,9 +172,7 @@ protected static long top(Map parameters) { return 0L; } long top = parameterLong(parameters, KEY_TOP); - E.checkArgument(top >= 0L, - "The value of %s must be >= 0, but got %s", - KEY_TOP, top); + HugeTraverser.checkNonNegativeOrNoLimit(top, KEY_TOP); return top; } @@ -196,6 +203,15 @@ protected static long limit(Map parameters) { return limit; } + protected static long eachLimit(Map parameters) { + if (!parameters.containsKey(KEY_EACH_LIMIT)) { + return DEFAULT_EACH_LIMIT; + } + long limit = parameterLong(parameters, KEY_EACH_LIMIT); + HugeTraverser.checkPositiveOrNoLimit(limit, KEY_EACH_LIMIT); + return limit; + } + protected static long sample(Map parameters) { if (!parameters.containsKey(KEY_SAMPLE)) { return DEFAULT_SAMPLE; @@ -355,21 +371,24 @@ protected long traverse(String sourceLabel, String sourceCLabel, Consumers consumers = new Consumers<>(this.executor, consumer, done); - consumers.start(); + consumers.start("task-" + this.job.task().id()); + long total = 0L; try { - long total = 0L; while (vertices.hasNext()) { this.updateProgress(++this.progress); total++; Vertex v = vertices.next(); consumers.provide(v); } - return total; + } catch (StopExecution e) { + // pass } catch (Throwable e) { throw Consumers.wrapException(e); } finally { consumers.await(); + CloseableIterator.closeIterator(vertices); } + return total; } protected Iterator vertices() { @@ -520,9 +539,11 @@ public void add(K key, long value) { } public void put(K key, long value) { + assert this.topN != 0L; this.tops.put(key, new MutableLong(value)); // keep 2x buffer - if (this.tops.size() > this.topN * 2) { + if (this.tops.size() > this.topN * 2 && + this.topN != HugeTraverser.NO_LIMIT) { this.shrinkIfNeeded(this.topN); } } @@ -537,7 +558,10 @@ public Set> entrySet() { } private void shrinkIfNeeded(long limit) { - if (this.tops.size() >= limit && limit != HugeTraverser.NO_LIMIT) { + assert limit != 0L; + if (this.tops.size() >= limit && + (limit > 0L || limit == HugeTraverser.NO_LIMIT)) { + // Just do sort if limit=NO_LIMIT, else do sort and shrink this.tops = HugeTraverser.topN(this.tops, true, limit); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java index f5d01d9803..711e95edc8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java @@ -73,14 +73,14 @@ public Consumers(ExecutorService executor, this.queue = new ArrayBlockingQueue<>(this.queueSize); } - public void start() { + public void start(String name) { this.ending = false; this.exception = null; if (this.executor == null) { return; } - LOG.info("Starting {} workers with queue size {}...", - this.workers, this.queueSize); + LOG.info("Starting {} workers[{}] with queue size {}...", + this.workers, name, this.queueSize); for (int i = 0; i < this.workers; i++) { this.executor.submit(() -> { try { @@ -88,8 +88,10 @@ public void start() { this.done(); } catch (Throwable e) { // Only the first exception of one thread can be stored - this.exception = e; - LOG.error("Error when running task", e); + this.exception = e; + if (!(e instanceof StopExecution)) { + LOG.error("Error when running task", e); + } this.done(); } finally { this.latch.countDown(); @@ -183,4 +185,17 @@ public static RuntimeException wrapException(Throwable e) { throw new HugeException("Error when running task: %s", HugeException.rootCause(e).getMessage(), e); } + + public static class StopExecution extends HugeException { + + private static final long serialVersionUID = -371829356182454517L; + + public StopExecution(String message) { + super(message); + } + + public StopExecution(String message, Object... args) { + super(message, args); + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java index 385fbbb5b5..4ce97d3628 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java @@ -39,6 +39,7 @@ import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm; import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm; import com.baidu.hugegraph.task.HugeTask; +import com.baidu.hugegraph.traversal.algorithm.HugeTraverser; import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.InsertionOrderUtil; @@ -123,6 +124,7 @@ private static class Traverser extends AlgoTraverser { "depth", 10L, "degree", -1L, "sample", -1L, + "top", -1L /* sorted */, "workers", 0); public Traverser(Job job) { @@ -158,6 +160,8 @@ public Object subgraphStat(Job job) { parameters = ImmutableMap.builder() .putAll(PARAMS) .put("count_only", true) + .put("each_limit", NO_LIMIT) + .put("limit", NO_LIMIT) .build(); results.put("rings", algo.call(job, parameters)); @@ -175,6 +179,7 @@ private Map pageRanks(Job job) { Vertex vertex = vertices.next(); ranks.put(vertex.id(), vertex.value(R_RANK)); } + ranks = HugeTraverser.topN(ranks, true, NO_LIMIT); return ranks; } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java index 22372ad097..7b11c134c1 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java @@ -24,9 +24,12 @@ import java.util.Map; import org.apache.commons.lang3.tuple.Pair; +import org.apache.tinkerpop.gremlin.process.traversal.Order; import org.apache.tinkerpop.gremlin.process.traversal.Pop; +import org.apache.tinkerpop.gremlin.process.traversal.Scope; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Vertex; @@ -130,10 +133,10 @@ protected GraphTraversal constructPathUnit( return unit; } - protected GraphTraversal filterNonShortestPath( - GraphTraversal - t) { - long size = this.graph().traversal().V().limit(MAX_QUERY_LIMIT) + protected GraphTraversal filterNonShortestPath( + GraphTraversal t, + boolean keepOneShortestPath) { + long size = this.graph().traversal().V().limit(100000L) .count().next(); Map, Integer> triples = new HashMap<>((int) size); return t.filter(it -> { @@ -142,15 +145,32 @@ protected GraphTraversal filterNonShortestPath( int len = it.>path(Pop.all, "v").size(); Pair key = Pair.of(start, end); Integer shortest = triples.get(key); - if (shortest != null && shortest != len) { + if (shortest != null && len > shortest) { // ignore non shortest path return false; } if (shortest == null) { triples.put(key, len); + } else { + assert len == shortest; + if (keepOneShortestPath) { + return false; + } } return true; }); } + + protected GraphTraversal topN(GraphTraversal t, + long topN) { + if (topN > 0L || topN == NO_LIMIT) { + t = t.order(Scope.local).by(Column.values, Order.desc); + if (topN > 0L) { + assert topN != NO_LIMIT; + t = t.limit(Scope.local, topN); + } + } + return t; + } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java index 3702991632..40b38f6555 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java @@ -21,13 +21,10 @@ import java.util.Map; -import org.apache.tinkerpop.gremlin.process.traversal.Order; import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.Pop; -import org.apache.tinkerpop.gremlin.process.traversal.Scope; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; -import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.Vertex; import com.baidu.hugegraph.job.Job; @@ -72,7 +69,7 @@ public Object betweenessCentrality(Directions direction, long topN) { assert depth > 0; assert degree > 0L || degree == NO_LIMIT; - assert topN >= 0L; + assert topN >= 0L || topN == NO_LIMIT; GraphTraversal t = constructSource(sourceLabel, sourceSample, @@ -80,14 +77,11 @@ public Object betweenessCentrality(Directions direction, t = constructPath(t, direction, label, degree, sample, sourceLabel, sourceCLabel); t = t.emit().until(__.loops().is(P.gte(depth))); - t = filterNonShortestPath(t); + t = filterNonShortestPath(t, false); GraphTraversal tg = t.select(Pop.all, "v") - .unfold().id() - .groupCount().order(Scope.local) - .by(Column.values, Order.desc); - GraphTraversal tLimit = topN <= 0L ? tg : - tg.limit(Scope.local, topN); + .unfold().id().groupCount(); + GraphTraversal tLimit = topN(tg, topN); return this.execute(tLimit, () -> tLimit.next()); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java index 56d61504a7..9a25b6394a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java @@ -22,13 +22,11 @@ import java.util.Map; import org.apache.tinkerpop.gremlin.process.traversal.Operator; -import org.apache.tinkerpop.gremlin.process.traversal.Order; import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.Pop; import org.apache.tinkerpop.gremlin.process.traversal.Scope; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; -import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.Vertex; import com.baidu.hugegraph.job.Job; @@ -81,7 +79,7 @@ public Object closenessCentrality(Directions direction, long topN) { assert depth > 0; assert degree > 0L || degree == NO_LIMIT; - assert topN >= 0L; + assert topN >= 0L || topN == NO_LIMIT; GraphTraversal t = constructSource(sourceLabel, sourceSample, @@ -89,15 +87,13 @@ public Object closenessCentrality(Directions direction, t = constructPath(t, direction, label, degree, sample, sourceLabel, sourceCLabel); t = t.emit().until(__.loops().is(P.gte(depth))); - t = filterNonShortestPath(t); + t = filterNonShortestPath(t, true); GraphTraversal tg; tg = t.group().by(__.select(Pop.first, "v").id()) .by(__.select(Pop.all, "v").count(Scope.local) - .sack(Operator.div).sack().sum()) - .order(Scope.local).by(Column.values, Order.desc); - GraphTraversal tLimit = topN <= 0L ? tg : - tg.limit(Scope.local, topN); + .sack(Operator.div).sack().sum()); + GraphTraversal tLimit = topN(tg, topN); return this.execute(tLimit, () -> tLimit.next()); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java index f29a6301df..01b3e5c4b9 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java @@ -64,10 +64,10 @@ public Object degreeCentrality(Directions direction, String label, long topN) { if (direction == null || direction == Directions.BOTH) { - return degreeCentrality(label, topN); + return this.degreeCentralityForBothDir(label, topN); } assert direction == Directions.OUT || direction == Directions.IN; - assert topN >= 0L; + assert topN >= 0L || topN == NO_LIMIT; Iterator edges = this.edges(direction); @@ -76,12 +76,12 @@ public Object degreeCentrality(Directions direction, Id vertex = null; Id labelId = this.getEdgeLabelId(label); long degree = 0L; - long total = 0L; + long totalEdges = 0L; degrees.startObject(); while (edges.hasNext()) { HugeEdge edge = (HugeEdge) edges.next(); - this.updateProgress(++total); + this.updateProgress(++totalEdges); Id schemaLabel = edge.schemaLabel().id(); if (labelId != null && !labelId.equals(schemaLabel)) { @@ -97,7 +97,7 @@ public Object degreeCentrality(Directions direction, if (vertex != null) { // next vertex found - if (topN <= 0L) { + if (topN <= 0L && topN != NO_LIMIT) { degrees.append(vertex, degree); } else { tops.put(vertex, degree); @@ -108,7 +108,7 @@ public Object degreeCentrality(Directions direction, } if (vertex != null) { - if (topN <= 0L) { + if (topN <= 0L && topN != NO_LIMIT) { degrees.append(vertex, degree); } else { tops.put(vertex, degree); @@ -121,9 +121,9 @@ public Object degreeCentrality(Directions direction, return degrees.asJson(); } - protected Object degreeCentrality(String label, long topN) { - assert topN >= 0L; - long total = 0L; + protected Object degreeCentralityForBothDir(String label, long topN) { + assert topN >= 0L || topN == NO_LIMIT; + long totalVertices = 0L; JsonMap degrees = new JsonMap(); TopMap tops = new TopMap<>(topN); @@ -132,11 +132,11 @@ protected Object degreeCentrality(String label, long topN) { degrees.startObject(); while (vertices.hasNext()) { Id source = (Id) vertices.next().id(); - this.updateProgress(++total); + this.updateProgress(++totalVertices); long degree = this.degree(source, label); if (degree > 0L) { - if (topN <= 0L) { + if (topN <= 0L && topN != NO_LIMIT) { degrees.append(source, degree); } else { tops.put(source, degree); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java index ec065fa07a..0f695a1fb0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java @@ -21,11 +21,8 @@ import java.util.Map; -import org.apache.tinkerpop.gremlin.process.traversal.Order; -import org.apache.tinkerpop.gremlin.process.traversal.Scope; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; -import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.Vertex; @@ -74,7 +71,7 @@ public Object eigenvectorCentrality(Directions direction, long topN) { assert depth > 0; assert degree > 0L || degree == NO_LIMIT; - assert topN >= 0L; + assert topN >= 0L || topN == NO_LIMIT; // TODO: support parameters: Directions dir, String label /* @@ -96,10 +93,8 @@ public Object eigenvectorCentrality(Directions direction, t = t.repeat(__.groupCount("m").by(T.id) .local(unit).simplePath()).times(depth); - GraphTraversal tCap; - tCap = t.cap("m").order(Scope.local).by(Column.values, Order.desc); - GraphTraversal tLimit = topN <= 0L ? tCap : - tCap.limit(Scope.local, topN); + GraphTraversal tCap = t.cap("m"); + GraphTraversal tLimit = topN(tCap, topN); return this.execute(tLimit, () -> tLimit.next()); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java index 7ac30cd2da..3f3a26c3ca 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java @@ -23,33 +23,48 @@ import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.type.define.Directions; +import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.InsertionOrderUtil; public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm { + public static final String ALGO_NAME = "cluster_coeffcient"; + @Override public String name() { - return "cluster_coeffcient"; + return ALGO_NAME; } @Override public void checkParameters(Map parameters) { - directionOutIn(parameters); + direction(parameters); degree(parameters); + workersWhenBoth(parameters); } @Override public Object call(Job job, Map parameters) { - try (Traverser traverser = new Traverser(job)) { - return traverser.clusterCoeffcient(directionOutIn(parameters), + int workers = workersWhenBoth(parameters); + try (Traverser traverser = new Traverser(job, workers)) { + return traverser.clusterCoeffcient(direction(parameters), degree(parameters)); } } + protected static int workersWhenBoth(Map parameters) { + Directions direction = direction(parameters); + int workers = workers(parameters); + E.checkArgument(direction == Directions.BOTH || workers <= 0, + "The workers must be not set when direction!=BOTH, " + + "but got workers=%s and direction=%s", + workers, direction); + return workers; + } + private static class Traverser extends TriangleCountAlgorithm.Traverser { - public Traverser(Job job) { - super(job); + public Traverser(Job job, int workers) { + super(job, ALGO_NAME, workers); } public Object clusterCoeffcient(Directions direction, long degree) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java index 923c1a2a33..52ddeeb71b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java @@ -44,6 +44,8 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm { + public static final String ALGO_NAME = "k_core"; + public static final String KEY_K = "k"; public static final String KEY_MERGED = "merged"; @@ -51,7 +53,7 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm { @Override public String name() { - return "k_core"; + return ALGO_NAME; } @Override @@ -101,7 +103,7 @@ protected static boolean merged(Map parameters) { private static class Traverser extends AlgoTraverser { public Traverser(Job job, int workers) { - super(job, "kcore", workers); + super(job, ALGO_NAME, workers); } public Object kcore(String sourceLabel, String sourceCLabel, diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java index 446ab2686e..8eee9f43e5 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java @@ -26,9 +26,11 @@ public class LouvainAlgorithm extends AbstractCommAlgorithm { + public static final String ALGO_NAME = "louvain"; + @Override public String name() { - return "louvain"; + return ALGO_NAME; } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java index 5d7548aa3b..6135d1d402 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java @@ -85,7 +85,7 @@ public class LouvainTraverser extends AlgoTraverser { public LouvainTraverser(Job job, int workers, long degree, String sourceLabel, String sourceCLabel) { - super(job, "louvain", workers); + super(job, LouvainAlgorithm.ALGO_NAME, workers); this.g = this.graph().traversal(); this.sourceLabel = sourceLabel; this.sourceCLabel = sourceCLabel; @@ -422,7 +422,7 @@ private double moveCommunities(int pass) { } }); - consumers.start(); + consumers.start("louvain-move-pass-" + pass); try { while (vertices.hasNext()) { this.updateProgress(++this.progress); @@ -460,7 +460,7 @@ private void mergeCommunities(int pass) { this.graph().tx().commit(); }); - consumers.start(); + consumers.start("louvain-merge-pass-" + pass); try { for (Pair> pair : comms) { Community c = pair.getLeft(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java index e15665cfc4..0f3506a154 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java @@ -41,9 +41,11 @@ public class LpaAlgorithm extends AbstractCommAlgorithm { + public static final String ALGO_NAME = "lpa"; + @Override public String name() { - return "lpa"; + return ALGO_NAME; } @Override @@ -87,7 +89,7 @@ private static class Traverser extends AlgoTraverser { private final Random R = new Random(); public Traverser(Job job, int workers) { - super(job, "lpa", workers); + super(job, ALGO_NAME, workers); } public Object lpa(String sourceLabel, String edgeLabel, diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java index 0adb4707c5..d8a17653c5 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java @@ -19,48 +19,70 @@ package com.baidu.hugegraph.job.algorithm.comm; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.tinkerpop.gremlin.structure.Edge; import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.id.IdGenerator; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.structure.HugeEdge; import com.baidu.hugegraph.type.define.Directions; +import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.InsertionOrderUtil; import com.google.common.collect.ImmutableMap; public class TriangleCountAlgorithm extends AbstractCommAlgorithm { + public static final String ALGO_NAME = "triangle_count"; + @Override public String name() { - return "triangle_count"; + return ALGO_NAME; } @Override public void checkParameters(Map parameters) { - directionOutIn(parameters); + direction4Out(parameters); degree(parameters); + workersWhenBoth(parameters); } @Override public Object call(Job job, Map parameters) { - try (Traverser traverser = new Traverser(job)) { - return traverser.triangleCount(directionOutIn(parameters), + int workers = workersWhenBoth(parameters); + try (Traverser traverser = new Traverser(job, workers)) { + return traverser.triangleCount(direction4Out(parameters), degree(parameters)); } } + protected static int workersWhenBoth(Map parameters) { + Directions direction = direction4Out(parameters); + int workers = workers(parameters); + E.checkArgument(direction == Directions.BOTH || workers <= 0, + "The workers must be not set when direction!=BOTH, " + + "but got workers=%s and direction=%s", + workers, direction); + return workers; + } + protected static class Traverser extends AlgoTraverser { protected static final String KEY_TRIANGLES = "triangles"; protected static final String KEY_TRIADS = "triads"; - public Traverser(Job job) { - super(job); + public Traverser(Job job, int workers) { + super(job, ALGO_NAME, workers); + } + + protected Traverser(Job job, String name, int workers) { + super(job, name, workers); } public Object triangleCount(Directions direction, long degree) { @@ -73,22 +95,23 @@ public Object triangleCount(Directions direction, long degree) { protected Map triangles(Directions direction, long degree) { if (direction == null || direction == Directions.BOTH) { - throw new IllegalArgumentException("Direction must be OUT/IN"); + return this.trianglesForBothDir(degree); } + assert direction == Directions.OUT || direction == Directions.IN; Iterator edges = this.edges(direction); long triangles = 0L; long triads = 0L; - long total = 0L; + long totalEdges = 0L; long totalVertices = 0L; Id vertex = null; - Set adjVertices = new HashSet<>(); + Set adjVertices = newOrderedSet(); while (edges.hasNext()) { HugeEdge edge = (HugeEdge) edges.next(); - this.updateProgress(++total); + this.updateProgress(++totalEdges); Id source = edge.ownerVertex().id(); Id target = edge.otherVertex().id(); @@ -108,37 +131,97 @@ protected Map triangles(Directions direction, * B -> [D,F] * E -> [B,C,F] */ - triangles += this.intersect(direction, degree, adjVertices); + triangles += this.intersect(degree, adjVertices); triads += this.localTriads(adjVertices.size()); totalVertices++; // Reset for the next source - adjVertices = new HashSet<>(); + adjVertices = newOrderedSet(); } vertex = source; adjVertices.add(target); } if (vertex != null) { - triangles += this.intersect(direction, degree, adjVertices); + triangles += this.intersect(degree, adjVertices); triads += this.localTriads(adjVertices.size()); totalVertices++; } String suffix = "_" + direction.string(); - return ImmutableMap.of("edges" + suffix, total, + return ImmutableMap.of("edges" + suffix, totalEdges, "vertices" + suffix, totalVertices, KEY_TRIANGLES, triangles, KEY_TRIADS, triads); } - protected long intersect(Directions dir, long degree, - Set adjVertices) { + protected Map trianglesForBothDir(long degree) { + AtomicLong triangles = new AtomicLong(0L); + AtomicLong triads = new AtomicLong(0L); + AtomicLong totalEdges = new AtomicLong(0L); + AtomicLong totalVertices = new AtomicLong(0L); + + this.traverse(null, null, v -> { + Id source = (Id) v.id(); + + MutableLong edgesCount = new MutableLong(0L); + Set adjVertices = this.adjacentVertices(source, degree, + edgesCount); + + triangles.addAndGet(this.intersect(degree, adjVertices)); + triads.addAndGet(this.localTriads(adjVertices.size())); + + totalVertices.incrementAndGet(); + totalEdges.addAndGet(edgesCount.longValue()); + }); + + assert totalEdges.get() % 2L == 0L; + assert triangles.get() % 3L == 0L; + // totalEdges /= 2L + totalEdges.getAndAccumulate(2L, (l, w) -> l / w); + // triangles /= 3L + triangles.getAndAccumulate(3L, (l, w) -> l / w); + // triads -= triangles * 2L + triads.addAndGet(triangles.get() * -2L); + + return ImmutableMap.of("edges", totalEdges.get(), + "vertices", totalVertices.get(), + KEY_TRIANGLES, triangles.get(), + KEY_TRIADS, triads.get()); + } + + private Set adjacentVertices(Id source, long degree, + MutableLong edgesCount) { + Iterator adjVertices = this.adjacentVertices(source, + Directions.BOTH, + null, degree); + Set set = newOrderedSet(); + while (adjVertices.hasNext()) { + edgesCount.increment(); + set.add(adjVertices.next()); + } + return set; + } + + protected long intersect(long degree, Set adjVertices) { long count = 0L; + Directions dir = Directions.OUT; + Id empty = IdGenerator.of(0); Iterator vertices; for (Id v : adjVertices) { vertices = this.adjacentVertices(v, dir, null, degree); + Id lastVertex = empty; while (vertices.hasNext()) { Id vertex = vertices.next(); + if (lastVertex.equals(vertex)) { + // Skip duplicated target vertex (through sortkeys) + continue; + } + lastVertex = vertex; + + /* + * FIXME: deduplicate two edges with opposite directions + * between two specified adjacent vertices + */ if (adjVertices.contains(vertex)) { count++; } @@ -150,5 +233,9 @@ protected long intersect(Directions dir, long degree, protected long localTriads(int size) { return size * (size - 1L) / 2L; } + + protected static Set newOrderedSet() { + return new TreeSet<>(); + } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java index 3d5bd163e3..bbb028efc1 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java @@ -20,34 +20,37 @@ package com.baidu.hugegraph.job.algorithm.path; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; +import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution; import com.baidu.hugegraph.traversal.algorithm.SubGraphTraverser; import com.baidu.hugegraph.type.define.Directions; import com.baidu.hugegraph.util.JsonUtil; public class RingsDetectAlgorithm extends AbstractAlgorithm { + public static final String ALGO_NAME = "rings"; + public static final String KEY_COUNT_ONLY = "count_only"; @Override - public String name() { - return "rings_detect"; + public String category() { + return CATEGORY_PATH; } @Override - public String category() { - return CATEGORY_PATH; + public String name() { + return ALGO_NAME; } @Override public void checkParameters(Map parameters) { depth(parameters); degree(parameters); - capacity(parameters); + eachLimit(parameters); limit(parameters); sourceLabel(parameters); sourceCLabel(parameters); @@ -67,13 +70,13 @@ public Object call(Job job, Map parameters) { edgeLabel(parameters), depth(parameters), degree(parameters), - capacity(parameters), + eachLimit(parameters), limit(parameters), countOnly(parameters)); } } - public boolean countOnly(Map parameters) { + protected boolean countOnly(Map parameters) { if (!parameters.containsKey(KEY_COUNT_ONLY)) { return false; } @@ -83,12 +86,12 @@ public boolean countOnly(Map parameters) { private static class Traverser extends AlgoTraverser { public Traverser(Job job, int workers) { - super(job, "ring", workers); + super(job, ALGO_NAME, workers); } public Object rings(String sourceLabel, String sourceCLabel, Directions dir, String label, int depth, - long degree, long capacity, long limit, + long degree, long eachLimit, long limit, boolean countOnly) { JsonMap ringsJson = new JsonMap(); ringsJson.startObject(); @@ -100,24 +103,24 @@ public Object rings(String sourceLabel, String sourceCLabel, } SubGraphTraverser traverser = new SubGraphTraverser(this.graph()); - AtomicInteger count = new AtomicInteger(0); + AtomicLong count = new AtomicLong(0L); this.traverse(sourceLabel, sourceCLabel, v -> { Id source = (Id) v.id(); PathSet rings = traverser.rings(source, dir, label, depth, - true, degree, capacity, limit); + true, degree, MAX_CAPACITY, + eachLimit); + assert eachLimit == NO_LIMIT || rings.size() <= eachLimit; for (Path ring : rings) { - Id min = null; - for (Id id : ring.vertices()) { - if (min == null || id.compareTo(min) < 0) { - min = id; - } + if (eachLimit == NO_LIMIT && !ring.ownedBy(source)) { + // Only dedup rings when each_limit!=NO_LIMIT + continue; } - if (source.equals(min)) { - if (countOnly) { - count.incrementAndGet(); - continue; - } + + if (count.incrementAndGet() > limit && limit != NO_LIMIT) { + throw new StopExecution("exceed limit %s", limit); + } + if (!countOnly) { String ringJson = JsonUtil.toJson(ring.vertices()); synchronized (ringsJson) { ringsJson.appendRaw(ringJson); @@ -125,8 +128,14 @@ public Object rings(String sourceLabel, String sourceCLabel, } } }); + if (countOnly) { - ringsJson.append(count.get()); + long counted = count.get(); + if (limit != NO_LIMIT && counted > limit) { + // The count increased by multi threads and exceed limit + counted = limit; + } + ringsJson.append(counted); } else { ringsJson.endList(); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java index 82294f48bb..fbaca4960e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java @@ -20,12 +20,14 @@ package com.baidu.hugegraph.job.algorithm.similarity; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.job.Job; import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm; +import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap; @@ -35,24 +37,27 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm { + public static final String ALGO_NAME = "fusiform_similarity"; + public static final String KEY_MIN_NEIGHBORS = "min_neighbors"; public static final String KEY_MIN_SIMILARS = "min_similars"; + public static final String KEY_TOP_SIMILARS = "top_similars"; public static final String KEY_GROUP_PROPERTY = "group_property"; public static final String KEY_MIN_GROUPS = "min_groups"; public static final int DEFAULT_MIN_NEIGHBORS = 10; public static final int DEFAULT_MIN_SIMILARS = 6; + public static final int DEFAULT_TOP_SIMILARS = 0; public static final int DEFAULT_MIN_GROUPS = 1; - public static final long DEFAULT_LIMIT = -1L; @Override - public String name() { - return "fusiform_similarity"; + public String category() { + return CATEGORY_SIMI; } @Override - public String category() { - return CATEGORY_SIMI; + public String name() { + return ALGO_NAME; } @Override @@ -60,11 +65,10 @@ public void checkParameters(Map parameters) { minNeighbors(parameters); alpha(parameters); minSimilars(parameters); - top(parameters); + topSimilars(parameters); groupProperty(parameters); minGroups(parameters); degree(parameters); - capacity(parameters); limit(parameters); sourceLabel(parameters); sourceCLabel(parameters); @@ -84,11 +88,10 @@ public Object call(Job job, Map parameters) { minNeighbors(parameters), alpha(parameters), minSimilars(parameters), - top(parameters), + topSimilars(parameters), groupProperty(parameters), minGroups(parameters), degree(parameters), - capacity(parameters), limit(parameters)); } } @@ -98,7 +101,7 @@ protected static int minNeighbors(Map parameters) { return DEFAULT_MIN_NEIGHBORS; } int minNeighbors = parameterInt(parameters, KEY_MIN_NEIGHBORS); - HugeTraverser.checkPositive(minNeighbors, "min neighbors"); + HugeTraverser.checkPositive(minNeighbors, KEY_MIN_NEIGHBORS); return minNeighbors; } @@ -107,7 +110,16 @@ protected static int minSimilars(Map parameters) { return DEFAULT_MIN_SIMILARS; } int minSimilars = parameterInt(parameters, KEY_MIN_SIMILARS); - HugeTraverser.checkPositive(minSimilars, "min similars"); + HugeTraverser.checkPositive(minSimilars, KEY_MIN_SIMILARS); + return minSimilars; + } + + protected static int topSimilars(Map parameters) { + if (!parameters.containsKey(KEY_TOP_SIMILARS)) { + return DEFAULT_TOP_SIMILARS; + } + int minSimilars = parameterInt(parameters, KEY_TOP_SIMILARS); + HugeTraverser.checkNonNegative(minSimilars, KEY_TOP_SIMILARS); return minSimilars; } @@ -123,7 +135,7 @@ protected static int minGroups(Map parameters) { return DEFAULT_MIN_GROUPS; } int minGroups = parameterInt(parameters, KEY_MIN_GROUPS); - HugeTraverser.checkPositive(minGroups, "min groups"); + HugeTraverser.checkPositive(minGroups, KEY_MIN_GROUPS); return minGroups; } @@ -139,7 +151,7 @@ protected static long limit(Map parameters) { private static class Traverser extends AlgoTraverser { public Traverser(Job job, int workers) { - super(job, "fusiform", workers); + super(job, ALGO_NAME, workers); } public Object fusiformSimilars(String sourceLabel, String sourceCLabel, @@ -147,12 +159,14 @@ public Object fusiformSimilars(String sourceLabel, String sourceCLabel, int minNeighbors, double alpha, int minSimilars, long topSimilars, String groupProperty, int minGroups, - long degree, long capacity, long limit) { + long degree, long limit) { HugeGraph graph = this.graph(); EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label); FusiformSimilarityTraverser traverser = new FusiformSimilarityTraverser(graph); + + AtomicLong count = new AtomicLong(0L); JsonMap similarsJson = new JsonMap(); similarsJson.startObject(); @@ -162,16 +176,20 @@ public Object fusiformSimilars(String sourceLabel, String sourceCLabel, edgeLabel, minNeighbors, alpha, minSimilars, (int) topSimilars, groupProperty, minGroups, degree, - capacity, NO_LIMIT, true); + MAX_CAPACITY, NO_LIMIT, true); if (similars.isEmpty()) { return; } String result = JsonUtil.toJson(similars.toMap()); result = result.substring(1, result.length() - 1); synchronized (similarsJson) { + if (count.incrementAndGet() > limit && limit != NO_LIMIT) { + throw new StopExecution("exceed limit %s", limit); + } similarsJson.appendRaw(result); } - }, null, limit); + }); + similarsJson.endObject(); return similarsJson.asJson();