Skip to content

Commit

Permalink
use internal client
Browse files Browse the repository at this point in the history
Signed-off-by: Kosuke Morimoto <[email protected]>
  • Loading branch information
kmrmt committed Aug 19, 2020
1 parent cd8be5b commit eafcc0e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 47 deletions.
3 changes: 2 additions & 1 deletion cmd/tools/cli/loadtest/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ logging:
logger: glg
level: info
format: raw
service: agent
service: gateway
operation: insert
dataset: fashion-mnist
concurrency: 100
batch_size: 10
progress_duration: 3s
resolve_dns: false
addr: "localhost:8082"
client:
addrs: []
Expand Down
1 change: 1 addition & 0 deletions pkg/tools/cli/loadtest/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type Data struct {
Concurrency int `json:"concurrency" yaml:"concurrency"`
BatchSize int `json:"batch_size" yaml:"batch_size"`
ProgressDuration string `json:"progress_duration" yaml:"progress_duration"`
ResolveDNS bool `json:"resolve_dns" yaml:"resolve_dns"`
Client *config.GRPCClient `json:"client" yaml:"client"`
}

Expand Down
54 changes: 16 additions & 38 deletions pkg/tools/cli/loadtest/service/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ import (
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
igrpc "github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/assets"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"

"google.golang.org/grpc" // TODO: related to #557
)

// Loader is representation of load test
Expand All @@ -47,7 +45,7 @@ type (

type loader struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down Expand Up @@ -211,29 +209,14 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func
err = nil
}
}()
// TODO: related to #557
/*
_, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
st, err := l.loaderFunc(ctx, conn, nil, copts...)
if err != nil {
return nil, err
}
return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f)
})
*/
conn, err := grpc.Dial(l.addr, grpc.WithInsecure())
if err != nil {
return err
}
defer notify(egctx, conn.Close())
st, err := l.loaderFunc(egctx, conn, nil)
if err != nil {
return err
}
if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil {
return err
}
return nil
_, err = l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
st, err := l.loaderFunc(ctx, conn, nil, copts...)
if err != nil {
return nil, err
}
return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f)
})
return err
}))
err = eg.Wait()
case config.Insert, config.Search:
Expand All @@ -250,18 +233,13 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func
notify(egctx, err)
err = nil
}()
conn, err := grpc.Dial(l.addr, grpc.WithInsecure())
if err != nil {
return err
}
defer notify(egctx, conn.Close())
_, err = l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
res, err := l.loaderFunc(egctx, conn, r)
f(res, err)
return res, err
})

res, err := l.loaderFunc(egctx, conn, r)
f(res, err)
if err != nil {
return err
}
return nil
return err
}))
}
err = eg.Wait()
Expand Down
14 changes: 6 additions & 8 deletions pkg/tools/cli/loadtest/usecase/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
cfg.Client.Opts(),
grpc.WithAddrs(cfg.Addr),
grpc.WithErrGroup(run.eg),
grpc.WithResolveDNS(cfg.ResolveDNS),
)
run.client = grpc.New(clientOpts...)

Expand Down Expand Up @@ -72,13 +73,10 @@ func (r *run) PreStart(ctx context.Context) (err error) {

// Start runs load test and returns error if occurred.
func (r *run) Start(ctx context.Context) (<-chan error, error) {
// TODO: related to #557
/*
rech, err := r.client.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}
*/
rech, err := r.client.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}

lech := r.loader.Do(ctx)

Expand All @@ -103,7 +101,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) {
select {
case <-ctx.Done():
return finalize()
//case err = <-rech: // TODO: related to #557
case err = <-rech:
case err = <-lech:
}
if err != nil {
Expand Down

0 comments on commit eafcc0e

Please sign in to comment.