Skip to content

Commit

Permalink
Re-use dns.Provider for resolving Alertmanager addresses
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier committed Dec 17, 2019
1 parent 65f00de commit 07f711b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 147 deletions.
17 changes: 10 additions & 7 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,19 @@ func runRule(
alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg)
}
}

if len(alertingCfg.Alertmanagers) == 0 {
level.Warn(logger).Log("msg", "no alertmanager configured")
}

amProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_ruler_alertmanagers_", reg),
dns.ResolverType(dnsSDResolver),
)
for _, cfg := range alertingCfg.Alertmanagers {
am, err := alert.NewAlertmanager(logger, cfg)
// Each Alertmanager client needs its own DNS provider.
am, err := alert.NewAlertmanager(logger, cfg, amProvider.Clone())
if err != nil {
return err
}
Expand Down Expand Up @@ -394,8 +402,6 @@ func runRule(
}
// Discover and resolve Alertmanager addresses.
{
resolver := dns.NewResolver(dns.ResolverType(dnsSDResolver).ToResolver(logger))

for i := range alertmgrs {
am := alertmgrs[i]
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -408,10 +414,7 @@ func runRule(

g.Add(func() error {
return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error {
if err := am.Update(ctx, resolver); err != nil {
level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err)
alertMngrAddrResolutionErrors.Inc()
}
am.Resolve(ctx)
return nil
})
}, func(error) {
Expand Down
59 changes: 19 additions & 40 deletions pkg/alert/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -161,6 +160,11 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
return unmarshal((*plain)(c))
}

// AddressProvider is the source of Alertmanager addresses used by the
// Alertmanager client. NewAlertmanager receives one and the client delegates
// both refreshing and reading the address list to it.
type AddressProvider interface {
// Resolve is handed the list of configured addresses to process. The
// Alertmanager client calls it with the file SD addresses followed by the
// static addresses.
Resolve(context.Context, []string)
// Addresses returns the addresses that the Alertmanager client turns into
// endpoint URLs.
Addresses() []string
}

// Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints.
type Alertmanager struct {
logger log.Logger
Expand All @@ -174,12 +178,11 @@ type Alertmanager struct {
fileSDCache *cache.Cache
fileDiscoverers []*file.Discovery

mtx sync.RWMutex
resolved []string
provider AddressProvider
}

// NewAlertmanager returns a new Alertmanager client.
func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig) (*Alertmanager, error) {
func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig, provider AddressProvider) (*Alertmanager, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -210,6 +213,7 @@ func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig) (*Alertmanager,
staticAddresses: cfg.StaticAddresses,
fileSDCache: cache.New(),
fileDiscoverers: discoverers,
provider: provider,
}, nil
}

Expand Down Expand Up @@ -237,6 +241,12 @@ func BuildAlertmanagerConfig(logger log.Logger, address string, timeout time.Dur
// Scheme is of the form "<dns type>+<http scheme>".
scheme = strings.TrimPrefix(scheme, prefix)
host = prefix + parsed.Host
if qType == dns.A {
if _, _, err := net.SplitHostPort(parsed.Host); err != nil {
// The host port could be missing. Append the defaultAlertmanagerPort.
host = host + ":" + strconv.Itoa(defaultAlertmanagerPort)
}
}
break
}
}
Expand All @@ -260,10 +270,8 @@ func BuildAlertmanagerConfig(logger log.Logger, address string, timeout time.Dur

// Endpoints returns the list of known Alertmanager endpoints.
func (a *Alertmanager) Endpoints() []*url.URL {
a.mtx.RLock()
defer a.mtx.RUnlock()
var urls []*url.URL
for _, addr := range a.resolved {
for _, addr := range a.provider.Addresses() {
urls = append(urls,
&url.URL{
Scheme: a.scheme,
Expand All @@ -275,7 +283,7 @@ func (a *Alertmanager) Endpoints() []*url.URL {
return urls
}

// Post sends a POST request to the given URL.
// Do sends a POST request to the given URL.
func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error {
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
Expand Down Expand Up @@ -329,36 +337,7 @@ func (a *Alertmanager) Discover(ctx context.Context) {
wg.Wait()
}

// Update refreshes and resolves the list of targets.
func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error {
var resolved []string
for _, addr := range append(a.fileSDCache.Addresses(), a.staticAddresses...) {
level.Debug(a.logger).Log("msg", "resolving address", "addr", addr)
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
level.Debug(a.logger).Log("msg", "no lookup needed", "addr", addr)
resolved = append(resolved, addr)
continue
}
qtype, name := dns.QType(qtypeAndName[0]), qtypeAndName[1]

// Get only the host and resolve it if needed.
host := name
if qtype == dns.A {
if _, _, err := net.SplitHostPort(host); err != nil {
// The host port could be missing. Append the defaultAlertmanagerPort.
host = host + ":" + strconv.Itoa(defaultAlertmanagerPort)
}
}
addrs, err := resolver.Resolve(ctx, host, qtype)
if err != nil {
return errors.Wrap(err, "failed to resolve alertmanager address")
}
level.Debug(a.logger).Log("msg", "address resolved", "addr", addr, "resolved", strings.Join(addrs, ","))
resolved = append(resolved, addrs...)
}
a.mtx.Lock()
a.resolved = resolved
a.mtx.Unlock()
return nil
// Resolve hands the current set of Alertmanager targets to the address
// provider. The targets are the addresses from the file SD cache followed by
// the statically configured ones.
func (a *Alertmanager) Resolve(ctx context.Context) {
	targets := append(a.fileSDCache.Addresses(), a.staticAddresses...)
	a.provider.Resolve(ctx, targets)
}
101 changes: 1 addition & 100 deletions pkg/alert/client_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package alert

import (
"context"
"strings"
"testing"
"time"

"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -57,7 +52,7 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) {
{
address: "dns+https://localhost/path/prefix/",
expected: AlertmanagerConfig{
StaticAddresses: []string{"dns+localhost"},
StaticAddresses: []string{"dns+localhost:9093"},
Scheme: "https",
PathPrefix: "/path/prefix/",
},
Expand Down Expand Up @@ -91,97 +86,3 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) {
})
}
}

// mockEntry is the key under which mockResolver stores a canned answer:
// the name being looked up together with the DNS query type.
type mockEntry struct {
name string
qtype dns.QType
}

// mockResolver is a test double passed to Alertmanager.Update in place of a
// real DNS resolver.
type mockResolver struct {
// entries maps a (name, query type) pair to the addresses to return.
entries map[mockEntry][]string
// err, when non-nil, is returned by every Resolve call.
err error
}

// Resolve returns the canned addresses registered for the (name, qtype) pair.
// A configured err takes precedence over any entry, and a lookup with no
// matching entry fails with an error naming the requested name.
func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) {
	if m.err != nil {
		return nil, m.err
	}

	key := mockEntry{name: name, qtype: qtype}
	addrs, found := m.entries[key]
	if !found {
		return nil, errors.Errorf("mockResolver not found response for name: %s", name)
	}
	return addrs, nil
}

// TestUpdate checks Alertmanager.Update against a mock resolver. Each case
// configures static addresses, the resolver's canned answers, and either the
// expected resolved addresses or the expectation that Update fails.
func TestUpdate(t *testing.T) {
for _, tc := range []struct {
cfg AlertmanagerConfig
resolver mockResolver

resolved []string
err bool
}{
// "dns+" address with an explicit port: looked up as an A query on host:port.
{
cfg: AlertmanagerConfig{
StaticAddresses: []string{"dns+alertmanager.example.com:9095"},
},
resolver: mockResolver{
entries: map[mockEntry][]string{
mockEntry{name: "alertmanager.example.com:9095", qtype: dns.A}: []string{"1.1.1.1:9095", "2.2.2.2:9095"},
},
},
resolved: []string{"1.1.1.1:9095", "2.2.2.2:9095"},
},
// "dns+" address without a port: the mock only answers for the name with
// ":9093" appended, so the lookup must carry that port.
{
cfg: AlertmanagerConfig{
StaticAddresses: []string{"dns+alertmanager.example.com"},
},
resolver: mockResolver{
entries: map[mockEntry][]string{
mockEntry{name: "alertmanager.example.com:9093", qtype: dns.A}: []string{"1.1.1.1:9093", "2.2.2.2:9093"},
},
},
resolved: []string{"1.1.1.1:9093", "2.2.2.2:9093"},
},
// Address without a lookup prefix: expected back unchanged; the resolver has no entries.
{
cfg: AlertmanagerConfig{
StaticAddresses: []string{"alertmanager.example.com:9096"},
},
resolved: []string{"alertmanager.example.com:9096"},
},
// "dnssrv+" address: looked up as an SRV query on the bare name.
{
cfg: AlertmanagerConfig{
StaticAddresses: []string{"dnssrv+_web._tcp.alertmanager.example.com"},
},
resolver: mockResolver{
entries: map[mockEntry][]string{
mockEntry{name: "_web._tcp.alertmanager.example.com", qtype: dns.SRV}: []string{"1.1.1.1:9097", "2.2.2.2:9097"},
},
},
resolved: []string{"1.1.1.1:9097", "2.2.2.2:9097"},
},
// Resolver has no answer for the name: Update is expected to return an error.
{
cfg: AlertmanagerConfig{
StaticAddresses: []string{"dnssrv+_web._tcp.notfound.example.com"},
},
resolver: mockResolver{
entries: map[mockEntry][]string{},
},
err: true,
},
} {
// The subtest is named after the comma-joined static addresses of the case.
t.Run(strings.Join(tc.cfg.StaticAddresses, ","), func(t *testing.T) {
am, err := NewAlertmanager(nil, tc.cfg)
testutil.Ok(t, err)
ctx := context.Background()
err = am.Update(ctx, &tc.resolver)
if tc.err {
t.Logf("%v", err)
testutil.NotOk(t, err)
return
}

testutil.Equals(t, tc.resolved, am.resolved)
})
}

}
12 changes: 12 additions & 0 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, resolverType Reso
return p
}

// Clone returns a new Provider built from the receiver. The clone reuses the
// receiver's resolver, logger and resolver* fields, but starts with its own
// empty map of resolved addresses.
func (p *Provider) Clone() *Provider {
	clone := new(Provider)

	// Carried over from the original provider.
	clone.resolver = p.resolver
	clone.logger = p.logger
	clone.resolverAddrs = p.resolverAddrs
	clone.resolverLookupsCount = p.resolverLookupsCount
	clone.resolverFailuresCount = p.resolverFailuresCount

	// Not carried over: the clone tracks its own resolved addresses.
	clone.resolved = make(map[string][]string)

	return clone
}

// Resolve stores a list of provided addresses or their DNS records if requested.
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// defaultPort is used for non-SRV records when a port is not supplied.
Expand Down

0 comments on commit 07f711b

Please sign in to comment.