Skip to content

Commit

Permalink
store: read postings directly into delta encoded format
Browse files Browse the repository at this point in the history
Instead of allocating bytes for raw postings, let's read them directly
into diff varint format to save memory.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Jun 15, 2023
1 parent 0651f33 commit 30a176f
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 40 deletions.
79 changes: 39 additions & 40 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
return true, ps, nil
}

// bufioReaderPool recycles *bufio.Reader values so that a new buffered reader
// does not have to be allocated for every use.
// Readers are created with a nil source, so a reader taken from the pool must
// be pointed at a real io.Reader with Reset before anything is read from it.
var bufioReaderPool = sync.Pool{
New: func() any {
return bufio.NewReader(nil)
},
}

// fetchPostings fills postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for a given key are not fetched, the entry at the given index will be nil.
Expand Down Expand Up @@ -2570,52 +2576,60 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// We assume index does not have any ptrs that has 0 length.
length := int64(part.End) - start

brdr := bufioReaderPool.Get().(*bufio.Reader)
defer bufioReaderPool.Put(brdr)

// Fetch from object storage concurrently and update stats and posting list.
g.Go(func() error {
begin := time.Now()

b, err := r.block.readIndexRange(ctx, start, length)
partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
if err != nil {
return errors.Wrap(err, "read postings range")
}
fetchTime := time.Since(begin)
defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
brdr.Reset(partReader)

rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)

r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
// index-header can estimate endings, which means we need to resize the endings.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
for rdr.Next() {
deltaEncodedPostings, postingsCount, keyID := rdr.AtDiffVarint()

output[keyID] = newDiffVarintPostings(deltaEncodedPostings, nil)

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), deltaEncodedPostings)
if err != nil {
return err
r.mtx.Lock()
r.stats.cachedPostingsCompressionErrors += 1
r.mtx.Unlock()
return errors.Wrap(err, "encoding with snappy")
}

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
bep := newBigEndianPostings(pBytes[4:])
dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(deltaEncodedPostings)))
r.stats.cachedPostingsCompressions += 1
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(deltaEncodedPostings))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
r.mtx.Unlock()

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
}

r.mtx.Lock()
r.stats.PostingsFetchDurationSum += time.Since(begin)
r.mtx.Unlock()

if rdr.Error() != nil {
return errors.Wrap(err, "reading postings")
}
return nil
})
Expand Down Expand Up @@ -2665,21 +2679,6 @@ func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int)
return dataToCache, compressionTime, compressionErrors, compressedSize
}

// resizePostings trims b to the exact extent of the postings list it starts
// with: a 4 byte big endian entry count followed by that many 4 byte entries.
// It fails if the count cannot be decoded or if b is shorter than the count
// implies.
func resizePostings(b []byte) ([]byte, error) {
	dec := encoding.Decbuf{B: b}
	entries := dec.Be32int()
	if err := dec.Err(); err != nil {
		return nil, errors.Wrap(err, "read postings list")
	}

	// 4 bytes for the number of entries, then 4 bytes per big endian posting.
	if want := 4 + entries*4; want <= len(b) {
		return b[:want], nil
	}
	return nil, encoding.ErrInvalidSize
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
Expand Down
143 changes: 143 additions & 0 deletions pkg/store/postings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"

"github.com/pkg/errors"
)

// postingsReaderBuilder iterates over the posting lists referenced by postings,
// reading each one from r and re-encoding it as uvarint deltas between
// consecutive values. The current result is exposed through AtDiffVarint.
type postingsReaderBuilder struct {
// e is the error reported by Error().
e error
// readBuf is the 4 byte scratch buffer used to decode big endian uint32 values.
readBuf []byte

// r is the source the raw postings are read from.
r *bufio.Reader
// postings lists the posting pointers to read; pi indexes into it.
postings []postingPtr

// lastOffset counts the bytes consumed from r so far, relative to start.
lastOffset int64
// pi is the index in postings of the next entry to process.
pi int
// err is a second error slot next to e.
// NOTE(review): Error() is the only accessor for failures; confirm both
// fields are needed and consider consolidating them.
err error

// start is subtracted from each pointer's Start to get its position in r.
// length is stored by the constructor; presumably the size of the fetched
// range, TODO confirm against the caller.
start, length int64
// cur holds the delta encoded bytes of the current posting list.
cur []byte
// keyID is the keyID of the entry the current posting list belongs to.
keyID int
// repeatFor counts upcoming entries that share Start and End with the list
// just read, so they can be served from cur without reading again.
repeatFor int
// numberOfPostingsInCur is the number of postings encoded in cur.
numberOfPostingsInCur uint64
// uvarintEncodeBuf is scratch space for encoding a single uvarint.
uvarintEncodeBuf []byte
// ctx is checked for cancellation at the start of every Next call.
ctx context.Context
}

// newPostingsReaderBuilder returns a builder that consumes raw postings from r
// and produces, for each entry in postings, a diff varint encoded []byte that
// can be used directly afterwards. start and length are stored on the builder
// as given; the scratch buffers it needs are allocated here once.
func newPostingsReaderBuilder(ctx context.Context, r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
	var (
		// Every value in the raw postings format is a 4 byte big endian integer.
		scratch = make([]byte, 4)
		// Large enough for any single uvarint.
		varintScratch = make([]byte, binary.MaxVarintLen64)
	)

	return &postingsReaderBuilder{
		ctx:              ctx,
		r:                r,
		postings:         postings,
		start:            start,
		length:           length,
		readBuf:          scratch,
		uvarintEncodeBuf: varintScratch,
	}
}

