Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from thanos-io:main #438

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label.
- [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway.
- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`.
- [#6773](https://github.com/thanos-io/thanos/pull/6773) Index Cache: Add `ttl` to control the ttl to store items in remote index caches like memcached and redis.

### Changed

Expand Down
6 changes: 6 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,15 @@ config:
max_size: 0
max_item_size: 0
enabled_items: []
ttl: 0s
```

All the settings are **optional**:

- `max_size`: overall maximum number of bytes cache can contain. The value should be specified with a bytes unit (ie. `250MB`).
- `max_item_size`: maximum size of single item, in bytes. The value should be specified with a bytes unit (ie. `125MB`).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: this field doesn't do anything for inmemory cache.

### Memcached index cache

Expand All @@ -318,6 +320,7 @@ config:
dns_provider_update_interval: 0s
auto_discovery: false
enabled_items: []
ttl: 0s
```

The **required** settings are:
Expand All @@ -336,6 +339,7 @@ While the remaining settings are **optional**:
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: ttl to store index cache items in memcached.

### Redis index cache

Expand Down Expand Up @@ -367,6 +371,7 @@ config:
max_async_buffer_size: 10000
max_async_concurrency: 20
enabled_items: []
ttl: 0s
```

The **required** settings are:
Expand All @@ -383,6 +388,7 @@ While the remaining settings are **optional**:
- `write_timeout`: the redis write timeout.
- `cache_size` size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: ttl to store index cache items in redis.

Here is an example of what effect client-side caching could have:

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/prometheus/common v0.44.0
github.com/prometheus/exporter-toolkit v0.10.0
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.47.2-0.20231006112807-a5a4eab679cc
github.com/prometheus/prometheus v0.47.2-0.20231009162353-f6d9c84fde6b
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/prometheus/prometheus v0.47.2-0.20231006112807-a5a4eab679cc h1:4WALucWqPlyAuHqe8Z6nagJhdooeI7kotaZSf4SgBQA=
github.com/prometheus/prometheus v0.47.2-0.20231006112807-a5a4eab679cc/go.mod h1:UC0TwJiF90m2T3iYPQBKnGu8gv3s55dF/EgpTq8gyvo=
github.com/prometheus/prometheus v0.47.2-0.20231009162353-f6d9c84fde6b h1:oiCf/rFBXXaDLyiK1MnMKYlSA7Xm2+SQePvXnl8bNUI=
github.com/prometheus/prometheus v0.47.2-0.20231009162353-f6d9c84fde6b/go.mod h1:UC0TwJiF90m2T3iYPQBKnGu8gv3s55dF/EgpTq8gyvo=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
Expand Down
12 changes: 8 additions & 4 deletions pkg/block/indexheader/parallel_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"
)

// partitionSize is used for splitting range reads.
Expand Down Expand Up @@ -66,14 +67,17 @@ func (b *parallelBucketReader) GetRange(ctx context.Context, name string, off in
}
parts = append(parts, part)

// Assign partId to another variable to avoid modifying
// `partId` concurrently in multiple goroutines.
idx := partId
g.Go(func() error {
rc, err := b.BucketReader.GetRange(gctx, name, partOff, partLength)
defer runutil.CloseWithErrCapture(&err, rc, "close object")
if err != nil {
return errors.Wrap(err, fmt.Sprintf("get range part %v", partId))
return errors.Wrap(err, fmt.Sprintf("get range part %v", idx))
}
defer runutil.CloseWithErrCapture(&err, rc, "close object")
if _, err := io.Copy(part, rc); err != nil {
return errors.Wrap(err, fmt.Sprintf("get range part %v", partId))
return errors.Wrap(err, fmt.Sprintf("get range part %v", idx))
}
return part.Flush()
})
Expand Down
54 changes: 41 additions & 13 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ type BucketStore struct {

enabledLazyExpandedPostings bool

sortingStrategy sortingStrategy

blockEstimatedMaxSeriesFunc BlockEstimator
blockEstimatedMaxChunkFunc BlockEstimator
}
Expand Down Expand Up @@ -521,6 +523,15 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption {
}
}

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
if true {
s.sortingStrategy = sortingStrategyNone
}
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
Expand Down Expand Up @@ -563,6 +574,7 @@ func NewBucketStore(
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
seriesBatchSize: SeriesBatchSize,
sortingStrategy: sortingStrategyStore,
}

