diff --git a/.bingo/promdoc.sum b/.bingo/promdoc.sum
index 7ecaa4580fa..00adfe7641c 100644
--- a/.bingo/promdoc.sum
+++ b/.bingo/promdoc.sum
@@ -1,18 +1,34 @@
 github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
+github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
+github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
 github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo=
+github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
 github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
+github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
 github.com/plexsystems/promdoc v0.8.0 h1:mNAp+WQkb2yZV5m7PeybHFTPYz+4pbaMCaH8iPLOMog=
 github.com/plexsystems/promdoc v0.8.0/go.mod h1:CoTbHLEVPziXN+Y4GozwsiLvgdJqdOBYywqUy40sYuI=
 github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
+github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
 github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA=
+github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cobra v1.2.1 h1:+KmjbUw1hriSNMF55oPrkZcb27aECyrj8V2ytv7kWDw=
+github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
 github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
+github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
+github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
 github.com/spf13/viper v1.9.0 h1:yR6EXjTp0y0cLN8OZg1CRZmOBdI88UcGkhgyJhu6nZk=
+github.com/spf13/viper v1.9.0/go.mod h1:+i6ajR7OX2XaiBkrcZJFK21htRk7eDeLg7+O6bhUPP4=
 github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
+github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
+golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c=
+gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index ec284fc8eec..b06c35518c4 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
 	return true, ps, nil
 }
 
+var bufioReaderPool = sync.Pool{
+	New: func() any {
+		return bufio.NewReader(nil)
+	},
+}
+
 // fetchPostings fill postings requested by posting groups.
 // It returns one posting for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
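Note on the `bufioReaderPool` hunk above: pooling `bufio.Reader` values avoids allocating a fresh buffered reader (4 KiB buffer by default) for every fetched partition. A minimal standalone sketch of the Get/Reset/Put pattern, assuming nothing beyond the standard library; `readLine` is a hypothetical caller, not code from this PR:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// readerPool hands out reusable bufio.Readers; New creates one with no
// underlying source, so callers must Reset it before use.
var readerPool = sync.Pool{
	New: func() any { return bufio.NewReader(nil) },
}

// readLine borrows a pooled reader, rebinds it to src, and returns it to the
// pool when done, so repeated calls reuse one buffer instead of allocating.
func readLine(src io.Reader) (string, error) {
	br := readerPool.Get().(*bufio.Reader)
	defer readerPool.Put(br)
	br.Reset(src)
	return br.ReadString('\n')
}

func main() {
	line, err := readLine(strings.NewReader("hello\n"))
	if err != nil {
		panic(err)
	}
	fmt.Print(line)
}
```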
@@ -2570,52 +2576,60 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		// We assume index does not have any ptrs that has 0 length.
 		length := int64(part.End) - start
 
+		brdr := bufioReaderPool.Get().(*bufio.Reader)
+		defer bufioReaderPool.Put(brdr)
+
 		// Fetch from object storage concurrently and update stats and posting list.
 		g.Go(func() error {
 			begin := time.Now()
 
-			b, err := r.block.readIndexRange(ctx, start, length)
+			partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
 			if err != nil {
 				return errors.Wrap(err, "read postings range")
 			}
-			fetchTime := time.Since(begin)
+			defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
+			brdr.Reset(partReader)
+
+			rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
 
 			r.mtx.Lock()
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
-			r.stats.PostingsFetchDurationSum += fetchTime
 			r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
 			r.mtx.Unlock()
 
-			for _, p := range ptrs[i:j] {
-				// index-header can estimate endings, which means we need to resize the endings.
-				pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
+			for rdr.Next() {
+				diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()
+
+				output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)
+
+				startCompression := time.Now()
+				dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
 				if err != nil {
-					return err
+					r.mtx.Lock()
+					r.stats.cachedPostingsCompressionErrors += 1
+					r.mtx.Unlock()
+					return errors.Wrap(err, "encoding with snappy")
 				}
 
-				// Reencode postings before storing to cache. If that fails, we store original bytes.
-				// This can only fail, if postings data was somehow corrupted,
-				// and there is nothing we can do about it.
-				// Errors from corrupted postings will be reported when postings are used.
-				bep := newBigEndianPostings(pBytes[4:])
-				dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
 				r.mtx.Lock()
-				// Return postings and fill LRU cache.
-				// Truncate first 4 bytes which are length of posting.
-				output[p.keyID] = newBigEndianPostings(pBytes[4:])
-
-				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)
-
-				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
-				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
+				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(diffVarintPostings))
 				r.stats.cachedPostingsCompressions += 1
-				r.stats.cachedPostingsCompressionErrors += compressionErrors
-				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
-				r.stats.CachedPostingsCompressionTimeSum += compressionTime
+				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
+				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
+				r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
 				r.mtx.Unlock()
+
+				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
+			}
+
+			r.mtx.Lock()
+			r.stats.PostingsFetchDurationSum += time.Since(begin)
+			r.mtx.Unlock()
+
+			if rdr.Error() != nil {
+				return errors.Wrap(rdr.Error(), "reading postings")
+			}
 			return nil
 		})
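The rewritten loop above no longer materializes the whole index range in memory; it re-encodes each big-endian posting list into diff-varint form as the bytes stream by. A minimal sketch of that re-encoding step, assuming sorted uint32 posting IDs; `encodeBigEndianToDiffVarint` is an illustrative name, not a function in this PR:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeBigEndianToDiffVarint converts a raw big-endian posting list (one
// 4-byte series ID per entry, length prefix already stripped) into the
// delta-then-uvarint layout that diff-varint postings iterate over.
func encodeBigEndianToDiffVarint(raw []byte) ([]byte, error) {
	if len(raw)%4 != 0 {
		return nil, fmt.Errorf("postings length %d is not a multiple of 4", len(raw))
	}
	out := make([]byte, 0, len(raw)/4*2) // rough guess: ~2 bytes per delta
	buf := make([]byte, binary.MaxVarintLen64)
	prev := uint32(0)
	for off := 0; off < len(raw); off += 4 {
		cur := binary.BigEndian.Uint32(raw[off:])
		// Posting lists are sorted, so cur-prev never underflows.
		n := binary.PutUvarint(buf, uint64(cur-prev))
		out = append(out, buf[:n]...)
		prev = cur
	}
	return out, nil
}

func main() {
	// Sorted posting IDs 16, 18 and 100 as big-endian uint32s.
	raw := []byte{0, 0, 0, 16, 0, 0, 0, 18, 0, 0, 0, 100}
	enc, err := encodeBigEndianToDiffVarint(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes -> %d bytes: %v\n", len(raw), len(enc), enc) // 12 bytes -> 3 bytes: [16 2 82]
}
```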
@@ -2665,21 +2679,6 @@ func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int)
 	return dataToCache, compressionTime, compressionErrors, compressedSize
 }
 
-func resizePostings(b []byte) ([]byte, error) {
-	d := encoding.Decbuf{B: b}
-	n := d.Be32int()
-	if d.Err() != nil {
-		return nil, errors.Wrap(d.Err(), "read postings list")
-	}
-
-	// 4 for postings number of entries, then 4, foreach each big endian posting.
-	size := 4 + n*4
-	if len(b) < size {
-		return nil, encoding.ErrInvalidSize
-	}
-	return b[:size], nil
-}
-
 // bigEndianPostings implements the Postings interface over a byte stream of
 // big endian numbers.
 type bigEndianPostings struct {
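The new `pkg/store/postings.go` below introduces `postingsReaderBuilder`. One subtlety worth calling out: several requested keys can point at the identical byte range in the index, and the builder decodes such a range once and replays the result via its `repeatFor` counter. A toy sketch of that dedup idea under assumed semantics (`span` and the loop shape are illustrative only, not the PR's types):

```go
package main

import "fmt"

// span mimics the start/end pointer of a posting list inside the index;
// the real code uses postingPtr.
type span struct{ start, end int64 }

func main() {
	// Keys 0 and 1 request the identical byte range; decode it only once.
	ptrs := []span{{0, 8}, {0, 8}, {16, 24}}
	repeatFor := 0
	for i := 0; i < len(ptrs); i++ {
		if repeatFor > 0 {
			repeatFor--
			fmt.Printf("key %d: reuse previously decoded range\n", i)
			continue
		}
		fmt.Printf("key %d: decode bytes [%d, %d)\n", i, ptrs[i].start, ptrs[i].end)
		// Count how many of the following keys share this exact range.
		for i+1+repeatFor < len(ptrs) && ptrs[i+1+repeatFor] == ptrs[i] {
			repeatFor++
		}
	}
}
```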
diff --git a/pkg/store/postings.go b/pkg/store/postings.go
new file mode 100644
index 00000000000..066f52116c2
--- /dev/null
+++ b/pkg/store/postings.go
@@ -0,0 +1,142 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"bufio"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"io"
+
+	"github.com/pkg/errors"
+)
+
+type postingsReaderBuilder struct {
+	e       error
+	readBuf []byte
+
+	r        *bufio.Reader
+	postings []postingPtr
+
+	lastOffset int64
+	pi         int
+
+	start, length         int64
+	cur                   []byte
+	keyID                 int
+	repeatFor             int
+	numberOfPostingsInCur uint64
+	uvarintEncodeBuf      []byte
+	ctx                   context.Context
+}
+
+// newPostingsReaderBuilder is a builder that reads postings directly from the
+// index and builds a diff-varint-encoded []byte that can later be used directly.
+func newPostingsReaderBuilder(ctx context.Context, r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
+	prb := &postingsReaderBuilder{
+		r:                r,
+		readBuf:          make([]byte, 4),
+		start:            start,
+		length:           length,
+		postings:         postings,
+		uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
+		ctx:              ctx,
+	}
+
+	return prb
+}
+
+func getInt32(r io.Reader, buf []byte) (uint32, error) {
+	read, err := io.ReadFull(r, buf)
+	if err != nil {
+		return 0, errors.Wrap(err, "reading")
+	}
+	if read != 4 {
+		return 0, fmt.Errorf("read got %d bytes instead of 4", read)
+	}
+	return binary.BigEndian.Uint32(buf), nil
+}
+
+func (r *postingsReaderBuilder) Next() bool {
+	if r.ctx.Err() != nil {
+		r.e = r.ctx.Err()
+		return false
+	}
+	if r.repeatFor > 0 {
+		r.keyID = r.postings[r.pi-r.repeatFor].keyID
+		r.repeatFor--
+		return true
+	}
+	if r.pi >= len(r.postings) {
+		return false
+	}
+	if r.Error() != nil {
+		return false
+	}
+	from := r.postings[r.pi].ptr.Start - r.start
+
+	if from-r.lastOffset < 0 {
+		panic("would have skipped negative bytes")
+	}
+
+	if _, err := r.r.Discard(int(from - r.lastOffset)); err != nil {
+		r.e = err
+		return false
+	}
+	r.lastOffset += from - r.lastOffset
+
+	postingsCount, err := getInt32(r.r, r.readBuf[:])
+	if err != nil {
+		r.e = err
+		return false
+	}
+	r.lastOffset += 4
+
+	// Assume 1.25 bytes per compressed posting.
+	r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))
+
+	prev := uint32(0)
+
+	for i := 0; i < int(postingsCount); i++ {
+		posting, err := getInt32(r.r, r.readBuf[:])
+		if err != nil {
+			r.e = err
+			return false
+		}
+		r.lastOffset += 4
+
+		uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
+		r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
+		prev = posting
+	}
+	r.numberOfPostingsInCur = uint64(postingsCount)
+
+	r.keyID = r.postings[r.pi].keyID
+	r.pi++
+	for {
+		if r.pi >= len(r.postings) {
+			break
+		}
+
+		if r.postings[r.pi].ptr.Start == r.postings[r.pi-1].ptr.Start &&
+			r.postings[r.pi].ptr.End == r.postings[r.pi-1].ptr.End {
+			r.repeatFor++
+			r.pi++
+			continue
+		}
+
+		break
+	}
+
+	return true
+}
+
+func (r *postingsReaderBuilder) Error() error {
+	return r.e
+}
+
+func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
+	return r.cur, r.numberOfPostingsInCur, r.keyID
+}
diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go
index 5e2f0a9cc29..2288bb48f1b 100644
--- a/pkg/store/postings_codec.go
+++ b/pkg/store/postings_codec.go
@@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
 func (it *diffVarintPostings) Err() error {
 	return it.buf.Err()
 }
+
+func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
+	compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))
+	if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
+		return nil, fmt.Errorf("writing streamed snappy header: %w", err)
+	} else if n != len(codecHeaderStreamedSnappy) {
+		return nil, fmt.Errorf("short write of streamed snappy header")
+	}
+
+	sw, err := extsnappy.Compressor.Compress(compressedBuf)
+	if err != nil {
+		return nil, fmt.Errorf("creating snappy compressor: %w", err)
+	}
+
+	_, err = sw.Write(diffVarintPostings)
+	if err != nil {
+		return nil, err
+	}
+	if err := sw.Close(); err != nil {
+		return nil, errors.Wrap(err, "closing snappy stream writer")
+	}
+
+	return compressedBuf.Bytes(), nil
+}
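`snappyStreamedEncode` frames each cache entry as a codec header followed by a snappy stream of the diff-varint payload. A rough standalone equivalent, using `github.com/golang/snappy`'s buffered stream writer instead of the `extsnappy` wrapper used above; the `header` constant is a stand-in, since `codecHeaderStreamedSnappy`'s value does not appear in this diff:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/golang/snappy"
)

// header is a stand-in for codecHeaderStreamedSnappy, which is defined
// elsewhere in postings_codec.go.
const header = "dss-example"

// encode prefixes the payload with the codec header and then pushes it
// through a snappy stream writer, mirroring the shape of snappyStreamedEncode.
func encode(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	buf.WriteString(header) // bytes.Buffer writes never fail
	w := snappy.NewBufferedWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		return nil, fmt.Errorf("writing snappy stream: %w", err)
	}
	if err := w.Close(); err != nil { // Close flushes the final frame
		return nil, fmt.Errorf("closing snappy stream: %w", err)
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := encode([]byte{16, 2, 82}) // the diff-varint deltas from the earlier example
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes total, header %q\n", len(out), out[:len(header)])
}
```

The streamed framing is what lets readers decompress and iterate postings chunk by chunk instead of holding the whole decompressed list in memory.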