From 6ff3b54790651d91bc28a331f6087264dcf3f5e9 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Fri, 7 Aug 2020 14:15:09 +0900 Subject: [PATCH 1/3] use internal client Signed-off-by: Kosuke Morimoto --- cmd/tools/cli/loadtest/sample.yaml | 3 +- pkg/tools/cli/loadtest/config/config.go | 1 + pkg/tools/cli/loadtest/service/loader.go | 54 +++++++----------------- pkg/tools/cli/loadtest/usecase/load.go | 14 +++--- 4 files changed, 25 insertions(+), 47 deletions(-) diff --git a/cmd/tools/cli/loadtest/sample.yaml b/cmd/tools/cli/loadtest/sample.yaml index 509a61b327..fba176bb22 100644 --- a/cmd/tools/cli/loadtest/sample.yaml +++ b/cmd/tools/cli/loadtest/sample.yaml @@ -21,12 +21,13 @@ logging: logger: glg level: info format: raw -service: agent +service: gateway operation: insert dataset: fashion-mnist concurrency: 100 batch_size: 10 progress_duration: 3s +resolve_dns: false addr: "localhost:8082" client: addrs: [] diff --git a/pkg/tools/cli/loadtest/config/config.go b/pkg/tools/cli/loadtest/config/config.go index 6b81a124a8..9da6149beb 100644 --- a/pkg/tools/cli/loadtest/config/config.go +++ b/pkg/tools/cli/loadtest/config/config.go @@ -115,6 +115,7 @@ type Data struct { Concurrency int `json:"concurrency" yaml:"concurrency"` BatchSize int `json:"batch_size" yaml:"batch_size"` ProgressDuration string `json:"progress_duration" yaml:"progress_duration"` + ResolveDNS bool `json:"resolve_dns" yaml:"resolve_dns"` Client *config.GRPCClient `json:"client" yaml:"client"` } diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index a02de0964e..1aef2e6356 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -27,12 +27,10 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" - - "google.golang.org/grpc" // TODO: related to #557 ) // Loader is representation of load test @@ -47,7 +45,7 @@ type ( type loader struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int @@ -211,29 +209,14 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func err = nil } }() - // TODO: related to #557 - /* - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - st, err := l.loaderFunc(ctx, conn, nil, copts...) - if err != nil { - return nil, err - } - return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) - }) - */ - conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) - if err != nil { - return err - } - defer notify(egctx, conn.Close()) - st, err := l.loaderFunc(egctx, conn, nil) - if err != nil { - return err - } - if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { - return err - } - return nil + _, err = l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + st, err := l.loaderFunc(ctx, conn, nil, copts...) + if err != nil { + return nil, err + } + return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) + }) + return err })) err = eg.Wait() case config.Insert, config.Search: @@ -250,18 +233,13 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func notify(egctx, err) err = nil }() - conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) - if err != nil { - return err - } - defer notify(egctx, conn.Close()) + _, err = l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + res, err := l.loaderFunc(egctx, conn, r) + f(res, err) + return res, err + }) - res, err := l.loaderFunc(egctx, conn, r) - f(res, err) - if err != nil { - return err - } - return nil + return err })) } err = eg.Wait() diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index 958a0b933d..9482259a34 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -45,6 +45,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { cfg.Client.Opts(), grpc.WithAddrs(cfg.Addr), grpc.WithErrGroup(run.eg), + grpc.WithResolveDNS(cfg.ResolveDNS), ) run.client = grpc.New(clientOpts...) @@ -72,13 +73,10 @@ func (r *run) PreStart(ctx context.Context) (err error) { // Start runs load test and returns error if occurred. func (r *run) Start(ctx context.Context) (<-chan error, error) { - // TODO: related to #557 - /* - rech, err := r.client.StartConnectionMonitor(ctx) - if err != nil { - return nil, err - } - */ + rech, err := r.client.StartConnectionMonitor(ctx) + if err != nil { + return nil, err + } lech := r.loader.Do(ctx) @@ -103,7 +101,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { select { case <-ctx.Done(): return finalize() - //case err = <-rech: // TODO: related to #557 + case err = <-rech: case err = <-lech: } if err != nil { From a6c3d8fd39381e543c1ddab541673217352c2af8 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Wed, 12 Aug 2020 15:19:24 +0900 Subject: [PATCH 2/3] fix test --- pkg/tools/cli/loadtest/service/insert_test.go | 8 +++----- pkg/tools/cli/loadtest/service/loader_test.go | 8 ++++---- pkg/tools/cli/loadtest/service/search_test.go | 6 +++--- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/insert_test.go b/pkg/tools/cli/loadtest/service/insert_test.go index 6e94bb023e..62471bc4af 100644 --- a/pkg/tools/cli/loadtest/service/insert_test.go +++ b/pkg/tools/cli/loadtest/service/insert_test.go @@ -22,12 +22,10 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" - - "google.golang.org/grpc" // TODO: related to #557 ) func Test_insertRequestProvider(t *testing.T) { @@ -552,7 +550,7 @@ func Test_bulkInsert(t *testing.T) { func Test_loader_newInsert(t *testing.T) { type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int @@ -674,7 +672,7 @@ func Test_loader_newInsert(t *testing.T) { func Test_loader_newStreamInsert(t *testing.T) { type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int diff --git a/pkg/tools/cli/loadtest/service/loader_test.go b/pkg/tools/cli/loadtest/service/loader_test.go index 1ec1ed6081..c601c1942d 100644 --- a/pkg/tools/cli/loadtest/service/loader_test.go +++ b/pkg/tools/cli/loadtest/service/loader_test.go @@ -23,7 +23,7 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" ) @@ -109,7 +109,7 @@ func Test_loader_Prepare(t *testing.T) { } type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int @@ -237,7 +237,7 @@ func Test_loader_Do(t *testing.T) { } type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int @@ -367,7 +367,7 @@ func Test_loader_do(t *testing.T) { } type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int diff --git a/pkg/tools/cli/loadtest/service/search_test.go b/pkg/tools/cli/loadtest/service/search_test.go index 5766232479..343c1c653f 100644 --- a/pkg/tools/cli/loadtest/service/search_test.go +++ b/pkg/tools/cli/loadtest/service/search_test.go @@ -22,7 +22,7 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" @@ -110,7 +110,7 @@ func Test_searchRequestProvider(t *testing.T) { func Test_loader_newSearch(t *testing.T) { type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int @@ -232,7 +232,7 @@ func Test_loader_newSearch(t *testing.T) { func Test_loader_newStreamSearch(t *testing.T) { type fields struct { eg errgroup.Group - client igrpc.Client + client grpc.Client addr string concurrency int batchSize int From f31bd51da3e76828fc72c8cdf5f48bdfde18973a Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Fri, 21 Aug 2020 14:07:32 +0900 Subject: [PATCH 3/3] use config.GRPCClient.ConnectionPool.ResolveDNS Signed-off-by: Kosuke Morimoto --- cmd/tools/cli/loadtest/sample.yaml | 3 ++- pkg/tools/cli/loadtest/config/config.go | 1 - pkg/tools/cli/loadtest/usecase/load.go | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/tools/cli/loadtest/sample.yaml b/cmd/tools/cli/loadtest/sample.yaml index fba176bb22..39178f9d19 100644 --- a/cmd/tools/cli/loadtest/sample.yaml +++ b/cmd/tools/cli/loadtest/sample.yaml @@ -27,7 +27,6 @@ dataset: fashion-mnist concurrency: 100 batch_size: 10 progress_duration: 3s -resolve_dns: false addr: "localhost:8082" client: addrs: [] @@ -45,6 +44,8 @@ client: max_retry_rpc_buffer_size: 0 max_send_msg_size: 0 wait_for_ready: true + connection_pool: + enable_dns_resolver: true dial_option: enable_backoff: false initial_connection_window_size: 0 diff --git a/pkg/tools/cli/loadtest/config/config.go b/pkg/tools/cli/loadtest/config/config.go index 9da6149beb..6b81a124a8 100644 --- a/pkg/tools/cli/loadtest/config/config.go +++ b/pkg/tools/cli/loadtest/config/config.go @@ -115,7 +115,6 @@ type Data struct { Concurrency int `json:"concurrency" yaml:"concurrency"` BatchSize int `json:"batch_size" yaml:"batch_size"` ProgressDuration string `json:"progress_duration" yaml:"progress_duration"` - ResolveDNS bool `json:"resolve_dns" yaml:"resolve_dns"` Client *config.GRPCClient `json:"client" yaml:"client"` } diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index 9482259a34..83b5ad4037 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -45,7 +45,6 @@ func New(cfg *config.Data) (r runner.Runner, err error) { cfg.Client.Opts(), grpc.WithAddrs(cfg.Addr), grpc.WithErrGroup(run.eg), - grpc.WithResolveDNS(cfg.ResolveDNS), ) run.client = grpc.New(clientOpts...)