From af820e7282e506dcc52b843cd039fdc1344a3754 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Fri, 23 Jul 2021 13:37:08 -0700 Subject: [PATCH 1/7] add memcached autodiscovery support Signed-off-by: Roy Chiang --- CHANGELOG.md | 1 + docs/components/query-frontend.md | 1 + docs/components/store.md | 4 +- pkg/cacheutil/memcached_client.go | 52 +++++++---- pkg/discovery/memcache/provider.go | 103 ++++++++++++++++++++++ pkg/discovery/memcache/provider_test.go | 85 ++++++++++++++++++ pkg/discovery/memcache/resolver.go | 112 ++++++++++++++++++++++++ pkg/discovery/memcache/resolver_test.go | 80 +++++++++++++++++ 8 files changed, 421 insertions(+), 17 deletions(-) create mode 100644 pkg/discovery/memcache/provider.go create mode 100644 pkg/discovery/memcache/provider_test.go create mode 100644 pkg/discovery/memcache/resolver.go create mode 100644 pkg/discovery/memcache/resolver_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ae72d51e4..596fd9e612 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token. - [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket. - [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks. +- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support. ### Fixed diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index e16228b670..6e1a6a7f74 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -74,6 +74,7 @@ config: max_item_size: 0 max_get_multi_batch_size: 0 dns_provider_update_interval: 0s + auto_discovery: false expiration: 0s ``` diff --git a/docs/components/store.md b/docs/components/store.md index 888a846991..f848be0634 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -290,11 +290,12 @@ config: max_item_size: 0 max_get_multi_batch_size: 0 dns_provider_update_interval: 0s + auto_discovery:false ``` The **required** settings are: -- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. +- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead. While the remaining settings are **optional**: @@ -306,6 +307,7 @@ While the remaining settings are **optional**: - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited. - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. +- `auto_discovery`: whether to use the auto-discovery mechanism for memcached. ## Caching Bucket diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index cb5adcb0d0..e849ca0359 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/dns" + memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" @@ -53,6 +54,7 @@ var ( MaxGetMultiConcurrency: 100, MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, + AutoDiscovery: false, } ) @@ -114,6 +116,9 @@ type MemcachedClientConfig struct { // DNSProviderUpdateInterval specifies the DNS discovery update interval. DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"` + + // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution + AutoDiscovery bool `yaml:"auto_discovery"` } func (c *MemcachedClientConfig) validate() error { @@ -153,8 +158,8 @@ type memcachedClient struct { // Name provides an identifier for the instantiated Client name string - // DNS provider used to keep the memcached servers list updated. - dnsProvider *dns.Provider + // Address provider used to keep the memcached servers list updated. + addressProvider AddressProvider // Channel used to notify internal goroutines when they should quit. stop chan struct{} @@ -177,6 +182,11 @@ type memcachedClient struct { dataSize *prometheus.HistogramVec } +type AddressProvider interface { + Resolve(context.Context, []string) error + Addresses() []string +} + type memcachedGetMultiResult struct { items map[string]*memcache.Item err error @@ -220,20 +230,30 @@ func newMemcachedClient( reg prometheus.Registerer, name string, ) (*memcachedClient, error) { - dnsProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), - dns.GolangResolverType, - ) + promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg) + + var addressProvider AddressProvider + if config.AutoDiscovery { + addressProvider = memcacheDiscovery.NewProvider( + logger, + promRegisterer, + 2*time.Second) + } else { + addressProvider = dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), + dns.GolangResolverType, + ) + } c := &memcachedClient{ - logger: log.With(logger, "name", name), - config: config, - client: client, - selector: selector, - dnsProvider: dnsProvider, - asyncQueue: make(chan func(), config.MaxAsyncBufferSize), - stop: make(chan struct{}, 1), + logger: log.With(logger, "name", name), + config: config, + client: client, + selector: selector, + addressProvider: addressProvider, + asyncQueue: make(chan func(), config.MaxAsyncBufferSize), + stop: make(chan struct{}, 1), getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), config.MaxGetMultiConcurrency, @@ -561,11 +581,11 @@ func (c *memcachedClient) resolveAddrs() error { defer cancel() // If some of the dns resolution fails, log the error. - if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil { + if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil { level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err) } // Fail in case no server address is resolved. - servers := c.dnsProvider.Addresses() + servers := c.addressProvider.Addresses() if len(servers) == 0 { return fmt.Errorf("no server address resolved for %s", c.name) } diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go new file mode 100644 index 0000000000..f005c39148 --- /dev/null +++ b/pkg/discovery/memcache/provider.go @@ -0,0 +1,103 @@ +package memcache + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/errutil" + "github.com/thanos-io/thanos/pkg/extprom" +) + +type Provider struct { + sync.RWMutex + resolver Resolver + clusterConfigs map[string]*ClusterConfig + logger log.Logger + + configVersion *extprom.TxGaugeVec + resolvedAddresses *extprom.TxGaugeVec + resolverFailuresCount prometheus.Counter + resolverLookupsCount prometheus.Counter +} + +func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider { + p := &Provider{ + resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout}, + clusterConfigs: map[string]*ClusterConfig{}, + configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Name: "auto_discovery_config_version", + Help: "The current auto discovery config version", + }, []string{"addr"}), + resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Name: "auto_discovery_resolved_addresses", + Help: "The number of memcached nodes found via auto discovery", + }, []string{"addr"}), + resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "auto_discovery_total", + Help: "The number of memcache auto discovery attempts", + }), + resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "auto_discovery_failures_total", + Help: "The number of memcache auto discovery failures", + }), + } + return p +} + +func (p *Provider) Resolve(ctx context.Context, addresses []string) error { + clusterConfigs := map[string]*ClusterConfig{} + errs := errutil.MultiError{} + + for _, address := range addresses { + clusterConfig, err := p.resolver.Resolve(ctx, address) + p.resolverLookupsCount.Inc() + + if err != nil { + level.Warn(p.logger).Log( + "msg", "failed to perform auto-discovery for memcached", + "address", address, + ) + errs.Add(err) + p.resolverFailuresCount.Inc() + + // Use cached values. + p.RLock() + clusterConfigs[address] = p.clusterConfigs[address] + p.RUnlock() + } else { + clusterConfigs[address] = clusterConfig + } + } + + p.Lock() + defer p.Unlock() + + p.resolvedAddresses.ResetTx() + p.configVersion.ResetTx() + for address, config := range clusterConfigs { + p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes))) + p.configVersion.WithLabelValues(address).Set(float64(config.version)) + } + p.resolvedAddresses.Submit() + p.configVersion.Submit() + + p.clusterConfigs = clusterConfigs + + return errs.Err() +} + +func (p *Provider) Addresses() []string { + var result []string + for _, config := range p.clusterConfigs { + for _, node := range config.nodes { + result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port)) + } + } + return result +} diff --git a/pkg/discovery/memcache/provider_test.go b/pkg/discovery/memcache/provider_test.go new file mode 100644 index 0000000000..56a6f6a834 --- /dev/null +++ b/pkg/discovery/memcache/provider_test.go @@ -0,0 +1,85 @@ +package memcache + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/pkg/errors" + + "github.com/go-kit/kit/log" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestProviderUpdatesAddresses(t *testing.T) { + ctx := context.TODO() + clusters := []string{"memcached-cluster-1", "memcached-cluster-2"} + provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second) + resolver := mockResolver{ + configs: map[string]*ClusterConfig{ + "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, + "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + }, + } + provider.resolver = &resolver + + err := provider.Resolve(ctx, clusters) + addresses := provider.Addresses() + sort.Strings(addresses) + + testutil.Ok(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) + + resolver.configs = map[string]*ClusterConfig{ + "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}}, + "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + } + err = provider.Resolve(ctx, clusters) + addresses = provider.Addresses() + sort.Strings(addresses) + + testutil.Ok(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses) +} + +func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) { + ctx := context.TODO() + clusters := []string{"memcached-cluster-1", "memcached-cluster-2"} + provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second) + resolver := mockResolver{ + configs: map[string]*ClusterConfig{ + "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, + "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + }, + } + provider.resolver = &resolver + + err := provider.Resolve(ctx, clusters) + addresses := provider.Addresses() + sort.Strings(addresses) + + testutil.Ok(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) + + resolver.configs = nil + resolver.err = errors.New("oops") + err = provider.Resolve(ctx, clusters) + addresses = provider.Addresses() + sort.Strings(addresses) + + testutil.NotOk(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) +} + +type mockResolver struct { + configs map[string]*ClusterConfig + err error +} + +func (r *mockResolver) Resolve(_ context.Context, address string) (*ClusterConfig, error) { + if r.err != nil { + return nil, r.err + } + return r.configs[address], nil +} diff --git a/pkg/discovery/memcache/resolver.go b/pkg/discovery/memcache/resolver.go new file mode 100644 index 0000000000..0d31f6d387 --- /dev/null +++ b/pkg/discovery/memcache/resolver.go @@ -0,0 +1,112 @@ +package memcache + +import ( + "bufio" + "context" + "fmt" + "net" + "strconv" + "strings" + "time" +) + +type ClusterConfig struct { + version int + nodes []Node +} + +type Node struct { + dns string + ip string + port int +} + +type Resolver interface { + // Resolve performs a DNS lookup and returns a list of records. + // name is the domain name to be resolved. + // qtype is the query type. Accepted values are `dns` for A/AAAA lookup and `dnssrv` for SRV lookup. + // If scheme is passed through name, it is preserved on IP results. + Resolve(ctx context.Context, address string) (*ClusterConfig, error) +} + +type memcachedAutoDiscovery struct { + dialTimeout time.Duration +} + +func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *ClusterConfig, err error) { + conn, err := net.DialTimeout("tcp", address, s.dialTimeout) + if err != nil { + return nil, err + } + defer func() { + err = conn.Close() + }() + + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + if _, err := fmt.Fprintf(rw, "config get cluster\n"); err != nil { + return nil, err + } + if err := rw.Flush(); err != nil { + return nil, err + } + + config, err = s.parseConfig(rw.Reader) + if err != nil { + return nil, err + } + + return config, err +} + +func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*ClusterConfig, error) { + clusterConfig := new(ClusterConfig) + + configMeta, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to read config metadata: %s", err) + } + configMeta = strings.TrimSpace(configMeta) + + // First line should be "CONFIG cluster 0 [length-of-payload-] + configMetaComponents := strings.Split(configMeta, " ") + if len(configMetaComponents) != 4 { + return nil, fmt.Errorf("expected 4 components in config metadata, and received %d, meta: %s", len(configMetaComponents), configMeta) + } + + configSize, err := strconv.Atoi(configMetaComponents[3]) + if err != nil { + return nil, fmt.Errorf("failed to parse config size from metadata: %s, error: %s", configMeta, err) + } + + configVersion, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to find config version: %s", err) + } + clusterConfig.version, err = strconv.Atoi(strings.TrimSpace(configVersion)) + if err != nil { + return nil, fmt.Errorf("failed to parser config version: %s", err) + } + + nodes, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to read nodes: %s", err) + } + + if len(configVersion)+len(nodes) != configSize { + return nil, fmt.Errorf("expected %d in config payload, but got %d instead.", configSize, len(configVersion)+len(nodes)) + } + + for _, host := range strings.Split(strings.TrimSpace(nodes), " ") { + dnsIpPort := strings.Split(host, "|") + if len(dnsIpPort) != 3 { + return nil, fmt.Errorf("node not in expected format: %s", dnsIpPort) + } + port, err := strconv.Atoi(dnsIpPort[2]) + if err != nil { + return nil, fmt.Errorf("failed to parse port: %s, err: %s", dnsIpPort, err) + } + clusterConfig.nodes = append(clusterConfig.nodes, Node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port}) + } + + return clusterConfig, nil +} diff --git a/pkg/discovery/memcache/resolver_test.go b/pkg/discovery/memcache/resolver_test.go new file mode 100644 index 0000000000..b24209df8f --- /dev/null +++ b/pkg/discovery/memcache/resolver_test.go @@ -0,0 +1,80 @@ +package memcache + +import ( + "bufio" + "strings" + "testing" + + "github.com/pkg/errors" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestGoodClusterConfigs(t *testing.T) { + resolver := memcachedAutoDiscovery{} + testCases := []struct { + content string + config ClusterConfig + }{ + {"CONFIG cluster 0 23\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n", + ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100}, + }, + {"CONFIG cluster 0 37\r\n0\r\ndns-1|ip-1|11211 dns-2|ip-2|8080\r\nEND\r\n", + ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0}, + }, + } + + for _, testCase := range testCases { + reader := bufio.NewReader(strings.NewReader(testCase.content)) + + config, err := resolver.parseConfig(reader) + + testutil.Ok(t, err) + testutil.Equals(t, testCase.config, *config) + } +} + +func TestBadClusterConfigs(t *testing.T) { + resolver := memcachedAutoDiscovery{} + testCases := []struct { + content string + expectedErr error + }{ + {"", + errors.New("failed to read config metadata: EOF"), + }, + {"CONFIG cluster\r\n", + errors.New("expected 4 components in config metadata, and received 2, meta: CONFIG cluster"), + }, + {"CONFIG cluster 0 configSize\r\n", + errors.New("failed to parse config size from metadata: CONFIG cluster 0 configSize, error: strconv.Atoi: parsing \"configSize\": invalid syntax"), + }, + {"CONFIG cluster 0 100\r\n", + errors.New("failed to find config version: EOF"), + }, + {"CONFIG cluster 0 100\r\nconfigVersion\r\n", + errors.New("failed to parser config version: strconv.Atoi: parsing \"configVersion\": invalid syntax"), + }, + {"CONFIG cluster 0 100\r\n0\r\n", + errors.New("failed to read nodes: EOF"), + }, + {"CONFIG cluster 0 0\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n", + errors.New("expected 0 in config payload, but got 23 instead."), + }, + {"CONFIG cluster 0 17\r\n100\r\ndns-1|ip-1\r\nEND\r\n", + errors.New("node not in expected format: [dns-1 ip-1]"), + }, + {"CONFIG cluster 0 22\r\n100\r\ndns-1|ip-1|port\r\nEND\r\n", + errors.New("failed to parse port: [dns-1 ip-1 port], err: strconv.Atoi: parsing \"port\": invalid syntax"), + }, + } + + for _, testCase := range testCases { + reader := bufio.NewReader(strings.NewReader(testCase.content)) + + _, err := resolver.parseConfig(reader) + + testutil.Assert(t, testCase.expectedErr.Error() == err.Error(), "expected error '%v', but got '%v'", testCase.expectedErr.Error(), err.Error()) + } + +} From 0e8cbbc116a80df4876e88a239864eb0c1b884e8 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Mon, 26 Jul 2021 14:48:23 -0700 Subject: [PATCH 2/7] pass logger to provider struct Signed-off-by: Roy Chiang --- pkg/discovery/memcache/provider.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go index f005c39148..d0d9358dc8 100644 --- a/pkg/discovery/memcache/provider.go +++ b/pkg/discovery/memcache/provider.go @@ -46,6 +46,7 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time. Name: "auto_discovery_failures_total", Help: "The number of memcache auto discovery failures", }), + logger: logger, } return p } From 933b32547d226f45c65ab372291ffaba79cf3b8e Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Tue, 27 Jul 2021 08:07:57 -0700 Subject: [PATCH 3/7] fix linter issues Signed-off-by: Roy Chiang --- docs/components/store.md | 2 +- pkg/discovery/memcache/provider.go | 3 +++ pkg/discovery/memcache/provider_test.go | 3 +++ pkg/discovery/memcache/resolver.go | 3 +++ pkg/discovery/memcache/resolver_test.go | 3 +++ 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/components/store.md b/docs/components/store.md index f848be0634..51694b229e 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -290,7 +290,7 @@ config: max_item_size: 0 max_get_multi_batch_size: 0 dns_provider_update_interval: 0s - auto_discovery:false + auto_discovery: false ``` The **required** settings are: diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go index d0d9358dc8..5f558766f7 100644 --- a/pkg/discovery/memcache/provider.go +++ b/pkg/discovery/memcache/provider.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package memcache import ( diff --git a/pkg/discovery/memcache/provider_test.go b/pkg/discovery/memcache/provider_test.go index 56a6f6a834..962d7f23f0 100644 --- a/pkg/discovery/memcache/provider_test.go +++ b/pkg/discovery/memcache/provider_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package memcache import ( diff --git a/pkg/discovery/memcache/resolver.go b/pkg/discovery/memcache/resolver.go index 0d31f6d387..a0102ee923 100644 --- a/pkg/discovery/memcache/resolver.go +++ b/pkg/discovery/memcache/resolver.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package memcache import ( diff --git a/pkg/discovery/memcache/resolver_test.go b/pkg/discovery/memcache/resolver_test.go index b24209df8f..b2bfca172a 100644 --- a/pkg/discovery/memcache/resolver_test.go +++ b/pkg/discovery/memcache/resolver_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package memcache import ( From f3b2dfb4bea728d79b117d0204e7668490fe98d2 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Tue, 27 Jul 2021 08:47:36 -0700 Subject: [PATCH 4/7] remove copypasted comment Signed-off-by: Roy Chiang --- pkg/discovery/memcache/resolver.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/discovery/memcache/resolver.go b/pkg/discovery/memcache/resolver.go index a0102ee923..04994a1a6f 100644 --- a/pkg/discovery/memcache/resolver.go +++ b/pkg/discovery/memcache/resolver.go @@ -25,10 +25,6 @@ type Node struct { } type Resolver interface { - // Resolve performs a DNS lookup and returns a list of records. - // name is the domain name to be resolved. - // qtype is the query type. Accepted values are `dns` for A/AAAA lookup and `dnssrv` for SRV lookup. - // If scheme is passed through name, it is preserved on IP results. Resolve(ctx context.Context, address string) (*ClusterConfig, error) } From e80b1f38467760994e44590e8498c0917e31470f Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Wed, 28 Jul 2021 08:42:25 -0700 Subject: [PATCH 5/7] address comments Signed-off-by: Roy Chiang --- pkg/cacheutil/memcached_client.go | 7 ++++- pkg/discovery/memcache/provider.go | 10 ++++-- pkg/discovery/memcache/provider_test.go | 41 ++++++++++--------------- pkg/discovery/memcache/resolver.go | 22 ++++++------- pkg/discovery/memcache/resolver_test.go | 6 ++-- 5 files changed, 44 insertions(+), 42 deletions(-) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index e849ca0359..445138b3e6 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -182,8 +182,12 @@ type memcachedClient struct { dataSize *prometheus.HistogramVec } +// AddressProvider performs node address resolution given a list of clusters. type AddressProvider interface { + // Resolves the provided list of memcached cluster to the actual nodes Resolve(context.Context, []string) error + + // Returns the nodes Addresses() []string } @@ -237,7 +241,8 @@ func newMemcachedClient( addressProvider = memcacheDiscovery.NewProvider( logger, promRegisterer, - 2*time.Second) + config.Timeout, + ) } else { addressProvider = dns.NewProvider( logger, diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go index 5f558766f7..e958373afc 100644 --- a/pkg/discovery/memcache/provider.go +++ b/pkg/discovery/memcache/provider.go @@ -17,10 +17,12 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" ) +// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve +// addresses and obtain them. type Provider struct { sync.RWMutex resolver Resolver - clusterConfigs map[string]*ClusterConfig + clusterConfigs map[string]*clusterConfig logger log.Logger configVersion *extprom.TxGaugeVec @@ -32,7 +34,7 @@ type Provider struct { func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider { p := &Provider{ resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout}, - clusterConfigs: map[string]*ClusterConfig{}, + clusterConfigs: map[string]*clusterConfig{}, configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ Name: "auto_discovery_config_version", Help: "The current auto discovery config version", @@ -54,8 +56,9 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time. return p } +// Resolve stores a list of nodes auto-discovered from the provided addresses. func (p *Provider) Resolve(ctx context.Context, addresses []string) error { - clusterConfigs := map[string]*ClusterConfig{} + clusterConfigs := map[string]*clusterConfig{} errs := errutil.MultiError{} for _, address := range addresses { @@ -96,6 +99,7 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error { return errs.Err() } +// Addresses returns the latest addresses present in the Provider. func (p *Provider) Addresses() []string { var result []string for _, config := range p.clusterConfigs { diff --git a/pkg/discovery/memcache/provider_test.go b/pkg/discovery/memcache/provider_test.go index 962d7f23f0..6ebbc16657 100644 --- a/pkg/discovery/memcache/provider_test.go +++ b/pkg/discovery/memcache/provider_test.go @@ -20,29 +20,25 @@ func TestProviderUpdatesAddresses(t *testing.T) { clusters := []string{"memcached-cluster-1", "memcached-cluster-2"} provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second) resolver := mockResolver{ - configs: map[string]*ClusterConfig{ - "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, - "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + configs: map[string]*clusterConfig{ + "memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, + "memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, }, } provider.resolver = &resolver - err := provider.Resolve(ctx, clusters) + testutil.Ok(t, provider.Resolve(ctx, clusters)) addresses := provider.Addresses() - sort.Strings(addresses) - - testutil.Ok(t, err) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) - resolver.configs = map[string]*ClusterConfig{ - "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}}, - "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + resolver.configs = map[string]*clusterConfig{ + "memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}}, + "memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, } - err = provider.Resolve(ctx, clusters) + + testutil.Ok(t, provider.Resolve(ctx, clusters)) addresses = provider.Addresses() sort.Strings(addresses) - - testutil.Ok(t, err) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses) } @@ -51,36 +47,33 @@ func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) { clusters := []string{"memcached-cluster-1", "memcached-cluster-2"} provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second) resolver := mockResolver{ - configs: map[string]*ClusterConfig{ - "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, - "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + configs: map[string]*clusterConfig{ + "memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, + "memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, }, } provider.resolver = &resolver - err := provider.Resolve(ctx, clusters) + testutil.Ok(t, provider.Resolve(ctx, clusters)) addresses := provider.Addresses() sort.Strings(addresses) - - testutil.Ok(t, err) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) resolver.configs = nil resolver.err = errors.New("oops") - err = provider.Resolve(ctx, clusters) + + testutil.NotOk(t, provider.Resolve(ctx, clusters)) addresses = provider.Addresses() sort.Strings(addresses) - - testutil.NotOk(t, err) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) } type mockResolver struct { - configs map[string]*ClusterConfig + configs map[string]*clusterConfig err error } -func (r *mockResolver) Resolve(_ context.Context, address string) (*ClusterConfig, error) { +func (r *mockResolver) Resolve(_ context.Context, address string) (*clusterConfig, error) { if r.err != nil { return nil, r.err } diff --git a/pkg/discovery/memcache/resolver.go b/pkg/discovery/memcache/resolver.go index 04994a1a6f..4e7406cfba 100644 --- a/pkg/discovery/memcache/resolver.go +++ b/pkg/discovery/memcache/resolver.go @@ -11,35 +11,35 @@ import ( "strconv" "strings" "time" + + "github.com/thanos-io/thanos/pkg/runutil" ) -type ClusterConfig struct { +type clusterConfig struct { version int - nodes []Node + nodes []node } -type Node struct { +type node struct { dns string ip string port int } type Resolver interface { - Resolve(ctx context.Context, address string) (*ClusterConfig, error) + Resolve(ctx context.Context, address string) (*clusterConfig, error) } type memcachedAutoDiscovery struct { dialTimeout time.Duration } -func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *ClusterConfig, err error) { +func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *clusterConfig, err error) { conn, err := net.DialTimeout("tcp", address, s.dialTimeout) if err != nil { return nil, err } - defer func() { - err = conn.Close() - }() + defer runutil.CloseWithErrCapture(&err, conn, "closing connection") rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) if _, err := fmt.Fprintf(rw, "config get cluster\n"); err != nil { @@ -57,8 +57,8 @@ func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (c return config, err } -func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*ClusterConfig, error) { - clusterConfig := new(ClusterConfig) +func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*clusterConfig, error) { + clusterConfig := new(clusterConfig) configMeta, err := reader.ReadString('\n') if err != nil { @@ -104,7 +104,7 @@ func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*ClusterConf if err != nil { return nil, fmt.Errorf("failed to parse port: %s, err: %s", dnsIpPort, err) } - clusterConfig.nodes = append(clusterConfig.nodes, Node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port}) + clusterConfig.nodes = append(clusterConfig.nodes, node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port}) } return clusterConfig, nil diff --git a/pkg/discovery/memcache/resolver_test.go b/pkg/discovery/memcache/resolver_test.go index b2bfca172a..a1dad82e74 100644 --- a/pkg/discovery/memcache/resolver_test.go +++ b/pkg/discovery/memcache/resolver_test.go @@ -17,13 +17,13 @@ func TestGoodClusterConfigs(t *testing.T) { resolver := memcachedAutoDiscovery{} testCases := []struct { content string - config ClusterConfig + config clusterConfig }{ {"CONFIG cluster 0 23\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n", - ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100}, + clusterConfig{nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100}, }, {"CONFIG cluster 0 37\r\n0\r\ndns-1|ip-1|11211 dns-2|ip-2|8080\r\nEND\r\n", - ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0}, + clusterConfig{nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0}, }, } From 22ed6125e9a18f4fe9538ec977fe11092a105c5a Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Mon, 2 Aug 2021 08:24:52 -0700 Subject: [PATCH 6/7] Update docs/components/store.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Giedrius Statkevičius Signed-off-by: Roy Chiang --- docs/components/store.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/components/store.md b/docs/components/store.md index 51694b229e..0b955a4b3d 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -295,7 +295,7 @@ config: The **required** settings are: -- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead. +- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead and only point to *one of* the memcached servers. This typically means that there should be only one address specified that resolves to any of the alive memcached servers. Use this for Amazon ElastiCache and other similar services. While the remaining settings are **optional**: From 6c852d0cc807c136860eda3a1230f75d74a2418e Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Tue, 3 Aug 2021 09:07:38 -0700 Subject: [PATCH 7/7] add lock/unlock for autodiscovery provider Signed-off-by: Roy Chiang --- pkg/discovery/memcache/provider.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go index e958373afc..9bb1317824 100644 --- a/pkg/discovery/memcache/provider.go +++ b/pkg/discovery/memcache/provider.go @@ -101,6 +101,9 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error { // Addresses returns the latest addresses present in the Provider. func (p *Provider) Addresses() []string { + p.RLock() + defer p.RUnlock() + var result []string for _, config := range p.clusterConfigs { for _, node := range config.nodes {