Skip to content

Commit

Permalink
✨ add trace spans and metrics for agent-ngt and index-manager (#389)
Browse files Browse the repository at this point in the history
* 📈 Add trace spans to agent-ngt and manager-index

Signed-off-by: Rintaro Okamura <[email protected]>

🐛 Fix

Signed-off-by: Rintaro Okamura <[email protected]>

🐛 Fix

Signed-off-by: Rintaro Okamura <[email protected]>

* 📈 Add completed_create_index_total metric

Signed-off-by: Rintaro Okamura <[email protected]>

* ✅ Fix tests

Signed-off-by: Rintaro Okamura <[email protected]>

* ✨ Add initial_delay_max_duration to ngt

Signed-off-by: Rintaro Okamura <[email protected]>

* ♻️ Revise calculation of initial duration

Signed-off-by: Rintaro Okamura <[email protected]>

* Update pkg/agent/ngt/service/ngt.go

Co-authored-by: Rintaro Okamura <[email protected]>

* 🤖 Update license headers and formatting go codes

Signed-off-by: vdaas-ci <[email protected]>

Co-authored-by: Yusuke Kato <[email protected]>
Co-authored-by: vdaas-ci <[email protected]>
  • Loading branch information
3 people authored May 21, 2020
1 parent f8dc984 commit 302d624
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 19 deletions.
2 changes: 2 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ agent:
auto_index_length: 100
# agent.ngt.auto_save_index_duration -- duration of automatic save index
auto_save_index_duration: 35m
# agent.ngt.initial_delay_max_duration -- maximum duration for initial delay
initial_delay_max_duration: 3m
# agent.ngt.dimension -- vector dimension
dimension: 4096
# agent.ngt.bulk_insert_chunk_size -- bulk insert chunk size
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
go.uber.org/goleak v1.0.0
golang.org/x/mod v0.3.0 // indirect
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
golang.org/x/tools v0.0.0-20200519015757-0d0afa43d58a // indirect
golang.org/x/tools v0.0.0-20200520220537-cf2d1e09c845 // indirect
gonum.org/v1/hdf5 v0.0.0-20200504100616-496fefe91614
gonum.org/v1/netlib v0.0.0-20200317120129-c5a04cffd98a // indirect
gonum.org/v1/plot v0.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ golang.org/x/tools v0.0.0-20200515220128-d3bf790afa53 h1:vmsb6v0zUdmUlXfwKaYrHPP
golang.org/x/tools v0.0.0-20200515220128-d3bf790afa53/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200519015757-0d0afa43d58a h1:gILuVKC+ZPD6g/tj6zBOdnOH1ZHI0zZ86+KLMogc6/s=
golang.org/x/tools v0.0.0-20200519015757-0d0afa43d58a/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200520220537-cf2d1e09c845 h1:F4gQH8TKyCccYDuNHX5TfZwiM8QWnPbSPUFE96qvGbs=
golang.org/x/tools v0.0.0-20200520220537-cf2d1e09c845/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
4 changes: 4 additions & 0 deletions internal/config/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type NGT struct {
// AutoIndexLength represents the auto index length limit
AutoIndexLength int `yaml:"auto_index_length" json:"auto_index_length"`

// InitialDelayMaxDuration represents the maximum duration of the initial delay
InitialDelayMaxDuration string `yaml:"initial_delay_max_duration" json:"initial_delay_max_duration"`

// EnableInMemoryMode enables the in-memory NGT indexing mode
EnableInMemoryMode bool `yaml:"enable_in_memory_mode" json:"enable_in_memory_mode"`
}
Expand All @@ -63,5 +66,6 @@ func (n *NGT) Bind() *NGT {
n.AutoIndexCheckDuration = GetActualValue(n.AutoIndexCheckDuration)
n.AutoIndexDurationLimit = GetActualValue(n.AutoIndexDurationLimit)
n.AutoSaveIndexDuration = GetActualValue(n.AutoSaveIndexDuration)
n.InitialDelayMaxDuration = GetActualValue(n.InitialDelayMaxDuration)
return n
}
51 changes: 39 additions & 12 deletions internal/observability/metrics/agent/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,42 @@ import (
)

// ngtMetrics holds the NGT service being observed together with one
// metrics.Int64Measure for each value that is reported about it.
type ngtMetrics struct {
ngt service.NGT
indexCount metrics.Int64Measure
uncommittedIndexCount metrics.Int64Measure
insertVCacheCount metrics.Int64Measure
deleteVCacheCount metrics.Int64Measure
isIndexing metrics.Int64Measure
ngt service.NGT
indexCount metrics.Int64Measure
uncommittedIndexCount metrics.Int64Measure
insertVCacheCount metrics.Int64Measure
deleteVCacheCount metrics.Int64Measure
completedCreateIndexTotal metrics.Int64Measure
isIndexing metrics.Int64Measure
}

// New returns a metrics.Metric for the given NGT service. It creates one
// dimensionless Int64 measure per reported value, each named with the
// metrics.ValdOrg prefix followed by "/ngt/" and the metric name.
func New(n service.NGT) metrics.Metric {
return &ngtMetrics{
ngt: n,
indexCount: *metrics.Int64(metrics.ValdOrg+"/ngt/index_count", "NGT index count", metrics.UnitDimensionless),
uncommittedIndexCount: *metrics.Int64(metrics.ValdOrg+"/ngt/uncommitted_index_count", "NGT uncommitted index count", metrics.UnitDimensionless),
insertVCacheCount: *metrics.Int64(metrics.ValdOrg+"/ngt/insert_vcache_count", "NGT insert vcache count", metrics.UnitDimensionless),
deleteVCacheCount: *metrics.Int64(metrics.ValdOrg+"/ngt/delete_vcache_count", "NGT delete vcache count", metrics.UnitDimensionless),
isIndexing: *metrics.Int64(metrics.ValdOrg+"/ngt/is_indexing", "currently indexing or not", metrics.UnitDimensionless),
ngt: n,
indexCount: *metrics.Int64(
metrics.ValdOrg+"/ngt/index_count",
"NGT index count",
metrics.UnitDimensionless),
uncommittedIndexCount: *metrics.Int64(
metrics.ValdOrg+"/ngt/uncommitted_index_count",
"NGT uncommitted index count",
metrics.UnitDimensionless),
insertVCacheCount: *metrics.Int64(
metrics.ValdOrg+"/ngt/insert_vcache_count",
"NGT insert vcache count",
metrics.UnitDimensionless),
deleteVCacheCount: *metrics.Int64(
metrics.ValdOrg+"/ngt/delete_vcache_count",
"NGT delete vcache count",
metrics.UnitDimensionless),
completedCreateIndexTotal: *metrics.Int64(
metrics.ValdOrg+"/ngt/completed_create_index_total",
"the cumulative count of completed create index execution",
metrics.UnitDimensionless),
isIndexing: *metrics.Int64(
metrics.ValdOrg+"/ngt/is_indexing",
"currently indexing or not",
metrics.UnitDimensionless),
}
}

Expand All @@ -55,6 +75,7 @@ func (n *ngtMetrics) Measurement(ctx context.Context) ([]metrics.Measurement, er
n.uncommittedIndexCount.M(int64(n.ngt.InsertVCacheLen() + n.ngt.DeleteVCacheLen())),
n.insertVCacheCount.M(int64(n.ngt.InsertVCacheLen())),
n.deleteVCacheCount.M(int64(n.ngt.DeleteVCacheLen())),
n.completedCreateIndexTotal.M(int64(n.ngt.NumberOfCreateIndexExecution())),
n.isIndexing.M(isIndexing),
}, nil
}
Expand Down Expand Up @@ -89,6 +110,12 @@ func (n *ngtMetrics) View() []*metrics.View {
Measure: &n.deleteVCacheCount,
Aggregation: metrics.LastValue(),
},
&metrics.View{
Name: "ngt_completed_create_index_total",
Description: "the cumulative count of completed create index execution",
Measure: &n.completedCreateIndexTotal,
Aggregation: metrics.LastValue(),
},
&metrics.View{
Name: "ngt_is_indexing",
Description: "currently indexing or not",
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/ngt/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (s *server) CreateIndex(ctx context.Context, c *payload.Control_CreateIndex
}
}()
res = new(payload.Empty)
err = s.ngt.CreateIndex(c.GetPoolSize())
err = s.ngt.CreateIndex(ctx, c.GetPoolSize())
if err != nil {
log.Errorf("[CreateIndex]\tUnknown error\t%+v", err)
if span != nil {
Expand Down
52 changes: 48 additions & 4 deletions pkg/agent/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil"
"github.com/vdaas/vald/pkg/agent/ngt/model"
Expand All @@ -50,7 +52,7 @@ type NGT interface {
Delete(uuid string) (err error)
DeleteMultiple(uuids ...string) (err error)
GetObject(uuid string) (vec []float32, err error)
CreateIndex(poolSize uint32) (err error)
CreateIndex(ctx context.Context, poolSize uint32) (err error)
SaveIndex(ctx context.Context) (err error)
Exists(string) (uint32, bool)
CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error)
Expand All @@ -70,6 +72,7 @@ type ngt struct {
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
idelay time.Duration // initial delay duration
dps uint32 // default pool size
ic uint64 // insert count
nocie uint64 // number of create index execution
Expand Down Expand Up @@ -167,6 +170,16 @@ func New(cfg *config.NGT) (nn NGT, err error) {
n.sdur = d
}

if cfg.InitialDelayMaxDuration != "" {
d, err := timeutil.Parse(cfg.InitialDelayMaxDuration)
if err != nil {
d = 0
}
n.idelay = time.Duration(
int64(rand.LimitedUint32(uint64(d/time.Second))),
) * time.Second
}

n.alen = cfg.AutoIndexLength

n.eg = errgroup.Get()
Expand Down Expand Up @@ -201,6 +214,16 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
n.lim = n.dur * 2
}
defer close(ech)

timer := time.NewTimer(n.idelay)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
timer.Stop()

tick := time.NewTicker(n.dur)
sTick := time.NewTicker(n.sdur)
limit := time.NewTicker(n.lim)
Expand All @@ -219,7 +242,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
return ctx.Err()
case <-tick.C:
if int(atomic.LoadUint64(&n.ic)) >= n.alen {
err = n.CreateIndex(n.dps)
err = n.CreateIndex(ctx, n.dps)
}
case <-limit.C:
err = n.CreateAndSaveIndex(ctx, n.dps)
Expand Down Expand Up @@ -423,7 +446,14 @@ func (n *ngt) GetObject(uuid string) (vec []float32, err error) {
return vec, nil
}

func (n *ngt) CreateIndex(poolSize uint32) (err error) {
func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.CreateIndex")
defer func() {
if span != nil {
span.End()
}
}()

if n.indexing.Load().(bool) {
return nil
}
Expand Down Expand Up @@ -521,6 +551,13 @@ func (n *ngt) CreateIndex(poolSize uint32) (err error) {
}

func (n *ngt) SaveIndex(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.SaveIndex")
defer func() {
if span != nil {
span.End()
}
}()

if len(n.path) != 0 && !n.inMem {
eg, ctx := errgroup.New(ctx)
eg.Go(safety.RecoverFunc(func() error {
Expand Down Expand Up @@ -549,7 +586,14 @@ func (n *ngt) SaveIndex(ctx context.Context) (err error) {
}

func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error) {
err = n.CreateIndex(poolSize)
ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.CreateAndSaveIndex")
defer func() {
if span != nil {
span.End()
}
}()

err = n.CreateIndex(ctx, poolSize)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/ngt/service/ngt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1791,6 +1791,7 @@ func Test_ngt_GetObject(t *testing.T) {

func Test_ngt_CreateIndex(t *testing.T) {
type args struct {
ctx context.Context
poolSize uint32
}
type fields struct {
Expand Down Expand Up @@ -1916,7 +1917,7 @@ func Test_ngt_CreateIndex(t *testing.T) {
dcd: test.fields.dcd,
}

err := n.CreateIndex(test.args.poolSize)
err := n.CreateIndex(test.args.ctx, test.args.poolSize)
if err := test.checkFunc(test.want, err); err != nil {
tt.Errorf("error = %v", err)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/manager/index/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
)

Expand Down Expand Up @@ -126,6 +127,13 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) {
}

func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.execute")
defer func() {
if span != nil {
span.End()
}
}()

if idx.indexing.Load().(bool) {
return nil
}
Expand Down Expand Up @@ -168,6 +176,13 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err err
}

func (idx *index) loadInfos(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.loadInfos")
defer func() {
if span != nil {
span.End()
}
}()

var u, ucu uint32
var infoMap indexInfos
err = idx.client.GetClient().RangeConcurrent(ctx, len(idx.client.GetAddrs(ctx)),
Expand Down

0 comments on commit 302d624

Please sign in to comment.