Skip to content

Commit

Permalink
add export_community for lounvain (#43)
Browse files Browse the repository at this point in the history
Change-Id: I01e402fc99669f53544279c752f81d886c6ce28f
  • Loading branch information
javeme committed Oct 19, 2022
1 parent 7c665f3 commit d72d189
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final long MAX_CAPACITY = MAX_QUERY_LIMIT;
public static final int BATCH = 500;

public static final String USER_DIR = System.getProperty("user.dir");
public static final String EXPORT_PATH = USER_DIR + "/export";

public static final String CATEGORY_AGGR = "aggregate";
public static final String CATEGORY_PATH = "path";
public static final String CATEGORY_RANK = "rank";
Expand All @@ -89,6 +92,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final String KEY_PRECISION = "precision";
public static final String KEY_SHOW_MOD= "show_modularity";
public static final String KEY_SHOW_COMM = "show_community";
public static final String KEY_EXPORT_COMM = "export_community";
public static final String KEY_SKIP_ISOLATED = "skip_isolated";
public static final String KEY_CLEAR = "clear";
public static final String KEY_CAPACITY = "capacity";
Expand Down Expand Up @@ -287,14 +291,18 @@ public AlgoTraverser(UserJob<Object> job) {
protected AlgoTraverser(UserJob<Object> job, String name, int workers) {
super(job.graph());
this.job = job;
String prefix = name + "-" + job.task().id();
String prefix = name + "-" + this.jobId();
this.executor = Consumers.newThreadPool(prefix, workers);
}

public void updateProgress(long progress) {
this.job.updateProgress((int) progress);
}

public Id jobId() {
return this.job.task().id();
}

@Override
public void close() {
if (this.executor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void checkParameters(Map<String, Object> parameters) {
sourceCLabel(parameters);
showModularity(parameters);
showCommunity(parameters);
exportCommunity(parameters);
skipIsolated(parameters);
clearPass(parameters);
workers(parameters);
Expand All @@ -60,6 +61,7 @@ public Object call(UserJob<Object> job, Map<String, Object> parameters) {
Long clearPass = clearPass(parameters);
Long modPass = showModularity(parameters);
String showComm = showCommunity(parameters);
Long exportPass = exportCommunity(parameters);

try (LouvainTraverser traverser = new LouvainTraverser(
job, workers, degree,
Expand All @@ -68,6 +70,10 @@ public Object call(UserJob<Object> job, Map<String, Object> parameters) {
return traverser.clearPass(clearPass.intValue());
} else if (modPass != null) {
return traverser.modularity(modPass.intValue());
} else if (exportPass != null) {
boolean vertexFirst = showComm == null;
int pass = exportPass.intValue();
return traverser.exportCommunity(pass, vertexFirst);
} else if (showComm != null) {
return traverser.showCommunity(showComm);
} else {
Expand Down Expand Up @@ -99,6 +105,15 @@ protected static Long showModularity(Map<String, Object> parameters) {
return pass;
}

protected static Long exportCommunity(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_EXPORT_COMM)) {
return null;
}
long pass = ParameterUtil.parameterLong(parameters, KEY_EXPORT_COMM);
HugeTraverser.checkNonNegative(pass, KEY_EXPORT_COMM);
return pass;
}

protected static boolean skipIsolated(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SKIP_ISOLATED)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.baidu.hugegraph.job.algorithm.comm;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -59,6 +62,7 @@
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.baidu.hugegraph.util.StringEncoding;
import com.google.common.collect.ImmutableMap;

public class LouvainTraverser extends AlgoTraverser {
Expand Down Expand Up @@ -660,14 +664,47 @@ public Collection<Object> showCommunity(String community) {
Vertex sub = subComms.next();
if (sub.property(C_MEMBERS).isPresent()) {
Set<Object> members = sub.value(C_MEMBERS);
reachPass0 = sub.label().equals(C_PASS0);
reachPass0 = sub.label().equals(C_PASS0);
comms.addAll(members);
}
}
}
return comms;
}

public long exportCommunity(int pass, boolean vertexFirst) {
String exportFile = String.format("%s/louvain-%s.txt",
LouvainAlgorithm.EXPORT_PATH,
this.jobId());
String label = labelOfPassN(pass);
GraphTraversal<Vertex, Vertex> t = this.g.V().hasLabel(label);
this.execute(t, () -> {
try (OutputStream os = new FileOutputStream(exportFile);
BufferedOutputStream bos = new BufferedOutputStream(os)) {
while (t.hasNext()) {
String comm = t.next().id().toString();
Collection<Object> members = this.showCommunity(comm);
if (vertexFirst) {
for (Object member : members) {
bos.write(StringEncoding.encode(member.toString()));
bos.write(StringEncoding.encode("\t"));
bos.write(StringEncoding.encode(comm));
bos.write(StringEncoding.encode("\n"));
}
} else {
bos.write(StringEncoding.encode(comm));
bos.write(StringEncoding.encode(": "));
bos.write(StringEncoding.encode(members.toString()));
bos.write(StringEncoding.encode("\n"));
}
}
}
return null;
});

return this.progress;
}

public long clearPass(int pass) {
GraphTraversal<Edge, Edge> te = this.g.E();
if (pass < 0) {
Expand Down

0 comments on commit d72d189

Please sign in to comment.