Skip to content

Commit

Permalink
Added Ruler support for RulesAPI; Refactored Manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
bwplotka committed May 7, 2020
1 parent 7ae6595 commit c72d4f7
Show file tree
Hide file tree
Showing 30 changed files with 1,081 additions and 1,111 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
docker:
# Build by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:v0.3.0
- image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node
working_directory: /go/src/github.com/thanos-io/thanos
environment:
GO111MODULE: 'on'
Expand Down Expand Up @@ -59,7 +59,7 @@ jobs:
publish_master:
docker:
# Build by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:v0.2.0
- image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node
working_directory: /go/src/github.com/thanos-io/thanos
steps:
- checkout
Expand All @@ -79,7 +79,7 @@ jobs:
publish_release:
docker:
# Build by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:v0.2.0
- image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node
working_directory: /go/src/github.com/thanos-io/thanos
environment:
GOBIN: "/go/bin"
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

TODO: Add rules API, also now StoreAPI consistently solves clashes in external labels vs metrics - by choosing external always.

### Fixed

- [#2033](https://github.com/thanos-io/thanos/pull/2033) minio-go: Fixed Issue #1494 support Web Identity providers for IAM credentials for AWS EKS
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ ME ?= $(shell whoami)
# Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus

# Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 v2.18.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 $(GOBIN)/prometheus-v2.18.0

ALERTMANAGER_VERSION ?= v0.20.0
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func runQuery(
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewRetriever(rulesProxy),
rules.NewGRPCClient(rulesProxy),
)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)
Expand Down
157 changes: 78 additions & 79 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrules "github.com/thanos-io/thanos/pkg/rules"
v1 "github.com/thanos-io/thanos/pkg/rules/api"
thanosmanager "github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -386,13 +386,13 @@ func runRule(
alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}

// Run rule evaluation and alert notifications.
var (
ruleMgr *thanosrules.Manager
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosmanager.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
// Run rule evaluation and alert notifications.
notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
Expand All @@ -414,41 +414,38 @@ func runRule(
}
alertQ.Push(res)
}
st := tsdb.Adapter(db, 0)

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
}

// TODO(bwplotka): Hide this behind thanos rules.Manager.
for _, strategy := range storepb.PartialResponseStrategy_value {
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
ctx, cancel := context.WithCancel(context.Background())

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, queryClients, duplicatedQuery, ruleEvalWarnings, s)
st := tsdb.Adapter(db, 0)
logger = log.With(logger, "component", "rules")
ruleMgr = thanosrules.NewManager(
tracing.ContextWithTracer(ctx, tracer),
reg,
dataDir,
rules.ManagerOptions{
NotifyFunc: notifyFunc,
Logger: logger,
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
},
queryFuncCreator(logger, queryClients, duplicatedQuery, ruleEvalWarnings),
lset,
)

mgr := rules.NewManager(&opts)
ruleMgr.SetRuleManager(s, mgr)
g.Add(func() error {
mgr.Run()
<-ctx.Done()
// Schedule rule manager that evaluates rules.
g.Add(func() error {
ruleMgr.Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
mgr.Stop()
})
}
return nil
}, func(err error) {
cancel()
ruleMgr.Stop()
})
ruleMgr.Run()
}
// Run the alert sender.
{
Expand Down Expand Up @@ -539,7 +536,7 @@ func runRule(
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -578,7 +575,7 @@ func runRule(

ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, ruleMgr)
api := v1.NewAPI(logger, reg, thanosrules.NewGRPCClient(ruleMgr), ruleMgr)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

srv := httpserver.New(logger, reg, comp, httpProbe,
Expand Down Expand Up @@ -687,57 +684,59 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh
return deduplicated
}

// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
func queryFunc(
func queryFuncCreator(
logger log.Logger,
queriers []*http_util.Client,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}
) func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {

// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
return func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}
promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, nil
}
return v, nil
}
return nil, errors.Errorf("no query API server reachable")
}
return nil, errors.Errorf("no query API server reachable")
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(promURL, c),
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(promURL, c, m.Labels),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
4 changes: 4 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,5 +631,9 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
return nil, err
}

// Prometheus does not support PartialResponseStrategy, and probably would never do. Make it Abort by default.
for _, g := range m.Data.Groups {
g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT
}
return m.Data.Groups, nil
}
Loading

0 comments on commit c72d4f7

Please sign in to comment.