Skip to content

Commit

Permalink
SparkConfig refactor (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
artemkorsakov authored Jan 12, 2024
1 parent 656d01c commit 8f454d0
Showing 1 changed file with 4 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,22 @@ case class SparkConfig(spark: SparkSession, partitionNum: Int)

object SparkConfig {

var spark: SparkSession = _

var partitionNum: String = _

def getSpark(configs: Configs, defaultAppName: String = "algorithm"): SparkConfig = {
val sparkConfigs = configs.sparkConfig.map
val session = SparkSession.builder
.appName(defaultAppName)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

for (key <- sparkConfigs.keySet) {
session.config(key, sparkConfigs(key))
sparkConfigs.foreach { case (key, value) =>
session.config(key, value)
}
partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
val spark = session.getOrCreate()
validate(spark.version, "2.4.*")
SparkConfig(spark, partitionNum.toInt)
}

def validate(sparkVersion: String, supportedVersions: String*): Unit = {
private def validate(sparkVersion: String, supportedVersions: String*): Unit = {
if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) {
throw new RuntimeException(
s"""Your current spark version ${sparkVersion} is not supported by the current NebulaGraph Algorithm.
Expand Down

0 comments on commit 8f454d0

Please sign in to comment.