
Commit 2d8fd38

store: Enabled postings compression by default. Removed feature flag. (thanos-io#3452)

Signed-off-by: Bartlomiej Plotka <[email protected]>
Signed-off-by: Matthias Loibl <[email protected]>
metalmatze committed Nov 20, 2020
1 parent 369196b commit 2d8fd38
Showing 5 changed files with 30 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

### Changed

+- [#3452](https://github.com/thanos-io/thanos/pull/3452) Store: Index cache posting compression is now enabled by default. Removed `experimental.enable-index-cache-postings-compression` flag.
- [#3410](https://github.com/thanos-io/thanos/pull/3410) Compactor: Changed metric `thanos_compactor_blocks_marked_for_deletion_total` to `thanos_compactor_blocks_marked_total` with `marker` label.
Compactor will now automatically disable compaction for blocks with a large index that would, after compaction, output blocks larger than the specified value (by default: 64GB). This automatically
handles the Prometheus [format limit](https://github.com/thanos-io/thanos/issues/1424).
7 changes: 1 addition & 6 deletions cmd/thanos/store.go
@@ -94,9 +94,6 @@ func registerStore(app *extkingpin.App) {
"On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.").
Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int()

-enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
-Hidden().Default("false").Bool()

consistencyDelay := extkingpin.ModelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

@@ -150,7 +147,6 @@
},
selectorRelabelConf,
*advertiseCompatibilityLabel,
-*enablePostingsCompression,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
*webExternalPrefix,
@@ -185,7 +181,7 @@ func runStore(
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
-advertiseCompatibilityLabel, enablePostingsCompression bool,
+advertiseCompatibilityLabel bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
@@ -311,7 +307,6 @@ func runStore(
blockSyncConcurrency,
filterConf,
advertiseCompatibilityLabel,
-enablePostingsCompression,
postingOffsetsInMemSampling,
false,
lazyIndexReaderEnabled,
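For context on the deleted wiring: the flag removed above was registered as a hidden, default-off boolean, which is what kept compression opt-in. Below is a minimal sketch of that registration pattern using plain kingpin rather than Thanos's extkingpin wrapper; the app name and printout are illustrative only.

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("store", "demo of a hidden, default-off experimental flag")

	// Same shape as the flag removed in this commit: hidden from --help
	// output and disabled unless explicitly passed on the command line.
	enable := app.Flag("experimental.enable-index-cache-postings-compression",
		"Reencode and compress postings before storing them into cache.").
		Hidden().Default("false").Bool()

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println("postings compression enabled:", *enable)
}
```

After this commit the boolean is gone entirely, so the behavior can no longer be toggled, and passing the old flag to `thanos store` will fail argument parsing.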
63 changes: 27 additions & 36 deletions pkg/store/bucket.go
@@ -247,6 +247,10 @@ type FilterConfig struct {

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
+//
+// NOTE: Bucket store reencodes postings using diff+varint+snappy when storing to cache.
+// This makes them smaller, but takes extra CPU and memory.
+// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
type BucketStore struct {
logger log.Logger
metrics *bucketStoreMetrics
@@ -278,10 +282,6 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

-// Reencode postings using diff+varint+snappy when storing to cache.
-// This makes them smaller, but takes extra CPU and memory.
-// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
-enablePostingsCompression bool
postingOffsetsInMemSampling int

// Enables hints in the Series() response.
@@ -304,7 +304,6 @@ func NewBucketStore(
blockSyncConcurrency int,
filterConfig *FilterConfig,
enableCompatibilityLabel bool,
-enablePostingsCompression bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
lazyIndexReaderEnabled bool,
@@ -336,7 +335,6 @@
chunksLimiterFactory: chunksLimiterFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
-enablePostingsCompression: enablePostingsCompression,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
metrics: newBucketStoreMetrics(reg),
@@ -519,7 +517,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
s.chunkPool,
indexHeaderReader,
s.partitioner,
-s.enablePostingsCompression,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
@@ -1379,8 +1376,6 @@ type bucketBlock struct {

partitioner partitioner

-enablePostingsCompression bool

// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
// request hints' BlockMatchers.
relabelLabels labels.Labels
@@ -1397,19 +1392,17 @@ func newBucketBlock(
chunkPool pool.BytesPool,
indexHeadReader indexheader.Reader,
p partitioner,
-enablePostingsCompression bool,
) (b *bucketBlock, err error) {
b = &bucketBlock{
-logger: logger,
-metrics: metrics,
-bkt: bkt,
-indexCache: indexCache,
-chunkPool: chunkPool,
-dir: dir,
-partitioner: p,
-meta: meta,
-indexHeaderReader: indexHeadReader,
-enablePostingsCompression: enablePostingsCompression,
+logger: logger,
+metrics: metrics,
+bkt: bkt,
+indexCache: indexCache,
+chunkPool: chunkPool,
+dir: dir,
+partitioner: p,
+meta: meta,
+indexHeaderReader: indexHeadReader,
}

// Translate the block's labels and inject the block ID as a label
@@ -1849,22 +1842,20 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
compressionTime := time.Duration(0)
compressions, compressionErrors, compressedSize := 0, 0, 0

-if r.block.enablePostingsCompression {
-// Reencode postings before storing to cache. If that fails, we store original bytes.
-// This can only fail, if postings data was somehow corrupted,
-// and there is nothing we can do about it.
-// Errors from corrupted postings will be reported when postings are used.
-compressions++
-s := time.Now()
-bep := newBigEndianPostings(pBytes[4:])
-data, err := diffVarintSnappyEncode(bep, bep.length())
-compressionTime = time.Since(s)
-if err == nil {
-dataToCache = data
-compressedSize = len(data)
-} else {
-compressionErrors = 1
-}
+// Reencode postings before storing to cache. If that fails, we store original bytes.
+// This can only fail, if postings data was somehow corrupted,
+// and there is nothing we can do about it.
+// Errors from corrupted postings will be reported when postings are used.
+compressions++
+s := time.Now()
+bep := newBigEndianPostings(pBytes[4:])
+data, err := diffVarintSnappyEncode(bep, bep.length())
+compressionTime = time.Since(s)
+if err == nil {
+dataToCache = data
+compressedSize = len(data)
+} else {
+compressionErrors = 1
+}

r.mtx.Lock()
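The bucket.go hunk above is the heart of the change: the reencoding that used to be guarded by `r.block.enablePostingsCompression` now runs unconditionally, falling back to caching the original bytes only if encoding fails. The codec's name, diff+varint+snappy, describes the pipeline: postings are sorted series IDs, so consecutive IDs are delta-encoded, each delta is written as a varint, and the resulting byte stream is snappy-compressed (the `pBytes[4:]` slice suggests a 4-byte prefix is skipped before the raw big-endian IDs). The sketch below shows the idea only; it is not Thanos's actual `diffVarintSnappyEncode`, which does extra bookkeeping such as marking the encoding so readers can recognize it.

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// encodePostings delta-encodes a sorted slice of series IDs, writes each
// delta as a uvarint, then snappy-compresses the concatenated deltas.
func encodePostings(postings []uint32) []byte {
	buf := make([]byte, 0, len(postings))
	tmp := make([]byte, binary.MaxVarintLen32)
	prev := uint32(0)
	for _, p := range postings {
		// Sorted input keeps deltas small, so most varints fit in 1-2 bytes.
		n := binary.PutUvarint(tmp, uint64(p-prev))
		buf = append(buf, tmp[:n]...)
		prev = p
	}
	return snappy.Encode(nil, buf)
}

// decodePostings reverses encodePostings. An error here corresponds to the
// "corrupted postings" case the comments in the diff describe.
func decodePostings(b []byte) ([]uint32, error) {
	raw, err := snappy.Decode(nil, b)
	if err != nil {
		return nil, err
	}
	out := make([]uint32, 0)
	prev := uint64(0)
	for len(raw) > 0 {
		delta, n := binary.Uvarint(raw)
		if n <= 0 {
			return nil, fmt.Errorf("malformed uvarint in postings")
		}
		prev += delta
		out = append(out, uint32(prev))
		raw = raw[n:]
	}
	return out, nil
}

func main() {
	// Postings are sorted and often clustered, which is exactly what makes
	// delta+varint shrink them before the final snappy pass.
	ids := []uint32{1000, 1002, 1003, 1010, 1500, 1501}
	enc := encodePostings(ids)
	dec, err := decodePostings(enc)
	fmt.Printf("raw: %d bytes, encoded: %d bytes, roundtrip: %v err: %v\n",
		4*len(ids), len(enc), dec, err)
}
```

Each raw posting costs 4 bytes, while a delta between near-consecutive IDs usually costs a single byte even before snappy finds longer repeats, which is consistent with the "about 10% of the original size" claim in the removed flag's help text.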
1 change: 0 additions & 1 deletion pkg/store/bucket_e2e_test.go
@@ -168,7 +168,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
20,
filterConf,
true,
-true,
DefaultPostingOffsetInMemorySampling,
true,
true,
7 changes: 1 addition & 6 deletions pkg/store/bucket_test.go
@@ -208,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
},
}

-b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true)
+b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil)
testutil.Ok(t, err)

cases := []struct {
@@ -579,7 +579,6 @@ func TestBucketStore_Info(t *testing.T) {
20,
allowAllFilterConf,
true,
-true,
DefaultPostingOffsetInMemorySampling,
false,
false,
@@ -831,7 +830,6 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
20,
allowAllFilterConf,
true,
-true,
DefaultPostingOffsetInMemorySampling,
false,
false,
@@ -1647,7 +1645,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
10,
nil,
false,
-true,
DefaultPostingOffsetInMemorySampling,
true,
false,
@@ -1741,7 +1738,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
10,
nil,
false,
-true,
DefaultPostingOffsetInMemorySampling,
true,
false,
@@ -1886,7 +1882,6 @@ func TestBlockWithLargeChunks(t *testing.T) {
10,
nil,
false,
-true,
DefaultPostingOffsetInMemorySampling,
true,
false,
