Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Ruler support for RulesAPI; Refactored Manager. #2562

Merged
merged 1 commit into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version: 2
defaults: &defaults
docker:
# Built by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:go1.14.2-node
- image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node
jobs:
test:
<<: *defaults
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

TODO: Add rules API. Also, StoreAPI now consistently resolves clashes between external labels and metric labels by always choosing the external label.

### Fixed

- [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ ME ?= $(shell whoami)
# Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus

# Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 v2.18.1
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 $(GOBIN)/prometheus-v2.18.1

ALERTMANAGER_VERSION ?= v0.20.0
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func runQuery(
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewRetriever(rulesProxy),
rules.NewGRPCClient(rulesProxy),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins)
Expand Down
152 changes: 75 additions & 77 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrules "github.com/thanos-io/thanos/pkg/rules"
v1 "github.com/thanos-io/thanos/pkg/rules/api"
rulesmanager "github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -397,13 +397,13 @@ func runRule(
alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}

// Run rule evaluation and alert notifications.
var (
ruleMgr *thanosrules.Manager
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = rulesmanager.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
// Run rule evaluation and alert notifications.
notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
Expand All @@ -425,41 +425,37 @@ func runRule(
}
alertQ.Push(res)
}
st := tsdb.Adapter(db, 0)

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
}

// TODO(bwplotka): Hide this behind thanos rules.Manager.
for _, strategy := range storepb.PartialResponseStrategy_value {
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
ctx, cancel := context.WithCancel(context.Background())

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, s)
st := tsdb.Adapter(db, 0)
logger = log.With(logger, "component", "rules")
ruleMgr = thanosrules.NewManager(
tracing.ContextWithTracer(ctx, tracer),
reg,
dataDir,
rules.ManagerOptions{
NotifyFunc: notifyFunc,
Logger: logger,
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
},
queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings),
lset,
)

mgr := rules.NewManager(&opts)
ruleMgr.SetRuleManager(s, mgr)
g.Add(func() error {
mgr.Run()
<-ctx.Done()
// Schedule rule manager that evaluates rules.
g.Add(func() error {
ruleMgr.Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
mgr.Stop()
})
}
return nil
}, func(err error) {
cancel()
ruleMgr.Stop()
})
}
// Run the alert sender.
{
Expand Down Expand Up @@ -533,7 +529,7 @@ func runRule(
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -575,7 +571,7 @@ func runRule(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, ruleMgr)
api := v1.NewAPI(logger, reg, thanosrules.NewGRPCClient(ruleMgr), ruleMgr)
api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins)

srv := httpserver.New(logger, reg, comp, httpProbe,
Expand Down Expand Up @@ -684,57 +680,59 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh
return deduplicated
}

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
func queryFunc(
func queryFuncCreator(
logger log.Logger,
queriers []*http_util.Client,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}
) func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
return func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}
promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}

return func(ctx context.Context, q string, t time.Time) (v promql.Vector, err error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
var warns []string
tracing.DoInSpan(ctx, spanID, func(ctx context.Context) {
v, warns, err = promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
})
if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, nil
}
return v, nil
}
return nil, errors.Errorf("no query API server reachable")
}
return nil, errors.New("no query API server reachable")
}
}

Expand All @@ -759,7 +757,7 @@ func addDiscoveryGroups(g *run.Group, c *http_util.Client, interval time.Duratio

func reloadRules(logger log.Logger,
ruleFiles []string,
ruleMgr *rulesmanager.Manager,
ruleMgr *thanosrules.Manager,
evalInterval time.Duration,
metrics *RuleMetrics) error {
level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c),
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c, m.Labels),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
4 changes: 4 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,5 +628,9 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
return nil, err
}

// Prometheus does not support PartialResponseStrategy, and probably never will. Make it Abort by default.
for _, g := range m.Data.Groups {
g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT
}
return m.Data.Groups, nil
}
Loading