diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 5552c1c..6224f62 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -187,6 +187,10 @@ case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: St
   }
 }
 
+case class FilterConfigEntry(filter: String) {
+  override def toString: String = s"filter:$filter"
+}
+
 /**
   *
   */
@@ -467,6 +471,10 @@ object Configs {
           Some(UdfConfigEntry(sep, cols, newCol))
         } else None
 
+        val filterConfig = if (tagConfig.hasPath("filter")) {
+          Some(FilterConfigEntry(tagConfig.getString("filter")))
+        } else None
+
         LOG.info(s"name ${tagName}  batch ${batch}")
         val entry = TagConfigEntry(
           tagName,
@@ -485,7 +493,8 @@ object Configs {
           enableTagless,
           ignoreIndex,
           deleteEdge,
-          vertexUdf
+          vertexUdf,
+          filterConfig
         )
         LOG.info(s"Tag Config: ${entry}")
         tags += entry
@@ -608,6 +617,10 @@ object Configs {
           Some(UdfConfigEntry(sep, cols, newCol))
         } else None
 
+        val filterConfig = if (edgeConfig.hasPath("filter")) {
+          Some(FilterConfigEntry(edgeConfig.getString("filter")))
+        } else None
+
         val entry = EdgeConfigEntry(
           edgeName,
           sourceConfig,
@@ -631,7 +645,8 @@ object Configs {
           repartitionWithNebula,
           ignoreIndex,
           srcUdf,
-          dstUdf
+          dstUdf,
+          filterConfig
         )
         LOG.info(s"Edge Config: ${entry}")
         edges += entry
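
For reference, the new `filter` key is read straight off a tag or edge block with `hasPath`/`getString`, so it slots into an existing HOCON config as a single extra line. Below is a minimal sketch of a tag block; every key except `filter` is illustrative boilerplate, and the filter string must be a valid Spark SQL condition because it is later handed to `DataFrame.filter`:

```hocon
{
  name: person
  type: {
    source: csv
    sink: client
  }
  path: "hdfs://127.0.0.1:9000/data/person.csv"
  # new in this change: rows that do not match are dropped before import
  filter: "age > 18 AND name IS NOT NULL"
  fields: [_c1, _c2]
  nebula.fields: [name, age]
  vertex: {
    field: _c0
  }
  batch: 256
  partition: 32
}
```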
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
index 0838130..3138e6d 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
@@ -69,7 +69,8 @@ case class TagConfigEntry(override val name: String,
                           enableTagless: Boolean = false,
                           ignoreIndex: Boolean = false,
                           deleteEdge: Boolean = false,
-                          vertexUdf: Option[UdfConfigEntry] = None)
+                          vertexUdf: Option[UdfConfigEntry] = None,
+                          filterConfig: Option[FilterConfigEntry] = None)
     extends SchemaConfigEntry {
   require(name.trim.nonEmpty, "tag name cannot be empty")
   require(vertexField.trim.nonEmpty, "tag vertex id cannot be empty")
@@ -89,7 +90,8 @@ case class TagConfigEntry(override val name: String,
       s"repartitionWithNebula: $repartitionWithNebula, " +
       s"enableTagless: $enableTagless, " +
       s"ignoreIndex: $ignoreIndex, " +
-      s"vertexUdf: $vertexUdf."
+      s"vertexUdf: $vertexUdf, " +
+      s"filter: $filterConfig."
   }
 }
 
@@ -134,7 +136,8 @@ case class EdgeConfigEntry(override val name: String,
                            repartitionWithNebula: Boolean = false,
                            ignoreIndex: Boolean = false,
                            srcVertexUdf: Option[UdfConfigEntry] = None,
-                           dstVertexUdf: Option[UdfConfigEntry] = None)
+                           dstVertexUdf: Option[UdfConfigEntry] = None,
+                           filterConfig: Option[FilterConfigEntry] = None)
     extends SchemaConfigEntry {
   require(name.trim.nonEmpty, "edge name cannot be empty")
   require(sourceField.trim.nonEmpty, "edge source id cannot be empty")
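
Because `filterConfig` defaults to `None` on both `TagConfigEntry` and `EdgeConfigEntry`, existing configs parse exactly as before; when the key is present, the entry surfaces in the logged config string through its `toString`. A quick sketch:

```scala
import com.vesoft.exchange.common.config.FilterConfigEntry

object FilterEntryDemo extends App {
  val absent: Option[FilterConfigEntry]  = None                          // the default: no filtering
  val present: Option[FilterConfigEntry] = Some(FilterConfigEntry("age > 18"))
  println(absent)  // None
  println(present) // Some(filter:age > 18)
}
```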
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 447c085..fd3f055 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  EdgeConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  JdbcConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  OracleConfigEntry,
-  PostgreSQLSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SchemaConfigEntry,
-  SinkCategory,
-  SourceCategory,
-  TagConfigEntry,
-  UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  JdbcReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  OracleReader,
-  ParquetReader,
-  PostgreSQLReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
     var totalClientBatchFailure: Long  = 0L
     var totalClientRecordSuccess: Long = 0L
     var totalClientRecordFailure: Long = 0L
-    var totalSstRecordSuccess: Long    = 0l
+    var totalSstRecordSuccess: Long    = 0L
     var totalSstRecordFailure: Long    = 0L
 
     // reload for failed import tasks
@@ -171,10 +132,11 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          val df = if (tagConfig.vertexUdf.isDefined) {
-            dataUdf(data.get, tagConfig.vertexUdf.get)
+          var df = filterDf(data.get, tagConfig.filterConfig)
+          df = if (tagConfig.vertexUdf.isDefined) {
+            dataUdf(df, tagConfig.vertexUdf.get)
           } else {
-            data.get
+            df
           }
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -238,7 +200,7 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          var df = data.get
+          var df = filterDf(data.get, edgeConfig.filterConfig)
           if (edgeConfig.srcVertexUdf.isDefined) {
             df = dataUdf(df, edgeConfig.srcVertexUdf.get)
           }
@@ -437,4 +399,12 @@ object Exchange {
     finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
     data.select(finalColNames: _*)
   }
+
+  private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+    if (filter.isDefined && filter.get.filter != null) {
+      data.filter(filter.get.filter)
+    } else {
+      data
+    }
+  }
 }
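
`filterDf` passes the configured string to `DataFrame.filter(conditionExpr)`, so the predicate is plain Spark SQL and a malformed expression only fails at runtime with an `AnalysisException`. A self-contained sketch of the semantics (the local session and toy data are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession

object FilterSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(("Tom", 19), ("Jina", 11)).toDF("name", "age")
  // equivalent to filterDf(df, Some(FilterConfigEntry("age > 18")))
  df.filter("age > 18").show() // keeps only the ("Tom", 19) row
  spark.stop()
}
```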
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 26622a1..ecc3374 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  EdgeConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  JdbcConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  OracleConfigEntry,
-  PostgreSQLSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SchemaConfigEntry,
-  SinkCategory,
-  SourceCategory,
-  TagConfigEntry,
-  UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  JdbcReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  OracleReader,
-  ParquetReader,
-  PostgreSQLReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
     var totalClientBatchFailure: Long  = 0L
     var totalClientRecordSuccess: Long = 0L
     var totalClientRecordFailure: Long = 0L
-    var totalSstRecordSuccess: Long    = 0l
+    var totalSstRecordSuccess: Long    = 0L
     var totalSstRecordFailure: Long    = 0L
 
     // reload for failed import tasks
@@ -170,10 +131,11 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          val df = if (tagConfig.vertexUdf.isDefined) {
-            dataUdf(data.get, tagConfig.vertexUdf.get)
+          var df = filterDf(data.get, tagConfig.filterConfig)
+          df = if (tagConfig.vertexUdf.isDefined) {
+            dataUdf(df, tagConfig.vertexUdf.get)
           } else {
-            data.get
+            df
           }
 
           val batchSuccess =
@@ -237,7 +199,7 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          var df = data.get
+          var df = filterDf(data.get, edgeConfig.filterConfig)
           if (edgeConfig.srcVertexUdf.isDefined) {
             df = dataUdf(df, edgeConfig.srcVertexUdf.get)
           }
@@ -436,4 +398,12 @@ object Exchange {
     finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
     data.select(finalColNames: _*)
   }
+
+  private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+    if (filter.isDefined && filter.get.filter != null) {
+      data.filter(filter.get.filter)
+    } else {
+      data
+    }
+  }
 }
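
The guard inside `filterDf` can also be expressed with `Option` combinators. The version below is an equivalent sketch, not what the patch ships; it collapses the `isDefined`/`get` pair into one expression:

```scala
import org.apache.spark.sql.DataFrame
import com.vesoft.exchange.common.config.FilterConfigEntry

// no-op unless a non-null filter expression is configured
def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame =
  filter.filter(_.filter != null).fold(data)(f => data.filter(f.filter))
```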
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 6de542e..bef41bb 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  EdgeConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  JdbcConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  OracleConfigEntry,
-  PostgreSQLSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SchemaConfigEntry,
-  SinkCategory,
-  SourceCategory,
-  TagConfigEntry,
-  UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  JdbcReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  OracleReader,
-  ParquetReader,
-  PostgreSQLReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
     var totalClientBatchFailure: Long  = 0L
     var totalClientRecordSuccess: Long = 0L
     var totalClientRecordFailure: Long = 0L
-    var totalSstRecordSuccess: Long    = 0l
+    var totalSstRecordSuccess: Long    = 0L
     var totalSstRecordFailure: Long    = 0L
 
     // reload for failed import tasks
@@ -170,10 +131,11 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          val df = if (tagConfig.vertexUdf.isDefined) {
-            dataUdf(data.get, tagConfig.vertexUdf.get)
+          var df = filterDf(data.get, tagConfig.filterConfig)
+          df = if (tagConfig.vertexUdf.isDefined) {
+            dataUdf(df, tagConfig.vertexUdf.get)
           } else {
-            data.get
+            df
           }
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -236,7 +198,7 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          var df = data.get
+          var df = filterDf(data.get, edgeConfig.filterConfig)
           if (edgeConfig.srcVertexUdf.isDefined) {
             df = dataUdf(df, edgeConfig.srcVertexUdf.get)
           }
@@ -434,4 +396,12 @@ object Exchange {
     finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
     data.select(finalColNames: _*)
   }
+
+  private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+    if (filter.isDefined && filter.get.filter != null) {
+      data.filter(filter.get.filter)
+    } else {
+      data
+    }
+  }
 }
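
One design point holds across all three `Exchange` variants: `filterDf` runs before `dataUdf`, so the filter expression must reference source column names, never the UDF's concatenated output column. A sketch with assumed column names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}

object FilterBeforeUdfSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("order-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(("Tom", "Li"), ("", "Wang")).toDF("first", "last")

  // mirror Exchange's order: filter against the source schema first ...
  val filtered = df.filter("first != ''")
  // ... then apply the UDF-style concatenation that produces the new column
  val joined = filtered.select(concat_ws("_", col("first"), col("last")).as("full_name"))
  joined.show() // single row: Tom_Li
  spark.stop()
}
```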