Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use internal client #618

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/tools/cli/loadtest/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ logging:
logger: glg
level: info
format: raw
service: agent
service: gateway
operation: insert
dataset: fashion-mnist
concurrency: 100
Expand All @@ -44,6 +44,8 @@ client:
max_retry_rpc_buffer_size: 0
max_send_msg_size: 0
wait_for_ready: true
connection_pool:
enable_dns_resolver: true
dial_option:
enable_backoff: false
initial_connection_window_size: 0
Expand Down
8 changes: 3 additions & 5 deletions pkg/tools/cli/loadtest/service/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (

"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
igrpc "github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/assets"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"
"go.uber.org/goleak"

"google.golang.org/grpc" // TODO: related to #557
)

func Test_insertRequestProvider(t *testing.T) {
Expand Down Expand Up @@ -552,7 +550,7 @@ func Test_bulkInsert(t *testing.T) {
func Test_loader_newInsert(t *testing.T) {
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down Expand Up @@ -674,7 +672,7 @@ func Test_loader_newInsert(t *testing.T) {
func Test_loader_newStreamInsert(t *testing.T) {
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down
54 changes: 16 additions & 38 deletions pkg/tools/cli/loadtest/service/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ import (
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
igrpc "github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/assets"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"

"google.golang.org/grpc" // TODO: related to #557
)

// Loader is representation of load test
Expand All @@ -47,7 +45,7 @@ type (

type loader struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down Expand Up @@ -211,29 +209,14 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func
err = nil
}
}()
// TODO: related to #557
/*
_, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
st, err := l.loaderFunc(ctx, conn, nil, copts...)
if err != nil {
return nil, err
}
return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f)
})
*/
conn, err := grpc.Dial(l.addr, grpc.WithInsecure())
if err != nil {
return err
}
defer notify(egctx, conn.Close())
st, err := l.loaderFunc(egctx, conn, nil)
if err != nil {
return err
}
if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil {
return err
}
return nil
_, err = l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
vankichi marked this conversation as resolved.
Show resolved Hide resolved
st, err := l.loaderFunc(ctx, conn, nil, copts...)
if err != nil {
return nil, err
}
return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f)
})
return err
}))
err = eg.Wait()
case config.Insert, config.Search:
Expand All @@ -250,18 +233,13 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func
notify(egctx, err)
err = nil
}()
conn, err := grpc.Dial(l.addr, grpc.WithInsecure())
if err != nil {
return err
}
defer notify(egctx, conn.Close())
_, err = l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
vankichi marked this conversation as resolved.
Show resolved Hide resolved
res, err := l.loaderFunc(egctx, conn, r)
f(res, err)
return res, err
})

res, err := l.loaderFunc(egctx, conn, r)
f(res, err)
if err != nil {
return err
}
return nil
return err
}))
}
err = eg.Wait()
Expand Down
8 changes: 4 additions & 4 deletions pkg/tools/cli/loadtest/service/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
igrpc "github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"
"go.uber.org/goleak"
)
Expand Down Expand Up @@ -109,7 +109,7 @@ func Test_loader_Prepare(t *testing.T) {
}
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down Expand Up @@ -237,7 +237,7 @@ func Test_loader_Do(t *testing.T) {
}
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down Expand Up @@ -367,7 +367,7 @@ func Test_loader_do(t *testing.T) {
}
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down
6 changes: 3 additions & 3 deletions pkg/tools/cli/loadtest/service/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
igrpc "github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/assets"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"
"go.uber.org/goleak"
Expand Down Expand Up @@ -110,7 +110,7 @@ func Test_searchRequestProvider(t *testing.T) {
func Test_loader_newSearch(t *testing.T) {
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down Expand Up @@ -232,7 +232,7 @@ func Test_loader_newSearch(t *testing.T) {
func Test_loader_newStreamSearch(t *testing.T) {
type fields struct {
eg errgroup.Group
client igrpc.Client
client grpc.Client
addr string
concurrency int
batchSize int
Expand Down
13 changes: 5 additions & 8 deletions pkg/tools/cli/loadtest/usecase/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,10 @@ func (r *run) PreStart(ctx context.Context) (err error) {

// Start runs load test and returns error if occurred.
func (r *run) Start(ctx context.Context) (<-chan error, error) {
// TODO: related to #557
/*
rech, err := r.client.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}
*/
rech, err := r.client.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}

lech := r.loader.Do(ctx)

Expand All @@ -103,7 +100,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) {
select {
case <-ctx.Done():
return finalize()
//case err = <-rech: // TODO: related to #557
case err = <-rech:
case err = <-lech:
}
if err != nil {
Expand Down