Commit

Reduced allocated memory by chunks reader in the store gateway at query time (#3814)

* Reduced allocated memory by chunks reader in the store gateway at query time

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed linter issues

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed linter (hopefully)

Signed-off-by: Marco Pracucci <[email protected]>

* Renamed function

Signed-off-by: Marco Pracucci <[email protected]>

* Updated comment

Signed-off-by: Marco Pracucci <[email protected]>

* Updated code comment

Signed-off-by: Marco Pracucci <[email protected]>
pracucci authored Feb 22, 2021
1 parent d207c0b commit 1bc66f9
Showing 5 changed files with 484 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
### Fixed

- [#3773](https://github.com/thanos-io/thanos/pull/3773) Compact: Pad compaction planner size check
- [#3796](https://github.com/thanos-io/thanos/pull/3796) Store: Decreased memory allocations while fetching block's chunks.
- [#3814](https://github.com/thanos-io/thanos/pull/3814) Store: Decreased memory utilisation while fetching block's chunks.
- [#3815](https://github.com/thanos-io/thanos/pull/3815) Receive: Improve handling of empty time series from clients

### Changed
73 changes: 50 additions & 23 deletions pkg/store/bucket.go
@@ -8,7 +8,6 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"os"
@@ -1461,33 +1460,30 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]
return buf.Bytes(), nil
}

func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64) (*[]byte, error) {
func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) {
if seq < 0 || seq >= len(b.chunkObjs) {
return nil, errors.Errorf("unknown segment file for index %d", seq)
}

// Request bytes.MinRead extra space to ensure the copy operation will not trigger
// a memory reallocation.
c, err := b.chunkPool.Get(int(length) + bytes.MinRead)
// Get a reader for the required range.
reader, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
if err != nil {
return nil, errors.Wrap(err, "allocate chunk bytes")
return nil, errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader")

buf := bytes.NewBuffer(*c)

r, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
// Get a buffer from the pool.
chunkBuffer, err := b.chunkPool.Get(chunkRanges.size())
if err != nil {
b.chunkPool.Put(c)
return nil, errors.Wrap(err, "get range reader")
return nil, errors.Wrap(err, "allocate chunk bytes")
}
defer runutil.CloseWithLogOnErr(b.logger, r, "readChunkRange close range reader")

if _, err = io.Copy(buf, r); err != nil {
b.chunkPool.Put(c)
return nil, errors.Wrap(err, "read range")
*chunkBuffer, err = readByteRanges(reader, *chunkBuffer, chunkRanges)
if err != nil {
return nil, err
}
internalBuf := buf.Bytes()
return &internalBuf, nil

return chunkBuffer, nil
}

func (b *bucketBlock) indexReader(ctx context.Context) *bucketIndexReader {
@@ -2255,7 +2251,11 @@ func (r *bucketChunkReader) preload() error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error {
fetchBegin := time.Now()

b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start))
// Compute the byte ranges of chunks we actually need. The total read data may be bigger
// than required because of the partitioner.
chunkRanges := chunkOffsetsToByteRanges(offs, start)

b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start), chunkRanges)
if err != nil {
return errors.Wrapf(err, "read range for %d", seq)
}
@@ -2275,8 +2275,13 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += int(end - start)

for _, o := range offs {
cb := (*b)[o-start:]
readOffset := 0
for idx, o := range offs {
chunkRange := chunkRanges[idx]

// The chunks byte ranges are stored contiguously in the data buffer.
cb := (*b)[readOffset : readOffset+chunkRange.length]
readOffset += chunkRange.length

l, n := binary.Uvarint(cb)
if n < 1 {
@@ -2293,15 +2298,14 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
continue
}

// If we didn't fetch enough data for the chunk, fetch more. This can only really happen for last
// chunk in the list of fetched chunks, otherwise partitioner would merge fetch ranges together.
// If we didn't fetch enough data for the chunk, fetch more.
r.mtx.Unlock()
locked = false

fetchBegin = time.Now()

// Read entire chunk into new buffer.
nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen))
nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen), []byteRange{{offset: 0, length: chLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen)
}
@@ -2324,6 +2328,29 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
return nil
}

// chunkOffsetsToByteRanges returns non-overlapping byte ranges with each range offset
// relative to start. The provided input offsets must be sorted.
func chunkOffsetsToByteRanges(offsets []uint32, start uint32) byteRanges {
ranges := make([]byteRange, len(offsets))

for idx := 0; idx < len(offsets); idx++ {
ranges[idx] = byteRange{
// The byte range offset is required to be relative to the start of the read slice.
offset: int(offsets[idx] - start),
length: maxChunkSize,
}

if idx > 0 {
// Ensure ranges are non overlapping.
if prev := ranges[idx-1]; prev.length > ranges[idx].offset-prev.offset {
ranges[idx-1].length = ranges[idx].offset - prev.offset
}
}
}

return ranges
}

func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) {
c, ok := r.chunks[id]
if !ok {
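Note: the new readChunkRange signature depends on the byteRange/byteRanges helpers and the readByteRanges function, which this commit adds in one of the five changed files that is not expanded here. The following is only a rough sketch of the shape those helpers appear to have, inferred from the call sites above (chunkRanges.size(), chunkRange.offset/length, and readByteRanges(reader, *chunkBuffer, chunkRanges)); the field types, error handling, and truncation behaviour of the real implementation may differ.

// Sketch only - not part of the diff shown above.
package store

import (
	"io"
	"io/ioutil"

	"github.com/pkg/errors"
)

// byteRange is a single range to keep out of a larger read, with offset
// expressed relative to the start of that read.
type byteRange struct {
	offset int
	length int
}

type byteRanges []byteRange

// size returns the total number of bytes covered by the ranges; readChunkRange
// uses it to size the buffer it takes from the chunk pool.
func (r byteRanges) size() int {
	size := 0
	for _, c := range r {
		size += c.length
	}
	return size
}

// readByteRanges reads the given sorted, non-overlapping ranges from src and
// packs them contiguously into buf (len 0, cap >= size()), discarding the gaps
// between ranges so only the needed bytes stay allocated.
func readByteRanges(src io.Reader, buf []byte, ranges byteRanges) ([]byte, error) {
	if len(ranges) == 0 {
		return buf, nil
	}

	buf = buf[:0]
	cursor := 0

	for idx, rng := range ranges {
		// Skip the gap between the previous range and this one.
		if rng.offset > cursor {
			if _, err := io.CopyN(ioutil.Discard, src, int64(rng.offset-cursor)); err != nil {
				return nil, errors.Wrap(err, "discard gap between byte ranges")
			}
			cursor = rng.offset
		}

		// Append this range's bytes to the packed buffer.
		start := len(buf)
		buf = buf[:start+rng.length]
		n, err := io.ReadFull(src, buf[start:])
		cursor += n

		// The length of the last range is only an upper bound (maxChunkSize), so
		// this sketch tolerates a short read there; the real implementation may
		// handle truncation differently.
		if idx == len(ranges)-1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
			return buf[:start+n], nil
		}
		if err != nil {
			return nil, errors.Wrap(err, "read byte range")
		}
	}

	return buf, nil
}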
160 changes: 159 additions & 1 deletion pkg/store/bucket_test.go
@@ -2309,9 +2309,167 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
offset := int64(0)
length := readLengths[n%len(readLengths)]

_, err := blk.readChunkRange(ctx, 0, offset, length)
_, err := blk.readChunkRange(ctx, 0, offset, length, byteRanges{{offset: 0, length: int(length)}})
if err != nil {
b.Fatal(err.Error())
}
}
}

func BenchmarkBlockSeries(b *testing.B) {
var (
ctx = context.Background()
logger = log.NewNopLogger()
)

tmpDir, err := ioutil.TempDir("", "benchmark")
testutil.Ok(b, err)
b.Cleanup(func() {
testutil.Ok(b, os.RemoveAll(tmpDir))
})

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(b, err)
b.Cleanup(func() {
testutil.Ok(b, bkt.Close())
})

// Create a block.
head, _ := storetestutil.CreateHeadWithSeries(b, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "head"),
SamplesPerSeries: 86400 / 15, // Simulate 1 day block with 15s scrape interval.
Series: 1000,
PrependLabels: nil,
Random: rand.New(rand.NewSource(120)),
SkipChunks: true,
})
blockID := createBlockFromHead(b, tmpDir, head)
testutil.Ok(b, head.Close())

// Upload the block to the bucket.
thanosMeta := metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}

blockMeta, err := metadata.InjectThanos(logger, filepath.Join(tmpDir, blockID.String()), thanosMeta, nil)
testutil.Ok(b, err)

testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String())))

// Create chunk pool and partitioner using the same production settings.
chunkPool, err := NewDefaultChunkBytesPool(64 * 1024 * 1024 * 1024)
testutil.Ok(b, err)

partitioner := NewGapBasedPartitioner(PartitionerMaxGapSize)

// Create an index header reader.
indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, DefaultPostingOffsetInMemorySampling)
testutil.Ok(b, err)
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.DefaultInMemoryIndexCacheConfig)
testutil.Ok(b, err)

// Create a bucket block with only the dependencies we need for the benchmark.
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner)
testutil.Ok(b, err)

for _, concurrency := range []int{1, 2, 4, 8, 16, 32} {
b.Run(fmt.Sprintf("concurrency: %d", concurrency), func(b *testing.B) {
benchmarkBlockSeriesWithConcurrency(b, concurrency, blockMeta, blk)
})
}
}

func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMeta *metadata.Meta, blk *bucketBlock) {
ctx := context.Background()

// Run the same number of queries per goroutine.
queriesPerWorker := b.N / concurrency

// No limits.
chunksLimiter := NewChunksLimiterFactory(0)(nil)
seriesLimiter := NewSeriesLimiterFactory(0)(nil)

// Run multiple workers to execute the queries.
wg := sync.WaitGroup{}
wg.Add(concurrency)

for w := 0; w < concurrency; w++ {
go func() {
defer wg.Done()

for n := 0; n < queriesPerWorker; n++ {
// Each query touches a subset of series. To make it reproducible, and to avoid
// just querying consecutive series (which wouldn't reflect the real world), we
// create a label matcher which looks for a short integer within the label value.
labelMatcher := fmt.Sprintf(".*%d.*", n%20)

req := &storepb.SeriesRequest{
MinTime: blockMeta.MinTime,
MaxTime: blockMeta.MaxTime,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "i", Value: labelMatcher},
},
SkipChunks: false,
}

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
testutil.Ok(b, err)

indexReader := blk.indexReader(ctx)
chunkReader := blk.chunkReader(ctx)

seriesSet, _, err := blockSeries(nil, indexReader, chunkReader, matchers, req, chunksLimiter, seriesLimiter)
testutil.Ok(b, err)

// Ensure at least 1 series has been returned (as expected).
testutil.Equals(b, true, seriesSet.Next())

testutil.Ok(b, indexReader.Close())
testutil.Ok(b, chunkReader.Close())
}
}()
}

wg.Wait()
}

func TestChunkOffsetsToByteRanges(t *testing.T) {
tests := map[string]struct {
offsets []uint32
start uint32
expected byteRanges
}{
"no offsets in input": {
offsets: nil,
expected: byteRanges{},
},
"no overlapping ranges in input": {
offsets: []uint32{1000, 20000, 45000},
start: 1000,
expected: byteRanges{
{offset: 0, length: 16000},
{offset: 19000, length: 16000},
{offset: 44000, length: 16000},
},
},
"overlapping ranges in input": {
offsets: []uint32{1000, 5000, 9500, 30000},
start: 1000,
expected: byteRanges{
{offset: 0, length: 4000},
{offset: 4000, length: 4500},
{offset: 8500, length: 16000},
{offset: 29000, length: 16000},
},
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
testutil.Equals(t, len(testData.offsets), len(testData.expected))
testutil.Equals(t, testData.expected, chunkOffsetsToByteRanges(testData.offsets, testData.start))
})
}
}
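
For reference, here is a worked reading of the "overlapping ranges in input" case above, assuming maxChunkSize is 16000 bytes (consistent with the 16000-byte lengths expected in the non-overlapping case): every range starts out maxChunkSize long and is trimmed whenever the following chunk begins before the range would end.

// Illustrative only; mirrors the expectations of TestChunkOffsetsToByteRanges.
ranges := chunkOffsetsToByteRanges([]uint32{1000, 5000, 9500, 30000}, 1000)
// Relative offsets are 0, 4000, 8500 and 29000:
//   {offset: 0,     length: 4000}   trimmed: the next chunk starts 4000 bytes in
//   {offset: 4000,  length: 4500}   trimmed: the next chunk starts at 8500
//   {offset: 8500,  length: 16000}  kept: the next chunk starts 20500 bytes later
//   {offset: 29000, length: 16000}  kept: last chunk, nothing to trim against
// ranges.size() == 40500, so readChunkRange takes a 40500-byte buffer from the
// pool instead of one covering the whole partitioned read.
_ = ranges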
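To evaluate the change locally, the new benchmarks can be run with the standard Go tooling; an illustrative invocation (not part of this commit) is:

go test -bench='BenchmarkBlockSeries|BenchmarkBucketBlock_readChunkRange' -benchmem -run=^$ ./pkg/store/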