diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index e555d148fea3..f8fbe89f2607 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -735,6 +735,7 @@ type Pebble struct { syncutil.Mutex flushCompletedCallback func() } + asyncDone sync.WaitGroup // supportsRangeKeys is 1 if the database supports range keys. It must // be accessed atomically. @@ -1006,7 +1007,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { oldDiskSlow := lel.DiskSlow lel.DiskSlow = func(info pebble.DiskSlowInfo) { // Run oldDiskSlow asynchronously. - go oldDiskSlow(info) + p.async(func() { oldDiskSlow(info) }) } el := pebble.TeeEventListener( p.makeMetricEtcEventListener(ctx), @@ -1087,6 +1088,17 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { return p, nil } +// async launches the provided function in a new goroutine. It uses a wait group +// to synchronize with (*Pebble).Close to ensure all launched goroutines have +// exited before Close returns. +func (p *Pebble) async(fn func()) { + p.asyncDone.Add(1) + go func() { + defer p.asyncDone.Done() + fn() + }() +} + func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener { return pebble.EventListener{ WriteStallBegin: func(info pebble.WriteStallBeginInfo) { @@ -1132,7 +1144,7 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventLis log.Fatalf(ctx, "file write stall detected: %s", info) } else { - go log.Errorf(ctx, "file write stall detected: %s", info) + p.async(func() { log.Errorf(ctx, "file write stall detected: %s", info) }) } return } @@ -1172,6 +1184,9 @@ func (p *Pebble) Close() { } p.closed = true + // Wait for any asynchronous goroutines to exit. + p.asyncDone.Wait() + handleErr := func(err error) { if err == nil { return