From 69083ffd51984ccecefac5dd6507d999d5b85ac4 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 16 Dec 2019 15:55:58 +0100 Subject: [PATCH] Address Bartek's comments Signed-off-by: Simon Pasquier --- CHANGELOG.md | 5 ++--- cmd/thanos/rule.go | 16 +++++++-------- docs/components/rule.md | 4 +++- pkg/alert/alert.go | 19 ++++++++++-------- pkg/alert/alert_test.go | 19 +++++++++--------- pkg/alert/client.go | 11 ++++------ pkg/alert/client_test.go | 4 ++++ test/e2e/rule_test.go | 43 ++++++++++++++++------------------------ 8 files changed, 59 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c66b9a5445..35cdc6f7fd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering. - -### Added - - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. +- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts. + ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 ### Added diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 0b756e5f25e..2a0e9e9969f 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -296,14 +296,14 @@ func runRule( return err } var ( - alertingcfg alert.AlertingConfig + alertingCfg alert.AlertingConfig alertmgrs []*alert.Alertmanager ) if len(alertmgrsConfigYAML) > 0 { if len(alertmgrURLs) != 0 { return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time") } - alertingcfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) + alertingCfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) if err != nil { return err } @@ -314,13 +314,13 @@ func runRule( if err != nil { return err } - alertingcfg.Alertmanagers = append(alertingcfg.Alertmanagers, cfg) + alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg) } } - if len(alertingcfg.Alertmanagers) == 0 { + if len(alertingCfg.Alertmanagers) == 0 { level.Warn(logger).Log("msg", "no alertmanager configured") } - for _, cfg := range alertingcfg.Alertmanagers { + for _, cfg := range alertingCfg.Alertmanagers { am, err := alert.NewAlertmanager(logger, cfg) if err != nil { return err @@ -421,11 +421,11 @@ func runRule( } // Run the alert sender. { - doers := make([]alert.AlertmanagerDoer, len(alertmgrs)) + clients := make([]alert.AlertmanagerClient, len(alertmgrs)) for i := range alertmgrs { - doers[i] = alertmgrs[i] + clients[i] = alertmgrs[i] } - sdr := alert.NewSender(logger, reg, doers) + sdr := alert.NewSender(logger, reg, clients) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { diff --git a/docs/components/rule.md b/docs/components/rule.md index 565223681fd..7f8807a9062 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -290,7 +290,9 @@ Flags: ### Alertmanager -The configuration format supported by the `--alertmanagers.config` and `--alertmanagers.config-file` flags is the following: +The `--alertmanagers.config` and `--alertmanagers.config-file` flags allow specifying multiple Alertmanagers. Those entries are treated as a single HA group. This means that alert send failure is claimed only if the Ruler fails to send to all instances. + +The configuration format is the following: [embedmd]:# (../flags/config_rule_alerting.txt yaml) ```yaml diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index c443f715558..2a9e2e8ab85 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -2,9 +2,11 @@ package alert import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/url" "sync" "sync/atomic" @@ -238,15 +240,15 @@ func (q *Queue) Push(alerts []*Alert) { } } -type AlertmanagerDoer interface { +type AlertmanagerClient interface { Endpoints() []*url.URL - Do(context.Context, *url.URL, []byte) error + Do(context.Context, *url.URL, io.Reader) error } // Sender sends notifications to a dynamic set of alertmanagers. type Sender struct { logger log.Logger - alertmanagers []AlertmanagerDoer + alertmanagers []AlertmanagerClient sent *prometheus.CounterVec errs *prometheus.CounterVec @@ -259,7 +261,7 @@ type Sender struct { func NewSender( logger log.Logger, reg prometheus.Registerer, - alertmanagers []AlertmanagerDoer, + alertmanagers []AlertmanagerClient, ) *Sender { if logger == nil { logger = log.NewNopLogger() @@ -294,7 +296,7 @@ func NewSender( return s } -// Send an alert batch to all given Alertmanager client. +// Send an alert batch to all given Alertmanager clients. // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. func (s *Sender) Send(ctx context.Context, alerts []*Alert) { if len(alerts) == 0 { @@ -313,17 +315,18 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { for _, amc := range s.alertmanagers { for _, u := range amc.Endpoints() { wg.Add(1) - go func(amc AlertmanagerDoer, u *url.URL) { + go func(amc AlertmanagerClient, u *url.URL) { defer wg.Done() level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts)) start := time.Now() - if err := amc.Do(ctx, u, b); err != nil { + if err := amc.Do(ctx, u, bytes.NewReader(b)); err != nil { level.Warn(s.logger).Log( "msg", "sending alerts failed", "alertmanager", u.Host, "numAlerts", len(alerts), - "err", err) + "err", err, + ) s.errs.WithLabelValues(u.Host).Inc() return } diff --git a/pkg/alert/alert_test.go b/pkg/alert/alert_test.go index 8797d9f8f2d..7e1e851d80c 100644 --- a/pkg/alert/alert_test.go +++ b/pkg/alert/alert_test.go @@ -2,6 +2,7 @@ package alert import ( "context" + "io" "net/url" "sync" "testing" @@ -46,18 +47,18 @@ func assertSameHosts(t *testing.T, expected []*url.URL, found []*url.URL) { } } -type fakeDoer struct { +type fakeClient struct { urls []*url.URL postf func(u *url.URL) error mtx sync.Mutex seen []*url.URL } -func (f *fakeDoer) Endpoints() []*url.URL { +func (f *fakeClient) Endpoints() []*url.URL { return f.urls } -func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error { +func (f *fakeClient) Do(ctx context.Context, u *url.URL, r io.Reader) error { f.mtx.Lock() defer f.mtx.Unlock() f.seen = append(f.seen, u) @@ -68,10 +69,10 @@ func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error { } func TestSenderSendsOk(t *testing.T) { - poster := &fakeDoer{ + poster := &fakeClient{ urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, } - s := NewSender(nil, nil, []AlertmanagerDoer{poster}) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) @@ -86,7 +87,7 @@ func TestSenderSendsOk(t *testing.T) { } func TestSenderSendsOneFails(t *testing.T) { - poster := &fakeDoer{ + poster := &fakeClient{ urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, postf: func(u *url.URL) error { if u.Host == "am1:9090" { @@ -95,7 +96,7 @@ func TestSenderSendsOneFails(t *testing.T) { return nil }, } - s := NewSender(nil, nil, []AlertmanagerDoer{poster}) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) @@ -110,13 +111,13 @@ func TestSenderSendsOneFails(t *testing.T) { } func TestSenderSendsAllFail(t *testing.T) { - poster := &fakeDoer{ + poster := &fakeClient{ urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, postf: func(u *url.URL) error { return errors.New("no such host") }, } - s := NewSender(nil, nil, []AlertmanagerDoer{poster}) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) diff --git a/pkg/alert/client.go b/pkg/alert/client.go index fbd216a24f2..35694b2ed5b 100644 --- a/pkg/alert/client.go +++ b/pkg/alert/client.go @@ -1,9 +1,9 @@ package alert import ( - "bytes" "context" "fmt" + "io" "net" "net/http" "net/url" @@ -158,10 +158,7 @@ func DefaultAlertmanagerConfig() AlertmanagerConfig { func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = DefaultAlertmanagerConfig() type plain AlertmanagerConfig - if err := unmarshal((*plain)(c)); err != nil { - return err - } - return nil + return unmarshal((*plain)(c)) } // Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints. @@ -279,8 +276,8 @@ func (a *Alertmanager) Endpoints() []*url.URL { } // Post sends a POST request to the given URL. -func (a *Alertmanager) Do(ctx context.Context, u *url.URL, b []byte) error { - req, err := http.NewRequest("POST", u.String(), bytes.NewReader(b)) +func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error { + req, err := http.NewRequest("POST", u.String(), r) if err != nil { return err } diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go index 060e218e051..3fd94c9404b 100644 --- a/pkg/alert/client_test.go +++ b/pkg/alert/client_test.go @@ -75,6 +75,10 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) { Scheme: "http", }, }, + { + address: "://user:pass@localhost:9093", + err: true, + }, } { t.Run(tc.address, func(t *testing.T) { cfg, err := BuildAlertmanagerConfig(nil, tc.address, time.Duration(0)) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index de28a5aa9ad..89f1a4ee49a 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -82,28 +82,6 @@ func serializeAlertingConfiguration(t *testing.T, cfg ...alert.AlertmanagerConfi return b } -func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) { - group := targetgroup.Group{Targets: []model.LabelSet{}} - for _, addr := range addrs { - group.Targets = append(group.Targets, model.LabelSet{model.LabelName(model.AddressLabel): model.LabelValue(addr)}) - } - - b, err := yaml.Marshal([]*targetgroup.Group{&group}) - if err != nil { - t.Errorf("failed to serialize file SD configuration: %v", err) - return - } - - err = ioutil.WriteFile(path+".tmp", b, 0660) - if err != nil { - t.Errorf("failed to write file SD configuration: %v", err) - return - } - - err = os.Rename(path+".tmp", path) - testutil.Ok(t, err) -} - type mockAlertmanager struct { path string token string @@ -232,7 +210,7 @@ func TestRuleAlertmanagerHTTPClient(t *testing.T) { r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) q := querier(qAddr, a.New(), []address{r.GRPC}, nil) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) exit, err := e2eSpinup(t, ctx, q, r) if err != nil { t.Errorf("spinup failed: %v", err) @@ -306,7 +284,7 @@ func TestRuleAlertmanagerFileSD(t *testing.T) { <-exit }() - // Wait for a couple of evaluations. + // Wait for a couple of evaluations and make sure that Alertmanager didn't receive anything. testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { select { case <-exit: @@ -340,8 +318,21 @@ func TestRuleAlertmanagerFileSD(t *testing.T) { return nil })) - // Update the Alertmanager file service discovery configuration. - writeAlertmanagerFileSD(t, filepath.Join(amDir, "targets.yaml"), am.HTTP.HostPort()) + // Add the Alertmanager address to the file SD directory. + fileSDPath := filepath.Join(amDir, "targets.yaml") + b, err := yaml.Marshal([]*targetgroup.Group{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + model.LabelSet{ + model.LabelName(model.AddressLabel): model.LabelValue(am.HTTP.HostPort()), + }, + }, + }, + }) + testutil.Ok(t, err) + + testutil.Ok(t, ioutil.WriteFile(fileSDPath+".tmp", b, 0660)) + testutil.Ok(t, os.Rename(fileSDPath+".tmp", fileSDPath)) // Verify that alerts are received by Alertmanager. testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {