Skip to content

Commit

Permalink
Chunks caching at bucket level (#2532)
Browse files Browse the repository at this point in the history
* Added generic cache interface.

Signed-off-by: Peter Štibraný <[email protected]>

* Added memcached implementation of Cache.

Signed-off-by: Peter Štibraný <[email protected]>

* Chunks-caching bucket.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix sentences

Signed-off-by: Peter Štibraný <[email protected]>

* Fix sentences

Signed-off-by: Peter Štibraný <[email protected]>

* Fix sentences

Signed-off-by: Peter Štibraný <[email protected]>

* Rename config objects.

Signed-off-by: Peter Štibraný <[email protected]>

* Review feedback.

Signed-off-by: Peter Štibraný <[email protected]>

* Review feedback.

Signed-off-by: Peter Štibraný <[email protected]>

* Added metrics for object size.

Signed-off-by: Peter Štibraný <[email protected]>

* Added requested chunk bytes metric.

Signed-off-by: Peter Štibraný <[email protected]>

* Caching bucket docs.

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed tests.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix test.

Signed-off-by: Peter Štibraný <[email protected]>

* Update docs/components/store.md
Update pkg/store/cache/caching_bucket.go

Co-authored-by: Marco Pracucci <[email protected]>
Signed-off-by: Peter Štibraný <[email protected]>

* Dots

Signed-off-by: Peter Štibraný <[email protected]>

* Always set lastBlockOffset.

Signed-off-by: Peter Štibraný <[email protected]>

* Merged cached metric into fetched metric, added labels.

Signed-off-by: Peter Štibraný <[email protected]>

* Added CHANGELOG.md entry

Signed-off-by: Peter Štibraný <[email protected]>

* Reworded help for thanos_store_bucket_cache_fetched_chunk_bytes_total

Signed-off-by: Peter Štibraný <[email protected]>

* Added tracing around getRangeChunkFile method.

Signed-off-by: Peter Štibraný <[email protected]>

* Updated CHANGELOG.md

Signed-off-by: Peter Štibraný <[email protected]>

* Options

Signed-off-by: Peter Štibraný <[email protected]>

* Fix parameter name. (store. got dropped by accident)

Signed-off-by: Peter Štibraný <[email protected]>

* Use embedded Bucket

Signed-off-by: Peter Štibraný <[email protected]>

* Added comments.

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed comment.

Signed-off-by: Peter Štibraný <[email protected]>

* Hide store.caching-bucket.config flags.

Signed-off-by: Peter Štibraný <[email protected]>

* Renamed block to subrange.

Signed-off-by: Peter Štibraný <[email protected]>

* Renamed block to subrange.

Signed-off-by: Peter Štibraný <[email protected]>

* Header

Signed-off-by: Peter Štibraný <[email protected]>

* Added TODO

Signed-off-by: Peter Štibraný <[email protected]>

* Removed TODO, in favor of creating issue.

Signed-off-by: Peter Štibraný <[email protected]>

* Use NopCloser.

Signed-off-by: Peter Štibraný <[email protected]>

Co-authored-by: Marco Pracucci <[email protected]>
  • Loading branch information
pstibrany and pracucci authored May 7, 2020
1 parent cca0ce2 commit 8cb72e5
Show file tree
Hide file tree
Showing 10 changed files with 1,116 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2502](https://github.com/thanos-io/thanos/pull/2502) Added `hints` field to `SeriesResponse`. Hints is an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
- [#2521](https://github.com/thanos-io/thanos/pull/2521) Sidecar: add `thanos_sidecar_reloader_reloads_failed_total`, `thanos_sidecar_reloader_reloads_total`, `thanos_sidecar_reloader_watch_errors_total`, `thanos_sidecar_reloader_watch_events_total` and `thanos_sidecar_reloader_watches` metrics.
- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage.

### Changed

Expand Down
17 changes: 17 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
"YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache",
false)

cachingBucketConfig := extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config",
"YAML that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#caching-bucket",
false)

chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory.").
Default("2GB").Bytes()

