From 20911dadb29d378921e6fa3b18f2e0de706657bb Mon Sep 17 00:00:00 2001 From: cooper-lzy <78672629+cooper-lzy@users.noreply.github.com> Date: Thu, 13 Jul 2023 10:55:22 +0800 Subject: [PATCH] update exchange kafka (#2167) --- .../use-exchange/ex-ug-import-from-kafka.md | 183 +++++++----------- 1 file changed, 68 insertions(+), 115 deletions(-) diff --git a/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md b/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md index c15c93c1397..c5e20c5e954 100644 --- a/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md +++ b/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md @@ -98,6 +98,10 @@ For more information, see [Quick start workflow](../../2.quick-start/1.quick-sta After Exchange is compiled, copy the conf file `target/classes/application.conf` to set Kafka data source configuration. In this example, the copied file is called `kafka_application.conf`. For details on each configuration item, see [Parameters in the configuration file](../parameter-reference/ex-ug-parameter.md). +!!! note + + When importing Kafka data, a configuration file can only handle one tag or edge type. If there are multiple tag or edge types, you need to create multiple configuration files. + ```conf { # Spark configuration @@ -195,124 +199,73 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf` # Flow control, with a rate limit on the maximum offset processed per trigger interval, may not be configured. # maxOffsetsPerTrigger:10000 } - # Set the information about the Tag Team. - { - name: team - type: { - source: kafka - sink: client - } - service: "127.0.0.1:9092" - topic: "topic_name2" - fields: [key] - nebula.fields: [name] - vertex:{ - field:key - } - batch: 10 - partition: 10 - interval.seconds: 10 - startingOffsets: latest - # maxOffsetsPerTrigger:10000 - } - ] # Processing edges - edges: [ - # Set the information about the Edge Type follow. - { - # The corresponding Edge Type name in NebulaGraph. - name: follow - - type: { - # Specify the data source file format to Kafka. - source: kafka - - # Specify how to import the Edge type data into NebulaGraph. - # Specify how to import the data into NebulaGraph. Only client is supported. - sink: client - } - - # Kafka server address. - service: "127.0.0.1:9092" - # Message category. - topic: "topic_name3" - - # Kafka data has a fixed domain name: key, value, topic, partition, offset, timestamp, timestampType. - # If multiple fields need to be specified after Spark reads as DataFrame, separate them with commas. - # Specify the field name in fields. For example, use key for degree in Nebula, as shown in the following. - fields: [key] - nebula.fields: [degree] - - # In source, use a column in the topic as the source of the edge's source vertex. - # In target, use a column in the topic as the source of the edge's destination vertex. - source:{ - field:timestamp - # udf:{ - # separator:"_" - # oldColNames:[field-0,field-1,field-2] - # newColName:new-field - # } - } - - - target:{ - field:offset - # udf:{ - # separator:"_" - # oldColNames:[field-0,field-1,field-2] - # newColName:new-field - # } - } - - # (Optional) Specify a column as the source of the rank. - #ranking: rank - - # The number of data written to NebulaGraph in a single batch. - batch: 10 - - # The number of Spark partitions. - partition: 10 - - # The interval for message reading. Unit: second. - interval.seconds: 10 - # The consumer offsets. The default value is latest. Optional value are latest and earliest. - startingOffsets: latest - # Flow control, with a rate limit on the maximum offset processed per trigger interval, may not be configured. - # maxOffsetsPerTrigger:10000 - } - - # Set the information about the Edge Type serve. - { - name: serve - type: { - source: kafka - sink: client - } - service: "127.0.0.1:9092" - topic: "topic_name4" - - fields: [timestamp,offset] - nebula.fields: [start_year,end_year] - source:{ - field:key - } - - target:{ - field:value - } - - # (Optional) Specify a column as the source of the rank. - #ranking: rank - - batch: 10 - partition: 10 - interval.seconds: 10 - startingOffsets: latest - # maxOffsetsPerTrigger:10000 - } - ] + #edges: [ + # # Set the information about the Edge Type follow. + # { + # # The corresponding Edge Type name in NebulaGraph. + # name: follow + + # type: { + # # Specify the data source file format to Kafka. + # source: kafka + + # # Specify how to import the Edge type data into NebulaGraph. + # # Specify how to import the data into NebulaGraph. Only client is supported. + # sink: client + # } + + # # Kafka server address. + # service: "127.0.0.1:9092" + # # Message category. + # topic: "topic_name3" + + # # Kafka data has a fixed domain name: key, value, topic, partition, offset, timestamp, timestampType. + # # If multiple fields need to be specified after Spark reads as DataFrame, separate them with commas. + # # Specify the field name in fields. For example, use key for degree in Nebula, as shown in the following. + # fields: [key] + # nebula.fields: [degree] + + # # In source, use a column in the topic as the source of the edge's source vertex. + # # In target, use a column in the topic as the source of the edge's destination vertex. + # source:{ + # field:timestamp + # # udf:{ + # # separator:"_" + # # oldColNames:[field-0,field-1,field-2] + # # newColName:new-field + # # } + # } + + + # target:{ + # field:offset + # # udf:{ + # # separator:"_" + # # oldColNames:[field-0,field-1,field-2] + # # newColName:new-field + # # } + # } + + # # (Optional) Specify a column as the source of the rank. + # #ranking: rank + + # # The number of data written to NebulaGraph in a single batch. + # batch: 10 + + # # The number of Spark partitions. + # partition: 10 + + # # The interval for message reading. Unit: second. + # interval.seconds: 10 + # # The consumer offsets. The default value is latest. Optional value are latest and earliest. + # startingOffsets: latest + # # Flow control, with a rate limit on the maximum offset processed per trigger interval, may not be configured. + # # maxOffsetsPerTrigger:10000 + # } + #] } ```