diff --git a/internal/cache/bbolt/bbolt.go b/internal/cache/bbolt/bbolt.go index 174a98feee..920138ba7d 100644 --- a/internal/cache/bbolt/bbolt.go +++ b/internal/cache/bbolt/bbolt.go @@ -68,6 +68,20 @@ func (b *Bbolt) SetBatch(kv map[string]struct{}) error { return nil } +// wait for this eg to make sure all the batches finished +func (b *Bbolt) SetBatch2(eg *errgroup.Group, key string, val []byte) error { + eg.Go(func() error { + b.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + // FIXME: for index correction, value doesn't matter, but for more general use, it should be considered + err := b.Put([]byte(key), nil) + return err + }) + return nil + }) + return nil +} + func (b *Bbolt) Get(key string) ([]byte, bool, error) { var val []byte if err := b.db.View(func(tx *bolt.Tx) error { diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 15b349d737..c939b1f0d2 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -249,9 +249,8 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) { if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { - // DEBUG: - tmpSet := make(map[string]struct{}) - // ~DEBUG: + // FIXME: set ch size with cfg or something + wch := make(chan string, 1024) // current address is the leftAgentAddrs[0] because this is OrderedRange and // leftAgentAddrs is copied from c.agentAddrs @@ -269,6 +268,20 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) { log.Infof("starting correction for agent %s, concurrency: %d", addr, concurrency) + bolteg := stdeg.Group{} + bolteg.SetLimit(1000) + go func() { + for { + select { + case <-ctx.Done(): + log.Info("bbolt write goroutine finished") + return + case id := <-wch: + c.checkedIdBbolt.SetBatch2(&bolteg, id, nil) + } + } + }() + finalize := func() error { err = seg.Wait() if err != nil { @@ -276,15 +289,12 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) { return err } - // DEBUG: - log.Info("writing cache to disk...") - if err := c.checkedIdBbolt.SetBatch(tmpSet); err != nil { - log.Errorf("SetBatch failed: %v", err) + err = bolteg.Wait() + if err != nil { + log.Errorf("bolt err group returned error: %v", err) return err } - // delete all the key from the tmpSet - tmpSet = nil - // ~DEBUG: + log.Info("bbolt all batch finished") log.Infof("correction finished for agent %s", addr) return nil @@ -376,9 +386,10 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) { // if err != nil { // return err // } - c.rwmu.Lock() - tmpSet[id] = struct{}{} - c.rwmu.Unlock() + // c.rwmu.Lock() + // tmpSet[id] = struct{}{} + wch <- id + // c.rwmu.Unlock() } else { c.rwmu.Lock() c.checkedId[id] = struct{}{}