set job desc with algorithm name
Nicole00 committed Jan 25, 2024
1 parent 7ee3050 commit 764aa61
Showing 18 changed files with 28 additions and 9 deletions.
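
The change is the same one-line pattern applied to every algorithm entry point: before the algorithm triggers any Spark jobs, it tags the calling thread's jobs with a job group named after the algorithm, so the Spark UI shows a readable description and the whole run can be cancelled by group id. A minimal sketch of the pattern (ExampleAlgo and its body are illustrative stand-ins, not code from this commit):

    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    object ExampleAlgo {
      // Each algorithm object carries a name constant like this one;
      // DfsAlgo and GraphTriangleCountAlgo gain theirs in this commit.
      val ALGORITHM = "example"

      def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = {
        // The first argument is the group id, the second the description
        // shown in the Spark UI's job list.
        spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

        // ... algorithm body that actually triggers Spark jobs ...
        dataset
      }
    }

setJobGroup is per-thread: it applies to every job submitted by the calling thread until the group is changed or cleared with clearJobGroup().
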
@@ -28,6 +28,7 @@ object BetweennessCentralityAlgo {
dataset: Dataset[Row],
betweennessConfig: BetweennessConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -25,6 +25,8 @@ object BfsAlgo {
* run the BFS algorithm for nebula graph
*/
def apply(spark: SparkSession, dataset: Dataset[Row], bfsConfig: BfsConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null
var finalRoot: Long = 0

@@ -35,6 +35,8 @@ object ClosenessAlgo {
* run the Closeness algorithm for nebula graph
*/
def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
val closenessRDD = execute(graph)
val schema = StructType(

@@ -24,6 +24,7 @@ object ClusteringCoefficientAlgo {
def apply(spark: SparkSession,
dataset: Dataset[Row],
coefficientConfig: CoefficientConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -10,7 +10,6 @@ import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil}
import org.apache.log4j.Logger
import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
- import com.vesoft.nebula.algorithm.utils.NebulaUtil
import org.apache.spark.graphx.lib.ConnectedComponents
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -27,6 +26,7 @@ object ConnectedComponentsAlgo {
dataset: Dataset[Row],
ccConfig: CcConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -25,6 +25,8 @@ object DegreeStaticAlgo {
def apply(spark: SparkSession,
dataset: Dataset[Row],
degreeConfig: DegreeStaticConfig = new DegreeStaticConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

val graph: Graph[None.type, Double] = if (degreeConfig.encodeId) {

@@ -22,8 +22,11 @@ import scala.collection.mutable

object DfsAlgo {
var iterNums = 0
+ val ALGORITHM = "dfs"

def apply(spark: SparkSession, dataset: Dataset[Row], dfsConfig: DfsConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null
var finalRoot: Long = 0

@@ -13,8 +13,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
* compute all graph's triangle count
*/
object GraphTriangleCountAlgo {
+ val ALGORITHM = "graphTriangleCount"

def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

val triangleCount = TriangleCountAlgo(spark, dataset)
val count = triangleCount

@@ -28,6 +28,7 @@ object HanpAlgo {
hanpConfig: HanpConfig,
hasWeight: Boolean,
preferences: RDD[(VertexId, Double)] = null): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -6,15 +6,11 @@
package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.JaccardConfig
+ import com.vesoft.nebula.algorithm.lib.HanpAlgo.ALGORITHM
import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil}
import org.apache.log4j.Logger
import org.apache.spark.graphx.Graph
- import org.apache.spark.ml.feature.{
-   CountVectorizer,
-   CountVectorizerModel,
-   MinHashLSH,
-   MinHashLSHModel
- }
+ import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, MinHashLSH, MinHashLSHModel}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -30,6 +26,7 @@ object JaccardAlgo {
* run the Jaccard algorithm for nebula graph
*/
def apply(spark: SparkSession, dataset: Dataset[Row], jaccardConfig: JaccardConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null
var data: DataFrame = dataset

@@ -21,7 +21,7 @@ object KCoreAlgo {
* run the KCore algorithm for nebula graph
*/
def apply(spark: SparkSession, dataset: Dataset[Row], kCoreConfig: KCoreConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
var encodeIdDf: DataFrame = null

val graph: Graph[None.type, Double] = if (kCoreConfig.encodeId) {

@@ -27,6 +27,8 @@ object LabelPropagationAlgo {
dataset: Dataset[Row],
lpaConfig: LPAConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

val graph: Graph[None.type, Double] = if (lpaConfig.encodeId) {

@@ -29,6 +29,7 @@ object LouvainAlgo {
dataset: Dataset[Row],
louvainConfig: LouvainConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -270,6 +270,8 @@ object Node2vecAlgo {
dataset: Dataset[Row],
node2vecConfig: Node2vecConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

val inputGraph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
this.context = spark.sparkContext
this.node2vecConfig = node2vecConfig

@@ -26,6 +26,7 @@ object PageRankAlgo {
dataset: Dataset[Row],
pageRankConfig: PRConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -30,7 +30,7 @@ object ShortestPathAlgo {
dataset: Dataset[Row],
shortestPathConfig: ShortestPathConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)

val prResultRDD = execute(graph, shortestPathConfig.landmarks)

@@ -25,6 +25,7 @@ object StronglyConnectedComponentsAlgo {
dataset: Dataset[Row],
ccConfig: CcConfig,
hasWeight: Boolean): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

@@ -27,6 +27,7 @@ object TriangleCountAlgo {
def apply(spark: SparkSession,
dataset: Dataset[Row],
triangleConfig: TriangleConfig = new TriangleConfig): DataFrame = {
+ spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")

var encodeIdDf: DataFrame = null

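Since every run is now tagged, a caller can cancel a long-running algorithm from any other thread by its group id. A minimal sketch, assuming a DfsAlgo run is already in flight in the same application (the SparkSession setup is illustrative; the group id "dfs" is the ALGORITHM constant added above):

    import org.apache.spark.sql.SparkSession

    object CancelDfsRun {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("cancel-by-job-group")
          .master("local[*]")
          .getOrCreate()

        // Stops every Spark job tagged with the "dfs" group id, i.e. a
        // DfsAlgo run started from another thread of this application.
        spark.sparkContext.cancelJobGroup("dfs")

        spark.stop()
      }
    }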