From c1215549ccf5e6b40baff0bdee3e978ada684e45 Mon Sep 17 00:00:00 2001 From: Anqi Date: Wed, 24 Jan 2024 10:52:37 +0800 Subject: [PATCH] support to filter the datasource --- .../exchange/common/config/Configs.scala | 19 +++++- .../common/config/SchemaConfigs.scala | 9 ++- .../com/vesoft/nebula/exchange/Exchange.scala | 63 +++++-------------- .../com/vesoft/nebula/exchange/Exchange.scala | 63 +++++-------------- .../com/vesoft/nebula/exchange/Exchange.scala | 63 +++++-------------- 5 files changed, 74 insertions(+), 143 deletions(-) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index 5552c1c3..6224f62b 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -187,6 +187,10 @@ case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: St } } +case class FilterConfigEntry(filter: String){ + override def toString(): String = s"filter:$filter" +} + /** * */ @@ -467,6 +471,10 @@ object Configs { Some(UdfConfigEntry(sep, cols, newCol)) } else None + val filterConfig = if(tagConfig.hasPath("filter")) { + Some(FilterConfigEntry(tagConfig.getString("filter"))) + } else None + LOG.info(s"name ${tagName} batch ${batch}") val entry = TagConfigEntry( tagName, @@ -485,7 +493,8 @@ object Configs { enableTagless, ignoreIndex, deleteEdge, - vertexUdf + vertexUdf, + filterConfig ) LOG.info(s"Tag Config: ${entry}") tags += entry @@ -608,6 +617,11 @@ object Configs { Some(UdfConfigEntry(sep, cols, newCol)) } else None + + val filterConfig = if (edgeConfig.hasPath("filter")) { + Some(FilterConfigEntry(edgeConfig.getString("filter"))) + } else None + val entry = EdgeConfigEntry( edgeName, sourceConfig, @@ -631,7 +645,8 @@ object Configs { repartitionWithNebula, ignoreIndex, srcUdf, - dstUdf + dstUdf, + filterConfig ) LOG.info(s"Edge Config: ${entry}") edges += entry diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala index 08381303..3138e6dd 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala @@ -69,7 +69,8 @@ case class TagConfigEntry(override val name: String, enableTagless: Boolean = false, ignoreIndex: Boolean = false, deleteEdge: Boolean = false, - vertexUdf: Option[UdfConfigEntry] = None) + vertexUdf: Option[UdfConfigEntry] = None, + filterConfig: Option[FilterConfigEntry] = None) extends SchemaConfigEntry { require(name.trim.nonEmpty, "tag name cannot be empty") require(vertexField.trim.nonEmpty, "tag vertex id cannot be empty") @@ -89,7 +90,8 @@ case class TagConfigEntry(override val name: String, s"repartitionWithNebula: $repartitionWithNebula, " + s"enableTagless: $enableTagless, " + s"ignoreIndex: $ignoreIndex, " + - s"vertexUdf: $vertexUdf." + s"vertexUdf: $vertexUdf, " + + s"filter: $filterConfig." } } @@ -134,7 +136,8 @@ case class EdgeConfigEntry(override val name: String, repartitionWithNebula: Boolean = false, ignoreIndex: Boolean = false, srcVertexUdf: Option[UdfConfigEntry] = None, - dstVertexUdf: Option[UdfConfigEntry] = None) + dstVertexUdf: Option[UdfConfigEntry] = None, + filterConfig: Option[FilterConfigEntry] = None) extends SchemaConfigEntry { require(name.trim.nonEmpty, "edge name cannot be empty") require(sourceField.trim.nonEmpty, "edge source id cannot be empty") diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 447c085b..fd3f055d 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import java.io.File import com.vesoft.exchange.Argument import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler} -import com.vesoft.exchange.common.config.{ - ClickHouseConfigEntry, - Configs, - DataSourceConfigEntry, - EdgeConfigEntry, - FileBaseSourceConfigEntry, - HBaseSourceConfigEntry, - HiveSourceConfigEntry, - JanusGraphSourceConfigEntry, - JdbcConfigEntry, - KafkaSourceConfigEntry, - MaxComputeConfigEntry, - MySQLSourceConfigEntry, - Neo4JSourceConfigEntry, - OracleConfigEntry, - PostgreSQLSourceConfigEntry, - PulsarSourceConfigEntry, - SchemaConfigEntry, - SinkCategory, - SourceCategory, - TagConfigEntry, - UdfConfigEntry -} -import com.vesoft.nebula.exchange.reader.{ - CSVReader, - ClickhouseReader, - HBaseReader, - HiveReader, - JSONReader, - JanusGraphReader, - JdbcReader, - KafkaReader, - MaxcomputeReader, - MySQLReader, - Neo4JReader, - ORCReader, - OracleReader, - ParquetReader, - PostgreSQLReader, - PulsarReader -} +import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry} +import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader} import com.vesoft.exchange.common.processor.ReloadProcessor import com.vesoft.exchange.common.utils.SparkValidate import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor} @@ -124,7 +85,7 @@ object Exchange { var totalClientBatchFailure: Long = 0L var totalClientRecordSuccess: Long = 0L var totalClientRecordFailure: Long = 0L - var totalSstRecordSuccess: Long = 0l + var totalSstRecordSuccess: Long = 0L var totalSstRecordFailure: Long = 0L // reload for failed import tasks @@ -171,10 +132,11 @@ object Exchange { data.get.show(truncate = false) } if (data.isDefined && !c.dry) { - val df = if (tagConfig.vertexUdf.isDefined) { - dataUdf(data.get, tagConfig.vertexUdf.get) + var df = filterDf(data.get, tagConfig.filterConfig) + df = if (tagConfig.vertexUdf.isDefined) { + dataUdf(df, tagConfig.vertexUdf.get) } else { - data.get + df } val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -238,7 +200,7 @@ object Exchange { data.get.show(truncate = false) } if (data.isDefined && !c.dry) { - var df = data.get + var df = filterDf(data.get, edgeConfig.filterConfig) if (edgeConfig.srcVertexUdf.isDefined) { df = dataUdf(df, edgeConfig.srcVertexUdf.get) } @@ -437,4 +399,13 @@ object Exchange { finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol)) data.select(finalColNames: _*) } + + + private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = { + if (filter.isDefined && filter.get != null && filter.get.filter != null) { + data.filter(filter.get.filter) + } else { + data + } + } } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 26622a16..ecc3374a 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import java.io.File import com.vesoft.exchange.Argument import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler} -import com.vesoft.exchange.common.config.{ - ClickHouseConfigEntry, - Configs, - DataSourceConfigEntry, - EdgeConfigEntry, - FileBaseSourceConfigEntry, - HBaseSourceConfigEntry, - HiveSourceConfigEntry, - JanusGraphSourceConfigEntry, - JdbcConfigEntry, - KafkaSourceConfigEntry, - MaxComputeConfigEntry, - MySQLSourceConfigEntry, - Neo4JSourceConfigEntry, - OracleConfigEntry, - PostgreSQLSourceConfigEntry, - PulsarSourceConfigEntry, - SchemaConfigEntry, - SinkCategory, - SourceCategory, - TagConfigEntry, - UdfConfigEntry -} -import com.vesoft.nebula.exchange.reader.{ - CSVReader, - ClickhouseReader, - HBaseReader, - HiveReader, - JSONReader, - JanusGraphReader, - JdbcReader, - KafkaReader, - MaxcomputeReader, - MySQLReader, - Neo4JReader, - ORCReader, - OracleReader, - ParquetReader, - PostgreSQLReader, - PulsarReader -} +import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry} +import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader} import com.vesoft.exchange.common.processor.ReloadProcessor import com.vesoft.exchange.common.utils.SparkValidate import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor} @@ -124,7 +85,7 @@ object Exchange { var totalClientBatchFailure: Long = 0L var totalClientRecordSuccess: Long = 0L var totalClientRecordFailure: Long = 0L - var totalSstRecordSuccess: Long = 0l + var totalSstRecordSuccess: Long = 0L var totalSstRecordFailure: Long = 0L // reload for failed import tasks @@ -170,10 +131,11 @@ object Exchange { data.get.show(truncate = false) } if (data.isDefined && !c.dry) { - val df = if (tagConfig.vertexUdf.isDefined) { - dataUdf(data.get, tagConfig.vertexUdf.get) + var df = filterDf(data.get, tagConfig.filterConfig) + df = if (tagConfig.vertexUdf.isDefined) { + dataUdf(df, tagConfig.vertexUdf.get) } else { - data.get + df } val batchSuccess = @@ -237,7 +199,7 @@ object Exchange { data.get.show(truncate = false) } if (data.isDefined && !c.dry) { - var df = data.get + var df = filterDf(data.get, edgeConfig.filterConfig) if (edgeConfig.srcVertexUdf.isDefined) { df = dataUdf(df, edgeConfig.srcVertexUdf.get) } @@ -436,4 +398,13 @@ object Exchange { finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol)) data.select(finalColNames: _*) } + + private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = { + data.show() + if (filter.isDefined && filter.get != null && filter.get.filter != null) { + data.filter(filter.get.filter) + } else { + data + } + } } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 6de542ee..bef41bb2 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import java.io.File import com.vesoft.exchange.Argument import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler} -import com.vesoft.exchange.common.config.{ - ClickHouseConfigEntry, - Configs, - DataSourceConfigEntry, - EdgeConfigEntry, - FileBaseSourceConfigEntry, - HBaseSourceConfigEntry, - HiveSourceConfigEntry, - JanusGraphSourceConfigEntry, - JdbcConfigEntry, - KafkaSourceConfigEntry, - MaxComputeConfigEntry, - MySQLSourceConfigEntry, - Neo4JSourceConfigEntry, - OracleConfigEntry, - PostgreSQLSourceConfigEntry, - PulsarSourceConfigEntry, - SchemaConfigEntry, - SinkCategory, - SourceCategory, - TagConfigEntry, - UdfConfigEntry -} -import com.vesoft.nebula.exchange.reader.{ - CSVReader, - ClickhouseReader, - HBaseReader, - HiveReader, - JSONReader, - JanusGraphReader, - JdbcReader, - KafkaReader, - MaxcomputeReader, - MySQLReader, - Neo4JReader, - ORCReader, - OracleReader, - ParquetReader, - PostgreSQLReader, - PulsarReader -} +import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry} +import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader} import com.vesoft.exchange.common.processor.ReloadProcessor import com.vesoft.exchange.common.utils.SparkValidate import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor} @@ -124,7 +85,7 @@ object Exchange { var totalClientBatchFailure: Long = 0L var totalClientRecordSuccess: Long = 0L var totalClientRecordFailure: Long = 0L - var totalSstRecordSuccess: Long = 0l + var totalSstRecordSuccess: Long = 0L var totalSstRecordFailure: Long = 0L // reload for failed import tasks @@ -170,10 +131,11 @@ object Exchange { data.get.show(truncate = false) } if (data.isDefined && !c.dry) { - val df = if (tagConfig.vertexUdf.isDefined) { - dataUdf(data.get, tagConfig.vertexUdf.get) + var df = filterDf(data.get, tagConfig.filterConfig) + df = if (tagConfig.vertexUdf.isDefined) { + dataUdf(df, tagConfig.vertexUdf.get) } else { - data.get + df } val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -236,7 +198,7 @@ object Exchange { data.get.show(truncate = false) } if (data.isDefined && !c.dry) { - var df = data.get + var df = filterDf(data.get, edgeConfig.filterConfig) if (edgeConfig.srcVertexUdf.isDefined) { df = dataUdf(df, edgeConfig.srcVertexUdf.get) } @@ -434,4 +396,13 @@ object Exchange { finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol)) data.select(finalColNames: _*) } + + + private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = { + if (filter.isDefined && filter.get != null && filter.get.filter != null) { + data.filter(filter.get.filter) + } else { + data + } + } }