Skip to content

Commit

Permalink
use ch to set batch bbolt
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Sep 11, 2023
1 parent 33ca6c7 commit cf9fa5f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
14 changes: 14 additions & 0 deletions internal/cache/bbolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ func (b *Bbolt) SetBatch(kv map[string]struct{}) error {
return nil
}

// wait for this eg to make sure all the batches finished
func (b *Bbolt) SetBatch2(eg *errgroup.Group, key string, val []byte) error {
eg.Go(func() error {
b.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucket))
// FIXME: for index correction, value doesn't matter, but for more general use, it should be considered
err := b.Put([]byte(key), nil)
return err
})
return nil
})
return nil
}

func (b *Bbolt) Get(key string) ([]byte, bool, error) {
var val []byte
if err := b.db.View(func(tx *bolt.Tx) error {
Expand Down
37 changes: 24 additions & 13 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,8 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) {

if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
// DEBUG:
tmpSet := make(map[string]struct{})
// ~DEBUG:
// FIXME: set ch size with cfg or something
wch := make(chan string, 1024)

// current address is the leftAgentAddrs[0] because this is OrderedRange and
// leftAgentAddrs is copied from c.agentAddrs
Expand All @@ -269,22 +268,33 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) {

log.Infof("starting correction for agent %s, concurrency: %d", addr, concurrency)

bolteg := stdeg.Group{}
bolteg.SetLimit(1000)
go func() {
for {
select {
case <-ctx.Done():
log.Info("bbolt write goroutine finished")
return
case id := <-wch:
c.checkedIdBbolt.SetBatch2(&bolteg, id, nil)
}
}
}()

finalize := func() error {
err = seg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
return err
}

// DEBUG:
log.Info("writing cache to disk...")
if err := c.checkedIdBbolt.SetBatch(tmpSet); err != nil {
log.Errorf("SetBatch failed: %v", err)
err = bolteg.Wait()
if err != nil {
log.Errorf("bolt err group returned error: %v", err)
return err
}
// delete all the key from the tmpSet
tmpSet = nil
// ~DEBUG:
log.Info("bbolt all batch finished")

log.Infof("correction finished for agent %s", addr)
return nil
Expand Down Expand Up @@ -376,9 +386,10 @@ func (c *correct) correctWithCache(ctx context.Context) (err error) {
// if err != nil {
// return err
// }
c.rwmu.Lock()
tmpSet[id] = struct{}{}
c.rwmu.Unlock()
// c.rwmu.Lock()
// tmpSet[id] = struct{}{}
wch <- id
// c.rwmu.Unlock()
} else {
c.rwmu.Lock()
c.checkedId[id] = struct{}{}
Expand Down

0 comments on commit cf9fa5f

Please sign in to comment.