Skip to content

Commit

Permalink
feat: support gcs (#278)
Browse files Browse the repository at this point in the history
* Support Google cloud storage service

* Add example config file for gcs source

* Apply WithEndpoint to specify GCS emulator endpoint

* Trim leading slashes in object path
  • Loading branch information
OldPanda authored Jun 21, 2023
1 parent dd462b5 commit 61c3b00
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 9 deletions.
25 changes: 22 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

## Features

* Support multiple data sources, currently supports `local`, `s3`, `oss`, `ftp`, `sftp`, `hdfs`.
* Support multiple data sources, currently supports `local`, `s3`, `oss`, `ftp`, `sftp`, `hdfs`, and `gcs`.
* Support multiple file formats, currently only `csv` files are supported.
* Support files containing multiple tags, multiple edges, and a mixture of both.
* Support data transformations.
* Support record filtering.
* Support multiple modes, including `INSERT`, `UPDATE`, `DELETE`.
* Support connect multiple Garph with automatically load balance.
* Support connect multiple Graph with automatically load balance.
* Support retry after failure.
* Humanized status printing.

Expand Down Expand Up @@ -160,7 +160,7 @@ log:
The following are the relevant configuration items.

* `batch` specifies the batch size for this source of the inserted data. The priority is greater than `manager.batch`.
* `path`, `s3`, `oss`, `ftp`, `sftp`, `hdfs` are information configurations of various data sources, and only one of them can be configured.
* `path`, `s3`, `oss`, `ftp`, `sftp`, `hdfs`, and `gcs` are information configurations of various data sources, and only one of them can be configured.
* `csv` describes the csv file format information.
* `tags` describes the schema definition for tags.
* `edges` describes the schema definition for edges.
Expand Down Expand Up @@ -288,6 +288,25 @@ hdfs:
* `disablePAFXFAST`: **Optional**. Whether to prohibit the client to use PA_FX_FAST.
* `path`: **Required**. The path of file in the sftp service.

#### gcs

It only needs to be configured for gcs data sources.

```yaml
gcs:
endpoint: <endpoint>
bucket: <bucket>
key: <key>
credentialsFile: <Service account or refresh token JSON credentials file>
credentialsJSON: <Service account or refresh token JSON credentials>
```

* `endpoint`: **Optional**. The endpoint of GCS service.
* `bucket`: **Required**. The bucket of file in GCS service.
* `key`: **Required**. The object key of file in GCS service.
* `credentialsFile`: **Optional**. Path to the service account or refresh token JSON credentials file. Not required for public data.
* `credentialsJSON`: **Optional**. Content of the service account or refresh token JSON credentials file. Not required for public data.

#### batch

```yaml
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
| sources[].hdfs.dataTransferProtection | The data transfer protection of hdfs service. | - |
| sources[].hdfs.disablePAFXFAST | Whether to prohibit the client to use PA_FX_FAST. | - |
| sources[].hdfs.path | The path of file in the sftp service. | - |
| sources[].gcs.endpoint | The endpoint of GCS service. | - |
| sources[].gcs.bucket | The bucket of file in GCS service. | - |
| sources[].gcs.key | The object key of file in GCS service. | - |
| sources[].gcs.credentialsFile | Path to the service account or refresh token JSON credentials file. Not required for public data. | - |
| sources[].gcs.credentialsJSON | Content of the service account or refresh token JSON credentials file. Not required for public data. | - |
| sources[].batch | Specifies the batch size for this source of the inserted data. | - |
| sources[].csv | Describes the csv file format information. | - |
| sources[].csv.delimiter | Specifies the delimiter for the CSV files. | "," |
Expand Down
61 changes: 61 additions & 0 deletions examples/gcs/gcs.v3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
client:
version: v3
address: "127.0.0.1:9669"
user: root
password: nebula
concurrencyPerAddress: 10
reconnectInitialInterval: 1s
retry: 3
retryInitialInterval: 1s

manager:
spaceName: gcs_examples
batch: 128
readerConcurrency: 50
importerConcurrency: 512
statsInterval: 10s
hooks:
before:
- statements:
- |
CREATE SPACE IF NOT EXISTS `gcs_examples`(PARTITION_NUM = 20, REPLICA_FACTOR = 1, vid_type = INT);
USE `gcs_examples`;
CREATE TAG IF NOT EXISTS `Crime`(`case_number` STRING);
wait: 10s

log:
level: INFO
console: true
files:
- logs/nebula-importer.log

sources:
- gcs: # Google Cloud Storage
bucket: chicago-crime-sample
key: stats/000000000000.csv
# credentialsFile: "/path/to/your/credentials/file"
# credentialsJSON: '{
# "type": "service_account",
# "project_id": "your-project-id",
# "private_key_id": "key-id",
# "private_key": "-----BEGIN PRIVATE KEY-----\nxxxxx\n-----END PRIVATE KEY-----\n",
# "client_email": "[email protected]",
# "client_id": "client-id",
# "auth_uri": "https://accounts.google.com/o/oauth2/auth",
# "token_uri": "https://oauth2.googleapis.com/token",
# "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
# "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-client%40your-project-id.iam.gserviceaccount.com",
# "universe_domain": "googleapis.com"
# }'
csv:
delimiter: ","
withHeader: true
tags:
- name: Crime
id:
type: "INT"
index: 0
props:
- name: "case_number"
type: "STRING"
index: 1
21 changes: 19 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/vesoft-inc/nebula-importer/v4
go 1.19

require (
cloud.google.com/go/storage v1.30.1
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible
github.com/antonmedv/expr v1.12.5
Expand All @@ -24,15 +25,25 @@ require (
github.com/valyala/bytebufferpool v1.0.0
github.com/vesoft-inc/nebula-go/v3 v3.5.0
go.uber.org/zap v1.23.0
golang.org/x/crypto v0.5.0
golang.org/x/crypto v0.7.0
google.golang.org/api v0.114.0
gopkg.in/yaml.v3 v3.0.1
)

require (
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.18.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.12.0 // indirect
github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 // indirect
github.com/fclairamb/go-log v0.4.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
Expand All @@ -46,12 +57,18 @@ require (
github.com/kr/fs v0.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
Loading

0 comments on commit 61c3b00

Please sign in to comment.