Expand Down Expand Up @@ -149,6 +153,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*postingOffsetsInMemSampling,
cachingBucketConfig,
)
}
}
Expand Down Expand Up @@ -179,6 +184,7 @@ func runStore(
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
postingOffsetsInMemSampling int,
cachingBucketConfig *extflag.PathOrContent,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -214,6 +220,17 @@ func runStore(
return errors.Wrap(err, "create bucket client")
}

cachingBucketConfigYaml, err := cachingBucketConfig.Content()
if err != nil {
return errors.Wrap(err, "get caching bucket configuration")
}
if len(cachingBucketConfigYaml) > 0 {
bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg)
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
Expand Down
28 changes: 28 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,34 @@ While the remaining settings are **optional**:
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value as the memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.

## Caching Bucket

Thanos Store Gateway supports a "caching bucket" with chunks caching to speed up loading of chunks from TSDB blocks. Currently only the memcached "backend" is supported:

```yaml
backend: memcached
backend_config:
addresses:
- localhost:11211
caching_config:
chunk_subrange_size: 16000
max_chunks_get_range_requests: 3
chunk_object_size_ttl: 24h
chunk_subrange_ttl: 24h
```

`backend_config` field for memcached supports all the same configuration as memcached for [index cache](#memcached-index-cache).

`caching_config` is a configuration for chunks cache and supports the following optional settings:

- `chunk_subrange_size`: size of the segment (subrange) of a chunks object that is stored in the cache. This is the smallest unit that the chunks cache works with.
- `max_chunks_get_range_requests`: how many "get range" sub-requests the cache may perform to fetch missing subranges.
- `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache.
- `chunk_subrange_ttl`: how long to keep individual subranges in the cache.

Note that chunks cache is an experimental feature, and these fields may be renamed or removed completely in the future.

## Index Header

In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as:
Expand Down
21 changes: 21 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"
)

// Cache is a generic best-effort cache mapping string keys to byte slices.
//
// Neither method returns an error, so callers cannot tell a failed operation
// apart from a miss.
type Cache interface {
// Store puts the given key/value pairs into the cache with the given ttl.
//
// Note that individual byte buffers may be retained by the cache, so callers
// should not modify them after passing them to Store.
Store(ctx context.Context, data map[string][]byte, ttl time.Duration)

// Fetch looks up multiple keys in the cache and returns a map of input keys
// to data. If a key isn't in the returned map, data for that key was not found.
Fetch(ctx context.Context, keys []string) map[string][]byte
}
83 changes: 83 additions & 0 deletions pkg/cache/memcached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/cacheutil"
)

// MemcachedCache is a memcached-based cache.
type MemcachedCache struct {
// logger receives the messages emitted by the cache (creation, store failures).
logger log.Logger
// memcached is the client all Store and Fetch calls are delegated to.
memcached cacheutil.MemcachedClient

// Metrics.
// requests counts the keys passed to Fetch.
requests prometheus.Counter
// hits counts the entries returned by the memcached client for those keys.
hits prometheus.Counter
}

// NewMemcachedCache builds a MemcachedCache on top of the given memcached client.
//
// Two counters (requests and hits) are created through promauto against reg,
// each carrying a constant "name" label set to name. An info message is logged
// once the cache has been assembled.
func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache {
	factory := promauto.With(reg)

	// newCounter creates one counter labelled with this cache's name.
	newCounter := func(metricName, help string) prometheus.Counter {
		return factory.NewCounter(prometheus.CounterOpts{
			Name:        metricName,
			Help:        help,
			ConstLabels: prometheus.Labels{"name": name},
		})
	}

	// Keep the creation order: requests first, then hits.
	requests := newCounter("thanos_cache_memcached_requests_total", "Total number of items requests to memcached.")
	hits := newCounter("thanos_cache_memcached_hits_total", "Total number of items requests to the cache that were a hit.")

	level.Info(logger).Log("msg", "created memcached cache")

	return &MemcachedCache{
		logger:    logger,
		memcached: memcached,
		requests:  requests,
		hits:      hits,
	}
}

// Store hands every key/value pair in data to the memcached client's SetAsync
// with the given ttl.
//
// Errors returned by SetAsync are not propagated to the caller. If at least one
// call failed, a single warning is logged with the number of failures and the
// first error seen.
func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
	var first error
	failures := 0

	for k, v := range data {
		err := c.memcached.SetAsync(ctx, k, v, ttl)
		if err == nil {
			continue
		}
		// Remember only the first failure; the rest are just counted.
		if failures == 0 {
			first = err
		}
		failures++
	}

	if failures == 0 {
		return
	}
	level.Warn(c.logger).Log("msg", "failed to store one or more items into memcached", "failed", failures, "firstErr", first)
}

// Fetch looks up all keys with a single GetMulti call on the memcached client
// and returns the resulting map of hits. A key that is absent from the returned
// map was not found.
//
// The requests counter grows by the number of keys asked for and the hits
// counter by the number of entries returned. GetMulti reports no error here, so
// a failed lookup is presumably indistinguishable from a miss — confirm against
// the client implementation.
func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
	requested := float64(len(keys))
	c.requests.Add(requested)

	// One round trip for the whole batch of keys.
	hits := c.memcached.GetMulti(ctx, keys)

	c.hits.Add(float64(len(hits)))
	return hits
}
126 changes: 126 additions & 0 deletions pkg/cache/memcached_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"

"github.com/thanos-io/thanos/pkg/testutil"
)

// TestMemcachedIndexCache stores a fixture into a MemcachedCache backed by a
// mocked client, fetches a set of keys and checks both the returned hits and
// the requests/hits counters for each scenario.
func TestMemcachedIndexCache(t *testing.T) {
	t.Parallel()

	// Shared fixtures used by the scenarios below.
	var (
		key1, key2, key3       = "key1", "key2", "key3"
		value1, value2, value3 = []byte{1}, []byte{2}, []byte{3}
	)

	// scenario describes one store-then-fetch round trip.
	type scenario struct {
		setup        map[string][]byte // Entries stored before fetching.
		mockedErr    error             // When set, the mocked GetMulti returns nil.
		fetchKeys    []string          // Keys requested from the cache.
		expectedHits map[string][]byte // Expected result of Fetch.
	}

	scenarios := map[string]scenario{
		"should return no hits on empty cache": {
			setup:        nil,
			fetchKeys:    []string{key1, key2},
			expectedHits: map[string][]byte{},
		},
		"should return no misses on 100% hit ratio": {
			setup:        map[string][]byte{key1: value1, key2: value2, key3: value3},
			fetchKeys:    []string{key1},
			expectedHits: map[string][]byte{key1: value1},
		},
		"should return hits and misses on partial hits": {
			setup:        map[string][]byte{key1: value1, key2: value2},
			fetchKeys:    []string{key1, key3},
			expectedHits: map[string][]byte{key1: value1},
		},
		"should return no hits on memcached error": {
			setup:        map[string][]byte{key1: value1, key2: value2, key3: value3},
			mockedErr:    errors.New("mocked error"),
			fetchKeys:    []string{key1},
			expectedHits: nil,
		},
	}

	ctx := context.Background()

	for name, sc := range scenarios {
		t.Run(name, func(t *testing.T) {
			client := newMockedMemcachedClient(sc.mockedErr)
			mc := NewMemcachedCache("test", log.NewNopLogger(), client, nil)

			// Populate the cache with the scenario's fixture.
			mc.Store(ctx, sc.setup, time.Hour)

			// Fetch the requested keys and compare with the expected hits.
			got := mc.Fetch(ctx, sc.fetchKeys)
			testutil.Equals(t, sc.expectedHits, got)

			// The counters must reflect the number of requested keys and hits.
			wantRequests := float64(len(sc.fetchKeys))
			wantHits := float64(len(sc.expectedHits))
			testutil.Equals(t, wantRequests, prom_testutil.ToFloat64(mc.requests))
			testutil.Equals(t, wantHits, prom_testutil.ToFloat64(mc.hits))
		})
	}
}

// mockedMemcachedClient is an in-memory stand-in for a memcached client, used
// by tests. Entries live in a plain map; a non-nil mockedGetMultiErr makes
// every GetMulti call return nil.
type mockedMemcachedClient struct {
	cache             map[string][]byte
	mockedGetMultiErr error
}

// newMockedMemcachedClient returns a mock with an empty store. When
// mockedGetMultiErr is non-nil, GetMulti on the returned mock yields nil.
func newMockedMemcachedClient(mockedGetMultiErr error) *mockedMemcachedClient {
	client := new(mockedMemcachedClient)
	client.cache = make(map[string][]byte)
	client.mockedGetMultiErr = mockedGetMultiErr
	return client
}

// GetMulti returns the stored values for those keys that are present. It
// returns nil when the mock was built with an error, and an empty non-nil map
// when none of the keys is stored.
func (c *mockedMemcachedClient) GetMulti(_ context.Context, keys []string) map[string][]byte {
	if c.mockedGetMultiErr != nil {
		return nil
	}

	found := make(map[string][]byte)
	for i := range keys {
		k := keys[i]
		v, present := c.cache[k]
		if !present {
			continue
		}
		found[k] = v
	}
	return found
}

// SetAsync stores value under key right away; the duration is ignored and the
// returned error is always nil.
func (c *mockedMemcachedClient) SetAsync(_ context.Context, key string, value []byte, _ time.Duration) error {
	c.cache[key] = value
	return nil
}

// Stop does nothing: the mock holds no resources.
func (c *mockedMemcachedClient) Stop() {
}
21 changes: 21 additions & 0 deletions pkg/extflag/hidden.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extflag

import (
"gopkg.in/alecthomas/kingpin.v2"
)

// HiddenCmdClause wraps c so that every flag created through the returned
// CmdClause is marked as hidden.
func HiddenCmdClause(c CmdClause) CmdClause {
	var wrapper hidden
	wrapper.c = c
	return wrapper
}

// hidden is a CmdClause decorator that hides the flags it creates.
type hidden struct {
	c CmdClause
}

// Flag registers the flag on the wrapped CmdClause and calls Hidden on the
// resulting clause before returning it.
func (h hidden) Flag(name, help string) *kingpin.FlagClause {
	clause := h.c.Flag(name, help)
	return clause.Hidden()
}
Loading

0 comments on commit 8cb72e5

Please sign in to comment.