Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add streamed postings reading #6340

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 20 additions & 123 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
@@ -1326,7 +1326,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum))
@@ -2416,142 +2415,42 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

// Fetch from object storage concurrently and update stats and posting list.
g.Go(func() error {
begin := time.Now()
for _, p := range ptrs[i:j] {
ir, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), p.ptr.Start, p.ptr.End-p.ptr.Start)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means we have to send multiple requests to objstore while current logic is sending once per part?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we able to still send 1 request per part and create posting reader from the get range reader?
If this is not doable, I feel it is better to maybe just download postings to disk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe more get range requests won't impact performance, hopefully we can have some datapoints to understand the impact.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could add this under a feature flag? if it is enabled then we would send multiple requests which would mean bigger costs if using some SaaS that charges per-request but we would get constant RAM usage

if err != nil {
return errors.Wrap(err, "get range reader")
}

b, err := r.block.readIndexRange(ctx, start, length)
if err != nil {
return errors.Wrap(err, "read postings range")
pr, err := newStreamedPostingsReader(
ir,
r.block.meta.ULID,
keys[p.keyID],
r.block.indexCache,
r.stats,
&r.mtx,
)
if err != nil {
return errors.Wrap(err, "creating postings reader")
}
closeFns = append(closeFns, func() {
runutil.CloseWithLogOnErr(r.block.logger, pr, "fetchPostings close streamed reader")
})
output[p.keyID] = pr
}
fetchTime := time.Since(begin)

r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
// index-header can estimate endings, which means we need to resize the endings.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
if err != nil {
return err
}

dataToCache := pBytes

compressionTime := time.Duration(0)
compressions, compressionErrors, compressedSize := 0, 0, 0

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
compressedSize = len(data)
} else {
compressionErrors = 1
}

r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.cachedPostingsCompressions += compressions
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
}
return nil
})
}

return output, closeFns, g.Wait()
}

func resizePostings(b []byte) ([]byte, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}

// 4 for postings number of entries, then 4, foreach each big endian posting.
size := 4 + n*4
if len(b) < size {
return nil, encoding.ErrInvalidSize
}
return b[:size], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
list []byte
cur uint32
}

// TODO(bwplotka): Expose those inside Prometheus.
func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list}
}

func (it *bigEndianPostings) At() storage.SeriesRef {
return storage.SeriesRef(it.cur)
}

func (it *bigEndianPostings) Next() bool {
if len(it.list) >= 4 {
it.cur = binary.BigEndian.Uint32(it.list)
it.list = it.list[4:]
return true
}
return false
}

func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
if storage.SeriesRef(it.cur) >= x {
return true
}

num := len(it.list) / 4
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {
return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
})
if i < num {
j := i * 4
it.cur = binary.BigEndian.Uint32(it.list[j:])
it.list = it.list[j+4:]
return true
}
it.list = nil
return false
}

func (it *bigEndianPostings) Err() error {
return nil
}

// Returns number of remaining postings values.
func (it *bigEndianPostings) length() int {
return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()
@@ -3062,7 +2961,6 @@ type queryStats struct {
cachedPostingsCompressionErrors int
CachedPostingsOriginalSizeSum units.Base2Bytes
CachedPostingsCompressedSizeSum units.Base2Bytes
CachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsDecompressionErrors int
CachedPostingsDecompressionTimeSum time.Duration
@@ -3101,7 +2999,6 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
s.CachedPostingsOriginalSizeSum += o.CachedPostingsOriginalSizeSum
s.CachedPostingsCompressedSizeSum += o.CachedPostingsCompressedSizeSum
s.CachedPostingsCompressionTimeSum += o.CachedPostingsCompressionTimeSum
s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
s.CachedPostingsDecompressionTimeSum += o.CachedPostingsDecompressionTimeSum
19 changes: 0 additions & 19 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ package store
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
@@ -1889,24 +1888,6 @@ func mustMarshalAny(pb proto.Message) *types.Any {
return out
}

func TestBigEndianPostingsCount(t *testing.T) {
const count = 1000
raw := make([]byte, count*4)

for ix := 0; ix < count; ix++ {
binary.BigEndian.PutUint32(raw[4*ix:], rand.Uint32())
}

p := newBigEndianPostings(raw)
testutil.Equals(t, count, p.length())

c := 0
for p.Next() {
c++
}
testutil.Equals(t, count, c)
}

func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex, totalSamples int, random *rand.Rand, step int64) ulid.ULID {
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = dir
Loading