diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index ec284fc8eec..02c538b4f5d 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
 	return true, ps, nil
 }
 
+var bufioReaderPool = sync.Pool{
+	New: func() any {
+		return bufio.NewReader(nil)
+	},
+}
+
 // fetchPostings fill postings requested by posting groups.
 // It returns one posting for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
@@ -2574,12 +2580,17 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		g.Go(func() error {
 			begin := time.Now()
 
-			b, err := r.block.readIndexRange(ctx, start, length)
+			partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
 			if err != nil {
 				return errors.Wrap(err, "read postings range")
 			}
+			defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
 			fetchTime := time.Since(begin)
 
+			brdr := bufioReaderPool.Get().(*bufio.Reader)
+			brdr.Reset(partReader)
+			rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
+
 			r.mtx.Lock()
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
@@ -2587,35 +2598,20 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 			r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
 			r.mtx.Unlock()
 
-			for _, p := range ptrs[i:j] {
-				// index-header can estimate endings, which means we need to resize the endings.
-				pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
+			for rdr.Next() {
+				deltaEncodedPostings, postingsCount, keyID := rdr.AtDiffVarint()
+				output[keyID] = newDiffVarintPostings(deltaEncodedPostings, nil)
+
+				dataToCache, err := snappyStreamedEncode(int(postingsCount), deltaEncodedPostings)
 				if err != nil {
-					return err
+					return errors.Wrap(err, "encoding with snappy")
 				}
 
-				// Reencode postings before storing to cache. If that fails, we store original bytes.
-				// This can only fail, if postings data was somehow corrupted,
-				// and there is nothing we can do about it.
-				// Errors from corrupted postings will be reported when postings are used.
-				bep := newBigEndianPostings(pBytes[4:])
-				dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
-				r.mtx.Lock()
-				// Return postings and fill LRU cache.
-				// Truncate first 4 bytes which are length of posting.
-				output[p.keyID] = newBigEndianPostings(pBytes[4:])
-
-				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)
-
-				// If we just fetched it we still have to update the stats for touched postings.
-				r.stats.postingsTouched++
-				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.cachedPostingsCompressions += 1
-				r.stats.cachedPostingsCompressionErrors += compressionErrors
-				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
-				r.stats.CachedPostingsCompressionTimeSum += compressionTime
-				r.mtx.Unlock()
+				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
+			}
+
+			if rdr.Error() != nil {
+				return errors.Wrap(rdr.Error(), "reading postings")
 			}
 			return nil
 		})
diff --git a/pkg/store/postings.go b/pkg/store/postings.go
new file mode 100644
index 00000000000..279f2ac53f8
--- /dev/null
+++ b/pkg/store/postings.go
@@ -0,0 +1,143 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"bufio"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"io"
+
+	"github.com/pkg/errors"
+)
+
+type postingsReaderBuilder struct {
+	e       error
+	readBuf []byte
+
+	r        *bufio.Reader
+	postings []postingPtr
+
+	lastOffset int64
+	pi         int
+
+	start, length         int64
+	cur                   []byte
+	keyID                 int
+	repeatFor             int
+	numberOfPostingsInCur uint64
+	uvarintEncodeBuf      []byte
+	ctx                   context.Context
+}
+
+// newPostingsReaderBuilder is a builder that reads directly from the index
+// and builds a diff varint encoded []byte that can later be used directly.
+func newPostingsReaderBuilder(ctx context.Context, r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
+	prb := &postingsReaderBuilder{
+		r:                r,
+		readBuf:          make([]byte, 4),
+		start:            start,
+		length:           length,
+		postings:         postings,
+		uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
+		ctx:              ctx,
+	}
+
+	return prb
+}
+
+func getInt32(r io.Reader, buf []byte) (uint32, error) {
+	read, err := io.ReadFull(r, buf)
+	if err != nil {
+		return 0, errors.Wrap(err, "reading")
+	}
+	if read != 4 {
+		return 0, fmt.Errorf("read got %d bytes instead of 4", read)
+	}
+	return binary.BigEndian.Uint32(buf), nil
+}
+
+func (r *postingsReaderBuilder) Next() bool {
+	if r.ctx.Err() != nil {
+		r.e = r.ctx.Err()
+		return false
+	}
+	if r.repeatFor > 0 {
+		r.keyID = r.postings[r.pi-r.repeatFor].keyID
+		r.repeatFor--
+		return true
+	}
+	if r.pi >= len(r.postings) {
+		return false
+	}
+	if r.Error() != nil {
+		return false
+	}
+	from := r.postings[r.pi].ptr.Start - r.start
+
+	if from-r.lastOffset < 0 {
+		panic("would have skipped negative bytes")
+	}
+
+	_, err := r.r.Discard(int(from - r.lastOffset))
+	if err != nil {
+		r.e = err
+		return false
+	}
+	r.lastOffset += from - r.lastOffset
+
+	postingsCount, err := getInt32(r.r, r.readBuf[:])
+	if err != nil {
+		r.e = err
+		return false
+	}
+	r.lastOffset += 4
+
+	// Assume 1.25 bytes per delta-encoded posting.
+	r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))
+
+	prev := uint32(0)
+
+	for i := 0; i < int(postingsCount); i++ {
+		posting, err := getInt32(r.r, r.readBuf[:])
+		if err != nil {
+			r.e = err
+			return false
+		}
+		r.lastOffset += 4
+
+		uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
+		r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
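+		// Keep the absolute value of this posting; the next entry is written as a delta against it.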
+		prev = posting
+	}
+	r.numberOfPostingsInCur = uint64(postingsCount)
+
+	r.keyID = r.postings[r.pi].keyID
+	r.pi++
+	for {
+		if r.pi >= len(r.postings) {
+			break
+		}
+
+		if r.postings[r.pi].ptr.Start == r.postings[r.pi-1].ptr.Start &&
+			r.postings[r.pi].ptr.End == r.postings[r.pi-1].ptr.End {
+			r.repeatFor++
+			r.pi++
+			continue
+		}
+
+		break
+	}
+
+	return true
+}
+
+func (r *postingsReaderBuilder) Error() error {
+	return r.e
+}
+
+func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
+	return r.cur, r.numberOfPostingsInCur, r.keyID
+}
diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go
index 5e2f0a9cc29..2288bb48f1b 100644
--- a/pkg/store/postings_codec.go
+++ b/pkg/store/postings_codec.go
@@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
 func (it *diffVarintPostings) Err() error {
 	return it.buf.Err()
 }
+
+func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
+	compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))
+	if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
+		return nil, fmt.Errorf("writing streamed snappy header")
+	} else if n != len(codecHeaderStreamedSnappy) {
+		return nil, fmt.Errorf("short-write streamed snappy header")
+	}
+
+	sw, err := extsnappy.Compressor.Compress(compressedBuf)
+	if err != nil {
+		return nil, fmt.Errorf("creating snappy compressor: %w", err)
+	}
+
+	_, err = sw.Write(diffVarintPostings)
+	if err != nil {
+		return nil, err
+	}
+	if err := sw.Close(); err != nil {
+		return nil, errors.Wrap(err, "closing snappy stream writer")
+	}
+
+	return compressedBuf.Bytes(), nil
+}
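
Note (reviewer illustration, not part of the patch): the raw postings layout the new reader consumes is a 4-byte big-endian entry count followed by that many 4-byte big-endian series references; postingsReaderBuilder.Next turns each list into ascending deltas encoded as uvarints, and fetchPostings then snappy-frames that buffer via snappyStreamedEncode before caching it. The standalone Go sketch below mirrors only the re-encoding step; the file layout and every name in it (reencodeToDiffVarint, the fake refs) are illustrative assumptions, not Thanos APIs.

// Standalone sketch: re-encode one raw postings list ("count" + big-endian refs)
// into the delta/uvarint ("diff varint") form, as postingsReaderBuilder.Next does.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// reencodeToDiffVarint reads one raw postings list and returns its delta/uvarint
// encoding plus the number of postings it contained.
func reencodeToDiffVarint(r io.Reader) ([]byte, uint32, error) {
	var buf [4]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return nil, 0, err
	}
	count := binary.BigEndian.Uint32(buf[:])

	out := make([]byte, 0, int(float64(count)*1.25)) // same sizing heuristic as the patch
	varintBuf := make([]byte, binary.MaxVarintLen64)

	prev := uint32(0)
	for i := uint32(0); i < count; i++ {
		if _, err := io.ReadFull(r, buf[:]); err != nil {
			return nil, 0, err
		}
		ref := binary.BigEndian.Uint32(buf[:])
		// Postings are sorted ascending, so storing ref-prev keeps every delta small.
		n := binary.PutUvarint(varintBuf, uint64(ref-prev))
		out = append(out, varintBuf[:n]...)
		prev = ref
	}
	return out, count, nil
}

func main() {
	// Fake postings list: 3 entries with series refs 10, 12 and 40.
	raw := new(bytes.Buffer)
	binary.Write(raw, binary.BigEndian, uint32(3))
	for _, ref := range []uint32{10, 12, 40} {
		binary.Write(raw, binary.BigEndian, ref)
	}

	encoded, count, err := reencodeToDiffVarint(raw)
	if err != nil {
		panic(err)
	}
	// The 12 bytes of raw refs shrink to 3 one-byte deltas (10, 2, 28).
	fmt.Printf("postings=%d diff-varint=%v\n", count, encoded)
}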