From 4dc480167487b3a9f430c550c0ab4beef1ae615b Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 19 Sep 2023 08:45:18 +0000 Subject: [PATCH] refactor --- pkg/index/job/correction/service/corrector.go | 66 +++++++++---------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 0c5c5d0617..9bee36eec5 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -139,48 +139,30 @@ func (c *correct) correct(ctx context.Context) (err error) { return err } - seg, ctx := errgroup.WithContext(ctx) + // context and errgroup for stream.Recv and correction + sctx, scancel := context.WithCancel(ctx) + defer scancel() + seg, sctx := errgroup.WithContext(sctx) concurrency := c.cfg.Corrector.GetStreamListConcurrency() seg.SetLimit(concurrency) + // errgroup for bbolt AsyncSet bolteg, ctx := errgroup.WithContext(ctx) bolteg.SetLimit(2048) - finalize := func() error { - err = seg.Wait() - if err != nil { - log.Errorf("err group returned error: %v", err) - return err - } - - err = bolteg.Wait() - if err != nil { - log.Errorf("bolt err group returned error: %v", err) - return err - } - log.Info("bbolt all batch finished") - - log.Infof("correction finished for agent %s", addr) - return nil - } - defer finalize() - - streamEnd := make(chan struct{}) - var once sync.Once var mu sync.Mutex - log.Infof("starting correction for agent %s, concurrency: %d", addr, concurrency) - // 事前にRecvすべき件数はわかるのだからその回数だけfor文を回すようにする方がいいか + // 事前にRecvすべき件数は事前にわからない。なぜなら処理中に新規でinsertされる可能性があるため + // TODO: そういうものはtimestampで判断して弾かないといけない for { select { - case <-ctx.Done(): - return ctx.Err() - case <-streamEnd: - return nil + case <-sctx.Done(): + if !errors.Is(sctx.Err(), context.Canceled) { + log.Errorf("context done unexpectedly: %v", sctx.Err()) + } + goto Finalize default: - // TODO: when vald internal errgroup is changed to block when eg limitation is reached, - // switch to vald version of errgroup. seg.Go(func() error { mu.Lock() // As long as we don't stream.Recv() from the stream, we do not consume the memory of the message. @@ -191,9 +173,7 @@ func (c *correct) correct(ctx context.Context) (err error) { if errors.Is(err, io.EOF) { log.Debugf("StreamListObject stream finished for agent %s", addr) - once.Do(func() { - close(streamEnd) - }) + scancel() return nil } if err != nil { @@ -229,9 +209,6 @@ func (c *correct) correct(ctx context.Context) (err error) { }, leftAgentAddrs, ); err != nil { - // TODO: valdとstdでerrorの処理が違うので注意 - // (valdはerrが着信するまでにスタートしていた処理は行われる) - // (stdはerrが着信すると他は全て止まる) log.Errorf("failed to check consistency: %v", err) return nil // continue other processes } @@ -243,6 +220,22 @@ func (c *correct) correct(ctx context.Context) (err error) { }) } } + + Finalize: + err = seg.Wait() + if err != nil { + log.Errorf("err group returned error: %v", err) + } + + berr := bolteg.Wait() + if berr != nil { + log.Errorf("bolt err group returned error: %v", err) + err = errors.Join(err, berr) + } + log.Info("bbolt all batch finished") + + log.Infof("correction finished for agent %s", addr) + return err }, ); err != nil { log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err) @@ -288,6 +281,7 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep Id: targetReplica.vec.GetId(), }, }) + if err != nil { if st, ok := status.FromError(err); !ok { log.Errorf("gRPC call returned not a gRPC status error: %v", err)