Add end-to-end test for Alertmanager file SD
Signed-off-by: Simon Pasquier <[email protected]>
simonpasquier committed Dec 10, 2019
1 parent b514f36 commit 2782488
Showing 6 changed files with 177 additions and 11 deletions.
6 changes: 5 additions & 1 deletion cmd/thanos/rule.go
@@ -83,6 +83,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
Strings()
alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to Alertmanager").Default("10s").Duration()
alertmgrsConfig := extflag.RegisterPathOrContent(cmd, "alertmanagers.config", "YAML file that contains alerting configuration. See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--alertmanagers.url' and '--alertmanagers.send-timeout' flags.", false)
alertmgrsDNSSDInterval := modelDuration(cmd.Flag("alertmanagers.sd-dns-interval", "Interval between DNS resolutions of Alertmanager hosts.").
Default("30s"))

alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field").String()

@@ -156,6 +158,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
*alertmgrs,
*alertmgrsTimeout,
alertmgrsConfig,
time.Duration(*alertmgrsDNSSDInterval),
*grpcBindAddr,
time.Duration(*grpcGracePeriod),
*grpcCert,
@@ -194,6 +197,7 @@ func runRule(
alertmgrURLs []string,
alertmgrsTimeout time.Duration,
alertmgrsConfig *extflag.PathOrContent,
alertmgrsDNSSDInterval time.Duration,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
@@ -403,7 +407,7 @@ func runRule(
})

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error {
if err := am.Update(ctx, resolver); err != nil {
level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err)
alertMngrAddrResolutionErrors.Inc()
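The hunk above replaces the hard-coded 30-second refresh with the value of --alertmanagers.sd-dns-interval. For readers unfamiliar with runutil.Repeat, the loop amounts to the following standard-library sketch (hypothetical names, not the Thanos implementation):

package main

import (
    "context"
    "log"
    "time"
)

// refreshLoop re-runs update every interval until ctx is cancelled, mirroring
// the runutil.Repeat call above driven by --alertmanagers.sd-dns-interval.
func refreshLoop(ctx context.Context, interval time.Duration, update func(context.Context) error) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        if err := update(ctx); err != nil {
            log.Printf("refreshing alertmanagers failed: %v", err)
        }
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    refreshLoop(ctx, time.Second, func(context.Context) error {
        log.Println("resolving Alertmanager addresses")
        return nil
    })
}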
3 changes: 3 additions & 0 deletions docs/components/rule.md
@@ -226,6 +226,9 @@ Flags:
If defined, it takes precedence over the
'--alertmanagers.url' and
'--alertmanagers.send-timeout' flags.
--alertmanagers.sd-dns-interval=30s
Interval between DNS resolutions of
Alertmanager hosts.
--alert.query-url=ALERT.QUERY-URL
The external Thanos Query URL that would be set
in all alerts 'Source' field
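To show how the documented flags fit together, the sketch below builds an alerting configuration that relies on file SD (using the same structs the new test helper serializes later in this commit) and prints the YAML that could be passed via --alertmanagers.config. Field names follow the structs visible in this diff; defaulted HTTP-client fields may also appear in the marshaled output, so treat it as illustrative rather than authoritative.

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/common/model"
    yaml "gopkg.in/yaml.v2"

    "github.com/thanos-io/thanos/pkg/alert"
)

func main() {
    // One Alertmanager group discovered via file SD: the target files are
    // re-read every hour, while DNS re-resolution of the discovered hosts is
    // governed separately by --alertmanagers.sd-dns-interval.
    cfg := alert.AlertingConfig{
        Alertmanagers: []alert.AlertmanagerConfig{
            {
                FileSDConfigs: []alert.FileSDConfig{
                    {
                        Files:           []string{"/etc/thanos/am-targets/*.yaml"},
                        RefreshInterval: model.Duration(time.Hour),
                    },
                },
                Scheme:  "http",
                Timeout: model.Duration(10 * time.Second),
            },
        },
    }
    b, err := yaml.Marshal(&cfg)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b))
}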
1 change: 1 addition & 0 deletions pkg/alert/alert.go
@@ -316,6 +316,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
go func(amc AlertmanagerDoer, u *url.URL) {
defer wg.Done()

level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
start := time.Now()
if err := amc.Do(ctx, u, b); err != nil {
level.Warn(s.logger).Log(
5 changes: 4 additions & 1 deletion pkg/alert/client.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@@ -326,9 +327,10 @@ func (a *Alertmanager) Discover(ctx context.Context) {
func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error {
var resolved []string
for _, addr := range append(a.fileSDCache.Addresses(), a.staticAddresses...) {
level.Debug(a.logger).Log("msg", "resolving address", "addr", addr)
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
// No lookup needed. Add to the list and continue to the next address.
level.Debug(a.logger).Log("msg", "no lookup needed", "addr", addr)
resolved = append(resolved, addr)
continue
}
@@ -346,6 +348,7 @@ func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error
if err != nil {
return errors.Wrap(err, "failed to resolve alertmanager address")
}
level.Debug(a.logger).Log("msg", "address resolved", "addr", addr, "resolved", strings.Join(addrs, ","))
resolved = append(resolved, addrs...)
}
a.mtx.Lock()
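In the Update method above, addresses without a "<qtype>+" prefix (plain host:port) are used verbatim, while prefixed ones such as dns+... or dnssrv+... are handed to the DNS provider for resolution. A minimal, self-contained sketch of that classification step (standard library only, not the Thanos dns package):

package main

import (
    "fmt"
    "strings"
)

// classify mirrors the strings.SplitN logic in Alertmanager.Update: if there
// is no "<qtype>+" prefix, the address is returned as-is and no lookup is
// performed; otherwise the query type and name are split out for resolution.
func classify(addr string) (qtype, name string, needsLookup bool) {
    parts := strings.SplitN(addr, "+", 2)
    if len(parts) != 2 {
        return "", addr, false
    }
    return parts[0], parts[1], true
}

func main() {
    for _, addr := range []string{
        "alertmanager:9093",                 // used as-is
        "dns+alertmanager.example.com:9093", // resolved via A/AAAA lookups
        "dnssrv+_http._tcp.am.example.com",  // resolved via SRV lookups
    } {
        qtype, name, lookup := classify(addr)
        fmt.Printf("addr=%s qtype=%q name=%q lookup=%v\n", addr, qtype, name, lookup)
    }
}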
167 changes: 160 additions & 7 deletions test/e2e/rule_test.go
@@ -15,8 +15,12 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
yaml "gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/promclient"
rapi "github.com/thanos-io/thanos/pkg/rule/api"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -62,19 +66,162 @@ func createRuleFiles(t *testing.T, dir string) {
}
}

func serializeAlertingConfiguration(t *testing.T, cfg alert.AlertmanagerConfig) []byte {
t.Helper()
amCfg := alert.AlertingConfig{
Alertmanagers: []alert.AlertmanagerConfig{cfg},
}
b, err := yaml.Marshal(&amCfg)
if err != nil {
t.Errorf("failed to serialize alerting configuration: %v", err)
}
return b
}

func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) {
group := targetgroup.Group{Targets: []model.LabelSet{}}
for _, addr := range addrs {
group.Targets = append(group.Targets, model.LabelSet{model.LabelName(model.AddressLabel): model.LabelValue(addr)})
}

b, err := yaml.Marshal([]*targetgroup.Group{&group})
if err != nil {
t.Errorf("failed to serialize file SD configuration: %v", err)
return
}

err = ioutil.WriteFile(path+".tmp", b, 0660)
if err != nil {
t.Errorf("failed to write file SD configuration: %v", err)
return
}

err = os.Rename(path+".tmp", path)
testutil.Ok(t, err)
}
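
writeAlertmanagerFileSD stages the file under "<path>.tmp" and renames it into place so that the rule's file SD watcher never observes a half-written target list. The pattern in isolation (hypothetical helper, same ioutil/os calls as the test):

package main

import (
    "io/ioutil"
    "log"
    "os"
)

// atomicWrite stages data in "<path>.tmp" and renames it over path. On POSIX
// filesystems the rename is atomic, so readers see either the old content or
// the new content, never a partial write.
func atomicWrite(path string, data []byte) error {
    tmp := path + ".tmp"
    if err := ioutil.WriteFile(tmp, data, 0660); err != nil {
        return err
    }
    return os.Rename(tmp, path)
}

func main() {
    // Example file SD target file in the Prometheus file_sd format.
    if err := atomicWrite("/tmp/targets.yaml", []byte("- targets: ['localhost:9093']\n")); err != nil {
        log.Fatal(err)
    }
}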

func TestRuleAlertmanagerFileSD(t *testing.T) {
a := newLocalAddresser()

am := alertManager(a.New())
amDir, err := ioutil.TempDir("", "am")
defer os.RemoveAll(amDir)
testutil.Ok(t, err)
amCfg := serializeAlertingConfiguration(
t,
alert.AlertmanagerConfig{
FileSDConfigs: []alert.FileSDConfig{
alert.FileSDConfig{
Files: []string{filepath.Join(amDir, "*.yaml")},
RefreshInterval: model.Duration(time.Hour),
},
},
Scheme: "http",
Timeout: model.Duration(time.Second),
},
)

rulesDir, err := ioutil.TempDir("", "rules")
defer os.RemoveAll(rulesDir)
testutil.Ok(t, err)
createRuleFiles(t, rulesDir)

qAddr := a.New()
r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil)
q := querier(qAddr, a.New(), []address{r.GRPC}, nil)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
exit, err := e2eSpinup(t, ctx, am, q, r)
if err != nil {
t.Errorf("spinup failed: %v", err)
cancel()
return
}

defer func() {
cancel()
<-exit
}()

// Wait for a couple of evaluations.
testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {
select {
case <-exit:
cancel()
return nil
default:
}

// The time series written for the firing alerting rule must be queryable.
res, warnings, err := promclient.QueryInstant(ctx, nil, urlParse(t, q.HTTP.URL()), "max(count_over_time(ALERTS[1m])) > 2", time.Now(), promclient.QueryOptions{
Deduplicate: false,
})
if err != nil {
return err
}
if len(warnings) > 0 {
return errors.Errorf("unexpected warnings %s", warnings)
}
if len(res) == 0 {
return errors.Errorf("empty result")
}

alrts, err := queryAlertmanagerAlerts(ctx, am.HTTP.URL())
if err != nil {
return err
}
if len(alrts) != 0 {
return errors.Errorf("unexpected alerts length %d", len(alrts))
}

return nil
}))

// Update the Alertmanager file service discovery configuration.
writeAlertmanagerFileSD(t, filepath.Join(amDir, "targets.yaml"), am.HTTP.HostPort())

// Verify that alerts are received by Alertmanager.
testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {
select {
case <-exit:
cancel()
return nil
default:
}
alrts, err := queryAlertmanagerAlerts(ctx, am.HTTP.URL())
if err != nil {
return err
}
if len(alrts) == 0 {
return errors.Errorf("expecting alerts")
}

return nil
}))
}

func TestRule(t *testing.T) {
a := newLocalAddresser()

am := alertManager(a.New())
amCfg := serializeAlertingConfiguration(
t,
alert.AlertmanagerConfig{
StaticAddresses: []string{am.HTTP.HostPort()},
Scheme: "http",
Timeout: model.Duration(time.Second),
},
)

qAddr := a.New()

rulesDir, err := ioutil.TempDir("", "rules")
defer os.RemoveAll(rulesDir)
testutil.Ok(t, err)
createRuleFiles(t, rulesDir)

r1 := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil)
r2 := rule(a.New(), a.New(), rulesDir, am.HTTP, nil, []address{qAddr})
r1 := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil)
r2 := rule(a.New(), a.New(), rulesDir, amCfg, nil, []address{qAddr})

q := querier(qAddr, a.New(), []address{r1.GRPC, r2.GRPC}, nil)

@@ -282,19 +429,25 @@ func (a *failingStoreAPI) LabelValues(context.Context, *storepb.LabelValuesReque

// Test Ruler behaviour on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`.
func TestRulePartialResponse(t *testing.T) {
dir, err := ioutil.TempDir("", "test_rulepartial_response")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

a := newLocalAddresser()
qAddr := a.New()

f := fakeStoreAPI(a.New(), &failingStoreAPI{})
am := alertManager(a.New())
amCfg := serializeAlertingConfiguration(
t,
alert.AlertmanagerConfig{
StaticAddresses: []string{am.HTTP.HostPort()},
Scheme: "http",
Timeout: model.Duration(time.Second),
},
)

rulesDir, err := ioutil.TempDir("", "rules")
defer os.RemoveAll(rulesDir)
testutil.Ok(t, err)
r := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil)

r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil)
q := querier(qAddr, a.New(), []address{r.GRPC, f.GRPC}, nil)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
6 changes: 4 additions & 2 deletions test/e2e/spinup_test.go
@@ -305,7 +305,7 @@ receivers:
}
}

func rule(http, grpc address, ruleDir string, am address, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler {
func rule(http, grpc address, ruleDir string, amCfg []byte, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler {
return &serverScheduler{
HTTP: http,
GRPC: grpc,
@@ -317,12 +317,14 @@ func rule(http, grpc address, ruleDir string, am address, queryAddresses []addre
"--data-dir", filepath.Join(workDir, "data"),
"--rule-file", filepath.Join(ruleDir, "*.yaml"),
"--eval-interval", "1s",
"--alertmanagers.url", am.URL(),
"--alertmanagers.config", string(amCfg),
"--alertmanagers.sd-dns-interval", "5s",
"--grpc-address", grpc.HostPort(),
"--grpc-grace-period", "0s",
"--http-address", http.HostPort(),
"--log.level", "debug",
"--query.sd-dns-interval", "5s",
"--resend-delay", "5s",
}

for _, addr := range queryAddresses {
