Skip to content

Commit

Permalink
📈 Add trace spans to agent-ngt and manager-index
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <[email protected]>

:bug: Fix

Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx committed May 20, 2020
1 parent df143c9 commit ee45ba7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/ngt/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (s *server) CreateIndex(ctx context.Context, c *payload.Control_CreateIndex
}
}()
res = new(payload.Empty)
err = s.ngt.CreateIndex(c.GetPoolSize())
err = s.ngt.CreateIndex(ctx context.Context, c.GetPoolSize())
if err != nil {
log.Errorf("[CreateIndex]\tUnknown error\t%+v", err)
if span != nil {
Expand Down
30 changes: 26 additions & 4 deletions pkg/agent/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil"
"github.com/vdaas/vald/pkg/agent/ngt/model"
Expand All @@ -50,7 +51,7 @@ type NGT interface {
Delete(uuid string) (err error)
DeleteMultiple(uuids ...string) (err error)
GetObject(uuid string) (vec []float32, err error)
CreateIndex(poolSize uint32) (err error)
CreateIndex(ctx context.Context, poolSize uint32) (err error)
SaveIndex(ctx context.Context) (err error)
Exists(string) (uint32, bool)
CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error)
Expand Down Expand Up @@ -219,7 +220,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
return ctx.Err()
case <-tick.C:
if int(atomic.LoadUint64(&n.ic)) >= n.alen {
err = n.CreateIndex(n.dps)
err = n.CreateIndex(ctx, n.dps)
}
case <-limit.C:
err = n.CreateAndSaveIndex(ctx, n.dps)
Expand Down Expand Up @@ -423,7 +424,14 @@ func (n *ngt) GetObject(uuid string) (vec []float32, err error) {
return vec, nil
}

func (n *ngt) CreateIndex(poolSize uint32) (err error) {
func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.CreateIndex")
defer func() {
if span != nil {
span.End()
}
}()

if n.indexing.Load().(bool) {
return nil
}
Expand Down Expand Up @@ -521,6 +529,13 @@ func (n *ngt) CreateIndex(poolSize uint32) (err error) {
}

func (n *ngt) SaveIndex(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.SaveIndex")
defer func() {
if span != nil {
span.End()
}
}()

if len(n.path) != 0 && !n.inMem {
eg, ctx := errgroup.New(ctx)
eg.Go(safety.RecoverFunc(func() error {
Expand Down Expand Up @@ -549,7 +564,14 @@ func (n *ngt) SaveIndex(ctx context.Context) (err error) {
}

func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error) {
err = n.CreateIndex(poolSize)
ctx, span := trace.StartSpan(ctx, "vald/agent-ngt/service/NGT.CreateAndSaveIndex")
defer func() {
if span != nil {
span.End()
}
}()

err = n.CreateIndex(ctx, poolSize)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/manager/index/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
)

Expand Down Expand Up @@ -126,6 +127,13 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) {
}

func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.execute")
defer func() {
if span != nil {
span.End()
}
}()

if idx.indexing.Load().(bool) {
return nil
}
Expand Down Expand Up @@ -168,6 +176,13 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err err
}

func (idx *index) loadInfos(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.loadInfos")
defer func() {
if span != nil {
span.End()
}
}()

var u, ucu uint32
var infoMap indexInfos
err = idx.client.GetClient().RangeConcurrent(ctx, len(idx.client.GetAddrs(ctx)),
Expand Down

0 comments on commit ee45ba7

Please sign in to comment.