Skip to content

Commit

Permalink
Added alert compliance test for Thanos (#5315)
Browse files Browse the repository at this point in the history
* test: Added Alert compatibilty test.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Tmp.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Update.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* update.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* update.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* e2e: Refactored service helpers for newest e2e version.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Removed alert combatibiltiy test for now.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* e2e: Added test for compatibility.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Added Querier /alerts API.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* e2e:Added replica labels.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Option to remove replica-label.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* skip.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Use stateful ruler and default resend delay

Signed-off-by: Matej Gera <[email protected]>

* Update docs

Signed-off-by: Matej Gera <[email protected]>

Co-authored-by: Matej Gera <[email protected]>
  • Loading branch information
bwplotka and matej-g authored Jun 21, 2022
1 parent d2cf622 commit b3eac41
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func registerRule(app *extkingpin.App) {
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()

cmd.Flag("data-dir", "data directory").Default("data/").StringVar(&conf.dataDir)
cmd.Flag("rule-file", "Rule files that should be used by rule manager. Can be in glob format (repeated).").
cmd.Flag("rule-file", "Rule files that should be used by rule manager. Can be in glob format (repeated). Note that rules are not automatically detected, use SIGHUP or do HTTP POST /-/reload to re-read them.").
Default("rules/").StringsVar(&conf.ruleFiles)
cmd.Flag("resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").DurationVar(&conf.resendDelay)
Expand Down
5 changes: 4 additions & 1 deletion docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,10 @@ Flags:
--resend-delay=1m Minimum amount of time to wait before resending
an alert to Alertmanager.
--rule-file=rules/ ... Rule files that should be used by rule manager.
Can be in glob format (repeated).
Can be in glob format (repeated). Note that
rules are not automatically detected, use
SIGHUP or do HTTP POST /-/reload to re-read
them.
--shipper.upload-compacted
If true shipper will try to upload compacted
blocks as well. Useful for migration purposes.
Expand Down
45 changes: 45 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge

r.Get("/stores", instr("stores", qapi.stores))

r.Get("/alerts", instr("alerts", NewAlertsHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))
r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse)))
Expand Down Expand Up @@ -762,6 +763,50 @@ func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) f
}
}

// NewAlertsHandler created handler compatible with HTTP /api/v1/alerts https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
// which uses gRPC Unary Rules API (Rules API works for both /alerts and /rules).
func NewAlertsHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http_request")
defer span.Finish()

var (
groups *rulespb.RuleGroups
warnings storage.Warnings
err error
)

// TODO(bwplotka): Allow exactly the same functionality as query API: passing replica, dedup and partial response as HTTP params as well.
req := &rulespb.RulesRequest{
Type: rulespb.RulesRequest_ALERT,
PartialResponseStrategy: ps,
}
tracing.DoInSpan(ctx, "retrieve_rules", func(ctx context.Context) {
groups, warnings, err = client.Rules(ctx, req)
})
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error retrieving rules: %v", err)}
}

var resp struct{ Alerts []*rulespb.AlertInstance }
for _, g := range groups.Groups {
for _, r := range g.Rules {
a := r.GetAlert()
if a == nil {
continue
}
resp.Alerts = append(resp.Alerts, a.Alerts...)
}
}
return resp, warnings, nil
}
}

// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
// which uses gRPC Unary Rules API.
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ func TestRulesHandler(t *testing.T) {
Type: "alerting",
},
}
var tests = []test{
for _, test := range []test{
{
response: &testpromcompatibility.RuleDiscovery{
RuleGroups: []*testpromcompatibility.RuleGroup{
Expand Down Expand Up @@ -1770,9 +1770,7 @@ func TestRulesHandler(t *testing.T) {
},
},
},
}

for _, test := range tests {
} {
t.Run(fmt.Sprintf("endpoint=%s/method=%s/query=%q", "rules", http.MethodGet, test.query.Encode()), func(t *testing.T) {
// Build a context with the correct request params.
ctx := context.Background()
Expand Down
96 changes: 96 additions & 0 deletions test/e2e/compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@
package e2e_test

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)
Expand Down Expand Up @@ -110,3 +116,93 @@ query_tweaks:
return ret
}()
}

// TestAlertCompliance tests Alert compatibility against https://github.com/prometheus/compliance/blob/main/alert_generator.
// NOTE: This requires a dockerization of compliance framework: https://github.com/prometheus/compliance/pull/46
func TestAlertCompliance(t *testing.T) {
t.Skip("This is an interactive test, using https://github.com/prometheus/compliance/tree/main/alert_generator. This tool is not optimized for CI runs (e.g. it infinitely retries, takes 38 minutes)")

t.Run("stateful ruler", func(t *testing.T) {
e, err := e2e.NewDockerEnvironment("alert_compatibility")
testutil.Ok(t, err)
t.Cleanup(e.Close)

// Start receive + Querier.
receive := e2ethanos.NewReceiveBuilder(e, "receive").WithIngestionEnabled().Init()
querierBuilder := e2ethanos.NewQuerierBuilder(e, "query")

compliance := e.Runnable("alert_generator_compliance_tester").WithPorts(map[string]int{"http": 8080}).Init(e2e.StartOptions{
Image: "alert_generator_compliance_tester:latest",
Command: e2e.NewCommandRunUntilStop(),
})

rFuture := e2ethanos.NewRulerBuilder(e, "1")
ruler := rFuture.WithAlertManagerConfig([]alert.AlertmanagerConfig{
{
EndpointsConfig: httpconfig.EndpointsConfig{
StaticAddresses: []string{compliance.InternalEndpoint("http")},
Scheme: "http",
},
Timeout: amTimeout,
APIVersion: alert.APIv1,
},
}).
// Use default resend delay and eval interval, as the compliance spec requires this.
WithResendDelay("1m").
WithEvalInterval("1m").
InitTSDB(filepath.Join(rFuture.InternalDir(), "rules"), []httpconfig.Config{
{
EndpointsConfig: httpconfig.EndpointsConfig{
StaticAddresses: []string{
querierBuilder.InternalEndpoint("http"),
},
Scheme: "http",
},
},
})

query := querierBuilder.
WithStoreAddresses(receive.InternalEndpoint("grpc")).
WithRuleAddresses(ruler.InternalEndpoint("grpc")).
// We deduplicate by this, since alert compatibility tool requires clean metric without labels
// attached by receivers.
WithReplicaLabels("receive", "tenant_id").
Init()
testutil.Ok(t, e2e.StartAndWaitReady(receive, query, ruler, compliance))

// Pull rules.yaml:
{
var stdout bytes.Buffer
testutil.Ok(t, compliance.Exec(e2e.NewCommand("cat", "/rules.yaml"), e2e.WithExecOptionStdout(&stdout)))
testutil.Ok(t, os.MkdirAll(filepath.Join(ruler.Dir(), "rules"), os.ModePerm))
testutil.Ok(t, os.WriteFile(filepath.Join(ruler.Dir(), "rules", "rules.yaml"), stdout.Bytes(), os.ModePerm))

// Reload ruler.
resp, err := http.Post("http://"+ruler.Endpoint("http")+"/-/reload", "", nil)
testutil.Ok(t, err)
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
testutil.Equals(t, http.StatusOK, resp.StatusCode)
}
testutil.Ok(t, ioutil.WriteFile(filepath.Join(compliance.Dir(), "test-thanos.yaml"), []byte(alertCompatConfig(receive, query)), os.ModePerm))

fmt.Println(alertCompatConfig(receive, query))

testutil.Ok(t, compliance.Exec(e2e.NewCommand(
"/alert_generator_compliance_tester", "-config-file", filepath.Join(compliance.InternalDir(), "test-thanos.yaml")),
))
})
}

// nolint (it's still used in skipped test).
func alertCompatConfig(receive e2e.Runnable, query e2e.Runnable) string {
return fmt.Sprintf(`settings:
remote_write_url: '%s'
query_base_url: 'http://%s'
rules_and_alerts_api_base_url: 'http://%s'
alert_reception_server_port: 8080
alert_message_parser: default
`, e2ethanos.RemoteWriteEndpoint(receive.InternalEndpoint("remote-write")), query.InternalEndpoint("http"), query.InternalEndpoint("http"))
}
45 changes: 33 additions & 12 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type QuerierBuilder struct {
enableFeatures []string
endpoints []string

replicaLabels []string
tracingConfig string

e2e.Linkable
Expand All @@ -200,6 +201,7 @@ func NewQuerierBuilder(e e2e.Environment, name string, storeAddresses ...string)
name: name,
storeAddresses: storeAddresses,
image: DefaultImage(),
replicaLabels: []string{replicaLabel},
}
}

Expand Down Expand Up @@ -263,6 +265,12 @@ func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder
return q
}

