Support filtered index cache #6765

Merged
merged 12 commits into from
Oct 6, 2023
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -17,9 +17,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified.
- [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message.
- [#6760](https://github.com/thanos-io/thanos/pull/6760) Query Frontend: Added TLS support in `--query-frontend.downstream-tripper-config` and `--query-frontend.downstream-tripper-config-file`
- [#6749](https://github.com/thanos-io/thanos/pull/6308) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache.
- [#6749](https://github.com/thanos-io/thanos/pull/6749) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache.
Contributor:

Did you mean to include this entry?

Contributor Author:

Just to fix the wrong URL.

- [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label.
- [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway.
- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`.

### Changed

6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -291,12 +291,14 @@ type: IN-MEMORY
config:
max_size: 0
max_item_size: 0
enabled_items: []
```

All the settings are **optional**:

- `max_size`: overall maximum number of bytes the cache can contain. The value should be specified with a bytes unit (e.g. `250MB`).
- `max_item_size`: maximum size of a single item, in bytes. The value should be specified with a bytes unit (e.g. `125MB`).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
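
A note on the default described above: an empty `enabled_items` list means no filtering at all, not "cache nothing". The following is a minimal sketch of that rule, assuming a hypothetical helper `itemEnabled` that is not part of this PR; the item type names are the ones listed in the docs.

```go
package main

import "fmt"

// itemEnabled is a hypothetical helper (not part of the PR). It reports
// whether an item type should be cached. An empty or nil enabled list
// means no filtering, so every item type is cached.
func itemEnabled(enabled []string, item string) bool {
	if len(enabled) == 0 {
		return true
	}
	for _, e := range enabled {
		if e == item {
			return true
		}
	}
	return false
}

func main() {
	// Default config: enabled_items is empty, everything is cached.
	fmt.Println(itemEnabled(nil, "Series"))

	// Only Postings enabled: Series is skipped, Postings is cached.
	fmt.Println(itemEnabled([]string{"Postings"}, "Series"))
	fmt.Println(itemEnabled([]string{"Postings"}, "Postings"))
}
```

Treating the empty list as "all enabled" keeps existing configs working unchanged, since they do not set `enabled_items`.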

### Memcached index cache

@@ -315,6 +317,7 @@ config:
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
enabled_items: []
```

The **required** settings are:
@@ -332,6 +335,7 @@ While the remaining settings are **optional**:
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.

### Redis index cache

@@ -362,6 +366,7 @@ config:
master_name: ""
max_async_buffer_size: 10000
max_async_concurrency: 20
enabled_items: []
```

The **required** settings are:
@@ -377,6 +382,7 @@ While the remaining settings are **optional**:
- `read_timeout`: the redis read timeout.
- `write_timeout`: the redis write timeout.
- `cache_size`: size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.

Here is an example of what effect client-side caching could have:

11 changes: 11 additions & 0 deletions pkg/store/cache/factory.go
@@ -28,6 +28,9 @@ const (
type IndexCacheConfig struct {
Type IndexCacheProvider `yaml:"type"`
Config interface{} `yaml:"config"`

// Available item types are Postings, Series and ExpandedPostings.
EnabledItems []string `yaml:"enabled_items"`
}

// NewIndexCache initializes and returns new index cache.
@@ -66,5 +69,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type))
}

if len(cacheConfig.EnabledItems) > 0 {
if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil {
return nil, err
}
cache = NewFilteredIndexCache(cache, cacheConfig.EnabledItems)
}

return cache, nil
}
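
The hunk above validates `enabled_items` first and wraps the cache only when the list is non-empty, so the default path returns the backend untouched. Below is a self-contained sketch of that validate-then-wrap pattern. The `Cache` interface, `mapCache`, `filtered`, and `newCache` are simplified, hypothetical stand-ins, not the Thanos types, and the sketch uses a set for lookups where the PR uses a slice.

```go
package main

import "fmt"

// Cache is a hypothetical, simplified stand-in for IndexCache.
type Cache interface {
	Store(itemType, key string, v []byte)
	Fetch(itemType, key string) ([]byte, bool)
}

// mapCache is a toy in-memory backend keyed by "itemType/key".
type mapCache map[string][]byte

func (m mapCache) Store(itemType, key string, v []byte) { m[itemType+"/"+key] = v }

func (m mapCache) Fetch(itemType, key string) ([]byte, bool) {
	v, ok := m[itemType+"/"+key]
	return v, ok
}

// filtered forwards calls to next only for enabled item types.
type filtered struct {
	next    Cache
	enabled map[string]struct{}
}

func (f filtered) Store(itemType, key string, v []byte) {
	if _, ok := f.enabled[itemType]; ok {
		f.next.Store(itemType, key, v)
	}
}

func (f filtered) Fetch(itemType, key string) ([]byte, bool) {
	if _, ok := f.enabled[itemType]; ok {
		return f.next.Fetch(itemType, key)
	}
	return nil, false
}

// supported lists the item type names given in the docs.
var supported = map[string]struct{}{"Postings": {}, "Series": {}, "ExpandedPostings": {}}

// newCache validates enabledItems and wraps the backend only when the
// list is non-empty, so the default configuration stays unwrapped.
func newCache(enabledItems []string) (Cache, error) {
	var c Cache = mapCache{}
	if len(enabledItems) == 0 {
		return c, nil
	}
	enabled := make(map[string]struct{}, len(enabledItems))
	for _, item := range enabledItems {
		if _, ok := supported[item]; !ok {
			return nil, fmt.Errorf("unsupported item type %s", item)
		}
		enabled[item] = struct{}{}
	}
	return filtered{next: c, enabled: enabled}, nil
}

func main() {
	c, err := newCache([]string{"Postings"})
	if err != nil {
		panic(err)
	}
	c.Store("Postings", "k", []byte("p"))
	c.Store("Series", "k", []byte("s")) // dropped: Series is not enabled

	_, hitPostings := c.Fetch("Postings", "k")
	_, hitSeries := c.Fetch("Series", "k")
	fmt.Println(hitPostings, hitSeries)

	_, err = newCache([]string{"foo"})
	fmt.Println(err)
}
```

Failing at construction time means a typo in the config surfaces at startup instead of silently disabling caching for an item type.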
88 changes: 88 additions & 0 deletions pkg/store/cache/filter_cache.go
@@ -0,0 +1,88 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
"context"
"fmt"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"golang.org/x/exp/slices"
)

// FilteredIndexCache wraps an IndexCache and only stores and fetches the
// enabled item types. An empty enabledItems list enables all item types.
type FilteredIndexCache struct {
	cache        IndexCache
	enabledItems []string
}

// NewFilteredIndexCache creates a filtered index cache based on enabled items.
func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache {
return &FilteredIndexCache{
cache: cache,
enabledItems: enabledItems,
}
}

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
c.cache.StorePostings(blockID, l, v, tenant)
}
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant)
}
return nil, keys
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
}
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}
return nil, false
}

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
c.cache.StoreSeries(blockID, id, v, tenant)
}
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant)
}
return nil, ids
}

// ValidateEnabledItems returns an error if enabledItems contains an unsupported item type.
func ValidateEnabledItems(enabledItems []string) error {
for _, item := range enabledItems {
switch item {
// valid
case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries:
default:
return fmt.Errorf("unsupported item type %s", item)
}
}
return nil
}
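
For a disabled item type, the multi-fetch methods above return `nil` hits and hand every requested key back as a miss, so the caller takes its usual cache-miss path. A minimal sketch of that contract, assuming a hypothetical `fetchMulti` over a plain map instead of a real cache backend:

```go
package main

import "fmt"

// fetchMulti is a hypothetical, simplified lookup. It returns the keys
// found in store as hits and the rest as misses. When enabled is false it
// skips the lookup and reports every key as a miss, mirroring the
// "return nil, keys" branch in the filtered cache.
func fetchMulti(store map[string][]byte, enabled bool, keys []string) (hits map[string][]byte, misses []string) {
	if !enabled {
		return nil, keys
	}
	hits = make(map[string][]byte, len(keys))
	for _, k := range keys {
		if v, ok := store[k]; ok {
			hits[k] = v
		} else {
			misses = append(misses, k)
		}
	}
	return hits, misses
}

func main() {
	store := map[string][]byte{"a": []byte("1")}
	keys := []string{"a", "b"}

	// Enabled: "a" is a hit, "b" is a miss.
	hits, misses := fetchMulti(store, true, keys)
	fmt.Println(len(hits), misses)

	// Disabled: nothing is looked up, both keys come back as misses.
	hits, misses = fetchMulti(store, false, keys)
	fmt.Println(len(hits), misses)
}
```

Returning a `nil` map is safe for callers because reading from a nil map in Go yields the zero value and `len` of a nil map is 0, so the disabled case needs no special handling on the fetch side.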
164 changes: 164 additions & 0 deletions pkg/store/cache/filter_cache_test.go
@@ -0,0 +1,164 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
"context"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/tenancy"
)

func TestFilterCache(t *testing.T) {
blockID := ulid.MustNew(ulid.Now(), nil)
postingKeys := []labels.Label{
{Name: "foo", Value: "bar"},
}
expandedPostingsMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}
testPostingData := []byte("postings")
testExpandedPostingsData := []byte("expandedPostings")
testSeriesData := []byte("series")
ctx := context.TODO()
for _, tc := range []struct {
name string
enabledItems []string
expectedError string
verifyFunc func(t *testing.T, c IndexCache)
}{
{
name: "invalid item type",
expectedError: "unsupported item type foo",
enabledItems: []string{"foo"},
},
{
name: "invalid item type with 1 valid cache type",
expectedError: "unsupported item type foo",
enabledItems: []string{cacheTypeExpandedPostings, "foo"},
},
{
name: "empty enabled items",
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
},
{
name: "all enabled items",
enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
},
{
name: "only enable postings",
enabledItems: []string{cacheTypePostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
testutil.Equals(t, 0, len(seriesHit))
},
},
{
name: "only enable expanded postings",
enabledItems: []string{cacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
testutil.Equals(t, 0, len(seriesHit))
},
},
{
name: "only enable series",
enabledItems: []string{cacheTypeSeries},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
},
} {
t.Run(tc.name, func(t *testing.T) {
inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig)
testutil.Ok(t, err)
err = ValidateEnabledItems(tc.enabledItems)
if tc.expectedError != "" {
testutil.Equals(t, tc.expectedError, err.Error())
} else {
testutil.Ok(t, err)
c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems)
tc.verifyFunc(t, c)
}
})
}
}