diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ddd044f87a..26227c994b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token. - [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket. - [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks. +- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support. ### Fixed diff --git a/docs/components/store.md b/docs/components/store.md index 888a8469917..f1872272183 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -294,7 +294,7 @@ config: The **required** settings are: -- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. +- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead. While the remaining settings are **optional**: @@ -306,6 +306,7 @@ While the remaining settings are **optional**: - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited. - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. +- `auto_discovery`: whether to use the auto-discovery mechanism for memcached. ## Caching Bucket diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index cb5adcb0d03..e849ca03595 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/dns" + memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" @@ -53,6 +54,7 @@ var ( MaxGetMultiConcurrency: 100, MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, + AutoDiscovery: false, } ) @@ -114,6 +116,9 @@ type MemcachedClientConfig struct { // DNSProviderUpdateInterval specifies the DNS discovery update interval. DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"` + + // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution + AutoDiscovery bool `yaml:"auto_discovery"` } func (c *MemcachedClientConfig) validate() error { @@ -153,8 +158,8 @@ type memcachedClient struct { // Name provides an identifier for the instantiated Client name string - // DNS provider used to keep the memcached servers list updated. - dnsProvider *dns.Provider + // Address provider used to keep the memcached servers list updated. + addressProvider AddressProvider // Channel used to notify internal goroutines when they should quit. stop chan struct{} @@ -177,6 +182,11 @@ type memcachedClient struct { dataSize *prometheus.HistogramVec } +type AddressProvider interface { + Resolve(context.Context, []string) error + Addresses() []string +} + type memcachedGetMultiResult struct { items map[string]*memcache.Item err error @@ -220,20 +230,30 @@ func newMemcachedClient( reg prometheus.Registerer, name string, ) (*memcachedClient, error) { - dnsProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), - dns.GolangResolverType, - ) + promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg) + + var addressProvider AddressProvider + if config.AutoDiscovery { + addressProvider = memcacheDiscovery.NewProvider( + logger, + promRegisterer, + 2*time.Second) + } else { + addressProvider = dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), + dns.GolangResolverType, + ) + } c := &memcachedClient{ - logger: log.With(logger, "name", name), - config: config, - client: client, - selector: selector, - dnsProvider: dnsProvider, - asyncQueue: make(chan func(), config.MaxAsyncBufferSize), - stop: make(chan struct{}, 1), + logger: log.With(logger, "name", name), + config: config, + client: client, + selector: selector, + addressProvider: addressProvider, + asyncQueue: make(chan func(), config.MaxAsyncBufferSize), + stop: make(chan struct{}, 1), getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), config.MaxGetMultiConcurrency, @@ -561,11 +581,11 @@ func (c *memcachedClient) resolveAddrs() error { defer cancel() // If some of the dns resolution fails, log the error. - if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil { + if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil { level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err) } // Fail in case no server address is resolved. - servers := c.dnsProvider.Addresses() + servers := c.addressProvider.Addresses() if len(servers) == 0 { return fmt.Errorf("no server address resolved for %s", c.name) } diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go new file mode 100644 index 00000000000..f005c39148a --- /dev/null +++ b/pkg/discovery/memcache/provider.go @@ -0,0 +1,103 @@ +package memcache + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/errutil" + "github.com/thanos-io/thanos/pkg/extprom" +) + +type Provider struct { + sync.RWMutex + resolver Resolver + clusterConfigs map[string]*ClusterConfig + logger log.Logger + + configVersion *extprom.TxGaugeVec + resolvedAddresses *extprom.TxGaugeVec + resolverFailuresCount prometheus.Counter + resolverLookupsCount prometheus.Counter +} + +func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider { + p := &Provider{ + resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout}, + clusterConfigs: map[string]*ClusterConfig{}, + configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Name: "auto_discovery_config_version", + Help: "The current auto discovery config version", + }, []string{"addr"}), + resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Name: "auto_discovery_resolved_addresses", + Help: "The number of memcached nodes found via auto discovery", + }, []string{"addr"}), + resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "auto_discovery_total", + Help: "The number of memcache auto discovery attempts", + }), + resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "auto_discovery_failures_total", + Help: "The number of memcache auto discovery failures", + }), + } + return p +} + +func (p *Provider) Resolve(ctx context.Context, addresses []string) error { + clusterConfigs := map[string]*ClusterConfig{} + errs := errutil.MultiError{} + + for _, address := range addresses { + clusterConfig, err := p.resolver.Resolve(ctx, address) + p.resolverLookupsCount.Inc() + + if err != nil { + level.Warn(p.logger).Log( + "msg", "failed to perform auto-discovery for memcached", + "address", address, + ) + errs.Add(err) + p.resolverFailuresCount.Inc() + + // Use cached values. + p.RLock() + clusterConfigs[address] = p.clusterConfigs[address] + p.RUnlock() + } else { + clusterConfigs[address] = clusterConfig + } + } + + p.Lock() + defer p.Unlock() + + p.resolvedAddresses.ResetTx() + p.configVersion.ResetTx() + for address, config := range clusterConfigs { + p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes))) + p.configVersion.WithLabelValues(address).Set(float64(config.version)) + } + p.resolvedAddresses.Submit() + p.configVersion.Submit() + + p.clusterConfigs = clusterConfigs + + return errs.Err() +} + +func (p *Provider) Addresses() []string { + var result []string + for _, config := range p.clusterConfigs { + for _, node := range config.nodes { + result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port)) + } + } + return result +} diff --git a/pkg/discovery/memcache/provider_test.go b/pkg/discovery/memcache/provider_test.go new file mode 100644 index 00000000000..8032f68ee94 --- /dev/null +++ b/pkg/discovery/memcache/provider_test.go @@ -0,0 +1,84 @@ +package memcache + +import ( + "context" + "errors" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestProviderUpdatesAddresses(t *testing.T) { + ctx := context.TODO() + clusters := []string{"memcached-cluster-1", "memcached-cluster-2"} + provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second) + resolver := mockResolver{ + configs: map[string]*ClusterConfig{ + "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, + "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + }, + } + provider.resolver = &resolver + + err := provider.Resolve(ctx, clusters) + addresses := provider.Addresses() + sort.Strings(addresses) + + testutil.Ok(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) + + resolver.configs = map[string]*ClusterConfig{ + "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}}, + "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + } + err = provider.Resolve(ctx, clusters) + addresses = provider.Addresses() + sort.Strings(addresses) + + testutil.Ok(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses) +} + +func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) { + ctx := context.TODO() + clusters := []string{"memcached-cluster-1", "memcached-cluster-2"} + provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second) + resolver := mockResolver{ + configs: map[string]*ClusterConfig{ + "memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}}, + "memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, + }, + } + provider.resolver = &resolver + + err := provider.Resolve(ctx, clusters) + addresses := provider.Addresses() + sort.Strings(addresses) + + testutil.Ok(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) + + resolver.configs = nil + resolver.err = errors.New("oops") + err = provider.Resolve(ctx, clusters) + addresses = provider.Addresses() + sort.Strings(addresses) + + testutil.NotOk(t, err) + testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) +} + +type mockResolver struct { + configs map[string]*ClusterConfig + err error +} + +func (r *mockResolver) Resolve(_ context.Context, address string) (*ClusterConfig, error) { + if r.err != nil { + return nil, r.err + } + return r.configs[address], nil +} diff --git a/pkg/discovery/memcache/resolver.go b/pkg/discovery/memcache/resolver.go new file mode 100644 index 00000000000..0d31f6d3874 --- /dev/null +++ b/pkg/discovery/memcache/resolver.go @@ -0,0 +1,112 @@ +package memcache + +import ( + "bufio" + "context" + "fmt" + "net" + "strconv" + "strings" + "time" +) + +type ClusterConfig struct { + version int + nodes []Node +} + +type Node struct { + dns string + ip string + port int +} + +type Resolver interface { + // Resolve performs a DNS lookup and returns a list of records. + // name is the domain name to be resolved. + // qtype is the query type. Accepted values are `dns` for A/AAAA lookup and `dnssrv` for SRV lookup. + // If scheme is passed through name, it is preserved on IP results. + Resolve(ctx context.Context, address string) (*ClusterConfig, error) +} + +type memcachedAutoDiscovery struct { + dialTimeout time.Duration +} + +func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *ClusterConfig, err error) { + conn, err := net.DialTimeout("tcp", address, s.dialTimeout) + if err != nil { + return nil, err + } + defer func() { + err = conn.Close() + }() + + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + if _, err := fmt.Fprintf(rw, "config get cluster\n"); err != nil { + return nil, err + } + if err := rw.Flush(); err != nil { + return nil, err + } + + config, err = s.parseConfig(rw.Reader) + if err != nil { + return nil, err + } + + return config, err +} + +func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*ClusterConfig, error) { + clusterConfig := new(ClusterConfig) + + configMeta, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to read config metadata: %s", err) + } + configMeta = strings.TrimSpace(configMeta) + + // First line should be "CONFIG cluster 0 [length-of-payload-] + configMetaComponents := strings.Split(configMeta, " ") + if len(configMetaComponents) != 4 { + return nil, fmt.Errorf("expected 4 components in config metadata, and received %d, meta: %s", len(configMetaComponents), configMeta) + } + + configSize, err := strconv.Atoi(configMetaComponents[3]) + if err != nil { + return nil, fmt.Errorf("failed to parse config size from metadata: %s, error: %s", configMeta, err) + } + + configVersion, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to find config version: %s", err) + } + clusterConfig.version, err = strconv.Atoi(strings.TrimSpace(configVersion)) + if err != nil { + return nil, fmt.Errorf("failed to parser config version: %s", err) + } + + nodes, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to read nodes: %s", err) + } + + if len(configVersion)+len(nodes) != configSize { + return nil, fmt.Errorf("expected %d in config payload, but got %d instead.", configSize, len(configVersion)+len(nodes)) + } + + for _, host := range strings.Split(strings.TrimSpace(nodes), " ") { + dnsIpPort := strings.Split(host, "|") + if len(dnsIpPort) != 3 { + return nil, fmt.Errorf("node not in expected format: %s", dnsIpPort) + } + port, err := strconv.Atoi(dnsIpPort[2]) + if err != nil { + return nil, fmt.Errorf("failed to parse port: %s, err: %s", dnsIpPort, err) + } + clusterConfig.nodes = append(clusterConfig.nodes, Node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port}) + } + + return clusterConfig, nil +} diff --git a/pkg/discovery/memcache/resolver_test.go b/pkg/discovery/memcache/resolver_test.go new file mode 100644 index 00000000000..00eaa2362db --- /dev/null +++ b/pkg/discovery/memcache/resolver_test.go @@ -0,0 +1,79 @@ +package memcache + +import ( + "bufio" + "errors" + "strings" + "testing" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestGoodClusterConfigs(t *testing.T) { + resolver := memcachedAutoDiscovery{} + testCases := []struct { + content string + config ClusterConfig + }{ + {"CONFIG cluster 0 23\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n", + ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100}, + }, + {"CONFIG cluster 0 37\r\n0\r\ndns-1|ip-1|11211 dns-2|ip-2|8080\r\nEND\r\n", + ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0}, + }, + } + + for _, testCase := range testCases { + reader := bufio.NewReader(strings.NewReader(testCase.content)) + + config, err := resolver.parseConfig(reader) + + testutil.Ok(t, err) + testutil.Equals(t, testCase.config, *config) + } +} + +func TestBadClusterConfigs(t *testing.T) { + resolver := memcachedAutoDiscovery{} + testCases := []struct { + content string + expectedErr error + }{ + {"", + errors.New("failed to read config metadata: EOF"), + }, + {"CONFIG cluster\r\n", + errors.New("expected 4 components in config metadata, and received 2, meta: CONFIG cluster"), + }, + {"CONFIG cluster 0 configSize\r\n", + errors.New("failed to parse config size from metadata: CONFIG cluster 0 configSize, error: strconv.Atoi: parsing \"configSize\": invalid syntax"), + }, + {"CONFIG cluster 0 100\r\n", + errors.New("failed to find config version: EOF"), + }, + {"CONFIG cluster 0 100\r\nconfigVersion\r\n", + errors.New("failed to parser config version: strconv.Atoi: parsing \"configVersion\": invalid syntax"), + }, + {"CONFIG cluster 0 100\r\n0\r\n", + errors.New("failed to read nodes: EOF"), + }, + {"CONFIG cluster 0 0\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n", + errors.New("expected 0 in config payload, but got 23 instead."), + }, + {"CONFIG cluster 0 17\r\n100\r\ndns-1|ip-1\r\nEND\r\n", + errors.New("node not in expected format: [dns-1 ip-1]"), + }, + {"CONFIG cluster 0 22\r\n100\r\ndns-1|ip-1|port\r\nEND\r\n", + errors.New("failed to parse port: [dns-1 ip-1 port], err: strconv.Atoi: parsing \"port\": invalid syntax"), + }, + } + + for _, testCase := range testCases { + reader := bufio.NewReader(strings.NewReader(testCase.content)) + + _, err := resolver.parseConfig(reader) + + testutil.Assert(t, testCase.expectedErr.Error() == err.Error(), "expected error '%v', but got '%v'", testCase.expectedErr.Error(), err.Error()) + } + +}