diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 5552c1c..6224f62 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -187,6 +187,10 @@ case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: St
}
}
+case class FilterConfigEntry(filter: String){
+ override def toString(): String = s"filter:$filter"
+}
+
/**
*
*/
@@ -467,6 +471,10 @@ object Configs {
Some(UdfConfigEntry(sep, cols, newCol))
} else None
+ val filterConfig = if (tagConfig.hasPath("filter")) {
+ Some(FilterConfigEntry(tagConfig.getString("filter")))
+ } else None
+
LOG.info(s"name ${tagName} batch ${batch}")
val entry = TagConfigEntry(
tagName,
@@ -485,7 +493,8 @@ object Configs {
enableTagless,
ignoreIndex,
deleteEdge,
- vertexUdf
+ vertexUdf,
+ filterConfig
)
LOG.info(s"Tag Config: ${entry}")
tags += entry
@@ -608,6 +617,11 @@ object Configs {
Some(UdfConfigEntry(sep, cols, newCol))
} else None
+
+ val filterConfig = if (edgeConfig.hasPath("filter")) {
+ Some(FilterConfigEntry(edgeConfig.getString("filter")))
+ } else None
+
val entry = EdgeConfigEntry(
edgeName,
sourceConfig,
@@ -631,7 +645,8 @@ object Configs {
repartitionWithNebula,
ignoreIndex,
srcUdf,
- dstUdf
+ dstUdf,
+ filterConfig
)
LOG.info(s"Edge Config: ${entry}")
edges += entry
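
For reference, the new `filter` key is read with Typesafe Config's `hasPath`/`getString`, so enabling it only requires one extra string entry in a tag or edge block. Below is a minimal sketch of that parsing behaviour; the `person` tag block and the `age > 18` expression are made-up examples for illustration, not part of this change:

```scala
import com.typesafe.config.ConfigFactory
import com.vesoft.exchange.common.config.FilterConfigEntry

object FilterConfigExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical tag block; only the "filter" key is relevant here.
    val tagConfig = ConfigFactory.parseString(
      """
        |name: person
        |filter: "age > 18"
        |""".stripMargin)

    // Mirrors the parsing added in Configs.scala above.
    val filterConfig =
      if (tagConfig.hasPath("filter")) Some(FilterConfigEntry(tagConfig.getString("filter")))
      else None

    println(filterConfig) // Some(filter:age > 18)
  }
}
```
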
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
index 0838130..3138e6d 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
@@ -69,7 +69,8 @@ case class TagConfigEntry(override val name: String,
enableTagless: Boolean = false,
ignoreIndex: Boolean = false,
deleteEdge: Boolean = false,
- vertexUdf: Option[UdfConfigEntry] = None)
+ vertexUdf: Option[UdfConfigEntry] = None,
+ filterConfig: Option[FilterConfigEntry] = None)
extends SchemaConfigEntry {
require(name.trim.nonEmpty, "tag name cannot be empty")
require(vertexField.trim.nonEmpty, "tag vertex id cannot be empty")
@@ -89,7 +90,8 @@ case class TagConfigEntry(override val name: String,
s"repartitionWithNebula: $repartitionWithNebula, " +
s"enableTagless: $enableTagless, " +
s"ignoreIndex: $ignoreIndex, " +
- s"vertexUdf: $vertexUdf."
+ s"vertexUdf: $vertexUdf, " +
+ s"filter: $filterConfig."
}
}
@@ -134,7 +136,8 @@ case class EdgeConfigEntry(override val name: String,
repartitionWithNebula: Boolean = false,
ignoreIndex: Boolean = false,
srcVertexUdf: Option[UdfConfigEntry] = None,
- dstVertexUdf: Option[UdfConfigEntry] = None)
+ dstVertexUdf: Option[UdfConfigEntry] = None,
+ filterConfig: Option[FilterConfigEntry] = None)
extends SchemaConfigEntry {
require(name.trim.nonEmpty, "edge name cannot be empty")
require(sourceField.trim.nonEmpty, "edge source id cannot be empty")
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 447c085..fd3f055 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
- ClickHouseConfigEntry,
- Configs,
- DataSourceConfigEntry,
- EdgeConfigEntry,
- FileBaseSourceConfigEntry,
- HBaseSourceConfigEntry,
- HiveSourceConfigEntry,
- JanusGraphSourceConfigEntry,
- JdbcConfigEntry,
- KafkaSourceConfigEntry,
- MaxComputeConfigEntry,
- MySQLSourceConfigEntry,
- Neo4JSourceConfigEntry,
- OracleConfigEntry,
- PostgreSQLSourceConfigEntry,
- PulsarSourceConfigEntry,
- SchemaConfigEntry,
- SinkCategory,
- SourceCategory,
- TagConfigEntry,
- UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
- CSVReader,
- ClickhouseReader,
- HBaseReader,
- HiveReader,
- JSONReader,
- JanusGraphReader,
- JdbcReader,
- KafkaReader,
- MaxcomputeReader,
- MySQLReader,
- Neo4JReader,
- ORCReader,
- OracleReader,
- ParquetReader,
- PostgreSQLReader,
- PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
var totalClientBatchFailure: Long = 0L
var totalClientRecordSuccess: Long = 0L
var totalClientRecordFailure: Long = 0L
- var totalSstRecordSuccess: Long = 0l
+ var totalSstRecordSuccess: Long = 0L
var totalSstRecordFailure: Long = 0L
// reload for failed import tasks
@@ -171,10 +132,11 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
- val df = if (tagConfig.vertexUdf.isDefined) {
- dataUdf(data.get, tagConfig.vertexUdf.get)
+ var df = filterDf(data.get, tagConfig.filterConfig)
+ df = if (tagConfig.vertexUdf.isDefined) {
+ dataUdf(df, tagConfig.vertexUdf.get)
} else {
- data.get
+ df
}
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -238,7 +200,7 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
- var df = data.get
+ var df = filterDf(data.get, edgeConfig.filterConfig)
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
@@ -437,4 +399,13 @@ object Exchange {
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}
+
+
+ private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+ if (filter.isDefined && filter.get != null && filter.get.filter != null) {
+ data.filter(filter.get.filter)
+ } else {
+ data
+ }
+ }
}
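
`filterDf` simply delegates to `DataFrame.filter`, which accepts a Spark SQL condition string, so the configured expression behaves like an ordinary WHERE clause applied before the UDF and processor steps. A self-contained sketch of that behaviour, using an invented local SparkSession and sample rows:

```scala
import org.apache.spark.sql.SparkSession

object FilterDfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("filter-df-example")
      .getOrCreate()
    import spark.implicits._

    // Sample data, for illustration only.
    val people = Seq(("Tom", 26), ("Jerry", 15), ("Bob", 31)).toDF("name", "age")

    // Behaves like filterDf(people, Some(FilterConfigEntry("age > 18")))
    people.filter("age > 18").show() // keeps Tom and Bob, drops Jerry

    spark.stop()
  }
}
```
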
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 26622a1..5dc3be2 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
- ClickHouseConfigEntry,
- Configs,
- DataSourceConfigEntry,
- EdgeConfigEntry,
- FileBaseSourceConfigEntry,
- HBaseSourceConfigEntry,
- HiveSourceConfigEntry,
- JanusGraphSourceConfigEntry,
- JdbcConfigEntry,
- KafkaSourceConfigEntry,
- MaxComputeConfigEntry,
- MySQLSourceConfigEntry,
- Neo4JSourceConfigEntry,
- OracleConfigEntry,
- PostgreSQLSourceConfigEntry,
- PulsarSourceConfigEntry,
- SchemaConfigEntry,
- SinkCategory,
- SourceCategory,
- TagConfigEntry,
- UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
- CSVReader,
- ClickhouseReader,
- HBaseReader,
- HiveReader,
- JSONReader,
- JanusGraphReader,
- JdbcReader,
- KafkaReader,
- MaxcomputeReader,
- MySQLReader,
- Neo4JReader,
- ORCReader,
- OracleReader,
- ParquetReader,
- PostgreSQLReader,
- PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -118,13 +79,13 @@ object Exchange {
val spark = session.getOrCreate()
// check the spark version
SparkValidate.validate(spark.version, "2.4.*")
val startTime = System.currentTimeMillis()
var totalClientBatchSuccess: Long = 0L
var totalClientBatchFailure: Long = 0L
var totalClientRecordSuccess: Long = 0L
var totalClientRecordFailure: Long = 0L
- var totalSstRecordSuccess: Long = 0l
+ var totalSstRecordSuccess: Long = 0L
var totalSstRecordFailure: Long = 0L
// reload for failed import tasks
@@ -170,10 +131,11 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
- val df = if (tagConfig.vertexUdf.isDefined) {
- dataUdf(data.get, tagConfig.vertexUdf.get)
+ var df = filterDf(data.get, tagConfig.filterConfig)
+ df = if (tagConfig.vertexUdf.isDefined) {
+ dataUdf(df, tagConfig.vertexUdf.get)
} else {
- data.get
+ df
}
val batchSuccess =
@@ -237,7 +199,7 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
- var df = data.get
+ var df = filterDf(data.get, edgeConfig.filterConfig)
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
@@ -436,4 +398,12 @@
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}
+
+ private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+ if (filter.isDefined && filter.get != null && filter.get.filter != null) {
+ data.filter(filter.get.filter)
+ } else {
+ data
+ }
+ }
}
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 6de542e..bef41bb 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
- ClickHouseConfigEntry,
- Configs,
- DataSourceConfigEntry,
- EdgeConfigEntry,
- FileBaseSourceConfigEntry,
- HBaseSourceConfigEntry,
- HiveSourceConfigEntry,
- JanusGraphSourceConfigEntry,
- JdbcConfigEntry,
- KafkaSourceConfigEntry,
- MaxComputeConfigEntry,
- MySQLSourceConfigEntry,
- Neo4JSourceConfigEntry,
- OracleConfigEntry,
- PostgreSQLSourceConfigEntry,
- PulsarSourceConfigEntry,
- SchemaConfigEntry,
- SinkCategory,
- SourceCategory,
- TagConfigEntry,
- UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
- CSVReader,
- ClickhouseReader,
- HBaseReader,
- HiveReader,
- JSONReader,
- JanusGraphReader,
- JdbcReader,
- KafkaReader,
- MaxcomputeReader,
- MySQLReader,
- Neo4JReader,
- ORCReader,
- OracleReader,
- ParquetReader,
- PostgreSQLReader,
- PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
var totalClientBatchFailure: Long = 0L
var totalClientRecordSuccess: Long = 0L
var totalClientRecordFailure: Long = 0L
- var totalSstRecordSuccess: Long = 0l
+ var totalSstRecordSuccess: Long = 0L
var totalSstRecordFailure: Long = 0L
// reload for failed import tasks
@@ -170,10 +131,11 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
- val df = if (tagConfig.vertexUdf.isDefined) {
- dataUdf(data.get, tagConfig.vertexUdf.get)
+ var df = filterDf(data.get, tagConfig.filterConfig)
+ df = if (tagConfig.vertexUdf.isDefined) {
+ dataUdf(df, tagConfig.vertexUdf.get)
} else {
- data.get
+ df
}
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -236,7 +198,7 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
- var df = data.get
+ var df = filterDf(data.get, edgeConfig.filterConfig)
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
@@ -434,4 +396,13 @@ object Exchange {
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}
+
+
+ private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+ if (filter.isDefined && filter.get != null && filter.get.filter != null) {
+ data.filter(filter.get.filter)
+ } else {
+ data
+ }
+ }
}