diff --git a/CHANGELOG.md b/CHANGELOG.md index c3249140ed..b4634cb1dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering. +- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. +- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts. ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 115be52ff8..9e3af8c8d4 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "net" "net/http" "net/url" "os" @@ -13,7 +12,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "syscall" "time" @@ -83,8 +81,10 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { alertmgrs := cmd.Flag("alertmanagers.url", "Alertmanager replica URLs to push firing alerts. Ruler claims success if push to at least one alertmanager from discovered succeeds. The scheme should not be empty e.g `http` might be used. The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective DNS lookups. The port defaults to 9093 or the SRV record's value. The URL path is used as a prefix for the regular Alertmanager API path."). Strings() - - alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to alertmanager").Default("10s").Duration() + alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to Alertmanager").Default("10s").Duration() + alertmgrsConfig := extflag.RegisterPathOrContent(cmd, "alertmanagers.config", "YAML file that contains alerting configuration. See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--alertmanagers.url' and '--alertmanagers.send-timeout' flags.", false) + alertmgrsDNSSDInterval := modelDuration(cmd.Flag("alertmanagers.sd-dns-interval", "Interval between DNS resolutions of Alertmanager hosts."). + Default("30s")) alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field").String() @@ -157,6 +157,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { lset, *alertmgrs, *alertmgrsTimeout, + alertmgrsConfig, + time.Duration(*alertmgrsDNSSDInterval), *grpcBindAddr, time.Duration(*grpcGracePeriod), *grpcCert, @@ -194,6 +196,8 @@ func runRule( lset labels.Labels, alertmgrURLs []string, alertmgrsTimeout time.Duration, + alertmgrsConfig *extflag.PathOrContent, + alertmgrsDNSSDInterval time.Duration, grpcBindAddr string, grpcGracePeriod time.Duration, grpcCert string, @@ -286,11 +290,56 @@ func runRule( dns.ResolverType(dnsSDResolver), ) + // Build the Alertmanager clients. 
+ alertmgrsConfigYAML, err := alertmgrsConfig.Content() + if err != nil { + return err + } + var ( + alertingCfg alert.AlertingConfig + alertmgrs []*alert.Alertmanager + ) + if len(alertmgrsConfigYAML) > 0 { + if len(alertmgrURLs) != 0 { + return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time") + } + alertingCfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) + if err != nil { + return err + } + } else { + // Build the Alertmanager configuration from the legacy flags. + for _, addr := range alertmgrURLs { + cfg, err := alert.BuildAlertmanagerConfig(logger, addr, alertmgrsTimeout) + if err != nil { + return err + } + alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg) + } + } + + if len(alertingCfg.Alertmanagers) == 0 { + level.Warn(logger).Log("msg", "no alertmanager configured") + } + + amProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_ruler_alertmanagers_", reg), + dns.ResolverType(dnsSDResolver), + ) + for _, cfg := range alertingCfg.Alertmanagers { + // Each Alertmanager client has a different list of targets thus each needs its own DNS provider. + am, err := alert.NewAlertmanager(logger, cfg, amProvider.Clone()) + if err != nil { + return err + } + alertmgrs = append(alertmgrs, am) + } + // Run rule evaluation and alert notifications. var ( - alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver)) - alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - ruleMgr = thanosrule.NewManager(dataDir) + alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) + ruleMgr = thanosrule.NewManager(dataDir) ) { notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { @@ -351,9 +400,35 @@ func runRule( }) } } + // Discover and resolve Alertmanager addresses. + { + for i := range alertmgrs { + am := alertmgrs[i] + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + am.Discover(ctx) + return nil + }, func(error) { + cancel() + }) + + g.Add(func() error { + return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error { + am.Resolve(ctx) + return nil + }) + }, func(error) { + cancel() + }) + } + } + // Run the alert sender. { - // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. - sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout) + clients := make([]alert.AlertmanagerClient, len(alertmgrs)) + for i := range alertmgrs { + clients[i] = alertmgrs[i] + } + sdr := alert.NewSender(logger, reg, clients) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { @@ -370,21 +445,6 @@ func runRule( cancel() }) } - { - ctx, cancel := context.WithCancel(context.Background()) - - g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if err := alertmgrs.update(ctx); err != nil { - level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) - alertMngrAddrResolutionErrors.Inc() - } - return nil - }) - }, func(error) { - cancel() - }) - } // Run File Service Discovery and update the query addresses when the files are modified. 
if fileSD != nil { var fileSDUpdates chan []*targetgroup.Group @@ -615,90 +675,6 @@ func runRule( return nil } -type alertmanagerSet struct { - resolver dns.Resolver - addrs []string - mtx sync.Mutex - current []*url.URL -} - -func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet { - return &alertmanagerSet{ - resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)), - addrs: addrs, - } -} - -func (s *alertmanagerSet) get() []*url.URL { - s.mtx.Lock() - defer s.mtx.Unlock() - return s.current -} - -const defaultAlertmanagerPort = 9093 - -func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) { - qType = "" - parsedUrl, err = url.Parse(addr) - if err != nil { - return qType, nil, err - } - // The Scheme might contain DNS resolver type separated by + so we split it a part. - if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 { - parsedUrl.Scheme = schemeParts[len(schemeParts)-1] - qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+")) - } - return qType, parsedUrl, err -} - -func (s *alertmanagerSet) update(ctx context.Context) error { - var result []*url.URL - for _, addr := range s.addrs { - var ( - qtype dns.QType - resolvedDomain []string - ) - - qtype, u, err := parseAlertmanagerAddress(addr) - if err != nil { - return errors.Wrapf(err, "parse URL %q", addr) - } - - // Get only the host and resolve it if needed. - host := u.Host - if qtype != "" { - if qtype == dns.A { - _, _, err = net.SplitHostPort(host) - if err != nil { - // The host could be missing a port. Append the defaultAlertmanagerPort. - host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) - } - } - resolvedDomain, err = s.resolver.Resolve(ctx, host, qtype) - if err != nil { - return errors.Wrap(err, "alertmanager resolve") - } - } else { - resolvedDomain = []string{host} - } - - for _, resolved := range resolvedDomain { - result = append(result, &url.URL{ - Scheme: u.Scheme, - Host: resolved, - Path: u.Path, - User: u.User, - }) - } - } - - s.mtx.Lock() - s.current = result - s.mtx.Unlock() - - return nil -} - func parseFlagLabels(s []string) (labels.Labels, error) { var lset labels.Labels for _, l := range s { diff --git a/cmd/thanos/rule_test.go b/cmd/thanos/rule_test.go index f4d801d747..2abd38cf6a 100644 --- a/cmd/thanos/rule_test.go +++ b/cmd/thanos/rule_test.go @@ -1,12 +1,8 @@ package main import ( - "context" - "net/url" "testing" - "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -49,97 +45,3 @@ func Test_parseFlagLabels(t *testing.T) { testutil.Equals(t, err != nil, td.expectErr) } } - -func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) { - mockResolver := mockResolver{ - resultIPs: map[string][]string{ - "alertmanager.com:9093": {"1.1.1.1:9300"}, - }, - } - am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}} - - ctx := context.TODO() - err := am.update(ctx) - testutil.Ok(t, err) - - expected := []*url.URL{ - { - Scheme: "http", - Host: "1.1.1.1:9300", - }, - } - gotURLs := am.get() - testutil.Equals(t, expected, gotURLs) -} - -func TestRule_AlertmanagerResolveWithPort(t *testing.T) { - mockResolver := mockResolver{ - resultIPs: map[string][]string{ - "alertmanager.com:19093": {"1.1.1.1:9300"}, - }, - } - am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}} - - ctx := context.TODO() - err := am.update(ctx) - 
testutil.Ok(t, err) - - expected := []*url.URL{ - { - Scheme: "http", - Host: "1.1.1.1:9300", - }, - } - gotURLs := am.get() - testutil.Equals(t, expected, gotURLs) -} - -type mockResolver struct { - resultIPs map[string][]string - err error -} - -func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { - if m.err != nil { - return nil, m.err - } - if res, ok := m.resultIPs[name]; ok { - return res, nil - } - return nil, errors.Errorf("mockResolver not found response for name: %s", name) -} - -func Test_ParseAlertmanagerAddress(t *testing.T) { - var tData = []struct { - address string - expectQueryType dns.QType - expectUrl *url.URL - expectError error - }{ - { - address: "http://user:pass+word@foo.bar:3289", - expectQueryType: dns.QType(""), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")}, - expectError: nil, - }, - { - address: "dnssrvnoa+http://user:pass+word@foo.bar:3289", - expectQueryType: dns.QType("dnssrvnoa"), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")}, - expectError: nil, - }, - { - address: "foo+bar+http://foo.bar:3289", - expectQueryType: dns.QType("foo+bar"), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http"}, - expectError: nil, - }, - } - - for _, d := range tData { - q, u, e := parseAlertmanagerAddress(d.address) - testutil.Equals(t, d.expectError, e) - testutil.Equals(t, d.expectUrl, u) - testutil.Equals(t, d.expectQueryType, q) - } -} diff --git a/docs/components/rule.md b/docs/components/rule.md index c82d82b8e7..7f8807a906 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -209,7 +209,26 @@ Flags: record's value. The URL path is used as a prefix for the regular Alertmanager API path. --alertmanagers.send-timeout=10s - Timeout for sending alerts to alertmanager + Timeout for sending alerts to Alertmanager + --alertmanagers.config-file= + Path to YAML file that contains alerting + configuration. See format details: + https://thanos.io/components/rule.md/#configuration. + If defined, it takes precedence over the + '--alertmanagers.url' and + '--alertmanagers.send-timeout' flags. + --alertmanagers.config= + Alternative to 'alertmanagers.config-file' flag + (lower priority). Content of YAML file that + contains alerting configuration. See format + details: + https://thanos.io/components/rule.md/#configuration. + If defined, it takes precedence over the + '--alertmanagers.url' and + '--alertmanagers.send-timeout' flags. + --alertmanagers.sd-dns-interval=30s + Interval between DNS resolutions of + Alertmanager hosts. --alert.query-url=ALERT.QUERY-URL The external Thanos Query URL that would be set in all alerts 'Source' field @@ -266,3 +285,37 @@ Flags: Interval between DNS resolutions. ``` + +## Configuration + +### Alertmanager + +The `--alertmanagers.config` and `--alertmanagers.config-file` flags allow specifying multiple Alertmanagers. Those entries are treated as a single HA group. This means that alert send failure is claimed only if the Ruler fails to send to all instances. 
+ +The configuration format is the following: + +[embedmd]:# (../flags/config_rule_alerting.txt yaml) +```yaml +alertmanagers: +- http_config: + basic_auth: + username: "" + password: "" + password_file: "" + bearer_token: "" + bearer_token_file: "" + proxy_url: "" + tls_config: + ca_file: "" + cert_file: "" + key_file: "" + server_name: "" + insecure_skip_verify: false + static_configs: [] + file_sd_configs: + - files: [] + refresh_interval: 0s + scheme: http + path_prefix: "" + timeout: 10s +``` diff --git a/docs/service-discovery.md b/docs/service-discovery.md index cdb44e8493..34cf0914cd 100644 --- a/docs/service-discovery.md +++ b/docs/service-discovery.md @@ -13,7 +13,7 @@ SD is currently used in the following places within Thanos: * `Thanos Query` needs to know about [StoreAPI](https://github.com/thanos-io/thanos/blob/d3fb337da94d11c78151504b1fccb1d7e036f394/pkg/store/storepb/rpc.proto#L14) servers in order to query metrics from them. * `Thanos Rule` needs to know about `QueryAPI` servers in order to evaluate recording and alerting rules. -* `Thanos Rule` needs to know about `Alertmanagers` HA replicas in order to send alerts; only static option with DNS discovery. +* `Thanos Rule` needs to know about `Alertmanagers` HA replicas in order to send alerts. There are currently several ways to configure SD, described below in more detail: @@ -33,7 +33,7 @@ The repeatable flag `--store=` can be used to specify a `StoreAPI` that ` The repeatable flag `--query=` can be used to specify a `QueryAPI` that `Thanos Rule` should use. -The repeatable flag `--alertmanager.url=` can be used to specify a `Alertmanager API` that `Thanos Rule` should use. +`Thanos Rule` also supports the configuration of Alertmanager endpoints using YAML with the `--alertmanagers.config=` and `--alertmanagers.config-file=` flags in the `StaticAddress` section. ## File Service Discovery @@ -77,6 +77,8 @@ Again, the `` can be a glob pattern. The flag `--query.sd-interval=<5m>` can be used to change the fallback re-read interval. +`Thanos Rule` also supports the configuration of Alertmanager endpoints using YAML with the `--alertmanagers.config=` and `--alertmanagers.config-file=` flags in the `FileSD` files section. + ## DNS Service Discovery DNS Service Discovery is another mechanism for finding components that can be used in conjunction with Static Flags or File SD. diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index 6af16674e8..2a9e2e8ab8 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -6,30 +6,19 @@ import ( "context" "encoding/json" "fmt" - "net/http" + "io" "net/url" - "path" "sync" "sync/atomic" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/version" - "github.com/prometheus/prometheus/pkg/labels" - - "github.com/thanos-io/thanos/pkg/runutil" -) -const ( - alertPushEndpoint = "/api/v1/alerts" - contentTypeJSON = "application/json" + "github.com/prometheus/prometheus/pkg/labels" ) -var userAgent = fmt.Sprintf("Thanos/%s", version.Version) - // Alert is a generic representation of an alert in the Prometheus eco-system. type Alert struct { // Label value pairs for purpose of aggregation, matching, and disposition @@ -251,12 +240,15 @@ func (q *Queue) Push(alerts []*Alert) { } } +type AlertmanagerClient interface { + Endpoints() []*url.URL + Do(context.Context, *url.URL, io.Reader) error +} + // Sender sends notifications to a dynamic set of alertmanagers.
type Sender struct { logger log.Logger - alertmanagers func() []*url.URL - doReq func(req *http.Request) (*http.Response, error) - timeout time.Duration + alertmanagers []AlertmanagerClient sent *prometheus.CounterVec errs *prometheus.CounterVec @@ -269,21 +261,14 @@ type Sender struct { func NewSender( logger log.Logger, reg prometheus.Registerer, - alertmanagers func() []*url.URL, - doReq func(req *http.Request) (*http.Response, error), - timeout time.Duration, + alertmanagers []AlertmanagerClient, ) *Sender { - if doReq == nil { - doReq = http.DefaultClient.Do - } if logger == nil { logger = log.NewNopLogger() } s := &Sender{ logger: logger, alertmanagers: alertmanagers, - doReq: doReq, - timeout: timeout, sent: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "thanos_alert_sender_alerts_sent_total", @@ -311,7 +296,7 @@ func NewSender( return s } -// Send an alert batch to all given Alertmanager URLs. +// Send an alert batch to all given Alertmanager clients. // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. func (s *Sender) Send(ctx context.Context, alerts []*Alert) { if len(alerts) == 0 { @@ -327,33 +312,30 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { wg sync.WaitGroup numSuccess uint64 ) - amrs := s.alertmanagers() - for _, u := range amrs { - amURL := *u - sendCtx, cancel := context.WithTimeout(ctx, s.timeout) - - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - start := time.Now() - amURL.Path = path.Join(amURL.Path, alertPushEndpoint) - - if err := s.sendOne(sendCtx, amURL.String(), b); err != nil { - level.Warn(s.logger).Log( - "msg", "sending alerts failed", - "alertmanager", amURL.Host, - "numAlerts", len(alerts), - "err", err) - s.errs.WithLabelValues(amURL.Host).Inc() - return - } - s.latency.WithLabelValues(amURL.Host).Observe(time.Since(start).Seconds()) - s.sent.WithLabelValues(amURL.Host).Add(float64(len(alerts))) - - atomic.AddUint64(&numSuccess, 1) - }() + for _, amc := range s.alertmanagers { + for _, u := range amc.Endpoints() { + wg.Add(1) + go func(amc AlertmanagerClient, u *url.URL) { + defer wg.Done() + + level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts)) + start := time.Now() + if err := amc.Do(ctx, u, bytes.NewReader(b)); err != nil { + level.Warn(s.logger).Log( + "msg", "sending alerts failed", + "alertmanager", u.Host, + "numAlerts", len(alerts), + "err", err, + ) + s.errs.WithLabelValues(u.Host).Inc() + return + } + s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds()) + s.sent.WithLabelValues(u.Host).Add(float64(len(alerts))) + + atomic.AddUint64(&numSuccess, 1) + }(amc, u) + } } wg.Wait() @@ -362,26 +344,5 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { } s.dropped.Add(float64(len(alerts))) - level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alertmanagers", amrs, "alerts", string(b)) -} - -func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error { - req, err := http.NewRequest("POST", url, bytes.NewReader(b)) - if err != nil { - return err - } - req = req.WithContext(ctx) - req.Header.Set("Content-Type", contentTypeJSON) - req.Header.Set("User-Agent", userAgent) - - resp, err := s.doReq(req) - if err != nil { - return errors.Wrapf(err, "send request to %q", url) - } - defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert") - - if resp.StatusCode/100 != 2 { - return errors.Errorf("bad response status %v from %q", resp.Status, url) - } - return nil + 
level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alerts", string(b)) } diff --git a/pkg/alert/alert_test.go b/pkg/alert/alert_test.go index c2fcc8f91a..7e1e851d80 100644 --- a/pkg/alert/alert_test.go +++ b/pkg/alert/alert_test.go @@ -1,14 +1,11 @@ package alert import ( - "bytes" "context" - "io/ioutil" - "net/http" + "io" "net/url" "sync" "testing" - "time" "github.com/prometheus/prometheus/pkg/labels" @@ -50,98 +47,86 @@ func assertSameHosts(t *testing.T, expected []*url.URL, found []*url.URL) { } } -func TestSender_Send_OK(t *testing.T) { - var ( - expectedHosts = []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}} - spottedHosts []*url.URL - spottedMu sync.Mutex - ) +type fakeClient struct { + urls []*url.URL + postf func(u *url.URL) error + mtx sync.Mutex + seen []*url.URL +} - okDo := func(req *http.Request) (response *http.Response, e error) { - spottedMu.Lock() - defer spottedMu.Unlock() +func (f *fakeClient) Endpoints() []*url.URL { + return f.urls +} - spottedHosts = append(spottedHosts, req.URL) +func (f *fakeClient) Do(ctx context.Context, u *url.URL, r io.Reader) error { + f.mtx.Lock() + defer f.mtx.Unlock() + f.seen = append(f.seen, u) + if f.postf == nil { + return nil + } + return f.postf(u) +} - return &http.Response{ - Body: ioutil.NopCloser(bytes.NewBuffer(nil)), - StatusCode: http.StatusOK, - }, nil +func TestSenderSendsOk(t *testing.T) { + poster := &fakeClient{ + urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, } - s := NewSender(nil, nil, func() []*url.URL { return expectedHosts }, okDo, 10*time.Second) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) - assertSameHosts(t, expectedHosts, spottedHosts) + assertSameHosts(t, poster.urls, poster.seen) - testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[0].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[0].Host)))) + testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[0].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[0].Host)))) - testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[1].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[1].Host)))) + testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[1].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[1].Host)))) testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.dropped))) } -func TestSender_Send_OneFails(t *testing.T) { - var ( - expectedHosts = []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}} - spottedHosts []*url.URL - spottedMu sync.Mutex - ) - - do := func(req *http.Request) (response *http.Response, e error) { - spottedMu.Lock() - defer spottedMu.Unlock() - - spottedHosts = append(spottedHosts, req.URL) - - if req.Host == expectedHosts[0].Host { - return nil, errors.New("no such host") - } - return &http.Response{ - Body: ioutil.NopCloser(bytes.NewBuffer(nil)), - StatusCode: http.StatusOK, - }, nil +func TestSenderSendsOneFails(t *testing.T) { + poster := &fakeClient{ + urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, + postf: func(u *url.URL) error { + if u.Host == "am1:9090" { + return errors.New("no such host") + } + return nil + }, } - s := NewSender(nil, nil, func() []*url.URL { return expectedHosts }, do, 10*time.Second) + s := NewSender(nil, 
nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) - assertSameHosts(t, expectedHosts, spottedHosts) + assertSameHosts(t, poster.urls, poster.seen) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[0].Host)))) - testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[0].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[0].Host)))) + testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[0].Host)))) - testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[1].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[1].Host)))) + testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[1].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[1].Host)))) testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.dropped))) } -func TestSender_Send_AllFails(t *testing.T) { - var ( - expectedHosts = []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}} - spottedHosts []*url.URL - spottedMu sync.Mutex - ) - - do := func(req *http.Request) (response *http.Response, e error) { - spottedMu.Lock() - defer spottedMu.Unlock() - - spottedHosts = append(spottedHosts, req.URL) - - return nil, errors.New("no such host") +func TestSenderSendsAllFail(t *testing.T) { + poster := &fakeClient{ + urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, + postf: func(u *url.URL) error { + return errors.New("no such host") + }, } - s := NewSender(nil, nil, func() []*url.URL { return expectedHosts }, do, 10*time.Second) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) - assertSameHosts(t, expectedHosts, spottedHosts) + assertSameHosts(t, poster.urls, poster.seen) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[0].Host)))) - testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[0].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[0].Host)))) + testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[0].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[1].Host)))) - testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[1].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[1].Host)))) + testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[1].Host)))) testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.dropped))) } diff --git a/pkg/alert/client.go b/pkg/alert/client.go new file mode 100644 index 0000000000..f58b995a2b --- /dev/null +++ b/pkg/alert/client.go @@ -0,0 +1,343 @@ +package alert + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/common/version" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/targetgroup" + "gopkg.in/yaml.v2" + + "github.com/thanos-io/thanos/pkg/discovery/cache" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/runutil" +) + 
+const ( + defaultAlertmanagerPort = 9093 + alertPushEndpoint = "/api/v1/alerts" + contentTypeJSON = "application/json" +) + +var userAgent = fmt.Sprintf("Thanos/%s", version.Version) + +type AlertingConfig struct { + Alertmanagers []AlertmanagerConfig `yaml:"alertmanagers"` +} + +// AlertmanagerConfig represents a client to a cluster of Alertmanager endpoints. +// TODO(simonpasquier): add support for API version (v1 or v2). +type AlertmanagerConfig struct { + // HTTP client configuration. + HTTPClientConfig HTTPClientConfig `yaml:"http_config"` + + // List of addresses with DNS prefixes. + StaticAddresses []string `yaml:"static_configs"` + // List of file configurations (our FileSD supports different DNS lookups). + FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` + + // The URL scheme to use when talking to Alertmanagers. + Scheme string `yaml:"scheme"` + + // Path prefix to add in front of the push endpoint path. + PathPrefix string `yaml:"path_prefix"` + + // The timeout used when sending alerts (default: 10s). + Timeout model.Duration `yaml:"timeout"` +} + +type HTTPClientConfig struct { + // The HTTP basic authentication credentials for the targets. + BasicAuth BasicAuth `yaml:"basic_auth"` + // The bearer token for the targets. + BearerToken string `yaml:"bearer_token"` + // The bearer token file for the targets. + BearerTokenFile string `yaml:"bearer_token_file"` + // HTTP proxy server to use to connect to the targets. + ProxyURL string `yaml:"proxy_url"` + // TLSConfig to use to connect to the targets. + TLSConfig TLSConfig `yaml:"tls_config"` +} + +type TLSConfig struct { + // The CA cert to use for the targets. + CAFile string `yaml:"ca_file"` + // The client cert file for the targets. + CertFile string `yaml:"cert_file"` + // The client key file for the targets. + KeyFile string `yaml:"key_file"` + // Used to verify the hostname for the targets. + ServerName string `yaml:"server_name"` + // Disable target certificate validation. 
+ InsecureSkipVerify bool `yaml:"insecure_skip_verify"` +} + +type BasicAuth struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + PasswordFile string `yaml:"password_file"` +} + +func (b BasicAuth) IsZero() bool { + return b.Username == "" && b.Password == "" && b.PasswordFile == "" +} + +func (c HTTPClientConfig) convert() (config_util.HTTPClientConfig, error) { + httpClientConfig := config_util.HTTPClientConfig{ + BearerToken: config_util.Secret(c.BearerToken), + BearerTokenFile: c.BearerTokenFile, + TLSConfig: config_util.TLSConfig{ + CAFile: c.TLSConfig.CAFile, + CertFile: c.TLSConfig.CertFile, + KeyFile: c.TLSConfig.KeyFile, + ServerName: c.TLSConfig.ServerName, + InsecureSkipVerify: c.TLSConfig.InsecureSkipVerify, + }, + } + if c.ProxyURL != "" { + var proxy config_util.URL + err := yaml.Unmarshal([]byte(c.ProxyURL), &proxy) + if err != nil { + return httpClientConfig, err + } + httpClientConfig.ProxyURL = proxy + } + if !c.BasicAuth.IsZero() { + httpClientConfig.BasicAuth = &config_util.BasicAuth{ + Username: c.BasicAuth.Username, + Password: config_util.Secret(c.BasicAuth.Password), + PasswordFile: c.BasicAuth.PasswordFile, + } + } + return httpClientConfig, httpClientConfig.Validate() +} + +type FileSDConfig struct { + Files []string `yaml:"files"` + RefreshInterval model.Duration `yaml:"refresh_interval"` +} + +func (c FileSDConfig) convert() (file.SDConfig, error) { + var fileSDConfig file.SDConfig + b, err := yaml.Marshal(c) + if err != nil { + return fileSDConfig, err + } + err = yaml.Unmarshal(b, &fileSDConfig) + if err != nil { + return fileSDConfig, err + } + return fileSDConfig, nil +} + +func DefaultAlertmanagerConfig() AlertmanagerConfig { + return AlertmanagerConfig{ + Scheme: "http", + Timeout: model.Duration(time.Second * 10), + StaticAddresses: []string{}, + FileSDConfigs: []FileSDConfig{}, + } +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultAlertmanagerConfig() + type plain AlertmanagerConfig + return unmarshal((*plain)(c)) +} + +type AddressProvider interface { + Resolve(context.Context, []string) + Addresses() []string +} + +// Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints. +type Alertmanager struct { + logger log.Logger + + client *http.Client + timeout time.Duration + scheme string + prefix string + + staticAddresses []string + fileSDCache *cache.Cache + fileDiscoverers []*file.Discovery + + provider AddressProvider +} + +// NewAlertmanager returns a new Alertmanager client. 
+func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig, provider AddressProvider) (*Alertmanager, error) { + if logger == nil { + logger = log.NewNopLogger() + } + + httpClientConfig, err := cfg.HTTPClientConfig.convert() + if err != nil { + return nil, err + } + client, err := config_util.NewClientFromConfig(httpClientConfig, "alertmanager", false) + if err != nil { + return nil, err + } + + var discoverers []*file.Discovery + for _, sdCfg := range cfg.FileSDConfigs { + fileSDCfg, err := sdCfg.convert() + if err != nil { + return nil, err + } + discoverers = append(discoverers, file.NewDiscovery(&fileSDCfg, logger)) + } + return &Alertmanager{ + logger: logger, + client: client, + scheme: cfg.Scheme, + prefix: cfg.PathPrefix, + timeout: time.Duration(cfg.Timeout), + staticAddresses: cfg.StaticAddresses, + fileSDCache: cache.New(), + fileDiscoverers: discoverers, + provider: provider, + }, nil +} + +// LoadAlertmanagerConfigs loads a list of AlertmanagerConfig from YAML data. +func LoadAlertingConfig(confYaml []byte) (AlertingConfig, error) { + var cfg AlertingConfig + if err := yaml.UnmarshalStrict(confYaml, &cfg); err != nil { + return cfg, err + } + return cfg, nil +} + +// BuildAlertmanagerConfig initializes and returns an Alertmanager client configuration from a static address. +func BuildAlertmanagerConfig(logger log.Logger, address string, timeout time.Duration) (AlertmanagerConfig, error) { + parsed, err := url.Parse(address) + if err != nil { + return AlertmanagerConfig{}, err + } + + scheme := parsed.Scheme + host := parsed.Host + for _, qType := range []dns.QType{dns.A, dns.SRV, dns.SRVNoA} { + prefix := string(qType) + "+" + if strings.HasPrefix(strings.ToLower(scheme), prefix) { + // Scheme is of the form "+". + scheme = strings.TrimPrefix(scheme, prefix) + host = prefix + parsed.Host + if qType == dns.A { + if _, _, err := net.SplitHostPort(parsed.Host); err != nil { + // The host port could be missing. Append the defaultAlertmanagerPort. + host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) + } + } + break + } + } + var basicAuth BasicAuth + if parsed.User != nil && parsed.User.String() != "" { + basicAuth.Username = parsed.User.Username() + pw, _ := parsed.User.Password() + basicAuth.Password = pw + } + + return AlertmanagerConfig{ + PathPrefix: parsed.Path, + Scheme: scheme, + StaticAddresses: []string{host}, + Timeout: model.Duration(timeout), + HTTPClientConfig: HTTPClientConfig{ + BasicAuth: basicAuth, + }, + }, nil +} + +// Endpoints returns the list of known Alertmanager endpoints. +func (a *Alertmanager) Endpoints() []*url.URL { + var urls []*url.URL + for _, addr := range a.provider.Addresses() { + urls = append(urls, + &url.URL{ + Scheme: a.scheme, + Host: addr, + Path: path.Join("/", a.prefix, alertPushEndpoint), + }, + ) + } + return urls +} + +// Do sends a POST request to the given URL. 
+func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error { + req, err := http.NewRequest("POST", u.String(), r) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + req = req.WithContext(ctx) + req.Header.Set("Content-Type", contentTypeJSON) + req.Header.Set("User-Agent", userAgent) + + resp, err := a.client.Do(req) + if err != nil { + return errors.Wrapf(err, "send request to %q", u) + } + defer runutil.ExhaustCloseWithLogOnErr(a.logger, resp.Body, "send one alert") + + if resp.StatusCode/100 != 2 { + return errors.Errorf("bad response status %v from %q", resp.Status, u) + } + return nil +} + +// Discover runs the service to discover target endpoints. +func (a *Alertmanager) Discover(ctx context.Context) { + var wg sync.WaitGroup + ch := make(chan []*targetgroup.Group) + + for _, d := range a.fileDiscoverers { + wg.Add(1) + go func(d *file.Discovery) { + d.Run(ctx, ch) + wg.Done() + }(d) + } + + func() { + for { + select { + case update := <-ch: + // Discoverers sometimes send nil updates so need to check for it to avoid panics. + if update == nil { + continue + } + a.fileSDCache.Update(update) + case <-ctx.Done(): + return + } + } + }() + wg.Wait() +} + +// Resolve refreshes and resolves the list of Alertmanager targets. +func (a *Alertmanager) Resolve(ctx context.Context) { + a.provider.Resolve(ctx, append(a.fileSDCache.Addresses(), a.staticAddresses...)) +} diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go new file mode 100644 index 0000000000..8b29d51c9e --- /dev/null +++ b/pkg/alert/client_test.go @@ -0,0 +1,88 @@ +package alert + +import ( + "testing" + "time" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestBuildAlertmanagerConfiguration(t *testing.T) { + for _, tc := range []struct { + address string + + err bool + expected AlertmanagerConfig + }{ + { + address: "http://localhost:9093", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"localhost:9093"}, + Scheme: "http", + }, + }, + { + address: "https://am.example.com", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"am.example.com"}, + Scheme: "https", + }, + }, + { + address: "dns+http://localhost:9093", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"dns+localhost:9093"}, + Scheme: "http", + }, + }, + { + address: "dnssrv+http://localhost", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"dnssrv+localhost"}, + Scheme: "http", + }, + }, + { + address: "ssh+http://localhost", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"localhost"}, + Scheme: "ssh+http", + }, + }, + { + address: "dns+https://localhost/path/prefix/", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"dns+localhost:9093"}, + Scheme: "https", + PathPrefix: "/path/prefix/", + }, + }, + { + address: "http://user:pass@localhost:9093", + expected: AlertmanagerConfig{ + HTTPClientConfig: HTTPClientConfig{ + BasicAuth: BasicAuth{ + Username: "user", + Password: "pass", + }, + }, + StaticAddresses: []string{"localhost:9093"}, + Scheme: "http", + }, + }, + { + address: "://user:pass@localhost:9093", + err: true, + }, + } { + t.Run(tc.address, func(t *testing.T) { + cfg, err := BuildAlertmanagerConfig(nil, tc.address, time.Duration(0)) + if tc.err { + testutil.NotOk(t, err) + return + } + + testutil.Equals(t, tc.expected, cfg) + }) + } +} diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 148143ea4d..332135ba17 100644 --- a/pkg/discovery/dns/provider.go +++ 
b/pkg/discovery/dns/provider.go @@ -76,6 +76,18 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, resolverType Reso return p } +// Clone returns a new provider from an existing one. +func (p *Provider) Clone() *Provider { + return &Provider{ + resolver: p.resolver, + resolved: make(map[string][]string), + logger: p.logger, + resolverAddrs: p.resolverAddrs, + resolverLookupsCount: p.resolverLookupsCount, + resolverFailuresCount: p.resolverFailuresCount, + } +} + // Resolve stores a list of provided addresses or their DNS records if requested. // Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV). // defaultPort is used for non-SRV records when a port is not supplied. diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 9051848a38..2d5cec20e1 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/objstore/azure" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/cos" @@ -71,13 +72,20 @@ func main() { os.Exit(1) } } + + alertmgrCfg := alert.DefaultAlertmanagerConfig() + alertmgrCfg.FileSDConfigs = []alert.FileSDConfig{alert.FileSDConfig{}} + if err := generate(alert.AlertingConfig{Alertmanagers: []alert.AlertmanagerConfig{alertmgrCfg}}, "rule_alerting", *outputDir); err != nil { + level.Error(logger).Log("msg", "failed to generate", "type", "rule_alerting", "err", err) + os.Exit(1) + } logger.Log("msg", "success") } func generate(obj interface{}, typ string, outputDir string) error { // We forbid omitempty option. This is for simplification for doc generation. 
if err := checkForOmitEmptyTagOption(obj); err != nil { - return err + return errors.Wrap(err, "invalid type") } out, err := yaml.Marshal(obj) @@ -95,15 +103,15 @@ func checkForOmitEmptyTagOption(obj interface{}) error { func checkForOmitEmptyTagOptionRec(v reflect.Value) error { switch v.Kind() { case reflect.Struct: - for i := 0; i < v.NumField(); i += 1 { + for i := 0; i < v.NumField(); i++ { tags, err := structtag.Parse(string(v.Type().Field(i).Tag)) if err != nil { - return err + return errors.Wrapf(err, "%s: failed to parse tag %q", v.Type().Field(i).Name, v.Type().Field(i).Tag) } tag, err := tags.Get("yaml") if err != nil { - return err + return errors.Wrapf(err, "%s: failed to get tag %q", v.Type().Field(i).Name, v.Type().Field(i).Tag) } for _, opts := range tag.Options { @@ -113,16 +121,12 @@ func checkForOmitEmptyTagOptionRec(v reflect.Value) error { } if err := checkForOmitEmptyTagOptionRec(v.Field(i)); err != nil { - return err + return errors.Wrapf(err, "%s", v.Type().Field(i).Name) } } case reflect.Ptr: - if !v.IsValid() { - return errors.New("nil pointers are not allowed in configuration.") - } - - return errors.New("nil pointers are not allowed in configuration.") + return errors.New("nil pointers are not allowed in configuration") case reflect.Interface: return checkForOmitEmptyTagOptionRec(v.Elem()) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 5784938aa8..7cfe508f53 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -1,22 +1,30 @@ package e2e_test import ( + "bytes" "context" "encoding/json" + "encoding/pem" "fmt" "io/ioutil" "math" "net/http" + "net/http/httptest" "os" "path/filepath" "sort" + "sync" "testing" "time" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + yaml "gopkg.in/yaml.v2" + + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/promclient" rapi "github.com/thanos-io/thanos/pkg/rule/api" "github.com/thanos-io/thanos/pkg/runutil" @@ -62,10 +70,314 @@ func createRuleFiles(t *testing.T, dir string) { } } +func serializeAlertingConfiguration(t *testing.T, cfg ...alert.AlertmanagerConfig) []byte { + t.Helper() + amCfg := alert.AlertingConfig{ + Alertmanagers: cfg, + } + b, err := yaml.Marshal(&amCfg) + if err != nil { + t.Errorf("failed to serialize alerting configuration: %v", err) + } + return b +} + +type mockAlertmanager struct { + path string + token string + mtx sync.Mutex + alerts []*model.Alert + lastError error +} + +func newMockAlertmanager(path string, token string) *mockAlertmanager { + return &mockAlertmanager{ + path: path, + token: token, + alerts: make([]*model.Alert, 0), + } +} + +func (m *mockAlertmanager) setLastError(err error) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.lastError = err +} + +func (m *mockAlertmanager) LastError() error { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.lastError +} + +func (m *mockAlertmanager) Alerts() []*model.Alert { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.alerts +} + +func (m *mockAlertmanager) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + m.setLastError(errors.Errorf("invalid method: %s", req.Method)) + resp.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if req.URL.Path != m.path { + m.setLastError(errors.Errorf("invalid path: %s", req.URL.Path)) + resp.WriteHeader(http.StatusNotFound) + return + } + + if m.token != "" { + 
auth := req.Header.Get("Authorization") + if auth != fmt.Sprintf("Bearer %s", m.token) { + m.setLastError(errors.Errorf("invalid auth: %s", req.URL.Path)) + resp.WriteHeader(http.StatusForbidden) + return + } + } + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + m.setLastError(err) + resp.WriteHeader(http.StatusInternalServerError) + return + } + + var alerts []*model.Alert + if err := json.Unmarshal(b, &alerts); err != nil { + m.setLastError(err) + resp.WriteHeader(http.StatusInternalServerError) + return + } + + m.mtx.Lock() + m.alerts = append(m.alerts, alerts...) + m.mtx.Unlock() +} + +// TestRuleAlertmanagerHTTPClient verifies that Thanos Ruler can send alerts to +// Alertmanager in various setups: +// * Plain HTTP. +// * HTTPS with custom CA. +// * API with a prefix. +// * API protected by bearer token authentication. +// +// Because Alertmanager supports HTTP only and no authentication, the test uses +// a mocked server instead of the "real" Alertmanager service. +// The other end-to-end tests exercise against the "real" Alertmanager +// implementation. +func TestRuleAlertmanagerHTTPClient(t *testing.T) { + a := newLocalAddresser() + + // Plain HTTP with a prefix. + handler1 := newMockAlertmanager("/prefix/api/v1/alerts", "") + srv1 := httptest.NewServer(handler1) + defer srv1.Close() + // HTTPS with authentication. + handler2 := newMockAlertmanager("/api/v1/alerts", "secret") + srv2 := httptest.NewTLSServer(handler2) + defer srv2.Close() + + // Write the server's certificate to disk for the alerting configuration. + tlsDir, err := ioutil.TempDir("", "tls") + defer os.RemoveAll(tlsDir) + testutil.Ok(t, err) + var out bytes.Buffer + err = pem.Encode(&out, &pem.Block{Type: "CERTIFICATE", Bytes: srv2.TLS.Certificates[0].Certificate[0]}) + testutil.Ok(t, err) + caFile := filepath.Join(tlsDir, "ca.crt") + err = ioutil.WriteFile(caFile, out.Bytes(), 0640) + testutil.Ok(t, err) + + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + StaticAddresses: []string{srv1.Listener.Addr().String()}, + Scheme: "http", + Timeout: model.Duration(time.Second), + PathPrefix: "/prefix/", + }, + alert.AlertmanagerConfig{ + HTTPClientConfig: alert.HTTPClientConfig{ + TLSConfig: alert.TLSConfig{ + CAFile: caFile, + }, + BearerToken: "secret", + }, + StaticAddresses: []string{srv2.Listener.Addr().String()}, + Scheme: "https", + Timeout: model.Duration(time.Second), + }, + ) + + rulesDir, err := ioutil.TempDir("", "rules") + defer os.RemoveAll(rulesDir) + testutil.Ok(t, err) + createRuleFiles(t, rulesDir) + + qAddr := a.New() + r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) + q := querier(qAddr, a.New(), []address{r.GRPC}, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + exit, err := e2eSpinup(t, ctx, q, r) + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + + for i, am := range []*mockAlertmanager{handler1, handler2} { + if len(am.Alerts()) == 0 { + return errors.Errorf("no alert received from handler%d, last error: %v", i, am.LastError()) + } + } + + return nil + })) +} + +func TestRuleAlertmanagerFileSD(t *testing.T) { + a := newLocalAddresser() + + am := alertManager(a.New()) + amDir, err := ioutil.TempDir("", "am") + defer os.RemoveAll(amDir) + testutil.Ok(t, err) + amCfg := 
serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + FileSDConfigs: []alert.FileSDConfig{ + alert.FileSDConfig{ + Files: []string{filepath.Join(amDir, "*.yaml")}, + RefreshInterval: model.Duration(time.Hour), + }, + }, + Scheme: "http", + Timeout: model.Duration(time.Second), + }, + ) + + rulesDir, err := ioutil.TempDir("", "rules") + defer os.RemoveAll(rulesDir) + testutil.Ok(t, err) + createRuleFiles(t, rulesDir) + + qAddr := a.New() + r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) + q := querier(qAddr, a.New(), []address{r.GRPC}, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + exit, err := e2eSpinup(t, ctx, am, q, r) + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + // Wait for a couple of evaluations and make sure that Alertmanager didn't receive anything. + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + + // The time series written for the firing alerting rule must be queryable. + res, warnings, err := promclient.QueryInstant(ctx, nil, urlParse(t, q.HTTP.URL()), "max(count_over_time(ALERTS[1m])) > 2", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) + if err != nil { + return err + } + if len(warnings) > 0 { + return errors.Errorf("unexpected warnings %s", warnings) + } + if len(res) == 0 { + return errors.Errorf("empty result") + } + + alrts, err := queryAlertmanagerAlerts(ctx, am.HTTP.URL()) + if err != nil { + return err + } + if len(alrts) != 0 { + return errors.Errorf("unexpected alerts length %d", len(alrts)) + } + + return nil + })) + + // Add the Alertmanager address to the file SD directory. + fileSDPath := filepath.Join(amDir, "targets.yaml") + b, err := yaml.Marshal([]*targetgroup.Group{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + model.LabelSet{ + model.LabelName(model.AddressLabel): model.LabelValue(am.HTTP.HostPort()), + }, + }, + }, + }) + testutil.Ok(t, err) + + testutil.Ok(t, ioutil.WriteFile(fileSDPath+".tmp", b, 0660)) + testutil.Ok(t, os.Rename(fileSDPath+".tmp", fileSDPath)) + + // Verify that alerts are received by Alertmanager. 
+ testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + alrts, err := queryAlertmanagerAlerts(ctx, am.HTTP.URL()) + if err != nil { + return err + } + if len(alrts) == 0 { + return errors.Errorf("expecting alerts") + } + + return nil + })) +} + func TestRule(t *testing.T) { a := newLocalAddresser() am := alertManager(a.New()) + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + StaticAddresses: []string{am.HTTP.HostPort()}, + Scheme: "http", + Timeout: model.Duration(time.Second), + }, + ) + qAddr := a.New() rulesDir, err := ioutil.TempDir("", "rules") @@ -73,8 +385,8 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) createRuleFiles(t, rulesDir) - r1 := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil) - r2 := rule(a.New(), a.New(), rulesDir, am.HTTP, nil, []address{qAddr}) + r1 := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) + r2 := rule(a.New(), a.New(), rulesDir, amCfg, nil, []address{qAddr}) q := querier(qAddr, a.New(), []address{r1.GRPC, r2.GRPC}, nil) @@ -282,19 +594,25 @@ func (a *failingStoreAPI) LabelValues(context.Context, *storepb.LabelValuesReque // Test Ruler behaviour on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. func TestRulePartialResponse(t *testing.T) { - dir, err := ioutil.TempDir("", "test_rulepartial_response") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - a := newLocalAddresser() qAddr := a.New() f := fakeStoreAPI(a.New(), &failingStoreAPI{}) am := alertManager(a.New()) + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + StaticAddresses: []string{am.HTTP.HostPort()}, + Scheme: "http", + Timeout: model.Duration(time.Second), + }, + ) + rulesDir, err := ioutil.TempDir("", "rules") defer os.RemoveAll(rulesDir) testutil.Ok(t, err) - r := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil) + + r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) q := querier(qAddr, a.New(), []address{r.GRPC, f.GRPC}, nil) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 4eea79e5fc..721eb70403 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -305,7 +305,7 @@ receivers: } } -func rule(http, grpc address, ruleDir string, am address, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { +func rule(http, grpc address, ruleDir string, amCfg []byte, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { return &serverScheduler{ HTTP: http, GRPC: grpc, @@ -317,12 +317,14 @@ func rule(http, grpc address, ruleDir string, am address, queryAddresses []addre "--data-dir", filepath.Join(workDir, "data"), "--rule-file", filepath.Join(ruleDir, "*.yaml"), "--eval-interval", "1s", - "--alertmanagers.url", am.URL(), + "--alertmanagers.config", string(amCfg), + "--alertmanagers.sd-dns-interval", "5s", "--grpc-address", grpc.HostPort(), "--grpc-grace-period", "0s", "--http-address", http.HostPort(), "--log.level", "debug", "--query.sd-dns-interval", "5s", + "--resend-delay", "5s", } for _, addr := range queryAddresses {
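For illustration, a minimal `--alertmanagers.config-file` payload exercising the new TLS and bearer-token options might look like the sketch below. It follows the format embedded in docs/components/rule.md above; the hostnames, certificate path, and token file are placeholders rather than values taken from this patch.

```yaml
alertmanagers:
- static_configs:
  - "alertmanager-0.example.com:9093"   # placeholder address
  - "alertmanager-1.example.com:9093"   # placeholder address
  scheme: https
  timeout: 10s
  http_config:
    bearer_token_file: "/etc/thanos/alertmanager.token"  # placeholder path
    tls_config:
      ca_file: "/etc/thanos/alertmanager-ca.crt"         # placeholder path
```

Both addresses sit under a single `alertmanagers` entry, so per the documentation above they act as one HA group: a send is only reported as failed when pushing to every listed instance fails.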
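The `file_sd_configs` option reads the standard Prometheus file-SD target-group format (the e2e test above produces such a file by marshalling `targetgroup.Group` values). A hand-written target file matched by a hypothetical glob such as `/etc/thanos/alertmanagers/*.yaml` might contain:

```yaml
- targets:
  - "alertmanager-0.example.com:9093"  # placeholder address
  - "alertmanager-1.example.com:9093"  # placeholder address
```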
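For the legacy flag path, `BuildAlertmanagerConfig` (covered by pkg/alert/client_test.go above) translates each `--alertmanagers.url` value into the same structure; for instance, `dns+http://alertmanager.example.com` (a placeholder host) would map to roughly:

```yaml
alertmanagers:
- static_configs:
  - "dns+alertmanager.example.com:9093"  # default port appended when a dns+ address has none
  scheme: http
  timeout: 10s  # taken from --alertmanagers.send-timeout
```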