Skip to content

Commit

Permalink
Address Bartek's comments
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier committed Dec 16, 2019
1 parent 0293619 commit 8150c9f
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 62 deletions.
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added
- [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44
- [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering.

### Added

- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.

- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts.

## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03

### Added
Expand Down
16 changes: 8 additions & 8 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,14 @@ func runRule(
return err
}
var (
alertingcfg alert.AlertingConfig
alertingCfg alert.AlertingConfig
alertmgrs []*alert.Alertmanager
)
if len(alertmgrsConfigYAML) > 0 {
if len(alertmgrURLs) != 0 {
return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time")
}
alertingcfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML)
alertingCfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML)
if err != nil {
return err
}
Expand All @@ -314,13 +314,13 @@ func runRule(
if err != nil {
return err
}
alertingcfg.Alertmanagers = append(alertingcfg.Alertmanagers, cfg)
alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg)
}
}
if len(alertingcfg.Alertmanagers) == 0 {
if len(alertingCfg.Alertmanagers) == 0 {
level.Warn(logger).Log("msg", "no alertmanager configured")
}
for _, cfg := range alertingcfg.Alertmanagers {
for _, cfg := range alertingCfg.Alertmanagers {
am, err := alert.NewAlertmanager(logger, cfg)
if err != nil {
return err
Expand Down Expand Up @@ -421,11 +421,11 @@ func runRule(
}
// Run the alert sender.
{
doers := make([]alert.AlertmanagerDoer, len(alertmgrs))
clients := make([]alert.AlertmanagerClient, len(alertmgrs))
for i := range alertmgrs {
doers[i] = alertmgrs[i]
clients[i] = alertmgrs[i]
}
sdr := alert.NewSender(logger, reg, doers)
sdr := alert.NewSender(logger, reg, clients)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
Expand Down
4 changes: 3 additions & 1 deletion docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,9 @@ Flags:

### Alertmanager

The configuration format supported by the `--alertmanagers.config` and `--alertmanagers.config-file` flags is the following:
The `--alertmanagers.config` and `--alertmanagers.config-file` flags allow specifying multiple Alertmanagers. Those entries are treated as a single HA group. This means that alert send failure is claimed only if the Ruler fails to send to all instances.

The configuration format is the following:

[embedmd]:# (../flags/config_rule_alerting.txt yaml)
```yaml
Expand Down
19 changes: 11 additions & 8 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
package alert

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -238,15 +240,15 @@ func (q *Queue) Push(alerts []*Alert) {
}
}

type AlertmanagerDoer interface {
type AlertmanagerClient interface {
Endpoints() []*url.URL
Do(context.Context, *url.URL, []byte) error
Do(context.Context, *url.URL, io.Reader) error
}

// Sender sends notifications to a dynamic set of alertmanagers.
type Sender struct {
logger log.Logger
alertmanagers []AlertmanagerDoer
alertmanagers []AlertmanagerClient

sent *prometheus.CounterVec
errs *prometheus.CounterVec
Expand All @@ -259,7 +261,7 @@ type Sender struct {
func NewSender(
logger log.Logger,
reg prometheus.Registerer,
alertmanagers []AlertmanagerDoer,
alertmanagers []AlertmanagerClient,
) *Sender {
if logger == nil {
logger = log.NewNopLogger()
Expand Down Expand Up @@ -294,7 +296,7 @@ func NewSender(
return s
}

// Send an alert batch to all given Alertmanager client.
// Send an alert batch to all given Alertmanager clients.
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
if len(alerts) == 0 {
Expand All @@ -313,17 +315,18 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
for _, amc := range s.alertmanagers {
for _, u := range amc.Endpoints() {
wg.Add(1)
go func(amc AlertmanagerDoer, u *url.URL) {
go func(amc AlertmanagerClient, u *url.URL) {
defer wg.Done()

level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
start := time.Now()
if err := amc.Do(ctx, u, b); err != nil {
if err := amc.Do(ctx, u, bytes.NewReader(b)); err != nil {
level.Warn(s.logger).Log(
"msg", "sending alerts failed",
"alertmanager", u.Host,
"numAlerts", len(alerts),
"err", err)
"err", err,
)
s.errs.WithLabelValues(u.Host).Inc()
return
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/alert/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package alert

import (
"context"
"io"
"net/url"
"sync"
"testing"
Expand Down Expand Up @@ -46,18 +47,18 @@ func assertSameHosts(t *testing.T, expected []*url.URL, found []*url.URL) {
}
}

type fakeDoer struct {
type fakeClient struct {
urls []*url.URL
postf func(u *url.URL) error
mtx sync.Mutex
seen []*url.URL
}

func (f *fakeDoer) Endpoints() []*url.URL {
func (f *fakeClient) Endpoints() []*url.URL {
return f.urls
}

func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error {
func (f *fakeClient) Do(ctx context.Context, u *url.URL, r io.Reader) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.seen = append(f.seen, u)
Expand All @@ -68,10 +69,10 @@ func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error {
}

func TestSenderSendsOk(t *testing.T) {
poster := &fakeDoer{
poster := &fakeClient{
urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
}
s := NewSender(nil, nil, []AlertmanagerDoer{poster})
s := NewSender(nil, nil, []AlertmanagerClient{poster})

s.Send(context.Background(), []*Alert{{}, {}})

Expand All @@ -86,7 +87,7 @@ func TestSenderSendsOk(t *testing.T) {
}

func TestSenderSendsOneFails(t *testing.T) {
poster := &fakeDoer{
poster := &fakeClient{
urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
postf: func(u *url.URL) error {
if u.Host == "am1:9090" {
Expand All @@ -95,7 +96,7 @@ func TestSenderSendsOneFails(t *testing.T) {
return nil
},
}
s := NewSender(nil, nil, []AlertmanagerDoer{poster})
s := NewSender(nil, nil, []AlertmanagerClient{poster})

s.Send(context.Background(), []*Alert{{}, {}})

Expand All @@ -110,13 +111,13 @@ func TestSenderSendsOneFails(t *testing.T) {
}

func TestSenderSendsAllFail(t *testing.T) {
poster := &fakeDoer{
poster := &fakeClient{
urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
postf: func(u *url.URL) error {
return errors.New("no such host")
},
}
s := NewSender(nil, nil, []AlertmanagerDoer{poster})
s := NewSender(nil, nil, []AlertmanagerClient{poster})

s.Send(context.Background(), []*Alert{{}, {}})

Expand Down
11 changes: 4 additions & 7 deletions pkg/alert/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package alert

import (
"bytes"
"context"
"io"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -157,10 +157,7 @@ func DefaultAlertmanagerConfig() AlertmanagerConfig {
func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultAlertmanagerConfig()
type plain AlertmanagerConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}

// Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints.
Expand Down Expand Up @@ -278,8 +275,8 @@ func (a *Alertmanager) Endpoints() []*url.URL {
}

// Post sends a POST request to the given URL.
func (a *Alertmanager) Do(ctx context.Context, u *url.URL, b []byte) error {
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(b))
func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error {
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/alert/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) {
Scheme: "http",
},
},
{
address: "://user:pass@localhost:9093",
err: true,
},
} {
t.Run(tc.address, func(t *testing.T) {
cfg, err := BuildAlertmanagerConfig(nil, tc.address, time.Duration(0))
Expand Down
43 changes: 17 additions & 26 deletions test/e2e/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,6 @@ func serializeAlertingConfiguration(t *testing.T, cfg ...alert.AlertmanagerConfi
return b
}

func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) {
group := targetgroup.Group{Targets: []model.LabelSet{}}
for _, addr := range addrs {
group.Targets = append(group.Targets, model.LabelSet{model.LabelName(model.AddressLabel): model.LabelValue(addr)})
}

b, err := yaml.Marshal([]*targetgroup.Group{&group})
if err != nil {
t.Errorf("failed to serialize file SD configuration: %v", err)
return
}

err = ioutil.WriteFile(path+".tmp", b, 0660)
if err != nil {
t.Errorf("failed to write file SD configuration: %v", err)
return
}

err = os.Rename(path+".tmp", path)
testutil.Ok(t, err)
}

type mockAlertmanager struct {
path string
token string
Expand Down Expand Up @@ -232,7 +210,7 @@ func TestRuleAlertmanagerHTTPClient(t *testing.T) {
r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil)
q := querier(qAddr, a.New(), []address{r.GRPC}, nil)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
exit, err := e2eSpinup(t, ctx, q, r)
if err != nil {
t.Errorf("spinup failed: %v", err)
Expand Down Expand Up @@ -306,7 +284,7 @@ func TestRuleAlertmanagerFileSD(t *testing.T) {
<-exit
}()

// Wait for a couple of evaluations.
// Wait for a couple of evaluations and make sure that Alertmanager didn't receive anything.
testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {
select {
case <-exit:
Expand Down Expand Up @@ -340,8 +318,21 @@ func TestRuleAlertmanagerFileSD(t *testing.T) {
return nil
}))

// Update the Alertmanager file service discovery configuration.
writeAlertmanagerFileSD(t, filepath.Join(amDir, "targets.yaml"), am.HTTP.HostPort())
// Add the Alertmanager address to the file SD directory.
fileSDPath := filepath.Join(amDir, "targets.yaml")
b, err := yaml.Marshal([]*targetgroup.Group{
&targetgroup.Group{
Targets: []model.LabelSet{
model.LabelSet{
model.LabelName(model.AddressLabel): model.LabelValue(am.HTTP.HostPort()),
},
},
},
})
testutil.Ok(t, err)

testutil.Ok(t, ioutil.WriteFile(fileSDPath+".tmp", b, 0660))
testutil.Ok(t, os.Rename(fileSDPath+".tmp", fileSDPath))

// Verify that alerts are received by Alertmanager.
testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {
Expand Down

0 comments on commit 8150c9f

Please sign in to comment.