Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added alert compliance test for Thanos #5315

Merged
merged 15 commits into from
Jun 21, 2022
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func registerRule(app *extkingpin.App) {
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()

cmd.Flag("data-dir", "data directory").Default("data/").StringVar(&conf.dataDir)
cmd.Flag("rule-file", "Rule files that should be used by rule manager. Can be in glob format (repeated).").
cmd.Flag("rule-file", "Rule files that should be used by rule manager. Can be in glob format (repeated). Note that rules are not automatically detected, use SIGHUP or do HTTP POST /-/reload to re-read them.").
Default("rules/").StringsVar(&conf.ruleFiles)
cmd.Flag("resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").DurationVar(&conf.resendDelay)
Expand Down
5 changes: 4 additions & 1 deletion docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,10 @@ Flags:
--resend-delay=1m Minimum amount of time to wait before resending
an alert to Alertmanager.
--rule-file=rules/ ... Rule files that should be used by rule manager.
Can be in glob format (repeated).
Can be in glob format (repeated). Note that
rules are not automatically detected, use
SIGHUP or do HTTP POST /-/reload to re-read
them.
--shipper.upload-compacted
If true shipper will try to upload compacted
blocks as well. Useful for migration purposes.
Expand Down
45 changes: 45 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge

r.Get("/stores", instr("stores", qapi.stores))

r.Get("/alerts", instr("alerts", NewAlertsHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))
r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse)))
Expand Down Expand Up @@ -762,6 +763,50 @@ func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) f
}
}

// NewAlertsHandler created handler compatible with HTTP /api/v1/alerts https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
// which uses gRPC Unary Rules API (Rules API works for both /alerts and /rules).
func NewAlertsHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http_request")
defer span.Finish()

var (
groups *rulespb.RuleGroups
warnings storage.Warnings
err error
)

// TODO(bwplotka): Allow exactly the same functionality as query API: passing replica, dedup and partial response as HTTP params as well.
req := &rulespb.RulesRequest{
Type: rulespb.RulesRequest_ALERT,
PartialResponseStrategy: ps,
}
tracing.DoInSpan(ctx, "retrieve_rules", func(ctx context.Context) {
groups, warnings, err = client.Rules(ctx, req)
})
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error retrieving rules: %v", err)}
}

var resp struct{ Alerts []*rulespb.AlertInstance }
for _, g := range groups.Groups {
for _, r := range g.Rules {
a := r.GetAlert()
if a == nil {
continue
}
resp.Alerts = append(resp.Alerts, a.Alerts...)
}
}
return resp, warnings, nil
}
}

// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
// which uses gRPC Unary Rules API.
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ func TestRulesHandler(t *testing.T) {
Type: "alerting",
},
}
var tests = []test{
for _, test := range []test{
{
response: &testpromcompatibility.RuleDiscovery{
RuleGroups: []*testpromcompatibility.RuleGroup{
Expand Down Expand Up @@ -1770,9 +1770,7 @@ func TestRulesHandler(t *testing.T) {
},
},
},
}

for _, test := range tests {
} {
t.Run(fmt.Sprintf("endpoint=%s/method=%s/query=%q", "rules", http.MethodGet, test.query.Encode()), func(t *testing.T) {
// Build a context with the correct request params.
ctx := context.Background()
Expand Down
96 changes: 96 additions & 0 deletions test/e2e/compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@
package e2e_test

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)
Expand Down Expand Up @@ -110,3 +116,93 @@ query_tweaks:
return ret
}()
}

// TestAlertCompliance tests Alert compatibility against https://github.com/prometheus/compliance/blob/main/alert_generator.
// NOTE: This requires a dockerization of compliance framework: https://github.com/prometheus/compliance/pull/46
func TestAlertCompliance(t *testing.T) {
t.Skip("This is an interactive test, using https://github.com/prometheus/compliance/tree/main/alert_generator. This tool is not optimized for CI runs (e.g. it infinitely retries, takes 38 minutes)")

t.Run("stateful ruler", func(t *testing.T) {
e, err := e2e.NewDockerEnvironment("alert_compatibility")
testutil.Ok(t, err)
t.Cleanup(e.Close)

// Start receive + Querier.
receive := e2ethanos.NewReceiveBuilder(e, "receive").WithIngestionEnabled().Init()
querierBuilder := e2ethanos.NewQuerierBuilder(e, "query")

compliance := e.Runnable("alert_generator_compliance_tester").WithPorts(map[string]int{"http": 8080}).Init(e2e.StartOptions{
Image: "alert_generator_compliance_tester:latest",
Command: e2e.NewCommandRunUntilStop(),
})

rFuture := e2ethanos.NewRulerBuilder(e, "1")
ruler := rFuture.WithAlertManagerConfig([]alert.AlertmanagerConfig{
{
EndpointsConfig: httpconfig.EndpointsConfig{
StaticAddresses: []string{compliance.InternalEndpoint("http")},
Scheme: "http",
},
Timeout: amTimeout,
APIVersion: alert.APIv1,
},
}).
// Use default resend delay and eval interval, as the compliance spec requires this.
WithResendDelay("1m").
WithEvalInterval("1m").
InitTSDB(filepath.Join(rFuture.InternalDir(), "rules"), []httpconfig.Config{
{
EndpointsConfig: httpconfig.EndpointsConfig{
StaticAddresses: []string{
querierBuilder.InternalEndpoint("http"),
},
Scheme: "http",
},
},
})

query := querierBuilder.
WithStoreAddresses(receive.InternalEndpoint("grpc")).
WithRuleAddresses(ruler.InternalEndpoint("grpc")).
// We deduplicate by this, since alert compatibility tool requires clean metric without labels
// attached by receivers.
WithReplicaLabels("receive", "tenant_id").
Init()
testutil.Ok(t, e2e.StartAndWaitReady(receive, query, ruler, compliance))

// Pull rules.yaml:
{
var stdout bytes.Buffer
testutil.Ok(t, compliance.Exec(e2e.NewCommand("cat", "/rules.yaml"), e2e.WithExecOptionStdout(&stdout)))
testutil.Ok(t, os.MkdirAll(filepath.Join(ruler.Dir(), "rules"), os.ModePerm))
testutil.Ok(t, os.WriteFile(filepath.Join(ruler.Dir(), "rules", "rules.yaml"), stdout.Bytes(), os.ModePerm))

// Reload ruler.
resp, err := http.Post("http://"+ruler.Endpoint("http")+"/-/reload", "", nil)
testutil.Ok(t, err)
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
testutil.Equals(t, http.StatusOK, resp.StatusCode)
}
testutil.Ok(t, ioutil.WriteFile(filepath.Join(compliance.Dir(), "test-thanos.yaml"), []byte(alertCompatConfig(receive, query)), os.ModePerm))

fmt.Println(alertCompatConfig(receive, query))

testutil.Ok(t, compliance.Exec(e2e.NewCommand(
"/alert_generator_compliance_tester", "-config-file", filepath.Join(compliance.InternalDir(), "test-thanos.yaml")),
))
})
}

// nolint (it's still used in skipped test).
func alertCompatConfig(receive e2e.Runnable, query e2e.Runnable) string {
return fmt.Sprintf(`settings:
remote_write_url: '%s'
query_base_url: 'http://%s'
rules_and_alerts_api_base_url: 'http://%s'
alert_reception_server_port: 8080
alert_message_parser: default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about making this similar to https://github.com/thanos-io/thanos/blob/main/test/e2e/e2ethanos/services.go#L1058?
My preference is that it is more readable, just like reading yaml directory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind any, happy to change.

`, e2ethanos.RemoteWriteEndpoint(receive.InternalEndpoint("remote-write")), query.InternalEndpoint("http"), query.InternalEndpoint("http"))
}
45 changes: 33 additions & 12 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type QuerierBuilder struct {
enableFeatures []string
endpoints []string

replicaLabels []string
tracingConfig string

e2e.Linkable
Expand All @@ -200,6 +201,7 @@ func NewQuerierBuilder(e e2e.Environment, name string, storeAddresses ...string)
name: name,
storeAddresses: storeAddresses,
image: DefaultImage(),
replicaLabels: []string{replicaLabel},
}
}

Expand Down Expand Up @@ -263,6 +265,12 @@ func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder
return q
}

