From c6e8d7ade67e54b15130c53a1013f3878311bd5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 11:00:38 +0100 Subject: [PATCH 01/28] Added "diff+varint+snappy" codec for postings. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket_test.go | 31 +++--- pkg/store/postings_codec.go | 133 +++++++++++++++++++++++ pkg/store/postings_codec_test.go | 178 +++++++++++++++++++++++++++++++ 3 files changed, 328 insertions(+), 14 deletions(-) create mode 100644 pkg/store/postings_codec.go create mode 100644 pkg/store/postings_codec_test.go diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 75cf446643..449da5e129 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -932,7 +932,23 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in logger := log.NewNopLogger() - app := h.Appender() + appendTestData(t, h.Appender(), series) + + testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm)) + id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h) + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + }, nil) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()))) + + return id +} + +func appendTestData(t testing.TB, app tsdb.Appender, series int) { addSeries := func(l labels.Labels) { _, err := app.Add(l, 0, 0) testutil.Ok(t, err) @@ -950,19 +966,6 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in } } testutil.Ok(t, app.Commit()) - - testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm)) - id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h) - - _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{ - Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), - Downsample: metadata.ThanosDownsample{Resolution: 0}, - Source: metadata.TestSource, - }, nil) - testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()))) - - return id } func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID { diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go new file mode 100644 index 0000000000..a81c1099a4 --- /dev/null +++ b/pkg/store/postings_codec.go @@ -0,0 +1,133 @@ +package store + +import ( + "bytes" + encoding_binary "encoding/binary" + "errors" + "fmt" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/tsdb/index" +) + +const ( + // these headers should not be prefix of each other + codecHeaderRaw = "diff+varint+raw" + codecHeaderSnappy = "diff+varint+snappy" +) + +func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { + return diffVarintEncode(p, true) +} + +func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { + varintBuf := make([]byte, encoding_binary.MaxVarintLen64) + + buf := bytes.Buffer{} + + // if we're returning raw data, write the header to the buffer, and then return buffer directly + if !useSnappy { + buf.WriteString(codecHeaderRaw) + } + + prev := uint64(0) + for p.Next() { + v := p.At() + n := encoding_binary.PutUvarint(varintBuf, v-prev) + 
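+		// varintBuf[:n] now holds the delta (v-prev) encoded as a varint; sorted postings keep deltas small, so most entries fit in a single byte.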
buf.Write(varintBuf[:n]) + + prev = v + } + + if p.Err() != nil { + return nil, p.Err() + } + + if !useSnappy { + // this already has the correct header + return buf.Bytes(), nil + } + + // make result buffer large enough to hold our header and compressed block + resultBuf := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(buf.Len())) + copy(resultBuf, codecHeaderSnappy) + + compressed := snappy.Encode(resultBuf[len(codecHeaderSnappy):], buf.Bytes()) + + // slice result buffer based on compressed size + resultBuf = resultBuf[:len(codecHeaderSnappy)+len(compressed)] + return resultBuf, nil +} + +func diffVarintDecode(input []byte) (index.Postings, error) { + compressed := false + headerLen := 0 + switch { + case bytes.HasPrefix(input, []byte(codecHeaderRaw)): + headerLen = len(codecHeaderRaw) + break + case bytes.HasPrefix(input, []byte(codecHeaderSnappy)): + headerLen = len(codecHeaderSnappy) + compressed = true + default: + return nil, errors.New("header not found") + } + + raw := input[headerLen:] + if compressed { + var err error + raw, err = snappy.Decode(nil, raw) + if err != nil { + return nil, fmt.Errorf("snappy decode: %w", err) + } + } + + return &diffVarintPostings{data: raw}, nil +} + +type diffVarintPostings struct { + data []byte + cur uint64 + err error +} + +func (it *diffVarintPostings) At() uint64 { + return it.cur +} + +func (it *diffVarintPostings) Next() bool { + if len(it.data) == 0 { + return false + } + + val, n := encoding_binary.Uvarint(it.data) + if n == 0 { + it.err = errors.New("not enough data") + return false + } + + it.data = it.data[n:] + it.cur = it.cur + val + it.err = nil + return true +} + +func (it *diffVarintPostings) Seek(x uint64) bool { + if it.cur >= x { + return true + } + + // we cannot do any search due to how values are stored, + // so we simply advance until we find the right value + for it.Next() { + if it.At() >= x { + return true + } + } + + return false +} + +func (it *diffVarintPostings) Err() error { + return it.err +} diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go new file mode 100644 index 0000000000..0497553952 --- /dev/null +++ b/pkg/store/postings_codec_test.go @@ -0,0 +1,178 @@ +package store + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestDiffVarintCodec(t *testing.T) { + h, err := tsdb.NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, h.Close()) + }() + + appendTestData(t, h.Appender(), 1e6) + + idx, err := h.Index() + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, idx.Close()) + }() + + postingsMap := map[string]index.Postings{ + "all": allPostings(t, idx), + `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)), + `j="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), + `j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), + `i=~".*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), + `i=~".+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")), + `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), + `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), + `i!~""`: matchPostings(t, idx, 
labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), + `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)), + `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), + } + + for postingName, postings := range postingsMap { + p, err := toUint64Postings(postings) + testutil.Ok(t, err) + + for _, snappy := range []bool{false, true} { + name := postingName + "/" + if snappy { + name = name + "snappy" + } else { + name = name + "raw" + } + + t.Run(postingName+"/"+name, func(t *testing.T) { + t.Log("postings entries:", p.len()) + t.Log("raw size (4*entries):", 4*p.len(), "bytes") + p.reset() // we reuse postings between runs, so we need to reset iterator + + data, err := diffVarintEncode(p, snappy) + testutil.Ok(t, err) + + t.Log("encoded size", len(data), "bytes") + t.Logf("ratio: %0.3f", (float64(len(data)) / float64(4*p.len()))) + + decodedPostings, err := diffVarintDecode(data) + testutil.Ok(t, err) + + p.reset() + comparePostings(t, p, decodedPostings) + }) + } + } +} + +func comparePostings(t *testing.T, p1, p2 index.Postings) { + for p1.Next() { + if !p2.Next() { + t.Log("p1 has more values") + t.Fail() + return + } + + if p1.At() != p2.At() { + t.Logf("values differ: %d, %d", p1.At(), p2.At()) + t.Fail() + return + } + } + + if p2.Next() { + t.Log("p2 has more values") + t.Fail() + return + } + + testutil.Ok(t, p1.Err()) + testutil.Ok(t, p2.Err()) +} + +func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { + k, v := index.AllPostingsKey() + p, err := ix.Postings(k, v) + testutil.Ok(t, err) + return p +} + +func matchPostings(t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.Postings { + vals, err := ix.LabelValues(m.Name) + testutil.Ok(t, err) + + matching := []string(nil) + for _, v := range vals { + if m.Matches(v) { + matching = append(matching, v) + } + } + + p, err := ix.Postings(m.Name, matching...) 
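+	// p now iterates the merged (sorted) postings of all matching label values.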
+ testutil.Ok(t, err) + return p +} + +func toUint64Postings(p index.Postings) (*uint64Postings, error) { + var vals []uint64 + for p.Next() { + vals = append(vals, p.At()) + } + return &uint64Postings{vals: vals, ix: -1}, p.Err() +} + +// postings with no decoding step +type uint64Postings struct { + vals []uint64 + ix int +} + +func (p *uint64Postings) At() uint64 { + if p.ix < 0 || p.ix >= len(p.vals) { + return 0 + } + return p.vals[p.ix] +} + +func (p *uint64Postings) Next() bool { + if p.ix < len(p.vals)-1 { + p.ix++ + return true + } + return false +} + +func (p *uint64Postings) Seek(x uint64) bool { + if p.At() >= x { + return true + } + + // we cannot do any search due to how values are stored, + // so we simply advance until we find the right value + for p.Next() { + if p.At() >= x { + return true + } + } + + return false +} + +func (p *uint64Postings) Err() error { + return nil +} + +func (p *uint64Postings) reset() { + p.ix = -1 +} + +func (p *uint64Postings) len() int { + return len(p.vals) +} From 35a31f9aa548896902700421717ac162cba1dbd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 11:46:25 +0100 Subject: [PATCH 02/28] Added option to reencode and compress postings stored in cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 64 +++++++++++++++++++++++++++--------- pkg/store/bucket_e2e_test.go | 1 + pkg/store/bucket_test.go | 2 ++ pkg/store/postings_codec.go | 4 +++ 4 files changed, 56 insertions(+), 15 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b9689f4f6b..a5f3ea89d0 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -220,6 +220,10 @@ type BucketStore struct { advLabelSets []storepb.LabelSet enableCompatibilityLabel bool enableIndexHeader bool + + // Reencode postings using diff+varint+snappy when storing to cache. + // This makes them smaller, but takes extra CPU and memory. 
+ enablePostingsCompression bool } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -239,6 +243,7 @@ func NewBucketStore( filterConfig *FilterConfig, enableCompatibilityLabel bool, enableIndexHeader bool, + enablePostingsCompression bool, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -270,10 +275,11 @@ func NewBucketStore( maxConcurrent, extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), ), - samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, - enableCompatibilityLabel: enableCompatibilityLabel, - enableIndexHeader: enableIndexHeader, + samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), + partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + enableCompatibilityLabel: enableCompatibilityLabel, + enableIndexHeader: enableIndexHeader, + enablePostingsCompression: enablePostingsCompression, } s.metrics = metrics @@ -455,6 +461,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er indexHeaderReader, s.partitioner, s.metrics.seriesRefetches, + s.enablePostingsCompression, ) if err != nil { return errors.Wrap(err, "new bucket block") @@ -1183,6 +1190,8 @@ type bucketBlock struct { partitioner partitioner seriesRefetches prometheus.Counter + + enablePostingsCompression bool } func newBucketBlock( @@ -1196,17 +1205,19 @@ func newBucketBlock( indexHeadReader indexheader.Reader, p partitioner, seriesRefetches prometheus.Counter, + enablePostingsCompression bool, ) (b *bucketBlock, err error) { b = &bucketBlock{ - logger: logger, - bkt: bkt, - indexCache: indexCache, - chunkPool: chunkPool, - dir: dir, - partitioner: p, - meta: meta, - indexHeaderReader: indexHeadReader, - seriesRefetches: seriesRefetches, + logger: logger, + bkt: bkt, + indexCache: indexCache, + chunkPool: chunkPool, + dir: dir, + partitioner: p, + meta: meta, + indexHeaderReader: indexHeadReader, + seriesRefetches: seriesRefetches, + enablePostingsCompression: enablePostingsCompression, } // Get object handles for all chunk files. @@ -1512,7 +1523,16 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings r.stats.postingsTouched++ r.stats.postingsTouchedSizeSum += len(b) - _, l, err := r.dec.Postings(b) + // Even if this instance is not using compression, there may be compressed + // entries in the cache written by other stores. + var l index.Postings + var err error + if isDiffVarintEncodedPostings(b) { + l, err = diffVarintDecode(b) + } else { + _, l, err = r.dec.Postings(b) + } + if err != nil { return nil, errors.Wrap(err, "decode postings") } @@ -1583,7 +1603,21 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings // Return postings and fill LRU cache. // Truncate first 4 bytes which are length of posting. output[p.keyID] = newBigEndianPostings(pBytes[4:]) - r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes) + + storeData := pBytes + + if r.block.enablePostingsCompression { + // Reencode postings before storing to cache. If that fails, we store original bytes. + data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:])) + if err == nil { + storeData = data + } else { + // This can only fail, if postings data was somehow corrupted, + // and there is nothing we can do about it. It's not worth reporting here. 
+ } + } + + r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], storeData) // If we just fetched it we still have to update the stats for touched postings. r.stats.postingsTouched++ diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 821cd70d27..2721c7544e 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m filterConf, true, true, + true, ) testutil.Ok(t, err) s.store = store diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 449da5e129..a7829bf052 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -480,6 +480,7 @@ func TestBucketStore_Info(t *testing.T) { allowAllFilterConf, true, true, + true, ) testutil.Ok(t, err) @@ -731,6 +732,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul allowAllFilterConf, true, true, + true, ) testutil.Ok(t, err) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index a81c1099a4..e3c26987f5 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -16,6 +16,10 @@ const ( codecHeaderSnappy = "diff+varint+snappy" ) +func isDiffVarintEncodedPostings(input []byte) bool { + return bytes.HasPrefix(input, []byte(codecHeaderRaw)) || bytes.HasPrefix(input, []byte(codecHeaderSnappy)) +} + func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { return diffVarintEncode(p, true) } From f480494aebda1d0b71d5f1a01b45cbe93adade8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 13:32:53 +0100 Subject: [PATCH 03/28] Expose enablePostingsCompression flag as CLI parameter. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index b3609440aa..4ed15c4248 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage."). Hidden().Default("false").Bool() + enablePostingsCompression := cmd.Flag("experimental.enable-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size."). + Hidden().Default("false").Bool() + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent."). 
Default("0s")) @@ -126,6 +129,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf, *advertiseCompatibilityLabel, *enableIndexHeader, + *enablePostingsCompression, time.Duration(*consistencyDelay), time.Duration(*ignoreDeletionMarksDelay), ) @@ -160,6 +164,7 @@ func runStore( selectorRelabelConf *extflag.PathOrContent, advertiseCompatibilityLabel bool, enableIndexHeader bool, + enablePostingsCompression bool, consistencyDelay time.Duration, ignoreDeletionMarksDelay time.Duration, ) error { @@ -265,6 +270,7 @@ func runStore( filterConf, advertiseCompatibilityLabel, enableIndexHeader, + enablePostingsCompression, ) if err != nil { return errors.Wrap(err, "create object storage store") From fb2f0df2a0ce7fcd1b3b58702b80542e7ee564b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 13:51:18 +0100 Subject: [PATCH 04/28] Use "github.com/pkg/errors" instead of "errors" package. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index e3c26987f5..114e4e256b 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -3,10 +3,10 @@ package store import ( "bytes" encoding_binary "encoding/binary" - "errors" "fmt" "github.com/golang/snappy" + "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/index" ) From e672219a4eb906d403a80f3ec2ccb6afba578835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 14:05:39 +0100 Subject: [PATCH 05/28] remove break MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 114e4e256b..083a9f2fc7 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -69,7 +69,6 @@ func diffVarintDecode(input []byte) (index.Postings, error) { switch { case bytes.HasPrefix(input, []byte(codecHeaderRaw)): headerLen = len(codecHeaderRaw) - break case bytes.HasPrefix(input, []byte(codecHeaderSnappy)): headerLen = len(codecHeaderSnappy) compressed = true From 51e2feb103f7b439bc4c6f6aa675e53b22f1e6ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 14:06:10 +0100 Subject: [PATCH 06/28] Removed empty branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a5f3ea89d0..f743e37ace 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1608,12 +1608,11 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings if r.block.enablePostingsCompression { // Reencode postings before storing to cache. If that fails, we store original bytes. + // This can only fail, if postings data was somehow corrupted, + // and there is nothing we can do about it. It's not worth reporting here. data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:])) if err == nil { storeData = data - } else { - // This can only fail, if postings data was somehow corrupted, - // and there is nothing we can do about it. 
It's not worth reporting here. } } From 391da9878b7a082599edefe6f21182dbed226b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 14:25:35 +0100 Subject: [PATCH 07/28] Added copyright headers. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 3 +++ pkg/store/postings_codec_test.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 083a9f2fc7..0a2733d116 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package store import ( diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 0497553952..3d834a55c3 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package store import ( From a57624bb36f128b2a47f9048a3c66f93aaab6c18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 15:25:24 +0100 Subject: [PATCH 08/28] Added CHANGELOG.md entry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f80a55e1d..6b57a94525 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks. - [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process. - [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached. +- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size. ### Changed From 1ab0142b35b172bba4f84033401799f097ef8a16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 11:32:11 +0100 Subject: [PATCH 09/28] Added comments. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 0a2733d116..17d2a197aa 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -14,25 +14,28 @@ import ( ) const ( - // these headers should not be prefix of each other + // These headers should not be a prefix of each other. codecHeaderRaw = "diff+varint+raw" codecHeaderSnappy = "diff+varint+snappy" ) +// isDiffVarintEncodedPostings returns true, if input looks like it has been encoded by diff+varint(+snappy) codec. func isDiffVarintEncodedPostings(input []byte) bool { return bytes.HasPrefix(input, []byte(codecHeaderRaw)) || bytes.HasPrefix(input, []byte(codecHeaderSnappy)) } +// diffVarintSnappyEncode encodes postings using diff+varint+snappy codec. func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { return diffVarintEncode(p, true) } +// diffVarintEncode encodes postings into diff+varint representation and optional snappy compression. func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { varintBuf := make([]byte, encoding_binary.MaxVarintLen64) buf := bytes.Buffer{} - // if we're returning raw data, write the header to the buffer, and then return buffer directly + // If we're returning raw data, write the header to the buffer, and then return buffer directly. if !useSnappy { buf.WriteString(codecHeaderRaw) } @@ -51,17 +54,17 @@ func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { } if !useSnappy { - // this already has the correct header + // This already has the correct header. return buf.Bytes(), nil } - // make result buffer large enough to hold our header and compressed block + // Make result buffer large enough to hold our header and compressed block. resultBuf := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(buf.Len())) copy(resultBuf, codecHeaderSnappy) compressed := snappy.Encode(resultBuf[len(codecHeaderSnappy):], buf.Bytes()) - // slice result buffer based on compressed size + // Slice result buffer based on compressed size. resultBuf = resultBuf[:len(codecHeaderSnappy)+len(compressed)] return resultBuf, nil } @@ -91,6 +94,7 @@ func diffVarintDecode(input []byte) (index.Postings, error) { return &diffVarintPostings{data: raw}, nil } +// Implementation of index.Postings based on diff+varint encoded data. type diffVarintPostings struct { data []byte cur uint64 From eb8fca91ae93d3f593a989c31b475ab35580e8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 11:56:33 +0100 Subject: [PATCH 10/28] Use Encbuf and Decbuf. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 42 +++++++++++++++----------------- pkg/store/postings_codec_test.go | 10 ++++---- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 17d2a197aa..414e55b3c5 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -5,11 +5,10 @@ package store import ( "bytes" - encoding_binary "encoding/binary" - "fmt" "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" ) @@ -31,21 +30,22 @@ func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { // diffVarintEncode encodes postings into diff+varint representation and optional snappy compression. func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { - varintBuf := make([]byte, encoding_binary.MaxVarintLen64) - - buf := bytes.Buffer{} + buf := encoding.Encbuf{} // If we're returning raw data, write the header to the buffer, and then return buffer directly. if !useSnappy { - buf.WriteString(codecHeaderRaw) + buf.PutString(codecHeaderRaw) } prev := uint64(0) for p.Next() { v := p.At() - n := encoding_binary.PutUvarint(varintBuf, v-prev) - buf.Write(varintBuf[:n]) + if v < prev { + // Postings entries must be in increasing order. + return nil, errors.Errorf("decreasing entry, val: %d, prev: %d", v, prev) + } + buf.PutUvarint64(v - prev) prev = v } @@ -54,15 +54,15 @@ func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { } if !useSnappy { - // This already has the correct header. - return buf.Bytes(), nil + // When not using Snappy, buffer already has the correct header. + return buf.B, nil } // Make result buffer large enough to hold our header and compressed block. resultBuf := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(buf.Len())) copy(resultBuf, codecHeaderSnappy) - compressed := snappy.Encode(resultBuf[len(codecHeaderSnappy):], buf.Bytes()) + compressed := snappy.Encode(resultBuf[len(codecHeaderSnappy):], buf.B) // Slice result buffer based on compressed size. resultBuf = resultBuf[:len(codecHeaderSnappy)+len(compressed)] @@ -87,18 +87,17 @@ func diffVarintDecode(input []byte) (index.Postings, error) { var err error raw, err = snappy.Decode(nil, raw) if err != nil { - return nil, fmt.Errorf("snappy decode: %w", err) + return nil, errors.Errorf("snappy decode: %w", err) } } - return &diffVarintPostings{data: raw}, nil + return &diffVarintPostings{buf: &encoding.Decbuf{B: raw}}, nil } // Implementation of index.Postings based on diff+varint encoded data. 
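+// It keeps only a running value: each Next call decodes one varint delta from buf and adds it to cur.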
type diffVarintPostings struct { - data []byte - cur uint64 - err error + buf *encoding.Decbuf + cur uint64 } func (it *diffVarintPostings) At() uint64 { @@ -106,19 +105,16 @@ func (it *diffVarintPostings) At() uint64 { } func (it *diffVarintPostings) Next() bool { - if len(it.data) == 0 { + if it.buf.Err() != nil || it.buf.Len() == 0 { return false } - val, n := encoding_binary.Uvarint(it.data) - if n == 0 { - it.err = errors.New("not enough data") + val := it.buf.Uvarint64() + if it.buf.Err() != nil { return false } - it.data = it.data[n:] it.cur = it.cur + val - it.err = nil return true } @@ -139,5 +135,5 @@ func (it *diffVarintPostings) Seek(x uint64) bool { } func (it *diffVarintPostings) Err() error { - return it.err + return it.buf.Err() } diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 3d834a55c3..89a3e9067f 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -47,16 +47,16 @@ func TestDiffVarintCodec(t *testing.T) { testutil.Ok(t, err) for _, snappy := range []bool{false, true} { - name := postingName + "/" + name := postingName if snappy { - name = name + "snappy" + name = "snappy/" + name } else { - name = name + "raw" + name = "raw/" + name } - t.Run(postingName+"/"+name, func(t *testing.T) { + t.Run(name, func(t *testing.T) { t.Log("postings entries:", p.len()) - t.Log("raw size (4*entries):", 4*p.len(), "bytes") + t.Log("original size (4*entries):", 4*p.len(), "bytes") p.reset() // we reuse postings between runs, so we need to reset iterator data, err := diffVarintEncode(p, snappy) From 5347e89799bad9be52e071432f56f5ce300e4781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 12:02:41 +0100 Subject: [PATCH 11/28] Fix comments in test file. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 89a3e9067f..876ab3930f 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -57,7 +57,7 @@ func TestDiffVarintCodec(t *testing.T) { t.Run(name, func(t *testing.T) { t.Log("postings entries:", p.len()) t.Log("original size (4*entries):", 4*p.len(), "bytes") - p.reset() // we reuse postings between runs, so we need to reset iterator + p.reset() // We reuse postings between runs, so we need to reset iterator. data, err := diffVarintEncode(p, snappy) testutil.Ok(t, err) @@ -131,7 +131,7 @@ func toUint64Postings(p index.Postings) (*uint64Postings, error) { return &uint64Postings{vals: vals, ix: -1}, p.Err() } -// postings with no decoding step +// Postings with no decoding step. type uint64Postings struct { vals []uint64 ix int @@ -157,8 +157,8 @@ func (p *uint64Postings) Seek(x uint64) bool { return true } - // we cannot do any search due to how values are stored, - // so we simply advance until we find the right value + // We cannot do any search due to how values are stored, + // so we simply advance until we find the right value. for p.Next() { if p.At() >= x { return true From eec1a07b2b8e3d8584309afa5c22648391f4553a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 12:08:22 +0100 Subject: [PATCH 12/28] Another comment... 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 414e55b3c5..3c33fab260 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -123,8 +123,8 @@ func (it *diffVarintPostings) Seek(x uint64) bool { return true } - // we cannot do any search due to how values are stored, - // so we simply advance until we find the right value + // We cannot do any search due to how values are stored, + // so we simply advance until we find the right value. for it.Next() { if it.At() >= x { return true From 85b6a2bf6b2dbababb7fe4a5e98bb3f8e9cb322a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sun, 22 Mar 2020 12:33:24 +0100 Subject: [PATCH 13/28] Removed diffVarintSnappyEncode function. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 2 +- pkg/store/postings_codec.go | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f743e37ace..ec8f3ec02e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1610,7 +1610,7 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings // Reencode postings before storing to cache. If that fails, we store original bytes. // This can only fail, if postings data was somehow corrupted, // and there is nothing we can do about it. It's not worth reporting here. - data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:])) + data, err := diffVarintEncode(newBigEndianPostings(pBytes[4:]), true) if err == nil { storeData = data } diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 3c33fab260..26f0e80217 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -23,11 +23,6 @@ func isDiffVarintEncodedPostings(input []byte) bool { return bytes.HasPrefix(input, []byte(codecHeaderRaw)) || bytes.HasPrefix(input, []byte(codecHeaderSnappy)) } -// diffVarintSnappyEncode encodes postings using diff+varint+snappy codec. -func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { - return diffVarintEncode(p, true) -} - // diffVarintEncode encodes postings into diff+varint representation and optional snappy compression. func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { buf := encoding.Encbuf{} From 13adc826b59de7aa64fc8f13674a0423f3d1c5bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 14:14:41 +0100 Subject: [PATCH 14/28] Comment on usage with in-memory cache. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ec8f3ec02e..2f27da0f3d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -223,6 +223,7 @@ type BucketStore struct { // Reencode postings using diff+varint+snappy when storing to cache. // This makes them smaller, but takes extra CPU and memory. + // When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. 
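+	// (Compressed postings take about 10% of the original 4-bytes-per-entry size.)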
enablePostingsCompression bool } From ef74baeb0dc73616d62e38d3674dd5df39cae826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 14:20:12 +0100 Subject: [PATCH 15/28] var block MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 2f27da0f3d..5c98d03c9c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1526,8 +1526,10 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings // Even if this instance is not using compression, there may be compressed // entries in the cache written by other stores. - var l index.Postings - var err error + var ( + l index.Postings + err error + ) if isDiffVarintEncodedPostings(b) { l, err = diffVarintDecode(b) } else { From 564f6f31a1f6ef5ccaf78515848933434fc15533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 14:22:19 +0100 Subject: [PATCH 16/28] Removed extra comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 26f0e80217..41f9ab05d5 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -27,7 +27,6 @@ func isDiffVarintEncodedPostings(input []byte) bool { func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { buf := encoding.Encbuf{} - // If we're returning raw data, write the header to the buffer, and then return buffer directly. if !useSnappy { buf.PutString(codecHeaderRaw) } From 2317b03a29edadbeee8346f551e91e588051f71f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 14:29:05 +0100 Subject: [PATCH 17/28] Move comment to error message. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 41f9ab05d5..1e4fce2330 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -35,8 +35,7 @@ func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { for p.Next() { v := p.At() if v < prev { - // Postings entries must be in increasing order. - return nil, errors.Errorf("decreasing entry, val: %d, prev: %d", v, prev) + return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev) } buf.PutUvarint64(v - prev) From e2203aa5e029b41c67721e89621adc6454138b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 14:56:52 +0100 Subject: [PATCH 18/28] Separated snappy compression and postings reencoding into two functions. There is now header only for snappy-compressed postings. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 6 +-- pkg/store/postings_codec.go | 82 +++++++++++++++----------------- pkg/store/postings_codec_test.go | 21 ++++---- 3 files changed, 53 insertions(+), 56 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 5c98d03c9c..6fab03245f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1530,8 +1530,8 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings l index.Postings err error ) - if isDiffVarintEncodedPostings(b) { - l, err = diffVarintDecode(b) + if isDiffVarintSnappyEncodedPostings(b) { + l, err = diffVarintSnappyDecode(b) } else { _, l, err = r.dec.Postings(b) } @@ -1613,7 +1613,7 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings // Reencode postings before storing to cache. If that fails, we store original bytes. // This can only fail, if postings data was somehow corrupted, // and there is nothing we can do about it. It's not worth reporting here. - data, err := diffVarintEncode(newBigEndianPostings(pBytes[4:]), true) + data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:])) if err == nil { storeData = data } diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 1e4fce2330..a2a6c275ab 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -13,24 +13,39 @@ import ( ) const ( - // These headers should not be a prefix of each other. - codecHeaderRaw = "diff+varint+raw" codecHeaderSnappy = "diff+varint+snappy" ) -// isDiffVarintEncodedPostings returns true, if input looks like it has been encoded by diff+varint(+snappy) codec. -func isDiffVarintEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(codecHeaderRaw)) || bytes.HasPrefix(input, []byte(codecHeaderSnappy)) +// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint(+snappy) codec. +func isDiffVarintSnappyEncodedPostings(input []byte) bool { + return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) } -// diffVarintEncode encodes postings into diff+varint representation and optional snappy compression. -func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { - buf := encoding.Encbuf{} - - if !useSnappy { - buf.PutString(codecHeaderRaw) +// diffVarintSnappyEncode encodes postings into diff+varint representation, +// and applies snappy compression on the result. +// Returned byte slice starts with codedHeaderSnappy header. +func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { + buf, err := diffVarintEncodeNoHeader(p) + if err != nil { + return nil, err } + // Make result buffer large enough to hold our header and compressed block. + result := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(len(buf))) + copy(result, codecHeaderSnappy) + + compressed := snappy.Encode(result[len(codecHeaderSnappy):], buf) + + // Slice result buffer based on compressed size. + result = result[:len(codecHeaderSnappy)+len(compressed)] + return result, nil +} + +// diffVarintEncodeNoHeader encodes postings into diff+varint representation. +// It doesn't add any header to the output bytes. 
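+// With sorted input the deltas stay small, so most entries encode to just one or two bytes.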
+func diffVarintEncodeNoHeader(p index.Postings) ([]byte, error) { + buf := encoding.Encbuf{} + prev := uint64(0) for p.Next() { v := p.At() @@ -38,53 +53,32 @@ func diffVarintEncode(p index.Postings, useSnappy bool) ([]byte, error) { return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev) } + // This is the 'diff' part -- compute difference from previous value. buf.PutUvarint64(v - prev) prev = v } - if p.Err() != nil { return nil, p.Err() } - if !useSnappy { - // When not using Snappy, buffer already has the correct header. - return buf.B, nil - } - - // Make result buffer large enough to hold our header and compressed block. - resultBuf := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(buf.Len())) - copy(resultBuf, codecHeaderSnappy) - - compressed := snappy.Encode(resultBuf[len(codecHeaderSnappy):], buf.B) - - // Slice result buffer based on compressed size. - resultBuf = resultBuf[:len(codecHeaderSnappy)+len(compressed)] - return resultBuf, nil + return buf.B, nil } -func diffVarintDecode(input []byte) (index.Postings, error) { - compressed := false - headerLen := 0 - switch { - case bytes.HasPrefix(input, []byte(codecHeaderRaw)): - headerLen = len(codecHeaderRaw) - case bytes.HasPrefix(input, []byte(codecHeaderSnappy)): - headerLen = len(codecHeaderSnappy) - compressed = true - default: +func diffVarintSnappyDecode(input []byte) (index.Postings, error) { + if !isDiffVarintSnappyEncodedPostings(input) { return nil, errors.New("header not found") } - raw := input[headerLen:] - if compressed { - var err error - raw, err = snappy.Decode(nil, raw) - if err != nil { - return nil, errors.Errorf("snappy decode: %w", err) - } + raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):]) + if err != nil { + return nil, errors.Errorf("snappy decode: %w", err) } - return &diffVarintPostings{buf: &encoding.Decbuf{B: raw}}, nil + return newDiffVarintPostings(raw), nil +} + +func newDiffVarintPostings(input []byte) *diffVarintPostings { + return &diffVarintPostings{buf: &encoding.Decbuf{B: input}} } // Implementation of index.Postings based on diff+varint encoded data. diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 876ab3930f..adb9f57d4b 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -42,30 +42,33 @@ func TestDiffVarintCodec(t *testing.T) { `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } + codecs := map[string]struct { + codingFunction func(index.Postings) ([]byte, error) + decodingFunction func([]byte) (index.Postings, error) + }{ + "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }}, + "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, + } + for postingName, postings := range postingsMap { p, err := toUint64Postings(postings) testutil.Ok(t, err) - for _, snappy := range []bool{false, true} { - name := postingName - if snappy { - name = "snappy/" + name - } else { - name = "raw/" + name - } + for cname, codec := range codecs { + name := cname + "/" + postingName t.Run(name, func(t *testing.T) { t.Log("postings entries:", p.len()) t.Log("original size (4*entries):", 4*p.len(), "bytes") p.reset() // We reuse postings between runs, so we need to reset iterator. 
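+				// Encode with the codec under test; decoding below must reproduce identical postings.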
- data, err := diffVarintEncode(p, snappy) + data, err := codec.codingFunction(p) testutil.Ok(t, err) t.Log("encoded size", len(data), "bytes") t.Logf("ratio: %0.3f", (float64(len(data)) / float64(4*p.len()))) - decodedPostings, err := diffVarintDecode(data) + decodedPostings, err := codec.decodingFunction(data) testutil.Ok(t, err) p.reset() From eacd3945f19b1de83682e11bc66ee6125a470496 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 15:08:50 +0100 Subject: [PATCH 19/28] Added comment on using diff+varint+snappy. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index a2a6c275ab..309e58cc53 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -12,6 +12,15 @@ import ( "github.com/prometheus/prometheus/tsdb/index" ) +// This file implements encoding and decoding of postings using diff (or delta) + varint +// number encoding. On top of that, we apply Snappy compression. +// +// On its own, Snappy compressing raw postings doesn't really help, because there is no +// repetition in raw data. Using diff (delta) between postings entries makes values small, +// and Varint is very efficient at encoding small values (values < 128 are encoded as +// single byte, values < 16384 are encoded as two bytes). Diff + varint reduces postings size +// significantly (to about 20% of original), snappy then halves it to ~10% of the original. + const ( codecHeaderSnappy = "diff+varint+snappy" ) @@ -81,7 +90,7 @@ func newDiffVarintPostings(input []byte) *diffVarintPostings { return &diffVarintPostings{buf: &encoding.Decbuf{B: input}} } -// Implementation of index.Postings based on diff+varint encoded data. +// diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data. type diffVarintPostings struct { buf *encoding.Decbuf cur uint64 From a2ee4957c4a5c68945013d72cbaafac9c2ff1a0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 15:59:35 +0100 Subject: [PATCH 20/28] Shorten header MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 309e58cc53..d0f3a207f0 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -22,7 +22,7 @@ import ( // significantly (to about 20% of original), snappy then halves it to ~10% of the original. const ( - codecHeaderSnappy = "diff+varint+snappy" + codecHeaderSnappy = "dvs" // as in "diff+varint+snappy" ) // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint(+snappy) codec. From 8c438d510a3d1abf97cf1ed45dc93018d098b2ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 16:08:18 +0100 Subject: [PATCH 21/28] Lint... 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index d0f3a207f0..4b25b8c7fa 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -22,7 +22,7 @@ import ( // significantly (to about 20% of original), snappy then halves it to ~10% of the original. const ( - codecHeaderSnappy = "dvs" // as in "diff+varint+snappy" + codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". ) // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint(+snappy) codec. From 22a1338c21d940426d27ad82fb1d55c83d21dd10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 19:25:49 +0100 Subject: [PATCH 22/28] Changed experimental.enable-postings-compression to experimental.enable-index-cache-postings-compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- cmd/thanos/store.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b57a94525..ab743e683e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks. - [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process. - [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached. -- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size. +- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-index-cache-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size. ### Changed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 4ed15c4248..18e95f81a1 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -81,7 +81,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage."). 
Hidden().Default("false").Bool() - enablePostingsCompression := cmd.Flag("experimental.enable-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size."). + enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size."). Hidden().Default("false").Bool() consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent."). From 644d09d50ce9c1de876d26797b6dce364a513ff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 21:14:37 +0100 Subject: [PATCH 23/28] Added metrics for postings compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 76 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 68 insertions(+), 8 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 6fab03245f..9caf042eb4 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -97,6 +97,12 @@ type bucketStoreMetrics struct { queriesDropped prometheus.Counter queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter + + cachedPostingsOriginalSizeBytes prometheus.Counter + cachedPostingsCompressedSizeBytes prometheus.Counter + cachedPostingsCompressionTimeSeconds prometheus.Counter + cachedPostingsCompressions prometheus.Counter + cachedPostingsCompressionErrors prometheus.Counter } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -180,6 +186,28 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), }) + + m.cachedPostingsCompressions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compressions_total", + Help: "Number of postings compressions before storing to index cache", + }) + m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compression_errors_total", + Help: "Number of postings compression errors", + }) + m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_original_size_bytes_total", + Help: "Original size of postings stored into cache.", + }) + m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total", + Help: "Compressed size of postings stored into cache.", + }) + m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compression_time_seconds", + Help: "Time spent compressing postings before storing them into postings cache", + }) + return &m } @@ -904,6 +932,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie 
s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum)) s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum)) s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) + s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum)) + s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum)) + s.metrics.cachedPostingsCompressionTimeSeconds.Add(stats.cachedPostingsCompressionTimeSum.Seconds()) + s.metrics.cachedPostingsCompressions.Add(float64(stats.cachedPostingsCompressions)) + s.metrics.cachedPostingsCompressionErrors.Add(float64(stats.cachedPostingsCompressionErrors)) level.Debug(s.logger).Log("msg", "stats query processed", "stats", fmt.Sprintf("%+v", stats), "err", err) @@ -1602,28 +1635,43 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings return err } - r.mtx.Lock() - // Return postings and fill LRU cache. - // Truncate first 4 bytes which are length of posting. - output[p.keyID] = newBigEndianPostings(pBytes[4:]) + dataToCache := pBytes - storeData := pBytes + compressionTime := time.Duration(0) + compressions, compressionErrors, compressedSize := 0, 0, 0 if r.block.enablePostingsCompression { // Reencode postings before storing to cache. If that fails, we store original bytes. // This can only fail, if postings data was somehow corrupted, - // and there is nothing we can do about it. It's not worth reporting here. + // and there is nothing we can do about it. + // Errors from corrupted postings will be reported when postings are used. + compressions++ + s := time.Now() data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:])) + compressionTime = time.Since(s) if err == nil { - storeData = data + dataToCache = data + compressedSize = len(data) + } else { + compressionErrors = 1 } } - r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], storeData) + r.mtx.Lock() + // Return postings and fill LRU cache. + // Truncate first 4 bytes which are length of posting. + output[p.keyID] = newBigEndianPostings(pBytes[4:]) + + r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], dataToCache) // If we just fetched it we still have to update the stats for touched postings. 
r.stats.postingsTouched++ r.stats.postingsTouchedSizeSum += len(pBytes) + r.stats.cachedPostingsCompressions += compressions + r.stats.cachedPostingsCompressionErrors += compressionErrors + r.stats.cachedPostingsOriginalSizeSum += len(pBytes) + r.stats.cachedPostingsCompressedSizeSum += compressedSize + r.stats.cachedPostingsCompressionTimeSum += compressionTime r.mtx.Unlock() } return nil @@ -1997,6 +2045,12 @@ type queryStats struct { postingsFetchCount int postingsFetchDurationSum time.Duration + cachedPostingsCompressions int + cachedPostingsCompressionErrors int + cachedPostingsOriginalSizeSum int + cachedPostingsCompressedSizeSum int + cachedPostingsCompressionTimeSum time.Duration + seriesTouched int seriesTouchedSizeSum int seriesFetched int @@ -2027,6 +2081,12 @@ func (s queryStats) merge(o *queryStats) *queryStats { s.postingsFetchCount += o.postingsFetchCount s.postingsFetchDurationSum += o.postingsFetchDurationSum + s.cachedPostingsCompressions += o.cachedPostingsCompressions + s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors + s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum + s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum + s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum + s.seriesTouched += o.seriesTouched s.seriesTouchedSizeSum += o.seriesTouchedSizeSum s.seriesFetched += o.seriesFetched From c754543232c663b098ec165cfb68b2f05f3f2e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 21:37:23 +0100 Subject: [PATCH 24/28] Added metrics for postings decompression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 56 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 9caf042eb4..1eef3feda6 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -98,11 +98,14 @@ type bucketStoreMetrics struct { queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter - cachedPostingsOriginalSizeBytes prometheus.Counter - cachedPostingsCompressedSizeBytes prometheus.Counter - cachedPostingsCompressionTimeSeconds prometheus.Counter - cachedPostingsCompressions prometheus.Counter - cachedPostingsCompressionErrors prometheus.Counter + cachedPostingsOriginalSizeBytes prometheus.Counter + cachedPostingsCompressedSizeBytes prometheus.Counter + cachedPostingsCompressionTimeSeconds prometheus.Counter + cachedPostingsCompressions prometheus.Counter + cachedPostingsCompressionErrors prometheus.Counter + cachedPostingsDecompressions prometheus.Counter + cachedPostingsDecompressionErrors prometheus.Counter + cachedPostingsDecompressionTimeSeconds prometheus.Counter } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -189,11 +192,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { m.cachedPostingsCompressions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compressions_total", - Help: "Number of postings compressions before storing to index cache", + Help: "Number of postings compressions before storing to index cache.", }) m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_errors_total", - Help: "Number of postings compression errors", + Help: "Number of postings 
compression errors.", }) m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_original_size_bytes_total", @@ -205,7 +208,19 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }) m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_time_seconds", - Help: "Time spent compressing postings before storing them into postings cache", + Help: "Time spent compressing postings before storing them into postings cache.", + }) + m.cachedPostingsDecompressions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_decompressions_total", + Help: "Number of postings decompressions after reading from index cache.", + }) + m.cachedPostingsDecompressionErrors = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_decompression_errors_total", + Help: "Number of postings decompression errors.", + }) + m.cachedPostingsDecompressionTimeSeconds = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_decompression_time_seconds", + Help: "Time spent decompressing postings. Decoding is not included.", }) return &m @@ -937,6 +952,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.cachedPostingsCompressionTimeSeconds.Add(stats.cachedPostingsCompressionTimeSum.Seconds()) s.metrics.cachedPostingsCompressions.Add(float64(stats.cachedPostingsCompressions)) s.metrics.cachedPostingsCompressionErrors.Add(float64(stats.cachedPostingsCompressionErrors)) + s.metrics.cachedPostingsDecompressions.Add(float64(stats.cachedPostingsDecompressions)) + s.metrics.cachedPostingsDecompressionErrors.Add(float64(stats.cachedPostingsDecompressionErrors)) + s.metrics.cachedPostingsDecompressionTimeSeconds.Add(stats.cachedPostingsDecompressionTimeSum.Seconds()) level.Debug(s.logger).Log("msg", "stats query processed", "stats", fmt.Sprintf("%+v", stats), "err", err) @@ -1564,7 +1582,13 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings err error ) if isDiffVarintSnappyEncodedPostings(b) { + s := time.Now() l, err = diffVarintSnappyDecode(b) + r.stats.cachedPostingsDecompressions += 1 + r.stats.cachedPostingsDecompressionTimeSum += time.Since(s) + if err != nil { + r.stats.cachedPostingsDecompressionErrors += 1 + } } else { _, l, err = r.dec.Postings(b) } @@ -2045,11 +2069,14 @@ type queryStats struct { postingsFetchCount int postingsFetchDurationSum time.Duration - cachedPostingsCompressions int - cachedPostingsCompressionErrors int - cachedPostingsOriginalSizeSum int - cachedPostingsCompressedSizeSum int - cachedPostingsCompressionTimeSum time.Duration + cachedPostingsCompressions int + cachedPostingsCompressionErrors int + cachedPostingsOriginalSizeSum int + cachedPostingsCompressedSizeSum int + cachedPostingsCompressionTimeSum time.Duration + cachedPostingsDecompressions int + cachedPostingsDecompressionErrors int + cachedPostingsDecompressionTimeSum time.Duration seriesTouched int seriesTouchedSizeSum int @@ -2086,6 +2113,9 @@ func (s queryStats) merge(o *queryStats) *queryStats { s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum + s.cachedPostingsDecompressions += 
o.cachedPostingsDecompressions + s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors + s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum s.seriesTouched += o.seriesTouched s.seriesTouchedSizeSum += o.seriesTouchedSizeSum From 91860cf586d38cbcb75b4caddf5c6e599b86aedd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 21:39:40 +0100 Subject: [PATCH 25/28] Reorder metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 1eef3feda6..b2722e0409 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -98,11 +98,11 @@ type bucketStoreMetrics struct { queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter + cachedPostingsCompressions prometheus.Counter + cachedPostingsCompressionErrors prometheus.Counter cachedPostingsOriginalSizeBytes prometheus.Counter cachedPostingsCompressedSizeBytes prometheus.Counter cachedPostingsCompressionTimeSeconds prometheus.Counter - cachedPostingsCompressions prometheus.Counter - cachedPostingsCompressionErrors prometheus.Counter cachedPostingsDecompressions prometheus.Counter cachedPostingsDecompressionErrors prometheus.Counter cachedPostingsDecompressionTimeSeconds prometheus.Counter @@ -947,11 +947,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum)) s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum)) s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) + s.metrics.cachedPostingsCompressions.Add(float64(stats.cachedPostingsCompressions)) + s.metrics.cachedPostingsCompressionErrors.Add(float64(stats.cachedPostingsCompressionErrors)) s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum)) s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum)) s.metrics.cachedPostingsCompressionTimeSeconds.Add(stats.cachedPostingsCompressionTimeSum.Seconds()) - s.metrics.cachedPostingsCompressions.Add(float64(stats.cachedPostingsCompressions)) - s.metrics.cachedPostingsCompressionErrors.Add(float64(stats.cachedPostingsCompressionErrors)) s.metrics.cachedPostingsDecompressions.Add(float64(stats.cachedPostingsDecompressions)) s.metrics.cachedPostingsDecompressionErrors.Add(float64(stats.cachedPostingsDecompressionErrors)) s.metrics.cachedPostingsDecompressionTimeSeconds.Add(stats.cachedPostingsDecompressionTimeSum.Seconds()) From 869ac4db3145845b8e1d00079fece457a2ad2cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 21:41:00 +0100 Subject: [PATCH 26/28] Fixed comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 4b25b8c7fa..734f8bbe38 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -25,7 +25,7 @@ const ( codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". 
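
The queryStats.merge additions above follow the struct's existing convention: merge takes its receiver by value, so the accumulated numbers are copied and a fresh pointer is returned without mutating either operand. A trimmed-down sketch of that pattern (the three-field stats type here is hypothetical):

    type decompressionStats struct {
        decompressions      int
        decompressionErrors int
        decompressionTime   time.Duration
    }

    // merge has a value receiver: s is already a copy, so adding o's fields
    // into it and returning &s leaves both originals untouched.
    func (s decompressionStats) merge(o *decompressionStats) *decompressionStats {
        s.decompressions += o.decompressions
        s.decompressionErrors += o.decompressionErrors
        s.decompressionTime += o.decompressionTime
        return &s
    }
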
) -// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint(+snappy) codec. +// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. func isDiffVarintSnappyEncodedPostings(input []byte) bool { return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) } From 5c7dd1b8557b41787f7cd3bd0a5399c812e00455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 21:42:29 +0100 Subject: [PATCH 27/28] Fixed comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/postings_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 734f8bbe38..246d8bab81 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -32,7 +32,7 @@ func isDiffVarintSnappyEncodedPostings(input []byte) bool { // diffVarintSnappyEncode encodes postings into diff+varint representation, // and applies snappy compression on the result. -// Returned byte slice starts with codedHeaderSnappy header. +// Returned byte slice starts with codecHeaderSnappy header. func diffVarintSnappyEncode(p index.Postings) ([]byte, error) { buf, err := diffVarintEncodeNoHeader(p) if err != nil { From 6a8d5b63b1d088ac6a214e4d6ac0ec1d27ca324f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 24 Mar 2020 14:43:43 +0100 Subject: [PATCH 28/28] Use encode/decode labels. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 53 ++++++++++++++++----------------------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b2722e0409..81dcb4ff1e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -98,14 +98,11 @@ type bucketStoreMetrics struct { queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter - cachedPostingsCompressions prometheus.Counter - cachedPostingsCompressionErrors prometheus.Counter - cachedPostingsOriginalSizeBytes prometheus.Counter - cachedPostingsCompressedSizeBytes prometheus.Counter - cachedPostingsCompressionTimeSeconds prometheus.Counter - cachedPostingsDecompressions prometheus.Counter - cachedPostingsDecompressionErrors prometheus.Counter - cachedPostingsDecompressionTimeSeconds prometheus.Counter + cachedPostingsCompressions *prometheus.CounterVec + cachedPostingsCompressionErrors *prometheus.CounterVec + cachedPostingsCompressionTimeSeconds *prometheus.CounterVec + cachedPostingsOriginalSizeBytes prometheus.Counter + cachedPostingsCompressedSizeBytes prometheus.Counter } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -190,14 +187,18 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), }) - m.cachedPostingsCompressions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compressions_total", Help: "Number of postings compressions before storing to index cache.", - }) - m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, 
[]string{"op"}) + m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_errors_total", Help: "Number of postings compression errors.", - }) + }, []string{"op"}) + m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_cached_postings_compression_time_seconds", + Help: "Time spent compressing postings before storing them into postings cache.", + }, []string{"op"}) m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_original_size_bytes_total", Help: "Original size of postings stored into cache.", @@ -206,22 +207,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total", Help: "Compressed size of postings stored into cache.", }) - m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_cached_postings_compression_time_seconds", - Help: "Time spent compressing postings before storing them into postings cache.", - }) - m.cachedPostingsDecompressions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_cached_postings_decompressions_total", - Help: "Number of postings decompressions after reading from index cache.", - }) - m.cachedPostingsDecompressionErrors = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_cached_postings_decompression_errors_total", - Help: "Number of postings decompression errors.", - }) - m.cachedPostingsDecompressionTimeSeconds = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_cached_postings_decompression_time_seconds", - Help: "Time spent decompressing postings. 
Decoding is not included.", - }) return &m } @@ -947,14 +932,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum)) s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum)) s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) - s.metrics.cachedPostingsCompressions.Add(float64(stats.cachedPostingsCompressions)) - s.metrics.cachedPostingsCompressionErrors.Add(float64(stats.cachedPostingsCompressionErrors)) + s.metrics.cachedPostingsCompressions.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressions)) + s.metrics.cachedPostingsCompressions.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressions)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressionErrors)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressionErrors)) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("encode").Add(stats.cachedPostingsCompressionTimeSum.Seconds()) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("decode").Add(stats.cachedPostingsDecompressionTimeSum.Seconds()) s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum)) s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum)) - s.metrics.cachedPostingsCompressionTimeSeconds.Add(stats.cachedPostingsCompressionTimeSum.Seconds()) - s.metrics.cachedPostingsDecompressions.Add(float64(stats.cachedPostingsDecompressions)) - s.metrics.cachedPostingsDecompressionErrors.Add(float64(stats.cachedPostingsDecompressionErrors)) - s.metrics.cachedPostingsDecompressionTimeSeconds.Add(stats.cachedPostingsDecompressionTimeSum.Seconds()) level.Debug(s.logger).Log("msg", "stats query processed", "stats", fmt.Sprintf("%+v", stats), "err", err)
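
Patch 28 collapses the separate compression and decompression counters into single CounterVecs keyed by an "op" label with "encode" and "decode" values. A self-contained sketch of that shape, reusing one metric name from the patch (registry setup and the Add values are illustrative only):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    func main() {
        reg := prometheus.NewRegistry()

        // One vector with an "op" label replaces two near-identical counters.
        compressions := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
            Name: "thanos_bucket_store_cached_postings_compressions_total",
            Help: "Number of postings compressions before storing to index cache.",
        }, []string{"op"})

        // Encode and decode paths now update the same metric family.
        compressions.WithLabelValues("encode").Add(3)
        compressions.WithLabelValues("decode").Add(2)

        mfs, _ := reg.Gather()
        for _, mf := range mfs {
            fmt.Println(mf.GetName(), "-", len(mf.GetMetric()), "label combinations")
        }
    }

With a single family, dashboards can aggregate or split with sum by (op) instead of juggling parallel metric names.
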