From 629c5cb41b88ed0f0f93a843e61e5835e79ac568 Mon Sep 17 00:00:00 2001
From: HarrisChu <1726587+HarrisChu@users.noreply.github.com>
Date: Mon, 17 Jul 2023 14:33:47 +0800
Subject: [PATCH] combine the testing options

---
 Makefile                                 |   4 +-
 README.md                                | 183 ++++++-----
 example/nebula-test-insert-limit-rate.js |  57 ++--
 example/nebula-test-insert.js            |  32 +-
 example/nebula-test-ssl.js               |  50 +--
 example/nebula-test.js                   |  54 ++--
 pkg/common/types.go                      | 117 ++++++-
 pkg/nebulagraph/client.go                | 368 ++++++++++++++---------
 types.go                                 |   1 -
 9 files changed, 542 insertions(+), 324 deletions(-)
 delete mode 100644 types.go

diff --git a/Makefile b/Makefile
index 5335ba5..4f6ca28 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,6 @@ all: build
 pairs := darwin/amd64 linux/amd64 linux/arm64
 GOPATH ?= ~/go
 export GO111MODULE=on
-VERSION ?= v1.1.0
 K6_VERSION ?= v0.43.0
 
 fmt:
@@ -15,7 +14,8 @@ lint :
 
 build:
 	go install github.com/k6io/xk6/cmd/xk6@v0.4.1
-	$(GOPATH)/bin/xk6 build $(K6_VERSION) --with github.com/vesoft-inc/k6-plugin@$(VERSION);
+	version=$$(git describe --tags $$(git rev-list --tags --max-count=1)); \
+	$(GOPATH)/bin/xk6 build $(K6_VERSION) --with github.com/vesoft-inc/k6-plugin@$$version;
 
 build-all: build-arm-v7
 
diff --git a/README.md b/README.md
index 29aceb7..cc90578 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ Used to test [NebulaGraph](https://github.com/vesoft-inc/nebula).
 
 ## Version match
 
-k6-plugin now support NebulaGraph above v2.5.0.
+k6-plugin now supports NebulaGraph v3.0.0 and above.
 
 ## Build
 
@@ -32,6 +32,13 @@ Then:
 2. Build the binary:
 
 ```bash
+# build with the latest released version
+make
+
+# build with the local source code
+make build-dev
+
+# or build with a specified version
 xk6 build --with github.com/vesoft-inc/k6-plugin@{version}
 # e.g. build v0.0.8
 xk6 build --with github.com/vesoft-inc/k6-plugin@v0.0.8
@@ -45,41 +52,46 @@ xk6 build --with github.com/vesoft-inc/k6-plugin@master
 import nebulaPool from 'k6/x/nebulagraph';
 import { check } from 'k6';
 import { Trend } from 'k6/metrics';
-import { sleep } from 'k6';
 
-var latencyTrend = new Trend('latency');
-var responseTrend = new Trend('responseTime');
-// initial nebula connect pool
-// by default the channel buffer size is 20000, you can reset it with
-// var pool = nebulaPool.initWithSize("192.168.8.152:9669", {poolSize}, {bufferSize}); e.g.
-// var pool = nebulaPool.initWithSize("192.168.8.152:9669", 1000, 4000)
-var pool = nebulaPool.init("192.168.8.152:9669", 400);
+var latencyTrend = new Trend('latency', true);
+var responseTrend = new Trend('responseTime', true);
+
+// option configuration; please refer to the option tables in this doc for more details.
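+// only `address` and `space` are required; any key left out falls back
+// to the defaults listed in the option tables below.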
+var graph_option = { + address: "192.168.8.6:10010", + space: "sf1", + csv_path: "person.csv", + csv_delimiter: "|", + csv_with_header: true +}; +nebulaPool.setOption(graph_option); +var pool = nebulaPool.init(); // initial session for every vu -var session = pool.getSession("root", "nebula") -session.execute("USE sf1") +var session = pool.getSession() +String.prototype.format = function () { + var formatted = this; + var data = arguments[0] -export function setup() { - // config csv file - pool.configCSV("person.csv", "|", false) - // config output file, save every query information - pool.configOutput("output.csv") - sleep(1) -} + formatted = formatted.replace(/\{(\d+)\}/g, function (match, key) { + return data[key] + }) + return formatted +}; export default function (data) { // get csv data from csv file let d = session.getData() - // d[0] means the first column data in the csv file - let ngql = 'go 2 steps from ' + d[0] + ' over KNOWS ' + // {0} means the first column data in the csv file + let ngql = 'go 2 steps from {0} over KNOWS'.format(d) let response = session.execute(ngql) check(response, { "IsSucceed": (r) => r.isSucceed() === true }); // add trend -latencyTrend.add(response.getLatency()); -responseTrend.add(response.getResponseTime()); +latencyTrend.add(response.getLatency()/1000); +responseTrend.add(response.getResponseTime()/1000); }; export function teardown() { @@ -95,45 +107,39 @@ export function teardown() { # -d means the duration that test running, e.g. `3s` means 3 seconds, `5m` means 5 minutes. >./k6 run nebula-test.js -u 3 -d 3s + /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io -INFO[0000] 2021/07/07 16:50:25 [INFO] begin init the nebula pool -INFO[0000] 2021/07/07 16:50:25 [INFO] connection pool is initialized successfully -INFO[0000] 2021/07/07 16:50:25 [INFO] finish init the pool +testing option: {"pool_policy":"connection","output":"output.csv","output_channel_size":10000,"address":"192.168.8.6:10010","timeout_us":0,"idletime_us":0,"max_size":400,"min_size":0,"username":"root","password":"nebula","space":"sf1","csv_path":"person.csv","csv_delimiter":"|","csv_with_header":true,"csv_channel_size":10000,"csv_data_limit":500000,"retry_times":0,"retry_interval_us":0,"retry_timeout_us":0,"ssl_ca_pem_path":"","ssl_client_pem_path":"","ssl_client_key_path":""} execution: local script: nebula-test.js - output: - + output: engine scenarios: (100.00%) 1 scenario, 3 max VUs, 33s max duration (incl. 
graceful stop):
            * default: 3 looping VUs for 3s (gracefulStop: 30s)
 
-INFO[0004] 2021/07/07 16:50:29 [INFO] begin close the nebula pool
-
-running (04.1s), 0/3 VUs, 570 complete and 0 interrupted iterations
-default ✓ [======================================] 3 VUs  3s
-INFO[0004] 2021/07/07 16:50:29 [INFO] begin init the nebula pool
-INFO[0004] 2021/07/07 16:50:29 [INFO] connection pool is initialized successfully
-INFO[0004] 2021/07/07 16:50:29 [INFO] finish init the pool
 
 ✓ IsSucceed
 
-     █ setup
-     █ teardown
-
-     checks...............: 100.00% ✓ 570  ✗ 0
+     checks...............: 100.00% ✓ 3529 ✗ 0
      data_received........: 0 B     0 B/s
      data_sent............: 0 B     0 B/s
-     iteration_duration...: avg=17.5ms       min=356.6µs  med=11.44ms max=1s     p(90)=29.35ms p(95)=38.73ms
-     iterations...........: 570     139.877575/s
-     latency..............: avg=2986.831579  min=995      med=2663    max=18347  p(90)=4518.4  p(95)=5803
-     responseTime.........: avg=15670.263158 min=4144     med=11326.5 max=108286 p(90)=28928.9 p(95)=38367.1
-     vus..................: 3       min=0   max=3
-     vus_max..............: 3       min=3   max=3
+     iteration_duration...: avg=2.54ms min=129.28µs med=1.78ms max=34.99ms p(90)=5.34ms p(95)=6.79ms
+     iterations...........: 3529    1174.135729/s
+     latency..............: avg=1.98ms min=439µs    med=1.42ms max=27.77ms p(90)=4.11ms p(95)=5.12ms
+     responseTime.........: avg=2.48ms min=495µs    med=1.72ms max=34.93ms p(90)=5.27ms p(95)=6.71ms
+     vus..................: 3       min=3 max=3
+     vus_max..............: 3       min=3 max=3
+
+
+running (03.0s), 0/3 VUs, 3529 complete and 0 interrupted iterations
+default ✓ [======================================] 3 VUs 3s
 ```
 
 * `checks`: one check per iteration; it verifies `isSucceed` by default.
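+
+Both custom trends are registered as time metrics in milliseconds (the
+`true` flag in `new Trend(name, true)`), so a run can be gated on them
+with ordinary k6 thresholds. A sketch, with made-up limits:
+
+```js
+export const options = {
+    thresholds: {
+        // abort with a non-zero exit code if p95 latency exceeds 10ms
+        latency: ['p(95)<10'],
+        responseTime: ['p(95)<15'],
+    },
+};
+```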
@@ -154,38 +160,71 @@ The `output.csv` saves data as below:
 
 ```bash
 >head output.csv
-timestamp,nGQL,latency,responseTime,isSucceed,rows,errorMsg
-1625647825,USE sf1,7808,10775,true,0,
-1625647825,USE sf1,4055,7725,true,0,
-1625647825,USE sf1,3431,10231,true,0,
-1625647825,USE sf1,2938,5600,true,0,
-1625647825,USE sf1,2917,5410,true,0,
-1625647826,go 2 steps from 933 over KNOWS ,6022,24537,true,1680,
-1625647826,go 2 steps from 1129 over KNOWS ,6141,25861,true,1945,
-1625647826,go 2 steps from 4194 over KNOWS ,6317,26309,true,1581,
-1625647826,go 2 steps from 8698 over KNOWS ,4388,22597,true,1530,
-```
-
-## Advanced usage
-
-By default, all vus use the same channel to read the csv data.
-
-You can change the strategy before `getSession` function.
-
-As each vu uses a separate channel, you can reduce channel buffer size to save memory.
-
-```js
-// initial nebula connect pool, channel buffer size is 4000
-var pool = nebulaPool.initWithSize("192.168.8.61:9669", 400, 4000);
-
-// set csv strategy, 1 means each vu has a separate csv reader.
-pool.configCsvStrategy(1)
-
-// initial session for every vu
-var session = pool.getSession("root", "nebula")
+timestamp,nGQL,latency,responseTime,isSucceed,rows,firstRecord,errorMsg
+1689576531,go 2 steps from 4194 over KNOWS yield dst(edge),4260,5151,true,1581,32985348838665,
+1689576531,go 2 steps from 8333 over KNOWS yield dst(edge),4772,5772,true,2063,32985348833536,
+1689576531,go 2 steps from 1129 over KNOWS yield dst(edge),5471,6441,true,1945,19791209302529,
+1689576531,go 2 steps from 8698 over KNOWS yield dst(edge),3453,4143,true,1530,28587302322946,
+1689576531,go 2 steps from 8853 over KNOWS yield dst(edge),4361,5368,true,2516,28587302324992,
+1689576531,go 2 steps from 2199023256684 over KNOWS yield dst(edge),2259,2762,true,967,32985348833796,
+1689576531,go 2 steps from 2199023262818 over KNOWS yield dst(edge),638,732,true,0,,
+1689576531,go 2 steps from 10027 over KNOWS yield dst(edge),5182,6701,true,3288,30786325580290,
+1689576531,go 2 steps from 2199023261211 over KNOWS yield dst(edge),2131,2498,true,739,32985348833794,
 ```
 
-Please refer to [nebula-test-insert.js](./example/nebula-test-insert.js) for more details.
+## Plugin Options
+
+Pool options
+
+---
+| Key | Type | Default | Description |
+|---|---|---|---|
+|pool_policy|string|connection|which pool to test with: 'connection' or 'session'|
+|address|string||NebulaGraph addresses, e.g. '192.168.8.6:9669,192.168.8.7:9669'|
+|timeout_us|int|0|client connection timeout; 0 means no timeout|
+|idletime_us|int|0|client connection idle timeout; 0 means no timeout|
+|max_size|int|400|max client connections in the pool|
+|min_size|int|0|min client connections in the pool|
+|username|string|root|NebulaGraph username|
+|password|string|nebula|NebulaGraph password|
+|space|string||NebulaGraph space|
+
+Output options
+
+---
+| Key | Type | Default | Description |
+|---|---|---|---|
+|output|string||output file path|
+|output_channel_size|int|10000|size of the output channel|
+
+CSV options
+
+---
+| Key | Type | Default | Description |
+|---|---|---|---|
+|csv_path|string||csv file path|
+|csv_delimiter|string|,|delimiter of the csv file|
+|csv_with_header|bool|false|if true, the first record is skipped as a header|
+|csv_channel_size|int|10000|size of the csv reader channel|
+|csv_data_limit|int|500000|loads up to this many rows into memory, then sends them to the channel in a loop|
+
+Retry options
+
+---
+| Key | Type | Default | Description |
+|---|---|---|---|
+|retry_times|int|0|max retry times|
+|retry_interval_us|int|0|interval between retries|
+|retry_timeout_us|int|0|overall retry timeout|
+
+SSL options
+
+---
+| Key | Type | Default | Description |
+|---|---|---|---|
+|ssl_ca_pem_path|string||CA pem path; if not blank, an SSL connection is used|
+|ssl_client_pem_path|string||client pem path|
+|ssl_client_key_path|string||client key path|
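+
+A combined example touching every option group; a sketch only, where all
+values are placeholders and any omitted key falls back to the defaults
+above:
+
+```js
+var graph_option = {
+    pool_policy: "connection",
+    address: "192.168.8.6:9669",
+    space: "sf1",
+    username: "root",
+    password: "nebula",
+    max_size: 400,
+    output: "output.csv",
+    csv_path: "person.csv",
+    csv_delimiter: "|",
+    csv_with_header: true,
+    retry_times: 3,
+    retry_interval_us: 100000,    // 100ms between retries
+    retry_timeout_us: 10000000,   // give up after 10s
+    ssl_ca_pem_path: "cert/test.ca.pem",
+    ssl_client_pem_path: "cert/test.client.pem",
+    ssl_client_key_path: "cert/test.client.key"
+};
+nebulaPool.setOption(graph_option);
+```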
 
 ## Batch insert
 
diff --git a/example/nebula-test-insert-limit-rate.js b/example/nebula-test-insert-limit-rate.js
index 6f1f929..f454b64 100644
--- a/example/nebula-test-insert-limit-rate.js
+++ b/example/nebula-test-insert-limit-rate.js
@@ -7,19 +7,23 @@ import nebulaPool from 'k6/x/nebulagraph';
 import { check } from 'k6';
 import { Trend } from 'k6/metrics';
-import { sleep } from 'k6';
 
-var latencyTrend = new Trend('latency');
-var responseTrend = new Trend('responseTime');
-// initial nebula connect pool
-var pool = nebulaPool.initWithSize("192.168.8.61:9669,192.168.8.62:9669,192.168.8.63:9669", 400, 4000);
+var latencyTrend = new Trend('latency', true);
+var responseTrend = new Trend('responseTime', true);
 
-// set csv strategy, 1 means each vu has a separate csv reader.
-pool.configCsvStrategy(1)
+var graph_option = {
+	address: "192.168.8.6:10010",
+	space: "sf1",
+	csv_path: "person.csv",
+	csv_delimiter: "|",
+	csv_with_header: true,
+	output: "output.csv"
+};
+nebulaPool.setOption(graph_option);
+var pool = nebulaPool.init();
 
 // initial session for every vu
-var session = pool.getSession("root", "nebula")
-session.execute("USE ldbc")
+var session = pool.getSession()
 
 // 300 concurrent VUs; 1000 iterations are made per second.
 export const options = {
@@ -35,32 +39,27 @@ export const options = {
 	},
 };
 
-export function setup() {
-	// config csv file
-	pool.configCSV("person.csv", "|", false)
-	// config output file, save every query information
-	pool.configOutput("output.csv")
-	sleep(1)
-}
+String.prototype.format = function() {
+	var formatted = this;
+	var data = arguments[0]
+
+	formatted = formatted.replace(/\{(\d+)\}/g, function(match, key) {
+		return data[key]
+	})
+	return formatted
+};
 
-export default function (data) {
+export default function(data) {
 	// get csv data from csv file
 	let ngql = 'INSERT VERTEX Person(firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed) VALUES '
 	let batches = []
-	let batchSize = 1
-	// batch size
+	let batchSize = 10
 	for (let i = 0; i < batchSize; i++) {
 		let d = session.getData();
-		let values = []
-		// concat the insert value
-		for (let index = 1; index < 8; index++) {
-			let value = '"' + d[index] + '"'
-			values.push(value)
-		}
-		let batch = d[0] + ":(" + values.join(",") + ")"
-		batches.push(batch)
+		let value = "{0}:(\"{1}\",\"{2}\", \"{3}\", \"{4}\", datetime(\"{5}\"), \"{6}\", \"{7}\")".format(d)
+		batches.push(value)
 	}
 
-	ngql = ngql + batches.join(',')
+	ngql = ngql + " " + batches.join(',')
 	let response = session.execute(ngql)
 	check(response, {
 		"IsSucceed": (r) => r.isSucceed() === true
@@ -74,5 +73,3 @@ export default function (data) {
 export function teardown() {
 	pool.close()
 }
-
-
diff --git a/example/nebula-test-insert.js b/example/nebula-test-insert.js
index d5dda58..99ec399 100644
--- a/example/nebula-test-insert.js
+++ b/example/nebula-test-insert.js
@@ -6,22 +6,20 @@ import { sleep } from 'k6';
 var latencyTrend = new Trend('latency', true);
 var responseTrend = new Trend('responseTime', true);
 var rowSize = new Trend('rowSize');
-// initial nebula connect pool
-var pool = nebulaPool.initWithSize("192.168.8.61:9669,192.168.8.62:9669,192.168.8.63:9669", 400, 4000);
-// set csv strategy, 1 means each vu has a separate csv reader.
-pool.configCsvStrategy(1) +var graph_option = { + address: "192.168.8.6:10010", + space: "sf1", + csv_path: "person.csv", + csv_delimiter: "|", + csv_with_header: true, + output: "output.csv" +}; +nebulaPool.setOption(graph_option); +var pool = nebulaPool.init(); // initial session for every vu -var session = pool.getSession("root", "nebula") -session.execute("USE ldbc") -// export let options = { -// stages: [ -// { duration: '2s', target: 20 }, -// { duration: '2m', target: 20 }, -// { duration: '1m', target: 0 }, -// ], -// }; +var session = pool.getSession() String.prototype.format = function () { var formatted = this; @@ -33,14 +31,6 @@ String.prototype.format = function () { return formatted }; -export function setup() { - // config csv file - pool.configCSV("person.csv", "|", false) - // config output file, save every query information - pool.configOutput("output.csv") - sleep(1) -} - export default function (data) { // get csv data from csv file let ngql = 'INSERT VERTEX Person(firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed) VALUES ' diff --git a/example/nebula-test-ssl.js b/example/nebula-test-ssl.js index 71c1a24..61d2cfe 100644 --- a/example/nebula-test-ssl.js +++ b/example/nebula-test-ssl.js @@ -3,14 +3,26 @@ import { check } from 'k6'; import { Trend } from 'k6/metrics'; import { sleep } from 'k6'; -var latencyTrend = new Trend('latency'); -var responseTrend = new Trend('responseTime'); -// initial nebula connect pool -nebulaPool.newSSLConfig("cert/test.ca.pem", "cert/test.derive.crt", "cert/test.derive.key") -var pool = nebulaPool.init("192.168.15.7:10004", 400); +var latencyTrend = new Trend('latency', true); +var responseTrend = new Trend('responseTime', true); + +var graph_option = { + address: "192.168.8.6:10010", + space: "sf1", + csv_path: "person.csv", + csv_delimiter: "|", + csv_with_header: true, + output: "output.csv", + ssl_ca_pem_path: "cert/test.ca.pem", + ssl_client_pem_path: "cert/test.client.pem", + ssl_client_key_path: "cert/test.client.key" +}; + +nebulaPool.setOption(graph_option); +var pool = nebulaPool.init(); // initial session for every vu -var session = pool.getSession("root", "nebula") -session.execute("USE ldbc") +var session = pool.getSession() + export let options = { stages: [ { duration: '2s', target: 20 }, @@ -19,26 +31,28 @@ export let options = { ], }; -export function setup() { - // config csv file - pool.configCSV("person.csv", "|", false) - // config output file, save every query information - pool.configOutput("output.csv") - sleep(1) -} +String.prototype.format = function() { + var formatted = this; + var data = arguments[0] + + formatted = formatted.replace(/\{(\d+)\}/g, function(match, key) { + return data[key] + }) + return formatted +}; -export default function (data) { +export default function(data) { // get csv data from csv file let d = session.getData() // d[0] means the first column data in the csv file - let ngql = 'go 2 steps from ' + d[0] + ' over KNOWS ' + let ngql = 'go 2 steps from {0} over KNOWS yield dst(edge)'.format(d) let response = session.execute(ngql) check(response, { "IsSucceed": (r) => r.isSucceed() === true }); // add trend - latencyTrend.add(response.getLatency()); - responseTrend.add(response.getResponseTime()); + latencyTrend.add(response.getLatency() / 1000); + responseTrend.add(response.getResponseTime() / 1000); }; diff --git a/example/nebula-test.js b/example/nebula-test.js index 249100b..4d33b0c 100644 --- a/example/nebula-test.js +++ b/example/nebula-test.js @@ -1,41 +1,49 
@@
 import nebulaPool from 'k6/x/nebulagraph';
 import { check } from 'k6';
 import { Trend } from 'k6/metrics';
-import { sleep } from 'k6';
 
-var latencyTrend = new Trend('latency');
-var responseTrend = new Trend('responseTime');
-// initial nebula connect pool
-var pool = nebulaPool.init("192.168.8.152:9669", 400);
+var latencyTrend = new Trend('latency', true);
+var responseTrend = new Trend('responseTime', true);
+
+// option configuration; please refer to this doc for more details.
+var graph_option = {
+	address: "192.168.8.6:10010",
+	space: "sf1",
+	csv_path: "person.csv",
+	csv_delimiter: "|",
+	csv_with_header: true,
+	output: "output.csv"
+};
+
+nebulaPool.setOption(graph_option);
+var pool = nebulaPool.init();
 
 // initial session for every vu
-var session = pool.getSession("root", "nebula")
-session.execute("USE sf1")
+var session = pool.getSession()
+
+String.prototype.format = function() {
+	var formatted = this;
+	var data = arguments[0]
 
-export function setup() {
-	// config csv file
-	pool.configCSV("person.csv", "|", false)
-	// config output file, save every query information
-	pool.configOutput("output.csv")
-	sleep(1)
-}
+	formatted = formatted.replace(/\{(\d+)\}/g, function(match, key) {
+		return data[key]
+	})
+	return formatted
+};
 
-export default function (data) {
+export default function(data) {
 	// get csv data from csv file
 	let d = session.getData()
-	// d[0] means the first column data in the csv file
-	let ngql = 'go 2 steps from ' + d[0] + ' over KNOWS yield $$.Person.firstName as firstName'
+	// {0} means the first column data in the csv file
+	let ngql = 'go 2 steps from {0} over KNOWS yield dst(edge)'.format(d)
 	let response = session.execute(ngql)
 	check(response, {
 		"IsSucceed": (r) => r.isSucceed() === true
 	});
 	// add trend
-	latencyTrend.add(response.getLatency());
-	responseTrend.add(response.getResponseTime());
-
+	latencyTrend.add(response.getLatency() / 1000);
+	responseTrend.add(response.getResponseTime() / 1000);
 };
 
 export function teardown() {
 	pool.close()
-}
-
-
+}
\ No newline at end of file
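The reworked examples all share the same `String.prototype.format` helper to splice csv columns into an nGQL statement. A quick illustration of what it does, assuming the helper above is defined and using a made-up csv row:

```js
// 'd' stands in for one row returned by session.getData()
var d = ["933", "Mahinda", "Perera"];
// {0}, {1}, ... are replaced by the corresponding csv columns
var ngql = "go 2 steps from {0} over KNOWS yield dst(edge)".format(d);
console.log(ngql); // go 2 steps from 933 over KNOWS yield dst(edge)
```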
diff --git a/pkg/common/types.go b/pkg/common/types.go
index c37e5e3..ad2b4aa 100644
--- a/pkg/common/types.go
+++ b/pkg/common/types.go
@@ -1,9 +1,16 @@
 package common
 
+import (
+	"fmt"
+)
+
 type (
 	// Data data in csv file
 	Data []string
 
+	// pool policy
+	PoolPolicy string
+
 	// IClient common client
 	IClient interface {
 		Open() error
@@ -33,18 +40,112 @@ type (
 	// IGraphClientPool graph client pool
 	IGraphClientPool interface {
 		IClientPool
-		GetSession(username, password string) (IGraphClient, error)
+		GetSession() (IGraphClient, error)
 		// Init initializes the pool with the default channel buffer size
-		Init(address string, concurrent int) (IGraphClientPool, error)
-		InitWithSize(address string, concurrent int, size int) (IGraphClientPool, error)
-		ConfigCSV(path, delimiter string, withHeader bool, opts ...interface{}) error
-		ConfigOutput(path string) error
-
-		// Deprecated
-		ConfigCsvStrategy(strategy int)
+		Init() (IGraphClientPool, error)
+		SetOption(*GraphOption) error
 	}
 
 	ICsvReader interface {
 		ReadForever(dataCh chan<- Data) error
 	}
+
+	GraphOption struct {
+		PoolOption   `json:",inline"`
+		OutputOption `json:",inline"`
+		CsvOption    `json:",inline"`
+		RetryOption  `json:",inline"`
+		SSLOption    `json:",inline"`
+	}
+
+	PoolOption struct {
+		PoolPolicy string `json:"pool_policy"`
+		Address    string `json:"address"`
+		TimeoutUs  int    `json:"timeout_us"`
+		IdleTimeUs int    `json:"idletime_us"`
+		MaxSize    int    `json:"max_size"`
+		MinSize    int    `json:"min_size"`
+		Username   string `json:"username"`
+		Password   string `json:"password"`
+		Space      string `json:"space"`
+	}
+
+	OutputOption struct {
+		Output            string `json:"output"`
+		OutputChannelSize int    `json:"output_channel_size"`
+	}
+
+	SSLOption struct {
+		SslCaPemPath     string `json:"ssl_ca_pem_path"`
+		SslClientPemPath string `json:"ssl_client_pem_path"`
+		SslClientKeyPath string `json:"ssl_client_key_path"`
+	}
+
+	CsvOption struct {
+		CsvPath        string `json:"csv_path"`
+		CsvDelimiter   string `json:"csv_delimiter"`
+		CsvWithHeader  bool   `json:"csv_with_header"`
+		CsvChannelSize int    `json:"csv_channel_size"`
+		CsvDataLimit   int    `json:"csv_data_limit"`
+	}
+	RetryOption struct {
+		RetryTimes      int `json:"retry_times"`
+		RetryIntervalUs int `json:"retry_interval_us"`
+		RetryTimeoutUs  int `json:"retry_timeout_us"`
+	}
+)
+
+const (
+	ConnectionPool PoolPolicy = "connection"
+	SessionPool    PoolPolicy = "session"
 )
+
+func MakeDefaultOption(opt *GraphOption) *GraphOption {
+	if opt == nil {
+		return nil
+	}
+	if opt.PoolPolicy == "" {
+		opt.PoolPolicy = string(ConnectionPool)
+	}
+	if opt.OutputChannelSize == 0 {
+		opt.OutputChannelSize = 10000
+	}
+	if opt.CsvPath != "" && opt.CsvDelimiter == "" {
+		opt.CsvDelimiter = ","
+	}
+	if opt.CsvChannelSize == 0 {
+		opt.CsvChannelSize = 10000
+	}
+	if opt.CsvDataLimit == 0 {
+		opt.CsvDataLimit = 500000
+	}
+	if opt.MaxSize == 0 {
+		opt.MaxSize = 400
+	}
+	if opt.Username == "" {
+		opt.Username = "root"
+	}
+	if opt.Password == "" {
+		opt.Password = "nebula"
+	}
+	return opt
+}
+
+func ValidateOption(option *GraphOption) error {
+	if option == nil {
+		return nil
+	}
+	if option.Space == "" {
+		return fmt.Errorf("space is empty")
+	}
+	if option.Address == "" {
+		return fmt.Errorf("address is empty")
+	}
+	if option.SslCaPemPath != "" {
+		if option.SslClientPemPath == "" || option.SslClientKeyPath == "" {
+			return fmt.Errorf("ssl_client_pem_path or ssl_client_key_path is empty")
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/nebulagraph/client.go b/pkg/nebulagraph/client.go
index d8c9502..2b340d2 100644
--- a/pkg/nebulagraph/client.go
+++ b/pkg/nebulagraph/client.go
@@ -1,8 +1,9 @@
 package nebulagraph
 
 import (
+	"crypto/tls"
+	"encoding/json"
 	"fmt"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -19,29 +20,22 @@ const EnvRetryTimeoutUs = "NEBULA_RETRY_TIMEOUT_US"
 type (
 	// GraphPool nebula connection pool
 	GraphPool struct {
-		DataCh            chan common.Data
-		OutputCh          chan []string
-		Version           string
-		csvStrategy       csvReaderStrategy
-		initialized       bool
-		
pool *graph.ConnectionPool - clients []common.IGraphClient - channelBufferSize int - hosts []string - mutex sync.Mutex - csvReader common.ICsvReader - retryTimes int - retryIntervalUs int - retryTimeoutUs int + DataCh chan common.Data + OutputCh chan []string + initialized bool + mutex sync.Mutex + csvReader common.ICsvReader + connPool *graph.ConnectionPool + sessPool *graph.SessionPool + clients []common.IGraphClient + graphOption *common.GraphOption } // GraphClient a wrapper for nebula client, could read data from DataCh GraphClient struct { - Client *graph.Session - Pool *GraphPool - DataCh chan common.Data - username string - password string + Client *graph.Session + Pool *GraphPool + DataCh chan common.Data } // Response a wrapper for nebula resultSet @@ -104,82 +98,140 @@ func NewNebulaGraph() *GraphPool { } // Init initializes nebula pool with address and concurrent, by default the bufferSize is 20000 -func (gp *GraphPool) Init(address string, concurrent int) (common.IGraphClientPool, error) { - return gp.InitWithSize(address, concurrent, 20000) -} - -// InitWithSize initializes nebula pool with channel buffer size -func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int) (common.IGraphClientPool, error) { +func (gp *GraphPool) Init() (common.IGraphClientPool, error) { var ( - retryTimes int - retryIntervalUs int - retryTimeoutUs int + err error ) - - gp.mutex.Lock() - defer gp.mutex.Unlock() if gp.initialized { return gp, nil } - if os.Getenv(EnvRetryTimes) != "" { - retryTimes, _ = strconv.Atoi(os.Getenv(EnvRetryTimes)) + + switch gp.graphOption.PoolPolicy { + case string(common.ConnectionPool): + err = gp.initConnectionPool() + case string(common.SessionPool): + err = gp.initSessionPool() + default: + return nil, fmt.Errorf("invalid pool policy: %s, need connection or session", gp.graphOption.PoolPolicy) } - if os.Getenv(EnvRetryIntervalUs) != "" { - retryIntervalUs, _ = strconv.Atoi(os.Getenv(EnvRetryIntervalUs)) + if err != nil { + return nil, err } - if os.Getenv(EnvRetryTimeoutUs) != "" { - retryTimeoutUs, _ = strconv.Atoi(os.Getenv(EnvRetryTimeoutUs)) + gp.initialized = true + if gp.graphOption.Output != "" { + channelBufferSize := gp.graphOption.OutputChannelSize + gp.OutputCh = make(chan []string, channelBufferSize) + writer := common.NewCsvWriter(gp.graphOption.Output, ",", outputHeader, gp.OutputCh) + if err := writer.WriteForever(); err != nil { + return nil, err + } } + if gp.graphOption.CsvPath != "" { + gp.csvReader = common.NewCsvReader( + gp.graphOption.CsvPath, + gp.graphOption.CsvDelimiter, + gp.graphOption.CsvWithHeader, + gp.graphOption.CsvDataLimit, + ) + gp.DataCh = make(chan common.Data, gp.graphOption.CsvChannelSize) + if err := gp.csvReader.ReadForever(gp.DataCh); err != nil { + return nil, err + } + } + return gp, nil +} - if retryTimes == 0 { - retryTimes = 200 +func (gp *GraphPool) initConnectionPool() error { + addr := gp.graphOption.Address + hosts, err := gp.validate(addr) + if err != nil { + return err } - if retryIntervalUs == 0 { - retryIntervalUs = 100 * 1e3 + gp.clients = make([]common.IGraphClient, 0, gp.graphOption.MaxSize) + conf := graph.GetDefaultConf() + conf.TimeOut = time.Duration(gp.graphOption.TimeoutUs) * time.Microsecond + conf.IdleTime = time.Duration(gp.graphOption.IdleTimeUs) * time.Microsecond + var sslConfig *tls.Config + if gp.graphOption.SslCaPemPath != "" { + var err error + sslConfig, err = graph.GetDefaultSSLConfig( + gp.graphOption.SslCaPemPath, + gp.graphOption.SslClientPemPath, + 
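+			// client pem/key are guaranteed non-empty here: ValidateOption rejects a CA path without them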
gp.graphOption.SslClientKeyPath) + if err != nil { + return err + } } + pool, err := graph.NewSslConnectionPool(hosts, conf, sslConfig, graph.DefaultLogger{}) + if err != nil { + return err + } + gp.connPool = pool - err := gp.initAndVerifyPool(address, concurrent, chanSize) + return nil +} + +func (gp *GraphPool) initSessionPool() error { + addr := gp.graphOption.Address + hosts, err := gp.validate(addr) if err != nil { - return nil, err + return err } - gp.DataCh = make(chan common.Data, chanSize) - gp.initialized = true - gp.retryTimes = retryTimes - gp.retryIntervalUs = retryIntervalUs - gp.retryTimeoutUs = retryTimeoutUs + var sslConfig *tls.Config + if gp.graphOption.SslCaPemPath != "" { + var err error + sslConfig, err = graph.GetDefaultSSLConfig( + gp.graphOption.SslCaPemPath, + gp.graphOption.SslClientPemPath, + gp.graphOption.SslClientKeyPath) + if err != nil { + return err + } + } + conf, err := graph.NewSessionPoolConf( + gp.graphOption.Username, + gp.graphOption.Password, + hosts, + gp.graphOption.Space, + graph.WithTimeOut(time.Duration(gp.graphOption.TimeoutUs)*time.Microsecond), + graph.WithIdleTime(time.Duration(gp.graphOption.IdleTimeUs)*time.Microsecond), + graph.WithMaxSize(gp.graphOption.MaxSize), + graph.WithMinSize(gp.graphOption.MinSize), + graph.WithSSLConfig(sslConfig), + ) + if err != nil { + return err + } + pool, err := graph.NewSessionPool(*conf, graph.DefaultLogger{}) + if err != nil { + return err + } + gp.sessPool = pool - return gp, nil + return nil } -func (gp *GraphPool) initAndVerifyPool(address string, concurrent int, chanSize int) error { +func (gp *GraphPool) validate(address string) ([]graph.HostAddress, error) { var hosts []graph.HostAddress addrs := strings.Split(address, ",") + if len(addrs) == 0 { + return nil, fmt.Errorf("Invalid address: %s", address) + } for _, addr := range addrs { hostAndPort := strings.Split(addr, ":") if len(hostAndPort) != 2 { - return fmt.Errorf("Invalid address: %s", addr) + return nil, fmt.Errorf("Invalid address: %s", addr) } port, err := strconv.Atoi(hostAndPort[1]) if err != nil { - return err + return nil, err } hosts = append(hosts, graph.HostAddress{ Host: hostAndPort[0], Port: port, }) } - - gp.clients = make([]common.IGraphClient, 0, concurrent) - conf := graph.GetDefaultConf() - conf.MaxConnPoolSize = concurrent * 2 - pool, err := graph.NewConnectionPool(hosts, conf, graph.DefaultLogger{}) - if err != nil { - return err - } - gp.pool = pool - gp.channelBufferSize = chanSize - gp.OutputCh = make(chan []string, gp.channelBufferSize) - return nil + return hosts, nil } // Deprecated ConfigCsvStrategy sets csv reader strategy @@ -187,38 +239,6 @@ func (gp *GraphPool) ConfigCsvStrategy(strategy int) { return } -// ConfigCSV makes the read csv file configuration -func (gp *GraphPool) ConfigCSV(path, delimiter string, withHeader bool, opts ...interface{}) error { - var ( - limit int = 500 * 10000 - ) - if gp.csvReader != nil { - return nil - } - if len(opts) > 0 { - l, ok := opts[0].(int) - if ok { - limit = l - } - } - gp.csvReader = common.NewCsvReader(path, delimiter, withHeader, limit) - - if err := gp.csvReader.ReadForever(gp.DataCh); err != nil { - return err - } - - return nil -} - -// ConfigOutput makes the output file configuration, would write the execution outputs -func (gp *GraphPool) ConfigOutput(path string) error { - writer := common.NewCsvWriter(path, ",", outputHeader, gp.OutputCh) - if err := writer.WriteForever(); err != nil { - return err - } - return nil -} - // Close closes the nebula pool func 
(gp *GraphPool) Close() error {
 	gp.mutex.Lock()
@@ -231,21 +251,53 @@ func (gp *GraphPool) Close() error {
 			s.Close()
 		}
 	}
-	gp.initialized = false
+	if gp.connPool != nil {
+		gp.connPool.Close()
+	}
+	if gp.sessPool != nil {
+		gp.sessPool.Close()
+	}
+
 	return nil
 }
 
 // GetSession gets the session from pool
-func (gp *GraphPool) GetSession(username, password string) (common.IGraphClient, error) {
-	gp.mutex.Lock()
-	defer gp.mutex.Unlock()
-	c, err := gp.pool.GetSession(username, password)
-	if err != nil {
-		return nil, err
+func (gp *GraphPool) GetSession() (common.IGraphClient, error) {
+	if gp.connPool != nil {
+		gp.mutex.Lock()
+		defer gp.mutex.Unlock()
+		c, err := gp.connPool.GetSession(
+			gp.graphOption.Username,
+			gp.graphOption.Password,
+		)
+		if err != nil {
+			return nil, err
+		}
+		_, err = c.Execute(fmt.Sprintf("USE %s", gp.graphOption.Space))
+		if err != nil {
+			return nil, err
+		}
+		s := &GraphClient{Client: c, Pool: gp, DataCh: gp.DataCh}
+		gp.clients = append(gp.clients, s)
+		return s, nil
+	} else {
+		s := &GraphClient{Client: nil, Pool: gp, DataCh: gp.DataCh}
+		return s, nil
+	}
+
+}
+
+func (gp *GraphPool) SetOption(option *common.GraphOption) error {
+	if gp.graphOption != nil {
+		return nil
+	}
+	gp.graphOption = common.MakeDefaultOption(option)
+	if err := common.ValidateOption(gp.graphOption); err != nil {
+		return err
 	}
-	s := &GraphClient{Client: c, Pool: gp, DataCh: gp.DataCh}
-	gp.clients = append(gp.clients, s)
-	return s, nil
+	bs, _ := json.Marshal(gp.graphOption)
+	fmt.Printf("testing option: %s\n", bs)
+	return nil
 }
 
 func (gc *GraphClient) Open() error {
@@ -276,13 +328,20 @@ func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) {
 		err error
 	)
 	start := time.Now()
-	for i := 0; i < gc.Pool.retryTimes; i++ {
-		if gc.Pool.retryTimeoutUs != 0 && time.Since(start).Microseconds() > int64(gc.Pool.retryTimeoutUs) {
-			return resp, nil
+	for i := 0; i < gc.Pool.graphOption.RetryTimes+1; i++ {
+		if gc.Pool.graphOption.RetryTimeoutUs != 0 &&
+			time.Since(start).Microseconds() > int64(gc.Pool.graphOption.RetryTimeoutUs) {
+			return resp, err
+		}
+		if gc.Client != nil {
+			resp, err = gc.Client.Execute(stmt)
+		} else {
+			resp, err = gc.Pool.sessPool.Execute(stmt)
 		}
-		resp, err = gc.Client.Execute(stmt)
+
 		if err != nil {
-			return nil, err
+			fmt.Println("execute error: ", err)
+			continue
 		}
 		graphErr := resp.GetErrorCode()
 		if graphErr == graph.ErrorCode_SUCCEEDED {
@@ -292,62 +351,73 @@ func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) {
 		if graphErr != graph.ErrorCode_E_EXECUTION_ERROR {
 			break
 		}
-		<-time.After(time.Duration(gc.Pool.retryIntervalUs) * time.Microsecond)
+		<-time.After(time.Duration(gc.Pool.graphOption.RetryIntervalUs) * time.Microsecond)
 	}
-	return resp, nil
+	return resp, err
 }
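+
+// retry semantics, summarizing executeRetry above: a statement runs at
+// most retry_times+1 times; transport errors and E_EXECUTION_ERROR
+// responses trigger another attempt after retry_interval_us, and the
+// loop gives up once retry_timeout_us (when set) has elapsed.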
 
 // Execute executes nebula query
 func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) {
 	start := time.Now()
 	var (
-		rows    int32
-		latency int64
+		o      *output
+		result common.IGraphResponse
 	)
 	resp, err := gc.executeRetry(stmt)
 	if err != nil {
-		return nil, err
-	}
-
-	rows = int32(resp.GetRowSize())
-	latency = resp.GetLatency()
-
-	responseTime := int32(time.Since(start) / 1000)
-	// output
-	if gc.Pool.OutputCh != nil {
-		var fr []string
-		columns := resp.GetColSize()
-		if rows != 0 {
-			r, err := resp.GetRowValuesByIndex(0)
-			if err != nil {
-				return nil, err
-			}
-			for i := 0; i < columns; i++ {
-				v, err := r.GetValueByIndex(i)
-				if err != nil {
-					return nil, err
-				}
-				fr = append(fr, v.String())
-			}
+		// surface the error in the output; js callers should check whether the response is nil.
+		o = &output{
+			timeStamp:    start.Unix(),
+			nGQL:         stmt,
+			latency:      0,
+			responseTime: 0,
+			isSucceed:    false,
+			rows:         0,
+			errorMsg:     err.Error(),
+			firstRecord:  "",
 		}
-		o := &output{
+		result = nil
+	} else {
+		o = &output{
 			timeStamp:    start.Unix(),
 			nGQL:         stmt,
-			latency:      latency,
-			responseTime: responseTime,
+			latency:      resp.GetLatency(),
+			responseTime: int32(time.Since(start) / 1000),
 			isSucceed:    resp.GetErrorCode() == graph.ErrorCode_SUCCEEDED,
-			rows:         rows,
+			rows:         int32(resp.GetRowSize()),
 			errorMsg:     resp.GetErrorMsg(),
-			firstRecord:  strings.Join(fr, "|"),
+			firstRecord:  "",
+		}
+		result = &Response{ResultSet: resp, ResponseTime: o.responseTime}
+	}
+	if gc.Pool.OutputCh == nil {
+		return result, nil
+	}
+
+	var fr []string
+	if o.rows != 0 {
+		// resp may be nil on the error path, so only touch it when rows were returned
+		columns := resp.GetColSize()
+		r, err := resp.GetRowValuesByIndex(0)
+		if err != nil {
+			return nil, err
 		}
-		select {
-		case gc.Pool.OutputCh <- formatOutput(o):
-			// abandon if the output chan is full.
-		default:
+		for i := 0; i < columns; i++ {
+			v, err := r.GetValueByIndex(i)
+			if err != nil {
+				return nil, err
+			}
+			fr = append(fr, v.String())
 		}
+	}
+	o.firstRecord = strings.Join(fr, "|")
+
+	select {
+	case gc.Pool.OutputCh <- formatOutput(o):
+		// abandon if the output chan is full.
+	default:
 	}
 
-	return &Response{ResultSet: resp, ResponseTime: responseTime}, nil
+	return result, nil
 }
 
 // GetResponseTime GetResponseTime
diff --git a/types.go b/types.go
deleted file mode 100644
index eaf754a..0000000
--- a/types.go
+++ /dev/null
@@ -1 +0,0 @@
-package k6plugin
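For quick reference, a minimal end-to-end script exercising the new `session` pool policy; a sketch only, where the address, space, and csv values are placeholders for your own deployment:

```js
import nebulaPool from 'k6/x/nebulagraph';
import { check } from 'k6';

// only address and space are strictly required; the remaining keys
// fall back to the defaults applied by MakeDefaultOption.
var graph_option = {
    pool_policy: "session",
    address: "192.168.8.6:9669",
    space: "sf1",
    csv_path: "person.csv",
    csv_delimiter: "|",
    csv_with_header: true
};
nebulaPool.setOption(graph_option);
var pool = nebulaPool.init();
// with the session policy, the client is backed by the shared session pool
var session = pool.getSession();

export default function () {
    let d = session.getData();
    let response = session.execute('go 2 steps from ' + d[0] + ' over KNOWS yield dst(edge)');
    check(response, { "IsSucceed": (r) => r.isSucceed() === true });
}

export function teardown() {
    pool.close();
}
```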