Skip to content

Commit

Permalink
Refactored proto generation and separated store from rules APIs. (#2558)
Browse files Browse the repository at this point in the history
* Refactored proto generation and separate store from rules APIs.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Addressed comments.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Fixed proto gen.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Addressed Serg comments.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed May 22, 2020
1 parent 9dac4ab commit 5f6a683
Show file tree
Hide file tree
Showing 39 changed files with 2,038 additions and 1,975 deletions.
20 changes: 17 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -270,7 +271,8 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, stores.GetRulesClients, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -379,7 +381,19 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, stores, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, query.NewRulesRetriever(proxy))
api := v1.NewAPI(
logger,
reg,
stores,
engine,
queryableCreator,
enableAutodownsampling,
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewRetriever(rulesProxy),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins)

srv := httpserver.New(logger, reg, comp, httpProbe,
Expand All @@ -406,7 +420,7 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, proxy,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, rulesProxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
v1 "github.com/thanos-io/thanos/pkg/rule/api"
v1 "github.com/thanos-io/thanos/pkg/rules/api"
rulesmanager "github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -400,7 +400,7 @@ func runRule(
// Run rule evaluation and alert notifications.
var (
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
ruleMgr = rulesmanager.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
Expand Down Expand Up @@ -758,7 +758,7 @@ func addDiscoveryGroups(g *run.Group, c *http_util.Client, interval time.Duratio

func reloadRules(logger log.Logger,
ruleFiles []string,
ruleMgr *thanosrule.Manager,
ruleMgr *rulesmanager.Manager,
evalInterval time.Duration,
metrics *RuleMetrics) error {
level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
Expand Down
7 changes: 4 additions & 3 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/exthttp"
"github.com/thanos-io/thanos/pkg/extprom"
thanoshttp "github.com/thanos-io/thanos/pkg/http"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/reloader"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand All @@ -38,7 +40,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"

"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -209,7 +210,7 @@ func runSidecar(
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := &http.Client{Transport: tracing.HTTPTripperware(logger, t)}
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
Expand All @@ -222,7 +223,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, promStore,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ replace (
// Make sure Cortex is not forcing us to some other Prometheus version.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200407102557-cd73b3d33e06 // @cd73b3d33e064bbd846fc7a26dc8c313d46af382 (after v2.17.0 and before v2.18.0).
k8s.io/api => k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190620085554-14e95df34f1f
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
k8s.io/code-generator => k8s.io/code-generator v0.0.0-20190612205613-18da4a14b22b
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func NewHTTPClient(cfg ClientConfig, name string) (*http.Client, error) {
if err != nil {
return nil, err
}
client.Transport = &userAgentRoundTripper{name: userAgent, rt: client.Transport}
client.Transport = &userAgentRoundTripper{name: ThanosUserAgent, rt: client.Transport}
return client, nil
}

var userAgent = fmt.Sprintf("Thanos/%s", version.Version)
var ThanosUserAgent = fmt.Sprintf("Thanos/%s", version.Version)

type userAgentRoundTripper struct {
name string
Expand Down
10 changes: 3 additions & 7 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -381,11 +382,6 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string,
defer span.Finish()

body, _, err := c.get2xx(ctx, &u)
if err != nil {
return nil, nil, err
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "read query instant response")
}
Expand Down Expand Up @@ -615,7 +611,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str

// RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules string) ([]*storepb.RuleGroup, error) {
func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules string) ([]*rulespb.RuleGroup, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/rules")

Expand All @@ -626,7 +622,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
}

var m struct {
Data *storepb.RuleGroups `json:"data"`
Data *rulespb.RuleGroups `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (
"github.com/prometheus/prometheus/storage"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -100,7 +100,7 @@ func SetCORS(w http.ResponseWriter) {
type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError)

type rulesRetriever interface {
RuleGroups(context.Context) ([]*storepb.RuleGroup, storage.Warnings, error)
RuleGroups(context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error)
}

// API can register a set of endpoints in a router and handle
Expand Down Expand Up @@ -653,7 +653,7 @@ func (api *API) stores(r *http.Request) (interface{}, []error, *ApiError) {

func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
var (
res = &storepb.RuleGroups{}
res = &rulespb.RuleGroups{}
typeParam = strings.ToLower(r.URL.Query().Get("type"))
)

Expand All @@ -670,7 +670,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
}

for _, grp := range groups {
apiRuleGroup := &storepb.RuleGroup{
apiRuleGroup := &rulespb.RuleGroup{
Name: grp.Name,
File: grp.File,
Interval: grp.Interval,
Expand All @@ -680,7 +680,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
PartialResponseStrategy: grp.PartialResponseStrategy,
}

apiRuleGroup.Rules = make([]*storepb.Rule, 0, len(grp.Rules))
apiRuleGroup.Rules = make([]*rulespb.Rule, 0, len(grp.Rules))

for _, r := range grp.Rules {
switch {
Expand Down
17 changes: 9 additions & 8 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -174,7 +175,7 @@ type StoreSet struct {
unhealthyStoreTimeout time.Duration
}

// NewStoreSet returns a new set of stores from cluster peers and statically configured ones.
// NewStoreSet returns a new set of store APIs and potentially Rules APIs from given specs.
func NewStoreSet(
logger log.Logger,
reg *prometheus.Registry,
Expand Down Expand Up @@ -212,14 +213,15 @@ func NewStoreSet(
return ss
}

// TODO(bwplotka): Consider moving storeRef out of this package and renaming it, as it also supports rules API.
type storeRef struct {
storepb.StoreClient

mtx sync.RWMutex
cc *grpc.ClientConn
addr string
// if rule is not nil, then this store also supports rules API.
rule storepb.RulesClient
// If rule is not nil, then this store also supports rules API.
rule rulespb.RulesClient

// Meta (can change during runtime).
labelSets []storepb.LabelSet
Expand Down Expand Up @@ -448,10 +450,9 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr)
return
}

var rule storepb.RulesClient
var rule rulespb.RulesClient
if _, ok := ruleAddrSet[addr]; ok {
rule = storepb.NewRulesClient(conn)
rule = rulespb.NewRulesClient(conn)
}

st = &storeRef{StoreClient: storepb.NewStoreClient(conn), storeType: component.UnknownStoreAPI, rule: rule, cc: conn, addr: addr, logger: s.logger}
Expand Down Expand Up @@ -551,11 +552,11 @@ func (s *StoreSet) Get() []store.Client {
}

// GetRulesClients returns a list of all active rules clients.
func (s *StoreSet) GetRulesClients() []storepb.RulesClient {
func (s *StoreSet) GetRulesClients() []rulespb.RulesClient {
s.storesMtx.RLock()
defer s.storesMtx.RUnlock()

rules := make([]storepb.RulesClient, 0, len(s.stores))
rules := make([]rulespb.RulesClient, 0, len(s.stores))
for _, st := range s.stores {
if st.HasRulesAPI() {
rules = append(rules, st.rule)
Expand Down
43 changes: 22 additions & 21 deletions pkg/rule/api/v1.go → pkg/rules/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"github.com/NYTimes/gziphandler"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/rules/rulespb"

"github.com/go-kit/kit/log"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/rules"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
qapi "github.com/thanos-io/thanos/pkg/query/api"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
"github.com/thanos-io/thanos/pkg/rules/manager"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -63,14 +64,14 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
}

type RulesRetriever interface {
RuleGroups() []thanosrule.Group
AlertingRules() []thanosrule.AlertingRule
RuleGroups() []manager.Group
AlertingRules() []manager.AlertingRule
}

func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) {
res := &storepb.RuleGroups{}
res := &rulespb.RuleGroups{}
for _, grp := range api.ruleRetriever.RuleGroups() {
apiRuleGroup := &storepb.RuleGroup{
apiRuleGroup := &rulespb.RuleGroup{
Name: grp.Name(),
File: grp.OriginalFile(),
Interval: grp.Interval().Seconds(),
Expand All @@ -85,26 +86,26 @@ func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) {

switch rule := r.(type) {
case *rules.AlertingRule:
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &storepb.Rule{
Result: &storepb.Rule_Alert{Alert: &storepb.Alert{
State: storepb.AlertState(rule.State()),
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{
Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{
State: rulespb.AlertState(rule.State()),
Name: rule.Name(),
Query: rule.Query().String(),
DurationSeconds: rule.HoldDuration().Seconds(),
Labels: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Annotations: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())},
Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())},
Alerts: rulesAlertsToAPIAlerts(grp.PartialResponseStrategy, rule.ActiveAlerts()),
Health: string(rule.Health()),
LastError: lastError,
EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(),
LastEvaluation: rule.GetEvaluationTimestamp(),
}}})
case *rules.RecordingRule:
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &storepb.Rule{
Result: &storepb.Rule_Recording{Recording: &storepb.RecordingRule{
apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{
Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{
Name: rule.Name(),
Query: rule.Query().String(),
Labels: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())},
Health: string(rule.Health()),
LastError: lastError,
EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(),
Expand All @@ -122,24 +123,24 @@ func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) {
}

func (api *API) alerts(*http.Request) (interface{}, []error, *qapi.ApiError) {
var alerts []*storepb.AlertInstance
var alerts []*rulespb.AlertInstance
for _, alertingRule := range api.ruleRetriever.AlertingRules() {
alerts = append(
alerts,
rulesAlertsToAPIAlerts(alertingRule.PartialResponseStrategy, alertingRule.ActiveAlerts())...,
)
}
return struct{ Alerts []*storepb.AlertInstance }{Alerts: alerts}, nil, nil
return struct{ Alerts []*rulespb.AlertInstance }{Alerts: alerts}, nil, nil
}

func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*storepb.AlertInstance {
apiAlerts := make([]*storepb.AlertInstance, len(rulesAlerts))
func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*rulespb.AlertInstance {
apiAlerts := make([]*rulespb.AlertInstance, len(rulesAlerts))
for i, ruleAlert := range rulesAlerts {
apiAlerts[i] = &storepb.AlertInstance{
apiAlerts[i] = &rulespb.AlertInstance{
PartialResponseStrategy: s,
Labels: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)},
Annotations: &storepb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)},
State: storepb.AlertState(ruleAlert.State),
Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)},
Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)},
State: rulespb.AlertState(ruleAlert.State),
ActiveAt: &ruleAlert.ActiveAt,
Value: strconv.FormatFloat(ruleAlert.Value, 'e', -1, 64),
}
Expand Down
Loading

0 comments on commit 5f6a683

Please sign in to comment.