From 1d0523c46b7cfa844bfa3a3ca0b988107cdb4554 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 4 Aug 2023 13:38:27 +0300 Subject: [PATCH] store/bucket: wait until chunk loading ends in Close() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Chunk reader needs to wait until the chunk loading ends in Close() because otherwise there will be a race between appending to r.chunkBytes and reading from it. Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 09bfba32068..a4a214dc5ae 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -3160,6 +3160,9 @@ type bucketChunkReader struct { mtx sync.Mutex stats *queryStats chunkBytes []*[]byte // Byte slice to return to the chunk pool on close. + + loadingChunks bool + finishLoadingChks chan struct{} } func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { @@ -3174,9 +3177,16 @@ func (r *bucketChunkReader) reset() { for i := range r.toLoad { r.toLoad[i] = r.toLoad[i][:0] } + if r.finishLoadingChks != nil { + <-r.finishLoadingChks + } + r.finishLoadingChks = make(chan struct{}) } func (r *bucketChunkReader) Close() error { + if r.loadingChunks { + <-r.finishLoadingChks + } r.block.pendingReaders.Done() for _, b := range r.chunkBytes { @@ -3201,6 +3211,12 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) // load loads all added chunks and saves resulting aggrs to refs. func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { + r.loadingChunks = true + defer func() { + r.loadingChunks = false + close(r.finishLoadingChks) + }() + g, ctx := errgroup.WithContext(ctx) for seq, pIdxs := range r.toLoad {