Flink Connector for Nebula Graph
Nebula-Flink-Connector 2.0/3.0 is a connector that lets Flink users read from and write to Nebula Graph 2.0/3.0. To access Nebula Graph 1.x with Flink, see Nebula-Flink-Connector 1.0.
Before using Nebula Flink Connector, make sure that:
- Java 8 or a higher version is installed.
- Nebula Graph is deployed. For more information, see Deployment and installation of Nebula Graph.
Add the dependency to your pom.xml.
<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>nebula-flink-connector</artifactId>
    <version>3.0-SNAPSHOT</version>
</dependency>
The following example writes data into Nebula Graph using Flink:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Addresses of the Nebula Graph graph service and meta service.
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setGraphAddress("127.0.0.1:9669")
        .setMetaAddress("127.0.0.1:9559")
        .build();
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

// Describe how each incoming Row maps to a vertex with the "player" tag.
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setIdIndex(0)                              // row position holding the vertex ID
        .setFields(Arrays.asList("name", "age"))    // tag properties to write
        .setPositions(Arrays.asList(1, 2))          // row positions supplying those properties
        .setBatch(2)                                // rows per write batch
        .builder();

NebulaBatchOutputFormat outPutFormat =
        new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                .setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);

// playerSource is assumed to be an existing DataStream of list-like records
// (each element exposes size() and get(i)); it is not defined in this snippet.
DataStream<Row> dataStream = playerSource.map(row -> {
    Row record = new org.apache.flink.types.Row(row.size());
    for (int i = 0; i < row.size(); i++) {
        record.setField(i, row.get(i));
    }
    return record;
});
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula");
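The index-based options in the example above (`setIdIndex`, `setFields`, `setPositions`) can be easier to follow with a concrete row in hand. The sketch below is plain Java and does not call the connector; it only illustrates one reading of those options, assuming that the ID index selects the vertex ID and that each field name is paired with the row value at the matching position. The class name `RowMappingSketch`, its methods, and the sample row are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Nebula Flink Connector.
public class RowMappingSketch {

    /** Returns the row value at idIndex, treated here as the vertex ID. */
    public static Object vertexId(List<?> row, int idIndex) {
        return row.get(idIndex);
    }

    /** Pairs each field name with the row value at the matching position. */
    public static Map<String, Object> properties(
            List<?> row, List<String> fields, List<Integer> positions) {
        if (fields.size() != positions.size()) {
            throw new IllegalArgumentException("fields and positions must have the same length");
        }
        // LinkedHashMap keeps the properties in the order the fields were declared.
        Map<String, Object> props = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            props.put(fields.get(i), row.get(positions.get(i)));
        }
        return props;
    }

    public static void main(String[] args) {
        // Sample row invented for illustration: [vertex ID, name, age].
        List<Object> row = Arrays.asList("player100", "Tim Duncan", 42);
        System.out.println(vertexId(row, 0));
        System.out.println(properties(row, Arrays.asList("name", "age"), Arrays.asList(1, 2)));
    }
}
```

With the same settings as the example (`idIndex` 0, fields `name` and `age`, positions 1 and 2), the sample row yields `player100` as the ID and `name` and `age` taken from positions 1 and 2. If `fields` and `positions` differ in length, the sketch rejects the configuration rather than guessing a pairing.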
The version correspondence between Nebula Flink Connector and Nebula Graph is as follows:
Nebula Flink Connector Version | Nebula Version |
---|---|
2.0.0 | 2.0.0, 2.0.1 |
2.5.0 | 2.5.0, 2.5.1 |
2.6.0 | 2.6.0, 2.6.1 |
2.6.1 | 2.6.0, 2.6.1 |
3.0.0 | 3.0.x, 3.1.x |
3.0-SNAPSHOT | nightly |
Flink version requirements: 1.11.x