
Commit

Merge pull request #97 from Nicole00/sjs_poc
add GraphTriangleCount algorithm
HarrisChu authored Sep 2, 2021
2 parents 766e14a + 033f8b8 commit d73f6db
Showing 4 changed files with 54 additions and 5 deletions.
13 changes: 8 additions & 5 deletions nebula-algorithm/src/main/resources/application.conf
@@ -13,7 +13,7 @@
# data source. optional of nebula,csv,json
source: csv
# data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
- sink: nebula
+ sink: csv
# if your algorithm needs weight
hasWeight: false
}
@@ -51,7 +51,7 @@
local: {
# algo's data source from Nebula. If data.source is csv or json, then this local.read can be valid.
read:{
filePath: "hdfs://127.0.0.1:9000/edge/work_for.csv"
filePath: "file:///tmp/algo_edge.csv"
# srcId column
srcId:"_c0"
# dstId column
@@ -66,16 +66,16 @@

# algo result sink into local file. If data.sink is csv or text, then this local.write can be valid.
write:{
- resultPath:/tmp/
+ resultPath:/tmp/count
}
}


algorithm: {
# the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent,
# labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
- # betweenness]
- executeAlgo: pagerank
+ # betweenness, graphtriangleCount]
+ executeAlgo: graphtrianglecount

# PageRank parameter
pagerank: {
@@ -118,6 +118,9 @@
# Trianglecount parameter
trianglecount:{}

+ # graphTriangleCount parameter
+ graphtrianglecount:{}

# Betweenness centrality parameter
betweenness:{
maxIter:5
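For context on the source settings above: a header-less CSV read by Spark gets the default column names _c0 and _c1, which is exactly what the srcId and dstId options refer to. The sketch below only illustrates that convention and is not the project's CsvReader; the file path, delimiter, and session setup are assumptions.

// Minimal sketch (assumed setup, not the project's CsvReader): load a header-less
// edge CSV so that Spark assigns the default column names _c0 (source vertex id)
// and _c1 (destination vertex id) referenced by the configuration above.
import org.apache.spark.sql.{DataFrame, SparkSession}

object EdgeCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("edge-csv-sketch")
      .master("local[*]")
      .getOrCreate()

    val edges: DataFrame = spark.read
      .option("header", "false")   // no header row, so columns become _c0, _c1
      .option("delimiter", ",")
      .csv("file:///tmp/algo_edge.csv")

    edges.show(5)
    spark.stop()
  }
}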
Main.scala
@@ -9,6 +9,7 @@ package com.vesoft.nebula.algorithm
import com.vesoft.nebula.algorithm.config.Configs.Argument
import com.vesoft.nebula.algorithm.config.{
AlgoConfig,
+ AlgoConstants,
BetweennessConfig,
CcConfig,
Configs,
@@ -23,6 +24,7 @@ import com.vesoft.nebula.algorithm.lib.{
BetweennessCentralityAlgo,
ConnectedComponentsAlgo,
DegreeStaticAlgo,
+ GraphTriangleCountAlgo,
KCoreAlgo,
LabelPropagationAlgo,
LouvainAlgo,
@@ -35,6 +37,7 @@ import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader}
import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter}
import org.apache.commons.math3.ode.UnknownParameterException
import org.apache.log4j.Logger
+ import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
@@ -160,6 +163,9 @@ object Main {
case "trianglecount" => {
TriangleCountAlgo(spark, dataSet)
}
case "graphtrianglecount" => {
GraphTriangleCountAlgo(spark, dataSet)
}
case _ => throw new UnknownParameterException("unknown executeAlgo name.")
}
}
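Taken together with the configuration file: when executeAlgo is set to graphtrianglecount, Main dispatches to the new GraphTriangleCountAlgo branch above, and the single-row result DataFrame is then handed to whichever writer data.sink selects, so with the CSV sink and resultPath shown earlier the graph-wide count should end up under /tmp/count.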
GraphTriangleCountAlgo.scala (new file)
@@ -0,0 +1,38 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.AlgoConstants
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

/**
 * compute the triangle count of the whole graph
 */
object GraphTriangleCountAlgo {

  def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = {

    val triangleCount = TriangleCountAlgo(spark, dataset)
    val count = triangleCount
      .select(AlgoConstants.TRIANGLECOUNT_RESULT_COL)
      .rdd
      .map(value => value.get(0).asInstanceOf[Int])
      .reduce(_ + _) / 3
    val list = List(count)
    val rdd  = spark.sparkContext.parallelize(list).map(row => Row(row))

    val schema = StructType(
      List(
        StructField("count", IntegerType, nullable = false)
      ))
    val algoResult = spark.sqlContext
      .createDataFrame(rdd, schema)

    algoResult
  }
}
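The reuse of TriangleCountAlgo on the first line of apply is what makes the division by three necessary: TriangleCountAlgo returns one row per vertex, and every triangle contributes 1 to each of its three corners, so summing the per-vertex column triple-counts each triangle. Below is a minimal, self-contained sketch of just that aggregation step, with a hand-built DataFrame standing in for TriangleCountAlgo's output; the sample data, column names, and object name are assumptions for illustration.

import org.apache.spark.sql.SparkSession

// Sketch of the sum-and-divide-by-3 step used by GraphTriangleCountAlgo,
// run against a hand-built stand-in for the per-vertex triangle counts.
object GraphTriangleCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("graph-triangle-count-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // One triangle a-b-c: each of its three vertices participates in exactly one triangle.
    val perVertex = Seq(("a", 1), ("b", 1), ("c", 1)).toDF("id", "count")

    // Sum the per-vertex counts and divide by 3, because every triangle
    // is counted once at each of its three corners.
    val total = perVertex
      .select("count")
      .rdd
      .map(_.getInt(0))
      .reduce(_ + _) / 3

    println(s"graph triangle count: $total") // prints 1
    spark.stop()
  }
}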
TriangleCountAlgo.scala
@@ -22,6 +22,8 @@ object TriangleCountAlgo {

/**
* run the TriangleCount algorithm for nebula graph
+ *
+ * compute each vertex's triangle count
*/
def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = {

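The added comment makes the split explicit: TriangleCountAlgo reports a triangle count per vertex, while the new GraphTriangleCountAlgo reports one number for the whole graph. As a quick sanity check, a complete graph on four vertices contains four distinct triangles; each vertex lies on three of them, so the per-vertex results are 3, 3, 3, 3 and the graph-level result is (3 + 3 + 3 + 3) / 3 = 4.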
