diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index d9a9f1862c..29996831a3 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -752,6 +752,8 @@ agent: auto_index_length: 100 # agent.ngt.auto_save_index_duration -- duration of automatic save index auto_save_index_duration: 35m + # agent.ngt.initial_delay_max_duration -- maximum duration for initial delay + initial_delay_max_duration: 3m # agent.ngt.dimension -- vector dimension dimension: 4096 # agent.ngt.bulk_insert_chunk_size -- bulk insert chunk size diff --git a/go.mod b/go.mod index 1e656baa63..1f863bcd8c 100755 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( go.uber.org/goleak v1.0.0 golang.org/x/mod v0.3.0 // indirect golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 - golang.org/x/tools v0.0.0-20200519015757-0d0afa43d58a // indirect + golang.org/x/tools v0.0.0-20200520220537-cf2d1e09c845 // indirect gonum.org/v1/hdf5 v0.0.0-20200504100616-496fefe91614 gonum.org/v1/netlib v0.0.0-20200317120129-c5a04cffd98a // indirect gonum.org/v1/plot v0.7.0 diff --git a/go.sum b/go.sum index 659137f0cc..d8542671c1 100644 --- a/go.sum +++ b/go.sum @@ -645,6 +645,8 @@ golang.org/x/tools v0.0.0-20200515220128-d3bf790afa53 h1:vmsb6v0zUdmUlXfwKaYrHPP golang.org/x/tools v0.0.0-20200515220128-d3bf790afa53/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200519015757-0d0afa43d58a h1:gILuVKC+ZPD6g/tj6zBOdnOH1ZHI0zZ86+KLMogc6/s= golang.org/x/tools v0.0.0-20200519015757-0d0afa43d58a/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200520220537-cf2d1e09c845 h1:F4gQH8TKyCccYDuNHX5TfZwiM8QWnPbSPUFE96qvGbs= +golang.org/x/tools v0.0.0-20200520220537-cf2d1e09c845/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/internal/config/ngt.go b/internal/config/ngt.go index e1f09cb2c5..5dc34c761d 100644 --- a/internal/config/ngt.go +++ b/internal/config/ngt.go @@ -52,6 +52,9 @@ type NGT struct { // AutoIndexLength represent auto index length limit AutoIndexLength int `yaml:"auto_index_length" json:"auto_index_length"` + // InitialDelayMaxDuration represent maximum duration for initial delay + InitialDelayMaxDuration string `yaml:"initial_delay_max_duration" json:"initial_delay_max_duration"` + // EnableInMemoryMode enables on memory ngt indexing mode EnableInMemoryMode bool `yaml:"enable_in_memory_mode" json:"enable_in_memory_mode"` } @@ -63,5 +66,6 @@ func (n *NGT) Bind() *NGT { n.AutoIndexCheckDuration = GetActualValue(n.AutoIndexCheckDuration) n.AutoIndexDurationLimit = GetActualValue(n.AutoIndexDurationLimit) n.AutoSaveIndexDuration = GetActualValue(n.AutoSaveIndexDuration) + n.InitialDelayMaxDuration = GetActualValue(n.InitialDelayMaxDuration) return n } diff --git a/internal/observability/metrics/agent/ngt/ngt.go b/internal/observability/metrics/agent/ngt/ngt.go index 160be84d18..3009af5a6f 100644 --- a/internal/observability/metrics/agent/ngt/ngt.go +++ b/internal/observability/metrics/agent/ngt/ngt.go @@ -25,22 +25,42 @@ import ( ) type ngtMetrics struct { - ngt service.NGT - indexCount metrics.Int64Measure - uncommittedIndexCount metrics.Int64Measure - insertVCacheCount metrics.Int64Measure - deleteVCacheCount metrics.Int64Measure - isIndexing metrics.Int64Measure + ngt service.NGT + indexCount metrics.Int64Measure + uncommittedIndexCount metrics.Int64Measure + insertVCacheCount metrics.Int64Measure + deleteVCacheCount metrics.Int64Measure + completedCreateIndexTotal metrics.Int64Measure + isIndexing metrics.Int64Measure } func New(n service.NGT) metrics.Metric { return &ngtMetrics{ - ngt: n, - indexCount: *metrics.Int64(metrics.ValdOrg+"/ngt/index_count", "NGT index count", metrics.UnitDimensionless), - uncommittedIndexCount: *metrics.Int64(metrics.ValdOrg+"/ngt/uncommitted_index_count", "NGT uncommitted index count", metrics.UnitDimensionless), - insertVCacheCount: *metrics.Int64(metrics.ValdOrg+"/ngt/insert_vcache_count", "NGT insert vcache count", metrics.UnitDimensionless), - deleteVCacheCount: *metrics.Int64(metrics.ValdOrg+"/ngt/delete_vcache_count", "NGT delete vcache count", metrics.UnitDimensionless), - isIndexing: *metrics.Int64(metrics.ValdOrg+"/ngt/is_indexing", "currently indexing or not", metrics.UnitDimensionless), + ngt: n, + indexCount: *metrics.Int64( + metrics.ValdOrg+"/ngt/index_count", + "NGT index count", + metrics.UnitDimensionless), + uncommittedIndexCount: *metrics.Int64( + metrics.ValdOrg+"/ngt/uncommitted_index_count", + "NGT uncommitted index count", + metrics.UnitDimensionless), + insertVCacheCount: *metrics.Int64( + metrics.ValdOrg+"/ngt/insert_vcache_count", + "NGT insert vcache count", + metrics.UnitDimensionless), + deleteVCacheCount: *metrics.Int64( + metrics.ValdOrg+"/ngt/delete_vcache_count", + "NGT delete vcache count", + metrics.UnitDimensionless), + completedCreateIndexTotal: *metrics.Int64( + metrics.ValdOrg+"/ngt/completed_create_index_total", + "the cumulative count of completed create index execution", + metrics.UnitDimensionless), + isIndexing: *metrics.Int64( + metrics.ValdOrg+"/ngt/is_indexing", + "currently indexing or not", + metrics.UnitDimensionless), } } @@ -55,6 +75,7 @@ func (n *ngtMetrics) Measurement(ctx context.Context) ([]metrics.Measurement, er n.uncommittedIndexCount.M(int64(n.ngt.InsertVCacheLen() + n.ngt.DeleteVCacheLen())), n.insertVCacheCount.M(int64(n.ngt.InsertVCacheLen())), n.deleteVCacheCount.M(int64(n.ngt.DeleteVCacheLen())), + n.completedCreateIndexTotal.M(int64(n.ngt.NumberOfCreateIndexExecution())), n.isIndexing.M(isIndexing), }, nil } @@ -89,6 +110,12 @@ func (n *ngtMetrics) View() []*metrics.View { Measure: &n.deleteVCacheCount, Aggregation: metrics.LastValue(), }, + &metrics.View{ + Name: "ngt_completed_create_index_total", + Description: "the cumulative count of completed create index execution", + Measure: &n.completedCreateIndexTotal, + Aggregation: metrics.LastValue(), + }, &metrics.View{ Name: "ngt_is_indexing", Description: "currently indexing or not", diff --git a/pkg/agent/ngt/handler/grpc/handler.go b/pkg/agent/ngt/handler/grpc/handler.go index e8d5786030..51b5430927 100644 --- a/pkg/agent/ngt/handler/grpc/handler.go +++ b/pkg/agent/ngt/handler/grpc/handler.go @@ -356,7 +356,7 @@ func (s *server) CreateIndex(ctx context.Context, c *payload.Control_CreateIndex } }() res = new(payload.Empty) - err = s.ngt.CreateIndex(c.GetPoolSize()) + err = s.ngt.CreateIndex(ctx, c.GetPoolSize()) if err != nil { log.Errorf("[CreateIndex]\tUnknown error\t%+v", err) if span != nil { diff --git a/pkg/agent/ngt/service/ngt.go b/pkg/agent/ngt/service/ngt.go index cbc17751e7..2389c21306 100644 --- a/pkg/agent/ngt/service/ngt.go +++ b/pkg/agent/ngt/service/ngt.go @@ -33,6 +33,8 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/rand" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/timeutil" "github.com/vdaas/vald/pkg/agent/ngt/model" @@ -50,7 +52,7 @@ type NGT interface { Delete(uuid string) (err error) DeleteMultiple(uuids ...string) (err error) GetObject(uuid string) (vec []float32, err error) - CreateIndex(poolSize uint32) (err error) + CreateIndex(ctx context.Context, poolSize uint32) (err error) SaveIndex(ctx context.Context) (err error) Exists(string) (uint32, bool) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error) @@ -70,6 +72,7 @@ type ngt struct { lim time.Duration // auto indexing time limit dur time.Duration // auto indexing check duration sdur time.Duration // auto save index check duration + idelay time.Duration // initial delay duration dps uint32 // default pool size ic uint64 // insert count nocie uint64 // number of create index execution @@ -167,6 +170,16 @@ func New(cfg *config.NGT) (nn NGT, err error) { n.sdur = d } + if cfg.InitialDelayMaxDuration != "" { + d, err := timeutil.Parse(cfg.InitialDelayMaxDuration) + if err != nil { + d = 0 + } + n.idelay = time.Duration( + int64(rand.LimitedUint32(uint64(d/time.Second))), + ) * time.Second + } + n.alen = cfg.AutoIndexLength n.eg = errgroup.Get() @@ -201,6 +214,16 @@ func (n *ngt) Start(ctx context.Context) <-chan error { n.lim = n.dur * 2 } defer close(ech) + + timer := time.NewTimer(n.idelay) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + timer.Stop() + tick := time.NewTicker(n.dur) sTick := time.NewTicker(n.sdur) limit := time.NewTicker(n.lim) @@ -219,7 +242,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { return ctx.Err() case <-tick.C: if int(atomic.LoadUint64(&n.ic)) >= n.alen { - err = n.CreateIndex(n.dps) + err = n.CreateIndex(ctx, n.dps) } case <-limit.C: err = n.CreateAndSaveIndex(ctx, n.dps) @@ -423,7 +446,14 @@ func (n *ngt) GetObject(uuid string) (vec []float32, err error) { return vec, nil } -func (n *ngt) CreateIndex(poolSize uint32) (err error) { +func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { + ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.CreateIndex") + defer func() { + if span != nil { + span.End() + } + }() + if n.indexing.Load().(bool) { return nil } @@ -521,6 +551,13 @@ func (n *ngt) CreateIndex(poolSize uint32) (err error) { } func (n *ngt) SaveIndex(ctx context.Context) (err error) { + ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.SaveIndex") + defer func() { + if span != nil { + span.End() + } + }() + if len(n.path) != 0 && !n.inMem { eg, ctx := errgroup.New(ctx) eg.Go(safety.RecoverFunc(func() error { @@ -549,7 +586,14 @@ func (n *ngt) SaveIndex(ctx context.Context) (err error) { } func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error) { - err = n.CreateIndex(poolSize) + ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.CreateAndSaveIndex") + defer func() { + if span != nil { + span.End() + } + }() + + err = n.CreateIndex(ctx, poolSize) if err != nil { return err } diff --git a/pkg/agent/ngt/service/ngt_test.go b/pkg/agent/ngt/service/ngt_test.go index 3b04846efd..91b888abbe 100644 --- a/pkg/agent/ngt/service/ngt_test.go +++ b/pkg/agent/ngt/service/ngt_test.go @@ -1791,6 +1791,7 @@ func Test_ngt_GetObject(t *testing.T) { func Test_ngt_CreateIndex(t *testing.T) { type args struct { + ctx context.Context poolSize uint32 } type fields struct { @@ -1916,7 +1917,7 @@ func Test_ngt_CreateIndex(t *testing.T) { dcd: test.fields.dcd, } - err := n.CreateIndex(test.args.poolSize) + err := n.CreateIndex(test.args.ctx, test.args.poolSize) if err := test.checkFunc(test.want, err); err != nil { tt.Errorf("error = %v", err) } diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index fad5ee073b..d47f91bf7a 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -30,6 +30,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" ) @@ -126,6 +127,13 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { } func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err error) { + ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.execute") + defer func() { + if span != nil { + span.End() + } + }() + if idx.indexing.Load().(bool) { return nil } @@ -168,6 +176,13 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err err } func (idx *index) loadInfos(ctx context.Context) (err error) { + ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.loadInfos") + defer func() { + if span != nil { + span.End() + } + }() + var u, ucu uint32 var infoMap indexInfos err = idx.client.GetClient().RangeConcurrent(ctx, len(idx.client.GetAddrs(ctx)),