Store/Receivers: Calculating chunk hashes on stores/receivers (thanos-io#5703)
* Adding hashes to raw chunks and populating it from bucket store

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding flag on SeriesRequest to control whether or not to calculate hashes

Signed-off-by: Pedro Tanaka <[email protected]>

* Only calculate hash on series request for bucket when needed

Signed-off-by: Pedro Tanaka <[email protected]>

* Implement e2e test for Prometheus store

Signed-off-by: Pedro Tanaka <[email protected]>

Fixing flaky e2e tests

Signed-off-by: Pedro Tanaka <[email protected]>

Fix integration test for Prometheus

Signed-off-by: Pedro Tanaka <[email protected]>

* Calculating hash on TSDB Series() request as well

Signed-off-by: Pedro Tanaka <[email protected]>

Final fix on error linting

Signed-off-by: Pedro Tanaka <[email protected]>

* Using sync.Pool to avoid gc pressure

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing whitespace

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing nit on proto

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing hashpool

Signed-off-by: Pedro Tanaka <[email protected]>

* Using pool from callee instead of inside helper function

Signed-off-by: Pedro Tanaka <[email protected]>

* Reusing the hasher pool in other places

Signed-off-by: Pedro Tanaka <[email protected]>

* Calculating hashes by default

Signed-off-by: Pedro Tanaka <[email protected]>

* Adjusting e2e and integration tests

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing linting and e2e test

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing new call format for tsdb.NewHead

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding option to control whether to hash chunks on stores or not

Signed-off-by: Pedro Tanaka <[email protected]>

Signed-off-by: Pedro Tanaka <[email protected]>
Signed-off-by: Pedro Tanaka <[email protected]>
pedro-stanaka authored and Nathaniel Graham committed May 18, 2023
1 parent d4630cc commit 10d08a0
Showing 12 changed files with 359 additions and 68 deletions.
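
The commit message above describes the core of the change: compute an xxhash64 over each raw chunk's bytes before the chunk is sent out, and reuse hashers through a sync.Pool so that hashing every chunk does not allocate on each call. The following is a minimal, self-contained sketch of that pattern, assuming only the sync and github.com/cespare/xxhash packages; hasherPool and hashBytes are illustrative stand-ins for the hashPool and hashChunk introduced in pkg/store/bucket.go below.

package main

import (
	"fmt"
	"hash"
	"sync"

	"github.com/cespare/xxhash"
)

// Pool of xxhash hashers, so that concurrent Series() calls can reuse hashers
// instead of allocating a new one (and pressuring the GC) for every chunk.
var hasherPool = sync.Pool{New: func() interface{} { return xxhash.New() }}

// hashBytes returns the xxhash64 of b, or 0 when hash calculation is disabled.
func hashBytes(b []byte, enabled bool) uint64 {
	if !enabled {
		return 0
	}
	h := hasherPool.Get().(hash.Hash64)
	defer hasherPool.Put(h)
	h.Reset()
	_, _ = h.Write(b) // Write on the xxhash hasher never returns an error.
	return h.Sum64()
}

func main() {
	fmt.Println(hashBytes([]byte("raw chunk bytes"), true))
}
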
1 change: 1 addition & 0 deletions cmd/thanos/store.go
@@ -332,6 +332,7 @@ func runStore(
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
store.WithChunkHashCalculation(true),
}

if conf.debugLogging {
1 change: 1 addition & 0 deletions internal/cortex/storegateway/bucket_stores.go
@@ -481,6 +481,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
store.WithIndexCache(u.indexCache),
store.WithQueryGate(u.queryGate),
store.WithChunkPool(u.chunksPool),
store.WithChunkHashCalculation(true),
}
if u.logLevel.String() == "debug" {
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
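
Both call sites above turn on chunk hashing through a BucketStoreOption. As a generic, self-contained sketch of how such a functional option toggles a boolean field at construction time (Store, StoreOption, and NewStore here are hypothetical stand-ins for the real API, which keeps hashing enabled by default):

package main

import "fmt"

// Store is a hypothetical stand-in for store.BucketStore; only the option
// pattern mirrors the real code.
type Store struct{ enableChunkHashCalculation bool }

// StoreOption mutates a Store during construction.
type StoreOption func(*Store)

// WithChunkHashCalculation toggles chunk hash calculation, mirroring the
// option added in pkg/store/bucket.go below.
func WithChunkHashCalculation(enabled bool) StoreOption {
	return func(s *Store) { s.enableChunkHashCalculation = enabled }
}

// NewStore applies the given options over the defaults (hashing on by default).
func NewStore(options ...StoreOption) *Store {
	s := &Store{enableChunkHashCalculation: true}
	for _, o := range options {
		o(s)
	}
	return s
}

func main() {
	s := NewStore(WithChunkHashCalculation(true))
	fmt.Println(s.enableChunkHashCalculation)
}
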
80 changes: 56 additions & 24 deletions pkg/store/bucket.go
@@ -9,6 +9,7 @@ import (
"context"
"encoding/binary"
"fmt"
"hash"
"io"
"math"
"os"
@@ -19,6 +20,8 @@ import (
"sync"
"time"

"github.com/cespare/xxhash"

"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -96,10 +99,13 @@
labelDecode = "decode"

minBlockSyncConcurrency = 1

enableChunkHashCalculation = true
)

var (
errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}
)

type bucketStoreMetrics struct {
@@ -325,10 +331,12 @@ type BucketStore struct {

// Enables hints in the Series() response.
enableSeriesResponseHints bool

enableChunkHashCalculation bool
}

func (b *BucketStore) validate() error {
if b.blockSyncConcurrency < minBlockSyncConcurrency {
func (s *BucketStore) validate() error {
if s.blockSyncConcurrency < minBlockSyncConcurrency {
return errBlockSyncConcurrencyNotValid
}
return nil
@@ -398,6 +406,12 @@ func WithDebugLogging() BucketStoreOption {
}
}

func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption {
return func(s *BucketStore) {
s.enableChunkHashCalculation = enableChunkHashCalculation
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
@@ -436,6 +450,7 @@ func NewBucketStore(
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
}

for _, option := range options {
@@ -794,17 +809,18 @@ func (s *bucketSeriesSet) Err() error {
// blockSeries returns series matching the given matchers that have some data in the given time range.
func blockSeries(
ctx context.Context,
extLset labels.Labels, // External labels added to the returned series labels.
indexr *bucketIndexReader, // Index reader for block.
chunkr *bucketChunkReader, // Chunk reader for block.
matchers []*labels.Matcher, // Series matchers.
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
extLset labels.Labels,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
skipChunks bool,
minTime, maxTime int64,
loadAggregates []storepb.Aggr,
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
calculateChunkHash bool,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
if err != nil {
@@ -889,20 +905,23 @@ func blockSeries(
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.load(ctx, res, loadAggregates); err != nil {
if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error {
func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error {
hasher := hashPool.Get().(hash.Hash64)
defer hashPool.Put(hasher)

if in.Encoding() == chunkenc.EncXOR {
b, err := save(in.Bytes())
if err != nil {
return err
}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
return nil
}
if in.Encoding() != downsample.ChunkEncAggr {
@@ -922,7 +941,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_SUM:
x, err := ac.Get(downsample.AggrSum)
if err != nil {
@@ -932,7 +951,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_MIN:
x, err := ac.Get(downsample.AggrMin)
if err != nil {
@@ -942,7 +961,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_MAX:
x, err := ac.Get(downsample.AggrMax)
if err != nil {
@@ -952,7 +971,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_COUNTER:
x, err := ac.Get(downsample.AggrCounter)
if err != nil {
@@ -962,12 +981,22 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
}
}
return nil
}

func hashChunk(hasher hash.Hash64, b []byte, doHash bool) uint64 {
if !doHash {
return 0
}
hasher.Reset()
// Write never returns an error on the xxhash hasher implementation.
_, _ = hasher.Write(b)
return hasher.Sum64()
}
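
// Consumer-side illustration (hypothetical, not part of this diff): a querier or
// receiver that receives chunk bytes together with a non-zero hash can recompute
// the xxhash64 and detect corruption in transit or in the object store. The name
// verifyChunkHash is illustrative only.
func verifyChunkHash(data []byte, expected uint64) error {
	if expected == 0 {
		// Hash calculation was disabled on the store; nothing to verify.
		return nil
	}
	if got := xxhash.Sum64(data); got != expected {
		return errors.Errorf("chunk hash mismatch: got %d, want %d", got, expected)
	}
	return nil
}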

// debugFoundBlockSetOverview logs on debug level exactly which blocks were used for the query, in terms of
// labels and resolution. This is important because we allow mixed-resolution results, so it is quite crucial
// to be aware of exactly which resolution we see on a query.
@@ -1104,6 +1133,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.Aggregates,
shardMatcher,
s.metrics.emptyPostingCount,
s.enableChunkHashCalculation,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1326,6 +1356,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
nil,
s.metrics.emptyPostingCount,
false,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1494,6 +1525,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.metrics.emptyPostingCount,
false,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -2566,7 +2598,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)
}

// load loads all added chunks and saves resulting aggrs to res.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr) error {
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool) error {
g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
@@ -2582,7 +2614,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
p := p
indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]]
g.Go(func() error {
return r.loadChunks(ctx, res, aggrs, seq, p, indices)
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum)
})
}
}
@@ -2591,7 +2623,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [

// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx) error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool) error {
fetchBegin := time.Now()

// Get a reader for the required range.
@@ -2670,7 +2702,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
// There is also crc32 after the chunk, but we ignore that.
chunkLen = n + 1 + int(chunkDataLen)
if chunkLen <= len(cb) {
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save)
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save, calculateChunkChecksum)
if err != nil {
return errors.Wrap(err, "populate chunk")
}
@@ -2701,7 +2733,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
r.stats.chunksFetchCount++
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save)
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save, calculateChunkChecksum)
if err != nil {
r.block.chunkPool.Put(nb)
return errors.Wrap(err, "populate chunk")
