diff --git a/CHANGELOG.md b/CHANGELOG.md index e6dc6f9a2e..cab5a36852 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering. - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts. +- [#1881](https://github.com/thanos-io/thanos/pull/1881) Store Gateway: memcached support for index cache. See [documentation](docs/components/store.md/#index-cache) for further information. ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 0a3af11677..1ec81d87b6 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -36,9 +36,13 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). Default("./data").String() - indexCacheSize := cmd.Flag("index-cache-size", "Maximum size of items held in the index cache."). + indexCacheSize := cmd.Flag("index-cache-size", "Maximum size of items held in the in-memory index cache. Ignored if --index-cache.config or --index-cache.config-file option is specified."). Default("250MB").Bytes() + indexCacheConfig := extflag.RegisterPathOrContent(cmd, "index-cache.config", + "YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache", + false) + chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks."). Default("2GB").Bytes() @@ -77,6 +81,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { logger, reg, tracer, + indexCacheConfig, objStoreConfig, *dataDir, *grpcBindAddr, @@ -110,6 +115,7 @@ func runStore( logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, + indexCacheConfig *extflag.PathOrContent, objStoreConfig *extflag.PathOrContent, dataDir string, grpcBindAddr string, @@ -169,6 +175,11 @@ func runStore( return err } + indexCacheContentYaml, err := indexCacheConfig.Content() + if err != nil { + return errors.Wrap(err, "get content of index cache configuration") + } + // Ensure we close up everything properly. defer func() { if err != nil { @@ -176,13 +187,17 @@ func runStore( } }() - // TODO(bwplotka): Add as a flag? - maxItemSizeBytes := indexCacheSizeBytes / 2 - - indexCache, err := storecache.NewInMemoryIndexCache(logger, reg, storecache.Opts{ - MaxSizeBytes: indexCacheSizeBytes, - MaxItemSizeBytes: maxItemSizeBytes, - }) + // Create the index cache loading its config from config file, while keeping + // backward compatibility with the pre-config file era. + var indexCache storecache.IndexCache + if len(indexCacheContentYaml) > 0 { + indexCache, err = storecache.NewIndexCache(logger, indexCacheContentYaml, reg) + } else { + indexCache, err = storecache.NewInMemoryIndexCacheWithConfig(logger, reg, storecache.InMemoryIndexCacheConfig{ + MaxSize: storecache.Bytes(indexCacheSizeBytes), + MaxItemSize: storecache.DefaultInMemoryIndexCacheConfig.MaxItemSize, + }) + } if err != nil { return errors.Wrap(err, "create index cache") } diff --git a/docs/components/store.md b/docs/components/store.md index 3b2a7f5737..c25512993c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -68,7 +68,19 @@ Flags: CA is specified, there is no client verification on server side. (tls.NoClientCert) --data-dir="./data" Data directory in which to cache remote blocks. - --index-cache-size=250MB Maximum size of items held in the index cache. + --index-cache-size=250MB Maximum size of items held in the in-memory + index cache. Ignored if --index-cache.config or + --index-cache.config-file option is specified. + --index-cache.config-file= + Path to YAML file that contains index cache + configuration. See format details: + https://thanos.io/components/store.md/#index-cache + --index-cache.config= + Alternative to 'index-cache.config-file' flag + (lower priority). Content of YAML file that + contains index cache configuration. See format + details: + https://thanos.io/components/store.md/#index-cache --chunk-pool-size=2GB Maximum size of concurrently allocatable bytes for chunks. --store.grpc.series-sample-limit=0 @@ -151,3 +163,61 @@ Filtering is done on a Chunk level, so Thanos Store might still return Samples w - `/-/ready` starts after all the bootstrapping completed (e.g initial index building) and ready to serve traffic. > NOTE: Metric endpoint starts immediately so, make sure you set up readiness probe on designated HTTP `/-/ready` path. + +## Index cache + +Thanos Store Gateway supports an index cache to speed up postings and series lookups from TSDB blocks indexes. Two types of caches are supported: + +- `in-memory` (_default_) +- `memcached` + +### In-memory index cache + +The `in-memory` index cache is enabled by default and its max size can be configured through the flag `--index-cache-size`. + +Alternatively, the `in-memory` index cache can also by configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly: + +[embedmd]:# (../flags/config_index_cache_in_memory.txt yaml) +```yaml +type: IN-MEMORY +config: + max_size: 0 + max_item_size: 0 +``` + +All the settings are **optional**: + +- `max_size`: overall maximum number of bytes cache can contain. The value should be specified with a bytes unit (ie. `250MB`). +- `max_item_size`: maximum size of single item, in bytes. The value should be specified with a bytes unit (ie. `125MB`). + +### Memcached index cache + +The `memcached` index cache allows to use [Memcached](https://memcached.org) as cache backend. This cache type is configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly: + +[embedmd]:# (../flags/config_index_cache_memcached.txt yaml) +```yaml +type: MEMCACHED +config: + addresses: [] + timeout: 0s + max_idle_connections: 0 + max_async_concurrency: 0 + max_async_buffer_size: 0 + max_get_multi_concurrency: 0 + max_get_multi_batch_size: 0 + dns_provider_update_interval: 0s +``` + +The **required** settings are: + +- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. + +While the remaining settings are **optional**: + +- `timeout`: the socket read/write timeout. +- `max_idle_connections`: maximum number of idle connections that will be maintained per address. +- `max_async_concurrency`: maximum number of concurrent asynchronous operations can occur. +- `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed. +- `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited. +- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited. +- `dns_provider_update_interval`: the DNS discovery update interval. diff --git a/go.mod b/go.mod index 171dd7188a..bae87de512 100644 --- a/go.mod +++ b/go.mod @@ -13,17 +13,19 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.2.1-0.20191028180845-3492b2aff503 // indirect github.com/NYTimes/gziphandler v1.1.1 github.com/OneOfOne/xxhash v1.2.6 // indirect - github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect + github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible github.com/armon/go-metrics v0.3.0 github.com/aws/aws-sdk-go v1.25.35 // indirect github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect + github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/elastic/go-sysinfo v1.1.1 // indirect github.com/elastic/go-windows v1.0.1 // indirect github.com/evanphx/json-patch v4.5.0+incompatible // indirect + github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/fatih/structtag v1.1.0 github.com/fortytw2/leaktest v1.3.0 github.com/fsnotify/fsnotify v1.4.7 @@ -83,7 +85,7 @@ require ( go.opencensus.io v0.22.2 // indirect go.uber.org/atomic v1.5.0 // indirect go.uber.org/automaxprocs v1.2.0 - golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708 // indirect + golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20191113165036-4c7a9d0fe056 // indirect diff --git a/go.sum b/go.sum index 42cbdc4313..724168b406 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0= +github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -134,6 +136,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM= +github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structtag v1.1.0 h1:6j4mUV/ES2duvnAzKMFkN6/A5mCaNYPD3xfbAkLLOF8= github.com/fatih/structtag v1.1.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= diff --git a/pkg/cacheutil/jump_hash.go b/pkg/cacheutil/jump_hash.go new file mode 100644 index 0000000000..4b684b7384 --- /dev/null +++ b/pkg/cacheutil/jump_hash.go @@ -0,0 +1,18 @@ +package cacheutil + +// jumpHash consistently chooses a hash bucket number in the range +// [0, numBuckets) for the given key. numBuckets must be >= 1. +// +// Copied from github.com/dgryski/go-jump/blob/master/jump.go (MIT license). +func jumpHash(key uint64, numBuckets int) int32 { + var b int64 = -1 + var j int64 + + for j < int64(numBuckets) { + b = j + key = key*2862933555777941757 + 1 + j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1))) + } + + return int32(b) +} diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go new file mode 100644 index 0000000000..91d4552dd1 --- /dev/null +++ b/pkg/cacheutil/memcached_client.go @@ -0,0 +1,441 @@ +package cacheutil + +import ( + "context" + "sync" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/tracing" + yaml "gopkg.in/yaml.v2" +) + +const ( + opSet = "set" + opGetMulti = "getmulti" +) + +var ( + errMemcachedAsyncBufferFull = errors.New("the async buffer is full") + errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") + + defaultMemcachedClientConfig = MemcachedClientConfig{ + Timeout: 500 * time.Millisecond, + MaxIdleConnections: 100, + MaxAsyncConcurrency: 20, + MaxAsyncBufferSize: 10000, + MaxGetMultiConcurrency: 100, + MaxGetMultiBatchSize: 0, + DNSProviderUpdateInterval: 10 * time.Second, + } +) + +// MemcachedClient is a high level client to interact with memcached. +type MemcachedClient interface { + // GetMulti fetches multiple keys at once from memcached. In case of error, + // an empty map is returned and the error tracked/logged. + GetMulti(ctx context.Context, keys []string) map[string][]byte + + // SetAsync enqueues an asynchronous operation to store a key into memcached. + // Returns an error in case it fails to enqueue the operation. In case the + // underlying async operation will fail, the error will be tracked/logged. + SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error + + // Stop client and release underlying resources. + Stop() +} + +// memcachedClientBackend is an interface used to mock the underlying client in tests. +type memcachedClientBackend interface { + GetMulti(keys []string) (map[string]*memcache.Item, error) + Set(item *memcache.Item) error +} + +// MemcachedClientConfig is the config accepted by MemcachedClient. +type MemcachedClientConfig struct { + // Addresses specifies the list of memcached addresses. The addresses get + // resolved with the DNS provider. + Addresses []string `yaml:"addresses"` + + // Timeout specifies the socket read/write timeout. + Timeout time.Duration `yaml:"timeout"` + + // MaxIdleConnections specifies the maximum number of idle connections that + // will be maintained per address. For better performances, this should be + // set to a number higher than your peak parallel requests. + MaxIdleConnections int `yaml:"max_idle_connections"` + + // MaxAsyncConcurrency specifies the maximum number of concurrent asynchronous + // operations can occur. + MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + + // MaxAsyncBufferSize specifies the maximum number of enqueued asynchronous + // operations allowed. + MaxAsyncBufferSize int `yaml:"max_async_buffer_size"` + + // MaxGetMultiConcurrency specifies the maximum number of concurrent connections + // running GetMulti() operations. If set to 0, concurrency is unlimited. + MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"` + + // MaxGetMultiBatchSize specifies the maximum number of keys a single underlying + // GetMulti() should run. If more keys are specified, internally keys are splitted + // into multiple batches and fetched concurrently, honoring MaxGetMultiConcurrency + // parallelism. If set to 0, the max batch size is unlimited. + MaxGetMultiBatchSize int `yaml:"max_get_multi_batch_size"` + + // DNSProviderUpdateInterval specifies the DNS discovery update interval. + DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"` +} + +func (c *MemcachedClientConfig) validate() error { + if len(c.Addresses) == 0 { + return errMemcachedConfigNoAddrs + } + + return nil +} + +// parseMemcachedClientConfig unmarshals a buffer into a MemcachedClientConfig with default values. +func parseMemcachedClientConfig(conf []byte) (MemcachedClientConfig, error) { + config := defaultMemcachedClientConfig + if err := yaml.Unmarshal(conf, &config); err != nil { + return MemcachedClientConfig{}, err + } + + return config, nil +} + +type memcachedClient struct { + logger log.Logger + config MemcachedClientConfig + client memcachedClientBackend + selector *MemcachedJumpHashSelector + + // DNS provider used to keep the memcached servers list updated. + dnsProvider *dns.Provider + + // Channel used to notify internal goroutines when they should quit. + stop chan struct{} + + // Channel used to enqueue async operations. + asyncQueue chan func() + + // Gate used to enforce the max number of concurrent GetMulti() operations. + getMultiGate *gate.Gate + + // Wait group used to wait all workers on stopping. + workers sync.WaitGroup + + // Tracked metrics. + operations *prometheus.CounterVec + failures *prometheus.CounterVec + duration *prometheus.HistogramVec +} + +type memcachedGetMultiResult struct { + items map[string]*memcache.Item + err error +} + +// NewMemcachedClient makes a new MemcachedClient. +func NewMemcachedClient(logger log.Logger, name string, conf []byte, reg prometheus.Registerer) (*memcachedClient, error) { + config, err := parseMemcachedClientConfig(conf) + if err != nil { + return nil, err + } + + return NewMemcachedClientWithConfig(logger, name, config, reg) +} + +// NewMemcachedClientWithConfig makes a new MemcachedClient. +func NewMemcachedClientWithConfig(logger log.Logger, name string, config MemcachedClientConfig, reg prometheus.Registerer) (*memcachedClient, error) { + if err := config.validate(); err != nil { + return nil, err + } + + // We use a custom servers selector in order to use a jump hash + // for servers selection. + selector := &MemcachedJumpHashSelector{} + + client := memcache.NewFromSelector(selector) + client.Timeout = config.Timeout + client.MaxIdleConns = config.MaxIdleConnections + + return newMemcachedClient(logger, name, client, selector, config, reg) +} + +func newMemcachedClient( + logger log.Logger, + name string, + client memcachedClientBackend, + selector *MemcachedJumpHashSelector, + config MemcachedClientConfig, + reg prometheus.Registerer, +) (*memcachedClient, error) { + dnsProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), + dns.ResolverType(dns.GolangResolverType), + ) + + c := &memcachedClient{ + logger: logger, + config: config, + client: client, + selector: selector, + dnsProvider: dnsProvider, + asyncQueue: make(chan func(), config.MaxAsyncBufferSize), + stop: make(chan struct{}, 1), + getMultiGate: gate.NewGate( + config.MaxGetMultiConcurrency, + extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), + ), + } + + c.operations = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_memcached_operations_total", + Help: "Total number of operations against memcached.", + ConstLabels: prometheus.Labels{"name": name}, + }, []string{"operation"}) + + c.failures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_memcached_operation_failures_total", + Help: "Total number of operations against memcached that failed.", + ConstLabels: prometheus.Labels{"name": name}, + }, []string{"operation"}) + + c.duration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_memcached_operation_duration_seconds", + Help: "Duration of operations against memcached.", + ConstLabels: prometheus.Labels{"name": name}, + Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1}, + }, []string{"operation"}) + + if reg != nil { + reg.MustRegister(c.operations, c.failures, c.duration) + } + + // As soon as the client is created it must ensure that memcached server + // addresses are resolved, so we're going to trigger an initial addresses + // resolution here. + if err := c.resolveAddrs(); err != nil { + return nil, err + } + + c.workers.Add(1) + go c.resolveAddrsLoop() + + // Start a number of goroutines - processing async operations - equal + // to the max concurrency we have. + c.workers.Add(c.config.MaxAsyncConcurrency) + for i := 0; i < c.config.MaxAsyncConcurrency; i++ { + go c.asyncQueueProcessLoop() + } + + return c, nil +} + +func (c *memcachedClient) Stop() { + close(c.stop) + + // Wait until all workers have terminated. + c.workers.Wait() +} + +func (c *memcachedClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.enqueueAsync(func() { + start := time.Now() + c.operations.WithLabelValues(opSet).Inc() + + span, _ := tracing.StartSpan(ctx, "memcached_set") + err := c.client.Set(&memcache.Item{ + Key: key, + Value: value, + Expiration: int32(time.Now().Add(ttl).Unix()), + }) + span.Finish() + if err != nil { + c.failures.WithLabelValues(opSet).Inc() + level.Warn(c.logger).Log("msg", "failed to store item to memcached", "key", key, "err", err) + return + } + + c.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds()) + }) +} + +func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[string][]byte { + batches, err := c.getMultiBatched(ctx, keys) + if err != nil { + level.Warn(c.logger).Log("msg", "failed to fetch items from memcached", "err", err) + + // In case we have both results and an error, it means some batch requests + // failed and other succeeded. In this case we prefer to log it and move on, + // given returning some results from the cache is better than returning + // nothing. + if len(batches) == 0 { + return nil + } + } + + hits := map[string][]byte{} + for _, items := range batches { + for key, item := range items { + hits[key] = item.Value + } + } + + return hits +} + +func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([]map[string]*memcache.Item, error) { + // Do not batch if the input keys are less than the max batch size. + if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) { + items, err := c.getMultiSingle(ctx, keys) + if err != nil { + return nil, err + } + + return []map[string]*memcache.Item{items}, nil + } + + // Calculate the number of expected results. + batchSize := c.config.MaxGetMultiBatchSize + numResults := len(keys) / batchSize + if len(keys)%batchSize != 0 { + numResults++ + } + + // Spawn a goroutine for each batch request. The max concurrency will be + // enforced by getMultiSingle(). + results := make(chan *memcachedGetMultiResult, numResults) + defer close(results) + + for batchStart := 0; batchStart < len(keys); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(keys) { + batchEnd = len(keys) + } + + batchKeys := keys[batchStart:batchEnd] + + c.workers.Add(1) + go func() { + defer c.workers.Done() + + res := &memcachedGetMultiResult{} + res.items, res.err = c.getMultiSingle(ctx, batchKeys) + + results <- res + }() + } + + // Wait for all batch results. In case of error, we keep + // track of the last error occurred. + items := make([]map[string]*memcache.Item, 0, numResults) + var lastErr error + + for i := 0; i < numResults; i++ { + result := <-results + if result.err != nil { + lastErr = result.err + continue + } + + items = append(items, result.items) + } + + return items, lastErr +} + +func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (map[string]*memcache.Item, error) { + // Wait until we get a free slot from the gate, if the max + // concurrency should be enforced. + if c.config.MaxGetMultiConcurrency > 0 { + span, _ := tracing.StartSpan(ctx, "memcached_getmulti_gate_ismyturn") + err := c.getMultiGate.IsMyTurn(ctx) + span.Finish() + if err != nil { + return nil, errors.Wrapf(err, "failed to wait for turn") + } + defer c.getMultiGate.Done() + } + + start := time.Now() + c.operations.WithLabelValues(opGetMulti).Inc() + + span, _ := tracing.StartSpan(ctx, "memcached_getmulti") + items, err := c.client.GetMulti(keys) + span.Finish() + if err != nil { + c.failures.WithLabelValues(opGetMulti).Inc() + } else { + c.duration.WithLabelValues(opGetMulti).Observe(time.Since(start).Seconds()) + } + + return items, err +} + +func (c *memcachedClient) enqueueAsync(op func()) error { + select { + case c.asyncQueue <- op: + return nil + default: + return errMemcachedAsyncBufferFull + } +} + +func (c *memcachedClient) asyncQueueProcessLoop() { + defer c.workers.Done() + + for { + select { + case op := <-c.asyncQueue: + op() + case <-c.stop: + return + } + } +} + +func (c *memcachedClient) resolveAddrsLoop() { + defer c.workers.Done() + + ticker := time.NewTicker(c.config.DNSProviderUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := c.resolveAddrs() + if err != nil { + level.Warn(c.logger).Log("msg", "failed update memcached servers list", "err", err) + } + case <-c.stop: + return + } + } +} + +func (c *memcachedClient) resolveAddrs() error { + // Resolve configured addresses with a reasonable timeout. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + c.dnsProvider.Resolve(ctx, c.config.Addresses) + + // Fail in case no server address is resolved. + servers := c.dnsProvider.Addresses() + if len(servers) == 0 { + return errors.New("no server address resolved") + } + + return c.selector.SetServers(servers...) +} diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go new file mode 100644 index 0000000000..a1f77fe282 --- /dev/null +++ b/pkg/cacheutil/memcached_client_test.go @@ -0,0 +1,400 @@ +package cacheutil + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMemcachedClientConfig_validate(t *testing.T) { + tests := map[string]struct { + config MemcachedClientConfig + expected error + }{ + "should pass on valid config": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + }, + expected: nil, + }, + "should fail on no addresses": { + config: MemcachedClientConfig{ + Addresses: []string{}, + }, + expected: errMemcachedConfigNoAddrs, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + testutil.Equals(t, testData.expected, testData.config.validate()) + }) + } +} + +func TestNewMemcachedClient(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + // Should return error on empty YAML config. + conf := []byte{} + cache, err := NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) + testutil.NotOk(t, err) + testutil.Equals(t, (*memcachedClient)(nil), cache) + + // Should return error on invalid YAML config. + conf = []byte("invalid") + cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) + testutil.NotOk(t, err) + testutil.Equals(t, (*memcachedClient)(nil), cache) + + // Should instance a memcached client with minimum YAML config. + conf = []byte(` +addresses: + - 127.0.0.1:11211 + - 127.0.0.2:11211 +`) + cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) + testutil.Ok(t, err) + defer cache.Stop() + + testutil.Equals(t, []string{"127.0.0.1:11211", "127.0.0.2:11211"}, cache.config.Addresses) + testutil.Equals(t, defaultMemcachedClientConfig.Timeout, cache.config.Timeout) + testutil.Equals(t, defaultMemcachedClientConfig.MaxIdleConnections, cache.config.MaxIdleConnections) + testutil.Equals(t, defaultMemcachedClientConfig.MaxAsyncConcurrency, cache.config.MaxAsyncConcurrency) + testutil.Equals(t, defaultMemcachedClientConfig.MaxAsyncBufferSize, cache.config.MaxAsyncBufferSize) + testutil.Equals(t, defaultMemcachedClientConfig.DNSProviderUpdateInterval, cache.config.DNSProviderUpdateInterval) + testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiConcurrency, cache.config.MaxGetMultiConcurrency) + testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiBatchSize, cache.config.MaxGetMultiBatchSize) + + // Should instance a memcached client with configured YAML config. + conf = []byte(` +addresses: + - 127.0.0.1:11211 + - 127.0.0.2:11211 +timeout: 1s +max_idle_connections: 1 +max_async_concurrency: 1 +max_async_buffer_size: 1 +max_get_multi_concurrency: 1 +max_get_multi_batch_size: 1 +dns_provider_update_interval: 1s +`) + cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) + testutil.Ok(t, err) + defer cache.Stop() + + testutil.Equals(t, []string{"127.0.0.1:11211", "127.0.0.2:11211"}, cache.config.Addresses) + testutil.Equals(t, 1*time.Second, cache.config.Timeout) + testutil.Equals(t, 1, cache.config.MaxIdleConnections) + testutil.Equals(t, 1, cache.config.MaxAsyncConcurrency) + testutil.Equals(t, 1, cache.config.MaxAsyncBufferSize) + testutil.Equals(t, 1*time.Second, cache.config.DNSProviderUpdateInterval) + testutil.Equals(t, 1, cache.config.MaxGetMultiConcurrency) + testutil.Equals(t, 1, cache.config.MaxGetMultiBatchSize) +} + +func TestMemcachedClient_SetAsync(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + config := defaultMemcachedClientConfig + config.Addresses = []string{"127.0.0.1:11211"} + backendMock := newMemcachedClientBackendMock() + + client, err := prepare(config, backendMock) + testutil.Ok(t, err) + defer client.Stop() + + testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second)) + testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second)) + testutil.Ok(t, backendMock.waitItems(2)) + + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet))) +} + +func TestMemcachedClient_GetMulti(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + tests := map[string]struct { + maxBatchSize int + maxConcurrency int + mockedGetMultiErrors int + initialItems []memcache.Item + getKeys []string + expectedHits map[string][]byte + expectedGetMultiCount int + }{ + "should fetch keys in a single batch if the input keys is <= the max batch size": { + maxBatchSize: 2, + maxConcurrency: 5, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + }, + getKeys: []string{"key-1", "key-2"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + }, + expectedGetMultiCount: 1, + }, + "should fetch keys in multiple batches if the input keys is > the max batch size": { + maxBatchSize: 2, + maxConcurrency: 5, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + }, + getKeys: []string{"key-1", "key-2", "key-3"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + "key-3": []byte("value-3"), + }, + expectedGetMultiCount: 2, + }, + "should fetch keys in multiple batches on input keys exact multiple of batch size": { + maxBatchSize: 2, + maxConcurrency: 5, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + {Key: "key-4", Value: []byte("value-4")}, + }, + getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + "key-3": []byte("value-3"), + "key-4": []byte("value-4"), + }, + expectedGetMultiCount: 2, + }, + "should fetch keys in multiple batches on input keys exact multiple of batch size with max concurrency disabled (0)": { + maxBatchSize: 2, + maxConcurrency: 0, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + {Key: "key-4", Value: []byte("value-4")}, + }, + getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + "key-3": []byte("value-3"), + "key-4": []byte("value-4"), + }, + expectedGetMultiCount: 2, + }, + "should fetch keys in multiple batches on input keys exact multiple of batch size with max concurrency lower than the batches": { + maxBatchSize: 1, + maxConcurrency: 1, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + {Key: "key-4", Value: []byte("value-4")}, + }, + getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + "key-3": []byte("value-3"), + "key-4": []byte("value-4"), + }, + expectedGetMultiCount: 4, + }, + "should fetch keys in a single batch if max batch size is disabled (0)": { + maxBatchSize: 0, + maxConcurrency: 5, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + {Key: "key-4", Value: []byte("value-4")}, + }, + getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + "key-3": []byte("value-3"), + "key-4": []byte("value-4"), + }, + expectedGetMultiCount: 1, + }, + "should fetch keys in a single batch if max batch size is disabled (0) and max concurrency is disabled (0)": { + maxBatchSize: 0, + maxConcurrency: 0, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + {Key: "key-4", Value: []byte("value-4")}, + }, + getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + "key-3": []byte("value-3"), + "key-4": []byte("value-4"), + }, + expectedGetMultiCount: 1, + }, + "should return no hits on all keys missing": { + maxBatchSize: 2, + maxConcurrency: 5, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + }, + getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, + expectedHits: map[string][]byte{ + "key-1": []byte("value-1"), + "key-2": []byte("value-2"), + }, + expectedGetMultiCount: 2, + }, + "should return no hits on partial errors while fetching batches and no items found": { + maxBatchSize: 2, + maxConcurrency: 5, + mockedGetMultiErrors: 1, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + }, + getKeys: []string{"key-5", "key-6", "key-7"}, + expectedHits: map[string][]byte{}, + expectedGetMultiCount: 2, + }, + "should return no hits on all errors while fetching batches": { + maxBatchSize: 2, + maxConcurrency: 5, + mockedGetMultiErrors: 2, + initialItems: []memcache.Item{ + {Key: "key-1", Value: []byte("value-1")}, + {Key: "key-2", Value: []byte("value-2")}, + {Key: "key-3", Value: []byte("value-3")}, + }, + getKeys: []string{"key-5", "key-6", "key-7"}, + expectedHits: nil, + expectedGetMultiCount: 2, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := context.Background() + config := defaultMemcachedClientConfig + config.Addresses = []string{"127.0.0.1:11211"} + config.MaxGetMultiBatchSize = testData.maxBatchSize + config.MaxGetMultiConcurrency = testData.maxConcurrency + + backendMock := newMemcachedClientBackendMock() + backendMock.getMultiErrors = testData.mockedGetMultiErrors + + client, err := prepare(config, backendMock) + testutil.Ok(t, err) + defer client.Stop() + + // Populate memcached with the initial items. + for _, item := range testData.initialItems { + testutil.Ok(t, client.SetAsync(ctx, item.Key, item.Value, time.Second)) + } + + // Wait until initial items have been added. + testutil.Ok(t, backendMock.waitItems(len(testData.initialItems))) + + // Read back the items. + testutil.Equals(t, testData.expectedHits, client.GetMulti(ctx, testData.getKeys)) + + // Ensure the client has interacted with the backend as expected. + backendMock.lock.Lock() + defer backendMock.lock.Unlock() + testutil.Equals(t, testData.expectedGetMultiCount, backendMock.getMultiCount) + + // Ensure metrics are tracked. + testutil.Equals(t, float64(testData.expectedGetMultiCount), prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti))) + testutil.Equals(t, float64(testData.mockedGetMultiErrors), prom_testutil.ToFloat64(client.failures.WithLabelValues(opGetMulti))) + }) + } +} + +func prepare(config MemcachedClientConfig, backendMock *memcachedClientBackendMock) (*memcachedClient, error) { + logger := log.NewNopLogger() + selector := &MemcachedJumpHashSelector{} + client, err := newMemcachedClient(logger, "test", backendMock, selector, config, nil) + + return client, err +} + +type memcachedClientBackendMock struct { + lock sync.Mutex + items map[string]*memcache.Item + getMultiCount int + getMultiErrors int +} + +func newMemcachedClientBackendMock() *memcachedClientBackendMock { + return &memcachedClientBackendMock{ + items: map[string]*memcache.Item{}, + } +} + +func (c *memcachedClientBackendMock) GetMulti(keys []string) (map[string]*memcache.Item, error) { + c.lock.Lock() + defer c.lock.Unlock() + + c.getMultiCount++ + if c.getMultiCount <= c.getMultiErrors { + return nil, errors.New("mocked GetMulti error") + } + + items := make(map[string]*memcache.Item) + for _, key := range keys { + if item, ok := c.items[key]; ok { + items[key] = item + } + } + + return items, nil +} + +func (c *memcachedClientBackendMock) Set(item *memcache.Item) error { + c.lock.Lock() + defer c.lock.Unlock() + + c.items[item.Key] = item + + return nil +} + +func (c *memcachedClientBackendMock) waitItems(expected int) error { + deadline := time.Now().Add(1 * time.Second) + + for time.Now().Before(deadline) { + c.lock.Lock() + count := len(c.items) + c.lock.Unlock() + + if count >= expected { + return nil + } + } + + return errors.New("timeout expired while waiting for items in the memcached mock") +} diff --git a/pkg/cacheutil/memcached_server_selector.go b/pkg/cacheutil/memcached_server_selector.go new file mode 100644 index 0000000000..1318574045 --- /dev/null +++ b/pkg/cacheutil/memcached_server_selector.go @@ -0,0 +1,98 @@ +package cacheutil + +import ( + "net" + "sync" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/cespare/xxhash" + "github.com/facette/natsort" +) + +var ( + addrsPool = sync.Pool{ + New: func() interface{} { + addrs := make([]net.Addr, 0, 64) + return &addrs + }, + } +) + +// MemcachedJumpHashSelector implements the memcache.ServerSelector +// interface, utilizing a jump hash to distribute keys to servers. +// +// While adding or removing servers only requires 1/N keys to move, +// servers are treated as a stack and can only be pushed/popped. +// Therefore, MemcachedJumpHashSelector works best for servers +// with consistent DNS names where the naturally sorted order +// is predictable (ie. Kubernetes statefulsets). +type MemcachedJumpHashSelector struct { + // To avoid copy and pasting all memcache server list logic, + // we embed it and implement our features on top of it. + servers memcache.ServerList +} + +// SetServers changes a MemcachedJumpHashSelector's set of servers at +// runtime and is safe for concurrent use by multiple goroutines. +// +// Each server is given equal weight. A server is given more weight +// if it's listed multiple times. +// +// SetServers returns an error if any of the server names fail to +// resolve. No attempt is made to connect to the server. If any +// error occurs, no changes are made to the internal server list. +// +// To minimize the number of rehashes for keys when scaling the +// number of servers in subsequent calls to SetServers, servers +// are stored in natural sort order. +func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error { + sortedServers := make([]string, len(servers)) + copy(sortedServers, servers) + natsort.Sort(sortedServers) + + return s.servers.SetServers(sortedServers...) +} + +// PickServer returns the server address that a given item +// should be shared onto. +func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { + // Unfortunately we can't read the list of server addresses from + // the original implementation, so we use Each() to fetch all of them. + addrs := *(addrsPool.Get().(*[]net.Addr)) + err := s.servers.Each(func(addr net.Addr) error { + addrs = append(addrs, addr) + return nil + }) + if err != nil { + return nil, err + } + + // No need of a jump hash in case of 0 or 1 servers. + if len(addrs) == 0 { + addrs = (addrs)[:0] + addrsPool.Put(&addrs) + return nil, memcache.ErrNoServers + } + if len(addrs) == 1 { + addrs = (addrs)[:0] + addrsPool.Put(&addrs) + return (addrs)[0], nil + } + + // Pick a server using the jump hash. + cs := xxhash.Sum64String(key) + idx := jumpHash(cs, len(addrs)) + picked := (addrs)[idx] + + addrs = (addrs)[:0] + addrsPool.Put(&addrs) + + return picked, nil +} + +// Each iterates over each server and calls the given function. +// If f returns a non-nil error, iteration will stop and that +// error will be returned. +func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { + return s.servers.Each(f) +} diff --git a/pkg/cacheutil/memcached_server_selector_test.go b/pkg/cacheutil/memcached_server_selector_test.go new file mode 100644 index 0000000000..a215604ece --- /dev/null +++ b/pkg/cacheutil/memcached_server_selector_test.go @@ -0,0 +1,184 @@ +package cacheutil + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/facette/natsort" + "github.com/fortytw2/leaktest" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestNatSort(t *testing.T) { + + // Validate that the order of SRV records returned by a DNS + // lookup for a k8s StatefulSet are ordered as expected when + // a natsort is done. + input := []string{ + "memcached-10.memcached.thanos.svc.cluster.local.", + "memcached-1.memcached.thanos.svc.cluster.local.", + "memcached-6.memcached.thanos.svc.cluster.local.", + "memcached-3.memcached.thanos.svc.cluster.local.", + "memcached-25.memcached.thanos.svc.cluster.local.", + } + + expected := []string{ + "memcached-1.memcached.thanos.svc.cluster.local.", + "memcached-3.memcached.thanos.svc.cluster.local.", + "memcached-6.memcached.thanos.svc.cluster.local.", + "memcached-10.memcached.thanos.svc.cluster.local.", + "memcached-25.memcached.thanos.svc.cluster.local.", + } + + natsort.Sort(input) + testutil.Equals(t, expected, input) +} + +func TestMemcachedJumpHashSelector_Each_ShouldRespectServersOrdering(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + tests := []struct { + input []string + expected []string + }{ + { + input: []string{"127.0.0.1:11211", "127.0.0.2:11211", "127.0.0.3:11211"}, + expected: []string{"127.0.0.1:11211", "127.0.0.2:11211", "127.0.0.3:11211"}, + }, + { + input: []string{"127.0.0.2:11211", "127.0.0.3:11211", "127.0.0.1:11211"}, + expected: []string{"127.0.0.1:11211", "127.0.0.2:11211", "127.0.0.3:11211"}, + }, + } + + s := MemcachedJumpHashSelector{} + + for _, test := range tests { + testutil.Ok(t, s.SetServers(test.input...)) + + actual := make([]string, 0, 3) + err := s.Each(func(addr net.Addr) error { + actual = append(actual, addr.String()) + return nil + }) + + testutil.Ok(t, err) + testutil.Equals(t, test.expected, actual) + } +} + +func TestMemcachedJumpHashSelector_PickServer_ShouldEvenlyDistributeKeysToServers(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + servers := []string{"127.0.0.1:11211", "127.0.0.2:11211", "127.0.0.3:11211"} + selector := MemcachedJumpHashSelector{} + testutil.Ok(t, selector.SetServers(servers...)) + + // Calculate the distribution of keys. + distribution := make(map[string]int) + + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("key-%d", i) + addr, err := selector.PickServer(key) + testutil.Ok(t, err) + distribution[addr.String()]++ + } + + // Expect each server got at least 25% of keys, where the perfect split would be 33.3% each. + minKeysPerServer := int(float64(len(servers)) * 0.25) + testutil.Equals(t, len(servers), len(distribution)) + + for addr, count := range distribution { + if count < minKeysPerServer { + testutil.Ok(t, fmt.Errorf("expected %s to have received at least %d keys instead it received %d", addr, minKeysPerServer, count)) + } + } +} + +func TestMemcachedJumpHashSelector_PickServer_ShouldUseConsistentHashing(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + servers := []string{ + "127.0.0.1:11211", + "127.0.0.2:11211", + "127.0.0.3:11211", + "127.0.0.4:11211", + "127.0.0.5:11211", + "127.0.0.6:11211", + "127.0.0.7:11211", + "127.0.0.8:11211", + "127.0.0.9:11211", + } + + selector := MemcachedJumpHashSelector{} + testutil.Ok(t, selector.SetServers(servers...)) + + // Pick a server for each key. + distribution := make(map[string]string) + numKeys := 1000 + + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("key-%d", i) + addr, err := selector.PickServer(key) + testutil.Ok(t, err) + distribution[key] = addr.String() + } + + // Add 1 more server that - in a natural ordering - is added as last. + servers = append(servers, "127.0.0.10:11211") + testutil.Ok(t, selector.SetServers(servers...)) + + // Calculate the number of keys who has been moved due to the resharding. + moved := 0 + + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("key-%d", i) + addr, err := selector.PickServer(key) + testutil.Ok(t, err) + + if distribution[key] != addr.String() { + moved++ + } + } + + // Expect we haven't moved more than (1/shards)% +2% tolerance. + maxExpectedMovedPerc := (1.0 / float64(len(servers))) + 0.02 + maxExpectedMoved := int(float64(numKeys) * maxExpectedMovedPerc) + if moved > maxExpectedMoved { + testutil.Ok(t, fmt.Errorf("expected resharding moved no more then %d keys while %d have been moved", maxExpectedMoved, moved)) + } +} + +func TestMemcachedJumpHashSelector_PickServer_ShouldReturnErrNoServersOnNoServers(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + s := MemcachedJumpHashSelector{} + _, err := s.PickServer("foo") + testutil.Equals(t, memcache.ErrNoServers, err) +} + +func BenchmarkMemcachedJumpHashSelector_PickServer(b *testing.B) { + // Create a pretty long list of servers. + servers := make([]string, 0) + for i := 1; i <= 60; i++ { + servers = append(servers, fmt.Sprintf("127.0.0.%d:11211", i)) + } + + selector := MemcachedJumpHashSelector{} + err := selector.SetServers(servers...) + if err != nil { + b.Error(err) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := selector.PickServer(string(i)) + if err != nil { + b.Error(err) + } + } +} diff --git a/pkg/store/gate.go b/pkg/gate/gate.go similarity index 99% rename from pkg/store/gate.go rename to pkg/gate/gate.go index 7662e5392f..78aeefe671 100644 --- a/pkg/store/gate.go +++ b/pkg/gate/gate.go @@ -1,4 +1,4 @@ -package store +package gate import ( "context" diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d82690d7c3..ddb9edb114 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -32,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/pool" @@ -215,7 +216,7 @@ type BucketStore struct { blockSyncConcurrency int // Query gate which limits the maximum amount of concurrent queries. - queryGate *Gate + queryGate *gate.Gate // samplesLimiter limits the number of samples per each Series() call. samplesLimiter *Limiter @@ -271,7 +272,7 @@ func NewBucketStore( blockSets: map[uint64]*bucketBlockSet{}, debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, - queryGate: NewGate( + queryGate: gate.NewGate( maxConcurrent, extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), ), @@ -1527,7 +1528,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { keys = append(keys, g.keys...) } - fromCache, _ := r.cache.FetchMultiPostings(r.block.meta.ULID, keys) + fromCache, _ := r.cache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys) // Iterate over all groups and fetch posting from cache. // If we have a miss, mark key to be fetched in `ptrs` slice. @@ -1606,7 +1607,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { // Return postings and fill LRU cache. groups[p.groupID].Fill(p.keyID, fetchedPostings) - r.cache.StorePostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c) + r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c) // If we just fetched it we still have to update the stats for touched postings. r.stats.postingsTouched++ @@ -1624,7 +1625,7 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { // Load series from cache, overwriting the list of ids to preload // with the missing ones. - fromCache, ids := r.cache.FetchMultiSeries(r.block.meta.ULID, ids) + fromCache, ids := r.cache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids) for id, b := range fromCache { r.loadedSeries[id] = b } @@ -1672,7 +1673,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, } c = c[n : n+int(l)] r.loadedSeries[id] = c - r.cache.StoreSeries(r.block.meta.ULID, id, c) + r.cache.StoreSeries(r.ctx, r.block.meta.ULID, id, c) } return nil } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index c30ec4e36d..d860887b3a 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -37,13 +37,13 @@ var ( type noopCache struct{} -func (noopCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {} -func (noopCache) FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { +func (noopCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) {} +func (noopCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { return map[labels.Label][]byte{}, keys } -func (noopCache) StoreSeries(blockID ulid.ULID, id uint64, v []byte) {} -func (noopCache) FetchMultiSeries(blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) { +func (noopCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) {} +func (noopCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) { return map[uint64][]byte{}, ids } @@ -55,20 +55,20 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) { c.ptr = ptr2 } -func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.ptr.StorePostings(blockID, l, v) +func (c *swappableCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) { + c.ptr.StorePostings(ctx, blockID, l, v) } -func (c *swappableCache) FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { - return c.ptr.FetchMultiPostings(blockID, keys) +func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { + return c.ptr.FetchMultiPostings(ctx, blockID, keys) } -func (c *swappableCache) StoreSeries(blockID ulid.ULID, id uint64, v []byte) { - c.ptr.StoreSeries(blockID, id, v) +func (c *swappableCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) { + c.ptr.StoreSeries(ctx, blockID, id, v) } -func (c *swappableCache) FetchMultiSeries(blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) { - return c.ptr.FetchMultiSeries(blockID, ids) +func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) { + return c.ptr.FetchMultiSeries(ctx, blockID, ids) } type storeSuite struct { @@ -378,18 +378,18 @@ func TestBucketStore_e2e(t *testing.T) { testBucketStore_e2e(t, ctx, s) t.Log("Test with large, sufficient index cache") - indexCache, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{ - MaxItemSizeBytes: 1e5, - MaxSizeBytes: 2e5, + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{ + MaxItemSize: 1e5, + MaxSize: 2e5, }) testutil.Ok(t, err) s.cache.SwapWith(indexCache) testBucketStore_e2e(t, ctx, s) t.Log("Test with small index cache") - indexCache2, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{ - MaxItemSizeBytes: 50, - MaxSizeBytes: 100, + indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{ + MaxItemSize: 50, + MaxSize: 100, }) testutil.Ok(t, err) s.cache.SwapWith(indexCache2) @@ -421,9 +421,9 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig) - indexCache, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{ - MaxItemSizeBytes: 1e5, - MaxSizeBytes: 2e5, + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{ + MaxItemSize: 1e5, + MaxSize: 2e5, }) testutil.Ok(t, err) s.cache.SwapWith(indexCache) diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index d2c42da88b..d7e175da82 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -1,22 +1,83 @@ package storecache import ( + "context" + "encoding/base64" + "strconv" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/pkg/labels" + "golang.org/x/crypto/blake2b" +) + +const ( + cacheTypePostings string = "Postings" + cacheTypeSeries string = "Series" + + sliceHeaderSize = 16 ) +var ( + ulidSize = uint64(len(ulid.ULID{})) +) + +// IndexCache is the interface exported by index cache backends. type IndexCache interface { // StorePostings stores postings for a single series. - StorePostings(blockID ulid.ULID, l labels.Label, v []byte) + StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. - FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) + FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) // StoreSeries stores a single series. - StoreSeries(blockID ulid.ULID, id uint64, v []byte) + StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. - FetchMultiSeries(blockID ulid.ULID, ids []uint64) (hits map[uint64][]byte, misses []uint64) + FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (hits map[uint64][]byte, misses []uint64) +} + +type cacheKey struct { + block ulid.ULID + key interface{} } + +func (c cacheKey) keyType() string { + switch c.key.(type) { + case cacheKeyPostings: + return cacheTypePostings + case cacheKeySeries: + return cacheTypeSeries + } + return "" +} + +func (c cacheKey) size() uint64 { + switch k := c.key.(type) { + case cacheKeyPostings: + // ULID + 2 slice headers + number of chars in value and name. + return ulidSize + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name)) + case cacheKeySeries: + return ulidSize + 8 // ULID + uint64. + } + return 0 +} + +func (c cacheKey) string() string { + switch c.key.(type) { + case cacheKeyPostings: + // Use cryptographically hash functions to avoid hash collisions + // which would end up in wrong query results. + lbl := c.key.(cacheKeyPostings) + lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) + return "P:" + c.block.String() + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + case cacheKeySeries: + return "S:" + c.block.String() + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) + default: + return "" + } +} + +type cacheKeyPostings labels.Label +type cacheKeySeries uint64 diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go new file mode 100644 index 0000000000..60b0a3db7f --- /dev/null +++ b/pkg/store/cache/cache_test.go @@ -0,0 +1,99 @@ +package storecache + +import ( + "encoding/base64" + "fmt" + "math" + "strings" + "testing" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/testutil" + "golang.org/x/crypto/blake2b" +) + +func TestCacheKey_string(t *testing.T) { + t.Parallel() + + uid := ulid.MustNew(1, nil) + + tests := map[string]struct { + key cacheKey + expected string + }{ + "should stringify postings cache key": { + key: cacheKey{uid, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})}, + expected: func() string { + hash := blake2b.Sum256([]byte("foo:bar")) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("P:%s:%s", uid.String(), encodedHash) + }(), + }, + "should stringify series cache key": { + key: cacheKey{uid, cacheKeySeries(12345)}, + expected: fmt.Sprintf("S:%s:12345", uid.String()), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + actual := testData.key.string() + testutil.Equals(t, testData.expected, actual) + }) + } +} + +func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { + t.Parallel() + + uid := ulid.MustNew(1, nil) + + tests := map[string]struct { + keys []cacheKey + expectedLen int + }{ + "should guarantee reasonably short key length for postings": { + expectedLen: 72, + keys: []cacheKey{ + {uid, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})}, + {uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}, + }, + }, + "should guarantee reasonably short key length for series": { + expectedLen: 49, + keys: []cacheKey{ + {uid, cacheKeySeries(math.MaxUint64)}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + for _, key := range testData.keys { + testutil.Equals(t, testData.expectedLen, len(key.string())) + } + }) + } +} + +func BenchmarkCacheKey_string_Postings(b *testing.B) { + uid := ulid.MustNew(1, nil) + key := cacheKey{uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key.string() + } +} + +func BenchmarkCacheKey_string_Series(b *testing.B) { + uid := ulid.MustNew(1, nil) + key := cacheKey{uid, cacheKeySeries(math.MaxUint64)} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key.string() + } +} diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go new file mode 100644 index 0000000000..7fed27dac2 --- /dev/null +++ b/pkg/store/cache/factory.go @@ -0,0 +1,58 @@ +package storecache + +import ( + "fmt" + "strings" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/cacheutil" + "gopkg.in/yaml.v2" +) + +type IndexCacheProvider string + +const ( + INMEMORY IndexCacheProvider = "IN-MEMORY" + MEMCACHED IndexCacheProvider = "MEMCACHED" +) + +// IndexCacheConfig specifies the index cache config. +type IndexCacheConfig struct { + Type IndexCacheProvider `yaml:"type"` + Config interface{} `yaml:"config"` +} + +// NewIndexCache initializes and returns new index cache. +func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer) (IndexCache, error) { + level.Info(logger).Log("msg", "loading index cache configuration") + cacheConfig := &IndexCacheConfig{} + if err := yaml.UnmarshalStrict(confContentYaml, cacheConfig); err != nil { + return nil, errors.Wrap(err, "parsing config YAML file") + } + + backendConfig, err := yaml.Marshal(cacheConfig.Config) + if err != nil { + return nil, errors.Wrap(err, "marshal content of cache backend configuration") + } + + var cache IndexCache + switch strings.ToUpper(string(cacheConfig.Type)) { + case string(INMEMORY): + cache, err = NewInMemoryIndexCache(logger, reg, backendConfig) + case string(MEMCACHED): + var memcached cacheutil.MemcachedClient + memcached, err = cacheutil.NewMemcachedClient(logger, "index-cache", backendConfig, reg) + if err == nil { + cache, err = NewMemcachedIndexCache(logger, memcached, reg) + } + default: + return nil, errors.Errorf("index cache with type %s is not supported", cacheConfig.Type) + } + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type)) + } + return cache, nil +} diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index 3ef7d0c7ce..3b8881eb65 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -1,6 +1,7 @@ package storecache import ( + "context" "math" "sync" @@ -11,43 +12,15 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" + "gopkg.in/yaml.v2" ) -const ( - cacheTypePostings string = "Postings" - cacheTypeSeries string = "Series" - - sliceHeaderSize = 16 -) - -type cacheKey struct { - block ulid.ULID - key interface{} -} - -func (c cacheKey) keyType() string { - switch c.key.(type) { - case cacheKeyPostings: - return cacheTypePostings - case cacheKeySeries: - return cacheTypeSeries +var ( + DefaultInMemoryIndexCacheConfig = InMemoryIndexCacheConfig{ + MaxSize: 250 * 1024 * 1024, + MaxItemSize: 125 * 1024 * 1024, } - return "" -} - -func (c cacheKey) size() uint64 { - switch k := c.key.(type) { - case cacheKeyPostings: - // ULID + 2 slice headers + number of chars in value and name. - return 16 + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name)) - case cacheKeySeries: - return 16 + 8 // ULID + uint64. - } - return 0 -} - -type cacheKeyPostings labels.Label -type cacheKeySeries uint64 +) type InMemoryIndexCache struct { mtx sync.Mutex @@ -69,24 +42,46 @@ type InMemoryIndexCache struct { overflow *prometheus.CounterVec } -type Opts struct { - // MaxSizeBytes represents overall maximum number of bytes cache can contain. - MaxSizeBytes uint64 - // MaxItemSizeBytes represents maximum size of single item. - MaxItemSizeBytes uint64 +// InMemoryIndexCacheConfig holds the in-memory index cache config. +type InMemoryIndexCacheConfig struct { + // MaxSize represents overall maximum number of bytes cache can contain. + MaxSize Bytes `yaml:"max_size"` + // MaxItemSize represents maximum size of single item. + MaxItemSize Bytes `yaml:"max_item_size"` +} + +// parseInMemoryIndexCacheConfig unmarshals a buffer into a InMemoryIndexCacheConfig with default values. +func parseInMemoryIndexCacheConfig(conf []byte) (InMemoryIndexCacheConfig, error) { + config := DefaultInMemoryIndexCacheConfig + if err := yaml.Unmarshal(conf, &config); err != nil { + return InMemoryIndexCacheConfig{}, err + } + + return config, nil } // NewInMemoryIndexCache creates a new thread-safe LRU cache for index entries and ensures the total cache // size approximately does not exceed maxBytes. -func NewInMemoryIndexCache(logger log.Logger, reg prometheus.Registerer, opts Opts) (*InMemoryIndexCache, error) { - if opts.MaxItemSizeBytes > opts.MaxSizeBytes { - return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", opts.MaxItemSizeBytes, opts.MaxSizeBytes) +func NewInMemoryIndexCache(logger log.Logger, reg prometheus.Registerer, conf []byte) (*InMemoryIndexCache, error) { + config, err := parseInMemoryIndexCacheConfig(conf) + if err != nil { + return nil, err + } + + return NewInMemoryIndexCacheWithConfig(logger, reg, config) +} + +// NewInMemoryIndexCacheWithConfig creates a new thread-safe LRU cache for index entries and ensures the total cache +// size approximately does not exceed maxBytes. +func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registerer, config InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) { + if config.MaxItemSize > config.MaxSize { + return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize) } c := &InMemoryIndexCache{ logger: logger, - maxSizeBytes: opts.MaxSizeBytes, - maxItemSizeBytes: opts.MaxItemSizeBytes, + maxSizeBytes: uint64(config.MaxSize), + maxItemSizeBytes: uint64(config.MaxItemSize), } c.evicted = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -170,7 +165,7 @@ func NewInMemoryIndexCache(logger log.Logger, reg prometheus.Registerer, opts Op c.lru = l level.Info(logger).Log( - "msg", "created index cache", + "msg", "created in-memory index cache", "maxItemSizeBytes", c.maxItemSizeBytes, "maxSizeBytes", c.maxSizeBytes, "maxItems", "math.MaxInt64", @@ -273,13 +268,13 @@ func (c *InMemoryIndexCache) reset() { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { +func (c *InMemoryIndexCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) { c.set(cacheTypePostings, cacheKey{blockID, cacheKeyPostings(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. -func (c *InMemoryIndexCache) FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { +func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { hits = map[labels.Label][]byte{} for _, key := range keys { @@ -296,13 +291,13 @@ func (c *InMemoryIndexCache) FetchMultiPostings(blockID ulid.ULID, keys []labels // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id uint64, v []byte) { +func (c *InMemoryIndexCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) { c.set(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. -func (c *InMemoryIndexCache) FetchMultiSeries(blockID ulid.ULID, ids []uint64) (hits map[uint64][]byte, misses []uint64) { +func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (hits map[uint64][]byte, misses []uint64) { hits = map[uint64][]byte{} for _, id := range ids { diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go index f70c79bf50..e14d3794af 100644 --- a/pkg/store/cache/inmemory_test.go +++ b/pkg/store/cache/inmemory_test.go @@ -3,6 +3,7 @@ package storecache import ( "bytes" + "context" "fmt" "math" "testing" @@ -18,13 +19,41 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) +func TestNewInMemoryIndexCache(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + // Should return error on invalid YAML config. + conf := []byte("invalid") + cache, err := NewInMemoryIndexCache(log.NewNopLogger(), nil, conf) + testutil.NotOk(t, err) + testutil.Equals(t, (*InMemoryIndexCache)(nil), cache) + + // Should instance an in-memory index cache with default config + // on empty YAML config. + conf = []byte{} + cache, err = NewInMemoryIndexCache(log.NewNopLogger(), nil, conf) + testutil.Ok(t, err) + testutil.Equals(t, uint64(DefaultInMemoryIndexCacheConfig.MaxSize), cache.maxSizeBytes) + testutil.Equals(t, uint64(DefaultInMemoryIndexCacheConfig.MaxItemSize), cache.maxItemSizeBytes) + + // Should instance an in-memory index cache with specified YAML config.s with units. + conf = []byte(` +max_size: 1MB +max_item_size: 2KB +`) + cache, err = NewInMemoryIndexCache(log.NewNopLogger(), nil, conf) + testutil.Ok(t, err) + testutil.Equals(t, uint64(1024*1024), cache.maxSizeBytes) + testutil.Equals(t, uint64(2*1024), cache.maxItemSizeBytes) +} + func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() metrics := prometheus.NewRegistry() - cache, err := NewInMemoryIndexCache(log.NewNopLogger(), metrics, Opts{ - MaxItemSizeBytes: sliceHeaderSize + 5, - MaxSizeBytes: sliceHeaderSize + 5, + cache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), metrics, InMemoryIndexCacheConfig{ + MaxItemSize: sliceHeaderSize + 5, + MaxSize: sliceHeaderSize + 5, }) testutil.Ok(t, err) @@ -37,14 +66,15 @@ func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) { testutil.Ok(t, err) cache.lru = l - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}) + ctx := context.Background() + cache.StorePostings(ctx, ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}) testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) // This triggers deadlock logic. - cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}) + cache.StorePostings(ctx, ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}) testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize) testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -74,14 +104,15 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }) metrics := prometheus.NewRegistry() - cache, err := NewInMemoryIndexCache(log.NewSyncLogger(errorLogger), metrics, Opts{ - MaxItemSizeBytes: maxSize, - MaxSizeBytes: maxSize, + cache, err := NewInMemoryIndexCacheWithConfig(log.NewSyncLogger(errorLogger), metrics, InMemoryIndexCacheConfig{ + MaxItemSize: maxSize, + MaxSize: maxSize, }) testutil.Ok(t, err) uid := func(id uint64) ulid.ULID { return ulid.MustNew(id, nil) } lbl := labels.Label{Name: "foo", Value: "bar"} + ctx := context.Background() for _, tt := range []struct { typ string @@ -90,9 +121,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }{ { typ: cacheTypePostings, - set: func(id uint64, b []byte) { cache.StorePostings(uid(id), lbl, b) }, + set: func(id uint64, b []byte) { cache.StorePostings(ctx, uid(id), lbl, b) }, get: func(id uint64) ([]byte, bool) { - hits, _ := cache.FetchMultiPostings(uid(id), []labels.Label{lbl}) + hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}) b, ok := hits[lbl] return b, ok @@ -100,9 +131,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { }, { typ: cacheTypeSeries, - set: func(id uint64, b []byte) { cache.StoreSeries(uid(id), id, b) }, + set: func(id uint64, b []byte) { cache.StoreSeries(ctx, uid(id), id, b) }, get: func(id uint64) ([]byte, bool) { - hits, _ := cache.FetchMultiSeries(uid(id), []uint64{id}) + hits, _ := cache.FetchMultiSeries(ctx, uid(id), []uint64{id}) b, ok := hits[id] return b, ok @@ -159,9 +190,9 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() metrics := prometheus.NewRegistry() - cache, err := NewInMemoryIndexCache(log.NewNopLogger(), metrics, Opts{ - MaxItemSizeBytes: 2*sliceHeaderSize + 10, - MaxSizeBytes: 2*sliceHeaderSize + 10, + cache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), metrics, InMemoryIndexCacheConfig{ + MaxItemSize: 2*sliceHeaderSize + 10, + MaxSize: 2*sliceHeaderSize + 10, }) testutil.Ok(t, err) @@ -170,10 +201,11 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) { cache.lru = l id := ulid.MustNew(0, nil) + ctx := context.Background() - cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}) - cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}) - cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}) + cache.StorePostings(ctx, id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}) + cache.StorePostings(ctx, id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}) + cache.StorePostings(ctx, id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}) testutil.Equals(t, uint64(2*sliceHeaderSize+4), cache.curSize) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) @@ -192,25 +224,26 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() metrics := prometheus.NewRegistry() - cache, err := NewInMemoryIndexCache(log.NewNopLogger(), metrics, Opts{ - MaxItemSizeBytes: 2*sliceHeaderSize + 5, - MaxSizeBytes: 2*sliceHeaderSize + 5, + cache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), metrics, InMemoryIndexCacheConfig{ + MaxItemSize: 2*sliceHeaderSize + 5, + MaxSize: 2*sliceHeaderSize + 5, }) testutil.Ok(t, err) id := ulid.MustNew(0, nil) lbls := labels.Label{Name: "test", Value: "123"} + ctx := context.Background() emptyPostingsHits := map[labels.Label][]byte{} emptyPostingsMisses := []labels.Label(nil) emptySeriesHits := map[uint64][]byte{} emptySeriesMisses := []uint64(nil) - pHits, pMisses := cache.FetchMultiPostings(id, []labels.Label{lbls}) + pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) // Add sliceHeaderSize + 2 bytes. - cache.StorePostings(id, lbls, []byte{42, 33}) + cache.StorePostings(ctx, id, lbls, []byte{42, 33}) testutil.Equals(t, uint64(sliceHeaderSize+2), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -223,20 +256,20 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) testutil.Equals(t, map[labels.Label][]byte{lbls: []byte{42, 33}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) - pHits, pMisses = cache.FetchMultiPostings(ulid.MustNew(1, nil), []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{{Name: "test", Value: "124"}}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{{Name: "test", Value: "124"}}, pMisses) // Add sliceHeaderSize + 3 more bytes. - cache.StoreSeries(id, 1234, []byte{222, 223, 224}) + cache.StoreSeries(ctx, id, 1234, []byte{222, 223, 224}) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -249,7 +282,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - sHits, sMisses := cache.FetchMultiSeries(id, []uint64{1234}) + sHits, sMisses := cache.FetchMultiSeries(ctx, id, []uint64{1234}) testutil.Equals(t, map[uint64][]byte{1234: []byte{222, 223, 224}}, sHits, "key exists") testutil.Equals(t, emptySeriesMisses, sMisses) @@ -260,7 +293,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { for i := 0; i < sliceHeaderSize; i++ { v = append(v, 3) } - cache.StorePostings(id, lbls2, v) + cache.StorePostings(ctx, id, lbls2, v) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -275,20 +308,20 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) // Eviction. // Evicted. - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{lbls}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}) testutil.Equals(t, emptyPostingsHits, pHits, "no such key") testutil.Equals(t, []labels.Label{lbls}, pMisses) - sHits, sMisses = cache.FetchMultiSeries(id, []uint64{1234}) + sHits, sMisses = cache.FetchMultiSeries(ctx, id, []uint64{1234}) testutil.Equals(t, emptySeriesHits, sHits, "no such key") testutil.Equals(t, []uint64{1234}, sMisses) - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{lbls2}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add same item again. - cache.StorePostings(id, lbls2, v) + cache.StorePostings(ctx, id, lbls2, v) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -302,12 +335,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{lbls2}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}) testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits) testutil.Equals(t, emptyPostingsMisses, pMisses) // Add too big item. - cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5)) + cache.StorePostings(ctx, id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5)) testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(2*sliceHeaderSize+5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) @@ -340,7 +373,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { lbls3 := labels.Label{Name: "test", Value: "124"} - cache.StorePostings(id, lbls3, []byte{}) + cache.StorePostings(ctx, id, lbls3, []byte{}) testutil.Equals(t, uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -354,13 +387,13 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{lbls3}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}) testutil.Equals(t, map[labels.Label][]byte{lbls3: []byte{}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) // nil works and still allocates empty slice. lbls4 := labels.Label{Name: "test", Value: "125"} - cache.StorePostings(id, lbls4, []byte(nil)) + cache.StorePostings(ctx, id, lbls4, []byte(nil)) testutil.Equals(t, 2*uint64(sliceHeaderSize), cache.curSize) testutil.Equals(t, float64(2), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) @@ -374,7 +407,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) { testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) - pHits, pMisses = cache.FetchMultiPostings(id, []labels.Label{lbls4}) + pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}) testutil.Equals(t, map[labels.Label][]byte{lbls4: []byte{}}, pHits, "key exists") testutil.Equals(t, emptyPostingsMisses, pMisses) diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go new file mode 100644 index 0000000000..51c028dd54 --- /dev/null +++ b/pkg/store/cache/memcached.go @@ -0,0 +1,177 @@ +package storecache + +import ( + "context" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/cacheutil" +) + +const ( + memcachedDefaultTTL = 24 * time.Hour +) + +// MemcachedIndexCache is a memcached-based index cache. +type MemcachedIndexCache struct { + logger log.Logger + memcached cacheutil.MemcachedClient + + // Metrics. + requests *prometheus.CounterVec + hits *prometheus.CounterVec +} + +// NewMemcachedIndexCache makes a new MemcachedIndexCache. +func NewMemcachedIndexCache(logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) (*MemcachedIndexCache, error) { + c := &MemcachedIndexCache{ + logger: logger, + memcached: memcached, + } + + c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_requests_total", + Help: "Total number of items requests to the cache.", + }, []string{"item_type"}) + c.requests.WithLabelValues(cacheTypePostings) + c.requests.WithLabelValues(cacheTypeSeries) + + c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_hits_total", + Help: "Total number of items requests to the cache that were a hit.", + }, []string{"item_type"}) + c.hits.WithLabelValues(cacheTypePostings) + c.hits.WithLabelValues(cacheTypeSeries) + + if reg != nil { + reg.MustRegister(c.requests, c.hits) + } + + level.Info(logger).Log("msg", "created memcached index cache") + + return c, nil +} + +// StorePostings sets the postings identified by the ulid and label to the value v. +// The function enqueues the request and returns immediately: the entry will be +// asynchronously stored in the cache. +func (c *MemcachedIndexCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) { + key := cacheKey{blockID, cacheKeyPostings(l)}.string() + + if err := c.memcached.SetAsync(ctx, key, v, memcachedDefaultTTL); err != nil { + level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) + } +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. +// In case of error, it logs and return an empty cache hits map. +func (c *MemcachedIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { + // Build the cache keys, while keeping a map between input label and the cache key + // so that we can easily reverse it back after the GetMulti(). + keys := make([]string, 0, len(lbls)) + keysMapping := map[labels.Label]string{} + + for _, lbl := range lbls { + key := cacheKey{blockID, cacheKeyPostings(lbl)}.string() + + keys = append(keys, key) + keysMapping[lbl] = key + } + + // Fetch the keys from memcached in a single request. + c.requests.WithLabelValues(cacheTypePostings).Add(float64(len(keys))) + results := c.memcached.GetMulti(ctx, keys) + if len(results) == 0 { + return nil, lbls + } + + // Construct the resulting hits map and list of missing keys. We iterate on the input + // list of labels to be able to easily create the list of ones in a single iteration. + hits = map[labels.Label][]byte{} + + for _, lbl := range lbls { + key, ok := keysMapping[lbl] + if !ok { + level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "postings", "label", lbl.Name+":"+lbl.Value) + continue + } + + // Check if the key has been found in memcached. If not, we add it to the list + // of missing keys. + value, ok := results[key] + if !ok { + misses = append(misses, lbl) + continue + } + + hits[lbl] = value + } + + c.hits.WithLabelValues(cacheTypePostings).Add(float64(len(hits))) + return hits, misses +} + +// StoreSeries sets the series identified by the ulid and id to the value v. +// The function enqueues the request and returns immediately: the entry will be +// asynchronously stored in the cache. +func (c *MemcachedIndexCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) { + key := cacheKey{blockID, cacheKeySeries(id)}.string() + + if err := c.memcached.SetAsync(ctx, key, v, memcachedDefaultTTL); err != nil { + level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err) + } +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. +// In case of error, it logs and return an empty cache hits map. +func (c *MemcachedIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (hits map[uint64][]byte, misses []uint64) { + // Build the cache keys, while keeping a map between input id and the cache key + // so that we can easily reverse it back after the GetMulti(). + keys := make([]string, 0, len(ids)) + keysMapping := map[uint64]string{} + + for _, id := range ids { + key := cacheKey{blockID, cacheKeySeries(id)}.string() + + keys = append(keys, key) + keysMapping[id] = key + } + + // Fetch the keys from memcached in a single request. + c.requests.WithLabelValues(cacheTypeSeries).Add(float64(len(ids))) + results := c.memcached.GetMulti(ctx, keys) + if len(results) == 0 { + return nil, ids + } + + // Construct the resulting hits map and list of missing keys. We iterate on the input + // list of ids to be able to easily create the list of ones in a single iteration. + hits = map[uint64][]byte{} + + for _, id := range ids { + key, ok := keysMapping[id] + if !ok { + level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "series", "id", id) + continue + } + + // Check if the key has been found in memcached. If not, we add it to the list + // of missing keys. + value, ok := results[key] + if !ok { + misses = append(misses, id) + continue + } + + hits[id] = value + } + + c.hits.WithLabelValues(cacheTypeSeries).Add(float64(len(hits))) + return hits, misses +} diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go new file mode 100644 index 0000000000..83d858ff30 --- /dev/null +++ b/pkg/store/cache/memcached_test.go @@ -0,0 +1,247 @@ +package storecache + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) { + t.Parallel() + defer leaktest.CheckTimeout(t, 10*time.Second)() + + // Init some data to conveniently define test cases later one. + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + label1 := labels.Label{Name: "instance", Value: "a"} + label2 := labels.Label{Name: "instance", Value: "b"} + value1 := []byte{1} + value2 := []byte{2} + value3 := []byte{3} + + tests := map[string]struct { + setup []mockedPostings + mockedErr error + fetchBlockID ulid.ULID + fetchLabels []labels.Label + expectedHits map[labels.Label][]byte + expectedMisses []labels.Label + }{ + "should return no hits on empty cache": { + setup: []mockedPostings{}, + fetchBlockID: block1, + fetchLabels: []labels.Label{label1, label2}, + expectedHits: nil, + expectedMisses: []labels.Label{label1, label2}, + }, + "should return no misses on 100% hit ratio": { + setup: []mockedPostings{ + {block: block1, label: label1, value: value1}, + {block: block1, label: label2, value: value2}, + {block: block2, label: label1, value: value3}, + }, + fetchBlockID: block1, + fetchLabels: []labels.Label{label1, label2}, + expectedHits: map[labels.Label][]byte{ + label1: value1, + label2: value2, + }, + expectedMisses: nil, + }, + "should return hits and misses on partial hits": { + setup: []mockedPostings{ + {block: block1, label: label1, value: value1}, + {block: block2, label: label1, value: value3}, + }, + fetchBlockID: block1, + fetchLabels: []labels.Label{label1, label2}, + expectedHits: map[labels.Label][]byte{label1: value1}, + expectedMisses: []labels.Label{label2}, + }, + "should return no hits on memcached error": { + setup: []mockedPostings{ + {block: block1, label: label1, value: value1}, + {block: block1, label: label2, value: value2}, + {block: block2, label: label1, value: value3}, + }, + mockedErr: errors.New("mocked error"), + fetchBlockID: block1, + fetchLabels: []labels.Label{label1, label2}, + expectedHits: nil, + expectedMisses: []labels.Label{label1, label2}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + memcached := newMockedMemcachedClient(testData.mockedErr) + c, err := NewMemcachedIndexCache(log.NewNopLogger(), memcached, nil) + testutil.Ok(t, err) + + // Store the postings expected before running the test. + ctx := context.Background() + for _, p := range testData.setup { + c.StorePostings(ctx, p.block, p.label, p.value) + } + + // Fetch postings from cached and assert on it. + hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels) + testutil.Equals(t, testData.expectedHits, hits) + testutil.Equals(t, testData.expectedMisses, misses) + + // Assert on metrics. + testutil.Equals(t, float64(len(testData.fetchLabels)), prom_testutil.ToFloat64(c.requests.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requests.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hits.WithLabelValues(cacheTypeSeries))) + }) + } +} + +func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) { + t.Parallel() + defer leaktest.CheckTimeout(t, 10*time.Second)() + + // Init some data to conveniently define test cases later one. + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + value1 := []byte{1} + value2 := []byte{2} + value3 := []byte{3} + + tests := map[string]struct { + setup []mockedSeries + mockedErr error + fetchBlockID ulid.ULID + fetchIds []uint64 + expectedHits map[uint64][]byte + expectedMisses []uint64 + }{ + "should return no hits on empty cache": { + setup: []mockedSeries{}, + fetchBlockID: block1, + fetchIds: []uint64{1, 2}, + expectedHits: nil, + expectedMisses: []uint64{1, 2}, + }, + "should return no misses on 100% hit ratio": { + setup: []mockedSeries{ + {block: block1, id: 1, value: value1}, + {block: block1, id: 2, value: value2}, + {block: block2, id: 1, value: value3}, + }, + fetchBlockID: block1, + fetchIds: []uint64{1, 2}, + expectedHits: map[uint64][]byte{ + 1: value1, + 2: value2, + }, + expectedMisses: nil, + }, + "should return hits and misses on partial hits": { + setup: []mockedSeries{ + {block: block1, id: 1, value: value1}, + {block: block2, id: 1, value: value3}, + }, + fetchBlockID: block1, + fetchIds: []uint64{1, 2}, + expectedHits: map[uint64][]byte{1: value1}, + expectedMisses: []uint64{2}, + }, + "should return no hits on memcached error": { + setup: []mockedSeries{ + {block: block1, id: 1, value: value1}, + {block: block1, id: 2, value: value2}, + {block: block2, id: 1, value: value3}, + }, + mockedErr: errors.New("mocked error"), + fetchBlockID: block1, + fetchIds: []uint64{1, 2}, + expectedHits: nil, + expectedMisses: []uint64{1, 2}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + memcached := newMockedMemcachedClient(testData.mockedErr) + c, err := NewMemcachedIndexCache(log.NewNopLogger(), memcached, nil) + testutil.Ok(t, err) + + // Store the series expected before running the test. + ctx := context.Background() + for _, p := range testData.setup { + c.StoreSeries(ctx, p.block, p.id, p.value) + } + + // Fetch series from cached and assert on it. + hits, misses := c.FetchMultiSeries(ctx, testData.fetchBlockID, testData.fetchIds) + testutil.Equals(t, testData.expectedHits, hits) + testutil.Equals(t, testData.expectedMisses, misses) + + // Assert on metrics. + testutil.Equals(t, float64(len(testData.fetchIds)), prom_testutil.ToFloat64(c.requests.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.requests.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.hits.WithLabelValues(cacheTypePostings))) + }) + } +} + +type mockedPostings struct { + block ulid.ULID + label labels.Label + value []byte +} + +type mockedSeries struct { + block ulid.ULID + id uint64 + value []byte +} + +type mockedMemcachedClient struct { + cache map[string][]byte + mockedGetMultiErr error +} + +func newMockedMemcachedClient(mockedGetMultiErr error) *mockedMemcachedClient { + return &mockedMemcachedClient{ + cache: map[string][]byte{}, + mockedGetMultiErr: mockedGetMultiErr, + } +} + +func (c *mockedMemcachedClient) GetMulti(ctx context.Context, keys []string) map[string][]byte { + if c.mockedGetMultiErr != nil { + return nil + } + + hits := map[string][]byte{} + + for _, key := range keys { + if value, ok := c.cache[key]; ok { + hits[key] = value + } + } + + return hits +} + +func (c *mockedMemcachedClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error { + c.cache[key] = value + + return nil +} + +func (c *mockedMemcachedClient) Stop() { + // Nothing to do. +} diff --git a/pkg/store/cache/units.go b/pkg/store/cache/units.go new file mode 100644 index 0000000000..d9b6dce367 --- /dev/null +++ b/pkg/store/cache/units.go @@ -0,0 +1,29 @@ +package storecache + +import ( + "github.com/alecthomas/units" +) + +// Bytes is a data type which supports yaml serialization/deserialization +// with units. +type Bytes uint64 + +func (b *Bytes) UnmarshalYAML(unmarshal func(interface{}) error) error { + var value string + err := unmarshal(&value) + if err != nil { + return err + } + + bytes, err := units.ParseBase2Bytes(value) + if err != nil { + return err + } + + *b = Bytes(bytes) + return nil +} + +func (b *Bytes) MarshalYAML() (interface{}, error) { + return units.Base2Bytes(*b).String(), nil +} diff --git a/pkg/store/cache/units_test.go b/pkg/store/cache/units_test.go new file mode 100644 index 0000000000..95438eaa50 --- /dev/null +++ b/pkg/store/cache/units_test.go @@ -0,0 +1,21 @@ +package storecache + +import ( + "testing" + + "github.com/thanos-io/thanos/pkg/testutil" + "gopkg.in/yaml.v2" +) + +func TestBytes_Marshalling(t *testing.T) { + value := Bytes(1048576) + + encoded, err := yaml.Marshal(&value) + testutil.Ok(t, err) + testutil.Equals(t, "1MiB\n", string(encoded)) + + var decoded Bytes + err = yaml.Unmarshal(encoded, &decoded) + testutil.Equals(t, value, decoded) + testutil.Ok(t, err) +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 2d5cec20e1..5f33f2bdba 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/alert" + "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/objstore/azure" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/cos" @@ -21,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" + storecache "github.com/thanos-io/thanos/pkg/store/cache" trclient "github.com/thanos-io/thanos/pkg/tracing/client" "github.com/thanos-io/thanos/pkg/tracing/elasticapm" "github.com/thanos-io/thanos/pkg/tracing/jaeger" @@ -46,6 +48,10 @@ var ( trclient.ELASTIC_APM: elasticapm.Config{}, trclient.LIGHTSTEP: lightstep.Config{}, } + indexCacheConfigs = map[storecache.IndexCacheProvider]interface{}{ + storecache.INMEMORY: storecache.InMemoryIndexCacheConfig{}, + storecache.MEMCACHED: cacheutil.MemcachedClientConfig{}, + } ) func main() { @@ -60,14 +66,21 @@ func main() { } for typ, config := range bucketConfigs { - if err := generate(client.BucketConfig{Type: typ, Config: config}, "bucket_"+strings.ToLower(string(typ)), *outputDir); err != nil { + if err := generate(client.BucketConfig{Type: typ, Config: config}, generateName("bucket_", string(typ)), *outputDir); err != nil { level.Error(logger).Log("msg", "failed to generate", "type", typ, "err", err) os.Exit(1) } } for typ, config := range tracingConfigs { - if err := generate(trclient.TracingConfig{Type: typ, Config: config}, "tracing_"+strings.ToLower(string(typ)), *outputDir); err != nil { + if err := generate(trclient.TracingConfig{Type: typ, Config: config}, generateName("tracing_", string(typ)), *outputDir); err != nil { + level.Error(logger).Log("msg", "failed to generate", "type", typ, "err", err) + os.Exit(1) + } + } + + for typ, config := range indexCacheConfigs { + if err := generate(storecache.IndexCacheConfig{Type: typ, Config: config}, generateName("index_cache_", string(typ)), *outputDir); err != nil { level.Error(logger).Log("msg", "failed to generate", "type", typ, "err", err) os.Exit(1) } @@ -96,6 +109,10 @@ func generate(obj interface{}, typ string, outputDir string) error { return ioutil.WriteFile(filepath.Join(outputDir, fmt.Sprintf("config_%s.txt", typ)), out, os.ModePerm) } +func generateName(prefix, typ string) string { + return prefix + strings.ReplaceAll(strings.ToLower(string(typ)), "-", "_") +} + func checkForOmitEmptyTagOption(obj interface{}) error { return checkForOmitEmptyTagOptionRec(reflect.ValueOf(obj)) }