Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add memcached autodiscovery support #4487

Merged
merged 7 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket.
- [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks.
- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
expiration: 0s
```

Expand Down
4 changes: 3 additions & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,12 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
```

The **required** settings are:

- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider.
- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead.
roystchiang marked this conversation as resolved.
Show resolved Hide resolved

While the remaining settings are **optional**:

Expand All @@ -306,6 +307,7 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.

## Caching Bucket

Expand Down
52 changes: 36 additions & 16 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,
}
)

Expand Down Expand Up @@ -114,6 +116,9 @@ type MemcachedClientConfig struct {

// DNSProviderUpdateInterval specifies the DNS discovery update interval.
DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"`

// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`
}

func (c *MemcachedClientConfig) validate() error {
Expand Down Expand Up @@ -153,8 +158,8 @@ type memcachedClient struct {
// Name provides an identifier for the instantiated Client
name string

// DNS provider used to keep the memcached servers list updated.
dnsProvider *dns.Provider
// Address provider used to keep the memcached servers list updated.
addressProvider AddressProvider

// Channel used to notify internal goroutines when they should quit.
stop chan struct{}
Expand All @@ -177,6 +182,11 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec
}

type AddressProvider interface {
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
Resolve(context.Context, []string) error
Addresses() []string
}

type memcachedGetMultiResult struct {
items map[string]*memcache.Item
err error
Expand Down Expand Up @@ -220,20 +230,30 @@ func newMemcachedClient(
reg prometheus.Registerer,
name string,
) (*memcachedClient, error) {
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg)

var addressProvider AddressProvider
if config.AutoDiscovery {
addressProvider = memcacheDiscovery.NewProvider(
logger,
promRegisterer,
2*time.Second)
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
} else {
addressProvider = dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
}

c := &memcachedClient{
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
addressProvider: addressProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
Expand Down Expand Up @@ -561,11 +581,11 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If some of the dns resolution fails, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
servers := c.addressProvider.Addresses()
if len(servers) == 0 {
return fmt.Errorf("no server address resolved for %s", c.name)
}
Expand Down
107 changes: 107 additions & 0 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
)

type Provider struct {
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
sync.RWMutex
resolver Resolver
clusterConfigs map[string]*ClusterConfig
logger log.Logger

configVersion *extprom.TxGaugeVec
resolvedAddresses *extprom.TxGaugeVec
resolverFailuresCount prometheus.Counter
resolverLookupsCount prometheus.Counter
}

func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
p := &Provider{
resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout},
clusterConfigs: map[string]*ClusterConfig{},
configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_config_version",
Help: "The current auto discovery config version",
}, []string{"addr"}),
resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_resolved_addresses",
Help: "The number of memcached nodes found via auto discovery",
}, []string{"addr"}),
resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_total",
Help: "The number of memcache auto discovery attempts",
}),
resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_failures_total",
Help: "The number of memcache auto discovery failures",
}),
logger: logger,
}
return p
}

func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
clusterConfigs := map[string]*ClusterConfig{}
errs := errutil.MultiError{}

for _, address := range addresses {
clusterConfig, err := p.resolver.Resolve(ctx, address)
p.resolverLookupsCount.Inc()

if err != nil {
level.Warn(p.logger).Log(
"msg", "failed to perform auto-discovery for memcached",
"address", address,
)
errs.Add(err)
p.resolverFailuresCount.Inc()

// Use cached values.
p.RLock()
clusterConfigs[address] = p.clusterConfigs[address]
p.RUnlock()
} else {
clusterConfigs[address] = clusterConfig
}
}

p.Lock()
defer p.Unlock()

p.resolvedAddresses.ResetTx()
p.configVersion.ResetTx()
for address, config := range clusterConfigs {
p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes)))
p.configVersion.WithLabelValues(address).Set(float64(config.version))
}
p.resolvedAddresses.Submit()
p.configVersion.Submit()

p.clusterConfigs = clusterConfigs

return errs.Err()
}

func (p *Provider) Addresses() []string {
var result []string
for _, config := range p.clusterConfigs {
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
for _, node := range config.nodes {
result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port))
}
}
return result
}
88 changes: 88 additions & 0 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"sort"
"testing"
"time"

"github.com/pkg/errors"

"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestProviderUpdatesAddresses(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

err := provider.Resolve(ctx, clusters)
addresses := provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}
err = provider.Resolve(ctx, clusters)
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
addresses = provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
}

func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

err := provider.Resolve(ctx, clusters)
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
addresses := provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")
err = provider.Resolve(ctx, clusters)
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
addresses = provider.Addresses()
sort.Strings(addresses)

testutil.NotOk(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
}

type mockResolver struct {
configs map[string]*ClusterConfig
err error
}

func (r *mockResolver) Resolve(_ context.Context, address string) (*ClusterConfig, error) {
if r.err != nil {
return nil, r.err
}
return r.configs[address], nil
}
Loading