diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index 7d0ffb84..4d2bdc2e 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -13,7 +13,7 @@ # data source. optional of nebula,csv,json source: csv # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text - sink: nebula + sink: csv # if your algorithm needs weight hasWeight: false } @@ -51,7 +51,7 @@ local: { # algo's data source from Nebula. If data.source is csv or json, then this local.read can be valid. read:{ - filePath: "hdfs://127.0.0.1:9000/edge/work_for.csv" + filePath: "file:///tmp/algo_edge.csv" # srcId column srcId:"_c0" # dstId column @@ -66,7 +66,7 @@ # algo result sink into local file. If data.sink is csv or text, then this local.write can be valid. write:{ - resultPath:/tmp/ + resultPath:/tmp/count } } @@ -74,8 +74,8 @@ algorithm: { # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, - # betweenness] - executeAlgo: pagerank + # betweenness, graphtriangleCount] + executeAlgo: graphtrianglecount # PageRank parameter pagerank: { @@ -118,6 +118,9 @@ # Trianglecount parameter trianglecount:{} + # graphTriangleCount parameter + graphtrianglecount:{} + # Betweenness centrality parameter betweenness:{ maxIter:5 diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala index 9992682f..96cf9b3f 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala @@ -9,6 +9,7 @@ package com.vesoft.nebula.algorithm import com.vesoft.nebula.algorithm.config.Configs.Argument import com.vesoft.nebula.algorithm.config.{ AlgoConfig, + AlgoConstants, BetweennessConfig, CcConfig, Configs, @@ -23,6 +24,7 @@ import com.vesoft.nebula.algorithm.lib.{ BetweennessCentralityAlgo, ConnectedComponentsAlgo, DegreeStaticAlgo, + GraphTriangleCountAlgo, KCoreAlgo, LabelPropagationAlgo, LouvainAlgo, @@ -35,6 +37,7 @@ import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader} import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter} import org.apache.commons.math3.ode.UnknownParameterException import org.apache.log4j.Logger +import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** @@ -160,6 +163,9 @@ object Main { case "trianglecount" => { TriangleCountAlgo(spark, dataSet) } + case "graphtrianglecount" => { + GraphTriangleCountAlgo(spark, dataSet) + } case _ => throw new UnknownParameterException("unknown executeAlgo name.") } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala new file mode 100644 index 00000000..7d3585f1 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala @@ -0,0 +1,38 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.AlgoConstants +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * compute all graph's triangle count + */ +object GraphTriangleCountAlgo { + + def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = { + + val triangleCount = TriangleCountAlgo(spark, dataset) + val count = triangleCount + .select(AlgoConstants.TRIANGLECOUNT_RESULT_COL) + .rdd + .map(value => value.get(0).asInstanceOf[Int]) + .reduce(_ + _) / 3 + val list = List(count) + val rdd = spark.sparkContext.parallelize(list).map(row => Row(row)) + + val schema = StructType( + List( + StructField("count", IntegerType, nullable = false) + )) + val algoResult = spark.sqlContext + .createDataFrame(rdd, schema) + + algoResult + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala index 93d08a11..b9badb82 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala @@ -22,6 +22,8 @@ object TriangleCountAlgo { /** * run the TriangleCount algorithm for nebula graph + * + * compute each vertex's triangle count */ def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = {