diff --git a/Makefile b/Makefile index db1d8882..8534c3f9 100644 --- a/Makefile +++ b/Makefile @@ -71,7 +71,7 @@ $(GOBIN)/gofumpt: $(GOBIN)/golangci-lint: @[ -f $(GOBIN)/golangci-lint ] || { \ set -e ;\ - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOBIN) v1.49.0 ;\ + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOBIN) v1.51.2 ;\ } $(GOBIN)/mockgen: diff --git a/README.md b/README.md index 181a5873..50756ddf 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,12 @@ client: address: "127.0.0.1:9669" user: root password: nebula + ssl: + enable: true + certPath: "your/cert/file/path" + keyPath: "your/key/file/path" + caPath: "your/ca/file/path" + insecureSkipVerify: false concurrencyPerAddress: 16 reconnectInitialInterval: 1s retry: 3 @@ -96,6 +102,12 @@ client: * `client.address`: **Required**. The address of graph in NebulaGraph. * `client.user`: **Optional**. The user of NebulaGraph. The default value is `root`. * `client.password`: **Optional**. The password of NebulaGraph. The default value is `nebula`. +* `client.ssl`: **Optional**. SSL related configuration. +* `client.ssl.enable`: **Optional**. Specifies whether to enable ssl authentication. The default value is `false`. +* `client.ssl.certPath`: **Required**. Specifies the path of the certificate file. +* `client.ssl.keyPath`: **Required**. Specifies the path of the private key file. +* `client.ssl.caPath`: **Required**. Specifies the path of the certification authority file. +* `client.ssl.insecureSkipVerify`: **Optional**. Specifies whether a client verifies the server's certificate chain and host name. The default value is `false`. * `client.concurrencyPerAddress`: **Optional**. The number of client connections to each graph in NebulaGraph. The default value is `10`. * `client.reconnectInitialInterval`: **Optional**. The initialization interval for reconnecting NebulaGraph. The default value is `1s`. * `client.retry`: **Optional**. The failed retrying times to execute nGQL queries in NebulaGraph client. The default value is `3`. diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md index 23d11cc3..aa2ac99c 100644 --- a/docs/configuration-reference.md +++ b/docs/configuration-reference.md @@ -1,108 +1,114 @@ # NebulaGraph Importer Configuration Description -| options | description | default | -|:---------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------| -| client | The NebulaGraph client configuration options. | - | -| client.version | Specifies which version of NebulaGraph, currently only `v3` is supported. | - | -| client.address | The address of graph in NebulaGraph. | - | -| client.user | The user of NebulaGraph. | root | -| client.password | The password of NebulaGraph. | nebula | -| client.concurrencyPerAddress | The number of client connections to each graph in NebulaGraph. | 10 | -| client.reconnectInitialInterval | The initialization interval for reconnecting NebulaGraph. | 1s | -| client.retry | The failed retrying times to execute nGQL queries in NebulaGraph client. | 3 | -| client.retryInitialInterval | The initialization interval retrying. | 1s | -| | | | -| manager | The global control configuration options related to NebulaGraph Importer. | - | -| manager.spaceName | Specifies which space the data is imported into. | - | -| manager.batch | Specifies the batch size for all sources of the inserted data. | 128 | -| manager.readerConcurrency | Specifies the concurrency of reader to read from sources. | 50 | -| manager.importerConcurrency | Specifies the concurrency of generating statement, call client to import. | 512 | -| manager.statsInterval | Specifies the interval at which statistics are printed. | 10s | -| manager.hooks.before | Configures the statements before the import begins. | - | -| manager.hooks.before.[].statements | Defines the list of statements. | - | -| manager.hooks.before.[].wait | Defines the waiting time after executing the above statements. | - | -| manager.hooks.after | Configures the statements after the import is complete. | - | -| manager.hooks.after.[].statements | Defines the list of statements. | - | -| manager.hooks.after.[].wait | Defines the waiting time after executing the above statements. | - | -| | | | -| log | The log configuration options. | - | -| log.level | Specifies the log level. | "INFO" | -| log.console | Specifies whether to print logs to the console. | true | -| log.files | Specifies which files to print logs to. | - | -| | | | -| sources | The data sources to be imported | - | -| sources[].path | Local file path | - | -| sources[].s3.endpoint | The endpoint of s3 service. | - | -| sources[].s3.region | The region of s3 service. | - | -| sources[].s3.bucket | The bucket of file in s3 service. | - | -| sources[].s3.key | The object key of file in s3 service. | - | -| sources[].s3.accessKeyID | The `Access Key ID` of s3 service. | - | -| sources[].s3.accessKeySecret | The `Access Key Secret` of s3 service. | - | -| sources[].oss.endpoint | The endpoint of oss service. | - | -| sources[].oss.bucket | The bucket of file in oss service. | - | -| sources[].oss.key | The object key of file in oss service. | - | -| sources[].oss.accessKeyID | The `Access Key ID` of oss service. | - | -| sources[].oss.accessKeySecret | The `Access Key Secret` of oss service. | - | -| sources[].ftp.host | The host of ftp service. | - | -| sources[].ftp.host | The port of ftp service. | - | -| sources[].ftp.user | The user of ftp service. | - | -| sources[].ftp.password | The password of ftp service. | - | -| sources[].ftp.path | The path of file in the ftp service. | - | -| sources[].sftp.host | The host of sftp service. | - | -| sources[].sftp.host | The port of sftp service. | - | -| sources[].sftp.user | The user of sftp service. | - | -| sources[].sftp.password | The password of sftp service. | - | -| sources[].sftp.keyFile | The ssh key file path of sftp service. | - | -| sources[].sftp.keyData | The ssh key file content of sftp service. | - | -| sources[].sftp.passphrase | The ssh key passphrase of sftp service. | - | -| sources[].sftp.path | The path of file in the ftp service. | - | -| sources[].hdfs.address | The address of hdfs service. | - | -| sources[].hdfs.user | The user of hdfs service. | - | -| sources[].hdfs.servicePrincipalName | The kerberos service principal name of hdfs service when enable kerberos. | - | -| sources[].hdfs.krb5ConfigFile | The kerberos config file of hdfs service when enable kerberos. | "/etc/krb5.conf" | -| sources[].hdfs.ccacheFile | The ccache file of hdfs service when enable kerberos. | - | -| sources[].hdfs.keyTabFile | The keytab file of hdfs service when enable kerberos. | - | -| sources[].hdfs.password | The kerberos password of hdfs service when enable kerberos. | - | -| sources[].hdfs.dataTransferProtection | The data transfer protection of hdfs service. | - | -| sources[].hdfs.disablePAFXFAST | Whether to prohibit the client to use PA_FX_FAST. | - | -| sources[].hdfs.path | The path of file in the sftp service. | - | -| sources[].gcs.endpoint | The endpoint of GCS service. | - | -| sources[].gcs.bucket | The bucket of file in GCS service. | - | -| sources[].gcs.key | The object key of file in GCS service. | - | -| sources[].gcs.credentialsFile | Path to the service account or refresh token JSON credentials file. Not required for public data. | - | -| sources[].gcs.credentialsJSON | Content of the service account or refresh token JSON credentials file. Not required for public data. | - | -| sources[].batch | Specifies the batch size for this source of the inserted data. | - | -| sources[].csv | Describes the csv file format information. | - | -| sources[].csv.delimiter | Specifies the delimiter for the CSV files. | "," | -| sources[].csv.withHeader | Specifies whether to ignore the first record in csv file. | false | -| sources[].csv.lazyQuotes | Specifies lazy quotes of csv file. | false | -| sources[].tags | Describes the schema definition for tags. | - | -| sources[].tags[].name | The tag name. | - | -| sources[].tags[].mode | The mode for processing data, one of `INSERT`, `UPDATE` or `DELETE`. | - | -| sources[].tags[].filter | The data filtering configuration. | - | -| sources[].tags[].filter.expr | The filter expression. | - | -| sources[].tags[].id | Describes the tag ID information. | - | -| sources[].tags[].id.type | The type for ID | "STRING" | -| sources[].tags[].id.index | The column number in the records. | - | -| sources[].tags[].id.concatItems | The concat items to generate for IDs. | - | -| sources[].tags[].id.function | Function to generate the IDs. | - | -| sources[].tags[].ignoreExistedIndex | Specifies whether to enable `IGNORE_EXISTED_INDEX`. | true | -| sources[].tags[].props | Describes the tag props definition. | - | -| sources[].tags[].props[].name | The property name, must be the same with the tag property in NebulaGraph. | - | -| sources[].tags[].props[].type | The property type. | - | -| sources[].tags[].props[].index | The column number in the records. | - | -| sources[].tags[].props[].nullable | Whether this prop property can be `NULL`. | false | -| sources[].tags[].props[].nullValue | The value used to determine whether it is a `NULL`. | "" | -| sources[].tags[].props[].alternativeIndices | The alternative indices. | - | -| sources[].tags[].props[].defaultValue | The property default value. | - | -| sources[].edges | Describes the schema definition for edges. | - | -| sources[].edges[].name | The edge name. | - | -| sources[].tags[].mode | The `mode` here is similar to `mode` in the `tags` above. | - | -| sources[].tags[].filter | The `filter` here is similar to `filter` in the `tags` above. | - | -| sources[].edges[].src | Describes the source definition for the edge. | - | -| sources[].edges[].src.id | The `id` here is similar to `id` in the `tags` above. | - | -| sources[].edges[].dst | Describes the destination definition for the edge. | - | -| sources[].edges[].dst.id | The `id` here is similar to `id` in the `tags` above. | - | -| sources[].edges[].rank | Describes the rank definition for the edge. | - | -| sources[].edges[].rank.index | The column number in the records. | - | -| sources[].edges[].props | Similar to the `props` in the `tags`, but for edges. | - | +| options | description | default | +|:--------------------------------------------|:-----------------------------------------------------------------------------------------------------|:-----------------| +| client | The NebulaGraph client configuration options. | - | +| client.version | Specifies which version of NebulaGraph, currently only `v3` is supported. | - | +| client.address | The address of graph in NebulaGraph. | - | +| client.user | The user of NebulaGraph. | root | +| client.password | The password of NebulaGraph. | nebula | +| client.ssl | SSL related configuration. | nebula | +| client.ssl.enable | Specifies whether to enable ssl authentication. | false | +| client.ssl.certPath | Specifies the path of the certificate file. | - | +| client.ssl.keyPath | Specifies the path of the private key file. | - | +| client.ssl.caPath | Specifies the path of the certification authority file. | - | +| client.ssl.insecureSkipVerify | Specifies whether a client verifies the server's certificate chain and host name. | false | +| client.concurrencyPerAddress | The number of client connections to each graph in NebulaGraph. | 10 | +| client.reconnectInitialInterval | The initialization interval for reconnecting NebulaGraph. | 1s | +| client.retry | The failed retrying times to execute nGQL queries in NebulaGraph client. | 3 | +| client.retryInitialInterval | The initialization interval retrying. | 1s | +| | | | +| manager | The global control configuration options related to NebulaGraph Importer. | - | +| manager.spaceName | Specifies which space the data is imported into. | - | +| manager.batch | Specifies the batch size for all sources of the inserted data. | 128 | +| manager.readerConcurrency | Specifies the concurrency of reader to read from sources. | 50 | +| manager.importerConcurrency | Specifies the concurrency of generating statement, call client to import. | 512 | +| manager.statsInterval | Specifies the interval at which statistics are printed. | 10s | +| manager.hooks.before | Configures the statements before the import begins. | - | +| manager.hooks.before.[].statements | Defines the list of statements. | - | +| manager.hooks.before.[].wait | Defines the waiting time after executing the above statements. | - | +| manager.hooks.after | Configures the statements after the import is complete. | - | +| manager.hooks.after.[].statements | Defines the list of statements. | - | +| manager.hooks.after.[].wait | Defines the waiting time after executing the above statements. | - | +| | | | +| log | The log configuration options. | - | +| log.level | Specifies the log level. | "INFO" | +| log.console | Specifies whether to print logs to the console. | true | +| log.files | Specifies which files to print logs to. | - | +| | | | +| sources | The data sources to be imported | - | +| sources[].path | Local file path | - | +| sources[].s3.endpoint | The endpoint of s3 service. | - | +| sources[].s3.region | The region of s3 service. | - | +| sources[].s3.bucket | The bucket of file in s3 service. | - | +| sources[].s3.key | The object key of file in s3 service. | - | +| sources[].s3.accessKeyID | The `Access Key ID` of s3 service. | - | +| sources[].s3.accessKeySecret | The `Access Key Secret` of s3 service. | - | +| sources[].oss.endpoint | The endpoint of oss service. | - | +| sources[].oss.bucket | The bucket of file in oss service. | - | +| sources[].oss.key | The object key of file in oss service. | - | +| sources[].oss.accessKeyID | The `Access Key ID` of oss service. | - | +| sources[].oss.accessKeySecret | The `Access Key Secret` of oss service. | - | +| sources[].ftp.host | The host of ftp service. | - | +| sources[].ftp.host | The port of ftp service. | - | +| sources[].ftp.user | The user of ftp service. | - | +| sources[].ftp.password | The password of ftp service. | - | +| sources[].ftp.path | The path of file in the ftp service. | - | +| sources[].sftp.host | The host of sftp service. | - | +| sources[].sftp.host | The port of sftp service. | - | +| sources[].sftp.user | The user of sftp service. | - | +| sources[].sftp.password | The password of sftp service. | - | +| sources[].sftp.keyFile | The ssh key file path of sftp service. | - | +| sources[].sftp.keyData | The ssh key file content of sftp service. | - | +| sources[].sftp.passphrase | The ssh key passphrase of sftp service. | - | +| sources[].sftp.path | The path of file in the ftp service. | - | +| sources[].hdfs.address | The address of hdfs service. | - | +| sources[].hdfs.user | The user of hdfs service. | - | +| sources[].hdfs.servicePrincipalName | The kerberos service principal name of hdfs service when enable kerberos. | - | +| sources[].hdfs.krb5ConfigFile | The kerberos config file of hdfs service when enable kerberos. | "/etc/krb5.conf" | +| sources[].hdfs.ccacheFile | The ccache file of hdfs service when enable kerberos. | - | +| sources[].hdfs.keyTabFile | The keytab file of hdfs service when enable kerberos. | - | +| sources[].hdfs.password | The kerberos password of hdfs service when enable kerberos. | - | +| sources[].hdfs.dataTransferProtection | The data transfer protection of hdfs service. | - | +| sources[].hdfs.disablePAFXFAST | Whether to prohibit the client to use PA_FX_FAST. | - | +| sources[].hdfs.path | The path of file in the sftp service. | - | +| sources[].gcs.endpoint | The endpoint of GCS service. | - | +| sources[].gcs.bucket | The bucket of file in GCS service. | - | +| sources[].gcs.key | The object key of file in GCS service. | - | +| sources[].gcs.credentialsFile | Path to the service account or refresh token JSON credentials file. Not required for public data. | - | +| sources[].gcs.credentialsJSON | Content of the service account or refresh token JSON credentials file. Not required for public data. | - | +| sources[].batch | Specifies the batch size for this source of the inserted data. | - | +| sources[].csv | Describes the csv file format information. | - | +| sources[].csv.delimiter | Specifies the delimiter for the CSV files. | "," | +| sources[].csv.withHeader | Specifies whether to ignore the first record in csv file. | false | +| sources[].csv.lazyQuotes | Specifies lazy quotes of csv file. | false | +| sources[].tags | Describes the schema definition for tags. | - | +| sources[].tags[].name | The tag name. | - | +| sources[].tags[].mode | The mode for processing data, one of `INSERT`, `UPDATE` or `DELETE`. | - | +| sources[].tags[].filter | The data filtering configuration. | - | +| sources[].tags[].filter.expr | The filter expression. | - | +| sources[].tags[].id | Describes the tag ID information. | - | +| sources[].tags[].id.type | The type for ID | "STRING" | +| sources[].tags[].id.index | The column number in the records. | - | +| sources[].tags[].id.concatItems | The concat items to generate for IDs. | - | +| sources[].tags[].id.function | Function to generate the IDs. | - | +| sources[].tags[].ignoreExistedIndex | Specifies whether to enable `IGNORE_EXISTED_INDEX`. | true | +| sources[].tags[].props | Describes the tag props definition. | - | +| sources[].tags[].props[].name | The property name, must be the same with the tag property in NebulaGraph. | - | +| sources[].tags[].props[].type | The property type. | - | +| sources[].tags[].props[].index | The column number in the records. | - | +| sources[].tags[].props[].nullable | Whether this prop property can be `NULL`. | false | +| sources[].tags[].props[].nullValue | The value used to determine whether it is a `NULL`. | "" | +| sources[].tags[].props[].alternativeIndices | The alternative indices. | - | +| sources[].tags[].props[].defaultValue | The property default value. | - | +| sources[].edges | Describes the schema definition for edges. | - | +| sources[].edges[].name | The edge name. | - | +| sources[].tags[].mode | The `mode` here is similar to `mode` in the `tags` above. | - | +| sources[].tags[].filter | The `filter` here is similar to `filter` in the `tags` above. | - | +| sources[].edges[].src | Describes the source definition for the edge. | - | +| sources[].edges[].src.id | The `id` here is similar to `id` in the `tags` above. | - | +| sources[].edges[].dst | Describes the destination definition for the edge. | - | +| sources[].edges[].dst.id | The `id` here is similar to `id` in the `tags` above. | - | +| sources[].edges[].rank | Describes the rank definition for the edge. | - | +| sources[].edges[].rank.index | The column number in the records. | - | +| sources[].edges[].props | Similar to the `props` in the `tags`, but for edges. | - | diff --git a/go.mod b/go.mod index 705fb53a..8e006db4 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/spf13/afero v1.9.3 github.com/spf13/cobra v1.6.1 github.com/valyala/bytebufferpool v1.0.0 - github.com/vesoft-inc/nebula-go/v3 v3.5.0 + github.com/vesoft-inc/nebula-go/v3 v3.6.1 go.uber.org/zap v1.23.0 golang.org/x/crypto v0.7.0 google.golang.org/api v0.114.0 @@ -35,7 +35,6 @@ require ( cloud.google.com/go/compute v1.18.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v0.12.0 // indirect - github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 // indirect github.com/fclairamb/go-log v0.4.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect @@ -57,6 +56,7 @@ require ( github.com/kr/fs v0.1.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 // indirect go.opencensus.io v0.24.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect diff --git a/go.sum b/go.sum index 822b8150..bbdf5d51 100644 --- a/go.sum +++ b/go.sum @@ -83,8 +83,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 h1:ZA+qQ3d2In0RNzVpk+D/nq1sjDSv+s1Wy2zrAPQAmsg= -github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU= github.com/fclairamb/ftpserverlib v0.21.0 h1:QO4ex827FU6Y7FNi1cj4dmAs6bcmy+UtWcX5yzVzFAw= github.com/fclairamb/ftpserverlib v0.21.0/go.mod h1:03sR5yGPYyUH/8hFKML02SVNLY7A//3qIy0q0ZJGhTw= github.com/fclairamb/go-log v0.4.1 h1:rLtdSG9x2pK41AIAnE8WYpl05xBJfw1ZyYxZaXFcBsM= @@ -255,8 +253,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/vesoft-inc/nebula-go/v3 v3.5.0 h1:2ZSkoBxtIfs15AXJXqrAPDPd0Z9HrzKR7YKXPqlJcR0= -github.com/vesoft-inc/nebula-go/v3 v3.5.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= +github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 h1:gpoPCGeOEuk/TnoY9nLVK1FoBM5ie7zY3BPVG8q43ME= +github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28/go.mod h1:xu7e9za8StcJhBZmCDwK1Hyv4/Y0xFsjS+uqp10ECJg= +github.com/vesoft-inc/nebula-go/v3 v3.6.1 h1:RHdt8WC+jmrRqM9r9WWzz4tzM8VrykPHe9RhtLZjSVA= +github.com/vesoft-inc/nebula-go/v3 v3.6.1/go.mod h1:mjMPlpNKnHYhe1pWz4caT7x9R+wKoX7dIm6u1+Rdcws= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -361,6 +361,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -429,11 +430,13 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -444,6 +447,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/pkg/client/option.go b/pkg/client/option.go index 9a582195..ac1f72f2 100644 --- a/pkg/client/option.go +++ b/pkg/client/option.go @@ -1,6 +1,7 @@ package client import ( + "crypto/tls" "strings" "time" @@ -30,6 +31,7 @@ type ( addresses []string user string password string + tlsConfig *tls.Config retry int retryInitialInterval time.Duration logger logger.Logger @@ -44,98 +46,104 @@ type ( ) func WithV3() Option { - return func(c *options) { + return func(o *options) { WithNewSessionFunc(func(hostAddress HostAddress) Session { - return newSessionV3(hostAddress, c.user, c.password, c.logger) - })(c) + return newSessionV3(hostAddress, o.user, o.password, o.tlsConfig.Clone(), o.logger) + })(o) } } func WithAddress(addresses ...string) Option { - return func(c *options) { + return func(o *options) { for _, addr := range addresses { if strings.IndexByte(addr, ',') != -1 { - c.addresses = append(c.addresses, strings.Split(addr, ",")...) + o.addresses = append(o.addresses, strings.Split(addr, ",")...) } else { - c.addresses = append(c.addresses, addr) + o.addresses = append(o.addresses, addr) } } } } func WithUser(user string) Option { - return func(c *options) { - c.user = user + return func(o *options) { + o.user = user } } func WithPassword(password string) Option { - return func(c *options) { - c.password = password + return func(o *options) { + o.password = password } } func WithUserPassword(user, password string) Option { - return func(c *options) { - WithUser(user)(c) - WithPassword(password)(c) + return func(o *options) { + WithUser(user)(o) + WithPassword(password)(o) + } +} + +func WithTLSConfig(tlsConfig *tls.Config) Option { + return func(o *options) { + o.tlsConfig = tlsConfig } } func WithRetry(retry int) Option { - return func(c *options) { + return func(o *options) { if retry > 0 { - c.retry = retry + o.retry = retry } } } func WithRetryInitialInterval(interval time.Duration) Option { - return func(c *options) { + return func(o *options) { if interval > 0 { - c.retryInitialInterval = interval + o.retryInitialInterval = interval } } } func WithLogger(l logger.Logger) Option { - return func(m *options) { - m.logger = l + return func(o *options) { + o.logger = l } } func WithNewSessionFunc(fn NewSessionFunc) Option { - return func(m *options) { - m.fnNewSession = fn + return func(o *options) { + o.fnNewSession = fn } } func WithClientInitFunc(fn func(Client) error) Option { - return func(c *options) { - c.clientInitFunc = fn + return func(o *options) { + o.clientInitFunc = fn } } func WithReconnectInitialInterval(interval time.Duration) Option { - return func(c *options) { + return func(o *options) { if interval > 0 { - c.reconnectInitialInterval = interval + o.reconnectInitialInterval = interval } } } func WithConcurrencyPerAddress(concurrencyPerAddress int) Option { - return func(c *options) { + return func(o *options) { if concurrencyPerAddress > 0 { - c.concurrencyPerAddress = concurrencyPerAddress + o.concurrencyPerAddress = concurrencyPerAddress } } } func WithQueueSize(queueSize int) Option { - return func(c *options) { + return func(o *options) { if queueSize > 0 { - c.queueSize = queueSize + o.queueSize = queueSize } } } diff --git a/pkg/client/option_test.go b/pkg/client/option_test.go index ad11f018..880f3dfe 100644 --- a/pkg/client/option_test.go +++ b/pkg/client/option_test.go @@ -1,6 +1,8 @@ package client import ( + "crypto/tls" + "github.com/vesoft-inc/nebula-importer/v4/pkg/logger" . "github.com/onsi/ginkgo/v2" @@ -15,6 +17,7 @@ var _ = Describe("Option", func() { Expect(o.addresses).To(Equal([]string(nil))) Expect(o.user).To(Equal(DefaultUser)) Expect(o.password).To(Equal(DefaultPassword)) + Expect(o.tlsConfig).To(BeNil()) Expect(o.retry).To(Equal(DefaultRetry)) Expect(o.retryInitialInterval).To(Equal(DefaultRetryInitialInterval)) Expect(o.logger).NotTo(BeNil()) @@ -29,6 +32,7 @@ var _ = Describe("Option", func() { Expect(o1.addresses).To(Equal([]string(nil))) Expect(o1.user).To(Equal(DefaultUser)) Expect(o1.password).To(Equal(DefaultPassword)) + Expect(o1.tlsConfig).To(BeNil()) Expect(o1.retry).To(Equal(DefaultRetry)) Expect(o1.retryInitialInterval).To(Equal(DefaultRetryInitialInterval)) Expect(o1.logger).NotTo(BeNil()) @@ -53,6 +57,8 @@ var _ = Describe("Option", func() { WithUser("u0"), WithPassword("p0"), WithUserPassword("newUser", "newPassword"), + WithTLSConfig(&tls.Config{}), //nolint:gosec + WithTLSConfig(&tls.Config{InsecureSkipVerify: true}), //nolint:gosec WithRetry(DefaultRetry-1), WithRetry(DefaultRetry+1), WithRetryInitialInterval(DefaultRetryInitialInterval-1), @@ -78,6 +84,8 @@ var _ = Describe("Option", func() { })) Expect(o.user).To(Equal("newUser")) Expect(o.password).To(Equal("newPassword")) + Expect(o.tlsConfig).NotTo(BeNil()) + Expect(o.tlsConfig.InsecureSkipVerify).To(BeTrue()) Expect(o.retry).To(Equal(DefaultRetry + 1)) Expect(o.retryInitialInterval).To(Equal(DefaultRetryInitialInterval + 1)) Expect(o.logger).NotTo(BeNil()) @@ -100,6 +108,8 @@ var _ = Describe("Option", func() { })) Expect(o1.user).To(Equal("newUser")) Expect(o1.password).To(Equal("newPassword")) + Expect(o1.tlsConfig).NotTo(BeNil()) + Expect(o1.tlsConfig.InsecureSkipVerify).To(BeTrue()) Expect(o1.retry).To(Equal(DefaultRetry + 1)) Expect(o1.retryInitialInterval).To(Equal(DefaultRetryInitialInterval + 1)) Expect(o1.logger).NotTo(BeNil()) diff --git a/pkg/client/session_v3.go b/pkg/client/session_v3.go index 76a3e388..0151012e 100644 --- a/pkg/client/session_v3.go +++ b/pkg/client/session_v3.go @@ -2,6 +2,7 @@ package client import ( + "crypto/tls" "fmt" "time" @@ -15,11 +16,12 @@ type ( hostAddress nebula.HostAddress user string password string + tlsConfig *tls.Config logger logger.Logger } ) -func newSessionV3(hostAddress HostAddress, user, password string, l logger.Logger) Session { +func newSessionV3(hostAddress HostAddress, user, password string, tlsConfig *tls.Config, l logger.Logger) Session { if l == nil { l = logger.NopLogger } @@ -28,19 +30,21 @@ func newSessionV3(hostAddress HostAddress, user, password string, l logger.Logge Host: hostAddress.Host, Port: hostAddress.Port, }, - user: user, - password: password, - logger: l, + user: user, + password: password, + tlsConfig: tlsConfig, + logger: l, } } func (s *defaultSessionV3) Open() error { hostAddress := s.hostAddress - pool, err := nebula.NewConnectionPool( + pool, err := nebula.NewSslConnectionPool( []nebula.HostAddress{hostAddress}, nebula.PoolConfig{ MaxConnPoolSize: 1, }, + s.tlsConfig, newNebulaLogger(s.logger.With(logger.Field{ Key: "address", Value: fmt.Sprintf("%s:%d", hostAddress.Host, hostAddress.Port), diff --git a/pkg/client/session_v3_test.go b/pkg/client/session_v3_test.go index b441b7ec..37f2d678 100644 --- a/pkg/client/session_v3_test.go +++ b/pkg/client/session_v3_test.go @@ -15,14 +15,14 @@ import ( var _ = Describe("SessionV3", func() { It("success", func() { - session := newSessionV3(HostAddress{}, "user", "password", nil) + session := newSessionV3(HostAddress{}, "user", "password", nil, nil) pool := &nebula.ConnectionPool{} nSession := &nebula.Session{} patches := gomonkey.NewPatches() defer patches.Reset() - patches.ApplyFuncReturn(nebula.NewConnectionPool, pool, nil) + patches.ApplyFuncReturn(nebula.NewSslConnectionPool, pool, nil) patches.ApplyMethodReturn(pool, "GetSession", nSession, nil) patches.ApplyMethodReturn(nSession, "Execute", &nebula.ResultSet{}, nil) @@ -39,21 +39,21 @@ var _ = Describe("SessionV3", func() { }) It("failed", func() { - session := newSessionV3(HostAddress{}, "user", "password", logger.NopLogger) + session := newSessionV3(HostAddress{}, "user", "password", nil, logger.NopLogger) pool := &nebula.ConnectionPool{} nSession := &nebula.Session{} patches := gomonkey.NewPatches() defer patches.Reset() - patches.ApplyFuncReturn(nebula.NewConnectionPool, nil, stderrors.New("new connection pool failed")) + patches.ApplyFuncReturn(nebula.NewSslConnectionPool, nil, stderrors.New("new connection pool failed")) err := session.Open() Expect(err).To(HaveOccurred()) patches.Reset() - patches.ApplyFuncReturn(nebula.NewConnectionPool, pool, nil) + patches.ApplyFuncReturn(nebula.NewSslConnectionPool, pool, nil) patches.ApplyMethodReturn(pool, "GetSession", nil, stderrors.New("get session failed")) err = session.Open() @@ -61,7 +61,7 @@ var _ = Describe("SessionV3", func() { patches.Reset() - patches.ApplyFuncReturn(nebula.NewConnectionPool, pool, nil) + patches.ApplyFuncReturn(nebula.NewSslConnectionPool, pool, nil) patches.ApplyMethodReturn(pool, "GetSession", nSession, nil) patches.ApplyMethodReturn(nSession, "Execute", nil, stderrors.New("execute failed")) diff --git a/pkg/config/base/client.go b/pkg/config/base/client.go index dcec6e21..612d45d2 100644 --- a/pkg/config/base/client.go +++ b/pkg/config/base/client.go @@ -1,10 +1,15 @@ package configbase import ( + "crypto/tls" + "crypto/x509" + "os" + "path/filepath" "time" "github.com/vesoft-inc/nebula-importer/v4/pkg/client" "github.com/vesoft-inc/nebula-importer/v4/pkg/errors" + "github.com/vesoft-inc/nebula-importer/v4/pkg/utils" ) var newClientPool = client.NewPool @@ -14,26 +19,59 @@ const ( ClientVersionDefault = ClientVersion3 ) -type Client struct { - Version string `yaml:"version"` - Address string `yaml:"address"` - User string `yaml:"user,omitempty"` - Password string `yaml:"password,omitempty"` - ConcurrencyPerAddress int `yaml:"concurrencyPerAddress,omitempty"` - ReconnectInitialInterval time.Duration `yaml:"reconnectInitialInterval,omitempty"` - Retry int `yaml:"retry,omitempty"` - RetryInitialInterval time.Duration `yaml:"retryInitialInterval,omitempty"` +type ( + Client struct { + Version string `yaml:"version"` + Address string `yaml:"address"` + User string `yaml:"user,omitempty"` + Password string `yaml:"password,omitempty"` + ConcurrencyPerAddress int `yaml:"concurrencyPerAddress,omitempty"` + ReconnectInitialInterval time.Duration `yaml:"reconnectInitialInterval,omitempty"` + Retry int `yaml:"retry,omitempty"` + RetryInitialInterval time.Duration `yaml:"retryInitialInterval,omitempty"` + SSL *SSL `yaml:"ssl,omitempty"` + } + + SSL struct { + Enable bool `yaml:"enable,omitempty"` + CertPath string `yaml:"certPath,omitempty"` + KeyPath string `yaml:"keyPath,omitempty"` + CAPath string `yaml:"caPath,omitempty"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify,omitempty"` + } +) + +// OptimizePath optimizes relative paths base to the configuration file path +func (c *Client) OptimizePath(configPath string) error { + if c == nil { + return nil + } + + if c.SSL != nil && c.SSL.Enable { + configPathDir := filepath.Dir(configPath) + c.SSL.CertPath = utils.RelativePathBaseOn(configPathDir, c.SSL.CertPath) + c.SSL.KeyPath = utils.RelativePathBaseOn(configPathDir, c.SSL.KeyPath) + c.SSL.CAPath = utils.RelativePathBaseOn(configPathDir, c.SSL.CAPath) + } + + return nil } func (c *Client) BuildClientPool(opts ...client.Option) (client.Pool, error) { if c.Version == "" { c.Version = ClientVersion3 } - options := make([]client.Option, 0, 7+len(opts)) + tlsConfig, err := c.SSL.BuildConfig() + if err != nil { + return nil, err + } + + options := make([]client.Option, 0, 8+len(opts)) options = append( options, client.WithAddress(c.Address), client.WithUserPassword(c.User, c.Password), + client.WithTLSConfig(tlsConfig), client.WithReconnectInitialInterval(c.ReconnectInitialInterval), client.WithRetry(c.Retry), client.WithRetryInitialInterval(c.RetryInitialInterval), @@ -49,3 +87,31 @@ func (c *Client) BuildClientPool(opts ...client.Option) (client.Pool, error) { pool := newClientPool(options...) return pool, nil } + +func (s *SSL) BuildConfig() (*tls.Config, error) { + if s == nil || !s.Enable { + return nil, nil + } + + tlsConfig := &tls.Config{ + Certificates: make([]tls.Certificate, 1), + InsecureSkipVerify: s.InsecureSkipVerify, //nolint:gosec + } + + rootPEM, err := os.ReadFile(s.CAPath) + if err != nil { + return nil, err + } + + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(rootPEM) + tlsConfig.RootCAs = rootCAs + + cert, err := tls.LoadX509KeyPair(s.CertPath, s.KeyPath) + if err != nil { + return nil, err + } + tlsConfig.Certificates[0] = cert + + return tlsConfig, nil +} diff --git a/pkg/config/base/client_test.go b/pkg/config/base/client_test.go index eb5d33fc..38a2061d 100644 --- a/pkg/config/base/client_test.go +++ b/pkg/config/base/client_test.go @@ -64,29 +64,64 @@ var _ = Describe("Client", func() { ) }) - DescribeTable("OptimizeFiles", - func(configPath string, files, expectFiles []string) { - l := &Log{ - Files: files, + It(".OptimizePath nil", func() { + var c *Client + Expect(c.OptimizePath("")).NotTo(HaveOccurred()) + }) + + DescribeTable("OptimizePath ssl", + func(configPath string, ssl, expectSSL *SSL) { + c := Client{ + SSL: ssl, } - Expect(l.OptimizeFiles(configPath)).NotTo(HaveOccurred()) - Expect(l.Files).To(Equal(expectFiles)) + Expect(c.OptimizePath(configPath)).NotTo(HaveOccurred()) + Expect(c.SSL).To(Equal(expectSSL)) }, EntryDescription("%[1]s : %[2]v => %[3]v"), Entry(nil, "f.yaml", nil, nil), - Entry(nil, "./f.yaml", []string{"1.log"}, []string{"1.log"}), - Entry(nil, "f.yaml", []string{"1.log", "2.log"}, []string{"1.log", "2.log"}), - Entry(nil, "./f.yaml", []string{"d10/1.log", "./d20/2.log"}, []string{"d10/1.log", "d20/2.log"}), - - Entry(nil, "./d1/f.yaml", nil, nil), - Entry(nil, "d1/f.yaml", []string{"1.log"}, []string{"d1/1.log"}), - Entry(nil, "./d1/f.yaml", []string{"1.log", "2.log"}, []string{"d1/1.log", "d1/2.log"}), - Entry(nil, "d1/f.yaml", []string{"d10/1.log", "./d20/2.log"}, []string{"d1/d10/1.log", "d1/d20/2.log"}), - - Entry(nil, "./d1/f.yaml", nil, nil), - Entry(nil, "d1/f.yaml", []string{"/1.log"}, []string{"/1.log"}), - Entry(nil, "./d1/f.yaml", []string{"/1.log", "/2.log"}, []string{"/1.log", "/2.log"}), - Entry(nil, "d1/f.yaml", []string{"/d10/1.log", "/d20/2.log"}, []string{"/d10/1.log", "/d20/2.log"}), + Entry(nil, "f.yaml", &SSL{}, &SSL{}), + Entry(nil, "./f.yaml", + &SSL{ + Enable: true, + CertPath: "cert.crt", + KeyPath: "d10/cert.key", + CAPath: "d20/ca.crt", + }, + &SSL{ + Enable: true, + CertPath: "cert.crt", + KeyPath: "d10/cert.key", + CAPath: "d20/ca.crt", + }, + ), + Entry(nil, "./d1/f.yaml", + &SSL{ + Enable: true, + CertPath: "cert.crt", + KeyPath: "d10/cert.key", + CAPath: "d20/ca.crt", + }, + &SSL{ + Enable: true, + CertPath: "d1/cert.crt", + KeyPath: "d1/d10/cert.key", + CAPath: "d1/d20/ca.crt", + }, + ), + Entry(nil, "./d1/f.yaml", + &SSL{ + Enable: true, + CertPath: "/cert.crt", + KeyPath: "/d10/cert.key", + CAPath: "/d20/ca.crt", + }, + &SSL{ + Enable: true, + CertPath: "/cert.crt", + KeyPath: "/d10/cert.key", + CAPath: "/d20/ca.crt", + }, + ), ) }) diff --git a/pkg/config/base/log.go b/pkg/config/base/log.go index 04f0c12f..53834b5c 100644 --- a/pkg/config/base/log.go +++ b/pkg/config/base/log.go @@ -14,8 +14,8 @@ type Log struct { Fields logger.Fields `yaml:"fields,omitempty"` } -// OptimizeFiles optimizes relative files path base to the configuration file path -func (l *Log) OptimizeFiles(configPath string) error { +// OptimizePath optimizes relative paths base to the configuration file path +func (l *Log) OptimizePath(configPath string) error { if l == nil { return nil } diff --git a/pkg/config/base/log_test.go b/pkg/config/base/log_test.go index c7cdb399..d702dd2a 100644 --- a/pkg/config/base/log_test.go +++ b/pkg/config/base/log_test.go @@ -60,17 +60,17 @@ var _ = Describe("Log", func() { }) }) - It(".OptimizeFiles nil", func() { + It(".OptimizePath nil", func() { var configLog *Log - Expect(configLog.OptimizeFiles("")).NotTo(HaveOccurred()) + Expect(configLog.OptimizePath("")).NotTo(HaveOccurred()) }) - DescribeTable(".OptimizeFiles", + DescribeTable(".OptimizePath", func(configPath string, files, expectFiles []string) { l := &Log{ Files: files, } - Expect(l.OptimizeFiles(configPath)).NotTo(HaveOccurred()) + Expect(l.OptimizePath(configPath)).NotTo(HaveOccurred()) Expect(l.Files).To(Equal(expectFiles)) }, EntryDescription("%[1]s : %[2]v => %[3]v"), diff --git a/pkg/config/v3/config.go b/pkg/config/v3/config.go index 30c70990..15b4a620 100644 --- a/pkg/config/v3/config.go +++ b/pkg/config/v3/config.go @@ -29,7 +29,11 @@ type ( ) func (c *Config) Optimize(configPath string) error { - if err := c.Log.OptimizeFiles(configPath); err != nil { + if err := c.Client.OptimizePath(configPath); err != nil { + return err + } + + if err := c.Log.OptimizePath(configPath); err != nil { return err }