Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple meta load testing #56

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions example/meta.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import meta from 'k6/x/nebulameta';

let client = meta.open("192.168.15.31", 10510)

export default function(data) {
client.auth("root", "nebula", "192.168.15.31:10210")
};

export function teardown() {
meta.close()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.8.2
github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28
github.com/vesoft-inc/nebula-go/v3 v3.6.1
go.k6.io/k6 v0.45.1
)
Expand All @@ -31,7 +32,6 @@ require (
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
Expand Down
6 changes: 5 additions & 1 deletion pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type (
CsvOption `json:",inline"`
RetryOption `json:",inline"`
SSLOption `json:",inline"`
Http2Option `json:",inline"`
}

PoolOption struct {
Expand All @@ -68,7 +69,6 @@ type (
Username string `json:"username"`
Password string `json:"password"`
Space string `json:"space"`
UseHttp bool `json:"use_http"`
}

OutputOption struct {
Expand All @@ -94,6 +94,10 @@ type (
RetryIntervalUs int `json:"retry_interval_us"`
RetryTimeoutUs int `json:"retry_timeout_us"`
}
Http2Option struct {
HttpEnable bool `json:"http_enable"`
HttpHeader map[string][]string `json:"http_header"`
}
)

const (
Expand Down
11 changes: 5 additions & 6 deletions pkg/nebulagraph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func (gp *GraphPool) initConnectionPool() error {
conf.MinConnPoolSize = gp.graphOption.MinSize
conf.TimeOut = time.Duration(gp.graphOption.TimeoutUs) * time.Microsecond
conf.IdleTime = time.Duration(gp.graphOption.IdleTimeUs) * time.Microsecond
conf.UseHTTP2 = gp.graphOption.HttpEnable
conf.HttpHeader = gp.graphOption.HttpHeader

var sslConfig *tls.Config
if gp.graphOption.SslCaPemPath != "" {
var err error
Expand All @@ -181,10 +184,6 @@ func (gp *GraphPool) initConnectionPool() error {
return err
}
}
if gp.graphOption.UseHttp {
conf.UseHTTP2 = true
}

pool, err := graph.NewSslConnectionPool(hosts, conf, sslConfig, graph.DefaultLogger{})
if err != nil {
return err
Expand Down Expand Up @@ -221,7 +220,8 @@ func (gp *GraphPool) initSessionPool() error {
graph.WithMaxSize(gp.graphOption.MaxSize),
graph.WithMinSize(gp.graphOption.MinSize),
graph.WithSSLConfig(sslConfig),
graph.WithHTTP2(gp.graphOption.UseHttp),
graph.WithHTTP2(gp.graphOption.HttpEnable),
graph.WithHttpHeader(gp.graphOption.HttpHeader),
)
if err != nil {
return err
Expand Down Expand Up @@ -383,7 +383,6 @@ func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) {

// Execute executes nebula query
func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) {
stmt = common.ProcessStmt(stmt)
start := time.Now()
var (
o *output
Expand Down
92 changes: 92 additions & 0 deletions pkg/nebulameta/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package nebulameta

import (
"fmt"
"net"
"strconv"

"github.com/vesoft-inc/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/meta"
)

type module struct {
clients []*metaClient
}

type metaClient struct {
client *meta.MetaServiceClient
host string
port int
}

func New() *module {
return &module{}
}

func (c *module) Open(host string, port int) (*metaClient, error) {
client := &metaClient{
host: host,
port: port,
}
if err := client.open(); err != nil {
return nil, err
}
c.clients = append(c.clients, client)
return client, nil
}

func (c *metaClient) open() error {
newAdd := net.JoinHostPort(c.host, strconv.Itoa(c.port))
sock, err := thrift.NewSocket(thrift.SocketAddr(newAdd))
if err != nil {
return err
}
// Set transport
bufferSize := 128 << 10
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport := thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock))
pf := thrift.NewHeaderProtocolFactory()

c.client = meta.NewMetaServiceClientFactory(transport, pf)
if err := transport.Open(); err != nil {
return err
}
return nil
}

func (c *metaClient) Auth(username, password string, graphAddr string) error {
req := meta.NewCreateSessionReq()
req.User = []byte(username)
h, p, err := net.SplitHostPort(graphAddr)
if err != nil {
return err
}
port, err := strconv.Atoi(p)
if err != nil {
return err
}
req.GraphAddr = &nebula.HostAddr{
Host: h,
Port: int32(port),
}
resp, err := c.client.CreateSession(req)

if err != nil {
return err
}
if resp.GetCode() != nebula.ErrorCode_SUCCEEDED {
return fmt.Errorf("auth failed, code: %d", resp.GetCode())
}
return nil
}

func (c *metaClient) Close() error {
return c.client.Close()
}

func (m *module) Close() {
for _, c := range m.clients {
c.Close()
}
}
2 changes: 2 additions & 0 deletions register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package k6plugin
import (
"github.com/vesoft-inc/k6-plugin/pkg/aggcsv"
"github.com/vesoft-inc/k6-plugin/pkg/nebulagraph"
"github.com/vesoft-inc/k6-plugin/pkg/nebulameta"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/output"
)

func init() {
modules.Register("k6/x/nebulagraph", nebulagraph.NewModule())
modules.Register("k6/x/nebulameta", nebulameta.New())
output.RegisterExtension("aggcsv", func(p output.Params) (output.Output, error) {
return aggcsv.New(p)
})
Expand Down
Loading