support custom rules for datasource (#139)
* support custom rules for the data source, such as concatenating two columns into one new column with a customized separator

* update config template
Nicole00 authored May 17, 2023
1 parent 2a4ab78 commit a2da575
Showing 6 changed files with 189 additions and 38 deletions.
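The feature described in the commit message amounts to a Spark column concatenation. A minimal sketch, assuming a local SparkSession and hypothetical column names (prefix, no, vid) rather than anything from the Exchange codebase:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}

// Sketch only: made-up data and column names, showing the kind of
// column concatenation the new udf config expresses.
object ConcatColumnsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("concat-sketch").getOrCreate()
    import spark.implicits._

    val source = Seq(("player", "100"), ("player", "101")).toDF("prefix", "no")

    // Concatenate the two columns with a "_" separator into one new column.
    val withId = source.withColumn("vid", concat_ws("_", col("prefix"), col("no")))
    withId.show()
    // +------+---+----------+
    // |prefix| no|       vid|
    // +------+---+----------+
    // |player|100|player_100|
    // |player|101|player_101|
    // +------+---+----------+

    spark.stop()
  }
}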
@@ -16,6 +16,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.log4j.Logger

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
@@ -166,6 +167,12 @@ case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath:

case class SelfSignParam(crtFilePath: String, keyFilePath: String, password: String)

case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: String) {
override def toString(): String = {
s"sep:$sep, oldColNames: $oldColNames, newColName: $newColName"
}
}

/**
*
*/
@@ -431,6 +438,13 @@ object Configs {
val repartitionWithNebula = getOrElse(tagConfig, "repartitionWithNebula", true)
val ignoreIndex = getOrElse(tagConfig, "ignoreIndex", false)

val vertexUdf = if (tagConfig.hasPath("vertex.udf")) {
val sep = tagConfig.getString("vertex.udf.separator")
val cols: List[String] = tagConfig.getStringList("vertex.udf.oldColNames").toList
val newCol = tagConfig.getString("vertex.udf.newColName")
Some(UdfConfigEntry(sep, cols, newCol))
} else None

LOG.info(s"name ${tagName} batch ${batch}")
val entry = TagConfigEntry(
tagName,
@@ -445,7 +459,8 @@ object Configs {
checkPointPath,
repartitionWithNebula,
enableTagless,
ignoreIndex
ignoreIndex,
vertexUdf
)
LOG.info(s"Tag Config: ${entry}")
tags += entry
@@ -553,6 +568,20 @@ object Configs {
val repartitionWithNebula = getOrElse(edgeConfig, "repartitionWithNebula", false)
val ignoreIndex = getOrElse(edgeConfig, "ignoreIndex", false)

val srcUdf = if (edgeConfig.hasPath("source.udf")) {
val sep = edgeConfig.getString("source.udf.separator")
val cols: List[String] = edgeConfig.getStringList("source.udf.oldColNames").toList
val newCol = edgeConfig.getString("source.udf.newColName")
Some(UdfConfigEntry(sep, cols, newCol))
} else None

val dstUdf = if (edgeConfig.hasPath("target.udf")) {
val sep = edgeConfig.getString("target.udf.separator")
val cols: List[String] = edgeConfig.getStringList("target.udf.oldColNames").toList
val newCol = edgeConfig.getString("target.udf.newColName")
Some(UdfConfigEntry(sep, cols, newCol))
} else None

val entry = EdgeConfigEntry(
edgeName,
sourceConfig,
@@ -571,7 +600,9 @@ object Configs {
partition,
checkPointPath,
repartitionWithNebula,
ignoreIndex
ignoreIndex,
srcUdf,
dstUdf
)
LOG.info(s"Edge Config: ${entry}")
edges += entry
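The parsing added to Configs above reads an optional udf block under vertex (and, for edges, under source and target) into a UdfConfigEntry. A minimal sketch of that style of parsing, assuming the Typesafe Config (HOCON) library that Exchange already uses; the inline HOCON snippet and column names are illustrative only:

import com.typesafe.config.ConfigFactory

import scala.collection.JavaConverters._

// Sketch only: parses a made-up vertex.udf block into a UdfConfigEntry-like case class.
object UdfConfigSketch {

  case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: String)

  def main(args: Array[String]): Unit = {
    val tagConfig = ConfigFactory.parseString(
      """
        |vertex {
        |  field: new-id
        |  udf {
        |    separator: "_"
        |    oldColNames: [col-a, col-b]
        |    newColName: new-id
        |  }
        |}
        |""".stripMargin)

    val vertexUdf =
      if (tagConfig.hasPath("vertex.udf")) {
        val sep    = tagConfig.getString("vertex.udf.separator")
        val cols   = tagConfig.getStringList("vertex.udf.oldColNames").asScala.toList
        val newCol = tagConfig.getString("vertex.udf.newColName")
        Some(UdfConfigEntry(sep, cols, newCol))
      } else {
        None
      }

    println(vertexUdf) // Some(UdfConfigEntry(_,List(col-a, col-b),new-id))
  }
}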
@@ -62,7 +62,8 @@ case class TagConfigEntry(override val name: String,
override val checkPointPath: Option[String],
repartitionWithNebula: Boolean = true,
enableTagless: Boolean = false,
ignoreIndex: Boolean = false)
ignoreIndex: Boolean = false,
vertexUdf: Option[UdfConfigEntry] = None)
extends SchemaConfigEntry {
require(
name.trim.nonEmpty && vertexField.trim.nonEmpty
@@ -77,7 +78,9 @@ case class TagConfigEntry(override val name: String,
s"batch: $batch, " +
s"partition: $partition, " +
s"repartitionWithNebula: $repartitionWithNebula, " +
s"enableTagless: $enableTagless."
s"enableTagless: $enableTagless, " +
s"ignoreIndex: $ignoreIndex, " +
s"vertexUdf: $vertexUdf."
}
}

@@ -117,7 +120,9 @@ case class EdgeConfigEntry(override val name: String,
override val partition: Int,
override val checkPointPath: Option[String],
repartitionWithNebula: Boolean = false,
ignoreIndex: Boolean = false)
ignoreIndex: Boolean = false,
srcVertexUdf: Option[UdfConfigEntry] = None,
dstVertexUdf: Option[UdfConfigEntry] = None)
extends SchemaConfigEntry {
require(
name.trim.nonEmpty && sourceField.trim.nonEmpty && targetField.trim.nonEmpty
@@ -136,7 +141,10 @@ case class EdgeConfigEntry(override val name: String,
s"target field: $targetField, " +
s"target policy: $targetPolicy, " +
s"batch: $batch, " +
s"partition: $partition."
s"partition: $partition, " +
s"ignoreIndex: $ignoreIndex, " +
s"srcVertexUdf: $srcVertexUdf" +
s"dstVertexUdf: $dstVertexUdf."
} else {
s"Edge name: $name, " +
s"source: $dataSourceConfigEntry, " +
@@ -147,7 +155,10 @@ case class EdgeConfigEntry(override val name: String,
s"target field: $targetField, " +
s"target policy: $targetPolicy, " +
s"batch: $batch, " +
s"partition: $partition."
s"partition: $partition, " +
s"ignoreIndex: $ignoreIndex, " +
s"srcVertexUdf: $srcVertexUdf" +
s"dstVertexUdf: $dstVertexUdf."
}
}
}
@@ -5,7 +5,7 @@

package com.vesoft.nebula.exchange

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File

import com.vesoft.exchange.Argument
@@ -27,7 +27,8 @@ import com.vesoft.exchange.common.config.{
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
SourceCategory,
UdfConfigEntry
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
@@ -51,8 +52,12 @@ import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.StringType
import org.apache.spark.{SparkConf, SparkEnv}

import scala.collection.mutable.ListBuffer

final case class TooManyErrorsException(private val message: String) extends Exception(message)

/**
@@ -142,8 +147,13 @@ object Exchange {
val fields = tagConfig.vertexField :: tagConfig.fields
val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(data.get, tagConfig.vertexUdf.get)
} else {
data.get
}
df.cache()
val count = df.count()
val startTime = System.currentTimeMillis()
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -152,7 +162,7 @@

val processor = new VerticesProcessor(
spark,
repartition(data.get, tagConfig.partition, tagConfig.dataSourceConfigEntry.category),
repartition(df, tagConfig.partition, tagConfig.dataSourceConfigEntry.category),
tagConfig,
fieldKeys,
nebulaKeys,
@@ -161,7 +171,7 @@
batchFailure
)
processor.process()
data.get.unpersist()
df.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for tag ${tagConfig.name}: data total count: $count, total time: ${costTime}s")
@@ -195,15 +205,23 @@ object Exchange {
}
val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
var df = data.get
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
if (edgeConfig.dstVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.dstVertexUdf.get)
}

df.cache()
val count = df.count()
val startTime = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")

val processor = new EdgeProcessor(
spark,
repartition(data.get, edgeConfig.partition, edgeConfig.dataSourceConfigEntry.category),
repartition(df, edgeConfig.partition, edgeConfig.dataSourceConfigEntry.category),
edgeConfig,
fieldKeys,
nebulaKeys,
@@ -212,7 +230,7 @@
batchFailure
)
processor.process()
data.get.unpersist()
df.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for edge ${edgeConfig.name}: data total count: $count, total time: ${costTime}s")
@@ -363,4 +381,17 @@ object Exchange {
frame
}
}

private[this] def dataUdf(data: DataFrame, udfConfig: UdfConfigEntry): DataFrame = {
val oldCols = udfConfig.oldColNames
val sep = udfConfig.sep
val newCol = udfConfig.newColName
val originalFieldsNames = data.schema.fieldNames.toList
val finalColNames: ListBuffer[Column] = new ListBuffer[Column]
for (field <- originalFieldsNames) {
finalColNames.append(col(field))
}
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}
}
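The new dataUdf helper keeps every original column and appends the concatenated one. A usage sketch under the same assumptions (local SparkSession, made-up column names id, suffix, newId), with a helper shaped like the one in this diff:

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.StringType

// Sketch only: a dataUdf-style helper applied to a made-up DataFrame.
// All original columns survive and the concatenated column is appended.
object DataUdfSketch {

  def addConcatColumn(data: DataFrame,
                      sep: String,
                      oldCols: List[String],
                      newCol: String): DataFrame = {
    val original: Seq[Column] = data.schema.fieldNames.toSeq.map(col)
    val appended = concat_ws(sep, oldCols.map(col): _*).cast(StringType).as(newCol)
    data.select((original :+ appended): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dataUdf-sketch").getOrCreate()
    import spark.implicits._

    val df  = Seq(("p1", 10, "x"), ("p2", 20, "y")).toDF("id", "score", "suffix")
    val out = addConcatColumn(df, "_", List("id", "suffix"), "newId")
    out.show()
    // +---+-----+------+-----+
    // | id|score|suffix|newId|
    // +---+-----+------+-----+
    // | p1|   10|     x| p1_x|
    // | p2|   20|     y| p2_y|
    // +---+-----+------+-----+

    spark.stop()
  }
}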
18 changes: 17 additions & 1 deletion nebula-exchange_spark_2.4/src/main/resources/application.conf
@@ -97,10 +97,16 @@
sink: client
}
path: hdfs tag path 0

fields: [parquet-field-0, parquet-field-1, parquet-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field:parquet-field-0
field:new-parquet-field
udf:{
separator:"_"
oldColNames:[parquet-field-0]
newColName:new-parquet-field
}
#policy:hash
}
batch: 2000
@@ -367,10 +373,20 @@
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field:parquet-field-0
udf:{
separator:"_"
oldColNames:[parquet-field-0]
newColName:new-parquet-field
}
#policy:hash
}
target: {
field:parquet-field-1
udf:{
separator:"_"
oldColNames:[parquet-field-0]
newColName:new-parquet-field
}
#policy:hash
}
ranking: parquet-field-2