Skip to content

Commit

Permalink
Revert "store: read postings directly into delta encoded format (than…
Browse files Browse the repository at this point in the history
…os-io#6442)"

This reverts commit d34f5d9.
  • Loading branch information
MichaHoffmann committed Aug 29, 2023
1 parent ea67282 commit ddaae70
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 49 deletions.
16 changes: 0 additions & 16 deletions .bingo/promdoc.sum
Original file line number Diff line number Diff line change
@@ -1,34 +1,18 @@
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/plexsystems/promdoc v0.8.0 h1:mNAp+WQkb2yZV5m7PeybHFTPYz+4pbaMCaH8iPLOMog=
github.com/plexsystems/promdoc v0.8.0/go.mod h1:CoTbHLEVPziXN+Y4GozwsiLvgdJqdOBYywqUy40sYuI=
github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA=
github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v1.2.1 h1:+KmjbUw1hriSNMF55oPrkZcb27aECyrj8V2ytv7kWDw=
github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.9.0 h1:yR6EXjTp0y0cLN8OZg1CRZmOBdI88UcGkhgyJhu6nZk=
github.com/spf13/viper v1.9.0/go.mod h1:+i6ajR7OX2XaiBkrcZJFK21htRk7eDeLg7+O6bhUPP4=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c=
gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
73 changes: 40 additions & 33 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2799,56 +2799,48 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
g.Go(func() error {
begin := time.Now()

brdr := bufioReaderPool.Get().(*bufio.Reader)
defer bufioReaderPool.Put(brdr)

partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
b, err := r.block.readIndexRange(ctx, start, length)
if err != nil {
return errors.Wrap(err, "read postings range")
}
defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
brdr.Reset(partReader)

rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
fetchTime := time.Since(begin)

r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for rdr.Next() {
diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()

output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
for _, p := range ptrs[i:j] {
// index-header can estimate endings, which means we need to resize the endings.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
if err != nil {
r.mtx.Lock()
r.stats.cachedPostingsCompressionErrors += 1
r.mtx.Unlock()
return errors.Wrap(err, "encoding with snappy")
return err
}

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
bep := newBigEndianPostings(pBytes[4:])
dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings)))
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.cachedPostingsCompressions += 1
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
}

r.mtx.Lock()
r.stats.PostingsFetchDurationSum += time.Since(begin)
r.mtx.Unlock()

if rdr.Error() != nil {
return errors.Wrap(err, "reading postings")
}
return nil
})
Expand Down Expand Up @@ -2897,6 +2889,21 @@ func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int)
return dataToCache, compressionTime, compressionErrors, compressedSize
}

func resizePostings(b []byte) ([]byte, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}

// 4 for postings number of entries, then 4, foreach each big endian posting.
size := 4 + n*4
if len(b) < size {
return nil, encoding.ErrInvalidSize
}
return b[:size], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
Expand Down

0 comments on commit ddaae70

Please sign in to comment.