store/bucket: snappy-encoded postings reading improvements (#6245)
* store: pool input to snappy.Decode

Pool the destination buffer passed to snappy.Decode to avoid an allocation on every decode.

Signed-off-by: Giedrius Statkevičius <[email protected]>
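
A minimal sketch of the pooling pattern, using hypothetical helper names (the real version is snappyDecodePool in pkg/store/postings_codec.go below):

import "sync"

// bufPool stores *[]byte rather than []byte: putting a bare slice
// header into an interface{} allocates (see staticcheck SA6002).
var bufPool sync.Pool

func getBuf() []byte {
    if b := bufPool.Get(); b != nil {
        return *(b.(*[]byte)) // reuse a previously freed buffer
    }
    return nil // the decoder allocates if the destination is nil or too small
}

func putBuf(b []byte) {
    bufPool.Put(&b)
}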

* store: use s2 for decoding snappy

It's faster at decoding snappy-compressed data, hence use it.

Signed-off-by: Giedrius Statkevičius <[email protected]>
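
s2's block decoder accepts the snappy format, which is what makes it a drop-in replacement here. A small standalone illustration (the payload is made up):

package main

import (
    "fmt"

    "github.com/golang/snappy"
    "github.com/klauspost/compress/s2"
)

func main() {
    // Compress with the reference snappy encoder...
    compressed := snappy.Encode(nil, []byte("diff+varint postings"))

    // ...and decompress with s2.Decode, which understands the snappy
    // block format but decodes it faster than snappy.Decode.
    decoded, err := s2.Decode(nil, compressed)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(decoded))
}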

* store: small code style adjustment

Signed-off-by: Giedrius Statkevičius <[email protected]>

* store: call closefns before returning err

Signed-off-by: Giedrius Statkevičius <[email protected]>

* store/postings_codec: return both if possible

Signed-off-by: Giedrius Statkevičius <[email protected]>

* store/bucket: always call close fns

Signed-off-by: Giedrius Statkevičius <[email protected]>
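
Taken together, the three changes above define the caller contract. A sketch assuming the store package context; expandAll is a hypothetical wrapper, not code from this commit:

// closeFns must run even on the error path: some entries may already
// reference pooled buffers by the time fetchPostings fails.
func expandAll(ctx context.Context, r *bucketIndexReader, keys []labels.Label, lim BytesLimiter) ([][]storage.SeriesRef, error) {
    fetched, closeFns, err := r.fetchPostings(ctx, keys, lim)
    defer func() {
        for _, closeFn := range closeFns {
            closeFn() // hands decode buffers back to the pool
        }
    }()
    if err != nil {
        return nil, err
    }

    // Expand while the buffers are still alive, i.e. before the
    // deferred closers run.
    out := make([][]storage.SeriesRef, 0, len(fetched))
    for _, p := range fetched {
        if p == nil { // postings for this key were not fetched
            out = append(out, nil)
            continue
        }
        refs, err := index.ExpandPostings(p)
        if err != nil {
            return nil, err
        }
        out = append(out, refs)
    }
    return out, nil
}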

---------

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS authored Apr 11, 2023
1 parent 5d5d39a commit 9f09a9a
Showing 3 changed files with 62 additions and 17 deletions.
26 changes: 18 additions & 8 deletions pkg/store/bucket.go
@@ -2162,7 +2162,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
         keys = append(keys, allPostingsLabel)
     }
 
-    fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter)
+    fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter)
+    defer func() {
+        for _, closeFn := range closeFns {
+            closeFn()
+        }
+    }()
     if err != nil {
         return nil, errors.Wrap(err, "get postings")
     }
@@ -2302,7 +2307,9 @@ type postingPtr struct {
 // fetchPostings fill postings requested by posting groups.
 // It returns one postings for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
-func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) {
+func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) {
+    var closeFns []func()
+
     timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
     defer timer.ObserveDuration()
 
@@ -2314,7 +2321,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
     fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
     for _, dataFromCache := range fromCache {
         if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
-            return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
+            return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
         }
     }
 
@@ -2335,18 +2342,21 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
     )
     if isDiffVarintSnappyEncodedPostings(b) {
         s := time.Now()
-        l, err = diffVarintSnappyDecode(b)
+        clPostings, err := diffVarintSnappyDecode(b)
         r.stats.cachedPostingsDecompressions += 1
         r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
         if err != nil {
             r.stats.cachedPostingsDecompressionErrors += 1
+        } else {
+            closeFns = append(closeFns, clPostings.close)
+            l = clPostings
         }
     } else {
         _, l, err = r.dec.Postings(b)
     }
 
     if err != nil {
-        return nil, errors.Wrap(err, "decode postings")
+        return nil, closeFns, errors.Wrap(err, "decode postings")
     }
 
     output[ix] = l
@@ -2362,7 +2372,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
     }
 
     if err != nil {
-        return nil, errors.Wrap(err, "index header PostingsOffset")
+        return nil, closeFns, errors.Wrap(err, "index header PostingsOffset")
     }
 
     r.stats.postingsToFetch++
@@ -2384,7 +2394,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
         length := int64(part.End) - start
 
         if err := bytesLimiter.Reserve(uint64(length)); err != nil {
-            return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
+            return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings")
         }
     }
 
@@ -2462,7 +2472,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
         })
     }
 
-    return output, g.Wait()
+    return output, closeFns, g.Wait()
 }
 
 func resizePostings(b []byte) ([]byte, error) {
49 changes: 42 additions & 7 deletions pkg/store/postings_codec.go
@@ -5,8 +5,10 @@ package store
 
 import (
     "bytes"
+    "sync"
 
     "github.com/golang/snappy"
+    "github.com/klauspost/compress/s2"
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb/encoding"
@@ -82,27 +84,60 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) {
     return buf.B, nil
 }
 
-func diffVarintSnappyDecode(input []byte) (index.Postings, error) {
+var snappyDecodePool sync.Pool
+
+type closeablePostings interface {
+    index.Postings
+    close()
+}
+
+// alias returns true if given slices have the same backing array.
+// See: https://groups.google.com/g/golang-nuts/c/C6ufGl73Uzk.
+func alias(x, y []byte) bool {
+    return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1]
+}
+
+func diffVarintSnappyDecode(input []byte) (closeablePostings, error) {
     if !isDiffVarintSnappyEncodedPostings(input) {
         return nil, errors.New("header not found")
     }
 
-    raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):])
+    toFree := make([][]byte, 0, 2)
+
+    var dstBuf []byte
+    decodeBuf := snappyDecodePool.Get()
+    if decodeBuf != nil {
+        dstBuf = *(decodeBuf.(*[]byte))
+        toFree = append(toFree, dstBuf)
+    }
+
+    raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):])
     if err != nil {
         return nil, errors.Wrap(err, "snappy decode")
     }
 
-    return newDiffVarintPostings(raw), nil
+    if !alias(raw, dstBuf) {
+        toFree = append(toFree, raw)
+    }
+
+    return newDiffVarintPostings(raw, toFree), nil
 }
 
-func newDiffVarintPostings(input []byte) *diffVarintPostings {
-    return &diffVarintPostings{buf: &encoding.Decbuf{B: input}}
+func newDiffVarintPostings(input []byte, freeSlices [][]byte) *diffVarintPostings {
+    return &diffVarintPostings{freeSlices: freeSlices, buf: &encoding.Decbuf{B: input}}
 }
 
 // diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data.
 type diffVarintPostings struct {
-    buf *encoding.Decbuf
-    cur storage.SeriesRef
+    buf        *encoding.Decbuf
+    cur        storage.SeriesRef
+    freeSlices [][]byte
 }
 
+func (it *diffVarintPostings) close() {
+    for i := range it.freeSlices {
+        snappyDecodePool.Put(&it.freeSlices[i])
+    }
+}
+
 func (it *diffVarintPostings) At() storage.SeriesRef {
4 changes: 2 additions & 2 deletions pkg/store/postings_codec_test.go
@@ -55,9 +55,9 @@ func TestDiffVarintCodec(t *testing.T) {
 
     codecs := map[string]struct {
         codingFunction   func(index.Postings, int) ([]byte, error)
-        decodingFunction func([]byte) (index.Postings, error)
+        decodingFunction func([]byte) (closeablePostings, error)
     }{
-        "raw":    {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }},
+        "raw":    {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
         "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
     }
 
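
For reference, a sketch of the full lifecycle the updated test signature exercises; roundTrip is a hypothetical in-package helper, not part of this commit:

func roundTrip(p index.Postings, count int) error {
    data, err := diffVarintSnappyEncode(p, count)
    if err != nil {
        return err
    }

    decoded, err := diffVarintSnappyDecode(data)
    if err != nil {
        return err
    }
    // close() returns the decode buffers to snappyDecodePool, so it
    // must only run once iteration is finished.
    defer decoded.close()

    for decoded.Next() {
        _ = decoded.At() // storage.SeriesRef decoded from diff+varint
    }
    return decoded.Err()
}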