// WithReplicaLabels replaces default [replica] replica label configuration for the querier.
func (q *QuerierBuilder) WithReplicaLabels(labels ...string) *QuerierBuilder {
q.replicaLabels = labels
return q
}

func (q *QuerierBuilder) Init() e2e.InstrumentedRunnable {
args, err := q.collectArgs()
if err != nil {
Expand All @@ -284,40 +292,36 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
"--grpc-address": ":9091",
"--grpc-grace-period": "0s",
"--http-address": ":8080",
"--query.replica-label": replicaLabel,
"--store.sd-dns-interval": "5s",
"--log.level": infoLogLevel,
"--query.max-concurrent": "1",
"--store.sd-interval": "5s",
})

for _, repl := range q.replicaLabels {
args = append(args, "--query.replica-label="+repl)
}
for _, addr := range q.storeAddresses {
args = append(args, "--store="+addr)
}

for _, addr := range q.ruleAddresses {
args = append(args, "--rule="+addr)
}

for _, addr := range q.targetAddresses {
args = append(args, "--target="+addr)
}

for _, addr := range q.metadataAddresses {
args = append(args, "--metadata="+addr)
}

for _, addr := range q.exemplarAddresses {
args = append(args, "--exemplar="+addr)
}

for _, feature := range q.enableFeatures {
args = append(args, "--enable-feature="+feature)
}

for _, addr := range q.endpoints {
args = append(args, "--endpoint="+addr)
}

if len(q.fileSDStoreAddresses) > 0 {
if err := os.MkdirAll(q.Dir(), 0750); err != nil {
return nil, errors.Wrap(err, "create query dir failed")
Expand All @@ -339,19 +343,15 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {

args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml"))
}

if q.routePrefix != "" {
args = append(args, "--web.route-prefix="+q.routePrefix)
}

if q.externalPrefix != "" {
args = append(args, "--web.external-prefix="+q.externalPrefix)
}

if q.tracingConfig != "" {
args = append(args, "--tracing.config="+q.tracingConfig)
}

return args, nil
}

Expand Down Expand Up @@ -477,6 +477,8 @@ type RulerBuilder struct {
amCfg []alert.AlertmanagerConfig
replicaLabel string
image string
resendDelay string
evalInterval string
}

// NewRulerBuilder is a Ruler future that allows extra configuration before initialization.
Expand Down Expand Up @@ -507,6 +509,16 @@ func (r *RulerBuilder) WithReplicaLabel(replicaLabel string) *RulerBuilder {
return r
}

func (r *RulerBuilder) WithResendDelay(resendDelay string) *RulerBuilder {
r.resendDelay = resendDelay
return r
}

func (r *RulerBuilder) WithEvalInterval(evalInterval string) *RulerBuilder {
r.evalInterval = evalInterval
return r
}

func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []httpconfig.Config) e2e.InstrumentedRunnable {
return r.initRule(internalRuleDir, queryCfg, nil)
}
Expand Down Expand Up @@ -550,6 +562,15 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co
if r.replicaLabel != "" {
ruleArgs["--label"] = fmt.Sprintf(`%s="%s"`, replicaLabel, r.replicaLabel)
}

if r.resendDelay != "" {
ruleArgs["--resend-delay"] = r.resendDelay
}

if r.evalInterval != "" {
ruleArgs["--eval-interval"] = r.evalInterval
}

if remoteWriteCfg != nil {
rwCfgBytes, err := yaml.Marshal(struct {
RemoteWriteConfigs []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"`
Expand Down

0 comments on commit b3eac41

Please sign in to comment.