store: read postings directly into delta encoded format
Instead of allocating bytes for raw postings, let's read them directly
into diff varint format to save memory.

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS committed Jun 13, 2023
1 parent 0651f33 commit bcff74a
Showing 3 changed files with 179 additions and 27 deletions.
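For orientation before the per-file diffs, here is a minimal standalone sketch, not taken from the commit, of what "diff varint format" means: a sorted posting list is stored as uvarint-encoded deltas between consecutive values instead of 4 raw big-endian bytes per posting. The helper name diffVarintEncode is hypothetical.

package main

import (
	"encoding/binary"
	"fmt"
)

// diffVarintEncode is a hypothetical helper: it rewrites sorted postings as
// uvarint-encoded deltas, the format this commit reads postings into directly.
func diffVarintEncode(postings []uint32) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	out := make([]byte, 0, len(postings))
	prev := uint32(0)
	for _, p := range postings {
		n := binary.PutUvarint(buf, uint64(p-prev))
		out = append(out, buf[:n]...)
		prev = p
	}
	return out
}

func main() {
	// Postings 10, 13, 20 become deltas 10, 3, 7: three bytes instead of twelve.
	fmt.Println(diffVarintEncode([]uint32{10, 13, 20})) // [10 3 7]
}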
46 changes: 19 additions & 27 deletions pkg/store/bucket.go
@@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
return true, ps, nil
}

var bufioReaderPool = sync.Pool{
New: func() any {
return bufio.NewReader(nil)
},
}
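The pool above keeps bufio.Reader allocations out of the per-request hot path. A sketch of the usual Get/Reset/Put cycle, assuming the reader is eventually returned to the pool by the caller (the Put side is not visible in this hunk); underlyingReader stands in for whatever io.Reader is being wrapped:

	// Typical sync.Pool pattern for pooled bufio.Readers (illustrative only).
	brdr := bufioReaderPool.Get().(*bufio.Reader)
	brdr.Reset(underlyingReader) // rebind the pooled reader to the new source
	defer func() {
		brdr.Reset(nil) // drop the reference so the source can be collected
		bufioReaderPool.Put(brdr)
	}()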

// fetchPostings fills postings requested by posting groups.
// It returns one postings list for each key, in the same order.
// If postings for a given key are not fetched, the entry at that index will be nil.
@@ -2574,48 +2580,34 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
g.Go(func() error {
begin := time.Now()

b, err := r.block.readIndexRange(ctx, start, length)
partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
if err != nil {
return errors.Wrap(err, "read postings range")
}
defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
fetchTime := time.Since(begin)

brdr := bufioReaderPool.Get().(*bufio.Reader)
brdr.Reset(partReader)
rdr := newPostingsReaderBuilder(brdr, ptrs, start, length)

r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
// index-header can estimate endings, which means we need to resize the endings.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
for rdr.Next() {
deltaEncodedPostings, postingsCount, keyID := rdr.AtDiffVarint()
output[keyID] = newDiffVarintPostings(deltaEncodedPostings, nil)

dataToCache, err := snappyStreamedEncode(int(postingsCount), deltaEncodedPostings)
if err != nil {
return err
return errors.Wrap(err, "encoding with snappy")
}

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
bep := newBigEndianPostings(pBytes[4:])
dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.cachedPostingsCompressions += 1
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
}
return nil
})
136 changes: 136 additions & 0 deletions pkg/store/postings.go
@@ -0,0 +1,136 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"bufio"
"encoding/binary"
"fmt"
"io"

"github.com/pkg/errors"
)

type postingsReaderBuilder struct {
readBuf []byte

r *bufio.Reader
postings []postingPtr

lastOffset int64
pi int
err error

start, length int64
cur []byte
keyID int
repeatFor int
numberOfPostingsInCur uint64
uvarintEncodeBuf []byte
}

// newPostingsReaderBuilder is a builder that reads directly from the index
// and builds a diff varint encoded []byte that can later be used directly.
func newPostingsReaderBuilder(r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
prb := &postingsReaderBuilder{
r: r,
readBuf: make([]byte, 4),
start: start,
length: length,
postings: postings,
uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
}

return prb
}

func getInt32(r io.Reader, buf []byte) (uint32, error) {
read, err := r.Read(buf)
if err != nil {
return 0, errors.Wrap(err, "reading")
}
if read != 4 {
return 0, fmt.Errorf("read got %d bytes instead of 4", read)
}
return binary.BigEndian.Uint32(buf), nil
}
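Note that bufio.Reader.Read may legitimately return fewer than len(buf) bytes without an error, in which case getInt32 reports a short read. A stricter variant, shown only as a sketch and not part of the commit, would use io.ReadFull, which keeps reading until the buffer is full or the reader fails:

// getInt32Full is a hypothetical stricter variant of getInt32.
func getInt32Full(r io.Reader, buf []byte) (uint32, error) {
	if _, err := io.ReadFull(r, buf[:4]); err != nil {
		return 0, errors.Wrap(err, "reading 4 bytes")
	}
	return binary.BigEndian.Uint32(buf), nil
}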

func (r *postingsReaderBuilder) Next() bool {
if r.repeatFor > 0 {
r.keyID = r.postings[r.pi-r.repeatFor].keyID
r.repeatFor--
return true
}
if r.pi >= len(r.postings) {
return false
}
if r.Error() != nil {
return false
}
from := r.postings[r.pi].ptr.Start - r.start

if from-r.lastOffset < 0 {
panic("would have skipped negative bytes")
}

_, err := r.r.Discard(int(from - r.lastOffset))
if err != nil {
r.err = err
return false
}
r.lastOffset += from - r.lastOffset

postingsCount, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.err = err
return false
}
r.lastOffset += 4

// Assume 1.25 bytes per compressed posting.
r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))

prev := uint32(0)

for i := 0; i < int(postingsCount); i++ {
posting, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.err = err
return false
}
r.lastOffset += 4

uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
prev = posting
}
r.numberOfPostingsInCur = uint64(postingsCount)

r.keyID = r.postings[r.pi].keyID
r.pi++
for {
if r.pi >= len(r.postings) {
break
}

if r.postings[r.pi].ptr.Start == r.postings[r.pi-1].ptr.Start &&
r.postings[r.pi].ptr.End == r.postings[r.pi-1].ptr.End {
r.repeatFor++
r.pi++
continue
}

break
}

return true
}
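To make the loop above concrete: raw big-endian postings 10, 13, 20 are rewritten into the delta sequence 10, 3, 7 and appended to r.cur as uvarints. A hypothetical decode helper, not the codebase's diffVarintPostings iterator, that recovers the original values from such a buffer (it only needs encoding/binary and fmt, which this file already imports):

// decodeDiffVarint reverses what Next builds: it reads uvarint deltas and
// accumulates them back into absolute posting values. Illustrative only.
func decodeDiffVarint(b []byte) ([]uint32, error) {
	var out []uint32
	prev := uint64(0)
	for len(b) > 0 {
		delta, n := binary.Uvarint(b)
		if n <= 0 {
			return nil, fmt.Errorf("malformed uvarint after %d values", len(out))
		}
		prev += delta
		out = append(out, uint32(prev))
		b = b[n:]
	}
	return out, nil
}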

func (r *postingsReaderBuilder) Error() error {
return r.err
}

func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
return r.cur, r.numberOfPostingsInCur, r.keyID
}
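The intended consumption pattern mirrors the fetchPostings hunk in bucket.go above: iterate, hand the diff-varint bytes to a postings iterator, then check the builder error. A condensed sketch, assuming rdr was built with newPostingsReaderBuilder and that output, keys and the surrounding error handling come from the caller (the explicit Error check is added here for completeness):

	for rdr.Next() {
		deltaEncoded, count, keyID := rdr.AtDiffVarint()
		_ = count // number of postings encoded in deltaEncoded
		output[keyID] = newDiffVarintPostings(deltaEncoded, nil)
	}
	if err := rdr.Error(); err != nil {
		return errors.Wrap(err, "building diff varint postings")
	}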
24 changes: 24 additions & 0 deletions pkg/store/postings_codec.go
@@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
func (it *diffVarintPostings) Err() error {
return it.buf.Err()
}

func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))
if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
return nil, fmt.Errorf("writing streamed snappy header")
} else if n != len(codecHeaderStreamedSnappy) {
return nil, fmt.Errorf("short-write streamed snappy header")
}

sw, err := extsnappy.Compressor.Compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}

_, err = sw.Write(diffVarintPostings)
if err != nil {
return nil, err
}
if err := sw.Close(); err != nil {
return nil, errors.Wrap(err, "closing snappy stream writer")
}

return compressedBuf.Bytes(), nil
}
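The resulting cache entry is the codecHeaderStreamedSnappy marker followed by the snappy-framed stream. A small hypothetical check, not the codebase's decoder, illustrating that layout:

// isStreamedSnappy reports whether a cached postings entry carries the
// streamed-snappy header written by snappyStreamedEncode. Illustrative only.
func isStreamedSnappy(cached []byte) bool {
	return len(cached) >= len(codecHeaderStreamedSnappy) &&
		string(cached[:len(codecHeaderStreamedSnappy)]) == codecHeaderStreamedSnappy
}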
