diff --git a/example/meta.js b/example/meta.js new file mode 100644 index 0000000..2d43bbc --- /dev/null +++ b/example/meta.js @@ -0,0 +1,11 @@ +import meta from 'k6/x/nebulameta'; + +let client = meta.open("192.168.15.31", 10510) + +export default function(data) { + client.auth("root", "nebula", "192.168.15.31:10210") +}; + +export function teardown() { + meta.close() +} diff --git a/go.mod b/go.mod index cb4e6ea..fd965dd 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.8.2 + github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 github.com/vesoft-inc/nebula-go/v3 v3.6.1 go.k6.io/k6 v0.45.1 ) @@ -31,7 +32,6 @@ require ( github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect github.com/spf13/afero v1.1.2 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect diff --git a/pkg/common/types.go b/pkg/common/types.go index 5435c85..2b70c43 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -56,6 +56,7 @@ type ( CsvOption `json:",inline"` RetryOption `json:",inline"` SSLOption `json:",inline"` + Http2Option `json:",inline"` } PoolOption struct { @@ -68,7 +69,6 @@ type ( Username string `json:"username"` Password string `json:"password"` Space string `json:"space"` - UseHttp bool `json:"use_http"` } OutputOption struct { @@ -94,6 +94,10 @@ type ( RetryIntervalUs int `json:"retry_interval_us"` RetryTimeoutUs int `json:"retry_timeout_us"` } + Http2Option struct { + HttpEnable bool `json:"http_enable"` + HttpHeader map[string][]string `json:"http_header"` + } ) const ( diff --git a/pkg/nebulagraph/client.go b/pkg/nebulagraph/client.go index d705043..0140777 100644 --- a/pkg/nebulagraph/client.go +++ b/pkg/nebulagraph/client.go @@ -170,6 +170,9 @@ func (gp *GraphPool) initConnectionPool() error { conf.MinConnPoolSize = gp.graphOption.MinSize conf.TimeOut = time.Duration(gp.graphOption.TimeoutUs) * time.Microsecond conf.IdleTime = time.Duration(gp.graphOption.IdleTimeUs) * time.Microsecond + conf.UseHTTP2 = gp.graphOption.HttpEnable + conf.HttpHeader = gp.graphOption.HttpHeader + var sslConfig *tls.Config if gp.graphOption.SslCaPemPath != "" { var err error @@ -181,10 +184,6 @@ func (gp *GraphPool) initConnectionPool() error { return err } } - if gp.graphOption.UseHttp { - conf.UseHTTP2 = true - } - pool, err := graph.NewSslConnectionPool(hosts, conf, sslConfig, graph.DefaultLogger{}) if err != nil { return err @@ -221,7 +220,8 @@ func (gp *GraphPool) initSessionPool() error { graph.WithMaxSize(gp.graphOption.MaxSize), graph.WithMinSize(gp.graphOption.MinSize), graph.WithSSLConfig(sslConfig), - graph.WithHTTP2(gp.graphOption.UseHttp), + graph.WithHTTP2(gp.graphOption.HttpEnable), + graph.WithHttpHeader(gp.graphOption.HttpHeader), ) if err != nil { return err @@ -383,7 +383,6 @@ func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) { // Execute executes nebula query func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) { - stmt = common.ProcessStmt(stmt) start := time.Now() var ( o *output diff --git a/pkg/nebulameta/client.go b/pkg/nebulameta/client.go new file mode 100644 index 0000000..50e0281 --- /dev/null +++ b/pkg/nebulameta/client.go @@ -0,0 +1,92 @@ +package nebulameta + +import ( + "fmt" + "net" + "strconv" + + "github.com/vesoft-inc/fbthrift/thrift/lib/go/thrift" + "github.com/vesoft-inc/nebula-go/v3/nebula" + "github.com/vesoft-inc/nebula-go/v3/nebula/meta" +) + +type module struct { + clients []*metaClient +} + +type metaClient struct { + client *meta.MetaServiceClient + host string + port int +} + +func New() *module { + return &module{} +} + +func (c *module) Open(host string, port int) (*metaClient, error) { + client := &metaClient{ + host: host, + port: port, + } + if err := client.open(); err != nil { + return nil, err + } + c.clients = append(c.clients, client) + return client, nil +} + +func (c *metaClient) open() error { + newAdd := net.JoinHostPort(c.host, strconv.Itoa(c.port)) + sock, err := thrift.NewSocket(thrift.SocketAddr(newAdd)) + if err != nil { + return err + } + // Set transport + bufferSize := 128 << 10 + bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize) + transport := thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock)) + pf := thrift.NewHeaderProtocolFactory() + + c.client = meta.NewMetaServiceClientFactory(transport, pf) + if err := transport.Open(); err != nil { + return err + } + return nil +} + +func (c *metaClient) Auth(username, password string, graphAddr string) error { + req := meta.NewCreateSessionReq() + req.User = []byte(username) + h, p, err := net.SplitHostPort(graphAddr) + if err != nil { + return err + } + port, err := strconv.Atoi(p) + if err != nil { + return err + } + req.GraphAddr = &nebula.HostAddr{ + Host: h, + Port: int32(port), + } + resp, err := c.client.CreateSession(req) + + if err != nil { + return err + } + if resp.GetCode() != nebula.ErrorCode_SUCCEEDED { + return fmt.Errorf("auth failed, code: %d", resp.GetCode()) + } + return nil +} + +func (c *metaClient) Close() error { + return c.client.Close() +} + +func (m *module) Close() { + for _, c := range m.clients { + c.Close() + } +} diff --git a/register.go b/register.go index 38e9eb4..b0f1845 100644 --- a/register.go +++ b/register.go @@ -3,12 +3,14 @@ package k6plugin import ( "github.com/vesoft-inc/k6-plugin/pkg/aggcsv" "github.com/vesoft-inc/k6-plugin/pkg/nebulagraph" + "github.com/vesoft-inc/k6-plugin/pkg/nebulameta" "go.k6.io/k6/js/modules" "go.k6.io/k6/output" ) func init() { modules.Register("k6/x/nebulagraph", nebulagraph.NewModule()) + modules.Register("k6/x/nebulameta", nebulameta.New()) output.RegisterExtension("aggcsv", func(p output.Params) (output.Output, error) { return aggcsv.New(p) })