update exchange kafka (#2167)
cooper-lzy authored Jul 13, 2023
1 parent 9a75c32 commit 20911da
Showing 1 changed file with 68 additions and 115 deletions.
docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md (183 changes: 68 additions & 115 deletions)
@@ -98,6 +98,10 @@ For more information, see [Quick start workflow](../../2.quick-start/1.quick-start-workflow.md)
After Exchange is compiled, copy the configuration file `target/classes/application.conf` to set the Kafka data source configuration. In this example, the copied file is called `kafka_application.conf`. For details on each configuration item, see [Parameters in the configuration file](../parameter-reference/ex-ug-parameter.md).
!!! note

    When importing Kafka data, a configuration file can only handle one tag or edge type. If there are multiple tag or edge types, you need to create multiple configuration files.
```conf
{
# Spark configuration
@@ -195,124 +199,73 @@
# Flow control. Limits the maximum number of offsets processed per trigger interval. This parameter is optional.
# maxOffsetsPerTrigger:10000
}
# Set the information about the Tag team.
{
name: team
type: {
source: kafka
sink: client
}
service: "127.0.0.1:9092"
topic: "topic_name2"
fields: [key]
nebula.fields: [name]
vertex:{
field:key
}
batch: 10
partition: 10
interval.seconds: 10
startingOffsets: latest
# maxOffsetsPerTrigger:10000
}
]
# Processing edges
edges: [
# Set the information about the Edge Type follow.
{
# The corresponding Edge Type name in NebulaGraph.
name: follow
type: {
# Specify the data source format as Kafka.
source: kafka
# Specify how to import the Edge type data into NebulaGraph. Only client is supported.
sink: client
}
# Kafka server address.
service: "127.0.0.1:9092"
# Message category, that is, the Kafka topic name.
topic: "topic_name3"
# Kafka data has fixed fields: key, value, topic, partition, offset, timestamp, and timestampType.
# If multiple fields are needed after Spark reads the data as a DataFrame, separate them with commas.
# Specify the field names in fields. For example, key is used as the source of degree in NebulaGraph, as shown below.
fields: [key]
nebula.fields: [degree]
# In source, use a column in the topic as the source of the edge's source vertex.
# In target, use a column in the topic as the source of the edge's destination vertex.
source:{
field:timestamp
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
}
target:{
field:offset
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
}
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The number of rows written to NebulaGraph in a single batch.
batch: 10
# The number of Spark partitions.
partition: 10
# The interval for message reading. Unit: second.
interval.seconds: 10
# The consumer offsets. The default value is latest. Optional values are latest and earliest.
startingOffsets: latest
# Flow control. Limits the maximum number of offsets processed per trigger interval. This parameter is optional.
# maxOffsetsPerTrigger:10000
}
# Set the information about the Edge Type serve.
{
name: serve
type: {
source: kafka
sink: client
}
service: "127.0.0.1:9092"
topic: "topic_name4"
fields: [timestamp,offset]
nebula.fields: [start_year,end_year]
source:{
field:key
}
target:{
field:value
}
# (Optional) Specify a column as the source of the rank.
#ranking: rank
batch: 10
partition: 10
interval.seconds: 10
startingOffsets: latest
# maxOffsetsPerTrigger:10000
}
]
#edges: [
# # Set the information about the Edge Type follow.
# {
# # The corresponding Edge Type name in NebulaGraph.
# name: follow
# type: {
# # Specify the data source format as Kafka.
# source: kafka
# # Specify how to import the Edge type data into NebulaGraph. Only client is supported.
# sink: client
# }
# # Kafka server address.
# service: "127.0.0.1:9092"
# # Message category, that is, the Kafka topic name.
# topic: "topic_name3"
# # Kafka data has fixed fields: key, value, topic, partition, offset, timestamp, and timestampType.
# # If multiple fields are needed after Spark reads the data as a DataFrame, separate them with commas.
# # Specify the field names in fields. For example, key is used as the source of degree in NebulaGraph, as shown below.
# fields: [key]
# nebula.fields: [degree]
# # In source, use a column in the topic as the source of the edge's source vertex.
# # In target, use a column in the topic as the source of the edge's destination vertex.
# source:{
# field:timestamp
# # udf:{
# # separator:"_"
# # oldColNames:[field-0,field-1,field-2]
# # newColName:new-field
# # }
# }
# target:{
# field:offset
# # udf:{
# # separator:"_"
# # oldColNames:[field-0,field-1,field-2]
# # newColName:new-field
# # }
# }
# # (Optional) Specify a column as the source of the rank.
# #ranking: rank
# # The number of rows written to NebulaGraph in a single batch.
# batch: 10
# # The number of Spark partitions.
# partition: 10
# # The interval for message reading. Unit: second.
# interval.seconds: 10
# # The consumer offsets. The default value is latest. Optional values are latest and earliest.
# startingOffsets: latest
# # Flow control. Limits the maximum number of offsets processed per trigger interval. This parameter is optional.
# # maxOffsetsPerTrigger:10000
# }
#]
}
```
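
As the note above says, one configuration file can handle only one tag or edge type, so in practice the vertex and edge settings shown above would be split across separate files. Below is a minimal, hypothetical sketch of what a separate, edge-only configuration file for the `follow` Edge type might look like; it reuses the Kafka and field settings from the example above, and the omitted `spark` and `nebula` blocks are assumed to be identical to those shown earlier.

```conf
# A hypothetical, minimal edge-only configuration file (for illustration only).
{
  # The spark and nebula connection settings are assumed to be the same as in the example above.
  # spark: { ... }
  # nebula: { ... }

  # Processing edges
  edges: [
    # Set the information about the Edge Type follow.
    {
      name: follow
      type: {
        source: kafka
        sink: client
      }
      service: "127.0.0.1:9092"
      topic: "topic_name3"
      fields: [key]
      nebula.fields: [degree]
      source:{
        field:timestamp
      }
      target:{
        field:offset
      }
      batch: 10
      partition: 10
      interval.seconds: 10
      startingOffsets: latest
    }
  ]
}
```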
