Skip to content

Commit

Permalink
store: read postings directly into delta encoded format (thanos-io#6442)
Browse files Browse the repository at this point in the history
Instead of allocating bytes for raw postings, let's read them directly
into diff varint format to save memory.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Jul 27, 2023
1 parent ecc7f68 commit fccb3c7
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 40 deletions.
79 changes: 39 additions & 40 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
return true, ps, nil
}

var bufioReaderPool = sync.Pool{
New: func() any {
return bufio.NewReader(nil)
},
}

// fetchPostings fill postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
Expand Down Expand Up @@ -2570,52 +2576,60 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// We assume index does not have any ptrs that has 0 length.
length := int64(part.End) - start

brdr := bufioReaderPool.Get().(*bufio.Reader)
defer bufioReaderPool.Put(brdr)

// Fetch from object storage concurrently and update stats and posting list.
g.Go(func() error {
begin := time.Now()

b, err := r.block.readIndexRange(ctx, start, length)
partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
if err != nil {
return errors.Wrap(err, "read postings range")
}
fetchTime := time.Since(begin)
defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
brdr.Reset(partReader)

rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)

r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
// index-header can estimate endings, which means we need to resize the endings.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
for rdr.Next() {
diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()

output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
if err != nil {
return err
r.mtx.Lock()
r.stats.cachedPostingsCompressionErrors += 1
r.mtx.Unlock()
return errors.Wrap(err, "encoding with snappy")
}

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
bep := newBigEndianPostings(pBytes[4:])
dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings)))
r.stats.cachedPostingsCompressions += 1
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
r.mtx.Unlock()

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
}

r.mtx.Lock()
r.stats.PostingsFetchDurationSum += time.Since(begin)
r.mtx.Unlock()

if rdr.Error() != nil {
return errors.Wrap(err, "reading postings")
}
return nil
})
Expand Down Expand Up @@ -2665,21 +2679,6 @@ func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int)
return dataToCache, compressionTime, compressionErrors, compressedSize
}

func resizePostings(b []byte) ([]byte, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}

// 4 for postings number of entries, then 4, foreach each big endian posting.
size := 4 + n*4
if len(b) < size {
return nil, encoding.ErrInvalidSize
}
return b[:size], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
Expand Down
142 changes: 142 additions & 0 deletions pkg/store/postings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"

"github.com/pkg/errors"
)

type postingsReaderBuilder struct {
e error
readBuf []byte

r *bufio.Reader
postings []postingPtr

lastOffset int64
pi int

start, length int64
cur []byte
keyID int
repeatFor int
numberOfPostingsInCur uint64
uvarintEncodeBuf []byte
ctx context.Context
}

// newPostingsReaderBuilder is a builder that reads directly from the index
// and builds a diff varint encoded []byte that could be later used directly.
func newPostingsReaderBuilder(ctx context.Context, r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
prb := &postingsReaderBuilder{
r: r,
readBuf: make([]byte, 4),
start: start,
length: length,
postings: postings,
uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
ctx: ctx,
}

return prb
}

func getInt32(r io.Reader, buf []byte) (uint32, error) {
read, err := r.Read(buf)
if err != nil {
return 0, errors.Wrap(err, "reading")
}
if read != 4 {
return 0, fmt.Errorf("read got %d bytes instead of 4", read)
}
return binary.BigEndian.Uint32(buf), nil
}

func (r *postingsReaderBuilder) Next() bool {
if r.ctx.Err() != nil {
r.e = r.ctx.Err()
return false
}
if r.repeatFor > 0 {
r.keyID = r.postings[r.pi-r.repeatFor].keyID
r.repeatFor--
return true
}
if r.pi >= len(r.postings) {
return false
}
if r.Error() != nil {
return false
}
from := r.postings[r.pi].ptr.Start - r.start

if from-r.lastOffset < 0 {
panic("would have skipped negative bytes")
}

_, err := r.r.Discard(int(from - r.lastOffset))
if err != nil {
return false
}
r.lastOffset += from - r.lastOffset

postingsCount, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.e = err
return false
}
r.lastOffset += 4

// Assume 1.25 bytes per compressed posting.
r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))

prev := uint32(0)

for i := 0; i < int(postingsCount); i++ {
posting, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.e = err
return false
}
r.lastOffset += 4

uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
prev = posting
}
r.numberOfPostingsInCur = uint64(postingsCount)

r.keyID = r.postings[r.pi].keyID
r.pi++
for {
if r.pi >= len(r.postings) {
break
}

if r.postings[r.pi].ptr.Start == r.postings[r.pi-1].ptr.Start &&
r.postings[r.pi].ptr.End == r.postings[r.pi-1].ptr.End {
r.repeatFor++
r.pi++
continue
}

break
}

return true
}

func (r *postingsReaderBuilder) Error() error {
return r.e
}

func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
return r.cur, r.numberOfPostingsInCur, r.keyID
}
24 changes: 24 additions & 0 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
func (it *diffVarintPostings) Err() error {
return it.buf.Err()
}

func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))
if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
return nil, fmt.Errorf("writing streamed snappy header")
} else if n != len(codecHeaderStreamedSnappy) {
return nil, fmt.Errorf("short-write streamed snappy header")
}

sw, err := extsnappy.Compressor.Compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}

_, err = sw.Write(diffVarintPostings)
if err != nil {
return nil, err
}
if err := sw.Close(); err != nil {
return nil, errors.Wrap(err, "closing snappy stream writer")
}

return compressedBuf.Bytes(), nil
}

0 comments on commit fccb3c7

Please sign in to comment.