diff --git a/internal/config/corrector.go b/internal/config/corrector.go index 3e1176c8bb..2b5b56beec 100644 --- a/internal/config/corrector.go +++ b/internal/config/corrector.go @@ -60,3 +60,20 @@ func (c *Corrector) Bind() *Corrector { } return c } + +// GetStreamListConcurrency returns the StreamListConcurrency field value if set, otherwise 200 is set, +// since not setting this could use up all the available momory +func (c *Corrector) GetStreamListConcurrency() int { + if c != nil { + return c.StreamListConcurrency + } + return 200 //nolint:gomnd +} + +// GetBboltAsyncWriteConcurrency returns 2048 when not specified since not setting this could use up all the available momory +func (c *Corrector) GetBboltAsyncWriteConcurrency() int { + if c != nil { + return c.BboltAsyncWriteConcurrency + } + return 2048 //nolint:gomnd +} diff --git a/internal/config/corrector_test.go b/internal/config/corrector_test.go index 0391c4d3aa..a66af0181a 100644 --- a/internal/config/corrector_test.go +++ b/internal/config/corrector_test.go @@ -21,10 +21,10 @@ package config // AgentName string // AgentNamespace string // AgentDNS string +// CreationPoolSize uint32 // NodeName string // StreamListConcurrency int // BboltAsyncWriteConcurrency int -// IndexReplica int // Discoverer *DiscovererClient // } // type want struct { @@ -54,10 +54,10 @@ package config // AgentName:"", // AgentNamespace:"", // AgentDNS:"", +// CreationPoolSize:0, // NodeName:"", // StreamListConcurrency:0, // BboltAsyncWriteConcurrency:0, -// IndexReplica:0, // Discoverer:DiscovererClient{}, // }, // want: want{}, @@ -81,10 +81,10 @@ package config // AgentName:"", // AgentNamespace:"", // AgentDNS:"", +// CreationPoolSize:0, // NodeName:"", // StreamListConcurrency:0, // BboltAsyncWriteConcurrency:0, -// IndexReplica:0, // Discoverer:DiscovererClient{}, // }, // want: want{}, @@ -120,10 +120,10 @@ package config // AgentName: test.fields.AgentName, // AgentNamespace: test.fields.AgentNamespace, // AgentDNS: test.fields.AgentDNS, +// CreationPoolSize: test.fields.CreationPoolSize, // NodeName: test.fields.NodeName, // StreamListConcurrency: test.fields.StreamListConcurrency, // BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency, -// IndexReplica: test.fields.IndexReplica, // Discoverer: test.fields.Discoverer, // } // @@ -135,3 +135,245 @@ package config // }) // } // } +// +// func TestCorrector_GetStreamListConcurrency(t *testing.T) { +// type fields struct { +// AgentPort int +// AgentName string +// AgentNamespace string +// AgentDNS string +// CreationPoolSize uint32 +// NodeName string +// StreamListConcurrency int +// BboltAsyncWriteConcurrency int +// Discoverer *DiscovererClient +// } +// type want struct { +// want int +// } +// type test struct { +// name string +// fields fields +// want want +// checkFunc func(want, int) error +// beforeFunc func(*testing.T) +// afterFunc func(*testing.T) +// } +// defaultCheckFunc := func(w want, got int) error { +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// fields: fields { +// AgentPort:0, +// AgentName:"", +// AgentNamespace:"", +// AgentDNS:"", +// CreationPoolSize:0, +// NodeName:"", +// StreamListConcurrency:0, +// BboltAsyncWriteConcurrency:0, +// Discoverer:DiscovererClient{}, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T,) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T,) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// fields: fields { +// AgentPort:0, +// AgentName:"", +// AgentNamespace:"", +// AgentDNS:"", +// CreationPoolSize:0, +// NodeName:"", +// StreamListConcurrency:0, +// BboltAsyncWriteConcurrency:0, +// Discoverer:DiscovererClient{}, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T,) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T,) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// c := &Corrector{ +// AgentPort: test.fields.AgentPort, +// AgentName: test.fields.AgentName, +// AgentNamespace: test.fields.AgentNamespace, +// AgentDNS: test.fields.AgentDNS, +// CreationPoolSize: test.fields.CreationPoolSize, +// NodeName: test.fields.NodeName, +// StreamListConcurrency: test.fields.StreamListConcurrency, +// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency, +// Discoverer: test.fields.Discoverer, +// } +// +// got := c.GetStreamListConcurrency() +// if err := checkFunc(test.want, got); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } +// +// func TestCorrector_GetBboltAsyncWriteConcurrency(t *testing.T) { +// type fields struct { +// AgentPort int +// AgentName string +// AgentNamespace string +// AgentDNS string +// CreationPoolSize uint32 +// NodeName string +// StreamListConcurrency int +// BboltAsyncWriteConcurrency int +// Discoverer *DiscovererClient +// } +// type want struct { +// want int +// } +// type test struct { +// name string +// fields fields +// want want +// checkFunc func(want, int) error +// beforeFunc func(*testing.T) +// afterFunc func(*testing.T) +// } +// defaultCheckFunc := func(w want, got int) error { +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// fields: fields { +// AgentPort:0, +// AgentName:"", +// AgentNamespace:"", +// AgentDNS:"", +// CreationPoolSize:0, +// NodeName:"", +// StreamListConcurrency:0, +// BboltAsyncWriteConcurrency:0, +// Discoverer:DiscovererClient{}, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T,) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T,) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// fields: fields { +// AgentPort:0, +// AgentName:"", +// AgentNamespace:"", +// AgentDNS:"", +// CreationPoolSize:0, +// NodeName:"", +// StreamListConcurrency:0, +// BboltAsyncWriteConcurrency:0, +// Discoverer:DiscovererClient{}, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T,) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T,) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// c := &Corrector{ +// AgentPort: test.fields.AgentPort, +// AgentName: test.fields.AgentName, +// AgentNamespace: test.fields.AgentNamespace, +// AgentDNS: test.fields.AgentDNS, +// CreationPoolSize: test.fields.CreationPoolSize, +// NodeName: test.fields.NodeName, +// StreamListConcurrency: test.fields.StreamListConcurrency, +// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency, +// Discoverer: test.fields.Discoverer, +// } +// +// got := c.GetBboltAsyncWriteConcurrency() +// if err := checkFunc(test.want, got); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } diff --git a/pkg/index/job/correction/config/config.go b/pkg/index/job/correction/config/config.go index aff43f3657..70a48b5baa 100644 --- a/pkg/index/job/correction/config/config.go +++ b/pkg/index/job/correction/config/config.go @@ -69,7 +69,7 @@ func NewConfig(path string) (cfg *Data, err error) { if cfg.Corrector != nil { cfg.Corrector = cfg.Corrector.Bind() } else { - return nil, errors.ErrInvalidConfig + cfg.Corrector = new(config.Corrector).Bind() } return cfg, nil diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index f345268005..44f8b834be 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -19,7 +19,6 @@ import ( "fmt" "io" "os" - "reflect" "slices" "sync/atomic" "time" @@ -38,6 +37,7 @@ import ( "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync" "github.com/vdaas/vald/internal/sync/errgroup" + "github.com/vdaas/vald/pkg/index/job/correction/config" ) type contextTimeKey string @@ -60,6 +60,7 @@ type Corrector interface { } type correct struct { + cfg *config.Data discoverer discoverer.Client agentAddrs []string indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -69,46 +70,24 @@ type correct struct { checkedIndexCount atomic.Uint64 correctedOldIndexCount atomic.Uint64 correctedReplicationCount atomic.Uint64 - - indexReplica int - streamListConcurrency int - bboltAsyncWriteConcurrency int } const filemode = 0o600 -func New(opts ...Option) (_ Corrector, err error) { - c := new(correct) - for _, opt := range append(defaultOpts, opts...) { - if err := opt(c); err != nil { - oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) - e := &errors.ErrCriticalOption{} - if errors.As(oerr, &e) { - log.Error(err) - return nil, oerr - } - log.Warn(oerr) - } - } - if err := c.bboltInit(); err != nil { - return nil, err - } - return c, nil -} - -func (c *correct) bboltInit() error { - dpath := file.Join(os.TempDir(), "bbolt") - err := file.MkdirAll(dpath, os.ModePerm) +func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { + d := file.Join(os.TempDir(), "bbolt") + file.MkdirAll(d, os.ModePerm) + dbfile := file.Join(d, "checkedid.db") + bolt, err := bbolt.New(dbfile, "", os.FileMode(filemode)) if err != nil { - return err + return nil, err } - dbfile := file.Join(dpath, "checkedid.db") - c.checkedID, err = bbolt.New(dbfile, "", os.FileMode(filemode)) - if err != nil { - return err - } - return nil + return &correct{ + cfg: cfg, + discoverer: discoverer, + checkedID: bolt, + }, nil } func (c *correct) StartClient(ctx context.Context) (<-chan error, error) { @@ -123,15 +102,18 @@ func (c *correct) Start(ctx context.Context) error { // this is decending because it's supposed to be used for index manager to decide // which pod to make a create index rpc(higher memory, first to commit) c.agentAddrs = c.discoverer.GetAddrs(ctx) - if len(c.agentAddrs) <= 1 { - log.Warnf("target agent (%v) found, but there must be more than two agents for correction to happen", c.agentAddrs) + log.Debug("agent addrs found:", c.agentAddrs) + + if l := len(c.agentAddrs); l <= 1 { + log.Warn("only %d agent found, there must be more than two agents for correction to happen", l) return errors.ErrAgentReplicaOne } - log.Debugf("target agent addrs: %v", c.agentAddrs) - if err := c.loadInfos(ctx); err != nil { + err := c.loadInfos(ctx) + if err != nil { return err } + c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool { log.Infof("index info: addr(%s), stored(%d), uncommitted(%d)", addr, info.GetStored(), info.GetUncommitted()) return true @@ -139,6 +121,7 @@ func (c *correct) Start(ctx context.Context) error { log.Info("starting correction with bbolt disk cache...") if err := c.correct(ctx); err != nil { + log.Errorf("there's some errors while correction: %v", err) return err } log.Info("correction finished successfully") @@ -178,7 +161,7 @@ func (c *correct) correct(ctx context.Context) (err error) { } curTargetAgent := 0 - jobErrs := make([]error, 0, c.streamListConcurrency) + jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { // current address is the leftAgentAddrs[0] because this is OrderedRange and @@ -195,13 +178,15 @@ func (c *correct) correct(ctx context.Context) (err error) { sctx, scancel := context.WithCancel(ctx) defer scancel() seg, sctx := errgroup.WithContext(sctx) - seg.SetLimit(c.streamListConcurrency) + sconcurrency := c.cfg.Corrector.GetStreamListConcurrency() + seg.SetLimit(sconcurrency) // errgroup for bbolt AsyncSet bolteg, ctx := errgroup.WithContext(ctx) - bolteg.SetLimit(c.bboltAsyncWriteConcurrency) + bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency() + bolteg.SetLimit(bconcurrency) - log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, c.streamListConcurrency, c.bboltAsyncWriteConcurrency) + log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency) vc := vald.NewValdClient(conn) stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{}) @@ -245,11 +230,11 @@ func (c *correct) correct(ctx context.Context) (err error) { res, err := stream.Recv() mu.Unlock() + if errors.Is(err, io.EOF) { + scancel() + return nil + } if err != nil { - if errors.Is(err, io.EOF) { - scancel() - return nil - } return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err) } @@ -434,7 +419,7 @@ func (c *correct) correctReplica( ) error { // diff < 0 means there is less replica than the correct number existReplica := len(foundReplicas) + 1 - diff := existReplica - c.indexReplica + diff := existReplica - c.cfg.Corrector.IndexReplica if diff == 0 { // replica number is correct return nil diff --git a/pkg/index/job/correction/service/corrector_test.go b/pkg/index/job/correction/service/corrector_test.go index a249d59f60..91a6b2fd4c 100644 --- a/pkg/index/job/correction/service/corrector_test.go +++ b/pkg/index/job/correction/service/corrector_test.go @@ -20,9 +20,11 @@ import ( tmock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/vdaas/vald/apis/grpc/v1/payload" + iconfig "github.com/vdaas/vald/internal/config" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/test/mock" + "github.com/vdaas/vald/pkg/index/job/correction/config" ) type mockDiscovererClient struct { @@ -428,8 +430,12 @@ func Test_correct_correctReplica(t *testing.T) { for _, tc := range tests { test := tc c := &correct{ - discoverer: &m, - indexReplica: test.args.indexReplica, + discoverer: &m, + cfg: &config.Data{ + Corrector: &iconfig.Corrector{ + IndexReplica: test.args.indexReplica, + }, + }, } // agentAddrs = availableAddrs + target.addr + found.addr @@ -466,7 +472,8 @@ func Test_correct_correctReplica(t *testing.T) { // // func TestNew(t *testing.T) { // type args struct { -// opts []Option +// cfg *config.Data +// discoverer discoverer.Client // } // type want struct { // want Corrector @@ -495,7 +502,8 @@ func Test_correct_correctReplica(t *testing.T) { // { // name: "test_case_1", // args: args { -// opts:nil, +// cfg:nil, +// discoverer:nil, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -514,7 +522,8 @@ func Test_correct_correctReplica(t *testing.T) { // return test { // name: "test_case_2", // args: args { -// opts:nil, +// cfg:nil, +// discoverer:nil, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -545,7 +554,7 @@ func Test_correct_correctReplica(t *testing.T) { // checkFunc = defaultCheckFunc // } // -// got, err := New(test.args.opts...) +// got, err := New(test.args.cfg, test.args.discoverer) // if err := checkFunc(test.want, got, err); err != nil { // tt.Errorf("error = %v", err) // } @@ -554,23 +563,18 @@ func Test_correct_correctReplica(t *testing.T) { // } // } // -// func Test_correct_StartClient(t *testing.T) { +// func Test_correct_Start(t *testing.T) { // type args struct { // ctx context.Context // } // type fields struct { -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// checkedIndexCount atomic.Uint64 -// correctedOldIndexCount atomic.Uint64 -// correctedReplicationCount atomic.Uint64 -// indexReplica int -// streamListConcurrency int -// bboltAsyncWriteConcurrency int +// cfg *config.Data +// discoverer discoverer.Client +// agentAddrs []string +// indexInfos sync.Map[string, *payload.Info_Index_Count] +// uuidsCount uint32 +// uncommittedUUIDsCount uint32 +// checkedID bbolt.Bbolt // } // type want struct { // want <-chan error @@ -603,18 +607,13 @@ func Test_correct_correctReplica(t *testing.T) { // ctx:nil, // }, // fields: fields { +// cfg:nil, // discoverer:nil, // agentAddrs:nil, // indexInfos:nil, // uuidsCount:0, // uncommittedUUIDsCount:0, // checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -636,18 +635,13 @@ func Test_correct_correctReplica(t *testing.T) { // ctx:nil, // }, // fields: fields { +// cfg:nil, // discoverer:nil, // agentAddrs:nil, // indexInfos:nil, // uuidsCount:0, // uncommittedUUIDsCount:0, // checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -678,21 +672,16 @@ func Test_correct_correctReplica(t *testing.T) { // checkFunc = defaultCheckFunc // } // c := &correct{ -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// checkedIndexCount: test.fields.checkedIndexCount, -// correctedOldIndexCount: test.fields.correctedOldIndexCount, -// correctedReplicationCount: test.fields.correctedReplicationCount, -// indexReplica: test.fields.indexReplica, -// streamListConcurrency: test.fields.streamListConcurrency, -// bboltAsyncWriteConcurrency: test.fields.bboltAsyncWriteConcurrency, +// cfg: test.fields.cfg, +// discoverer: test.fields.discoverer, +// agentAddrs: test.fields.agentAddrs, +// indexInfos: test.fields.indexInfos, +// uuidsCount: test.fields.uuidsCount, +// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, +// checkedID: test.fields.checkedID, // } // -// got, err := c.StartClient(test.args.ctx) +// got, err := c.Start(test.args.ctx) // if err := checkFunc(test.want, got, err); err != nil { // tt.Errorf("error = %v", err) // } @@ -701,166 +690,18 @@ func Test_correct_correctReplica(t *testing.T) { // } // } // -// func Test_correct_Start(t *testing.T) { -// type args struct { -// ctx context.Context -// } -// type fields struct { -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// checkedIndexCount atomic.Uint64 -// correctedOldIndexCount atomic.Uint64 -// correctedReplicationCount atomic.Uint64 -// indexReplica int -// streamListConcurrency int -// bboltAsyncWriteConcurrency int -// } -// type want struct { -// err error -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &correct{ -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// checkedIndexCount: test.fields.checkedIndexCount, -// correctedOldIndexCount: test.fields.correctedOldIndexCount, -// correctedReplicationCount: test.fields.correctedReplicationCount, -// indexReplica: test.fields.indexReplica, -// streamListConcurrency: test.fields.streamListConcurrency, -// bboltAsyncWriteConcurrency: test.fields.bboltAsyncWriteConcurrency, -// } -// -// err := c.Start(test.args.ctx) -// if err := checkFunc(test.want, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// // func Test_correct_PreStop(t *testing.T) { // type args struct { // in0 context.Context // } // type fields struct { -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// checkedIndexCount atomic.Uint64 -// correctedOldIndexCount atomic.Uint64 -// correctedReplicationCount atomic.Uint64 -// indexReplica int -// streamListConcurrency int -// bboltAsyncWriteConcurrency int +// cfg *config.Data +// discoverer discoverer.Client +// agentAddrs []string +// indexInfos sync.Map[string, *payload.Info_Index_Count] +// uuidsCount uint32 +// uncommittedUUIDsCount uint32 +// checkedID bbolt.Bbolt // } // type want struct { // err error @@ -889,18 +730,13 @@ func Test_correct_correctReplica(t *testing.T) { // in0:nil, // }, // fields: fields { +// cfg:nil, // discoverer:nil, // agentAddrs:nil, // indexInfos:nil, // uuidsCount:0, // uncommittedUUIDsCount:0, // checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -922,18 +758,13 @@ func Test_correct_correctReplica(t *testing.T) { // in0:nil, // }, // fields: fields { +// cfg:nil, // discoverer:nil, // agentAddrs:nil, // indexInfos:nil, // uuidsCount:0, // uncommittedUUIDsCount:0, // checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -964,18 +795,13 @@ func Test_correct_correctReplica(t *testing.T) { // checkFunc = defaultCheckFunc // } // c := &correct{ -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// checkedIndexCount: test.fields.checkedIndexCount, -// correctedOldIndexCount: test.fields.correctedOldIndexCount, -// correctedReplicationCount: test.fields.correctedReplicationCount, -// indexReplica: test.fields.indexReplica, -// streamListConcurrency: test.fields.streamListConcurrency, -// bboltAsyncWriteConcurrency: test.fields.bboltAsyncWriteConcurrency, +// cfg: test.fields.cfg, +// discoverer: test.fields.discoverer, +// agentAddrs: test.fields.agentAddrs, +// indexInfos: test.fields.indexInfos, +// uuidsCount: test.fields.uuidsCount, +// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, +// checkedID: test.fields.checkedID, // } // // err := c.PreStop(test.args.in0) @@ -986,402 +812,3 @@ func Test_correct_correctReplica(t *testing.T) { // }) // } // } -// -// func Test_correct_NumberOfCheckedIndex(t *testing.T) { -// type fields struct { -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// checkedIndexCount atomic.Uint64 -// correctedOldIndexCount atomic.Uint64 -// correctedReplicationCount atomic.Uint64 -// indexReplica int -// streamListConcurrency int -// bboltAsyncWriteConcurrency int -// } -// type want struct { -// want uint64 -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, uint64) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got uint64) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &correct{ -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// checkedIndexCount: test.fields.checkedIndexCount, -// correctedOldIndexCount: test.fields.correctedOldIndexCount, -// correctedReplicationCount: test.fields.correctedReplicationCount, -// indexReplica: test.fields.indexReplica, -// streamListConcurrency: test.fields.streamListConcurrency, -// bboltAsyncWriteConcurrency: test.fields.bboltAsyncWriteConcurrency, -// } -// -// got := c.NumberOfCheckedIndex() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_correct_NumberOfCorrectedOldIndex(t *testing.T) { -// type fields struct { -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// checkedIndexCount atomic.Uint64 -// correctedOldIndexCount atomic.Uint64 -// correctedReplicationCount atomic.Uint64 -// indexReplica int -// streamListConcurrency int -// bboltAsyncWriteConcurrency int -// } -// type want struct { -// want uint64 -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, uint64) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got uint64) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &correct{ -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// checkedIndexCount: test.fields.checkedIndexCount, -// correctedOldIndexCount: test.fields.correctedOldIndexCount, -// correctedReplicationCount: test.fields.correctedReplicationCount, -// indexReplica: test.fields.indexReplica, -// streamListConcurrency: test.fields.streamListConcurrency, -// bboltAsyncWriteConcurrency: test.fields.bboltAsyncWriteConcurrency, -// } -// -// got := c.NumberOfCorrectedOldIndex() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_correct_NumberOfCorrectedReplication(t *testing.T) { -// type fields struct { -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// checkedIndexCount atomic.Uint64 -// correctedOldIndexCount atomic.Uint64 -// correctedReplicationCount atomic.Uint64 -// indexReplica int -// streamListConcurrency int -// bboltAsyncWriteConcurrency int -// } -// type want struct { -// want uint64 -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, uint64) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got uint64) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// checkedIndexCount:nil, -// correctedOldIndexCount:nil, -// correctedReplicationCount:nil, -// indexReplica:0, -// streamListConcurrency:0, -// bboltAsyncWriteConcurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &correct{ -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// checkedIndexCount: test.fields.checkedIndexCount, -// correctedOldIndexCount: test.fields.correctedOldIndexCount, -// correctedReplicationCount: test.fields.correctedReplicationCount, -// indexReplica: test.fields.indexReplica, -// streamListConcurrency: test.fields.streamListConcurrency, -// bboltAsyncWriteConcurrency: test.fields.bboltAsyncWriteConcurrency, -// } -// -// got := c.NumberOfCorrectedReplication() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/correction/service/options.go b/pkg/index/job/correction/service/options.go deleted file mode 100644 index 3d1ec633a7..0000000000 --- a/pkg/index/job/correction/service/options.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - "github.com/vdaas/vald/internal/errors" -) - -// Option represents the functional option for index corrector. -type Option func(*correct) error - -var defaultOpts = []Option{ - WithStreamListConcurrency(200), //nolint:gomnd - WithBboltAsyncWriteConcurrency(2048), //nolint:gomnd -} - -// WithIndexReplica returns Option that sets index replica. -func WithIndexReplica(num int) Option { - return func(c *correct) error { - if num <= 1 { - return errors.NewErrCriticalOption("indexReplica", num, errors.ErrIndexReplicaOne) - } - c.indexReplica = num - return nil - } -} - -// WithDiscoverer returns Option that sets discoverer client. -func WithDiscoverer(client discoverer.Client) Option { - return func(c *correct) error { - if client == nil { - return errors.NewErrCriticalOption("discoverer", client) - } - c.discoverer = client - return nil - } -} - -// WithStreamListConcurrency returns Option that sets concurrency for StreamList field value. -func WithStreamListConcurrency(num int) Option { - return func(c *correct) error { - if num <= 0 { - return errors.NewErrInvalidOption("streamListConcurrency", num) - } - c.streamListConcurrency = num - return nil - } -} - -// WithBboltAsyncWriteConcurrency returns Option that sets concurrency for kvs async write. -func WithBboltAsyncWriteConcurrency(num int) Option { - return func(c *correct) error { - if num <= 0 { - return errors.NewErrInvalidOption("bboltAsyncWriteConcurrency", num) - } - c.bboltAsyncWriteConcurrency = num - return nil - } -} diff --git a/pkg/index/job/correction/service/options_test.go b/pkg/index/job/correction/service/options_test.go deleted file mode 100644 index ed1a312363..0000000000 --- a/pkg/index/job/correction/service/options_test.go +++ /dev/null @@ -1,360 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -// NOT IMPLEMENTED BELOW -// -// func TestWithIndexReplica(t *testing.T) { -// type args struct { -// num int -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithIndexReplica(test.args.num) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestWithDiscoverer(t *testing.T) { -// type args struct { -// client discoverer.Client -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// client:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// client:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithDiscoverer(test.args.client) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestWithStreamListConcurrency(t *testing.T) { -// type args struct { -// num int -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithStreamListConcurrency(test.args.num) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestWithBboltAsyncWriteConcurrency(t *testing.T) { -// type args struct { -// num int -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithBboltAsyncWriteConcurrency(test.args.num) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 110e71d4a6..896165f6aa 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -45,21 +45,29 @@ type run struct { } func New(cfg *config.Data) (r runner.Runner, err error) { + if cfg.Corrector.IndexReplica == 1 { + return nil, errors.ErrIndexReplicaOne + } + eg := errgroup.Get() - dOpts, err := cfg.Corrector.Discoverer.Client.Opts() + cOpts, err := cfg.Corrector.Discoverer.Client.Opts() if err != nil { return nil, err } // skipcq: CRT-D0001 - dOpts = append(dOpts, grpc.WithErrGroup(eg)) + dopts := append( + cOpts, + grpc.WithErrGroup(eg)) acOpts, err := cfg.Corrector.Discoverer.AgentClientOptions.Opts() if err != nil { return nil, err } // skipcq: CRT-D0001 - acOpts = append(acOpts, grpc.WithErrGroup(eg)) + aopts := append( + acOpts, + grpc.WithErrGroup(eg)) // Construct discoverer discoverer, err := discoverer.New( @@ -68,9 +76,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) { discoverer.WithNamespace(cfg.Corrector.AgentNamespace), discoverer.WithPort(cfg.Corrector.AgentPort), discoverer.WithServiceDNSARecord(cfg.Corrector.AgentDNS), - discoverer.WithDiscovererClient(grpc.New(dOpts...)), + discoverer.WithDiscovererClient(grpc.New(dopts...)), discoverer.WithDiscoverDuration(cfg.Corrector.Discoverer.Duration), - discoverer.WithOptions(acOpts...), + discoverer.WithOptions(aopts...), discoverer.WithNodeName(cfg.Corrector.NodeName), discoverer.WithOnDiscoverFunc(func(ctx context.Context, c discoverer.Client, addrs []string) error { last := len(addrs) - 1 @@ -101,12 +109,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { return nil, err } - corrector, err := service.New( - service.WithDiscoverer(discoverer), - service.WithIndexReplica(cfg.Corrector.IndexReplica), - service.WithBboltAsyncWriteConcurrency(cfg.Corrector.BboltAsyncWriteConcurrency), - service.WithStreamListConcurrency(cfg.Corrector.StreamListConcurrency), - ) + corrector, err := service.New(cfg, discoverer) if err != nil { return nil, err } @@ -118,6 +121,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { correction.New(corrector), ) if err != nil { + log.Error("failed to initialize observability") return nil, err } } @@ -200,25 +204,23 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { log.Infof("correction finished in %v", end) return nil })) + return ech, nil } func (r *run) PreStop(ctx context.Context) error { - return r.corrector.PreStop(ctx) + r.corrector.PreStop(ctx) + return nil } -func (r *run) Stop(ctx context.Context) (errs error) { +func (r *run) Stop(ctx context.Context) error { if r.observability != nil { - if err := r.observability.Stop(ctx); err != nil { - errs = errors.Join(errs, err) - } + r.observability.Stop(ctx) } if r.server != nil { - if err := r.server.Shutdown(ctx); err != nil { - errs = errors.Join(errs, err) - } + r.server.Shutdown(ctx) } - return errs + return nil } func (*run) PostStop(_ context.Context) error { diff --git a/pkg/index/job/creation/service/indexer.go b/pkg/index/job/creation/service/indexer.go index bd0ff64ba3..73dfa221f0 100644 --- a/pkg/index/job/creation/service/indexer.go +++ b/pkg/index/job/creation/service/indexer.go @@ -37,7 +37,7 @@ const ( // Indexer represents an interface for indexing. type Indexer interface { - StartClient(ctx context.Context) (<-chan error, error) + PreStart(ctx context.Context) (<-chan error, error) Start(ctx context.Context) error } @@ -71,8 +71,8 @@ func New(opts ...Option) (Indexer, error) { return idx, nil } -// StartClient starts the gRPC client. -func (idx *index) StartClient(ctx context.Context) (<-chan error, error) { +// PreStart starts the preparation process. +func (idx *index) PreStart(ctx context.Context) (<-chan error, error) { return idx.client.Start(ctx) } diff --git a/pkg/index/job/creation/service/indexer_test.go b/pkg/index/job/creation/service/indexer_test.go index 6229dd21ee..27dd95ec9d 100644 --- a/pkg/index/job/creation/service/indexer_test.go +++ b/pkg/index/job/creation/service/indexer_test.go @@ -24,8 +24,6 @@ import ( "github.com/vdaas/vald/internal/net/grpc/codes" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/test/goleak" - clientmock "github.com/vdaas/vald/internal/test/mock/client" - grpcmock "github.com/vdaas/vald/internal/test/mock/grpc" ) func Test_index_Start(t *testing.T) { @@ -70,12 +68,12 @@ func Test_index_Start(t *testing.T) { }, fields: fields{ - client: &clientmock.DiscovererClientMock{ + client: &mockDiscovererClient{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &grpcmock.GRPCClientMock{ + return &mockGrpcClient{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -97,12 +95,12 @@ func Test_index_Start(t *testing.T) { ctx: context.Background(), }, fields: fields{ - client: &clientmock.DiscovererClientMock{ + client: &mockDiscovererClient{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &grpcmock.GRPCClientMock{ + return &mockGrpcClient{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -132,12 +130,12 @@ func Test_index_Start(t *testing.T) { }, fields: fields{ - client: &clientmock.DiscovererClientMock{ + client: &mockDiscovererClient{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &grpcmock.GRPCClientMock{ + return &mockGrpcClient{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -166,7 +164,7 @@ func Test_index_Start(t *testing.T) { ctx: context.Background(), }, fields: fields{ - client: &clientmock.DiscovererClientMock{ + client: &mockDiscovererClient{ GetAddrsFunc: func(_ context.Context) []string { return nil }, @@ -305,7 +303,7 @@ func Test_index_Start(t *testing.T) { // } // } // -// func Test_index_StartClient(t *testing.T) { +// func Test_index_PreStart(t *testing.T) { // type args struct { // ctx context.Context // } @@ -415,7 +413,7 @@ func Test_index_Start(t *testing.T) { // concurrency: test.fields.concurrency, // } // -// got, err := idx.StartClient(test.args.ctx) +// got, err := idx.PreStart(test.args.ctx) // if err := checkFunc(test.want, got, err); err != nil { // tt.Errorf("error = %v", err) // } diff --git a/pkg/index/job/creation/service/mock_test.go b/pkg/index/job/creation/service/mock_test.go new file mode 100644 index 0000000000..89c0f19aa5 --- /dev/null +++ b/pkg/index/job/creation/service/mock_test.go @@ -0,0 +1,57 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "context" + + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/net/grpc" +) + +type mockDiscovererClient struct { + discoverer.Client + GetAddrsFunc func(ctx context.Context) []string + GetClientFunc func() grpc.Client +} + +func (mc *mockDiscovererClient) GetAddrs(ctx context.Context) []string { + return mc.GetAddrsFunc(ctx) +} + +func (mc *mockDiscovererClient) GetClient() grpc.Client { + return mc.GetClientFunc() +} + +type mockGrpcClient struct { + grpc.Client + OrderedRangeConcurrentFunc func(ctx context.Context, + order []string, + concurrency int, + f func(ctx context.Context, + addr string, + conn *grpc.ClientConn, + copts ...grpc.CallOption) error) error +} + +func (mc *mockGrpcClient) OrderedRangeConcurrent(ctx context.Context, + order []string, + concurrency int, + f func(ctx context.Context, + addr string, + conn *grpc.ClientConn, + copts ...grpc.CallOption) error, +) error { + return mc.OrderedRangeConcurrentFunc(ctx, order, concurrency, f) +} diff --git a/pkg/index/job/creation/usecase/creation.go b/pkg/index/job/creation/usecase/creation.go index 932263c7e8..7afa0f1958 100644 --- a/pkg/index/job/creation/usecase/creation.go +++ b/pkg/index/job/creation/usecase/creation.go @@ -143,7 +143,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { oech = r.observability.Start(ctx) } sech = r.server.ListenAndServe(ctx) - cech, err := r.indexer.StartClient(ctx) + ipech, err := r.indexer.PreStart(ctx) if err != nil { close(ech) return nil, err @@ -174,7 +174,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { return ctx.Err() case err = <-oech: case err = <-sech: - case err = <-cech: + case err = <-ipech: } if err != nil { select { diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index 90d97b2c09..d2341d43b4 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -417,3 +417,4 @@ func Test_index_Start(t *testing.T) { // }) // } // } +//