Skip to content

Commit

Permalink
Added Ruler support for RulesAPI; Refactored Manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
bwplotka committed May 6, 2020
1 parent 7ae6595 commit f891f6a
Show file tree
Hide file tree
Showing 15 changed files with 752 additions and 661 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func runQuery(
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewRetriever(rulesProxy),
rules.NewGRPCClient(rulesProxy),
)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)
Expand Down
156 changes: 77 additions & 79 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrules "github.com/thanos-io/thanos/pkg/rules"
v1 "github.com/thanos-io/thanos/pkg/rules/api"
thanosmanager "github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -386,13 +386,13 @@ func runRule(
alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}

// Run rule evaluation and alert notifications.
var (
ruleMgr *thanosrules.Manager
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosmanager.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
// Run rule evaluation and alert notifications.
notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
Expand All @@ -414,41 +414,37 @@ func runRule(
}
alertQ.Push(res)
}
st := tsdb.Adapter(db, 0)

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
}

// TODO(bwplotka): Hide this behind thanos rules.Manager.
for _, strategy := range storepb.PartialResponseStrategy_value {
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
ctx, cancel := context.WithCancel(context.Background())

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, queryClients, duplicatedQuery, ruleEvalWarnings, s)
st := tsdb.Adapter(db, 0)
logger = log.With(logger, "component", "rules")
ruleMgr = thanosrules.NewManager(
tracing.ContextWithTracer(ctx, tracer),
reg,
dataDir,
rules.ManagerOptions{
NotifyFunc: notifyFunc,
Logger: logger,
Appendable: st,
ExternalURL: nil,
TSDB: st,
ResendDelay: resendDelay,
},
queryFuncCreator(logger, queryClients, duplicatedQuery, ruleEvalWarnings),
)

mgr := rules.NewManager(&opts)
ruleMgr.SetRuleManager(s, mgr)
g.Add(func() error {
mgr.Run()
<-ctx.Done()
// Schedule rule manager that evaluates rules.
g.Add(func() error {
ruleMgr.Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
mgr.Stop()
})
}
return nil
}, func(err error) {
cancel()
ruleMgr.Stop()
})
ruleMgr.Run()
}
// Run the alert sender.
{
Expand Down Expand Up @@ -539,7 +535,7 @@ func runRule(
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -578,7 +574,7 @@ func runRule(

ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, ruleMgr)
api := v1.NewAPI(logger, reg, thanosrules.NewGRPCClient(ruleMgr), ruleMgr)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

srv := httpserver.New(logger, reg, comp, httpProbe,
Expand Down Expand Up @@ -687,57 +683,59 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh
return deduplicated
}

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
func queryFunc(
func queryFuncCreator(
logger log.Logger,
queriers []*http_util.Client,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}
) func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
return func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}
promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, nil
}
return v, nil
}
return nil, errors.Errorf("no query API server reachable")
}
return nil, errors.Errorf("no query API server reachable")
}
}

Expand Down
81 changes: 26 additions & 55 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ import (
"github.com/prometheus/prometheus/storage"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -98,17 +100,13 @@ func SetCORS(w http.ResponseWriter) {

type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError)

type rulesRetriever interface {
RuleGroups(context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error)
}

// API can register a set of endpoints in a router and handle
// them using the provided storage and query engine.
type API struct {
logger log.Logger
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
rulesRetriever rulesRetriever
ruleGroups rules.UnaryClient

enableAutodownsampling bool
enablePartialResponse bool
Expand All @@ -129,7 +127,7 @@ func NewAPI(
enablePartialResponse bool,
replicaLabels []string,
defaultInstantQueryMaxSourceResolution time.Duration,
rr rulesRetriever,
ruleGroups rules.UnaryClient,
) *API {
return &API{
logger: logger,
Expand All @@ -140,7 +138,7 @@ func NewAPI(
replicaLabels: replicaLabels,
reg: reg,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
rulesRetriever: rr,
ruleGroups: ruleGroups,

now: time.Now,
}
Expand Down Expand Up @@ -178,7 +176,7 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
r.Get("/labels", instr("label_names", api.labelNames))
r.Post("/labels", instr("label_names", api.labelNames))

r.Get("/rules", instr("rules", api.rules))
r.Get("/rules", instr("rules", NewRulesHandler(api.ruleGroups)))
}

type queryData struct {
Expand Down Expand Up @@ -638,55 +636,28 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return names, warnings, nil
}

func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
var (
res = &rulespb.RuleGroups{}
typeParam = strings.ToLower(r.URL.Query().Get("type"))
)

if typeParam != "" && typeParam != "alert" && typeParam != "record" {
return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid query parameter type='%v'", typeParam)}
}

returnAlerts := typeParam == "" || typeParam == "alert"
returnRecording := typeParam == "" || typeParam == "record"

groups, warnings, err := api.rulesRetriever.RuleGroups(r.Context())
if err != nil {
return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("error retrieving rules: %v", err)}
}

for _, grp := range groups {
apiRuleGroup := &rulespb.RuleGroup{
Name: grp.Name,
File: grp.File,
Interval: grp.Interval,
EvaluationDurationSeconds: grp.EvaluationDurationSeconds,
LastEvaluation: grp.LastEvaluation,
DeprecatedPartialResponseStrategy: grp.DeprecatedPartialResponseStrategy,
PartialResponseStrategy: grp.PartialResponseStrategy,
// NewRulesHandler creates a handler compatible with the HTTP /api/v1/rules endpoint
// (https://prometheus.io/docs/prometheus/latest/querying/api/#rules) that is backed by the gRPC unary Rules API.
func NewRulesHandler(client rules.UnaryClient) func(*http.Request) (interface{}, []error, *ApiError) {
return func(request *http.Request) (interface{}, []error, *ApiError) {
req := &rulespb.RulesRequest{
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
}
switch typeParam := strings.ToLower(request.URL.Query().Get("type")); typeParam {
case "":
req.Type = rulespb.RulesRequest_ALL
case "alert":
req.Type = rulespb.RulesRequest_ALERTING
case "record":
req.Type = rulespb.RulesRequest_RECORDING
default:
return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid query parameter type='%v'", typeParam)}
}

apiRuleGroup.Rules = make([]*rulespb.Rule, 0, len(grp.Rules))

for _, r := range grp.Rules {
switch {
case r.GetAlert() != nil:
if !returnAlerts {
break
}
apiRuleGroup.Rules = append(apiRuleGroup.Rules, r)
case r.GetRecording() != nil:
if !returnRecording {
break
}
apiRuleGroup.Rules = append(apiRuleGroup.Rules, r)
default:
return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("rule %v: unsupported", r)}
}
groups, warnings, err := client.Rules(request.Context(), req)
if err != nil {
return nil, nil, &ApiError{ErrorInternal, errors.Errorf("error retrieving rules: %v", err)}
}
res.Groups = append(res.Groups, apiRuleGroup)
return groups, warnings, nil
}

return res, warnings, nil
}
Loading

0 comments on commit f891f6a

Please sign in to comment.