Skip to content

Commit

Permalink
fix NebulaPool's not serializable & refactor ssl (#32)
Browse files Browse the repository at this point in the history
* rebase master

* refactor ssl name

* fix NebulaPool's not serializable

* add more datatype

* rich data process for Source and Sink

* provide version match

* fix typo

* add shuffle for hosts
  • Loading branch information
Nicole00 authored Nov 1, 2021
1 parent 6541047 commit 7272b2e
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 96 deletions.
29 changes: 11 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,14 @@ To use Nebula Flink Connector, do a check of these:
- Java 8 or a higher version is installed.
- Nebula Graph v2.0 is deployed. For more information, see [Deployment and installation of Nebula Graph](https://docs.nebula-graph.io/2.0/4.deployment-and-installation/1.resource-preparations/ "Click to go to Nebula Graph website").

### Build Nebula Flink Connector

1. Install the lastest java client 2.0.
```
$ git clone https://github.com/vesoft-inc/nebula-java.git
$ cd nebula-java
$ mvn clean install -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
```
2. Package Nebula Flink Connector.
```
$ git clone https://github.com/vesoft-inc/nebula-flink-connector.git
$ cd nebula-flink-connector/connector
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
```
### Use in Maven
Add the dependency to your pom.xml.

```
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink-connector</artifactId>
<version>2.0.0</version>
<version>2.6.0</version>
</dependency>
```

Expand Down Expand Up @@ -81,3 +64,13 @@ DataStream<Row> dataStream = playerSource.map(row -> {
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula")
```
## Version match

There are the version correspondence between Nebula Flink Connector and Nebula:

| Nebula Flink Connector Version | Nebula Version |
|:-----------------------:|:--------------:|
| 2.0.0 | 2.0.0, 2.0.1 |
| 2.5.0 | 2.5.0, 2.5.1 |
| 2.6.0 | 2.6.0 |
| 2.0-SNAPSHOT | nightly |
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.slf4j.Logger;
Expand All @@ -29,53 +30,53 @@ public class NebulaGraphConnectionProvider implements Serializable {
private static final long serialVersionUID = 8392002706492085208L;

private final NebulaClientOptions nebulaClientOptions;
private final NebulaPool nebulaPool = new NebulaPool();

public NebulaGraphConnectionProvider(NebulaClientOptions nebulaClientOptions) {
this.nebulaClientOptions = nebulaClientOptions;
}

/**
* get Session to execute query statement
*/
public NebulaPool getNebulaPool() throws UnknownHostException {
List<HostAddress> addresses = new ArrayList<>();
for (String address : nebulaClientOptions.getGraphAddress().split(NebulaConstant.COMMA)) {
String[] hostAndPort = address.split(NebulaConstant.COLON);
addresses.add(new HostAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
}

try {
NebulaPoolConfig poolConfig = new NebulaPoolConfig();
poolConfig.setTimeout(nebulaClientOptions.getTimeout());
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSighType()) {
case CA:
poolConfig.setSslParam(nebulaClientOptions.getCaSignParam());
break;
case SELF:
poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam());
break;
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
Collections.shuffle(addresses);
NebulaPool nebulaPool = new NebulaPool();
NebulaPoolConfig poolConfig = new NebulaPoolConfig();
poolConfig.setTimeout(nebulaClientOptions.getTimeout());
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSighType()) {
case CA:
poolConfig.setSslParam(nebulaClientOptions.getCaSignParam());
break;
case SELF:
poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam());
break;
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
nebulaPool.init(addresses, poolConfig);
} catch (UnknownHostException e) {
LOG.error("NebulaPool init error, ", e);
}
nebulaPool.init(addresses, poolConfig);
return nebulaPool;
}

/**
* get Session to execute query statement
* get username
*/
public Session getSession() throws NotValidConnectionException, IOErrorException,
AuthFailedException, ClientServerIncompatibleException {
return nebulaPool.getSession(
nebulaClientOptions.getUsername(),
nebulaClientOptions.getPassword(),
true);
public String getUserName() {
return nebulaClientOptions.getUsername();
}

/**
* close nebula pool
* get password
*/
public void close() {
nebulaPool.close();
public String getPassword() {
return nebulaClientOptions.getPassword();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import com.vesoft.nebula.client.meta.MetaClient;
import java.io.Flushable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -34,6 +36,7 @@ public class NebulaBatchOutputFormat<T> extends RichOutputFormat<T> implements F
private static final long serialVersionUID = 8846672119763512586L;

private volatile AtomicLong numPendingRow;
private NebulaPool nebulaPool;
private Session session;
private MetaClient metaClient;
private NebulaBatchExecutor nebulaBatchExecutor;
Expand All @@ -58,9 +61,11 @@ public void configure(Configuration configuration) {
@Override
public void open(int i, int i1) throws IOException {
try {
session = graphProvider.getSession();
} catch (NotValidConnectionException | IOErrorException
| AuthFailedException | ClientServerIncompatibleException e) {
nebulaPool = graphProvider.getNebulaPool();
session = nebulaPool.getSession(graphProvider.getUserName(),
graphProvider.getPassword(), true);
} catch (UnknownHostException | NotValidConnectionException | AuthFailedException
| ClientServerIncompatibleException | IOErrorException e) {
LOG.error("failed to get graph session, ", e);
throw new IOException("get graph session error, ", e);
}
Expand Down Expand Up @@ -137,8 +142,8 @@ public final synchronized void close() throws IOException {
if (session != null) {
session.release();
}
if (graphProvider != null) {
graphProvider.close();
if (nebulaPool != null) {
nebulaPool.close();
}
if (metaClient != null) {
metaClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import java.net.UnknownHostException;
import org.apache.flink.connector.nebula.utils.SSLSighType;
import org.junit.After;
import org.junit.Before;
Expand All @@ -30,7 +32,7 @@ public void tearDown() throws Exception {
}

@Test
public void getSession() {
public void getNebulaPool() {
NebulaClientOptions nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("127.0.0.1:9669")
Expand All @@ -43,7 +45,8 @@ public void getSession() {
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
try {
graphConnectionProvider.getSession();
NebulaPool nebulaPool = graphConnectionProvider.getNebulaPool();
nebulaPool.getSession("root", "nebula", true);
} catch (Exception e) {
LOG.info("get session failed, ", e);
assert (false);
Expand Down Expand Up @@ -76,8 +79,10 @@ public void getSessionWithSsl() throws NotValidConnectionException {
new NebulaGraphConnectionProvider(nebulaClientOptions);

try {
graphConnectionProvider.getSession();
} catch (IOErrorException | AuthFailedException | ClientServerIncompatibleException e) {
NebulaPool pool = graphConnectionProvider.getNebulaPool();
pool.getSession("root", "nebula", true);
} catch (UnknownHostException | IOErrorException | AuthFailedException
| ClientServerIncompatibleException e) {
LOG.error("get session faied, ", e);
assert (false);
}
Expand Down
Loading

0 comments on commit 7272b2e

Please sign in to comment.