From c415949b315d2f567461b4487ff07d55cab75cd4 Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Fri, 11 Jun 2021 14:08:19 +0800 Subject: [PATCH 1/2] add GraphTriangleCount algorithm --- .../src/main/resources/application.conf | 8 ++-- .../com/vesoft/nebula/algorithm/Main.scala | 6 +++ .../lib/GraphTriangleCountAlgo.scala | 38 +++++++++++++++++++ .../algorithm/lib/TriangleCountAlgo.scala | 2 + 4 files changed, 50 insertions(+), 4 deletions(-) create mode 100644 nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index 7d0ffb84..b4ec37f5 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -13,7 +13,7 @@ # data source. optional of nebula,csv,json source: csv # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text - sink: nebula + sink: csv # if your algorithm needs weight hasWeight: false } @@ -51,7 +51,7 @@ local: { # algo's data source from Nebula. If data.source is csv or json, then this local.read can be valid. read:{ - filePath: "hdfs://127.0.0.1:9000/edge/work_for.csv" + filePath: "file:///tmp/algo_edge.csv" # srcId column srcId:"_c0" # dstId column @@ -66,7 +66,7 @@ # algo result sink into local file. If data.sink is csv or text, then this local.write can be valid. 
write:{ - resultPath:/tmp/ + resultPath:/tmp/count } } @@ -75,7 +75,7 @@ # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, # betweenness] - executeAlgo: pagerank + executeAlgo: graphtrianglecount # PageRank parameter pagerank: { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala index 9992682f..96cf9b3f 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala @@ -9,6 +9,7 @@ package com.vesoft.nebula.algorithm import com.vesoft.nebula.algorithm.config.Configs.Argument import com.vesoft.nebula.algorithm.config.{ AlgoConfig, + AlgoConstants, BetweennessConfig, CcConfig, Configs, @@ -23,6 +24,7 @@ import com.vesoft.nebula.algorithm.lib.{ BetweennessCentralityAlgo, ConnectedComponentsAlgo, DegreeStaticAlgo, + GraphTriangleCountAlgo, KCoreAlgo, LabelPropagationAlgo, LouvainAlgo, @@ -35,6 +37,7 @@ import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader} import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter} import org.apache.commons.math3.ode.UnknownParameterException import org.apache.log4j.Logger +import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** @@ -160,6 +163,9 @@ object Main { case "trianglecount" => { TriangleCountAlgo(spark, dataSet) } + case "graphtrianglecount" => { + GraphTriangleCountAlgo(spark, dataSet) + } case _ => throw new UnknownParameterException("unknown executeAlgo name.") } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala 
b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala new file mode 100644 index 00000000..7d3585f1 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala @@ -0,0 +1,38 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.AlgoConstants +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * compute all graph's triangle count + */ +object GraphTriangleCountAlgo { + + def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = { + + val triangleCount = TriangleCountAlgo(spark, dataset) + val count = triangleCount + .select(AlgoConstants.TRIANGLECOUNT_RESULT_COL) + .rdd + .map(value => value.get(0).asInstanceOf[Int]) + .reduce(_ + _) / 3 + val list = List(count) + val rdd = spark.sparkContext.parallelize(list).map(row => Row(row)) + + val schema = StructType( + List( + StructField("count", IntegerType, nullable = false) + )) + val algoResult = spark.sqlContext + .createDataFrame(rdd, schema) + + algoResult + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala index 93d08a11..b9badb82 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala @@ -22,6 +22,8 @@ object TriangleCountAlgo { /** * run the TriangleCount algorithm for nebula graph + * + * compute each vertex's triangle count */ def apply(spark: SparkSession, dataset: Dataset[Row]): 
DataFrame = { From 74f643e38cf54a1a34492ef4e071ea3a4149b54d Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Fri, 11 Jun 2021 14:09:59 +0800 Subject: [PATCH 2/2] add config for GraphTriangleCount --- nebula-algorithm/src/main/resources/application.conf | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index b4ec37f5..4d2bdc2e 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -74,7 +74,7 @@ algorithm: { # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, - # betweenness] + # betweenness, graphtrianglecount] executeAlgo: graphtrianglecount # PageRank parameter @@ -118,6 +118,9 @@ # Trianglecount parameter trianglecount:{} + # graphTriangleCount parameter + graphtrianglecount:{} + # Betweenness centrality parameter betweenness:{ maxIter:5