diff --git a/CHANGELOG.md b/CHANGELOG.md index 62d8125c0f..ebe08f24fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ### Changed +- [#3452](https://github.com/thanos-io/thanos/pull/3452) Store: Index cache posting compression is now enabled by default. Removed `experimental.enable-index-cache-postings-compression` flag. - [#3410](https://github.com/thanos-io/thanos/pull/3410) Compactor: Changed metric `thanos_compactor_blocks_marked_for_deletion_total` to `thanos_compactor_blocks_marked_total` with `marker` label. Compactor will now automatically disable compaction for blocks with large index that would output blocks after compaction larger than specified value (by default: 64GB). This automatically handles the Promethus [format limit](https://github.com/thanos-io/thanos/issues/1424). diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 352d660414..85735ece6f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -94,9 +94,6 @@ func registerStore(app *extkingpin.App) { "On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance."). Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int() - enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size."). - Hidden().Default("false").Bool() - consistencyDelay := extkingpin.ModelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent."). Default("0s")) @@ -150,7 +147,6 @@ func registerStore(app *extkingpin.App) { }, selectorRelabelConf, *advertiseCompatibilityLabel, - *enablePostingsCompression, time.Duration(*consistencyDelay), time.Duration(*ignoreDeletionMarksDelay), *webExternalPrefix, @@ -185,7 +181,7 @@ func runStore( blockSyncConcurrency int, filterConf *store.FilterConfig, selectorRelabelConf *extflag.PathOrContent, - advertiseCompatibilityLabel, enablePostingsCompression bool, + advertiseCompatibilityLabel bool, consistencyDelay time.Duration, ignoreDeletionMarksDelay time.Duration, externalPrefix, prefixHeader string, @@ -311,7 +307,6 @@ func runStore( blockSyncConcurrency, filterConf, advertiseCompatibilityLabel, - enablePostingsCompression, postingOffsetsInMemSampling, false, lazyIndexReaderEnabled, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f470b56c35..1d0195c529 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -247,6 +247,10 @@ type FilterConfig struct { // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. +// +// NOTE: Bucket store reencodes postings using diff+varint+snappy when storing to cache. +// This makes them smaller, but takes extra CPU and memory. +// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. type BucketStore struct { logger log.Logger metrics *bucketStoreMetrics @@ -278,10 +282,6 @@ type BucketStore struct { advLabelSets []labelpb.ZLabelSet enableCompatibilityLabel bool - // Reencode postings using diff+varint+snappy when storing to cache. - // This makes them smaller, but takes extra CPU and memory. - // When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. - enablePostingsCompression bool postingOffsetsInMemSampling int // Enables hints in the Series() response. @@ -304,7 +304,6 @@ func NewBucketStore( blockSyncConcurrency int, filterConfig *FilterConfig, enableCompatibilityLabel bool, - enablePostingsCompression bool, postingOffsetsInMemSampling int, enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. lazyIndexReaderEnabled bool, @@ -336,7 +335,6 @@ func NewBucketStore( chunksLimiterFactory: chunksLimiterFactory, partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, - enablePostingsCompression: enablePostingsCompression, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, metrics: newBucketStoreMetrics(reg), @@ -519,7 +517,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.chunkPool, indexHeaderReader, s.partitioner, - s.enablePostingsCompression, ) if err != nil { return errors.Wrap(err, "new bucket block") @@ -1379,8 +1376,6 @@ type bucketBlock struct { partitioner partitioner - enablePostingsCompression bool - // Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using // request hints' BlockMatchers. relabelLabels labels.Labels @@ -1397,19 +1392,17 @@ func newBucketBlock( chunkPool pool.BytesPool, indexHeadReader indexheader.Reader, p partitioner, - enablePostingsCompression bool, ) (b *bucketBlock, err error) { b = &bucketBlock{ - logger: logger, - metrics: metrics, - bkt: bkt, - indexCache: indexCache, - chunkPool: chunkPool, - dir: dir, - partitioner: p, - meta: meta, - indexHeaderReader: indexHeadReader, - enablePostingsCompression: enablePostingsCompression, + logger: logger, + metrics: metrics, + bkt: bkt, + indexCache: indexCache, + chunkPool: chunkPool, + dir: dir, + partitioner: p, + meta: meta, + indexHeaderReader: indexHeadReader, } // Translate the block's labels and inject the block ID as a label @@ -1849,22 +1842,20 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings compressionTime := time.Duration(0) compressions, compressionErrors, compressedSize := 0, 0, 0 - if r.block.enablePostingsCompression { - // Reencode postings before storing to cache. If that fails, we store original bytes. - // This can only fail, if postings data was somehow corrupted, - // and there is nothing we can do about it. - // Errors from corrupted postings will be reported when postings are used. - compressions++ - s := time.Now() - bep := newBigEndianPostings(pBytes[4:]) - data, err := diffVarintSnappyEncode(bep, bep.length()) - compressionTime = time.Since(s) - if err == nil { - dataToCache = data - compressedSize = len(data) - } else { - compressionErrors = 1 - } + // Reencode postings before storing to cache. If that fails, we store original bytes. + // This can only fail, if postings data was somehow corrupted, + // and there is nothing we can do about it. + // Errors from corrupted postings will be reported when postings are used. + compressions++ + s := time.Now() + bep := newBigEndianPostings(pBytes[4:]) + data, err := diffVarintSnappyEncode(bep, bep.length()) + compressionTime = time.Since(s) + if err == nil { + dataToCache = data + compressedSize = len(data) + } else { + compressionErrors = 1 } r.mtx.Lock() diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index e7af44eeea..482ae959fc 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -168,7 +168,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m 20, filterConf, true, - true, DefaultPostingOffsetInMemorySampling, true, true, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index d2a63d214d..72ba689fe9 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -208,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) { }, } - b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true) + b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil) testutil.Ok(t, err) cases := []struct { @@ -579,7 +579,6 @@ func TestBucketStore_Info(t *testing.T) { 20, allowAllFilterConf, true, - true, DefaultPostingOffsetInMemorySampling, false, false, @@ -831,7 +830,6 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul 20, allowAllFilterConf, true, - true, DefaultPostingOffsetInMemorySampling, false, false, @@ -1647,7 +1645,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { 10, nil, false, - true, DefaultPostingOffsetInMemorySampling, true, false, @@ -1741,7 +1738,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { 10, nil, false, - true, DefaultPostingOffsetInMemorySampling, true, false, @@ -1886,7 +1882,6 @@ func TestBlockWithLargeChunks(t *testing.T) { 10, nil, false, - true, DefaultPostingOffsetInMemorySampling, true, false,