Store/Receivers: Calculating chunk hashes on stores/receivers #5703

Merged 16 commits on Oct 25, 2022
1 change: 1 addition & 0 deletions cmd/thanos/store.go
@@ -332,6 +332,7 @@ func runStore(
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
store.WithChunkHashCalculation(true),
}

if conf.debugLogging {
1 change: 1 addition & 0 deletions internal/cortex/storegateway/bucket_stores.go
@@ -481,6 +481,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
store.WithIndexCache(u.indexCache),
store.WithQueryGate(u.queryGate),
store.WithChunkPool(u.chunksPool),
store.WithChunkHashCalculation(true),
}
if u.logLevel.String() == "debug" {
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
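Both call sites above enable the new behaviour by appending store.WithChunkHashCalculation(true) to the store's functional options (the option itself is added in pkg/store/bucket.go below). For readers unfamiliar with that pattern, here is a minimal, self-contained sketch of how such an option toggles a field at construction time; the Store, Option, and NewStore names are illustrative stand-ins, not the actual Thanos types:

package main

import "fmt"

// Store stands in for store.BucketStore; only the field the new option touches is shown.
type Store struct {
	enableChunkHashCalculation bool
}

// Option mirrors the shape of store.BucketStoreOption.
type Option func(*Store)

// WithChunkHashCalculation returns an option that toggles chunk hash calculation.
func WithChunkHashCalculation(enabled bool) Option {
	return func(s *Store) { s.enableChunkHashCalculation = enabled }
}

// NewStore applies each option to a freshly constructed store.
func NewStore(opts ...Option) *Store {
	s := &Store{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := NewStore(WithChunkHashCalculation(true))
	fmt.Println(s.enableChunkHashCalculation) // true
}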
80 changes: 56 additions & 24 deletions pkg/store/bucket.go
@@ -9,6 +9,7 @@ import (
"context"
"encoding/binary"
"fmt"
"hash"
"io"
"math"
"os"
@@ -19,6 +20,8 @@ import (
"sync"
"time"

"github.com/cespare/xxhash"

"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -96,10 +99,13 @@ const (
labelDecode = "decode"

minBlockSyncConcurrency = 1

enableChunkHashCalculation = true
)

var (
errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}
)

type bucketStoreMetrics struct {
@@ -316,10 +322,12 @@ type BucketStore struct {

// Enables hints in the Series() response.
enableSeriesResponseHints bool

enableChunkHashCalculation bool
}

func (b *BucketStore) validate() error {
if b.blockSyncConcurrency < minBlockSyncConcurrency {
func (s *BucketStore) validate() error {
if s.blockSyncConcurrency < minBlockSyncConcurrency {
return errBlockSyncConcurrencyNotValid
}
return nil
@@ -389,6 +397,12 @@ func WithDebugLogging() BucketStoreOption {
}
}

func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption {
return func(s *BucketStore) {
s.enableChunkHashCalculation = enableChunkHashCalculation
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
@@ -427,6 +441,7 @@ func NewBucketStore(
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
}

for _, option := range options {
@@ -785,17 +800,18 @@ func (s *bucketSeriesSet) Err() error {
// blockSeries returns series matching given matchers, that have some data in given time range.
func blockSeries(
ctx context.Context,
extLset labels.Labels, // External labels added to the returned series labels.
indexr *bucketIndexReader, // Index reader for block.
chunkr *bucketChunkReader, // Chunk reader for block.
matchers []*labels.Matcher, // Series matchers.
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
extLset labels.Labels,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
skipChunks bool,
minTime, maxTime int64,
loadAggregates []storepb.Aggr,
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
calculateChunkHash bool,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
if err != nil {
@@ -880,20 +896,23 @@ func blockSeries(
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.load(ctx, res, loadAggregates); err != nil {
if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error {
func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error {
hasher := hashPool.Get().(hash.Hash64)
defer hashPool.Put(hasher)

if in.Encoding() == chunkenc.EncXOR {
b, err := save(in.Bytes())
if err != nil {
return err
}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
return nil
}
if in.Encoding() != downsample.ChunkEncAggr {
@@ -913,7 +932,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_SUM:
x, err := ac.Get(downsample.AggrSum)
if err != nil {
@@ -923,7 +942,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_MIN:
x, err := ac.Get(downsample.AggrMin)
if err != nil {
@@ -933,7 +952,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_MAX:
x, err := ac.Get(downsample.AggrMax)
if err != nil {
Expand All @@ -943,7 +962,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
case storepb.Aggr_COUNTER:
x, err := ac.Get(downsample.AggrCounter)
if err != nil {
@@ -953,12 +972,22 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b}
out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
}
}
return nil
}

func hashChunk(hasher hash.Hash64, b []byte, doHash bool) uint64 {
if !doHash {
return 0
}
hasher.Reset()
// Write never returns an error on the hasher implementation
_, _ = hasher.Write(b)
return hasher.Sum64()
}

// debugFoundBlockSetOverview logs on debug level what exactly blocks we used for query in terms of
// labels and resolution. This is important because we allow mixed resolution results, so it is quite crucial
// to be aware what exactly resolution we see on query.
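The hashChunk helper above pulls a reusable xxhash digest out of hashPool instead of allocating one per chunk, and returns 0 when hash calculation is disabled. A small, self-contained sketch of that pooling pattern, using only the cespare/xxhash and standard-library APIs that appear in this diff, looks like this (the chunk bytes here are made up for illustration):

package main

import (
	"fmt"
	"hash"
	"sync"

	"github.com/cespare/xxhash"
)

// hashPool hands out reusable xxhash digests, mirroring the pool added in bucket.go.
var hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}

// hashChunk mirrors the helper in this PR: 0 when hashing is disabled,
// otherwise the xxhash64 of the chunk bytes.
func hashChunk(hasher hash.Hash64, b []byte, doHash bool) uint64 {
	if !doHash {
		return 0
	}
	hasher.Reset()
	// Write never returns an error on the xxhash digest implementation.
	_, _ = hasher.Write(b)
	return hasher.Sum64()
}

func main() {
	hasher := hashPool.Get().(hash.Hash64)
	defer hashPool.Put(hasher)

	chunk := []byte("raw chunk bytes")
	fmt.Println(hashChunk(hasher, chunk, true) == xxhash.Sum64(chunk)) // true
	fmt.Println(hashChunk(hasher, chunk, false))                       // 0: hashing disabled
}

In the call sites further down, Series() threads s.enableChunkHashCalculation through to blockSeries, while LabelNames() and LabelValues() pass false, so hashes are only computed on paths that actually return chunk data.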
@@ -1095,6 +1124,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.Aggregates,
shardMatcher,
s.metrics.emptyPostingCount,
s.enableChunkHashCalculation,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1316,6 +1346,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
nil,
s.metrics.emptyPostingCount,
false,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1484,6 +1515,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.metrics.emptyPostingCount,
false,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -2556,7 +2588,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)
}

// load loads all added chunks and saves resulting aggrs to res.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr) error {
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool) error {
g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
@@ -2572,7 +2604,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
p := p
indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]]
g.Go(func() error {
return r.loadChunks(ctx, res, aggrs, seq, p, indices)
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum)
})
}
}
@@ -2581,7 +2613,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [

// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx) error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool) error {
fetchBegin := time.Now()

// Get a reader for the required range.
@@ -2660,7 +2692,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
// There is also crc32 after the chunk, but we ignore that.
chunkLen = n + 1 + int(chunkDataLen)
if chunkLen <= len(cb) {
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save)
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save, calculateChunkChecksum)
if err != nil {
return errors.Wrap(err, "populate chunk")
}
@@ -2691,7 +2723,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
r.stats.chunksFetchCount++
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save)
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save, calculateChunkChecksum)
if err != nil {
r.block.chunkPool.Put(nb)
return errors.Wrap(err, "populate chunk")