Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/bucket: snappy-encoded postings reading improvements #6245

Merged
merged 6 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2162,7 +2162,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
keys = append(keys, allPostingsLabel)
}

fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter)
defer func() {
for _, closeFn := range closeFns {
closeFn()
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}
}()
if err != nil {
return nil, errors.Wrap(err, "get postings")
}
Expand Down Expand Up @@ -2302,7 +2307,9 @@ type postingPtr struct {
// fetchPostings fills postings requested by posting groups.
// It returns one postings list for each key, in the same order.
// If postings for a given key are not fetched, the entry at that index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) {
var closeFns []func()

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
defer timer.ObserveDuration()

Expand All @@ -2314,7 +2321,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
}
}

Expand All @@ -2335,18 +2342,21 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
)
if isDiffVarintSnappyEncodedPostings(b) {
s := time.Now()
l, err = diffVarintSnappyDecode(b)
clPostings, err := diffVarintSnappyDecode(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
r.stats.cachedPostingsDecompressionErrors += 1
} else {
closeFns = append(closeFns, clPostings.close)
l = clPostings
}
} else {
_, l, err = r.dec.Postings(b)
}

if err != nil {
return nil, errors.Wrap(err, "decode postings")
return nil, closeFns, errors.Wrap(err, "decode postings")
}

output[ix] = l
Expand All @@ -2362,7 +2372,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
}

if err != nil {
return nil, errors.Wrap(err, "index header PostingsOffset")
return nil, closeFns, errors.Wrap(err, "index header PostingsOffset")
}

r.stats.postingsToFetch++
Expand All @@ -2384,7 +2394,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings")
}
}

Expand Down Expand Up @@ -2462,7 +2472,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
})
}

return output, g.Wait()
return output, closeFns, g.Wait()
}

func resizePostings(b []byte) ([]byte, error) {
Expand Down
49 changes: 42 additions & 7 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package store

import (
"bytes"
"sync"

"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
Expand Down Expand Up @@ -82,27 +84,60 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) {
return buf.B, nil
}

func diffVarintSnappyDecode(input []byte) (index.Postings, error) {
var snappyDecodePool sync.Pool
saswatamcode marked this conversation as resolved.
Show resolved Hide resolved

type closeablePostings interface {
index.Postings
close()
}

// alias returns true if given slices have the same both backing array.
// See: https://groups.google.com/g/golang-nuts/c/C6ufGl73Uzk.
func alias(x, y []byte) bool {
return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1]
}

func diffVarintSnappyDecode(input []byte) (closeablePostings, error) {
if !isDiffVarintSnappyEncodedPostings(input) {
return nil, errors.New("header not found")
}

raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):])
toFree := make([][]byte, 0, 2)

var dstBuf []byte
decodeBuf := snappyDecodePool.Get()
if decodeBuf != nil {
dstBuf = *(decodeBuf.(*[]byte))
toFree = append(toFree, dstBuf)
}

raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):])
if err != nil {
return nil, errors.Wrap(err, "snappy decode")
}

return newDiffVarintPostings(raw), nil
if !alias(raw, dstBuf) {
toFree = append(toFree, raw)
}

return newDiffVarintPostings(raw, toFree), nil
}

func newDiffVarintPostings(input []byte) *diffVarintPostings {
return &diffVarintPostings{buf: &encoding.Decbuf{B: input}}
func newDiffVarintPostings(input []byte, freeSlices [][]byte) *diffVarintPostings {
return &diffVarintPostings{freeSlices: freeSlices, buf: &encoding.Decbuf{B: input}}
}

// diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data.
type diffVarintPostings struct {
buf *encoding.Decbuf
cur storage.SeriesRef
buf *encoding.Decbuf
cur storage.SeriesRef
freeSlices [][]byte
}

func (it *diffVarintPostings) close() {
for i := range it.freeSlices {
snappyDecodePool.Put(&it.freeSlices[i])
}
}

func (it *diffVarintPostings) At() storage.SeriesRef {
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/postings_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func TestDiffVarintCodec(t *testing.T) {

codecs := map[string]struct {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (index.Postings, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }},
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
}

Expand Down