Skip to content

Commit

Permalink
Added Ruler support for RulesAPI; Refactored Manager. (#2562)
Browse files Browse the repository at this point in the history
  • Loading branch information
bwplotka committed May 22, 2020
1 parent 5f6a683 commit 87bcc73
Show file tree
Hide file tree
Showing 32 changed files with 1,504 additions and 1,343 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version: 2
defaults: &defaults
docker:
# Built by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:go1.14.2-node
- image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node
jobs:
test:
<<: *defaults
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

TODO: Add rules API, also now StoreAPI consistently solves clashes in external labels vs metrics - by choosing external always.

### Fixed

* [#2637](https://github.com/thanos-io/thanos/pull/2637) Compact: detect retryable errors that are inside of a wrapped `tsdb.MultiError`
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ ME ?= $(shell whoami)
# Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus

# Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 v2.18.1
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 $(GOBIN)/prometheus-v2.18.1

ALERTMANAGER_VERSION ?= v0.20.0
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func runQuery(
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewRetriever(rulesProxy),
rules.NewGRPCClient(rulesProxy),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins)
Expand Down
152 changes: 74 additions & 78 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrules "github.com/thanos-io/thanos/pkg/rules"
v1 "github.com/thanos-io/thanos/pkg/rules/api"
rulesmanager "github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -397,13 +397,13 @@ func runRule(
alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}

// Run rule evaluation and alert notifications.
var (
ruleMgr *thanosrules.Manager
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = rulesmanager.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
// Run rule evaluation and alert notifications.
notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
Expand All @@ -425,41 +425,35 @@ func runRule(
}
alertQ.Push(res)
}
st := db

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
}

// TODO(bwplotka): Hide this behind thanos rules.Manager.
for _, strategy := range storepb.PartialResponseStrategy_value {
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, s)
ctx, cancel := context.WithCancel(context.Background())
logger = log.With(logger, "component", "rules")
ruleMgr = thanosrules.NewManager(
tracing.ContextWithTracer(ctx, tracer),
reg,
dataDir,
rules.ManagerOptions{
NotifyFunc: notifyFunc,
Logger: logger,
Appendable: db,
ExternalURL: nil,
TSDB: db,
ResendDelay: resendDelay,
},
queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings),
lset,
)

mgr := rules.NewManager(&opts)
ruleMgr.SetRuleManager(s, mgr)
g.Add(func() error {
mgr.Run()
<-ctx.Done()
// Schedule rule manager that evaluates rules.
g.Add(func() error {
ruleMgr.Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
mgr.Stop()
})
}
return nil
}, func(err error) {
cancel()
ruleMgr.Stop()
})
}
// Run the alert sender.
{
Expand Down Expand Up @@ -533,7 +527,7 @@ func runRule(
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -575,7 +569,7 @@ func runRule(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, ruleMgr)
api := v1.NewAPI(logger, reg, thanosrules.NewGRPCClient(ruleMgr), ruleMgr)
api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins)

srv := httpserver.New(logger, reg, comp, httpProbe,
Expand Down Expand Up @@ -684,57 +678,59 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh
return deduplicated
}

// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
func queryFunc(
func queryFuncCreator(
logger log.Logger,
queriers []*http_util.Client,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}
) func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {

// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
return func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}
promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}

return func(ctx context.Context, q string, t time.Time) (v promql.Vector, err error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
var warns []string
tracing.DoInSpan(ctx, spanID, func(ctx context.Context) {
v, warns, err = promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
})
if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, nil
}
return v, nil
}
return nil, errors.Errorf("no query API server reachable")
}
return nil, errors.New("no query API server reachable")
}
}

Expand All @@ -758,7 +754,7 @@ func addDiscoveryGroups(g *run.Group, c *http_util.Client, interval time.Duratio

func reloadRules(logger log.Logger,
ruleFiles []string,
ruleMgr *rulesmanager.Manager,
ruleMgr *thanosrules.Manager,
evalInterval time.Duration,
metrics *RuleMetrics) error {
level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c),
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c, m.Labels),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
4 changes: 4 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,5 +629,9 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
return nil, err
}

// Prometheus does not support PartialResponseStrategy, and probably would never do. Make it Abort by default.
for _, g := range m.Data.Groups {
g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT
}
return m.Data.Groups, nil
}
Loading

0 comments on commit 87bcc73

Please sign in to comment.