Skip to content

Commit

Permalink
Use proto rules API instead of struct; Added rulesAPI RPC to sidecar.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Mar 11, 2020
1 parent 0c9bbb8 commit ff9d8a7
Show file tree
Hide file tree
Showing 22 changed files with 864 additions and 697 deletions.
3 changes: 2 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy,
// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, nil,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ func runRule(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store,
// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -709,7 +710,7 @@ func queryFunc(

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(logger, q))
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
Expand Down
19 changes: 11 additions & 8 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func runSidecar(
maxt: math.MaxInt64,

limitMinTime: limitMinTime,
client: promclient.NewWithTracingClient(logger, "thanos-sidecar"),
}

confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -198,15 +199,15 @@ func runSidecar(
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, ignoreBlockSize, m); err != nil {
if err := validatePrometheus(ctx, m.client, logger, ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx, logger); err != nil {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
Expand Down Expand Up @@ -239,7 +240,7 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()

if err := m.UpdateLabels(iterCtx, logger); err != nil {
if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
promUp.Set(0)
} else {
Expand Down Expand Up @@ -278,7 +279,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, promStore,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -356,14 +357,14 @@ func runSidecar(
return nil
}

func validatePrometheus(ctx context.Context, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
func validatePrometheus(ctx context.Context, client *promclient.Client, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
var (
flagErr error
flags promclient.Flags
)

if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if flags, flagErr = promclient.ConfiguredFlags(ctx, logger, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
if flags, flagErr = client.ConfiguredFlags(ctx, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", flagErr)
return errors.Wrapf(flagErr, "fetch Prometheus flags")
}
Expand Down Expand Up @@ -402,10 +403,12 @@ type promMetadata struct {
labels labels.Labels

limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := promclient.ExternalLabels(ctx, logger, s.promURL)
func (s *promMetadata) UpdateLabels(ctx context.Context) error {
elset, err := s.client.ExternalLabels(ctx, s.promURL)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs,
s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs, nil,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/go-kit/kit v0.9.0
github.com/go-openapi/strfmt v0.19.2
github.com/gogo/protobuf v1.3.1
github.com/gogo/status v1.0.3
github.com/golang/snappy v0.0.1
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/gophercloud/gophercloud v0.6.0
Expand Down
10 changes: 5 additions & 5 deletions pkg/store/matchers.go → pkg/promclient/matchers.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store
package promclient

import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
func TranslateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
switch m.Type {
case storepb.LabelMatcher_EQ:
return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value)
Expand All @@ -26,9 +26,9 @@ func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
return nil, errors.Errorf("unknown label matcher type %d", m.Type)
}

func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) {
func TranslateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) {
for _, m := range ms {
r, err := translateMatcher(m)
r, err := TranslateMatcher(m)
if err != nil {
return nil, err
}
Expand All @@ -40,7 +40,7 @@ func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err er
// matchersToString converts label matchers to string format.
func matchersToString(ms []storepb.LabelMatcher) (string, error) {
var res string
matchers, err := translateMatchers(ms)
matchers, err := TranslateMatchers(ms)
if err != nil {
return "", err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store
package promclient

import (
"testing"
Expand Down
Loading

0 comments on commit ff9d8a7

Please sign in to comment.