Skip to content

Commit

Permalink
make index cache ttl configurable (thanos-io#6773)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Oct 9, 2023
1 parent 9154323 commit 06020df
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label.
- [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway.
- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`.
- [#6773](https://github.com/thanos-io/thanos/pull/6773) Index Cache: Add `ttl` to control the ttl to store items in remote index caches like memcached and redis.

### Changed

Expand Down
6 changes: 6 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,15 @@ config:
max_size: 0
max_item_size: 0
enabled_items: []
ttl: 0s
```

All the settings are **optional**:

- `max_size`: overall maximum number of bytes cache can contain. The value should be specified with a bytes unit (ie. `250MB`).
- `max_item_size`: maximum size of single item, in bytes. The value should be specified with a bytes unit (ie. `125MB`).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: this field doesn't do anything for inmemory cache.

### Memcached index cache

Expand All @@ -318,6 +320,7 @@ config:
dns_provider_update_interval: 0s
auto_discovery: false
enabled_items: []
ttl: 0s
```

The **required** settings are:
Expand All @@ -336,6 +339,7 @@ While the remaining settings are **optional**:
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: ttl to store index cache items in memcached.

### Redis index cache

Expand Down Expand Up @@ -367,6 +371,7 @@ config:
max_async_buffer_size: 10000
max_async_concurrency: 20
enabled_items: []
ttl: 0s
```

The **required** settings are:
Expand All @@ -383,6 +388,7 @@ While the remaining settings are **optional**:
- `write_timeout`: the redis write timeout.
- `cache_size` size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: ttl to store index cache items in redis.

Here is an example of what effect client-side caching could have:

Expand Down
12 changes: 10 additions & 2 deletions pkg/store/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package storecache
import (
"fmt"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -31,6 +32,9 @@ type IndexCacheConfig struct {

// Available item types are Postings, Series and ExpandedPostings.
EnabledItems []string `yaml:"enabled_items"`
// TTL for storing items in remote cache. Not supported for inmemory cache.
// Default value is 24h.
TTL time.Duration `yaml:"ttl"`
}

// NewIndexCache initializes and returns new index cache.
Expand All @@ -47,6 +51,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
return nil, errors.Wrap(err, "marshal content of cache backend configuration")
}

if cacheConfig.TTL == 0 {
cacheConfig.TTL = memcachedDefaultTTL
}

var cache IndexCache
switch strings.ToUpper(string(cacheConfig.Type)) {
case string(INMEMORY):
Expand All @@ -55,13 +63,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
var memcached cacheutil.RemoteCacheClient
memcached, err = cacheutil.NewMemcachedClient(logger, "index-cache", backendConfig, reg)
if err == nil {
cache, err = NewRemoteIndexCache(logger, memcached, cacheMetrics, reg)
cache, err = NewRemoteIndexCache(logger, memcached, cacheMetrics, reg, cacheConfig.TTL)
}
case string(REDIS):
var redisCache cacheutil.RemoteCacheClient
redisCache, err = cacheutil.NewRedisClient(logger, "index-cache", backendConfig, reg)
if err == nil {
cache, err = NewRemoteIndexCache(logger, redisCache, cacheMetrics, reg)
cache, err = NewRemoteIndexCache(logger, redisCache, cacheMetrics, reg, cacheConfig.TTL)
}
default:
return nil, errors.Errorf("index cache with type %s is not supported", cacheConfig.Type)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestIndexCacheMetrics(t *testing.T) {
commonMetrics := newCommonMetrics(reg)

memcached := newMockedMemcachedClient(nil)
_, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, commonMetrics, reg)
_, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, commonMetrics, reg, memcachedDefaultTTL)
testutil.Ok(t, err)
conf := []byte(`
max_size: 10MB
Expand Down
13 changes: 8 additions & 5 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type RemoteIndexCache struct {

compressionScheme string

ttl time.Duration

// Metrics.
requestTotal *prometheus.CounterVec
hitsTotal *prometheus.CounterVec
Expand All @@ -41,8 +43,9 @@ type RemoteIndexCache struct {
}

// NewRemoteIndexCache makes a new RemoteIndexCache.
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer) (*RemoteIndexCache, error) {
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer, ttl time.Duration) (*RemoteIndexCache, error) {
c := &RemoteIndexCache{
ttl: ttl,
logger: logger,
memcached: cacheClient,
compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions.
Expand Down Expand Up @@ -81,7 +84,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string()
if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
}
}
Expand Down Expand Up @@ -134,7 +137,7 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe
c.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err)
}
}
Expand Down Expand Up @@ -168,7 +171,7 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef,
c.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
}
}
Expand Down Expand Up @@ -215,5 +218,5 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL

// NewMemcachedIndexCache is alias NewRemoteIndexCache for compatible.
func NewMemcachedIndexCache(logger log.Logger, memcached cacheutil.RemoteCacheClient, reg prometheus.Registerer) (*RemoteIndexCache, error) {
return NewRemoteIndexCache(logger, memcached, nil, reg)
return NewRemoteIndexCache(logger, memcached, nil, reg, memcachedDefaultTTL)
}
6 changes: 3 additions & 3 deletions pkg/store/cache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the postings expected before running the test.
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the postings expected before running the test.
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the series expected before running the test.
Expand Down

0 comments on commit 06020df

Please sign in to comment.