Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Sep 19, 2023
1 parent e0ef7a8 commit 4dc4801
Showing 1 changed file with 30 additions and 36 deletions.
66 changes: 30 additions & 36 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,48 +139,30 @@ func (c *correct) correct(ctx context.Context) (err error) {
return err
}

seg, ctx := errgroup.WithContext(ctx)
// context and errgroup for stream.Recv and correction
sctx, scancel := context.WithCancel(ctx)
defer scancel()
seg, sctx := errgroup.WithContext(sctx)
concurrency := c.cfg.Corrector.GetStreamListConcurrency()
seg.SetLimit(concurrency)

// errgroup for bbolt AsyncSet
bolteg, ctx := errgroup.WithContext(ctx)
bolteg.SetLimit(2048)

finalize := func() error {
err = seg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
return err
}

err = bolteg.Wait()
if err != nil {
log.Errorf("bolt err group returned error: %v", err)
return err
}
log.Info("bbolt all batch finished")

log.Infof("correction finished for agent %s", addr)
return nil
}
defer finalize()

streamEnd := make(chan struct{})
var once sync.Once
var mu sync.Mutex

log.Infof("starting correction for agent %s, concurrency: %d", addr, concurrency)

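// Since the number of items to Recv is known in advance, would it be better to run the for loop exactly that many times?
// The number of items to Recv cannot be known in advance, because new items may be inserted while processing is in progress.
// TODO: such items must be identified by their timestamp and rejected.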
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-streamEnd:
return nil
case <-sctx.Done():
if !errors.Is(sctx.Err(), context.Canceled) {
log.Errorf("context done unexpectedly: %v", sctx.Err())
}
goto Finalize
default:
// TODO: when vald internal errgroup is changed to block when eg limitation is reached,
// switch to vald version of errgroup.
seg.Go(func() error {
mu.Lock()
// As long as we don't stream.Recv() from the stream, we do not consume the memory of the message.
Expand All @@ -191,9 +173,7 @@ func (c *correct) correct(ctx context.Context) (err error) {

if errors.Is(err, io.EOF) {
log.Debugf("StreamListObject stream finished for agent %s", addr)
once.Do(func() {
close(streamEnd)
})
scancel()
return nil
}
if err != nil {
Expand Down Expand Up @@ -229,9 +209,6 @@ func (c *correct) correct(ctx context.Context) (err error) {
},
leftAgentAddrs,
); err != nil {
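// TODO: note that vald and std handle errors differently
// (vald: processing that had already started before the err arrives is still carried out)
// (std: once an err arrives, everything else stops)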
log.Errorf("failed to check consistency: %v", err)
return nil // continue other processes
}
Expand All @@ -243,6 +220,22 @@ func (c *correct) correct(ctx context.Context) (err error) {
})
}
}

Finalize:
err = seg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
}

berr := bolteg.Wait()
if berr != nil {
log.Errorf("bolt err group returned error: %v", err)
err = errors.Join(err, berr)
}
log.Info("bbolt all batch finished")

log.Infof("correction finished for agent %s", addr)
return err
},
); err != nil {
log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err)
Expand Down Expand Up @@ -288,6 +281,7 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
Id: targetReplica.vec.GetId(),
},
})

if err != nil {
if st, ok := status.FromError(err); !ok {
log.Errorf("gRPC call returned not a gRPC status error: %v", err)
Expand Down

0 comments on commit 4dc4801

Please sign in to comment.