feat: support read and write from hive datasource (#100)
* feat: support read and write from hive datasource

* feat: connect hive by meta store

* refactor: remove show dataFrame
awang12345 authored Aug 19, 2024
1 parent 20573b3 commit 4accdfe
Showing 8 changed files with 220 additions and 25 deletions.
39 changes: 36 additions & 3 deletions nebula-algorithm/src/main/resources/application.conf
@@ -11,14 +11,47 @@
}

data: {
- # data source. optional of nebula,nebula-ngql,csv,json
+ # data source. optional of nebula,nebula-ngql,csv,json,hive
source: csv
- # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
+ # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text,hive
sink: csv
# if your algorithm needs weight
hasWeight: false
}

# Hive related config
hive: {
#[Optional] set this when Spark and Hive are deployed on different clusters; reads and writes connect to Hive through this metastore
metaStoreUris: "thrift://hive-metastore-server:9083"
# algo's data source from hive
read: {
# spark sql that provides the source data
sql: "select column_1,column_2,column_3 from database_01.table_01"
#[Optional] column of the sql result that maps to the graph's source vertex id
srcId: "column_1"
#[Optional] column of the sql result that maps to the graph's destination vertex id
dstId: "column_2"
#[Optional] column of the sql result that maps to the edge weight
weight: "column_3"
}

# algo result sink into hive
write: {
# hive table the result is saved to
dbTableName: "database_02.table_02"
#[Optional] spark dataframe save mode, one of Append, Overwrite, ErrorIfExists, Ignore. Default is Overwrite
saveMode: "Overwrite"
#[Optional] whether to create the hive table automatically. Default is true
autoCreateTable: true
#[Optional] mapping from algorithm result columns to hive table column names. Defaults to the column names of the algo result dataframe
resultTableColumnMapping: {
# Note: different algorithms produce different output fields; pagerank is used as the example here
_id: "column_1"
pagerank: "pagerank_value"
}
}
}
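For orientation only (this sketch is not part of application.conf or of this commit): with the sample values above, the hive read and write settings boil down to plain Spark operations roughly like the following. The metastore URI, SQL, and table/column names are the placeholders from the config, and "result" stands for the algorithm's output DataFrame.

import org.apache.spark.sql.SparkSession

// hive-enabled session pointing at the configured metastore
val spark = SparkSession.builder()
  .appName("hive-config-sketch")
  .enableHiveSupport()
  .config("hive.metastore.uris", "thrift://hive-metastore-server:9083")
  .getOrCreate()

// hive.read: run the configured SQL and keep the srcId/dstId/weight columns
val edges = spark.sql("select column_1,column_2,column_3 from database_01.table_01")
  .select("column_1", "column_2", "column_3")

// hive.write: rename result columns per resultTableColumnMapping, then save with saveMode
// result.withColumnRenamed("_id", "column_1")
//       .withColumnRenamed("pagerank", "pagerank_value")
//       .write.mode("Overwrite")
//       .saveAsTable("database_02.table_02")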

# NebulaGraph related config
nebula: {
# algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
@@ -78,7 +111,7 @@
# the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent,
# labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
# betweenness, graphtriangleCount, clusteringcoefficient, bfs, hanp, closeness, jaccard, node2vec]
- executeAlgo: graphtrianglecount
+ executeAlgo: pagerank

# PageRank parameter
pagerank: {
@@ -55,7 +55,7 @@ object Main {
val algoTime = System.currentTimeMillis()

// writer
- saveAlgoResult(algoResult, configs)
+ saveAlgoResult(sparkConfig.spark, algoResult, configs)
val endTime = System.currentTimeMillis()

sparkConfig.spark.stop()
@@ -149,8 +149,8 @@
}
}

- private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = {
+ private[this] def saveAlgoResult(spark: SparkSession, algoResult: DataFrame, configs: Configs): Unit = {
val writer = AlgoWriter.make(configs)
writer.write(algoResult, configs)
writer.write(spark, algoResult, configs)
}
}
@@ -12,6 +12,7 @@ import org.apache.log4j.Logger
import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory}
import com.vesoft.nebula.algorithm.config.Configs.readConfig
import com.vesoft.nebula.algorithm.config.Configs.getOrElse

import scala.collection.mutable

@@ -129,6 +130,51 @@ object LocalConfigEntry {
}
}


object HiveConfigEntry {
def apply(config: Config): HiveConfigEntry = {
// uri of the hive metastore, e.g. thrift://127.0.0.1:9083
val hiveMetaStoreUris: String = getOrElse(config, "hive.metaStoreUris", "")
val readConfigEntry = buildReadConfig(config)
val writeConfigEntry = buildWriteConfig(config)
HiveConfigEntry(hiveMetaStoreUris, readConfigEntry, writeConfigEntry)
}

def buildReadConfig(config: Config): HiveReadConfigEntry = {
// spark sql that provides the source data
val sql: String = getOrElse(config, "hive.read.sql", "")
// column of the sql result that maps to the source vertex id
val srcIdCol: String = getOrElse(config, "hive.read.srcId", "")
// column of the sql result that maps to the destination vertex id
val dstIdCol: String = getOrElse(config, "hive.read.dstId", "")
// column of the sql result that maps to the edge weight
val weightCol: String = getOrElse(config, "hive.read.weight", "")
HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol)
}

def buildWriteConfig(config: Config): HiveWriteConfigEntry = {
// hive table the algo result is saved to
val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "")
// save mode of the spark dataframe writer
val saveMode: String = getOrElse(config, "hive.write.saveMode", "")
// whether the hive table is created automatically
val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true)
// mapping from algo result dataframe columns to hive table columns
val resultColumnMapping = mutable.Map[String, String]()
val mappingKey = "hive.write.resultTableColumnMapping"
if (config.hasPath(mappingKey)) {
val mappingConfig = config.getObject(mappingKey)
for (subkey <- mappingConfig.unwrapped().keySet().asScala) {
val key = s"${mappingKey}.${subkey}"
val value = config.getString(key)
resultColumnMapping += subkey -> value
}
}
HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping)
}

}
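A quick illustration (not part of the commit) of how these builders consume the HOCON shown in application.conf: the snippet below parses an equivalent config string and builds the entry, showing how buildWriteConfig flattens the resultTableColumnMapping object into a plain key/value map. It assumes the classes defined in this file are on the classpath.

import com.typesafe.config.ConfigFactory

val hiveConf = ConfigFactory.parseString(
  """
    |hive: {
    |  metaStoreUris: "thrift://hive-metastore-server:9083"
    |  read: { sql: "select column_1,column_2,column_3 from database_01.table_01" }
    |  write: {
    |    dbTableName: "database_02.table_02"
    |    resultTableColumnMapping: { _id: "column_1", pagerank: "pagerank_value" }
    |  }
    |}
  """.stripMargin)

val entry = HiveConfigEntry(hiveConf)
// entry.hiveWriteConfigEntry.resultColumnMapping now holds
// Map(_id -> column_1, pagerank -> pagerank_value)
println(entry)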

/**
* SparkConfigEntry support key-value pairs for spark session.
*
@@ -173,6 +219,34 @@ case class LocalConfigEntry(filePath: String,
}
}

case class HiveConfigEntry(hiveMetaStoreUris: String,
hiveReadConfigEntry: HiveReadConfigEntry,
hiveWriteConfigEntry: HiveWriteConfigEntry) {
override def toString: String = {
s"HiveConfigEntry: {hiveMetaStoreUris:$hiveMetaStoreUris, read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
}
}

case class HiveReadConfigEntry(sql: String,
srcIdCol: String = "srcId",
dstIdCol: String = "dstId",
weightCol: String) {
override def toString: String = {
s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " +
s"weightCol:$weightCol}"
}
}

case class HiveWriteConfigEntry(dbTableName: String,
saveMode: String,
autoCreateTable: Boolean,
resultColumnMapping: mutable.Map[String, String]) {
override def toString: String = {
s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " +
s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping}"
}
}
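The writer side (the AlgoWriter, WriterType, and hive writer changes) is among the files not expanded on this page. Purely as a hypothetical sketch of how a sink could consume HiveWriteConfigEntry (this is not the commit's actual writer code), the mapping and save mode translate to something like:

import org.apache.spark.sql.DataFrame

// hypothetical helper, for illustration only
def writeToHive(result: DataFrame, conf: HiveWriteConfigEntry): Unit = {
  // rename algo result columns to the configured hive column names
  val renamed = conf.resultColumnMapping.foldLeft(result) {
    case (df, (algoCol, hiveCol)) => df.withColumnRenamed(algoCol, hiveCol)
  }
  // fall back to Overwrite when no saveMode is configured
  val mode = if (conf.saveMode.trim.isEmpty) "Overwrite" else conf.saveMode
  // saveAsTable creates the table when it does not exist, which is the
  // straightforward reading of autoCreateTable = true
  renamed.write.mode(mode).saveAsTable(conf.dbTableName)
}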

/**
* NebulaConfigEntry
* @param readConfigEntry config for nebula-spark-connector reader
@@ -218,6 +292,7 @@ case class Configs(sparkConfig: SparkConfigEntry,
dataSourceSinkEntry: DataSourceSinkEntry,
nebulaConfig: NebulaConfigEntry,
localConfigEntry: LocalConfigEntry,
hiveConfigEntry: HiveConfigEntry,
algorithmConfig: AlgorithmConfigEntry)

object Configs {
@@ -237,10 +312,11 @@
val dataSourceEntry = DataSourceSinkEntry(config)
val localConfigEntry = LocalConfigEntry(config)
val nebulaConfigEntry = NebulaConfigEntry(config)
- val sparkEntry = SparkConfigEntry(config)
- val algorithmEntry = AlgorithmConfigEntry(config)
+ val hiveConfigEntry = HiveConfigEntry(config)
+ val sparkEntry = SparkConfigEntry(config)
+ val algorithmEntry = AlgorithmConfigEntry(config)

- Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, algorithmEntry)
+ Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, hiveConfigEntry, algorithmEntry)
}

/**
@@ -277,15 +353,15 @@ object Configs {
}

/**
- * Get the value from config by the path. If the path not exist, return the default value.
- *
- * @param config The config.
- * @param path The path of the config.
- * @param defaultValue The default value for the path.
- *
- * @return
- */
- private[this] def getOrElse[T](config: Config, path: String, defaultValue: T): T = {
+ * Get the value from config by the path. If the path not exist, return the default value.
+ *
+ * @param config The config.
+ * @param path The path of the config.
+ * @param defaultValue The default value for the path.
+ *
+ * @return
+ */
+ def getOrElse[T](config: Config, path: String, defaultValue: T): T = {
if (config.hasPath(path)) {
config.getAnyRef(path).asInstanceOf[T]
} else {
@@ -5,6 +5,8 @@

package com.vesoft.nebula.algorithm.config

import com.vesoft.nebula.algorithm.reader.ReaderType
import com.vesoft.nebula.algorithm.writer.WriterType
import org.apache.spark.sql.SparkSession

case class SparkConfig(spark: SparkSession, partitionNum: Int)
@@ -20,12 +22,29 @@ object SparkConfig {
sparkConfigs.foreach { case (key, value) =>
session.config(key, value)
}

// set hive config
setHiveConfig(session, configs)

val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
val spark = session.getOrCreate()
validate(spark.version, "2.4.*")
SparkConfig(spark, partitionNum.toInt)
}

private def setHiveConfig(session: org.apache.spark.sql.SparkSession.Builder, configs: Configs): Unit = {
val dataSource = configs.dataSourceSinkEntry
if (dataSource.source.equals(ReaderType.hive.stringify)
|| dataSource.sink.equals(WriterType.hive.stringify)) {
session.enableHiveSupport()
val uris = configs.hiveConfigEntry.hiveMetaStoreUris
if (uris != null && uris.trim.nonEmpty) {
session.config("hive.metastore.schema.verification", false)
session.config("hive.metastore.uris", uris)
}
}
}
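// Illustration, not part of this commit: with data.source or data.sink set to "hive" and
// hive.metaStoreUris configured, the builder assembled above is roughly equivalent to
//   SparkSession.builder()
//     .enableHiveSupport()
//     .config("hive.metastore.schema.verification", false)
//     .config("hive.metastore.uris", "thrift://hive-metastore-server:9083")
// before apply() finally calls getOrCreate().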

private def validate(sparkVersion: String, supportedVersions: String*): Unit = {
if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) {
throw new RuntimeException(
@@ -25,6 +25,7 @@ object DataReader {
case ReaderType.nebulaNgql => new NebulaNgqlReader
case ReaderType.nebula => new NebulaReader
case ReaderType.csv => new CsvReader
case ReaderType.hive => new HiveReader
}
.getOrElse(throw new UnsupportedOperationException("unsupported reader"))
}
@@ -179,3 +180,30 @@ final class JsonReader extends DataReader {
data
}
}
final class HiveReader extends DataReader {

override val tpe: ReaderType = ReaderType.hive
override def read(spark: SparkSession, configs: Configs, partitionNum: Int): DataFrame = {
val readConfig = configs.hiveConfigEntry.hiveReadConfigEntry
val sql = readConfig.sql
val srcIdCol = readConfig.srcIdCol
val dstIdCol = readConfig.dstIdCol
val weightCol = readConfig.weightCol

var data = spark.sql(sql)

if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) {
if (configs.dataSourceSinkEntry.hasWeight && weightCol != null && weightCol.trim.nonEmpty) {
data = data.select(srcIdCol, dstIdCol, weightCol)
} else {
data = data.select(srcIdCol, dstIdCol)
}
}

if (partitionNum != 0) {
// repartition returns a new DataFrame, so reassign it for the repartition to take effect
data = data.repartition(partitionNum)
}

data
}
}
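A short usage sketch (not part of the commit), assuming a hive-enabled SparkSession named spark and a parsed Configs named configs are already available, and that the DataReader.make(configs) factory keeps the shape suggested by the hunk above:

// with data.source = hive, make(...) dispatches to the new HiveReader
val reader = DataReader.make(configs)
val edges = reader.read(spark, configs, partitionNum = 10)
// columns follow hive.read: srcId, dstId and, when hasWeight = true, weight
edges.printSchema()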
@@ -17,17 +17,20 @@ sealed trait ReaderType {
case ReaderType.nebulaNgql => "nebula-ngql"
case ReaderType.nebula => "nebula"
case ReaderType.csv => "csv"
case ReaderType.hive => "hive"
}
}
object ReaderType {
lazy val mapping: Map[String, ReaderType] = Map(
json.stringify -> json,
nebulaNgql.stringify -> nebulaNgql,
nebula.stringify -> nebula,
- csv.stringify -> csv
+ csv.stringify -> csv,
+ hive.stringify -> hive
)
object json extends ReaderType
object nebulaNgql extends ReaderType
object nebula extends ReaderType
object csv extends ReaderType
object hive extends ReaderType
}
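For completeness (illustration only): the data.source string from the config resolves through this mapping, so the new hive entry round-trips as expected.

val tpe = ReaderType.mapping("hive")   // ReaderType.hive
assert(tpe.stringify == "hive")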
(The remaining changed files are not expanded on this page.)
