diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 6726991f7c5..05e82607e3d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2162,7 +2162,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M keys = append(keys, allPostingsLabel) } - fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() if err != nil { return nil, errors.Wrap(err, "get postings") } @@ -2302,7 +2307,9 @@ type postingPtr struct { // fetchPostings fill postings requested by posting groups. // It returns one postings for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. -func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) { + var closeFns []func() + timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) defer timer.ObserveDuration() @@ -2314,7 +2321,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { - return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache") + return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache") } } @@ -2335,18 +2342,21 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab ) if isDiffVarintSnappyEncodedPostings(b) { s := time.Now() - l, err = diffVarintSnappyDecode(b) + clPostings, err := diffVarintSnappyDecode(b) r.stats.cachedPostingsDecompressions += 1 r.stats.CachedPostingsDecompressionTimeSum += time.Since(s) if err != nil { r.stats.cachedPostingsDecompressionErrors += 1 + } else { + closeFns = append(closeFns, clPostings.close) + l = clPostings } } else { _, l, err = r.dec.Postings(b) } if err != nil { - return nil, errors.Wrap(err, "decode postings") + return nil, closeFns, errors.Wrap(err, "decode postings") } output[ix] = l @@ -2362,7 +2372,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab } if err != nil { - return nil, errors.Wrap(err, "index header PostingsOffset") + return nil, closeFns, errors.Wrap(err, "index header PostingsOffset") } r.stats.postingsToFetch++ @@ -2384,7 +2394,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab length := int64(part.End) - start if err := bytesLimiter.Reserve(uint64(length)); err != nil { - return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") + return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings") } } @@ -2462,7 +2472,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab }) } - return output, g.Wait() + return output, closeFns, g.Wait() } func resizePostings(b []byte) ([]byte, error) { diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 3920049227c..f60fe7c1b26 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -5,8 +5,10 @@ package store import ( "bytes" + "sync" "github.com/golang/snappy" + "github.com/klauspost/compress/s2" "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/encoding" @@ -82,27 +84,60 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) { return buf.B, nil } -func diffVarintSnappyDecode(input []byte) (index.Postings, error) { +var snappyDecodePool sync.Pool + +type closeablePostings interface { + index.Postings + close() +} + +// alias returns true if given slices have the same both backing array. +// See: https://groups.google.com/g/golang-nuts/c/C6ufGl73Uzk. +func alias(x, y []byte) bool { + return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1] +} + +func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { if !isDiffVarintSnappyEncodedPostings(input) { return nil, errors.New("header not found") } - raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):]) + toFree := make([][]byte, 0, 2) + + var dstBuf []byte + decodeBuf := snappyDecodePool.Get() + if decodeBuf != nil { + dstBuf = *(decodeBuf.(*[]byte)) + toFree = append(toFree, dstBuf) + } + + raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):]) if err != nil { return nil, errors.Wrap(err, "snappy decode") } - return newDiffVarintPostings(raw), nil + if !alias(raw, dstBuf) { + toFree = append(toFree, raw) + } + + return newDiffVarintPostings(raw, toFree), nil } -func newDiffVarintPostings(input []byte) *diffVarintPostings { - return &diffVarintPostings{buf: &encoding.Decbuf{B: input}} +func newDiffVarintPostings(input []byte, freeSlices [][]byte) *diffVarintPostings { + return &diffVarintPostings{freeSlices: freeSlices, buf: &encoding.Decbuf{B: input}} } // diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data. type diffVarintPostings struct { - buf *encoding.Decbuf - cur storage.SeriesRef + buf *encoding.Decbuf + cur storage.SeriesRef + freeSlices [][]byte +} + +func (it *diffVarintPostings) close() { + for i := range it.freeSlices { + snappyDecodePool.Put(&it.freeSlices[i]) + } } func (it *diffVarintPostings) At() storage.SeriesRef { diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index be5cce4f915..8ac86008b5f 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -55,9 +55,9 @@ func TestDiffVarintCodec(t *testing.T) { codecs := map[string]struct { codingFunction func(index.Postings, int) ([]byte, error) - decodingFunction func([]byte) (index.Postings, error) + decodingFunction func([]byte) (closeablePostings, error) }{ - "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }}, + "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }}, "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, }