Skip to content

Commit

Permalink
update flink (#2532) (#2533)
Browse files Browse the repository at this point in the history
* update flink

#2517

* Update nebula-flink-connector.md
  • Loading branch information
cooper-lzy authored Apr 10, 2024
1 parent 8dcbcd7 commit e66ede6
Show file tree
Hide file tree
Showing 3 changed files with 570 additions and 15 deletions.
280 changes: 279 additions & 1 deletion docs-2.0-en/connector/nebula-flink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,282 @@ NebulaGraph Flink Connector applies to the following scenarios:

## Release note

[Release](https://github.com/vesoft-inc/nebula-flink-connector/releases/tag/{{flinkconnector.tag}})
[Release](https://github.com/vesoft-inc/nebula-flink-connector/releases/tag/{{flinkconnector.tag}})

## Version compatibility

The correspondence between the NebulaGraph Flink Connector version and the NebulaGraph core version is as follows.

| Flink Connector version | NebulaGraph version |
|:----------|:-----------|
| 3.0-SNAPSHOT | nightly |
| 3.5.0 | 3.x.x |
| 3.3.0 | 3.x.x |
| 3.0.0 | 3.x.x |
| 2.6.1 | 2.6.0, 2.6.1 |
| 2.6.0 | 2.6.0, 2.6.1 |
| 2.5.0 | 2.5.0, 2.5.1 |
| 2.0.0 | 2.0.0, 2.0.1 |

## Prerequisites

- Java 8 or later is installed.
- Flink 1.11.x is installed.

## Get NebulaGraph Flink Connector

### Configure Maven dependency

Add the following dependency to the Maven configuration file `pom.xml` to automatically obtain the Flink Connector.

```xml
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink-connector</artifactId>
<version>{{flinkconnector.release}}</version>
</dependency>
```

### Compile and package

Follow the steps below to compile and package the Flink Connector.

1. Clone repository `nebula-flink-connector`.

```bash
$ git clone -b {{flinkconnector.branch}} https://github.com/vesoft-inc/nebula-flink-connector.git
```

2. Enter the `nebula-flink-connector` directory.

3. Compile and package.

```bash
$ mvn clean package -Dmaven.test.skip=true
```

After compilation, a file similar to `nebula-flink-connector-{{flinkconnector.release}}.jar` is generated in the directory `connector/target` of the folder.

## How to use

### Write data into NebulaGraph

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("127.0.0.1:9669")
.setMetaAddress("127.0.0.1:9559")
.build();
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag("player")
.setIdIndex(0)
.setFields(Arrays.asList("name", "age"))
.setPositions(Arrays.asList(1, 2))
.setBatchSize(2)
.build();

NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
graphConnectionProvider, metaConnectionProvider, executionOptions);
NebulaSinkFunction<Row> nebulaSinkFunction = new NebulaSinkFunction<>(outputFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula")
```

### Read data from NebulaGraph

```java
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
.setMetaAddress("127.0.0.1:9559")
.build();
storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

VertexExecutionOptions vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSource")
.setTag("person")
.setNoColumn(false)
.setFields(Arrays.asList())
.setLimit(100)
.build();

NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
.setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
dataStreamSource.map(row -> {
List<ValueWrapper> values = row.getValues();
Row record = new Row(15);
record.setField(0, values.get(0).asLong());
record.setField(1, values.get(1).asString());
record.setField(2, values.get(2).asString());
record.setField(3, values.get(3).asLong());
record.setField(4, values.get(4).asLong());
record.setField(5, values.get(5).asLong());
record.setField(6, values.get(6).asLong());
record.setField(7, values.get(7).asDate());
record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr());
record.setField(9, values.get(9).asLong());
record.setField(10, values.get(10).asBoolean());
record.setField(11, values.get(11).asDouble());
record.setField(12, values.get(12).asDouble());
record.setField(13, values.get(13).asTime().getUTCTimeStr());
record.setField(14, values.get(14).asGeography());
return record;
}).print();
env.execute("NebulaStreamSource");
```

### Parameter descriptions

- `NebulaClientOptions` is the configuration for connecting to NebulaGraph, as described below.

|Parameter|Type|Required|Description|
|:---|:---|:---|:---|
|`setGraphAddress` |String |Yes | The Graph service address of NebulaGraph. |
|`setMetaAddress` | String|Yes | The Meta service address of NebulaGraph. |

- `VertexExecutionOptions` is the configuration for reading vertices from and writing vertices to NebulaGraph, as described below.

|Parameter|Type|Required|Description|
|:---|:---|:---|:---|
|`setGraphSpace` |String |Yes | The graph space name. |
|`setTag` |String |Yes | The tag name. |
|`setIdIndex` |Int |Yes | The subscript of the stream data field that is used as the VID when writing data to NebulaGraph. |
|`setFields` |List|Yes | A collection of the property names of a tag. It is used to write data to or read data from NebulaGraph.</br> Make sure the `setNoColumn` is `false` when reading data; otherwise, the configuration is invalid.</br> If this parameter is empty, all properties are read when reading data from NebulaGraph. |
|`setPositions` |List |Yes | A collection of the subscripts of the stream data fields. It indicates that the corresponding field values are written to NebulaGraph as property values. This parameter needs to correspond to `setFields`. |
|`setBatchSize` |String |No | The maximum number of data records to write to NebulaGraph at a time. The default value is `2000`. |
|`setNoColumn` |String |No | The properties are not to be read if set to `true` when reading data. The default value is `false`. |
|`setLimit` |String| No| The maximum number of data records to pull at a time when reading data. The default value is `2000`. |

- `EdgeExecutionOptions` is the configuration for reading edges from and writing edges to NebulaGraph, as described below.

|Parameter|Type|Required|Description|
|:---|:---|:---|:---|
|`setGraphSpace` |String| Yes| The graph space name. |
|`setEdge` |String |Yes | The edge type name. |
|`setSrcIndex` |Int| Yes| The subscript of the stream data field that is used as the VID of the source vertex when writing data to NebulaGraph. |
|`setDstIndex` |Int| Yes| The subscript of the stream data field that is used as the VID of the destination vertex when writing data to NebulaGraph. |
|`setRankIndex` |Int| Yes| The subscript of the stream data field that is used as the rank of the edge when writing data to NebulaGraph. |
|`setFields` |List| Yes| A collection of the property names of an edge type. It is used to write data to or read data from NebulaGraph.</br> Make sure the `setNoColumn` is `false` when reading data; otherwise, the configuration is invalid.</br> If this parameter is empty, all properties are read when reading data from NebulaGraph. |
|`setPositions` |List |Yes | A collection of the subscripts of the stream data fields. It indicates that the corresponding field values are written to NebulaGraph as property values. This parameter needs to correspond to `setFields`. |
|`setBatchSize` |String |No | The maximum number of data records to write to NebulaGraph at a time. The default value is `2000`. |
|`setNoColumn` |String |No | The properties are not to be read if set to `true` when reading data. The default value is `false`. |
|`setLimit` |String| No| The maximum number of data records to pull at a time when reading data. The default value is `2000`. |

## Example

1. Create a graph space.

```java
NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog(
"NebulaCatalog",
"default",
"root",
"nebula",
"127.0.0.1:9559",
"127.0.0.1:9669");

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog);
tableEnv.useCatalog(CATALOG_NAME);

String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`"
+ " COMMENT 'space 1'"
+ " WITH ("
+ " 'partition_num' = '100',"
+ " 'replica_factor' = '3',"
+ " 'vid_type' = 'FIXED_STRING(10)'"
+ ")";
tableEnv.executeSql(createDataBase);
```

2. Create a tag.

```java
tableEnvironment.executeSql("CREATE TABLE `person` ("
+ " vid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
+ " col3 BIGINT,"
+ " col4 BIGINT,"
+ " col5 BIGINT,"
+ " col6 BIGINT,"
+ " col7 DATE,"
+ " col8 TIMESTAMP,"
+ " col9 BIGINT,"
+ " col10 BOOLEAN,"
+ " col11 DOUBLE,"
+ " col12 DOUBLE,"
+ " col13 TIME,"
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'data-type' = 'vertex',"
+ " 'graph-space' = 'flink_test',"
+ " 'label-name' = 'person'"
+ ")"
);
```

3. Create an edge type.

```java
tableEnvironment.executeSql("CREATE TABLE `friend` ("
+ " sid BIGINT,"
+ " did BIGINT,"
+ " rid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
+ " col3 BIGINT,"
+ " col4 BIGINT,"
+ " col5 BIGINT,"
+ " col6 BIGINT,"
+ " col7 DATE,"
+ " col8 TIMESTAMP,"
+ " col9 BIGINT,"
+ " col10 BOOLEAN,"
+ " col11 DOUBLE,"
+ " col12 DOUBLE,"
+ " col13 TIME,"
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'graph-space' = 'flink_test',"
+ " 'label-name' = 'friend',"
+ " 'data-type'='edge',"
+ " 'src-id-index'='0',"
+ " 'dst-id-index'='1',"
+ " 'rank-id-index'='2'"
+ ")"
);
```

4. Queries the data of an edge type and inserts it into another edge type.

```java
Table table = tableEnvironment.sqlQuery("SELECT * FROM `friend`");
table.executeInsert("`friend_sink`").await();
```
23 changes: 12 additions & 11 deletions docs-2.0-en/connector/nebula-spark-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ The features of NebulaGraph Spark Connector {{sparkconnector.release}} are as fo

## Get NebulaGraph Spark Connector

### Compile package
### Compile and package



Expand All @@ -77,9 +77,10 @@ The features of NebulaGraph Spark Connector {{sparkconnector.release}} are as fo
```bash
$ git clone -b {{sparkconnector.branch}} https://github.com/vesoft-inc/nebula-spark-connector.git
```

2. Enter the `nebula-spark-connector` directory.

3. Compile package. The procedure varies with Spark versions.
3. Compile and package. The procedure varies with Spark versions.

!!! note

Expand All @@ -103,7 +104,7 @@ The features of NebulaGraph Spark Connector {{sparkconnector.release}} are as fo
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
```

After compilation, a similar file `nebula-spark-connector-{{sparkconnector.release}}-SHANPSHOT.jar` is generated in the directory `target` of the folder.
After compilation, a file similar to `nebula-spark-connector-{{sparkconnector.release}}-SHANPSHOT.jar` is generated in the directory `target` of the folder.

### Download maven remote repository

Expand Down Expand Up @@ -168,12 +169,12 @@ val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
```
- `NebulaConnectionConfig` is the configuration for connecting to the nebula graph, as described below.
- `NebulaConnectionConfig` is the configuration for connecting to NebulaGraph, as described below.
|Parameter|Required|Description|
|:---|:---|:---|
|`withMetaAddress` |Yes| Specifies the IP addresses and ports of all Meta Services. Separate multiple addresses with commas. The format is `ip1:port1,ip2:port2,...`. Read data is no need to configure `withGraphAddress`. |
|`withConnectionRetry` |No| The number of retries that the NebulaGraph Java Client connected to the NebulaGraph. The default value is `1`. |
|`withConnectionRetry` |No| The number of retries that the NebulaGraph Java Client connected to NebulaGraph. The default value is `1`. |
|`withExecuteRetry` |No| The number of retries that the NebulaGraph Java Client executed query statements. The default value is `1`. |
|`withTimeout` |No| The timeout for the NebulaGraph Java Client request response. The default value is `6000`, Unit: ms. |
Expand All @@ -192,7 +193,7 @@ val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
!!! note
The values of columns in a dataframe are automatically written to the NebulaGraph as property values.
The values of columns in a dataframe are automatically written to NebulaGraph as property values.
```scala
val config = NebulaConnectionConfig
Expand Down Expand Up @@ -260,7 +261,7 @@ df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
|:---|:---|:---|
|`withMetaAddress` |Yes| Specifies the IP addresses and ports of all Meta Services. Separate multiple addresses with commas. The format is `ip1:port1,ip2:port2,...`. |
|`withGraphAddress` |Yes| Specifies the IP addresses and ports of Graph Services. Separate multiple addresses with commas. The format is `ip1:port1,ip2:port2,...`. |
|`withConnectionRetry` |No| Number of retries that the NebulaGraph Java Client connected to the NebulaGraph. The default value is `1`. |
|`withConnectionRetry` |No| Number of retries that the NebulaGraph Java Client connected to NebulaGraph. The default value is `1`. |
- `WriteNebulaVertexConfig` is the configuration of the write vertex, as described below.
Expand All @@ -271,8 +272,8 @@ df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
|`withVidField` |Yes| The column in the DataFrame as the vertex ID. |
|`withVidPolicy` |No| When writing the vertex ID, NebulaGraph use mapping function, supports HASH only. No mapping is performed by default. |
|`withVidAsProp` |No| Whether the column in the DataFrame that is the vertex ID is also written as an property. The default value is `false`. If set to `true`, make sure the Tag has the same property name as `VidField`. |
|`withUser` |No| NebulaGraph user name. If [authentication](../7.data-security/1.authentication/1.authentication.md) is disabled, you do not need to configure the user name and password. |
|`withPasswd` |No| The password for the NebulaGraph user name. |
|`withUser` |No| NebulaGraph username. If [authentication](7.data-security/1.authentication/1.authentication.md) is disabled, you do not need to configure the username and password. |
|`withPasswd` |No| The password for the NebulaGraph username. |
|`withBatch` |Yes| The number of rows of data written at a time. The default value is `1000`. |
|`withWriteMode`|No|Write mode. The optional values are `insert`, `update` and `delete`. The default value is `insert`.|
|`withDeleteEdge`|No|Whether to delete the related edges synchronously when deleting a vertex. The default value is `false`. It takes effect when `withWriteMode` is `delete`. |
Expand All @@ -291,7 +292,7 @@ df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
|`withSrcAsProperty` |No| Whether the column in the DataFrame that is the starting vertex is also written as an property. The default value is `false`. If set to `true`, make sure Edge type has the same property name as `SrcIdField`. |
|`withDstAsProperty` |No| Whether column that are destination vertex in the DataFrame are also written as property. The default value is `false`. If set to `true`, make sure Edge type has the same property name as `DstIdField`. |
|`withRankAsProperty` |No| Whether column in the DataFrame that is the rank is also written as property.The default value is `false`. If set to `true`, make sure Edge type has the same property name as `RankField`. |
|`withUser` |No| NebulaGraph user name. If [authentication](../7.data-security/1.authentication/1.authentication.md) is disabled, you do not need to configure the user name and password. |
|`withPasswd` |No| The password for the NebulaGraph user name. |
|`withUser` |No| NebulaGraph username. If [authentication](7.data-security/1.authentication/1.authentication.md) is disabled, you do not need to configure the username and password. |
|`withPasswd` |No| The password for the NebulaGraph username. |
|`withBatch` |Yes| The number of rows of data written at a time. The default value is `1000`. |
|`withWriteMode`|No|Write mode. The optional values are `insert`, `update` and `delete`. The default value is `insert`.|
Loading

0 comments on commit e66ede6

Please sign in to comment.