From 214ff4480e93f89a8e39b29fd4a43ccea1f61d3e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?=
Date: Thu, 26 Mar 2020 11:59:47 +0100
Subject: [PATCH] store: added option to reencode and compress postings before
 storing them to the cache (#2297)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Added "diff+varint+snappy" codec for postings.
Signed-off-by: Peter Štibraný

* Added option to reencode and compress postings stored in cache
Signed-off-by: Peter Štibraný

* Expose enablePostingsCompression flag as CLI parameter.
Signed-off-by: Peter Štibraný

* Use "github.com/pkg/errors" instead of "errors" package.
Signed-off-by: Peter Štibraný

* remove break
Signed-off-by: Peter Štibraný

* Removed empty branch
Signed-off-by: Peter Štibraný

* Added copyright headers.
Signed-off-by: Peter Štibraný

* Added CHANGELOG.md entry
Signed-off-by: Peter Štibraný

* Added comments.
Signed-off-by: Peter Štibraný

* Use Encbuf and Decbuf.
Signed-off-by: Peter Štibraný

* Fix comments in test file.
Signed-off-by: Peter Štibraný

* Another comment...
Signed-off-by: Peter Štibraný

* Removed diffVarintSnappyEncode function.
Signed-off-by: Peter Štibraný

* Comment on usage with in-memory cache.
Signed-off-by: Peter Štibraný

* var block
Signed-off-by: Peter Štibraný

* Removed extra comment.
Signed-off-by: Peter Štibraný

* Move comment to error message.
Signed-off-by: Peter Štibraný

* Separated snappy compression and postings reencoding into two functions.
  There is now header only for snappy-compressed postings.
Signed-off-by: Peter Štibraný

* Added comment on using diff+varint+snappy.
Signed-off-by: Peter Štibraný

* Shorten header
Signed-off-by: Peter Štibraný

* Lint...
Signed-off-by: Peter Štibraný

* Changed experimental.enable-postings-compression to experimental.enable-index-cache-postings-compression
Signed-off-by: Peter Štibraný

* Added metrics for postings compression
Signed-off-by: Peter Štibraný

* Added metrics for postings decompression
Signed-off-by: Peter Štibraný

* Reorder metrics
Signed-off-by: Peter Štibraný

* Fixed comment.
Signed-off-by: Peter Štibraný

* Fixed comment.
Signed-off-by: Peter Štibraný

* Use encode/decode labels.
Signed-off-by: Peter Štibraný
---
 CHANGELOG.md                     |   1 +
 cmd/thanos/store.go              |   6 +
 pkg/store/bucket.go              | 141 ++++++++++++++++++++---
 pkg/store/bucket_e2e_test.go     |   1 +
 pkg/store/bucket_test.go         |  33 +++---
 pkg/store/postings_codec.go      | 135 +++++++++++++++++++++++
 pkg/store/postings_codec_test.go | 184 +++++++++++++++++++++++++++++++
 7 files changed, 472 insertions(+), 29 deletions(-)
 create mode 100644 pkg/store/postings_codec.go
 create mode 100644 pkg/store/postings_codec_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3f80a55e1d..ab743e683e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
 - [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
 - [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached.
+- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-index-cache-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size.
 
 ### Changed
 
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index b3609440aa..18e95f81a1 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 	enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
 		Hidden().Default("false").Bool()
 
+	enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
+		Hidden().Default("false").Bool()
+
 	consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s")) @@ -126,6 +129,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf, *advertiseCompatibilityLabel, *enableIndexHeader, + *enablePostingsCompression, time.Duration(*consistencyDelay), time.Duration(*ignoreDeletionMarksDelay), ) @@ -160,6 +164,7 @@ func runStore( selectorRelabelConf *extflag.PathOrContent, advertiseCompatibilityLabel bool, enableIndexHeader bool, + enablePostingsCompression bool, consistencyDelay time.Duration, ignoreDeletionMarksDelay time.Duration, ) error { @@ -265,6 +270,7 @@ func runStore( filterConf, advertiseCompatibilityLabel, enableIndexHeader, + enablePostingsCompression, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b9689f4f6b..81dcb4ff1e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -97,6 +97,12 @@ type bucketStoreMetrics struct { queriesDropped prometheus.Counter queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter + + cachedPostingsCompressions *prometheus.CounterVec + cachedPostingsCompressionErrors *prometheus.CounterVec + cachedPostingsCompressionTimeSeconds *prometheus.CounterVec + cachedPostingsOriginalSizeBytes prometheus.Counter + cachedPostingsCompressedSizeBytes prometheus.Counter } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -180,6 +186,28 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), }) + + m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compressions_total", + Help: "Number of postings compressions before storing to index cache.", + }, []string{"op"}) + m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compression_errors_total", + Help: "Number of postings compression errors.", + }, []string{"op"}) + m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compression_time_seconds", + Help: "Time spent compressing postings before storing them into postings cache.", + }, []string{"op"}) + m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_original_size_bytes_total", + Help: "Original size of postings stored into cache.", + }) + m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total", + Help: "Compressed size of postings stored into cache.", + }) + return &m } @@ -220,6 +248,11 @@ type BucketStore struct { advLabelSets []storepb.LabelSet enableCompatibilityLabel bool enableIndexHeader bool + + // Reencode postings using diff+varint+snappy when storing to cache. + // This makes them smaller, but takes extra CPU and memory. + // When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. 
+ enablePostingsCompression bool } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -239,6 +272,7 @@ func NewBucketStore( filterConfig *FilterConfig, enableCompatibilityLabel bool, enableIndexHeader bool, + enablePostingsCompression bool, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -270,10 +304,11 @@ func NewBucketStore( maxConcurrent, extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), ), - samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, - enableCompatibilityLabel: enableCompatibilityLabel, - enableIndexHeader: enableIndexHeader, + samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), + partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + enableCompatibilityLabel: enableCompatibilityLabel, + enableIndexHeader: enableIndexHeader, + enablePostingsCompression: enablePostingsCompression, } s.metrics = metrics @@ -455,6 +490,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er indexHeaderReader, s.partitioner, s.metrics.seriesRefetches, + s.enablePostingsCompression, ) if err != nil { return errors.Wrap(err, "new bucket block") @@ -896,6 +932,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum)) s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum)) s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) + s.metrics.cachedPostingsCompressions.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressions)) + s.metrics.cachedPostingsCompressions.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressions)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressionErrors)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressionErrors)) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("encode").Add(stats.cachedPostingsCompressionTimeSum.Seconds()) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("decode").Add(stats.cachedPostingsDecompressionTimeSum.Seconds()) + s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum)) + s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum)) level.Debug(s.logger).Log("msg", "stats query processed", "stats", fmt.Sprintf("%+v", stats), "err", err) @@ -1183,6 +1227,8 @@ type bucketBlock struct { partitioner partitioner seriesRefetches prometheus.Counter + + enablePostingsCompression bool } func newBucketBlock( @@ -1196,17 +1242,19 @@ func newBucketBlock( indexHeadReader indexheader.Reader, p partitioner, seriesRefetches prometheus.Counter, + enablePostingsCompression bool, ) (b *bucketBlock, err error) { b = &bucketBlock{ - logger: logger, - bkt: bkt, - indexCache: indexCache, - chunkPool: chunkPool, - dir: dir, - partitioner: p, - meta: meta, - indexHeaderReader: indexHeadReader, - seriesRefetches: seriesRefetches, + logger: logger, + bkt: bkt, + indexCache: indexCache, + chunkPool: chunkPool, + dir: dir, + partitioner: p, + meta: meta, + indexHeaderReader: indexHeadReader, + seriesRefetches: seriesRefetches, + enablePostingsCompression: enablePostingsCompression, } // Get object handles 
@@ -1512,7 +1560,24 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
 			r.stats.postingsTouched++
 			r.stats.postingsTouchedSizeSum += len(b)
 
-			_, l, err := r.dec.Postings(b)
+			// Even if this instance is not using compression, there may be compressed
+			// entries in the cache written by other stores.
+			var (
+				l   index.Postings
+				err error
+			)
+			if isDiffVarintSnappyEncodedPostings(b) {
+				s := time.Now()
+				l, err = diffVarintSnappyDecode(b)
+				r.stats.cachedPostingsDecompressions += 1
+				r.stats.cachedPostingsDecompressionTimeSum += time.Since(s)
+				if err != nil {
+					r.stats.cachedPostingsDecompressionErrors += 1
+				}
+			} else {
+				_, l, err = r.dec.Postings(b)
+			}
+
 			if err != nil {
 				return nil, errors.Wrap(err, "decode postings")
 			}
@@ -1579,15 +1644,43 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
 			return err
 		}
 
+		dataToCache := pBytes
+
+		compressionTime := time.Duration(0)
+		compressions, compressionErrors, compressedSize := 0, 0, 0
+
+		if r.block.enablePostingsCompression {
+			// Reencode postings before storing to cache. If that fails, we store original bytes.
+			// This can only fail if postings data was somehow corrupted,
+			// and there is nothing we can do about it.
+			// Errors from corrupted postings will be reported when postings are used.
+			compressions++
+			s := time.Now()
+			data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:]))
+			compressionTime = time.Since(s)
+			if err == nil {
+				dataToCache = data
+				compressedSize = len(data)
+			} else {
+				compressionErrors = 1
+			}
+		}
+
 		r.mtx.Lock()
 		// Return postings and fill LRU cache.
 		// Truncate first 4 bytes which are length of posting.
 		output[p.keyID] = newBigEndianPostings(pBytes[4:])
-		r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)
+
+		r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], dataToCache)
 
 		// If we just fetched it we still have to update the stats for touched postings.
 		r.stats.postingsTouched++
 		r.stats.postingsTouchedSizeSum += len(pBytes)
+		r.stats.cachedPostingsCompressions += compressions
+		r.stats.cachedPostingsCompressionErrors += compressionErrors
+		r.stats.cachedPostingsOriginalSizeSum += len(pBytes)
+		r.stats.cachedPostingsCompressedSizeSum += compressedSize
+		r.stats.cachedPostingsCompressionTimeSum += compressionTime
 		r.mtx.Unlock()
 	}
 	return nil
@@ -1961,6 +2054,15 @@ type queryStats struct {
 	postingsFetchCount       int
 	postingsFetchDurationSum time.Duration
 
+	cachedPostingsCompressions         int
+	cachedPostingsCompressionErrors    int
+	cachedPostingsOriginalSizeSum      int
+	cachedPostingsCompressedSizeSum    int
+	cachedPostingsCompressionTimeSum   time.Duration
+	cachedPostingsDecompressions       int
+	cachedPostingsDecompressionErrors  int
+	cachedPostingsDecompressionTimeSum time.Duration
+
 	seriesTouched        int
 	seriesTouchedSizeSum int
 	seriesFetched        int
@@ -1991,6 +2093,15 @@ func (s queryStats) merge(o *queryStats) *queryStats {
 	s.postingsFetchCount += o.postingsFetchCount
 	s.postingsFetchDurationSum += o.postingsFetchDurationSum
 
+	s.cachedPostingsCompressions += o.cachedPostingsCompressions
+	s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
+	s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
+	s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
+	s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum
+	s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
+	s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
+	s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum
+
 	s.seriesTouched += o.seriesTouched
 	s.seriesTouchedSizeSum += o.seriesTouchedSizeSum
 	s.seriesFetched += o.seriesFetched
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 821cd70d27..2721c7544e 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		filterConf,
 		true,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
 	s.store = store
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 75cf446643..a7829bf052 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -480,6 +480,7 @@ func TestBucketStore_Info(t *testing.T) {
 		allowAllFilterConf,
 		true,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
@@ -731,6 +732,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
 		allowAllFilterConf,
 		true,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
@@ -932,7 +934,23 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
 	logger := log.NewNopLogger()
 
-	app := h.Appender()
+	appendTestData(t, h.Appender(), series)
+
+	testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
+	id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)
+
+	_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
+		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
+		Downsample: metadata.ThanosDownsample{Resolution: 0},
+		Source:     metadata.TestSource,
+	}, nil)
+	testutil.Ok(t, err)
+	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String())))
+
+	return id
+}
+
+func appendTestData(t testing.TB, app tsdb.Appender, series int) {
 	addSeries := func(l labels.Labels) {
 		_, err := app.Add(l, 0, 0)
 		testutil.Ok(t, err)
 	}
@@ -950,19 +968,6 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
 		}
 	}
 	testutil.Ok(t, app.Commit())
-
-	testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
-	id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)
-
-	_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
-		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
-		Downsample: metadata.ThanosDownsample{Resolution: 0},
-		Source:     metadata.TestSource,
-	}, nil)
-	testutil.Ok(t, err)
-	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String())))
-
-	return id
 }
 
 func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID {
diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go
new file mode 100644
index 0000000000..246d8bab81
--- /dev/null
+++ b/pkg/store/postings_codec.go
@@ -0,0 +1,135 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"bytes"
+
+	"github.com/golang/snappy"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/tsdb/encoding"
+	"github.com/prometheus/prometheus/tsdb/index"
+)
+
+// This file implements encoding and decoding of postings using diff (or delta) + varint
+// number encoding. On top of that, we apply Snappy compression.
+//
+// On its own, Snappy compressing raw postings doesn't really help, because there is no
+// repetition in raw data. Using diff (delta) between postings entries makes values small,
+// and Varint is very efficient at encoding small values (values < 128 are encoded as
+// single byte, values < 16384 are encoded as two bytes). Diff + varint reduces postings size
+// significantly (to about 20% of original), snappy then halves it to ~10% of the original.
+
+const (
+	codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
+)
+
+// isDiffVarintSnappyEncodedPostings returns true if the input looks like it has been encoded by the diff+varint+snappy codec.
+func isDiffVarintSnappyEncodedPostings(input []byte) bool {
+	return bytes.HasPrefix(input, []byte(codecHeaderSnappy))
+}
+
+// diffVarintSnappyEncode encodes postings into diff+varint representation,
+// and applies snappy compression on the result.
+// Returned byte slice starts with codecHeaderSnappy header.
+func diffVarintSnappyEncode(p index.Postings) ([]byte, error) {
+	buf, err := diffVarintEncodeNoHeader(p)
+	if err != nil {
+		return nil, err
+	}
+
+	// Make result buffer large enough to hold our header and compressed block.
+	result := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(len(buf)))
+	copy(result, codecHeaderSnappy)
+
+	compressed := snappy.Encode(result[len(codecHeaderSnappy):], buf)
+
+	// Slice result buffer based on compressed size.
+	result = result[:len(codecHeaderSnappy)+len(compressed)]
+	return result, nil
+}
+
+// diffVarintEncodeNoHeader encodes postings into diff+varint representation.
+// It doesn't add any header to the output bytes.
+func diffVarintEncodeNoHeader(p index.Postings) ([]byte, error) {
+	buf := encoding.Encbuf{}
+
+	prev := uint64(0)
+	for p.Next() {
+		v := p.At()
+		if v < prev {
+			return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
+		}
+
+		// This is the 'diff' part -- compute difference from previous value.
+		buf.PutUvarint64(v - prev)
+		prev = v
+	}
+	if p.Err() != nil {
+		return nil, p.Err()
+	}
+
+	return buf.B, nil
+}
+
+func diffVarintSnappyDecode(input []byte) (index.Postings, error) {
+	if !isDiffVarintSnappyEncodedPostings(input) {
+		return nil, errors.New("header not found")
+	}
+
+	raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):])
+	if err != nil {
+		return nil, errors.Wrap(err, "snappy decode")
+	}
+
+	return newDiffVarintPostings(raw), nil
+}
+
+func newDiffVarintPostings(input []byte) *diffVarintPostings {
+	return &diffVarintPostings{buf: &encoding.Decbuf{B: input}}
+}
+
+// diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data.
+type diffVarintPostings struct {
+	buf *encoding.Decbuf
+	cur uint64
+}
+
+func (it *diffVarintPostings) At() uint64 {
+	return it.cur
+}
+
+func (it *diffVarintPostings) Next() bool {
+	if it.buf.Err() != nil || it.buf.Len() == 0 {
+		return false
+	}
+
+	val := it.buf.Uvarint64()
+	if it.buf.Err() != nil {
+		return false
+	}
+
+	it.cur = it.cur + val
+	return true
+}
+
+func (it *diffVarintPostings) Seek(x uint64) bool {
+	if it.cur >= x {
+		return true
+	}
+
+	// We cannot do any search due to how values are stored,
+	// so we simply advance until we find the right value.
+	for it.Next() {
+		if it.At() >= x {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (it *diffVarintPostings) Err() error {
+	return it.buf.Err()
+}
diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go
new file mode 100644
index 0000000000..adb9f57d4b
--- /dev/null
+++ b/pkg/store/postings_codec_test.go
@@ -0,0 +1,184 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"testing"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/index"
+
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestDiffVarintCodec(t *testing.T) {
+	h, err := tsdb.NewHead(nil, nil, nil, 1000)
+	testutil.Ok(t, err)
+	defer func() {
+		testutil.Ok(t, h.Close())
+	}()
+
+	appendTestData(t, h.Appender(), 1e6)
+
+	idx, err := h.Index()
+	testutil.Ok(t, err)
+	defer func() {
+		testutil.Ok(t, idx.Close())
+	}()
+
+	postingsMap := map[string]index.Postings{
+		"all":       allPostings(t, idx),
+		`n="1"`:     matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)),
+		`j="foo"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")),
+		`j!="foo"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")),
+		`i=~".*"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")),
+		`i=~".+"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")),
+		`i=~"1.+"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")),
+		`i=~"^$"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")),
+		`i!~""`:     matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")),
+		`n!="2"`:    matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)),
+		`i!~"2.*"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")),
+	}
+
+	codecs := map[string]struct {
+		codingFunction   func(index.Postings) ([]byte, error)
+		decodingFunction func([]byte) (index.Postings, error)
+	}{
+		"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }},
+		"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+	}
+
+	for postingName, postings := range postingsMap {
+		p, err := toUint64Postings(postings)
+		testutil.Ok(t, err)
+
+		for cname, codec := range codecs {
+			name := cname + "/" + postingName
+
+			t.Run(name, func(t *testing.T) {
+				t.Log("postings entries:", p.len())
+				t.Log("original size (4*entries):", 4*p.len(), "bytes")
+				p.reset() // We reuse postings between runs, so we need to reset iterator.
+
+				data, err := codec.codingFunction(p)
+				testutil.Ok(t, err)
+
+				t.Log("encoded size", len(data), "bytes")
+				t.Logf("ratio: %0.3f", (float64(len(data)) / float64(4*p.len())))
+
+				decodedPostings, err := codec.decodingFunction(data)
+				testutil.Ok(t, err)
+
+				p.reset()
+				comparePostings(t, p, decodedPostings)
+			})
+		}
+	}
+}
+
+func comparePostings(t *testing.T, p1, p2 index.Postings) {
+	for p1.Next() {
+		if !p2.Next() {
+			t.Log("p1 has more values")
+			t.Fail()
+			return
+		}
+
+		if p1.At() != p2.At() {
+			t.Logf("values differ: %d, %d", p1.At(), p2.At())
+			t.Fail()
+			return
+		}
+	}
+
+	if p2.Next() {
+		t.Log("p2 has more values")
+		t.Fail()
+		return
+	}
+
+	testutil.Ok(t, p1.Err())
+	testutil.Ok(t, p2.Err())
+}
+
+func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings {
+	k, v := index.AllPostingsKey()
+	p, err := ix.Postings(k, v)
+	testutil.Ok(t, err)
+	return p
+}
+
+func matchPostings(t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.Postings {
+	vals, err := ix.LabelValues(m.Name)
+	testutil.Ok(t, err)
+
+	matching := []string(nil)
+	for _, v := range vals {
+		if m.Matches(v) {
+			matching = append(matching, v)
+		}
+	}
+
+	p, err := ix.Postings(m.Name, matching...)
+	testutil.Ok(t, err)
+	return p
+}
+
+func toUint64Postings(p index.Postings) (*uint64Postings, error) {
+	var vals []uint64
+	for p.Next() {
+		vals = append(vals, p.At())
+	}
+	return &uint64Postings{vals: vals, ix: -1}, p.Err()
+}
+
+// Postings with no decoding step.
+type uint64Postings struct {
+	vals []uint64
+	ix   int
+}
+
+func (p *uint64Postings) At() uint64 {
+	if p.ix < 0 || p.ix >= len(p.vals) {
+		return 0
+	}
+	return p.vals[p.ix]
+}
+
+func (p *uint64Postings) Next() bool {
+	if p.ix < len(p.vals)-1 {
+		p.ix++
+		return true
+	}
+	return false
+}
+
+func (p *uint64Postings) Seek(x uint64) bool {
+	if p.At() >= x {
+		return true
+	}
+
+	// We cannot do any search due to how values are stored,
+	// so we simply advance until we find the right value.
+	for p.Next() {
+		if p.At() >= x {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (p *uint64Postings) Err() error {
+	return nil
+}
+
+func (p *uint64Postings) reset() {
+	p.ix = -1
+}
+
+func (p *uint64Postings) len() int {
+	return len(p.vals)
+}
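
Note (not part of the patch): a minimal sketch of how the new codec round-trips a postings list. It assumes it runs inside package store (the functions above are unexported) and that index.NewListPostings from github.com/prometheus/prometheus/tsdb/index accepts a []uint64, as it did at the time of this change; the function name below is hypothetical.

    // postingsCodecRoundTripSketch encodes a small, increasing postings list with
    // diff+varint+snappy and decodes it back. Assumes imports "fmt" and
    // "github.com/prometheus/prometheus/tsdb/index" in addition to this package.
    func postingsCodecRoundTripSketch() error {
        // Postings must be in increasing order, as required by diffVarintEncodeNoHeader.
        original := []uint64{10, 12, 13, 200, 50000}

        // Encode: diff+varint first, then snappy; the result starts with the "dvs" header.
        data, err := diffVarintSnappyEncode(index.NewListPostings(original))
        if err != nil {
            return err
        }

        // The header is how fetchPostings detects compressed entries in the cache,
        // even when they were written by another store instance.
        if !isDiffVarintSnappyEncodedPostings(data) {
            return errors.New("expected dvs header")
        }

        // Decode back into an index.Postings iterator and walk it.
        p, err := diffVarintSnappyDecode(data)
        if err != nil {
            return err
        }
        for p.Next() {
            fmt.Println(p.At())
        }
        return p.Err()
    }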