From 9926fc6e05d54019df64a8c89c2b2a188ca5ce63 Mon Sep 17 00:00:00 2001 From: Jermy Li Date: Thu, 20 Aug 2020 18:09:56 +0800 Subject: [PATCH] add export_community for louvain (#43) Change-Id: I01e402fc99669f53544279c752f81d886c6ce28f --- .../job/algorithm/AbstractAlgorithm.java | 10 ++++- .../job/algorithm/comm/LouvainAlgorithm.java | 15 +++++++ .../job/algorithm/comm/LouvainTraverser.java | 39 ++++++++++++++++++- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java index 064b1e344c..943debb4b8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java @@ -68,6 +68,9 @@ public abstract class AbstractAlgorithm implements Algorithm { public static final long MAX_CAPACITY = MAX_QUERY_LIMIT; public static final int BATCH = 500; + public static final String USER_DIR = System.getProperty("user.dir"); + public static final String EXPORT_PATH = USER_DIR + "/export"; + public static final String CATEGORY_AGGR = "aggregate"; public static final String CATEGORY_PATH = "path"; public static final String CATEGORY_RANK = "rank"; @@ -89,6 +92,7 @@ public abstract class AbstractAlgorithm implements Algorithm { public static final String KEY_PRECISION = "precision"; public static final String KEY_SHOW_MOD= "show_modularity"; public static final String KEY_SHOW_COMM = "show_community"; + public static final String KEY_EXPORT_COMM = "export_community"; public static final String KEY_SKIP_ISOLATED = "skip_isolated"; public static final String KEY_CLEAR = "clear"; public static final String KEY_CAPACITY = "capacity"; @@ -287,7 +291,7 @@ public AlgoTraverser(UserJob job) { protected AlgoTraverser(UserJob job, String name, int workers) { super(job.graph()); this.job = job; - 
String prefix = name + "-" + job.task().id(); + String prefix = name + "-" + this.jobId(); this.executor = Consumers.newThreadPool(prefix, workers); } @@ -295,6 +299,10 @@ public void updateProgress(long progress) { this.job.updateProgress((int) progress); } + public Id jobId() { + return this.job.task().id(); + } + @Override public void close() { if (this.executor != null) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java index 3b3b0a6b8f..3789d6a19f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java @@ -44,6 +44,7 @@ public void checkParameters(Map parameters) { sourceCLabel(parameters); showModularity(parameters); showCommunity(parameters); + exportCommunity(parameters); skipIsolated(parameters); clearPass(parameters); workers(parameters); @@ -60,6 +61,7 @@ public Object call(UserJob job, Map parameters) { Long clearPass = clearPass(parameters); Long modPass = showModularity(parameters); String showComm = showCommunity(parameters); + Long exportPass = exportCommunity(parameters); try (LouvainTraverser traverser = new LouvainTraverser( job, workers, degree, @@ -68,6 +70,10 @@ public Object call(UserJob job, Map parameters) { return traverser.clearPass(clearPass.intValue()); } else if (modPass != null) { return traverser.modularity(modPass.intValue()); + } else if (exportPass != null) { + boolean vertexFirst = showComm == null; + int pass = exportPass.intValue(); + return traverser.exportCommunity(pass, vertexFirst); } else if (showComm != null) { return traverser.showCommunity(showComm); } else { @@ -99,6 +105,15 @@ protected static Long showModularity(Map parameters) { return pass; } + protected static Long exportCommunity(Map parameters) { + if 
(!parameters.containsKey(KEY_EXPORT_COMM)) { + return null; + } + long pass = ParameterUtil.parameterLong(parameters, KEY_EXPORT_COMM); + HugeTraverser.checkNonNegative(pass, KEY_EXPORT_COMM); + return pass; + } + protected static boolean skipIsolated(Map parameters) { if (!parameters.containsKey(KEY_SKIP_ISOLATED)) { return true; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java index 42ac3e9906..4359d46b80 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java @@ -19,6 +19,9 @@ package com.baidu.hugegraph.job.algorithm.comm; +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -59,6 +62,7 @@ import com.baidu.hugegraph.type.define.Directions; import com.baidu.hugegraph.util.InsertionOrderUtil; import com.baidu.hugegraph.util.Log; +import com.baidu.hugegraph.util.StringEncoding; import com.google.common.collect.ImmutableMap; public class LouvainTraverser extends AlgoTraverser { @@ -660,7 +664,7 @@ public Collection showCommunity(String community) { Vertex sub = subComms.next(); if (sub.property(C_MEMBERS).isPresent()) { Set members = sub.value(C_MEMBERS); - reachPass0 = sub.label().equals(C_PASS0); + reachPass0 = sub.label().equals(C_PASS0); comms.addAll(members); } } @@ -668,6 +672,39 @@ public Collection showCommunity(String community) { return comms; } + public long exportCommunity(int pass, boolean vertexFirst) { + String exportFile = String.format("%s/louvain-%s.txt", + LouvainAlgorithm.EXPORT_PATH, + this.jobId()); + String label = labelOfPassN(pass); + GraphTraversal t = this.g.V().hasLabel(label); + this.execute(t, () -> { + try (OutputStream 
os = new FileOutputStream(exportFile); + BufferedOutputStream bos = new BufferedOutputStream(os)) { + while (t.hasNext()) { + String comm = t.next().id().toString(); + Collection members = this.showCommunity(comm); + if (vertexFirst) { + for (Object member : members) { + bos.write(StringEncoding.encode(member.toString())); + bos.write(StringEncoding.encode("\t")); + bos.write(StringEncoding.encode(comm)); + bos.write(StringEncoding.encode("\n")); + } + } else { + bos.write(StringEncoding.encode(comm)); + bos.write(StringEncoding.encode(": ")); + bos.write(StringEncoding.encode(members.toString())); + bos.write(StringEncoding.encode("\n")); + } + } + } + return null; + }); + + return this.progress; + } + public long clearPass(int pass) { GraphTraversal te = this.g.E(); if (pass < 0) {