// WithReplicaLabels replaces default [replica] replica label configuration for the querier.
func (q *QuerierBuilder) WithReplicaLabels(labels ...string) *QuerierBuilder {
q.replicaLabels = labels
return q
}

func (q *QuerierBuilder) Init() e2e.InstrumentedRunnable {
args, err := q.collectArgs()
if err != nil {
Expand All @@ -284,40 +292,36 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
"--grpc-address": ":9091",
"--grpc-grace-period": "0s",
"--http-address": ":8080",
"--query.replica-label": replicaLabel,
"--store.sd-dns-interval": "5s",
"--log.level": infoLogLevel,
"--query.max-concurrent": "1",
"--store.sd-interval": "5s",
})

for _, repl := range q.replicaLabels {
args = append(args, "--query.replica-label="+repl)
}
for _, addr := range q.storeAddresses {
args = append(args, "--store="+addr)
}

for _, addr := range q.ruleAddresses {
args = append(args, "--rule="+addr)
}

for _, addr := range q.targetAddresses {
args = append(args, "--target="+addr)
}

for _, addr := range q.metadataAddresses {
args = append(args, "--metadata="+addr)
}

for _, addr := range q.exemplarAddresses {
args = append(args, "--exemplar="+addr)
}

for _, feature := range q.enableFeatures {
args = append(args, "--enable-feature="+feature)
}

for _, addr := range q.endpoints {
args = append(args, "--endpoint="+addr)
}

if len(q.fileSDStoreAddresses) > 0 {
if err := os.MkdirAll(q.Dir(), 0750); err != nil {
return nil, errors.Wrap(err, "create query dir failed")
Expand All @@ -339,19 +343,15 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {

args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml"))
}

if q.routePrefix != "" {
args = append(args, "--web.route-prefix="+q.routePrefix)
}

if q.externalPrefix != "" {
args = append(args, "--web.external-prefix="+q.externalPrefix)
}

if q.tracingConfig != "" {
args = append(args, "--tracing.config="+q.tracingConfig)
}

return args, nil
}

Expand Down Expand Up @@ -477,6 +477,8 @@ type RulerBuilder struct {
amCfg []alert.AlertmanagerConfig
replicaLabel string
image string
resendDelay string
evalInterval string
}

// NewRulerBuilder is a Ruler future that allows extra configuration before initialization.
Expand Down Expand Up @@ -507,6 +509,16 @@ func (r *RulerBuilder) WithReplicaLabel(replicaLabel string) *RulerBuilder {
return r
}

func (r *RulerBuilder) WithResendDelay(resendDelay string) *RulerBuilder {
r.resendDelay = resendDelay
return r
}

func (r *RulerBuilder) WithEvalInterval(evalInterval string) *RulerBuilder {
r.evalInterval = evalInterval
return r
}

func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []httpconfig.Config) e2e.InstrumentedRunnable {
return r.initRule(internalRuleDir, queryCfg, nil)
}
Expand Down Expand Up @@ -550,6 +562,15 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co
if r.replicaLabel != "" {
ruleArgs["--label"] = fmt.Sprintf(`%s="%s"`, replicaLabel, r.replicaLabel)
}

if r.resendDelay != "" {
ruleArgs["--resend-delay"] = r.resendDelay
}

if r.evalInterval != "" {
ruleArgs["--eval-interval"] = r.evalInterval
}

if remoteWriteCfg != nil {
rwCfgBytes, err := yaml.Marshal(struct {
RemoteWriteConfigs []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"`
Expand Down