store: read postings directly into diffvarint format #6442

Merged · 1 commit · Jun 15, 2023
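The PR ships no description, so for context: Prometheus index postings are stored as 4-byte big-endian series references, while the Thanos store gateway caches them diff-varint encoded, i.e. as uvarint deltas between consecutive sorted IDs. A minimal standalone sketch of that target encoding (hypothetical helper, not code from this diff):

package main

import (
	"encoding/binary"
	"fmt"
)

// toDiffVarint re-encodes a sorted postings list as uvarint deltas,
// the format the store gateway caches.
func toDiffVarint(postings []uint32) []byte {
	out := make([]byte, 0, len(postings)*binary.MaxVarintLen32)
	prev := uint32(0)
	for _, p := range postings {
		out = binary.AppendUvarint(out, uint64(p-prev))
		prev = p
	}
	return out
}

func main() {
	// Big-endian postings {16, 32, 48} become deltas 16, 16, 16,
	// each fitting in one byte instead of four.
	fmt.Println(toDiffVarint([]uint32{16, 32, 48})) // [16 16 16]
}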
16 changes: 16 additions & 0 deletions .bingo/promdoc.sum
@@ -1,18 +1,34 @@
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/plexsystems/promdoc v0.8.0 h1:mNAp+WQkb2yZV5m7PeybHFTPYz+4pbaMCaH8iPLOMog=
github.com/plexsystems/promdoc v0.8.0/go.mod h1:CoTbHLEVPziXN+Y4GozwsiLvgdJqdOBYywqUy40sYuI=
github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA=
github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v1.2.1 h1:+KmjbUw1hriSNMF55oPrkZcb27aECyrj8V2ytv7kWDw=
github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.9.0 h1:yR6EXjTp0y0cLN8OZg1CRZmOBdI88UcGkhgyJhu6nZk=
github.com/spf13/viper v1.9.0/go.mod h1:+i6ajR7OX2XaiBkrcZJFK21htRk7eDeLg7+O6bhUPP4=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c=
gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
79 changes: 39 additions & 40 deletions pkg/store/bucket.go
@@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
 	return true, ps, nil
 }

+var bufioReaderPool = sync.Pool{
+	New: func() any {
+		return bufio.NewReader(nil)
+	},
+}
+
 // fetchPostings fills postings requested by posting groups.
 // It returns one postings list for each key, in the same order.
 // If postings for a given key are not fetched, the entry at that index will be nil.
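The pool added above exists to reuse bufio.Reader buffers across fetches instead of allocating one per partition. A minimal sketch of the same get/reset/put lifecycle (hypothetical string source, not code from this diff):

package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

var readerPool = sync.Pool{
	New: func() any { return bufio.NewReader(nil) },
}

func main() {
	// Take a pooled reader, point it at a fresh source, and hand it back
	// when done; the next user reuses the same internal buffer.
	br := readerPool.Get().(*bufio.Reader)
	defer readerPool.Put(br)
	br.Reset(strings.NewReader("some postings bytes"))

	word, err := br.ReadString(' ')
	fmt.Println(word, err) // "some " <nil>
}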
@@ -2570,52 +2576,60 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		// We assume index does not have any ptrs that have 0 length.
 		length := int64(part.End) - start

+		brdr := bufioReaderPool.Get().(*bufio.Reader)
+		defer bufioReaderPool.Put(brdr)
+
 		// Fetch from object storage concurrently and update stats and posting list.
 		g.Go(func() error {
 			begin := time.Now()

-			b, err := r.block.readIndexRange(ctx, start, length)
+			partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
 			if err != nil {
 				return errors.Wrap(err, "read postings range")
 			}
-			fetchTime := time.Since(begin)
+			defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
+			brdr.Reset(partReader)
+
+			rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)

 			r.mtx.Lock()
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
-			r.stats.PostingsFetchDurationSum += fetchTime
 			r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
 			r.mtx.Unlock()

-			for _, p := range ptrs[i:j] {
-				// index-header can estimate endings, which means we need to resize the endings.
-				pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
+			for rdr.Next() {
+				diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()
+
+				output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)
+
+				startCompression := time.Now()
+				dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
 				if err != nil {
-					return err
+					r.mtx.Lock()
+					r.stats.cachedPostingsCompressionErrors += 1
+					r.mtx.Unlock()
+					return errors.Wrap(err, "encoding with snappy")
 				}

-				// Reencode postings before storing to cache. If that fails, we store original bytes.
-				// This can only fail, if postings data was somehow corrupted,
-				// and there is nothing we can do about it.
-				// Errors from corrupted postings will be reported when postings are used.
-				bep := newBigEndianPostings(pBytes[4:])
-				dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
 				r.mtx.Lock()
-				// Return postings and fill LRU cache.
-				// Truncate first 4 bytes which are length of posting.
-				output[p.keyID] = newBigEndianPostings(pBytes[4:])
-
-				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)
-
 				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
-				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
+				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings)))
 				r.stats.cachedPostingsCompressions += 1
-				r.stats.cachedPostingsCompressionErrors += compressionErrors
-				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
-				r.stats.CachedPostingsCompressionTimeSum += compressionTime
+				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
+				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
+				r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
 				r.mtx.Unlock()
Contributor: I am wondering if it is better to hold the lock only once and add to stats at the end? But it is a small nit

Contributor: If it is really an issue, maybe we can change next pr

+				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
 			}

+			r.mtx.Lock()
+			r.stats.PostingsFetchDurationSum += time.Since(begin)
+			r.mtx.Unlock()
+
+			if rdr.Error() != nil {
+				return errors.Wrap(rdr.Error(), "reading postings")
+			}
 			return nil
 		})
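A minimal sketch of what the review thread above suggests: accumulate counters goroutine-locally and publish them under a single lock acquisition after the loop (hypothetical stats subset; this is not what the PR merged):

package main

import (
	"fmt"
	"sync"
)

// stats is a hypothetical subset of the reader's stats fields.
type stats struct {
	mtx          sync.Mutex
	touched      int
	touchedBytes int
}

// addBatch publishes goroutine-local totals under one lock acquisition
// instead of locking once per posting list.
func (s *stats) addBatch(touched, touchedBytes int) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.touched += touched
	s.touchedBytes += touchedBytes
}

func main() {
	s := &stats{}
	postings := [][]byte{[]byte("abc"), []byte("defg")}

	// Accumulate locally inside the fetch loop...
	touched, totalBytes := 0, 0
	for _, p := range postings {
		touched++
		totalBytes += len(p)
	}
	// ...then take the mutex once at the end, as the comment suggests.
	s.addBatch(touched, totalBytes)
	fmt.Println(s.touched, s.touchedBytes) // 2 7
}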
@@ -2665,21 +2679,6 @@ func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int)
 	return dataToCache, compressionTime, compressionErrors, compressedSize
 }

-func resizePostings(b []byte) ([]byte, error) {
-	d := encoding.Decbuf{B: b}
-	n := d.Be32int()
-	if d.Err() != nil {
-		return nil, errors.Wrap(d.Err(), "read postings list")
-	}
-
-	// 4 for postings number of entries, then 4, foreach each big endian posting.
-	size := 4 + n*4
-	if len(b) < size {
-		return nil, encoding.ErrInvalidSize
-	}
-	return b[:size], nil
-}
-
 // bigEndianPostings implements the Postings interface over a byte stream of
 // big endian numbers.
 type bigEndianPostings struct {
142 changes: 142 additions & 0 deletions pkg/store/postings.go
@@ -0,0 +1,142 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"

"github.com/pkg/errors"
)

type postingsReaderBuilder struct {
e error
readBuf []byte

r *bufio.Reader
postings []postingPtr

lastOffset int64
pi int

start, length int64
cur []byte
keyID int
repeatFor int
numberOfPostingsInCur uint64
uvarintEncodeBuf []byte
ctx context.Context
}

// newPostingsReaderBuilder is a builder that reads directly from the index
// and builds a diff-varint-encoded []byte that can later be used directly.
func newPostingsReaderBuilder(ctx context.Context, r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
prb := &postingsReaderBuilder{
r: r,
readBuf: make([]byte, 4),
start: start,
length: length,
postings: postings,
uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
ctx: ctx,
}

return prb
}

func getInt32(r io.Reader, buf []byte) (uint32, error) {
	// io.ReadFull retries short reads; a bare Read on a bufio.Reader may
	// legitimately return fewer than 4 bytes at a buffer boundary.
	read, err := io.ReadFull(r, buf)
	if err != nil {
		return 0, errors.Wrap(err, "reading")
	}
	if read != 4 {
		return 0, fmt.Errorf("read got %d bytes instead of 4", read)
	}
	return binary.BigEndian.Uint32(buf), nil
}

func (r *postingsReaderBuilder) Next() bool {
if r.ctx.Err() != nil {
r.e = r.ctx.Err()
return false
}
if r.repeatFor > 0 {
r.keyID = r.postings[r.pi-r.repeatFor].keyID
r.repeatFor--
return true
}
if r.pi >= len(r.postings) {
return false
}
if r.Error() != nil {
return false
}
from := r.postings[r.pi].ptr.Start - r.start

if from-r.lastOffset < 0 {
panic("would have skipped negative bytes")
}

	_, err := r.r.Discard(int(from - r.lastOffset))
	if err != nil {
		r.e = err
		return false
	}
r.lastOffset += from - r.lastOffset

postingsCount, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.e = err
return false
}
r.lastOffset += 4

// Assume 1.25 bytes per compressed posting.
r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))

prev := uint32(0)

for i := 0; i < int(postingsCount); i++ {
posting, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.e = err
return false
}
r.lastOffset += 4

uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
prev = posting
}
r.numberOfPostingsInCur = uint64(postingsCount)

r.keyID = r.postings[r.pi].keyID
r.pi++
for {
if r.pi >= len(r.postings) {
break
}

if r.postings[r.pi].ptr.Start == r.postings[r.pi-1].ptr.Start &&
r.postings[r.pi].ptr.End == r.postings[r.pi-1].ptr.End {
r.repeatFor++
r.pi++
continue
}

break
}

return true
}

func (r *postingsReaderBuilder) Error() error {
return r.e
}

func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
return r.cur, r.numberOfPostingsInCur, r.keyID
}
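One detail worth calling out in Next above: when several consecutive keys point at the exact same postings byte range, the range is decoded once and then replayed via repeatFor rather than re-read. A standalone sketch of that dedup counting (hypothetical ptr type, not the Thanos postingPtr):

package main

import "fmt"

// ptr is a stand-in for the index range a posting pointer covers.
type ptr struct{ start, end int64 }

// countRepeats reports how many immediately following pointers share the
// exact byte range of ptrs[i], mirroring the repeatFor loop in Next.
func countRepeats(ptrs []ptr, i int) int {
	n := 0
	for j := i + 1; j < len(ptrs); j++ {
		if ptrs[j] != ptrs[i] {
			break
		}
		n++
	}
	return n
}

func main() {
	ptrs := []ptr{{0, 8}, {0, 8}, {0, 8}, {8, 20}}
	// The first range is decoded once and replayed for two more keys.
	fmt.Println(countRepeats(ptrs, 0)) // 2
}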
24 changes: 24 additions & 0 deletions pkg/store/postings_codec.go
@@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
func (it *diffVarintPostings) Err() error {
return it.buf.Err()
}

func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))
if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
	return nil, fmt.Errorf("writing streamed snappy header: %w", err)
} else if n != len(codecHeaderStreamedSnappy) {
return nil, fmt.Errorf("short-write streamed snappy header")
}

sw, err := extsnappy.Compressor.Compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}

_, err = sw.Write(diffVarintPostings)
if err != nil {
return nil, err
}
if err := sw.Close(); err != nil {
return nil, errors.Wrap(err, "closing snappy stream writer")
}

return compressedBuf.Bytes(), nil
}
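To close the loop, the cached bytes decode back by reversing the delta encoding: each uvarint is a gap from the previous series reference, which is the same cumulative-sum walk diffVarintPostings performs over its buffer. A standalone sketch (hypothetical decoder, not the Thanos type):

package main

import (
	"encoding/binary"
	"fmt"
)

// decodeDiffVarint reverses the delta encoding: a running sum over the
// uvarint gaps restores the original series references.
func decodeDiffVarint(b []byte) ([]uint32, error) {
	var out []uint32
	cur := uint32(0)
	for len(b) > 0 {
		delta, n := binary.Uvarint(b)
		if n <= 0 {
			return nil, fmt.Errorf("malformed uvarint")
		}
		cur += uint32(delta)
		out = append(out, cur)
		b = b[n:]
	}
	return out, nil
}

func main() {
	ids, err := decodeDiffVarint([]byte{16, 16, 16})
	fmt.Println(ids, err) // [16 32 48] <nil>
}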