store: added option to reencode and compress postings before storing them to the cache (#2297)

* Added "diff+varint+snappy" codec for postings.

Signed-off-by: Peter Štibraný <[email protected]>

* Added option to reencode and compress postings stored in cache

Signed-off-by: Peter Štibraný <[email protected]>

* Expose enablePostingsCompression flag as CLI parameter.

Signed-off-by: Peter Štibraný <[email protected]>

* Use "github.com/pkg/errors" instead of "errors" package.

Signed-off-by: Peter Štibraný <[email protected]>

* remove break

Signed-off-by: Peter Štibraný <[email protected]>

* Removed empty branch

Signed-off-by: Peter Štibraný <[email protected]>

* Added copyright headers.

Signed-off-by: Peter Štibraný <[email protected]>

* Added CHANGELOG.md entry

Signed-off-by: Peter Štibraný <[email protected]>

* Added comments.

Signed-off-by: Peter Štibraný <[email protected]>

* Use Encbuf and Decbuf.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix comments in test file.

Signed-off-by: Peter Štibraný <[email protected]>

* Another comment...

Signed-off-by: Peter Štibraný <[email protected]>

* Removed diffVarintSnappyEncode function.

Signed-off-by: Peter Štibraný <[email protected]>

* Comment on usage with in-memory cache.

Signed-off-by: Peter Štibraný <[email protected]>

* var block

Signed-off-by: Peter Štibraný <[email protected]>

* Removed extra comment.

Signed-off-by: Peter Štibraný <[email protected]>

* Move comment to error message.

Signed-off-by: Peter Štibraný <[email protected]>

* Separated snappy compression and postings reencoding into two functions.
There is now header only for snappy-compressed postings.

Signed-off-by: Peter Štibraný <[email protected]>

* Added comment on using diff+varint+snappy.

Signed-off-by: Peter Štibraný <[email protected]>

* Shorten header

Signed-off-by: Peter Štibraný <[email protected]>

* Lint...

Signed-off-by: Peter Štibraný <[email protected]>

* Changed experimental.enable-postings-compression to experimental.enable-index-cache-postings-compression

Signed-off-by: Peter Štibraný <[email protected]>

* Added metrics for postings compression

Signed-off-by: Peter Štibraný <[email protected]>

* Added metrics for postings decompression

Signed-off-by: Peter Štibraný <[email protected]>

* Reorder metrics

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed comment.

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed comment.

Signed-off-by: Peter Štibraný <[email protected]>

* Use encode/decode labels.

Signed-off-by: Peter Štibraný <[email protected]>
pstibrany authored Mar 26, 2020
1 parent b60b3e9 commit 214ff44
Showing 7 changed files with 472 additions and 29 deletions.
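
The commit messages above introduce a "diff+varint+snappy" codec for postings, but the codec file itself is among the changed files not expanded in this view. Purely as a hedged illustration of the general technique — not the actual Thanos implementation, and with a made-up header constant and function names — an encode sketch in Go could look like this:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// codecHeader is a hypothetical marker prefix so a reader of the cache can
// later recognize re-encoded entries; the real header value lives in the
// codec file that is not shown here.
const codecHeader = "dvs1"

// encodePostings delta-encodes sorted series references, writes the deltas
// as uvarints and snappy-compresses the result. Consecutive references tend
// to be close together, so the deltas are small and the varints short, which
// is where the "about 10% of the original size" from the CHANGELOG entry
// comes from.
func encodePostings(refs []uint64) ([]byte, error) {
	buf := make([]byte, 0, len(refs)*binary.MaxVarintLen64)
	tmp := make([]byte, binary.MaxVarintLen64)

	prev := uint64(0)
	for _, r := range refs {
		if r < prev {
			return nil, fmt.Errorf("postings not sorted: %d after %d", r, prev)
		}
		n := binary.PutUvarint(tmp, r-prev) // store only the difference
		buf = append(buf, tmp[:n]...)
		prev = r
	}

	compressed := snappy.Encode(nil, buf)
	out := make([]byte, 0, len(codecHeader)+len(compressed))
	out = append(out, codecHeader...)
	out = append(out, compressed...)
	return out, nil
}

func main() {
	data, err := encodePostings([]uint64{16, 32, 48, 64, 4096})
	fmt.Println(len(data), err)
}

The decode direction and the header check used in pkg/store/bucket.go are sketched further down, next to the hunks that call them.
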
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
- [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached.
- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-index-cache-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size.

### Changed

6 changes: 6 additions & 0 deletions cmd/thanos/store.go
@@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

@@ -126,6 +129,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
*enablePostingsCompression,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
)
@@ -160,6 +164,7 @@ func runStore(
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
enablePostingsCompression bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
) error {
@@ -265,6 +270,7 @@ func runStore(
filterConf,
advertiseCompatibilityLabel,
enableIndexHeader,
enablePostingsCompression,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
141 changes: 126 additions & 15 deletions pkg/store/bucket.go
@@ -97,6 +97,12 @@ type bucketStoreMetrics struct {
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
seriesRefetches prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
cachedPostingsCompressionErrors *prometheus.CounterVec
cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
cachedPostingsOriginalSizeBytes prometheus.Counter
cachedPostingsCompressedSizeBytes prometheus.Counter
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -180,6 +186,28 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
})

m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compressions_total",
Help: "Number of postings compressions before storing to index cache.",
}, []string{"op"})
m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compression_errors_total",
Help: "Number of postings compression errors.",
}, []string{"op"})
m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compression_time_seconds",
Help: "Time spent compressing postings before storing them into postings cache.",
}, []string{"op"})
m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_original_size_bytes_total",
Help: "Original size of postings stored into cache.",
})
m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total",
Help: "Compressed size of postings stored into cache.",
})

return &m
}

@@ -220,6 +248,11 @@ type BucketStore struct {
advLabelSets []storepb.LabelSet
enableCompatibilityLabel bool
enableIndexHeader bool

// Reencode postings using diff+varint+snappy when storing to cache.
// This makes them smaller, but takes extra CPU and memory.
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
enablePostingsCompression bool
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -239,6 +272,7 @@ func NewBucketStore(
filterConfig *FilterConfig,
enableCompatibilityLabel bool,
enableIndexHeader bool,
enablePostingsCompression bool,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -270,10 +304,11 @@ func NewBucketStore(
maxConcurrent,
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
enablePostingsCompression: enablePostingsCompression,
}
s.metrics = metrics

@@ -455,6 +490,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
indexHeaderReader,
s.partitioner,
s.metrics.seriesRefetches,
s.enablePostingsCompression,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
@@ -896,6 +932,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum))
s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount))
s.metrics.cachedPostingsCompressions.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressions))
s.metrics.cachedPostingsCompressions.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressions))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressionErrors))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressionErrors))
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("encode").Add(stats.cachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("decode").Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))

level.Debug(s.logger).Log("msg", "stats query processed",
"stats", fmt.Sprintf("%+v", stats), "err", err)
@@ -1183,6 +1227,8 @@ type bucketBlock struct {
partitioner partitioner

seriesRefetches prometheus.Counter

enablePostingsCompression bool
}

func newBucketBlock(
@@ -1196,17 +1242,19 @@ func newBucketBlock(
indexHeadReader indexheader.Reader,
p partitioner,
seriesRefetches prometheus.Counter,
enablePostingsCompression bool,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
logger: logger,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
enablePostingsCompression: enablePostingsCompression,
}

// Get object handles for all chunk files.
@@ -1512,7 +1560,24 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

_, l, err := r.dec.Postings(b)
// Even if this instance is not using compression, there may be compressed
// entries in the cache written by other stores.
var (
l index.Postings
err error
)
if isDiffVarintSnappyEncodedPostings(b) {
s := time.Now()
l, err = diffVarintSnappyDecode(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.cachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
r.stats.cachedPostingsDecompressionErrors += 1
}
} else {
_, l, err = r.dec.Postings(b)
}

if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
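
The branch above chooses between diffVarintSnappyDecode and the regular r.dec.Postings by inspecting the cached bytes, so entries written by stores that have compression disabled (or that predate this change) still decode correctly. Neither helper is expanded in this view; a hedged, self-contained sketch of what such a prefix check and decode might look like, reusing the made-up "dvs1" header from the encode sketch above, is:

package postingscodec

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// codecHeader is the same hypothetical prefix as in the encode sketch; the
// real value checked by isDiffVarintSnappyEncodedPostings may differ.
const codecHeader = "dvs1"

// isDiffVarintSnappy is a cheap prefix check telling re-encoded cache
// entries apart from raw index postings.
func isDiffVarintSnappy(data []byte) bool {
	return len(data) > len(codecHeader) && string(data[:len(codecHeader)]) == codecHeader
}

// decodePostings reverses the encode sketch: snappy-decompress, then rebuild
// absolute series references from the uvarint deltas.
func decodePostings(data []byte) ([]uint64, error) {
	if !isDiffVarintSnappy(data) {
		return nil, fmt.Errorf("unexpected postings header")
	}
	raw, err := snappy.Decode(nil, data[len(codecHeader):])
	if err != nil {
		return nil, fmt.Errorf("snappy decode: %w", err)
	}

	var refs []uint64
	prev := uint64(0)
	for len(raw) > 0 {
		delta, n := binary.Uvarint(raw)
		if n <= 0 {
			return nil, fmt.Errorf("malformed uvarint in postings")
		}
		prev += delta
		refs = append(refs, prev)
		raw = raw[n:]
	}
	return refs, nil
}

In the diff itself diffVarintSnappyDecode returns an index.Postings iterator rather than a slice, and the caller records the decompression count, timing and error stats shown in this hunk; the slice form here only keeps the sketch short.
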
@@ -1579,15 +1644,43 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
return err
}

dataToCache := pBytes

compressionTime := time.Duration(0)
compressions, compressionErrors, compressedSize := 0, 0, 0

if r.block.enablePostingsCompression {
// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:]))
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
compressedSize = len(data)
} else {
compressionErrors = 1
}
}

r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)

r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(pBytes)
r.stats.cachedPostingsCompressions += compressions
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.cachedPostingsOriginalSizeSum += len(pBytes)
r.stats.cachedPostingsCompressedSizeSum += compressedSize
r.stats.cachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
}
return nil
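
The comment above notes that the first 4 bytes of the fetched postings are a length prefix; the remaining bytes are handed to newBigEndianPostings, i.e. they are read as consecutive 4-byte big-endian series references (an assumption based on the helper's name and on Prometheus's index reader — details such as trailing checksums are not visible in this diff). A minimal, hypothetical iterator in that spirit:

package postingscodec

import "encoding/binary"

// bePostings is a hypothetical stand-in for the newBigEndianPostings helper
// referenced in the diff: it walks a byte slice of consecutive 4-byte
// big-endian series references.
type bePostings struct {
	list []byte
	cur  uint64
}

func newBEPostings(list []byte) *bePostings {
	return &bePostings{list: list}
}

// Next advances to the next reference and reports whether one was read.
func (p *bePostings) Next() bool {
	if len(p.list) < 4 {
		return false
	}
	p.cur = uint64(binary.BigEndian.Uint32(p.list))
	p.list = p.list[4:]
	return true
}

// At returns the current series reference.
func (p *bePostings) At() uint64 { return p.cur }

Conceptually, diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:])) walks such an iterator and re-encodes the fixed 4-byte references as varint deltas; as the comment in the hunk says, on any error the original pBytes are cached unchanged, so a failed compression only costs the attempt.
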
@@ -1961,6 +2054,15 @@ type queryStats struct {
postingsFetchCount int
postingsFetchDurationSum time.Duration

cachedPostingsCompressions int
cachedPostingsCompressionErrors int
cachedPostingsOriginalSizeSum int
cachedPostingsCompressedSizeSum int
cachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsDecompressionErrors int
cachedPostingsDecompressionTimeSum time.Duration

seriesTouched int
seriesTouchedSizeSum int
seriesFetched int
@@ -1991,6 +2093,15 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.postingsFetchCount += o.postingsFetchCount
s.postingsFetchDurationSum += o.postingsFetchDurationSum

s.cachedPostingsCompressions += o.cachedPostingsCompressions
s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum
s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum

s.seriesTouched += o.seriesTouched
s.seriesTouchedSizeSum += o.seriesTouchedSizeSum
s.seriesFetched += o.seriesFetched
1 change: 1 addition & 0 deletions pkg/store/bucket_e2e_test.go
@@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
filterConf,
true,
true,
true,
)
testutil.Ok(t, err)
s.store = store
33 changes: 19 additions & 14 deletions pkg/store/bucket_test.go
@@ -480,6 +480,7 @@ func TestBucketStore_Info(t *testing.T) {
allowAllFilterConf,
true,
true,
true,
)
testutil.Ok(t, err)

@@ -731,6 +732,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
allowAllFilterConf,
true,
true,
true,
)
testutil.Ok(t, err)

@@ -932,7 +934,23 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in

logger := log.NewNopLogger()

app := h.Appender()
appendTestData(t, h.Appender(), series)

testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)

_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String())))

return id
}

func appendTestData(t testing.TB, app tsdb.Appender, series int) {
addSeries := func(l labels.Labels) {
_, err := app.Add(l, 0, 0)
testutil.Ok(t, err)
@@ -950,19 +968,6 @@ }
}
}
testutil.Ok(t, app.Commit())

testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)

_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String())))

return id
}

func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID {