Skip to content

Commit

Permalink
Refactored proto generation and separate store from rules APIs.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed May 4, 2020
1 parent 705d7cf commit 7ae6595
Show file tree
Hide file tree
Showing 32 changed files with 1,918 additions and 1,840 deletions.
18 changes: 15 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/rules"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -258,7 +259,8 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, stores.GetRulesClients, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -362,7 +364,17 @@ func runQuery(
ins := extpromhttp.NewInstrumentationMiddleware(reg)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, query.NewRulesRetriever(proxy))
api := v1.NewAPI(
logger,
reg,
engine,
queryableCreator,
enableAutodownsampling,
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewRetriever(rulesProxy),
)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

Expand Down Expand Up @@ -390,7 +402,7 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, proxy,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, rulesProxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
6 changes: 3 additions & 3 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
v1 "github.com/thanos-io/thanos/pkg/rule/api"
v1 "github.com/thanos-io/thanos/pkg/rules/api"
thanosmanager "github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -389,7 +389,7 @@ func runRule(
// Run rule evaluation and alert notifications.
var (
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
ruleMgr = thanosmanager.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
Expand Down
6 changes: 4 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/reloader"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -267,7 +268,8 @@ func runSidecar(
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = connectionPoolSizePerHost
t.MaxIdleConns = connectionPoolSize
c := &http.Client{Transport: tracing.HTTPTripperware(logger, t)}

c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, promclient.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, c, promURL, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
Expand All @@ -279,7 +281,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, promStore,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(promURL, c),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
8 changes: 6 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/pkg/labels"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand All @@ -46,6 +48,8 @@ var (
http.StatusServiceUnavailable: codes.Unavailable,
http.StatusInternalServerError: codes.Internal,
}

ThanosUserAgent = fmt.Sprintf("Thanos/%s", version.Version)
)

const (
Expand Down Expand Up @@ -609,7 +613,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str

// RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules string) ([]*storepb.RuleGroup, error) {
func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules string) ([]*rulespb.RuleGroup, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/rules")

Expand All @@ -620,7 +624,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
}

var m struct {
Data *storepb.RuleGroups `json:"data"`
Data *rulespb.RuleGroups `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import (
"github.com/prometheus/prometheus/storage"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -99,7 +99,7 @@ func SetCORS(w http.ResponseWriter) {
type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError)

type rulesRetriever interface {
RuleGroups(context.Context) ([]*storepb.RuleGroup, storage.Warnings, error)
RuleGroups(context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error)
}

// API can register a set of endpoints in a router and handle
Expand Down Expand Up @@ -640,7 +640,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {

func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
var (
res = &storepb.RuleGroups{}
res = &rulespb.RuleGroups{}
typeParam = strings.ToLower(r.URL.Query().Get("type"))
)

Expand All @@ -657,7 +657,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
}

for _, grp := range groups {
apiRuleGroup := &storepb.RuleGroup{
apiRuleGroup := &rulespb.RuleGroup{
Name: grp.Name,
File: grp.File,
Interval: grp.Interval,
Expand All @@ -667,7 +667,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
PartialResponseStrategy: grp.PartialResponseStrategy,
}

apiRuleGroup.Rules = make([]*storepb.Rule, 0, len(grp.Rules))
apiRuleGroup.Rules = make([]*rulespb.Rule, 0, len(grp.Rules))

for _, r := range grp.Rules {
switch {
Expand Down
16 changes: 9 additions & 7 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -177,7 +178,7 @@ type StoreSet struct {
unhealthyStoreTimeout time.Duration
}

// NewStoreSet returns a new set of stores from cluster peers and statically configured ones.
// NewStoreSet returns a new set of store APIs and potentially Rules APIs from given specs.
func NewStoreSet(
logger log.Logger,
reg *prometheus.Registry,
Expand Down Expand Up @@ -215,14 +216,15 @@ func NewStoreSet(
return ss
}

// TODO(bwplotka): Consider moving out of this package and renam as it also supports rules API.
type storeRef struct {
storepb.StoreClient

mtx sync.RWMutex
cc *grpc.ClientConn
addr string
// if rule is not nil, then this store also supports rules API.
rule storepb.RulesClient
// If rule is not nil, then this store also supports rules API.
rule rulespb.RulesClient

// Meta (can change during runtime).
labelSets []storepb.LabelSet
Expand Down Expand Up @@ -449,10 +451,10 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor
return
}

var rule storepb.RulesClient
var rule rulespb.RulesClient

if _, ok := ruleAddrSet[addr]; ok {
rule = storepb.NewRulesClient(conn)
rule = rulespb.NewRulesClient(conn)
}

st = &storeRef{StoreClient: storepb.NewStoreClient(conn), rule: rule, cc: conn, addr: addr, logger: s.logger}
Expand Down Expand Up @@ -542,11 +544,11 @@ func (s *StoreSet) Get() []store.Client {
}

// GetRulesClients returns a list of all active rules clients.
func (s *StoreSet) GetRulesClients() []storepb.RulesClient {
func (s *StoreSet) GetRulesClients() []rulespb.RulesClient {
s.storesMtx.RLock()
defer s.storesMtx.RUnlock()

rules := make([]storepb.RulesClient, 0, len(s.stores))
rules := make([]rulespb.RulesClient, 0, len(s.stores))
for _, st := range s.stores {
if st.HasRulesAPI() {
rules = append(rules, st.rule)
Expand Down
43 changes: 22 additions & 21 deletions pkg/rule/api/v1.go → pkg/rules/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (

"github.com/NYTimes/gziphandler"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/rules/rulespb"

"github.com/go-kit/kit/log"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/rules"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
qapi "github.com/thanos-io/thanos/pkg/query/api"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
"github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -63,14 +64,14 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
}

type RulesRetriever interface {
RuleGroups() []thanosrule.Group
AlertingRules() []thanosrule.AlertingRule
RuleGroups() []manager.Group
AlertingRules() []manager.AlertingRule
}

func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) {
res := &storepb.RuleGroups{}
res := &rulespb.RuleGroups{}
for _, grp := range api.ruleRetriever.RuleGroups() {
apiRuleGroup := &storepb.RuleGroup{
apiRuleGroup := &rulespb.RuleGroup{
Name: grp.Name(),
File: grp.OriginalFile(),
Interval: grp.Interval().Seconds(),
Expand All @@ -85,26 +86,26 @@ func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) {

switch rule := r.(type) {
case *rules.AlertingRule:
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &storepb.Rule{
Result: &storepb.Rule_Alert{Alert: &storepb.Alert{
State: storepb.AlertState(rule.State()),
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{
Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{
State: rulespb.AlertState(rule.State()),
Name: rule.Name(),
Query: rule.Query().String(),
DurationSeconds: rule.Duration().Seconds(),
Labels: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Annotations: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())},
Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())},
Alerts: rulesAlertsToAPIAlerts(grp.PartialResponseStrategy, rule.ActiveAlerts()),
Health: string(rule.Health()),
LastError: lastError,
EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(),
LastEvaluation: rule.GetEvaluationTimestamp(),
}}})
case *rules.RecordingRule:
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &storepb.Rule{
Result: &storepb.Rule_Recording{Recording: &storepb.RecordingRule{
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{
Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{
Name: rule.Name(),
Query: rule.Query().String(),
Labels: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Health: string(rule.Health()),
LastError: lastError,
EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(),
Expand All @@ -122,24 +123,24 @@ func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) {
}

func (api *API) alerts(*http.Request) (interface{}, []error, *qapi.ApiError) {
var alerts []*storepb.AlertInstance
var alerts []*rulespb.AlertInstance
for _, alertingRule := range api.ruleRetriever.AlertingRules() {
alerts = append(
alerts,
rulesAlertsToAPIAlerts(alertingRule.PartialResponseStrategy, alertingRule.ActiveAlerts())...,
)
}
return struct{ Alerts []*storepb.AlertInstance }{Alerts: alerts}, nil, nil
return struct{ Alerts []*rulespb.AlertInstance }{Alerts: alerts}, nil, nil
}

func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*storepb.AlertInstance {
apiAlerts := make([]*storepb.AlertInstance, len(rulesAlerts))
func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*rulespb.AlertInstance {
apiAlerts := make([]*rulespb.AlertInstance, len(rulesAlerts))
for i, ruleAlert := range rulesAlerts {
apiAlerts[i] = &storepb.AlertInstance{
apiAlerts[i] = &rulespb.AlertInstance{
PartialResponseStrategy: s,
Labels: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)},
Annotations: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)},
State: storepb.AlertState(ruleAlert.State),
Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)},
Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)},
State: rulespb.AlertState(ruleAlert.State),
ActiveAt: &ruleAlert.ActiveAt,
Value: strconv.FormatFloat(ruleAlert.Value, 'e', -1, 64),
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/rule/api/v1_test.go → pkg/rules/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
qapi "github.com/thanos-io/thanos/pkg/query/api"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
"github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/testpromcompatibility"
Expand Down Expand Up @@ -66,7 +66,7 @@ type rulesRetrieverMock struct {
testing *testing.T
}

func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group {
func (m rulesRetrieverMock) RuleGroups() []manager.Group {
storage := newStorage(m.testing)

engineOpts := promql.EngineOpts{
Expand Down Expand Up @@ -97,18 +97,18 @@ func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group {
recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{})
r = append(r, recordingRule)

return []thanosrule.Group{
thanosrule.Group{
return []manager.Group{
manager.Group{
Group: rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts),
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
},
}
}

func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule {
var ars []thanosrule.AlertingRule
func (m rulesRetrieverMock) AlertingRules() []manager.AlertingRule {
var ars []manager.AlertingRule
for _, ar := range alertingRules(m.testing) {
ars = append(ars, thanosrule.AlertingRule{AlertingRule: ar})
ars = append(ars, manager.AlertingRule{AlertingRule: ar})
}
return ars
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rule/rule.go → pkg/rules/manager/rule.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package thanosrule
package manager

import (
"crypto/sha256"
Expand Down
Loading

0 comments on commit 7ae6595

Please sign in to comment.