diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index c873901b42..609f86dd2a 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -22,14 +22,13 @@ import ( "encoding/gob" "os" "path/filepath" + "reflect" "runtime" - "strings" "sync" "sync/atomic" "syscall" "time" - "github.com/vdaas/vald/internal/config" core "github.com/vdaas/vald/internal/core/ngt" "github.com/vdaas/vald/internal/encoding/json" "github.com/vdaas/vald/internal/errgroup" @@ -38,9 +37,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/metadata" "github.com/vdaas/vald/internal/observability/trace" - "github.com/vdaas/vald/internal/rand" "github.com/vdaas/vald/internal/safety" - "github.com/vdaas/vald/internal/timeutil" "github.com/vdaas/vald/pkg/agent/core/ngt/model" "github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs" ) @@ -71,27 +68,40 @@ type NGT interface { } type ngt struct { - alen int + // instances + core core.NGT + eg errgroup.Group + kvs kvs.BidiMap + ivc *vcaches // insertion vector cache + dvc *vcaches // deletion vector cache + + // statuses indexing atomic.Value - saveMu sync.Mutex // creating or saving index - lim time.Duration // auto indexing time limit - dur time.Duration // auto indexing check duration - sdur time.Duration // auto save index check duration - idelay time.Duration // initial delay duration - ic uint64 // insert count - nocie uint64 // number of create index execution - eg errgroup.Group - ivc *vcaches // insertion vector cache - dvc *vcaches // deletion vector cache - path string - kvs kvs.BidiMap - core core.NGT - dcd bool // disable commit daemon - inMem bool + saveMu sync.Mutex // creating or saving index + + // counters + ic uint64 // insert count + nocie uint64 // number of create index execution + + // configurations + inMem bool // in-memory mode + + alen int // auto indexing length + + lim time.Duration // auto indexing time limit + dur time.Duration // 
auto indexing check duration + sdur time.Duration // auto save index check duration minLit time.Duration // minimum load index timeout maxLit time.Duration // maximum load index timeout litFactor time.Duration // load index timeout factor + + path string // index path + + idelay time.Duration // initial delay duration + dcd bool // disable commit daemon + + ngtOpts []core.Option } type vcache struct { @@ -103,32 +113,22 @@ const ( kvsFileName = "ngt-meta.kvsdb" ) -func New(cfg *config.NGT) (nn NGT, err error) { +func New(opts ...Option) (nn NGT, err error) { n := new(ngt) - n.parseCfg(cfg) - - opts := []core.Option{ - core.WithInMemoryMode(n.inMem), - core.WithIndexPath(n.path), - core.WithDimension(cfg.Dimension), - core.WithDistanceTypeByString(cfg.DistanceType), - core.WithObjectTypeByString(cfg.ObjectType), - core.WithBulkInsertChunkSize(cfg.BulkInsertChunkSize), - core.WithCreationEdgeSize(cfg.CreationEdgeSize), - core.WithSearchEdgeSize(cfg.SearchEdgeSize), - core.WithDefaultPoolSize(cfg.DefaultPoolSize), + for _, opt := range append(defaultOpts, opts...) { + if err := opt(n); err != nil { + return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + } } n.kvs = kvs.New() - err = n.initNGT(opts...) + err = n.initNGT(n.ngtOpts...) 
if err != nil { return nil, err } - n.eg = errgroup.Get() - if n.dur == 0 || n.alen == 0 { n.dcd = true } @@ -146,33 +146,6 @@ func New(cfg *config.NGT) (nn NGT, err error) { return n, nil } -func (n *ngt) parseCfg(cfg *config.NGT) { - cfg.IndexPath = strings.TrimSuffix(cfg.IndexPath, "/") - - n.inMem = cfg.EnableInMemoryMode - - if !n.inMem && cfg.IndexPath != "" { - n.path = cfg.IndexPath - } - - n.dur = timeutil.ParseWithDefault(cfg.AutoIndexCheckDuration, 0) - n.lim = timeutil.ParseWithDefault(cfg.AutoIndexDurationLimit, 0) - n.sdur = timeutil.ParseWithDefault(cfg.AutoSaveIndexDuration, 0) - - if cfg.InitialDelayMaxDuration != "" { - d := timeutil.ParseWithDefault(cfg.InitialDelayMaxDuration, 0) - n.idelay = time.Duration( - int64(rand.LimitedUint32(uint64(d/time.Second))), - ) * time.Second - } - - n.alen = cfg.AutoIndexLength - - n.minLit = timeutil.ParseWithDefault(cfg.MinLoadIndexTimeout, 3*time.Minute) - n.maxLit = timeutil.ParseWithDefault(cfg.MaxLoadIndexTimeout, 10*time.Minute) - n.litFactor = timeutil.ParseWithDefault(cfg.LoadIndexTimeoutFactor, time.Millisecond) -} - func (n *ngt) initNGT(opts ...core.Option) (err error) { if _, err = os.Stat(n.path); os.IsNotExist(err) || n.inMem { n.core, err = core.New(opts...) 
diff --git a/pkg/agent/core/ngt/service/ngt_test.go b/pkg/agent/core/ngt/service/ngt_test.go index 71210e8519..899abec3c8 100644 --- a/pkg/agent/core/ngt/service/ngt_test.go +++ b/pkg/agent/core/ngt/service/ngt_test.go @@ -20,95 +20,19 @@ package service import ( "context" "reflect" + "sync" "sync/atomic" "testing" "time" - "github.com/vdaas/vald/internal/config" core "github.com/vdaas/vald/internal/core/ngt" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/pkg/agent/core/ngt/model" "github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs" - "go.uber.org/goleak" ) -func TestNew(t *testing.T) { - type args struct { - cfg *config.NGT - } - type want struct { - wantNn NGT - err error - } - type test struct { - name string - args args - want want - checkFunc func(want, NGT, error) error - beforeFunc func(args) - afterFunc func(args) - } - defaultCheckFunc := func(w want, gotNn NGT, err error) error { - if !errors.Is(err, w.err) { - return errors.Errorf("got error = %v, want %v", err, w.err) - } - if !reflect.DeepEqual(gotNn, w.wantNn) { - return errors.Errorf("got = %v, want %v", gotNn, w.wantNn) - } - return nil - } - tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - cfg: nil, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - cfg: nil, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ - } - - for _, test := range tests { - t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) - if test.beforeFunc != nil { - test.beforeFunc(test.args) - } - if test.afterFunc != nil { - defer test.afterFunc(test.args) - } - if test.checkFunc == nil { - test.checkFunc = defaultCheckFunc - } - - gotNn, err := New(test.args.cfg) - if err := test.checkFunc(test.want, gotNn, err); err != nil { - tt.Errorf("error = %v", err) - } - - }) - } -} - func 
Test_ngt_Start(t *testing.T) { type args struct { ctx context.Context @@ -3408,3 +3332,406 @@ func Test_ngt_Close(t *testing.T) { }) } } + +func TestNew(t *testing.T) { + type args struct { + opts []Option + } + type want struct { + wantNn NGT + err error + } + type test struct { + name string + args args + want want + checkFunc func(want, NGT, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, gotNn NGT, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotNn, w.wantNn) { + return errors.Errorf("got = %v, want %v", gotNn, w.wantNn) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + opts: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + opts: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + gotNn, err := New(test.args.opts...) 
+ if err := test.checkFunc(test.want, gotNn, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_ngt_initNGT(t *testing.T) { + type args struct { + opts []core.Option + } + type fields struct { + core core.NGT + eg errgroup.Group + kvs kvs.BidiMap + ivc *vcaches + dvc *vcaches + indexing atomic.Value + saveMu sync.Mutex + ic uint64 + nocie uint64 + inMem bool + alen int + lim time.Duration + dur time.Duration + sdur time.Duration + minLit time.Duration + maxLit time.Duration + litFactor time.Duration + path string + idelay time.Duration + dcd bool + ngtOpts []core.Option + } + type want struct { + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + opts: nil, + }, + fields: fields { + core: nil, + eg: nil, + kvs: nil, + ivc: vcaches{}, + dvc: vcaches{}, + indexing: nil, + saveMu: sync.Mutex{}, + ic: 0, + nocie: 0, + inMem: false, + alen: 0, + lim: nil, + dur: nil, + sdur: nil, + minLit: nil, + maxLit: nil, + litFactor: nil, + path: "", + idelay: nil, + dcd: false, + ngtOpts: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + opts: nil, + }, + fields: fields { + core: nil, + eg: nil, + kvs: nil, + ivc: vcaches{}, + dvc: vcaches{}, + indexing: nil, + saveMu: sync.Mutex{}, + ic: 0, + nocie: 0, + inMem: false, + alen: 0, + lim: nil, + dur: nil, + sdur: nil, + minLit: nil, + maxLit: nil, + litFactor: nil, + path: "", + idelay: nil, + dcd: false, + ngtOpts: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := 
range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + n := &ngt{ + core: test.fields.core, + eg: test.fields.eg, + kvs: test.fields.kvs, + ivc: test.fields.ivc, + dvc: test.fields.dvc, + indexing: test.fields.indexing, + saveMu: test.fields.saveMu, + ic: test.fields.ic, + nocie: test.fields.nocie, + inMem: test.fields.inMem, + alen: test.fields.alen, + lim: test.fields.lim, + dur: test.fields.dur, + sdur: test.fields.sdur, + minLit: test.fields.minLit, + maxLit: test.fields.maxLit, + litFactor: test.fields.litFactor, + path: test.fields.path, + idelay: test.fields.idelay, + dcd: test.fields.dcd, + ngtOpts: test.fields.ngtOpts, + } + + err := n.initNGT(test.args.opts...) + if err := test.checkFunc(test.want, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_ngt_saveIndex(t *testing.T) { + type args struct { + ctx context.Context + } + type fields struct { + core core.NGT + eg errgroup.Group + kvs kvs.BidiMap + ivc *vcaches + dvc *vcaches + indexing atomic.Value + saveMu sync.Mutex + ic uint64 + nocie uint64 + inMem bool + alen int + lim time.Duration + dur time.Duration + sdur time.Duration + minLit time.Duration + maxLit time.Duration + litFactor time.Duration + path string + idelay time.Duration + dcd bool + ngtOpts []core.Option + } + type want struct { + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + }, + fields: 
fields { + core: nil, + eg: nil, + kvs: nil, + ivc: vcaches{}, + dvc: vcaches{}, + indexing: nil, + saveMu: sync.Mutex{}, + ic: 0, + nocie: 0, + inMem: false, + alen: 0, + lim: nil, + dur: nil, + sdur: nil, + minLit: nil, + maxLit: nil, + litFactor: nil, + path: "", + idelay: nil, + dcd: false, + ngtOpts: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + }, + fields: fields { + core: nil, + eg: nil, + kvs: nil, + ivc: vcaches{}, + dvc: vcaches{}, + indexing: nil, + saveMu: sync.Mutex{}, + ic: 0, + nocie: 0, + inMem: false, + alen: 0, + lim: nil, + dur: nil, + sdur: nil, + minLit: nil, + maxLit: nil, + litFactor: nil, + path: "", + idelay: nil, + dcd: false, + ngtOpts: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + n := &ngt{ + core: test.fields.core, + eg: test.fields.eg, + kvs: test.fields.kvs, + ivc: test.fields.ivc, + dvc: test.fields.dvc, + indexing: test.fields.indexing, + saveMu: test.fields.saveMu, + ic: test.fields.ic, + nocie: test.fields.nocie, + inMem: test.fields.inMem, + alen: test.fields.alen, + lim: test.fields.lim, + dur: test.fields.dur, + sdur: test.fields.sdur, + minLit: test.fields.minLit, + maxLit: test.fields.maxLit, + litFactor: test.fields.litFactor, + path: test.fields.path, + idelay: test.fields.idelay, + dcd: test.fields.dcd, + ngtOpts: test.fields.ngtOpts, + } + + err := n.saveIndex(test.args.ctx) + if err := test.checkFunc(test.want, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/agent/core/ngt/service/option.go 
b/pkg/agent/core/ngt/service/option.go
new file mode 100644
index 0000000000..848b6b0ac1
--- /dev/null
+++ b/pkg/agent/core/ngt/service/option.go
@@ -0,0 +1,242 @@
+//
+// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package service
+
+import (
+	"path/filepath"
+	"strings"
+	"time"
+
+	core "github.com/vdaas/vald/internal/core/ngt"
+	"github.com/vdaas/vald/internal/errgroup"
+	"github.com/vdaas/vald/internal/rand"
+	"github.com/vdaas/vald/internal/timeutil"
+)
+
+// Option applies a single setting to the ngt service and reports an error
+// when the supplied value cannot be used.
+type Option func(n *ngt) error
+
+var (
+	// defaultOpts are applied by New before any caller-supplied options, so
+	// every value set here can be overridden.
+	defaultOpts = []Option{
+		WithErrGroup(errgroup.Get()),
+		WithAutoIndexCheckDuration("30m"),
+		WithAutoIndexDurationLimit("24h"),
+		WithAutoSaveIndexDuration("35m"),
+		WithAutoIndexLength(100),
+		WithInitialDelayMaxDuration("3m"),
+		WithMinLoadIndexTimeout("3m"),
+		WithMaxLoadIndexTimeout("10m"),
+		WithLoadIndexTimeoutFactor("1ms"),
+	}
+)
+
+// WithErrGroup sets the errgroup used by the service. A nil group is ignored.
+func WithErrGroup(eg errgroup.Group) Option {
+	return func(n *ngt) error {
+		if eg != nil {
+			n.eg = eg
+		}
+
+		return nil
+	}
+}
+
+// WithEnableInMemoryMode toggles in-memory mode and forwards the same flag to
+// the core NGT options.
+func WithEnableInMemoryMode(enabled bool) Option {
+	return func(n *ngt) error {
+		n.inMem = enabled
+
+		return WithNGTOpts(core.WithInMemoryMode(n.inMem))(n)
+	}
+}
+
+// WithIndexPath sets the index path and forwards it to the core NGT options.
+// A single trailing "/" is dropped and the result is passed through
+// filepath.Clean. A path that is empty after trimming is ignored, because
+// filepath.Clean("") returns "." and would silently turn "no path configured"
+// into "use the current working directory".
+func WithIndexPath(path string) Option {
+	return func(n *ngt) error {
+		p := strings.TrimSuffix(path, "/")
+		if p == "" {
+			return nil
+		}
+
+		n.path = filepath.Clean(p)
+
+		return WithNGTOpts(core.WithIndexPath(n.path))(n)
+	}
+}
+
+// WithAutoIndexCheckDuration sets the auto indexing check duration from a
+// string parsed by timeutil.Parse. An empty string keeps the current value.
+func WithAutoIndexCheckDuration(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.dur = d
+
+		return nil
+	}
+}
+
+// WithAutoIndexDurationLimit sets the auto indexing time limit from a string
+// parsed by timeutil.Parse. An empty string keeps the current value.
+func WithAutoIndexDurationLimit(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.lim = d
+
+		return nil
+	}
+}
+
+// WithAutoSaveIndexDuration sets the auto save index check duration from a
+// string parsed by timeutil.Parse. An empty string keeps the current value.
+func WithAutoSaveIndexDuration(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.sdur = d
+
+		return nil
+	}
+}
+
+// WithAutoIndexLength sets the auto indexing length.
+func WithAutoIndexLength(l int) Option {
+	return func(n *ngt) error {
+		n.alen = l
+
+		return nil
+	}
+}
+
+// WithInitialDelayMaxDuration derives the initial delay from dur. The parsed
+// duration is truncated to whole seconds, handed to rand.LimitedUint32, and
+// the result is stored as seconds. Presumably this yields a random delay
+// bounded by dur (confirm against internal/rand). An empty string is ignored.
+func WithInitialDelayMaxDuration(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.idelay = time.Duration(int64(rand.LimitedUint32(uint64(d/time.Second)))) * time.Second
+
+		return nil
+	}
+}
+
+// WithMinLoadIndexTimeout sets the minimum load index timeout from a string
+// parsed by timeutil.Parse. An empty string keeps the current value.
+func WithMinLoadIndexTimeout(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.minLit = d
+
+		return nil
+	}
+}
+
+// WithMaxLoadIndexTimeout sets the maximum load index timeout from a string
+// parsed by timeutil.Parse. An empty string keeps the current value.
+func WithMaxLoadIndexTimeout(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.maxLit = d
+
+		return nil
+	}
+}
+
+// WithLoadIndexTimeoutFactor sets the load index timeout factor from a string
+// parsed by timeutil.Parse. An empty string keeps the current value.
+func WithLoadIndexTimeoutFactor(dur string) Option {
+	return func(n *ngt) error {
+		if dur == "" {
+			return nil
+		}
+
+		d, err := timeutil.Parse(dur)
+		if err != nil {
+			return err
+		}
+
+		n.litFactor = d
+
+		return nil
+	}
+}
+
+// WithNGTOpts appends opts to the options later passed to the core NGT
+// constructor. The values are always copied via append, so the caller's slice
+// is never retained or written to by subsequent calls.
+func WithNGTOpts(opts ...core.Option) Option {
+	return func(n *ngt) error {
+		n.ngtOpts = append(n.ngtOpts, opts...)
+
+		return nil
+	}
+}
diff --git a/pkg/agent/core/ngt/usecase/agentd.go b/pkg/agent/core/ngt/usecase/agentd.go
index d1220b7581..b78397f6a9 100644
--- a/pkg/agent/core/ngt/usecase/agentd.go
+++ b/pkg/agent/core/ngt/usecase/agentd.go
@@ -21,6 +21,7 @@ import (
 
 	agent "github.com/vdaas/vald/apis/grpc/agent/core"
 	iconf "github.com/vdaas/vald/internal/config"
+	"github.com/vdaas/vald/internal/core/ngt"
 	"github.com/vdaas/vald/internal/errgroup"
 	"github.com/vdaas/vald/internal/net/grpc"
 	"github.com/vdaas/vald/internal/net/grpc/metric"
@@ -46,7 +47,28 @@ type run struct {
 }
 
 func New(cfg *config.Data) (r runner.Runner, err error) {
-	ngt, err := service.New(cfg.NGT)
+	ngt, err := service.New(
+		service.WithErrGroup(errgroup.Get()),
+		service.WithEnableInMemoryMode(cfg.NGT.EnableInMemoryMode),
+		service.WithIndexPath(cfg.NGT.IndexPath),
+		service.WithAutoIndexCheckDuration(cfg.NGT.AutoIndexCheckDuration),
+		service.WithAutoIndexDurationLimit(cfg.NGT.AutoIndexDurationLimit),
+		service.WithAutoSaveIndexDuration(cfg.NGT.AutoSaveIndexDuration),
+		service.WithAutoIndexLength(cfg.NGT.AutoIndexLength),
+		service.WithInitialDelayMaxDuration(cfg.NGT.InitialDelayMaxDuration),
+		service.WithMinLoadIndexTimeout(cfg.NGT.MinLoadIndexTimeout),
+		service.WithMaxLoadIndexTimeout(cfg.NGT.MaxLoadIndexTimeout),
+		service.WithLoadIndexTimeoutFactor(cfg.NGT.LoadIndexTimeoutFactor),
+		service.WithNGTOpts(
+			ngt.WithDimension(cfg.NGT.Dimension),
+			ngt.WithDistanceTypeByString(cfg.NGT.DistanceType),
+			ngt.WithObjectTypeByString(cfg.NGT.ObjectType),
+			ngt.WithBulkInsertChunkSize(cfg.NGT.BulkInsertChunkSize),
+			ngt.WithCreationEdgeSize(cfg.NGT.CreationEdgeSize),
+			ngt.WithSearchEdgeSize(cfg.NGT.SearchEdgeSize),
+			ngt.WithDefaultPoolSize(cfg.NGT.DefaultPoolSize),
+		),
+	)
 	if err != nil {
 		return nil, err
 	}