for _, option := range options {
Expand Down Expand Up @@ -1498,19 +1510,35 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID)
}

resp := newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount.WithLabelValues(tenant),
nil,
)
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount.WithLabelValues(tenant),
nil,
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount.WithLabelValues(tenant),
)
}

mtx.Lock()
respSets = append(respSets, resp)
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package storecache
import (
"fmt"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -31,6 +32,9 @@ type IndexCacheConfig struct {

// Available item types are Postings, Series and ExpandedPostings.
EnabledItems []string `yaml:"enabled_items"`
// TTL for storing items in remote cache. Not supported for inmemory cache.
// Default value is 24h.
TTL time.Duration `yaml:"ttl"`
}

// NewIndexCache initializes and returns new index cache.
Expand All @@ -47,6 +51,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
return nil, errors.Wrap(err, "marshal content of cache backend configuration")
}

if cacheConfig.TTL == 0 {
cacheConfig.TTL = memcachedDefaultTTL
}

var cache IndexCache
switch strings.ToUpper(string(cacheConfig.Type)) {
case string(INMEMORY):
Expand All @@ -55,13 +63,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
var memcached cacheutil.RemoteCacheClient
memcached, err = cacheutil.NewMemcachedClient(logger, "index-cache", backendConfig, reg)
if err == nil {
cache, err = NewRemoteIndexCache(logger, memcached, cacheMetrics, reg)
cache, err = NewRemoteIndexCache(logger, memcached, cacheMetrics, reg, cacheConfig.TTL)
}
case string(REDIS):
var redisCache cacheutil.RemoteCacheClient
redisCache, err = cacheutil.NewRedisClient(logger, "index-cache", backendConfig, reg)
if err == nil {
cache, err = NewRemoteIndexCache(logger, redisCache, cacheMetrics, reg)
cache, err = NewRemoteIndexCache(logger, redisCache, cacheMetrics, reg, cacheConfig.TTL)
}
default:
return nil, errors.Errorf("index cache with type %s is not supported", cacheConfig.Type)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestIndexCacheMetrics(t *testing.T) {
commonMetrics := newCommonMetrics(reg)

memcached := newMockedMemcachedClient(nil)
_, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, commonMetrics, reg)
_, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, commonMetrics, reg, memcachedDefaultTTL)
testutil.Ok(t, err)
conf := []byte(`
max_size: 10MB
Expand Down
13 changes: 8 additions & 5 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type RemoteIndexCache struct {

compressionScheme string

ttl time.Duration

// Metrics.
requestTotal *prometheus.CounterVec
hitsTotal *prometheus.CounterVec
Expand All @@ -41,8 +43,9 @@ type RemoteIndexCache struct {
}

// NewRemoteIndexCache makes a new RemoteIndexCache.
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer) (*RemoteIndexCache, error) {
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer, ttl time.Duration) (*RemoteIndexCache, error) {
c := &RemoteIndexCache{
ttl: ttl,
logger: logger,
memcached: cacheClient,
compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions.
Expand Down Expand Up @@ -81,7 +84,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string()
if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
}
}
Expand Down Expand Up @@ -134,7 +137,7 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe
c.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err)
}
}
Expand Down Expand Up @@ -168,7 +171,7 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef,
c.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
}
}
Expand Down Expand Up @@ -215,5 +218,5 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL

// NewMemcachedIndexCache is alias NewRemoteIndexCache for compatible.
func NewMemcachedIndexCache(logger log.Logger, memcached cacheutil.RemoteCacheClient, reg prometheus.Registerer) (*RemoteIndexCache, error) {
return NewRemoteIndexCache(logger, memcached, nil, reg)
return NewRemoteIndexCache(logger, memcached, nil, reg, memcachedDefaultTTL)
}
6 changes: 3 additions & 3 deletions pkg/store/cache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the postings expected before running the test.
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the postings expected before running the test.
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the series expected before running the test.
Expand Down