// getInt32 reads one big endian uint32 from r, using the first 4 bytes of buf
// as scratch space.
//
// The value is read with io.ReadFull because a single Read call is allowed to
// return fewer bytes than requested without reporting an error, for example
// when the value straddles the end of a bufio.Reader's internal buffer or the
// underlying reader delivers data in small chunks. Treating such a short read
// as a failure would make reading postings fail spuriously.
//
// An error is returned if buf is shorter than 4 bytes or if r ends before
// 4 bytes could be read.
func getInt32(r io.Reader, buf []byte) (uint32, error) {
	if len(buf) < 4 {
		return 0, fmt.Errorf("read buffer has %d bytes instead of 4", len(buf))
	}

	word := buf[:4]
	if _, err := io.ReadFull(r, word); err != nil {
		return 0, errors.Wrap(err, "reading")
	}
	return binary.BigEndian.Uint32(word), nil
}

// Next advances to the next entry of r.postings and reports whether a posting
// list is available through AtDiffVarint.
//
// It returns false when all entries were consumed, when the context is done,
// or when reading fails. In the failure cases the cause is stored in r.e so
// that Error() reports it; callers must check Error() after Next returns false.
//
// Consecutive entries that point at the same Start/End range are read from the
// underlying reader only once: the list is decoded for the first entry and the
// following calls only switch keyID while keeping the same encoded bytes.
func (r *postingsReaderBuilder) Next() bool {
	if err := r.ctx.Err(); err != nil {
		r.e = err
		return false
	}
	// Errors are sticky: once something failed the stream position is unknown.
	if r.Error() != nil {
		return false
	}
	if r.repeatFor > 0 {
		// Same range as the list decoded last; only the key changes.
		r.keyID = r.postings[r.pi-r.repeatFor].keyID
		r.repeatFor--
		return true
	}
	if r.pi >= len(r.postings) {
		return false
	}

	// Position of the wanted list relative to the beginning of the stream.
	from := r.postings[r.pi].ptr.Start - r.start
	skip := from - r.lastOffset
	if skip < 0 {
		// The reader cannot go backwards; report it instead of panicking so
		// that a bad pointer fails the request rather than the process.
		r.e = fmt.Errorf("would have skipped negative bytes: postings start at offset %d but %d bytes were already consumed", from, r.lastOffset)
		return false
	}
	if _, err := r.r.Discard(int(skip)); err != nil {
		r.e = errors.Wrap(err, "skipping to postings start")
		return false
	}
	r.lastOffset = from

	postingsCount, err := getInt32(r.r, r.readBuf)
	if err != nil {
		r.e = errors.Wrap(err, "reading postings count")
		return false
	}
	r.lastOffset += 4

	// Assume 1.25 bytes per compressed posting.
	r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))

	prev := uint32(0)
	for i := 0; i < int(postingsCount); i++ {
		posting, err := getInt32(r.r, r.readBuf)
		if err != nil {
			r.e = errors.Wrap(err, "reading posting")
			return false
		}
		r.lastOffset += 4

		// Store the difference to the previous posting as a uvarint.
		uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
		r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
		prev = posting
	}
	r.numberOfPostingsInCur = uint64(postingsCount)

	r.keyID = r.postings[r.pi].keyID
	r.pi++

	// Count the following entries that reference exactly the same range; they
	// are served by the repeatFor branch above without touching the reader.
	for r.pi < len(r.postings) {
		next, last := r.postings[r.pi].ptr, r.postings[r.pi-1].ptr
		if next.Start != last.Start || next.End != last.End {
			break
		}
		r.repeatFor++
		r.pi++
	}

	return true
}

// Error returns the failure that stopped iteration, or nil if there was none.
// The builder has two error fields, e and err; both are consulted so that a
// failure recorded in either of them is reported instead of being silently
// dropped. e takes precedence when both are set.
func (r *postingsReaderBuilder) Error() error {
	if r.e != nil {
		return r.e
	}
	return r.err
}

// AtDiffVarint returns the state of the current entry: the delta encoded
// postings bytes, the number of postings they contain, and the keyID of the
// entry they belong to.
func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
	encoded := r.cur
	count := r.numberOfPostingsInCur
	key := r.keyID
	return encoded, count, key
}
24 changes: 24 additions & 0 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
func (it *diffVarintPostings) Err() error {
return it.buf.Err()
}

// snappyStreamedEncode compresses already diff varint encoded postings with
// the streamed snappy compressor and returns the result prefixed with
// codecHeaderStreamedSnappy.
//
// postingsLength is the number of postings in diffVarintPostings; it is only
// used to size the output buffer up front.
//
// Every failure is returned with context describing the step that failed.
func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
	compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))

	n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy)
	if err != nil {
		return nil, fmt.Errorf("writing streamed snappy header: %w", err)
	}
	if n != len(codecHeaderStreamedSnappy) {
		return nil, fmt.Errorf("short-write streamed snappy header: wrote %d of %d bytes", n, len(codecHeaderStreamedSnappy))
	}

	sw, err := extsnappy.Compressor.Compress(compressedBuf)
	if err != nil {
		return nil, fmt.Errorf("creating snappy compressor: %w", err)
	}

	if _, err := sw.Write(diffVarintPostings); err != nil {
		// Still close the writer so that whatever the compressor holds on to
		// can be released. The write error is the one worth reporting, so the
		// result of Close is intentionally ignored here.
		_ = sw.Close()
		return nil, fmt.Errorf("writing postings to snappy stream: %w", err)
	}
	if err := sw.Close(); err != nil {
		return nil, errors.Wrap(err, "closing snappy stream writer")
	}

	return compressedBuf.Bytes(), nil
}

0 comments on commit 30a176f

Please sign in to comment.