store: added option to reencode and compress postings before storing them to the cache #2297

Merged
28 commits merged on Mar 26, 2020

Commits (28)
c6e8d7a
Added "diff+varint+snappy" codec for postings.
pstibrany Mar 20, 2020
35a31f9
Added option to reencode and compress postings stored in cache
pstibrany Mar 20, 2020
f480494
Expose enablePostingsCompression flag as CLI parameter.
pstibrany Mar 20, 2020
fb2f0df
Use "github.com/pkg/errors" instead of "errors" package.
pstibrany Mar 20, 2020
e672219
remove break
pstibrany Mar 20, 2020
51e2feb
Removed empty branch
pstibrany Mar 20, 2020
391da98
Added copyright headers.
pstibrany Mar 20, 2020
a57624b
Added CHANGELOG.md entry
pstibrany Mar 20, 2020
1ab0142
Added comments.
pstibrany Mar 21, 2020
eb8fca9
Use Encbuf and Decbuf.
pstibrany Mar 21, 2020
5347e89
Fix comments in test file.
pstibrany Mar 21, 2020
eec1a07
Another comment...
pstibrany Mar 21, 2020
85b6a2b
Removed diffVarintSnappyEncode function.
pstibrany Mar 22, 2020
13adc82
Comment on usage with in-memory cache.
pstibrany Mar 23, 2020
ef74bae
var block
pstibrany Mar 23, 2020
564f6f3
Removed extra comment.
pstibrany Mar 23, 2020
2317b03
Move comment to error message.
pstibrany Mar 23, 2020
e2203aa
Separated snappy compression and postings reencoding into two functions.
pstibrany Mar 23, 2020
eacd394
Added comment on using diff+varint+snappy.
pstibrany Mar 23, 2020
a2ee495
Shorten header
pstibrany Mar 23, 2020
8c438d5
Lint...
pstibrany Mar 23, 2020
22a1338
Changed experimental.enable-postings-compression to experimental.enab…
pstibrany Mar 23, 2020
644d09d
Added metrics for postings compression
pstibrany Mar 23, 2020
c754543
Added metrics for postings decompression
pstibrany Mar 23, 2020
91860cf
Reorder metrics
pstibrany Mar 23, 2020
869ac4d
Fixed comment.
pstibrany Mar 23, 2020
5c7dd1b
Fixed comment.
pstibrany Mar 23, 2020
6a8d5b6
Use encode/decode labels.
pstibrany Mar 24, 2020
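
The codec added in commit c6e8d7a is not among the file excerpts shown below; the diff in pkg/store/bucket.go only calls into it (diffVarintSnappyEncode, diffVarintSnappyDecode, isDiffVarintSnappyEncodedPostings). As orientation only, here is a minimal, self-contained sketch of the diff+varint+snappy idea. Everything in it is an assumption made for illustration: the function names, the header string, and the plain []uint64 interface do not match the PR's actual code, which operates on index.Postings and uses Prometheus' Encbuf/Decbuf (commit eb8fca9).

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// codecHeader marks cache entries written by this codec so readers can tell
// them apart from raw postings. The value is made up for this sketch; the real
// codec defines its own header.
const codecHeader = "example-dvs"

// encodePostings delta-encodes a sorted list of series references, writes each
// delta as a uvarint, and snappy-compresses the resulting byte stream.
func encodePostings(postings []uint64) []byte {
	varints := make([]byte, 0, len(postings)*binary.MaxVarintLen64)
	tmp := make([]byte, binary.MaxVarintLen64)
	prev := uint64(0)
	for _, p := range postings {
		n := binary.PutUvarint(tmp, p-prev) // store the difference to the previous reference
		varints = append(varints, tmp[:n]...)
		prev = p
	}
	out := make([]byte, 0, len(codecHeader)+snappy.MaxEncodedLen(len(varints)))
	out = append(out, codecHeader...)
	return append(out, snappy.Encode(nil, varints)...)
}

// isCompressedPostings reports whether a cached entry was written by this codec.
func isCompressedPostings(b []byte) bool {
	return len(b) > len(codecHeader) && string(b[:len(codecHeader)]) == codecHeader
}

// decodePostings reverses encodePostings.
func decodePostings(b []byte) ([]uint64, error) {
	if !isCompressedPostings(b) {
		return nil, fmt.Errorf("unexpected codec header")
	}
	varints, err := snappy.Decode(nil, b[len(codecHeader):])
	if err != nil {
		return nil, fmt.Errorf("snappy decode: %w", err)
	}
	var out []uint64
	prev := uint64(0)
	for len(varints) > 0 {
		d, n := binary.Uvarint(varints)
		if n <= 0 {
			return nil, fmt.Errorf("invalid uvarint delta")
		}
		prev += d
		out = append(out, prev)
		varints = varints[n:]
	}
	return out, nil
}

func main() {
	refs := []uint64{16, 32, 48, 1024, 1040} // sorted series references, as in a postings list
	cached := encodePostings(refs)
	decoded, err := decodePostings(cached)
	fmt.Println(decoded, err)
}
```

The header prefix is what lets fetchPostings in the diff below decode compressed entries even when this store instance has compression disabled, since other stores sharing the same cache may have written compressed postings.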
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
- [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached.
- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-index-cache-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size.

### Changed

6 changes: 6 additions & 0 deletions cmd/thanos/store.go
@@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

@@ -126,6 +129,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
*enablePostingsCompression,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
)
@@ -160,6 +164,7 @@ func runStore(
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
enablePostingsCompression bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
) error {
@@ -265,6 +270,7 @@ func runStore(
filterConf,
advertiseCompatibilityLabel,
enableIndexHeader,
enablePostingsCompression,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
141 changes: 126 additions & 15 deletions pkg/store/bucket.go
@@ -97,6 +97,12 @@ type bucketStoreMetrics struct {
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
seriesRefetches prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
cachedPostingsCompressionErrors *prometheus.CounterVec
cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
cachedPostingsOriginalSizeBytes prometheus.Counter
cachedPostingsCompressedSizeBytes prometheus.Counter
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -180,6 +186,28 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
})

m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compressions_total",
Help: "Number of postings compressions before storing to index cache.",
}, []string{"op"})
m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compression_errors_total",
Help: "Number of postings compression errors.",
}, []string{"op"})
m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compression_time_seconds",
Help: "Time spent compressing postings before storing them into postings cache.",
}, []string{"op"})
m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_original_size_bytes_total",
Help: "Original size of postings stored into cache.",
})
m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total",
Help: "Compressed size of postings stored into cache.",
})

return &m
}

Expand Down Expand Up @@ -220,6 +248,11 @@ type BucketStore struct {
advLabelSets []storepb.LabelSet
enableCompatibilityLabel bool
enableIndexHeader bool

// Reencode postings using diff+varint+snappy when storing to cache.
// This makes them smaller, but takes extra CPU and memory.
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
enablePostingsCompression bool
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -239,6 +272,7 @@ func NewBucketStore(
filterConfig *FilterConfig,
enableCompatibilityLabel bool,
enableIndexHeader bool,
enablePostingsCompression bool,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -270,10 +304,11 @@ func NewBucketStore(
maxConcurrent,
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
enablePostingsCompression: enablePostingsCompression,
}
s.metrics = metrics

@@ -455,6 +490,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
indexHeaderReader,
s.partitioner,
s.metrics.seriesRefetches,
s.enablePostingsCompression,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
@@ -896,6 +932,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum))
s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount))
s.metrics.cachedPostingsCompressions.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressions))
s.metrics.cachedPostingsCompressions.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressions))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressionErrors))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressionErrors))
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("encode").Add(stats.cachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("decode").Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))

level.Debug(s.logger).Log("msg", "stats query processed",
"stats", fmt.Sprintf("%+v", stats), "err", err)
@@ -1183,6 +1227,8 @@ type bucketBlock struct {
partitioner partitioner

seriesRefetches prometheus.Counter

enablePostingsCompression bool
}

func newBucketBlock(
@@ -1196,17 +1242,19 @@ func newBucketBlock(
indexHeadReader indexheader.Reader,
p partitioner,
seriesRefetches prometheus.Counter,
enablePostingsCompression bool,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
logger: logger,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
enablePostingsCompression: enablePostingsCompression,
}

// Get object handles for all chunk files.
@@ -1512,7 +1560,24 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

_, l, err := r.dec.Postings(b)
// Even if this instance is not using compression, there may be compressed
// entries in the cache written by other stores.
var (
l index.Postings
err error
)
if isDiffVarintSnappyEncodedPostings(b) {
s := time.Now()
l, err = diffVarintSnappyDecode(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.cachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
r.stats.cachedPostingsDecompressionErrors += 1
}
} else {
_, l, err = r.dec.Postings(b)
}

if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
@@ -1579,15 +1644,43 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
return err
}

dataToCache := pBytes

compressionTime := time.Duration(0)
compressions, compressionErrors, compressedSize := 0, 0, 0

if r.block.enablePostingsCompression {
// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:]))

Review comments on this change:

Contributor: Have you considered adding a couple of metrics with postings raw size and the compressed one, to measure the compression ratio in the wild?

Member: Good point.

Author (pstibrany): I've added the following metrics:

  • number of compressions, and compression failures
  • total original size and compressed size
  • total compression time

Author (pstibrany): Oh, one more thing... I've moved compression outside of the lock. However, storing data back to the cache is still done with the lock held, which looks wrong. I haven't fixed that part, because this PR is not about it, but it looks like it should be moved outside as well. I haven't looked into more details here (like, does it break anything?).

Author (pstibrany): Also added the number of decompressions, decompression errors, and total decompression time (snappy part only; we don't time decoding varints and adding values together).

compressionTime = time.Since(s)
if err == nil {

Review comments on this change:

Contributor: Could be worth logging the error, otherwise we'll never find out when it happens.

Member: We will know, as querying would fail?

Author (pstibrany): That was my thinking here... if we get an error here, it means the postings were corrupted, so they will still be corrupted a few lines later when they're used, and the error will be reported there.

Author (pstibrany): These errors are now tracked via a metric.

dataToCache = data
compressedSize = len(data)
} else {
compressionErrors = 1
}
}

r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)

r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(pBytes)
r.stats.cachedPostingsCompressions += compressions
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.cachedPostingsOriginalSizeSum += len(pBytes)
r.stats.cachedPostingsCompressedSizeSum += compressedSize
r.stats.cachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
}
return nil
@@ -1961,6 +2054,15 @@ type queryStats struct {
postingsFetchCount int
postingsFetchDurationSum time.Duration

cachedPostingsCompressions int
cachedPostingsCompressionErrors int
cachedPostingsOriginalSizeSum int
cachedPostingsCompressedSizeSum int
cachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsDecompressionErrors int
cachedPostingsDecompressionTimeSum time.Duration

seriesTouched int
seriesTouchedSizeSum int
seriesFetched int
@@ -1991,6 +2093,15 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.postingsFetchCount += o.postingsFetchCount
s.postingsFetchDurationSum += o.postingsFetchDurationSum

s.cachedPostingsCompressions += o.cachedPostingsCompressions
s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum
s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum

s.seriesTouched += o.seriesTouched
s.seriesTouchedSizeSum += o.seriesTouchedSizeSum
s.seriesFetched += o.seriesFetched
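
Returning to the reviewer question about measuring the compression ratio in the wild: with the counters added above, the ratio is simply the compressed-size total divided by the original-size total (in PromQL, a ratio of the two *_size_bytes_total series). The snippet below is an illustrative, hypothetical helper, not part of the PR; it uses client_golang's testutil package and simulates the roughly 10% figure quoted in the CHANGELOG entry.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// compressionRatio returns compressed/original for two byte-total counters,
// mirroring how the new thanos_bucket_store_cached_postings_*_size_bytes_total
// metrics relate to each other.
func compressionRatio(original, compressed prometheus.Counter) float64 {
	orig := testutil.ToFloat64(original)
	if orig == 0 {
		return 0
	}
	return testutil.ToFloat64(compressed) / orig
}

func main() {
	reg := prometheus.NewRegistry()
	orig := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_bucket_store_cached_postings_original_size_bytes_total",
		Help: "Original size of postings stored into cache.",
	})
	comp := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total",
		Help: "Compressed size of postings stored into cache.",
	})

	// Simulated values only: 1 MiB of raw postings compressing to ~100 KiB,
	// in line with the ~10% claim from the CHANGELOG entry.
	orig.Add(1 << 20)
	comp.Add(100 << 10)

	fmt.Printf("postings compression ratio: %.2f\n", compressionRatio(orig, comp))
}
```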
1 change: 1 addition & 0 deletions pkg/store/bucket_e2e_test.go
@@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
filterConf,
true,
true,
true,
)
testutil.Ok(t, err)
s.store = store
33 changes: 19 additions & 14 deletions pkg/store/bucket_test.go
@@ -480,6 +480,7 @@ func TestBucketStore_Info(t *testing.T) {
allowAllFilterConf,
true,
true,
true,
)
testutil.Ok(t, err)

@@ -731,6 +732,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
allowAllFilterConf,
true,
true,
true,
)
testutil.Ok(t, err)

@@ -932,7 +934,23 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in

logger := log.NewNopLogger()

app := h.Appender()
appendTestData(t, h.Appender(), series)

testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)

_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String())))

return id
}

func appendTestData(t testing.TB, app tsdb.Appender, series int) {
addSeries := func(l labels.Labels) {
_, err := app.Add(l, 0, 0)
testutil.Ok(t, err)
@@ -950,19 +968,6 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
}
}
testutil.Ok(t, app.Commit())

testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)

_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String())))

return id
}

func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID {
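
Not all test changes from this PR are visible above (commit 5347e89 mentions fixing comments in a test file that does not appear in these excerpts). Purely as a usage example for the illustrative codec sketch placed after the commit list, a round-trip check over random sorted inputs might look like this; the helpers it calls come from that sketch, not from the PR:

```go
package main

import (
	"math/rand"
	"reflect"
	"sort"
	"testing"
)

// TestPostingsCodecRoundTrip feeds random sorted series references through the
// sketch's encodePostings/decodePostings and checks that nothing is lost.
func TestPostingsCodecRoundTrip(t *testing.T) {
	r := rand.New(rand.NewSource(42))
	for i := 0; i < 100; i++ {
		refs := make([]uint64, 1+r.Intn(1000))
		for j := range refs {
			refs[j] = uint64(r.Int63n(1 << 30))
		}
		sort.Slice(refs, func(a, b int) bool { return refs[a] < refs[b] })

		got, err := decodePostings(encodePostings(refs))
		if err != nil {
			t.Fatalf("round trip failed: %v", err)
		}
		if !reflect.DeepEqual(got, refs) {
			t.Fatalf("round trip mismatch: got %d refs, want %d", len(got), len(refs))
		}
	}
}
```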