diff --git a/example/nebula-test-insert-limit-rate.js b/example/nebula-test-insert-limit-rate.js index f454b64..7572c82 100644 --- a/example/nebula-test-insert-limit-rate.js +++ b/example/nebula-test-insert-limit-rate.js @@ -62,12 +62,13 @@ export default function(data) { ngql = ngql + " " + batches.join(',') let response = session.execute(ngql) check(response, { - "IsSucceed": (r) => r.isSucceed() === true + "IsSucceed": (r) => r !== null && r.isSucceed() === true }); // add trend - latencyTrend.add(response.getLatency()); - responseTrend.add(response.getResponseTime()); - + if (response !== null) { + latencyTrend.add(response.getLatency() / 1000); + responseTrend.add(response.getResponseTime() / 1000); + } }; export function teardown() { diff --git a/example/nebula-test-insert.js b/example/nebula-test-insert.js index 99ec399..37f86c1 100644 --- a/example/nebula-test-insert.js +++ b/example/nebula-test-insert.js @@ -44,13 +44,16 @@ export default function (data) { } ngql = ngql + " " + batches.join(',') let response = session.execute(ngql) - check(response, { - "IsSucceed": (r) => r.isSucceed() === true - }); - // add trend - latencyTrend.add(response.getLatency() / 1000); - responseTrend.add(response.getResponseTime() / 1000); - rowSize.add(response.getRowSize()); + check(response, { + "IsSucceed": (r) => r !== null && r.isSucceed() === true + }); + // add trend + if (response !== null) { + latencyTrend.add(response.getLatency() / 1000); + responseTrend.add(response.getResponseTime() / 1000); + rowSize.add(response.getRowSize()); + } + }; export function teardown() { diff --git a/example/nebula-test-ssl.js b/example/nebula-test-ssl.js index 61d2cfe..a4f1382 100644 --- a/example/nebula-test-ssl.js +++ b/example/nebula-test-ssl.js @@ -48,12 +48,13 @@ export default function(data) { let ngql = 'go 2 steps from {0} over KNOWS yield dst(edge)'.format(d) let response = session.execute(ngql) check(response, { - "IsSucceed": (r) => r.isSucceed() === true + "IsSucceed": (r) => r !== null && r.isSucceed() === true }); // add trend - latencyTrend.add(response.getLatency() / 1000); - responseTrend.add(response.getResponseTime() / 1000); - + if (response !== null) { + latencyTrend.add(response.getLatency() / 1000); + responseTrend.add(response.getResponseTime() / 1000); + } }; export function teardown() { diff --git a/example/nebula-test.js b/example/nebula-test.js index 4d33b0c..5791202 100644 --- a/example/nebula-test.js +++ b/example/nebula-test.js @@ -37,11 +37,13 @@ export default function(data) { let ngql = 'go 2 steps from {0} over KNOWS yield dst(edge)'.format(d) let response = session.execute(ngql) check(response, { - "IsSucceed": (r) => r.isSucceed() === true + "IsSucceed": (r) => r !== null && r.isSucceed() === true }); // add trend - latencyTrend.add(response.getLatency() / 1000); - responseTrend.add(response.getResponseTime() / 1000); + if (response !== null) { + latencyTrend.add(response.getLatency() / 1000); + responseTrend.add(response.getResponseTime() / 1000); + } }; export function teardown() { diff --git a/go.mod b/go.mod index 0226822..df5cc38 100644 --- a/go.mod +++ b/go.mod @@ -4,15 +4,15 @@ go 1.19 require ( github.com/go-echarts/go-echarts/v2 v2.2.4 + github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.4.0 - github.com/vesoft-inc/nebula-go/v3 v3.5.0 + github.com/vesoft-inc/nebula-go/v3 v3.6.1 go.k6.io/k6 v0.45.1 ) require ( github.com/dlclark/regexp2 v1.9.0 // indirect github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7 // indirect - github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 // indirect github.com/fatih/color v1.15.0 // indirect github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect @@ -25,9 +25,10 @@ require ( github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.27.10 // indirect github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect - github.com/sirupsen/logrus v1.9.0 // indirect github.com/spf13/afero v1.1.2 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 // indirect + golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index 21e10f6..7612616 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,6 @@ github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7 h1:cVGkvrdHgyBkYeB6kMC github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= -github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 h1:ZA+qQ3d2In0RNzVpk+D/nq1sjDSv+s1Wy2zrAPQAmsg= -github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -98,8 +96,10 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/vesoft-inc/nebula-go/v3 v3.5.0 h1:2ZSkoBxtIfs15AXJXqrAPDPd0Z9HrzKR7YKXPqlJcR0= -github.com/vesoft-inc/nebula-go/v3 v3.5.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= +github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 h1:gpoPCGeOEuk/TnoY9nLVK1FoBM5ie7zY3BPVG8q43ME= +github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28/go.mod h1:xu7e9za8StcJhBZmCDwK1Hyv4/Y0xFsjS+uqp10ECJg= +github.com/vesoft-inc/nebula-go/v3 v3.6.1 h1:RHdt8WC+jmrRqM9r9WWzz4tzM8VrykPHe9RhtLZjSVA= +github.com/vesoft-inc/nebula-go/v3 v3.6.1/go.mod h1:mjMPlpNKnHYhe1pWz4caT7x9R+wKoX7dIm6u1+Rdcws= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.k6.io/k6 v0.45.1 h1:z+iVxE7Qze2Ka8tKvnjerOsoTuQb8e27Vqd1wcG2IFI= @@ -108,7 +108,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -118,7 +118,9 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -139,16 +141,19 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= diff --git a/pkg/nebulagraph/client.go b/pkg/nebulagraph/client.go index f1b6dda..c650adc 100644 --- a/pkg/nebulagraph/client.go +++ b/pkg/nebulagraph/client.go @@ -23,12 +23,14 @@ type ( DataCh chan common.Data OutputCh chan []string initialized bool + closed bool mutex sync.Mutex csvReader common.ICsvReader connPool *graph.ConnectionPool sessPool *graph.SessionPool clients []common.IGraphClient graphOption *common.GraphOption + logger logger } // GraphClient a wrapper for nebula client, could read data from DataCh @@ -36,6 +38,7 @@ type ( Client *graph.Session Pool *GraphPool DataCh chan common.Data + logger logger } // Response a wrapper for nebula resultSet @@ -56,6 +59,14 @@ type ( errorMsg string firstRecord string } + + logger interface { + Info(msg string) + Warn(msg string) + Debug(msg string) + Error(msg string) + Fatal(msg string) + } ) var _ common.IGraphClient = &GraphClient{} @@ -102,10 +113,16 @@ func (gp *GraphPool) Init() (common.IGraphClientPool, error) { var ( err error ) + gp.mutex.Lock() + defer gp.mutex.Unlock() if gp.initialized { return gp, nil } + if gp.closed { + return nil, fmt.Errorf("pool has been closed") + } + gp.logger.Debug("initializing graph pool") switch gp.graphOption.PoolPolicy { case string(common.ConnectionPool): err = gp.initConnectionPool() @@ -204,7 +221,7 @@ func (gp *GraphPool) initSessionPool() error { if err != nil { return err } - pool, err := graph.NewSessionPool(*conf, graph.DefaultLogger{}) + pool, err := graph.NewSessionPool(*conf, gp.logger) if err != nil { return err } @@ -259,6 +276,7 @@ func (gp *GraphPool) Close() error { if gp.sessPool != nil { gp.sessPool.Close() } + gp.closed = true return nil } @@ -279,11 +297,11 @@ func (gp *GraphPool) GetSession() (common.IGraphClient, error) { if err != nil { return nil, err } - s := &GraphClient{Client: c, Pool: gp, DataCh: gp.DataCh} + s := &GraphClient{Client: c, Pool: gp, DataCh: gp.DataCh, logger: gp.logger} gp.clients = append(gp.clients, s) return s, nil } else { - s := &GraphClient{Client: nil, Pool: gp, DataCh: gp.DataCh} + s := &GraphClient{Client: nil, Pool: gp, DataCh: gp.DataCh, logger: gp.logger} return s, nil } @@ -298,7 +316,7 @@ func (gp *GraphPool) SetOption(option *common.GraphOption) error { return err } bs, _ := json.Marshal(gp.graphOption) - fmt.Printf("testing option: %s\n", bs) + gp.logger.Debug(fmt.Sprintf("testing option: %s\n", bs)) return nil } @@ -341,7 +359,7 @@ func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) { return resp, fmt.Errorf("retry timeout") } if err != nil { - fmt.Println("execute error: ", err) + gc.logger.Warn(fmt.Sprintf("execute error: %s", err.Error())) continue } diff --git a/pkg/nebulagraph/module.go b/pkg/nebulagraph/module.go new file mode 100644 index 0000000..06e22f8 --- /dev/null +++ b/pkg/nebulagraph/module.go @@ -0,0 +1,60 @@ +package nebulagraph + +import ( + "github.com/sirupsen/logrus" + "go.k6.io/k6/js/modules" +) + +var _ modules.Module = &K6Module{} + +// refer: https://k6.io/docs/extensions/get-started/create/javascript-extensions/#use-the-advanced-module-api +// K6Module is a module for k6, using the advanced module API +type K6Module struct { + pool *GraphPool +} + +type K6NebulaInstance struct { + vu modules.VU + pool *GraphPool +} + +type loggerWrapper struct { + log logrus.FieldLogger +} + +func (l *loggerWrapper) Info(msg string) { + l.log.Info(msg) +} +func (l *loggerWrapper) Warn(msg string) { + l.log.Warn(msg) +} +func (l *loggerWrapper) Debug(msg string) { + l.log.Debug(msg) +} +func (l *loggerWrapper) Error(msg string) { + l.log.Error(msg) +} +func (l *loggerWrapper) Fatal(msg string) { + l.log.Fatal(msg) +} + +func NewModule() *K6Module { + return &K6Module{ + pool: NewNebulaGraph(), + } +} + +func (m *K6Module) NewModuleInstance(vu modules.VU) modules.Instance { + return &K6NebulaInstance{ + vu: vu, + pool: m.pool, + } +} + +func (i *K6NebulaInstance) Exports() modules.Exports { + logger := i.vu.InitEnv().Logger + i.pool.logger = &loggerWrapper{log: logger} + return modules.Exports{ + Default: i.pool, + } +} diff --git a/register.go b/register.go index 61b907f..38e9eb4 100644 --- a/register.go +++ b/register.go @@ -8,7 +8,7 @@ import ( ) func init() { - modules.Register("k6/x/nebulagraph", nebulagraph.NewNebulaGraph()) + modules.Register("k6/x/nebulagraph", nebulagraph.NewModule()) output.RegisterExtension("aggcsv", func(p output.Params) (output.Output, error) { return aggcsv.New(p) })