Skip to content

Commit

Permalink
query: add memcached autodiscovery support (#4487)
Browse files Browse the repository at this point in the history
* add memcached autodiscovery support

Signed-off-by: Roy Chiang <[email protected]>

* pass logger to provider struct

Signed-off-by: Roy Chiang <[email protected]>

* fix linter issues

Signed-off-by: Roy Chiang <[email protected]>

* remove copypasted comment

Signed-off-by: Roy Chiang <[email protected]>

* address comments

Signed-off-by: Roy Chiang <[email protected]>

* Update docs/components/store.md

Co-authored-by: Giedrius Statkevičius <[email protected]>
Signed-off-by: Roy Chiang <[email protected]>

* add lock/unlock for autodiscovery provider

Signed-off-by: Roy Chiang <[email protected]>

Co-authored-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
roystchiang and GiedriusS authored Aug 3, 2021
1 parent aee7c55 commit baea4ce
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket.
- [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks.
- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
expiration: 0s
```

Expand Down
4 changes: 3 additions & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,12 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
```

The **required** settings are:

- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider.
- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead and only point to *one of* the memcached servers. This typically means that there should be only one address specified that resolves to any of the alive memcached servers. Use this for Amazon ElastiCache and other similar services.

While the remaining settings are **optional**:

Expand All @@ -306,6 +307,7 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.

## Caching Bucket

Expand Down
57 changes: 41 additions & 16 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,
}
)

Expand Down Expand Up @@ -114,6 +116,9 @@ type MemcachedClientConfig struct {

// DNSProviderUpdateInterval specifies the DNS discovery update interval.
DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"`

// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`
}

func (c *MemcachedClientConfig) validate() error {
Expand Down Expand Up @@ -153,8 +158,8 @@ type memcachedClient struct {
// Name provides an identifier for the instantiated Client
name string

// DNS provider used to keep the memcached servers list updated.
dnsProvider *dns.Provider
// Address provider used to keep the memcached servers list updated.
addressProvider AddressProvider

// Channel used to notify internal goroutines when they should quit.
stop chan struct{}
Expand All @@ -177,6 +182,15 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec
}

// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
// Resolves the provided list of memcached cluster to the actual nodes
Resolve(context.Context, []string) error

// Returns the nodes
Addresses() []string
}

type memcachedGetMultiResult struct {
items map[string]*memcache.Item
err error
Expand Down Expand Up @@ -220,20 +234,31 @@ func newMemcachedClient(
reg prometheus.Registerer,
name string,
) (*memcachedClient, error) {
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg)

var addressProvider AddressProvider
if config.AutoDiscovery {
addressProvider = memcacheDiscovery.NewProvider(
logger,
promRegisterer,
config.Timeout,
)
} else {
addressProvider = dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
}

c := &memcachedClient{
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
addressProvider: addressProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
Expand Down Expand Up @@ -561,11 +586,11 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If some of the dns resolution fails, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
servers := c.addressProvider.Addresses()
if len(servers) == 0 {
return fmt.Errorf("no server address resolved for %s", c.name)
}
Expand Down
114 changes: 114 additions & 0 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
)

// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve
// addresses and obtain them.
type Provider struct {
sync.RWMutex
resolver Resolver
clusterConfigs map[string]*clusterConfig
logger log.Logger

configVersion *extprom.TxGaugeVec
resolvedAddresses *extprom.TxGaugeVec
resolverFailuresCount prometheus.Counter
resolverLookupsCount prometheus.Counter
}

func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
p := &Provider{
resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout},
clusterConfigs: map[string]*clusterConfig{},
configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_config_version",
Help: "The current auto discovery config version",
}, []string{"addr"}),
resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_resolved_addresses",
Help: "The number of memcached nodes found via auto discovery",
}, []string{"addr"}),
resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_total",
Help: "The number of memcache auto discovery attempts",
}),
resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_failures_total",
Help: "The number of memcache auto discovery failures",
}),
logger: logger,
}
return p
}

// Resolve stores a list of nodes auto-discovered from the provided addresses.
func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
clusterConfigs := map[string]*clusterConfig{}
errs := errutil.MultiError{}

for _, address := range addresses {
clusterConfig, err := p.resolver.Resolve(ctx, address)
p.resolverLookupsCount.Inc()

if err != nil {
level.Warn(p.logger).Log(
"msg", "failed to perform auto-discovery for memcached",
"address", address,
)
errs.Add(err)
p.resolverFailuresCount.Inc()

// Use cached values.
p.RLock()
clusterConfigs[address] = p.clusterConfigs[address]
p.RUnlock()
} else {
clusterConfigs[address] = clusterConfig
}
}

p.Lock()
defer p.Unlock()

p.resolvedAddresses.ResetTx()
p.configVersion.ResetTx()
for address, config := range clusterConfigs {
p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes)))
p.configVersion.WithLabelValues(address).Set(float64(config.version))
}
p.resolvedAddresses.Submit()
p.configVersion.Submit()

p.clusterConfigs = clusterConfigs

return errs.Err()
}

// Addresses returns the latest addresses present in the Provider.
func (p *Provider) Addresses() []string {
p.RLock()
defer p.RUnlock()

var result []string
for _, config := range p.clusterConfigs {
for _, node := range config.nodes {
result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port))
}
}
return result
}
81 changes: 81 additions & 0 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"sort"
"testing"
"time"

"github.com/pkg/errors"

"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestProviderUpdatesAddresses(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
}

func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")

testutil.NotOk(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
}

type mockResolver struct {
configs map[string]*clusterConfig
err error
}

func (r *mockResolver) Resolve(_ context.Context, address string) (*clusterConfig, error) {
if r.err != nil {
return nil, r.err
}
return r.configs[address], nil
}
Loading

0 comments on commit baea4ce

Please sign in to comment.