store/bucket: wait until chunk loading ends in Close()
The chunk reader needs to wait in Close() until chunk loading ends, because
otherwise there is a race between goroutines appending to r.chunkBytes and
Close() reading from it.

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS committed Aug 4, 2023 · 1 parent eb80318 · commit 9bf6078
Showing 1 changed file with 9 additions and 4 deletions.
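
To make the fix concrete, here is a minimal, self-contained Go sketch of the pattern the diff below introduces; all names (chunkReader, load, bufs, loading) are hypothetical stand-ins, not the actual Thanos types. Loader goroutines append buffers to a shared slice, and Close() waits on a sync.WaitGroup before reading that slice, so the read can never overlap an in-flight append.

// All names here are hypothetical; a minimal sketch of the WaitGroup
// pattern this commit adopts, not the Thanos code itself.
package main

import (
	"fmt"
	"sync"
)

type chunkReader struct {
	mtx     sync.Mutex
	loading sync.WaitGroup // counts in-flight loader goroutines
	bufs    []*[]byte      // appended to by loaders, drained in Close
}

// load registers n loaders up front, then spawns them, so Close
// cannot observe a partially started batch.
func (r *chunkReader) load(n int) {
	r.loading.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer r.loading.Done()
			buf := make([]byte, 16) // stand-in for a fetched chunk
			r.mtx.Lock()
			r.bufs = append(r.bufs, &buf)
			r.mtx.Unlock()
		}()
	}
}

// Close waits for all loaders before touching r.bufs. Without the
// Wait, this read would race with the appends above.
func (r *chunkReader) Close() error {
	r.loading.Wait()
	for _, b := range r.bufs {
		_ = b // the real code returns each buffer to a pool here
	}
	r.bufs = nil
	return nil
}

func main() {
	r := &chunkReader{}
	r.load(4)
	if err := r.Close(); err != nil {
		fmt.Println("close:", err)
	}
	fmt.Println("closed with all loads finished")
}

Without the loading.Wait() call, running a sketch like this under go run -race would typically flag a data race between the append in a loader goroutine and the unsynchronized read in Close(), which is exactly the hazard the commit message describes.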
pkg/store/bucket.go (9 additions, 4 deletions)

@@ -3153,6 +3153,8 @@ type bucketChunkReader struct {
 	mtx        sync.Mutex
 	stats      *queryStats
 	chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.
+
+	chunkLoading sync.WaitGroup
 }
 
 func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
@@ -3170,6 +3172,9 @@ func (r *bucketChunkReader) reset() {
 }
 
 func (r *bucketChunkReader) Close() error {
+	// Wait until any chunk loading ends.
+	r.chunkLoading.Wait()
+
 	r.block.pendingReaders.Done()
 
 	for _, b := range r.chunkBytes {
@@ -3211,6 +3216,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
 		r.stats.DataDownloadedSizeSum += units.Base2Bytes(p.End - p.Start)
 	}
 
+	r.chunkLoading.Add(len(parts))
 	for _, p := range parts {
 		seq := seq
 		p := p
@@ -3234,6 +3240,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 		}
 		r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
 		r.mtx.Unlock()
+		r.chunkLoading.Done()
 	}()
 
 	// Get a reader for the required range.
@@ -3268,7 +3275,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 	} else {
 		buf = make([]byte, r.block.estimatedMaxChunkSize)
 	}
-	defer r.block.chunkPool.Put(&buf)
+	r.chunkBytes = append(r.chunkBytes, &buf)
 
 	for i, pIdx := range pIdxs {
 		// Fast forward range reader to the next chunk start in case of sparse (for our purposes) byte range.
@@ -3333,6 +3340,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 		if len(*nb) != chunkLen {
 			return errors.Errorf("preloaded chunk too small, expecting %d", chunkLen)
 		}
+		r.chunkBytes = append(r.chunkBytes, nb)
 
 		r.mtx.Lock()
 		locked = true
@@ -3341,13 +3349,10 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 		r.stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
 		err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save, calculateChunkChecksum)
 		if err != nil {
-			r.block.chunkPool.Put(nb)
			return errors.Wrap(err, "populate chunk")
 		}
 		r.stats.chunksTouched++
 		r.stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))
-
-		r.block.chunkPool.Put(nb)
 	}
 	return nil
 }
