Skip to content

Commit

Permalink
support ssl for flink connector (#36)
Browse files Browse the repository at this point in the history
* rebase master

* refactor ssl name

* fix NebulaPool's not serializable

* fix typo

* support ssl for flink connector

* add example for SSL use

* fix test

* fix var name
  • Loading branch information
Nicole00 authored Nov 18, 2021
1 parent adb8352 commit d7be86b
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.connection;

import java.io.Serializable;

public class CASignParams implements Serializable {

private String caCrtFilePath;
private String crtFilePath;
private String keyFilePath;

public CASignParams(String caCrtFilePath, String crtFilePath, String keyFilePath) {
this.caCrtFilePath = caCrtFilePath;
this.crtFilePath = crtFilePath;
this.keyFilePath = keyFilePath;
}

public String getCaCrtFilePath() {
return caCrtFilePath;
}

public void setCaCrtFilePath(String caCrtFilePath) {
this.caCrtFilePath = caCrtFilePath;
}

public String getCrtFilePath() {
return crtFilePath;
}

public void setCrtFilePath(String crtFilePath) {
this.crtFilePath = crtFilePath;
}

public String getKeyFilePath() {
return keyFilePath;
}

public void setKeyFilePath(String keyFilePath) {
this.keyFilePath = keyFilePath;
}

@Override
public String toString() {
return "CASSLSignParams{"
+ "caCrtFilePath='" + caCrtFilePath + '\''
+ ", crtFilePath='" + crtFilePath + '\''
+ ", keyFilePath='" + keyFilePath + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

package org.apache.flink.connector.nebula.connection;

import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -34,18 +32,21 @@ public class NebulaClientOptions implements Serializable {

private final boolean enableMetaSSL;

private final boolean enableStorageSSL;

private final SSLSighType sslSighType;

private final CASignedSSLParam caSignParam;
private final CASignParams caSignParams;

private final SelfSignedSSLParam selfSignParam;
private final SelfSignParams selfSignParams;


private NebulaClientOptions(String metaAddress, String graphAddress, String username,
String password, int timeout, int connectRetry,
boolean enableGraphSSL, boolean enableMetaSSL,
SSLSighType sslSighType, CASignedSSLParam caSignParam,
SelfSignedSSLParam selfSignParam) {
boolean enableStorageSSL,
SSLSighType sslSighType, CASignParams caSignParams,
SelfSignParams selfSignParams) {
this.metaAddress = metaAddress;
this.graphAddress = graphAddress;
this.username = username;
Expand All @@ -54,9 +55,10 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user
this.connectRetry = connectRetry;
this.enableGraphSSL = enableGraphSSL;
this.enableMetaSSL = enableMetaSSL;
this.enableStorageSSL = enableStorageSSL;
this.sslSighType = sslSighType;
this.caSignParam = caSignParam;
this.selfSignParam = selfSignParam;
this.caSignParams = caSignParams;
this.selfSignParams = selfSignParams;
}

public List<HostAddress> getMetaAddress() {
Expand Down Expand Up @@ -96,16 +98,20 @@ public boolean isEnableMetaSSL() {
return enableMetaSSL;
}

public boolean isEnableStorageSSL() {
return enableStorageSSL;
}

public SSLSighType getSSLSighType() {
return sslSighType;
}

public CASignedSSLParam getCaSignParam() {
return caSignParam;
public CASignParams getCaSignParam() {
return caSignParams;
}

public SelfSignedSSLParam getSelfSignParam() {
return selfSignParam;
public SelfSignParams getSelfSignParam() {
return selfSignParams;
}

/**
Expand All @@ -122,9 +128,10 @@ public static class NebulaClientOptionsBuilder {
// ssl options
private boolean enableGraphSSL = false;
private boolean enableMetaSSL = false;
private boolean enableStorageSSL = false;
private SSLSighType sslSighType = null;
private CASignedSSLParam caSignParam = null;
private SelfSignedSSLParam selfSignParam = null;
private CASignParams caSignParams = null;
private SelfSignParams selfSignParams = null;

public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) {
this.metaAddress = metaAddress;
Expand Down Expand Up @@ -166,48 +173,54 @@ public NebulaClientOptionsBuilder setEnableMetaSSL(boolean enableMetaSSL) {
return this;
}

public NebulaClientOptionsBuilder setEnableStorageSSL(boolean enableStorageSSL) {
this.enableStorageSSL = enableStorageSSL;
return this;
}


public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) {
this.sslSighType = sslSighType;
return this;
}

public NebulaClientOptionsBuilder setCaSignParam(String caCrtFilePath, String crtFilePath,
String keyFilePath) {
this.caSignParam = new CASignedSSLParam(caCrtFilePath, crtFilePath,
keyFilePath);
this.caSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath);
return this;
}

public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String keyFilePath,
String password) {
this.selfSignParam = new SelfSignedSSLParam(crtFilePath, keyFilePath, password);
this.selfSignParams = new SelfSignParams(crtFilePath, keyFilePath, password);
return this;
}

public NebulaClientOptions build() {
if (metaAddress == null || metaAddress.trim().isEmpty()) {
throw new IllegalArgumentException("meta address can not be empty.");
}
if (enableMetaSSL || enableGraphSSL) {
// if meta is set to open ssl, then graph must be set to open ssl
if (enableMetaSSL && !enableGraphSSL) {
if (enableMetaSSL || enableGraphSSL || enableStorageSSL) {
// if storage is set to open ssl, then meta must be set to open ssl
if (enableStorageSSL && !enableMetaSSL) {
throw new IllegalArgumentException(
"meta ssl is enable, graph ssl must be enable");
"storage ssl is enabled, meta ssl must be enabled.");
}

if (sslSighType == null) {
throw new IllegalArgumentException("ssl is enable, ssl sign type must not be "
throw new IllegalArgumentException("ssl is enabled, ssl sign type must not be "
+ "null");
}
switch (sslSighType) {
case CA:
if (caSignParam == null) {
throw new IllegalArgumentException("ssl is enable and sign type is "
if (caSignParams == null) {
throw new IllegalArgumentException("ssl is enabled and sign type is "
+ "CA, caSignParam must not be null");
}
break;
case SELF:
if (selfSignParam == null) {
throw new IllegalArgumentException("ssl is enable and sign type is "
if (selfSignParams == null) {
throw new IllegalArgumentException("ssl is enabled and sign type is "
+ "CA, selfSignParam must not be null");
}
break;
Expand All @@ -226,9 +239,10 @@ public NebulaClientOptions build() {
connectRetry,
enableGraphSSL,
enableMetaSSL,
enableStorageSSL,
sslSighType,
caSignParam,
selfSignParam);
caSignParams,
selfSignParams);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@


import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,12 +49,20 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSighType()) {
case CA:
poolConfig.setSslParam(nebulaClientOptions.getCaSignParam());
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
poolConfig.setSslParam(sslParam);
break;
case SELF:
poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam());
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
poolConfig.setSslParam(sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.apache.flink.connector.nebula.connection;

import com.facebook.thrift.TException;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
Expand All @@ -15,11 +18,9 @@
import com.vesoft.nebula.meta.Schema;
import com.vesoft.nebula.meta.SpaceItem;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,7 +37,32 @@ public NebulaMetaConnectionProvider(NebulaClientOptions nebulaClientOptions) {

public MetaClient getMetaClient() throws TException, ClientServerIncompatibleException {
List<HostAddress> addresses = nebulaClientOptions.getMetaAddress();
MetaClient metaClient = new MetaClient(addresses);
int timeout = nebulaClientOptions.getTimeout();
int retry = nebulaClientOptions.getConnectRetry();
MetaClient metaClient;
if (nebulaClientOptions.isEnableMetaSSL()) {
switch (nebulaClientOptions.getSSLSighType()) {
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam);
break;
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
} else {
metaClient = new MetaClient(addresses, timeout, retry, retry);
}

metaClient.connect();
return metaClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.apache.flink.connector.nebula.connection;


import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.storage.StorageClient;
import java.io.Serializable;
import java.util.List;
Expand All @@ -26,7 +29,34 @@ public NebulaStorageConnectionProvider() {

public StorageClient getStorageClient() throws Exception {
List<HostAddress> addresses = nebulaClientOptions.getMetaAddress();
StorageClient storageClient = new StorageClient(addresses);
int timeout = nebulaClientOptions.getTimeout();
int retry = nebulaClientOptions.getConnectRetry();
StorageClient storageClient;
if (nebulaClientOptions.isEnableStorageSSL()) {
switch (nebulaClientOptions.getSSLSighType()) {
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
storageClient = new StorageClient(addresses, timeout, retry, retry, true,
sslParam);
break;
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
storageClient = new StorageClient(addresses, timeout, retry, retry, true,
sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
} else {
storageClient = new StorageClient(addresses, timeout);
}

if (!storageClient.connect()) {
throw new Exception("failed to connect storaged.");
}
Expand Down
Loading

0 comments on commit d7be86b

Please sign in to comment.