store: read postings directly into delta encoded format (thanos-io#6442)
Instead of allocating bytes for raw postings, let's read them directly
into diff varint format to save memory.

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS authored and HC Zhu committed Jun 27, 2023
1 parent 04bdc27 commit 35c7610
Showing 4 changed files with 221 additions and 40 deletions.
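Editorial note: the core trick in this commit is to replace fixed 4-byte big-endian postings with uvarint-encoded deltas between consecutive (sorted) series references. A minimal, self-contained sketch of that re-encoding; toDiffVarint is a hypothetical stand-in for the codec in pkg/store/postings.go, not the Thanos API:

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// toDiffVarint re-encodes sorted series references as uvarint deltas.
	// Deltas between neighboring postings are small, so most take 1-2 bytes
	// instead of the fixed 4 bytes of the raw big-endian layout.
	func toDiffVarint(postings []uint32) []byte {
		buf := make([]byte, binary.MaxVarintLen64)
		out := make([]byte, 0, len(postings)*2)
		prev := uint32(0)
		for _, p := range postings {
			n := binary.PutUvarint(buf, uint64(p-prev))
			out = append(out, buf[:n]...)
			prev = p
		}
		return out
	}

	func main() {
		raw := []uint32{10, 12, 19, 22, 40}
		fmt.Printf("%d postings -> %d bytes diff-varint (vs %d bytes raw)\n",
			len(raw), len(toDiffVarint(raw)), 4*len(raw))
	}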
16 changes: 16 additions & 0 deletions .bingo/promdoc.sum
@@ -1,18 +1,34 @@
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mitchellh/mapstructure v1.4.2 h1:6h7AQ0yhTcIsmFmnAwQls75jp2Gzs4iB8W7pjMO+rqo=
github.com/mitchellh/mapstructure v1.4.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/plexsystems/promdoc v0.8.0 h1:mNAp+WQkb2yZV5m7PeybHFTPYz+4pbaMCaH8iPLOMog=
github.com/plexsystems/promdoc v0.8.0/go.mod h1:CoTbHLEVPziXN+Y4GozwsiLvgdJqdOBYywqUy40sYuI=
github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA=
github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v1.2.1 h1:+KmjbUw1hriSNMF55oPrkZcb27aECyrj8V2ytv7kWDw=
github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.9.0 h1:yR6EXjTp0y0cLN8OZg1CRZmOBdI88UcGkhgyJhu6nZk=
github.com/spf13/viper v1.9.0/go.mod h1:+i6ajR7OX2XaiBkrcZJFK21htRk7eDeLg7+O6bhUPP4=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c=
gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
79 changes: 39 additions & 40 deletions pkg/store/bucket.go
@@ -2486,6 +2486,12 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
 	return true, ps, nil
 }
 
+var bufioReaderPool = sync.Pool{
+	New: func() any {
+		return bufio.NewReader(nil)
+	},
+}
+
 // fetchPostings fill postings requested by posting groups.
 // It returns one posting for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
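The sync.Pool added above lets each fetch reuse one buffered reader across requests instead of allocating a fresh bufio.Reader (and its buffer) per range read. A minimal sketch of the Get/Reset/Put discipline; pool and withPooledReader are illustrative names, not Thanos APIs:

	package main

	import (
		"bufio"
		"fmt"
		"io"
		"strings"
		"sync"
	)

	// pool mirrors bufioReaderPool above: readers are created empty and
	// rebound to a concrete source with Reset before every use.
	var pool = sync.Pool{New: func() any { return bufio.NewReader(nil) }}

	// withPooledReader is a hypothetical helper showing the pool discipline
	// used in fetchPostings: get, rebind, use, return.
	func withPooledReader(src io.Reader, fn func(*bufio.Reader) error) error {
		br := pool.Get().(*bufio.Reader)
		defer pool.Put(br)
		br.Reset(src) // drops buffered state left over from the previous user
		return fn(br)
	}

	func main() {
		err := withPooledReader(strings.NewReader("postings bytes"), func(br *bufio.Reader) error {
			b, err := br.ReadByte()
			fmt.Printf("first byte: %q\n", b)
			return err
		})
		if err != nil {
			fmt.Println("read failed:", err)
		}
	}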
@@ -2570,52 +2576,60 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		// We assume index does not have any ptrs that has 0 length.
 		length := int64(part.End) - start
 
+		brdr := bufioReaderPool.Get().(*bufio.Reader)
+		defer bufioReaderPool.Put(brdr)
+
 		// Fetch from object storage concurrently and update stats and posting list.
 		g.Go(func() error {
 			begin := time.Now()
 
-			b, err := r.block.readIndexRange(ctx, start, length)
+			partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length)
 			if err != nil {
 				return errors.Wrap(err, "read postings range")
 			}
-			fetchTime := time.Since(begin)
+			defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
+			brdr.Reset(partReader)
+
+			rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
 
 			r.mtx.Lock()
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
-			r.stats.PostingsFetchDurationSum += fetchTime
 			r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
 			r.mtx.Unlock()
 
-			for _, p := range ptrs[i:j] {
-				// index-header can estimate endings, which means we need to resize the endings.
-				pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
+			for rdr.Next() {
+				diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()
+
+				output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)
+
+				startCompression := time.Now()
+				dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
 				if err != nil {
-					return err
+					r.mtx.Lock()
+					r.stats.cachedPostingsCompressionErrors += 1
+					r.mtx.Unlock()
+					return errors.Wrap(err, "encoding with snappy")
 				}
 
-				// Reencode postings before storing to cache. If that fails, we store original bytes.
-				// This can only fail, if postings data was somehow corrupted,
-				// and there is nothing we can do about it.
-				// Errors from corrupted postings will be reported when postings are used.
-				bep := newBigEndianPostings(pBytes[4:])
-				dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
 				r.mtx.Lock()
-				// Return postings and fill LRU cache.
-				// Truncate first 4 bytes which are length of posting.
-				output[p.keyID] = newBigEndianPostings(pBytes[4:])
-
-				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)
-
-				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
-				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
+				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings)))
 				r.stats.cachedPostingsCompressions += 1
-				r.stats.cachedPostingsCompressionErrors += compressionErrors
-				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
-				r.stats.CachedPostingsCompressionTimeSum += compressionTime
+				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
+				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
+				r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
 				r.mtx.Unlock()
+
+				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache)
 			}
 
+			r.mtx.Lock()
+			r.stats.PostingsFetchDurationSum += time.Since(begin)
+			r.mtx.Unlock()
+
+			if rdr.Error() != nil {
+				return errors.Wrap(rdr.Error(), "reading postings")
+			}
 			return nil
 		})
@@ -2665,21 +2679,6 @@ func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int)
 	return dataToCache, compressionTime, compressionErrors, compressedSize
 }
 
-func resizePostings(b []byte) ([]byte, error) {
-	d := encoding.Decbuf{B: b}
-	n := d.Be32int()
-	if d.Err() != nil {
-		return nil, errors.Wrap(d.Err(), "read postings list")
-	}
-
-	// 4 for postings number of entries, then 4, foreach each big endian posting.
-	size := 4 + n*4
-	if len(b) < size {
-		return nil, encoding.ErrInvalidSize
-	}
-	return b[:size], nil
-}
-
 // bigEndianPostings implements the Postings interface over a byte stream of
 // big endian numbers.
 type bigEndianPostings struct {
142 changes: 142 additions & 0 deletions pkg/store/postings.go
@@ -0,0 +1,142 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"

"github.com/pkg/errors"
)

type postingsReaderBuilder struct {
e error
readBuf []byte

r *bufio.Reader
postings []postingPtr

lastOffset int64
pi int

start, length int64
cur []byte
keyID int
repeatFor int
numberOfPostingsInCur uint64
uvarintEncodeBuf []byte
ctx context.Context
}

// newPostingsReaderBuilder returns a builder that reads postings directly from
// the index and re-encodes them into diff-varint []byte slices that can be
// consumed as-is by newDiffVarintPostings.
func newPostingsReaderBuilder(ctx context.Context, r *bufio.Reader, postings []postingPtr, start, length int64) *postingsReaderBuilder {
prb := &postingsReaderBuilder{
r: r,
readBuf: make([]byte, 4),
start: start,
length: length,
postings: postings,
uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
ctx: ctx,
}

return prb
}

func getInt32(r io.Reader, buf []byte) (uint32, error) {
	// io.ReadFull retries short reads; a bare r.Read could legitimately
	// return fewer than 4 bytes without reporting an error.
	read, err := io.ReadFull(r, buf)
	if err != nil {
		return 0, errors.Wrap(err, "reading")
	}
	if read != 4 {
		return 0, fmt.Errorf("read got %d bytes instead of 4", read)
	}
	return binary.BigEndian.Uint32(buf), nil
}

func (r *postingsReaderBuilder) Next() bool {
if r.ctx.Err() != nil {
r.e = r.ctx.Err()
return false
}
if r.repeatFor > 0 {
r.keyID = r.postings[r.pi-r.repeatFor].keyID
r.repeatFor--
return true
}
if r.pi >= len(r.postings) {
return false
}
if r.Error() != nil {
return false
}
from := r.postings[r.pi].ptr.Start - r.start

if from-r.lastOffset < 0 {
panic("would have skipped negative bytes")
}

	_, err := r.r.Discard(int(from - r.lastOffset))
	if err != nil {
		r.e = errors.Wrap(err, "discarding bytes")
		return false
	}
	r.lastOffset = from

postingsCount, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.e = err
return false
}
r.lastOffset += 4

// Assume 1.25 bytes per compressed posting.
r.cur = make([]byte, 0, int(float64(postingsCount)*1.25))

prev := uint32(0)

for i := 0; i < int(postingsCount); i++ {
posting, err := getInt32(r.r, r.readBuf[:])
if err != nil {
r.e = err
return false
}
r.lastOffset += 4

uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(posting-prev))
r.cur = append(r.cur, r.uvarintEncodeBuf[:uvarintSize]...)
prev = posting
}
r.numberOfPostingsInCur = uint64(postingsCount)

r.keyID = r.postings[r.pi].keyID
r.pi++
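	// Several keys may point at the exact same postings range; collapse such
	// duplicates here and replay the buffer we just built for each of them
	// via repeatFor (consumed at the top of Next).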
for {
if r.pi >= len(r.postings) {
break
}

if r.postings[r.pi].ptr.Start == r.postings[r.pi-1].ptr.Start &&
r.postings[r.pi].ptr.End == r.postings[r.pi-1].ptr.End {
r.repeatFor++
r.pi++
continue
}

break
}

return true
}

func (r *postingsReaderBuilder) Error() error {
return r.e
}

func (r *postingsReaderBuilder) AtDiffVarint() ([]byte, uint64, int) {
return r.cur, r.numberOfPostingsInCur, r.keyID
}
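For the consumer side of the format built above, a hedged sketch: fromDiffVarint is hypothetical and stands in for the real decoder behind newDiffVarintPostings, assuming the uvarint-delta layout that Next produces:

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// fromDiffVarint walks the buffer, decoding one uvarint delta at a time
	// and accumulating it into the running series reference.
	func fromDiffVarint(b []byte) ([]uint32, error) {
		var out []uint32
		prev := uint32(0)
		for len(b) > 0 {
			delta, n := binary.Uvarint(b)
			if n <= 0 {
				return nil, fmt.Errorf("corrupt uvarint in postings")
			}
			prev += uint32(delta)
			out = append(out, prev)
			b = b[n:]
		}
		return out, nil
	}

	func main() {
		// 0x0a = 10, 0x02 = +2, 0x07 = +7 -> postings 10, 12, 19.
		got, err := fromDiffVarint([]byte{0x0a, 0x02, 0x07})
		fmt.Println(got, err) // [10 12 19] <nil>
	}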
24 changes: 24 additions & 0 deletions pkg/store/postings_codec.go
@@ -339,3 +339,27 @@ func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
 func (it *diffVarintPostings) Err() error {
 	return it.buf.Err()
 }
+
+func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte, error) {
+	compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(postingsLength)))
+	if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
+		return nil, fmt.Errorf("writing streamed snappy header: %w", err)
+	} else if n != len(codecHeaderStreamedSnappy) {
+		return nil, fmt.Errorf("short-write streamed snappy header")
+	}
+
+	sw, err := extsnappy.Compressor.Compress(compressedBuf)
+	if err != nil {
+		return nil, fmt.Errorf("creating snappy compressor: %w", err)
+	}
+
+	if _, err := sw.Write(diffVarintPostings); err != nil {
+		return nil, errors.Wrap(err, "writing diff varint postings to snappy stream")
+	}
+	if err := sw.Close(); err != nil {
+		return nil, errors.Wrap(err, "closing snappy stream writer")
+	}
+
+	return compressedBuf.Bytes(), nil
+}
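A cache entry produced by snappyStreamedEncode is therefore the codecHeaderStreamedSnappy prefix followed by a snappy-framed compressed stream. A sketch of how a reader could recognize such an entry; isStreamedSnappy is hypothetical, not the actual Thanos decode path:

	// isStreamedSnappy is a hypothetical recognizer: entries written by
	// snappyStreamedEncode start with the codec header, followed by the
	// snappy-framed compressed payload.
	func isStreamedSnappy(b []byte) bool {
		return bytes.HasPrefix(b, []byte(codecHeaderStreamedSnappy))
	}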
