From 7721b57a3bda05b6cd3698d6f8533d9f4999c808 Mon Sep 17 00:00:00 2001 From: diaohancai <550630588@qq.com> Date: Fri, 26 Jan 2024 18:36:57 +0800 Subject: [PATCH] core(output): output custom write computation result --- .../path/shortest/SingleSourceShortestPath.java | 4 ++++ .../path/shortest/SingleSourceShortestPathOutput.java | 8 ++++++++ .../hugegraph/computer/core/output/ComputerOutput.java | 9 +++++++++ .../computer/core/compute/FileGraphPartition.java | 4 +++- 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java b/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java index 202214d24..f360f2b15 100644 --- a/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java +++ b/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java @@ -294,4 +294,8 @@ private boolean isAllTargetsReached(Vertex vertex) { } return false; } + + public IdSet getTargetIdSet() { + return targetIdSet; + } } diff --git a/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPathOutput.java b/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPathOutput.java index 4a646cf41..943babb2e 100644 --- a/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPathOutput.java +++ b/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPathOutput.java @@ -20,8 +20,10 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hugegraph.computer.core.config.Config; import org.apache.hugegraph.computer.core.graph.vertex.Vertex; import org.apache.hugegraph.computer.core.output.hg.HugeGraphOutput; +import org.apache.hugegraph.computer.core.worker.Computation; import org.apache.hugegraph.util.JsonUtil; public class SingleSourceShortestPathOutput extends HugeGraphOutput { @@ -45,4 +47,10 @@ protected String value(Vertex vertex) { map.put("total_weight", value.totalWeight()); return JsonUtil.toJson(map); } + + @Override + public boolean filter(Config config, Computation computation, Vertex vertex) { + SingleSourceShortestPath sssp = (SingleSourceShortestPath) computation; + return sssp.getTargetIdSet() == null || sssp.getTargetIdSet().contains(vertex.id()); + } } diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/output/ComputerOutput.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/output/ComputerOutput.java index c2c2cc20d..3522853f9 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/output/ComputerOutput.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/output/ComputerOutput.java @@ -19,6 +19,7 @@ import org.apache.hugegraph.computer.core.config.Config; import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; /** * Computer output is used to output computer results. There is an output object @@ -37,6 +38,14 @@ public interface ComputerOutput { */ void write(Vertex vertex); + /** + * Write filter. + * True to commit the computation result, otherwise not to commit. + */ + default boolean filter(Config config, Computation computation, Vertex vertex) { + return true; + } + /** * Merge output files of multiple partitions if applicable. */ diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/FileGraphPartition.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/FileGraphPartition.java index 25312868e..a2e61a8a8 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/FileGraphPartition.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/FileGraphPartition.java @@ -255,7 +255,9 @@ protected PartitionStat output() { Edges edges = this.edgesInput.edges(this.vertexInput.idPointer()); vertex.edges(edges); - output.write(vertex); + if (output.filter(this.context.config(), this.computation, vertex)) { + output.write(vertex); + } } try {