update exchange kafka #2167

Merged
merged 1 commit on Jul 13, 2023

183 changes: 68 additions & 115 deletions docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md
@@ -98,6 +98,10 @@ For more information, see [Quick start workflow](../../2.quick-start/1.quick-sta

After Exchange is compiled, copy the configuration file `target/classes/application.conf` and use the copy to set the Kafka data source configuration. In this example, the copied file is called `kafka_application.conf`. For details on each configuration item, see [Parameters in the configuration file](../parameter-reference/ex-ug-parameter.md).
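
For example, the copy could be made from the Exchange project root as follows; the destination path is only one reasonable choice, and you can place the copy anywhere your Spark job can read it:

```bash
# Copy the template configuration and edit the copy for the Kafka data source.
cp target/classes/application.conf target/classes/kafka_application.conf
```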

!!! note

    When importing Kafka data, one configuration file can handle only one tag or edge type. If there are multiple tags or edge types, you need to create multiple configuration files.

```conf
{
# Spark configuration
@@ -195,124 +199,73 @@
# Flow control. Limits the maximum number of offsets processed per trigger interval. This parameter is optional.
# maxOffsetsPerTrigger:10000
}
# Set the information about the Tag team.
{
name: team
type: {
source: kafka
sink: client
}
service: "127.0.0.1:9092"
topic: "topic_name2"
fields: [key]
nebula.fields: [name]
vertex:{
field:key
}
batch: 10
partition: 10
interval.seconds: 10
startingOffsets: latest
# maxOffsetsPerTrigger:10000
}

]

# Processing edges
edges: [
# Set the information about the Edge Type follow.
{
# The corresponding Edge Type name in NebulaGraph.
name: follow

type: {
# Specify the data source file format to Kafka.
source: kafka

# Specify how to import the Edge type data into NebulaGraph.
# Specify how to import the data into NebulaGraph. Only client is supported.
sink: client
}

# Kafka server address.
service: "127.0.0.1:9092"
# Message category.
topic: "topic_name3"

# Kafka data has fixed fields: key, value, topic, partition, offset, timestamp, and timestampType.
# After Spark reads the data as a DataFrame, if multiple fields need to be specified, separate them with commas.
# Specify the field names in fields. For example, key is used for degree in NebulaGraph, as shown below.
fields: [key]
nebula.fields: [degree]

# In source, use a column in the topic as the source of the edge's source vertex.
# In target, use a column in the topic as the source of the edge's destination vertex.
source:{
field:timestamp
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
}


target:{
field:offset
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
}

# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The number of data written to NebulaGraph in a single batch.
batch: 10

# The number of Spark partitions.
partition: 10

# The interval for message reading. Unit: second.
interval.seconds: 10
# The consumer offsets. The default value is latest. Optional values are latest and earliest.
startingOffsets: latest
# Flow control. Limits the maximum number of offsets processed per trigger interval. This parameter is optional.
# maxOffsetsPerTrigger:10000
}

# Set the information about the Edge Type serve.
{
name: serve
type: {
source: kafka
sink: client
}
service: "127.0.0.1:9092"
topic: "topic_name4"

fields: [timestamp,offset]
nebula.fields: [start_year,end_year]
source:{
field:key
}

target:{
field:value
}

# (Optional) Specify a column as the source of the rank.
#ranking: rank

batch: 10
partition: 10
interval.seconds: 10
startingOffsets: latest
# maxOffsetsPerTrigger:10000
}
]
#edges: [
# # Set the information about the Edge Type follow.
# {
# # The corresponding Edge Type name in NebulaGraph.
# name: follow

# type: {
# # Specify the data source file format to Kafka.
# source: kafka

# # Specify how to import the Edge type data into NebulaGraph.
# # Specify how to import the data into NebulaGraph. Only client is supported.
# sink: client
# }

# # Kafka server address.
# service: "127.0.0.1:9092"
# # Message category.
# topic: "topic_name3"

# # Kafka data has fixed fields: key, value, topic, partition, offset, timestamp, and timestampType.
# # After Spark reads the data as a DataFrame, if multiple fields need to be specified, separate them with commas.
# # Specify the field names in fields. For example, key is used for degree in NebulaGraph, as shown below.
# fields: [key]
# nebula.fields: [degree]

# # In source, use a column in the topic as the source of the edge's source vertex.
# # In target, use a column in the topic as the source of the edge's destination vertex.
# source:{
# field:timestamp
# # udf:{
# # separator:"_"
# # oldColNames:[field-0,field-1,field-2]
# # newColName:new-field
# # }
# }


# target:{
# field:offset
# # udf:{
# # separator:"_"
# # oldColNames:[field-0,field-1,field-2]
# # newColName:new-field
# # }
# }

# # (Optional) Specify a column as the source of the rank.
# #ranking: rank

# # The number of data written to NebulaGraph in a single batch.
# batch: 10

# # The number of Spark partitions.
# partition: 10

# # The interval for message reading. Unit: second.
# interval.seconds: 10
# # The consumer offsets. The default value is latest. Optional values are latest and earliest.
# startingOffsets: latest
# # Flow control. Limits the maximum number of offsets processed per trigger interval. This parameter is optional.
# # maxOffsetsPerTrigger:10000
# }
#]
}
```
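
Once `kafka_application.conf` is edited, the import is started by submitting Exchange to Spark. The sketch below follows the general usage in the Exchange documentation rather than an exact command from this page: the JAR name `nebula-exchange-x.y.z.jar`, the `${SPARK_HOME}` path, and local mode are placeholders or assumptions to adapt to your environment. If you have multiple tags or edge types, submit one job per configuration file, as the note above explains.

```bash
# Placeholder JAR name and paths; replace them with your actual Exchange version and file locations.
${SPARK_HOME}/bin/spark-submit \
  --master "local" \
  --class com.vesoft.nebula.exchange.Exchange \
  nebula-exchange-x.y.z.jar \
  -c kafka_application.conf
```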
