Skip to content

Commit

Permalink
add more example (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Feb 10, 2023
1 parent 0e0668e commit 425176c
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ nebula-algorithm 是一款基于 [GraphX](https://spark.apache.org/graphx/) 的
| ClusteringCoefficient | 聚集系数 |推荐、电信诈骗分析|
| Jaccard |杰卡德相似度计算|相似度计算、推荐|
| BFS |广度优先遍历 |层序遍历、最短路径规划|
| DFS |深度优先遍历 |层序遍历、最短路径规划|
| Node2Vec | - |图分类|

使用 `nebula-algorithm`,可以通过提交 `Spark` 任务的形式使用完整的算法工具对 `Nebula Graph` 数据库中的数据执行图计算,也可以通过编程形式调用`lib`库下的算法针对DataFrame执行图计算。
Expand Down Expand Up @@ -101,6 +102,7 @@ nebula-algorithm 是一款基于 [GraphX](https://spark.apache.org/graphx/) 的
| closeness | closeness |double/string|
| hanp | hanp | int/string |
| bfs | bfs | string |
| dfs | dfs | string |
| jaccard | jaccard | string |
| node2vec | node2vec | string |
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ nebula-algorithm is a Spark Application based on [GraphX](https://spark.apache.o
| ClusteringCoefficient | recommended, telecom fraud analysis|
| Jaccard | similarity calculation, recommendation|
| BFS | sequence traversal, Shortest path plan|
| DFS | sequence traversal, Shortest path plan|
| Node2Vec | graph machine learning, recommendation|


Expand Down Expand Up @@ -111,6 +112,7 @@ If you want to write the algorithm execution result into NebulaGraph(`sink: nebu
| closeness | closeness |double/string|
| hanp | hanp | int/string |
| bfs | bfs | string |
| dfs | dfs | string |
| jaccard | jaccard | string |
| node2vec | node2vec | string |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

import com.vesoft.nebula.connector.connector.{NebulaDataFrameReader}
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.{CcConfig, LPAConfig, LouvainConfig, PRConfig}
import com.vesoft.nebula.algorithm.lib.{
ConnectedComponentsAlgo,
LabelPropagationAlgo,
LouvainAlgo,
PageRankAlgo
}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Performance smoke test for nebula-algorithm: reads an edge DataFrame from
  * NebulaGraph, then runs LPA, Louvain, PageRank and WCC in sequence, writing
  * each result to HDFS and printing wall-clock timings.
  *
  * Intended to be submitted as a Spark application; connection addresses and
  * output paths are hard-coded for the test environment.
  */
object AlgoPerformanceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // TCompactProtocol is used by the Nebula connector and must be Kryo-registered.
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    val df = readNebulaData(spark)
    lpa(spark, df)
    louvain(spark, df)
    pagerank(spark, df)
    wcc(spark, df)
  }

  /**
    * Reads the FOLLOW edges of the `twitter` space into a DataFrame,
    * caches and materializes it (so later algorithm timings exclude the read),
    * and prints the read cost in milliseconds.
    *
    * @param spark active SparkSession
    * @return cached edge DataFrame
    */
  def readNebulaData(spark: SparkSession): DataFrame = {
    val start = System.currentTimeMillis()
    val config =
      NebulaConnectionConfig
        .builder()
        // Fixed: "127.0.0.0.1" is not a valid IPv4 address (five octets).
        .withMetaAddress("127.0.0.1:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("twitter")
      .withLabel("FOLLOW")
      // No edge properties are needed; only src/dst ids are read.
      .withNoColumn(true)
      .withLimit(20000)
      .withPartitionNum(120)
      .build()
    val df: DataFrame =
      spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
    df.cache()
    // Force materialization so the timing below reflects the actual read.
    df.count()
    println(s"read data cost time ${(System.currentTimeMillis() - start)}")
    df
  }

  /** Runs Label Propagation (10 iterations) and writes the result as CSV. */
  def lpa(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val lpaConfig = LPAConfig(10)
    val lpa = LabelPropagationAlgo.apply(spark, df, lpaConfig, false)
    lpa.write.csv("hdfs://127.0.0.1:9000/tmp/lpa")
    println(s"lpa compute and save cost ${System.currentTimeMillis() - start}")
  }

  /** Runs PageRank (10 iterations, damping 0.85) and writes the result as CSV. */
  def pagerank(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val pageRankConfig = PRConfig(10, 0.85)
    val pr = PageRankAlgo.apply(spark, df, pageRankConfig, false)
    pr.write.csv("hdfs://127.0.0.1:9000/tmp/pagerank")
    println(s"pagerank compute and save cost ${System.currentTimeMillis() - start}")
  }

  /** Runs Weakly Connected Components (max 20 iterations) and writes the result as CSV. */
  def wcc(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val ccConfig = CcConfig(20)
    val cc = ConnectedComponentsAlgo.apply(spark, df, ccConfig, false)
    cc.write.csv("hdfs://127.0.0.1:9000/tmp/wcc")
    println(s"wcc compute and save cost ${System.currentTimeMillis() - start}")
  }

  /** Runs Louvain (10 outer / 5 inner iterations, tolerance 0.5) and writes the result as CSV. */
  def louvain(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val louvainConfig = LouvainConfig(10, 5, 0.5)
    val louvain = LouvainAlgo.apply(spark, df, louvainConfig, false)
    louvain.write.csv("hdfs://127.0.0.1:9000/tmp/louvain")
    println(s"louvain compute and save cost ${System.currentTimeMillis() - start}")
  }

}

0 comments on commit 425176c

Please sign in to comment.