exchange supports filter (#467) (#2548)
cooper-lzy authored Apr 26, 2024
1 parent c0abd6c commit 924040b
Showing 32 changed files with 174 additions and 4 deletions.
@@ -163,6 +163,8 @@ For different data sources, the vertex configurations are different. There are m
|`tags.vertex.policy`|string|-|No|Supports only the value `hash`. Performs hashing operations on VIDs of type string.|
|`tags.batch`|int|`256`|Yes|The maximum number of vertices written into NebulaGraph in a single batch.|
|`tags.partition`|int|`32`|Yes|The number of partitions to be created when the data is written to {{nebula.name}}. If `tags.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`tags.filter`|string|-|No|The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]).|
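
To show how `tags.filter` fits alongside these parameters, here is a minimal, hedged sketch of a `tags` entry for a CSV source. The tag name `person`, the file path, and the column names are illustrative assumptions, not values from this commit:

```
{
  # Hypothetical tag mapping a headered CSV file to the person tag.
  name: person
  type: {
    source: csv
    sink: client
  }
  path: "hdfs://namenode:9000/data/person.csv"  # assumed path
  fields: [name, age]
  nebula.fields: [name, age]
  vertex: {
    field: name
  }
  header: true
  # Import only the rows whose name column is Tom.
  filter: "name='Tom'"
  batch: 256
  partition: 32
}
```

Because the filter string is evaluated as a Spark SQL condition expression (see the `Dataset.filter` link above), compound conditions such as `"age > 18 AND name='Tom'"` should also work.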


#### Specific parameters of Parquet/JSON/ORC data sources

@@ -303,6 +305,7 @@ For the specific parameters of different data sources for edge configurations, p
|`edges.ranking`|int|-|No|The column of rank values. If not specified, all rank values are `0` by default.|
|`edges.batch`|int|`256`|Yes|The maximum number of edges written into NebulaGraph in a single batch.|
|`edges.partition`|int|`32`|Yes|The number of partitions to be created when the data is written to {{nebula.name}}. If `edges.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`edges.filter`|string|-|No|The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]).|
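
Likewise, a hedged sketch of an `edges` entry using `edges.filter`; the edge type `follow`, the path, and the column names are assumptions:

```
{
  # Hypothetical edge type mapping a headered CSV file to the follow edge.
  name: follow
  type: {
    source: csv
    sink: client
  }
  path: "hdfs://namenode:9000/data/follow.csv"  # assumed path
  fields: [degree]
  nebula.fields: [degree]
  source: {
    field: src_id
  }
  target: {
    field: dst_id
  }
  header: true
  # Import only the edges whose degree column is greater than 10.
  filter: "degree > 10"
  batch: 256
  partition: 32
}
```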

#### Specific parameters for generating SST files

@@ -178,6 +178,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -276,6 +279,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -207,6 +207,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -308,6 +311,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -212,6 +212,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -305,6 +308,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -254,6 +254,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -341,6 +344,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -243,6 +243,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -360,6 +363,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -230,6 +230,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -322,6 +325,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -203,6 +203,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -188,6 +188,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -300,6 +303,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -232,6 +232,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -201,6 +201,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -227,6 +227,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -333,6 +336,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -195,6 +195,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -196,6 +196,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -287,6 +290,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -180,6 +180,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -280,6 +283,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -284,6 +284,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -373,6 +376,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -163,8 +163,10 @@ check: the real password decrypted by private key and encrypted password is: neb
|`tags.vertex.udf.newColName`|string|-||The name of the new column generated by combining multiple columns with custom rules.|
|`tags.vertex.prefix`|string|-||Adds the specified prefix to the VID. For example, if the VID is `12345`, it becomes `tag1_12345` after the prefix `tag1` is added. The underscore cannot be modified.|
|`tags.vertex.policy`|string|-||Supports only the value `hash`. Performs hashing operations on VIDs of type string.|
|`tags.batch`|int|`256`||The maximum number of vertices written into {{nebula.name}} in a single batch.|
|`tags.partition`|int|`32`||The number of partitions to be created when the data is written to {{nebula.name}}. If `tags.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`tags.filter`|string|-||The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]).|
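
As a hedged sketch, `tags.filter` for a Parquet source might be configured as follows; the tag name `player`, the path, and the column names are assumptions:

```
{
  # Hypothetical tag mapping a Parquet file to the player tag.
  name: player
  type: {
    source: parquet
    sink: client
  }
  path: "hdfs://namenode:9000/data/player.parquet"  # assumed path
  fields: [name, age]
  nebula.fields: [name, age]
  vertex: {
    field: id
  }
  # Import only the rows whose age column is greater than 18.
  filter: "age > 18"
  batch: 256
  partition: 32
}
```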


#### Specific parameters of Parquet/JSON/ORC data sources

@@ -304,8 +306,10 @@ check: the real password decrypted by private key and encrypted password is: neb
|`edges.target.prefix`|string|-||Adds the specified prefix to the VID. For example, if the VID is `12345`, it becomes `tag1_12345` after the prefix `tag1` is added. The underscore cannot be modified.|
|`edges.target.policy`|string|-||Supports only the value `hash`. Performs hashing operations on VIDs of type string.|
|`edges.ranking`|int|-||The column of rank values. If not specified, all rank values are `0` by default.|
|`edges.batch`|int|`256`||The maximum number of edges written into {{nebula.name}} in a single batch.|
|`edges.partition`|int|`32`||The number of partitions to be created when the data is written to {{nebula.name}}. If `edges.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`edges.filter`|string|-||The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]).|
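
A hedged sketch of `edges.filter` for a Hive source, reusing the `basketball.serve` query that appears later in this commit; the filter expression itself is an assumption:

```
{
  # Hypothetical edge type mapping the Hive table basketball.serve to the serve edge.
  name: serve
  type: {
    source: hive
    sink: client
  }
  exec: "select playerid, teamid, start_year, end_year from basketball.serve"
  fields: [start_year, end_year]
  nebula.fields: [start_year, end_year]
  source: {
    field: playerid
  }
  target: {
    field: teamid
  }
  # Import only the rows whose start_year is 2000 or later.
  filter: "start_year >= 2000"
  batch: 256
  partition: 32
}
```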


#### Specific parameters for generating SST files

@@ -179,6 +179,9 @@
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -281,6 +284,9 @@
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -206,6 +206,9 @@
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -305,6 +308,9 @@
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -210,6 +210,9 @@ ROW COLUMN+CELL
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -302,6 +305,9 @@ ROW COLUMN+CELL
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -249,6 +249,9 @@ scala> sql("select playerid, teamid, start_year, end_year from basketball.serve"
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
@@ -336,6 +339,9 @@ scala> sql("select playerid, teamid, start_year, end_year from basketball.serve"
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT