Skip to content

Commit

Permalink
federated targets functionality (#1375)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Tunik <[email protected]>
  • Loading branch information
2nick committed Mar 16, 2021
1 parent 03c7747 commit 5578a59
Show file tree
Hide file tree
Showing 25 changed files with 3,023 additions and 17 deletions.
30 changes: 30 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -103,6 +104,10 @@ func registerQuery(app *extkingpin.App) {
metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
Hidden().PlaceHolder("<metadata>").Strings()

// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
Hidden().PlaceHolder("<target>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -169,6 +174,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "error while parsing config for request logging")
}

if dup := firstDuplicate(*targetEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Expand Down Expand Up @@ -222,6 +231,7 @@ func registerQuery(app *extkingpin.App) {
getFlagsMap(cmd.Flags()),
*stores,
*ruleEndpoints,
*targetEndpoints,
*metadataEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
Expand Down Expand Up @@ -278,6 +288,7 @@ func runQuery(
flagsMap map[string]string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
metadataAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
Expand Down Expand Up @@ -323,6 +334,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsTargetProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsMetadataProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
Expand Down Expand Up @@ -355,6 +372,13 @@ func runQuery(

return specs
},
func() (specs []query.TargetSpec) {
for _, addr := range dnsTargetProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
Expand All @@ -367,6 +391,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
queryableCreator = query.NewQueryableCreator(
logger,
Expand Down Expand Up @@ -454,6 +479,9 @@ func runQuery(
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
Expand Down Expand Up @@ -504,6 +532,7 @@ func runQuery(
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
metadata.NewGRPCClient(metadataProxy),
enableAutodownsampling,
enableQueryPartialResponse,
Expand Down Expand Up @@ -550,6 +579,7 @@ func runQuery(
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -229,6 +230,7 @@ func runSidecar(
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
Expand Down
37 changes: 37 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -72,6 +74,7 @@ type QueryAPI struct {
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient
targets targets.UnaryClient
metadatas metadata.UnaryClient

enableAutodownsampling bool
Expand All @@ -95,6 +98,7 @@ func NewQueryAPI(
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
targets targets.UnaryClient,
metadatas metadata.UnaryClient,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
Expand All @@ -115,6 +119,7 @@ func NewQueryAPI(
queryableCreate: c,
gate: gate,
ruleGroups: ruleGroups,
targets: targets,
metadatas: metadatas,

enableAutodownsampling: enableAutodownsampling,
Expand Down Expand Up @@ -154,6 +159,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge

r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, true)))

r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))
}

Expand Down Expand Up @@ -652,6 +659,36 @@ func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiErr
return statuses, nil, nil
}

func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
stateParam := r.URL.Query().Get("state")
state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)]
if !ok {
if stateParam != "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid targets parameter state='%v'", stateParam)}
}
state = int32(targetspb.TargetsRequest_ANY)
}

req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_State(state),
PartialResponseStrategy: ps,
}

t, warnings, err := client.Targets(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")}
}

return t, warnings, nil
}
}

// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
// which uses gRPC Unary Rules API.
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
Expand Down
17 changes: 17 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc/codes"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -758,3 +759,19 @@ func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric strin
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/metadata HTTP[client]", &u, &v)
}

func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/targets")

if stateTargets != "" {
q := u.Query()
q.Add("state", stateTargets)
u.RawQuery = q.Encode()
}

var v struct {
Data *targetspb.TargetDiscovery `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/targets HTTP[client]", &u, &v)
}
56 changes: 54 additions & 2 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
)

const (
Expand All @@ -50,6 +51,11 @@ type RuleSpec interface {
Addr() string
}

type TargetSpec interface {
// Addr returns TargetsAPI Address for the targets spec. It is used as its ID.
Addr() string
}

type MetadataSpec interface {
// Addr returns MetadataAPI Address for the metadata spec. It is used as its ID.
Addr() string
Expand Down Expand Up @@ -187,6 +193,7 @@ type StoreSet struct {
// accessible and we close gRPC client for it.
storeSpecs func() []StoreSpec
ruleSpecs func() []RuleSpec
targetSpecs func() []TargetSpec
metadataSpecs func() []MetadataSpec
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration
Expand All @@ -210,6 +217,7 @@ func NewStoreSet(
reg *prometheus.Registry,
storeSpecs func() []StoreSpec,
ruleSpecs func() []RuleSpec,
targetSpecs func() []TargetSpec,
metadataSpecs func() []MetadataSpec,
dialOpts []grpc.DialOption,
unhealthyStoreTimeout time.Duration,
Expand All @@ -228,6 +236,9 @@ func NewStoreSet(
if ruleSpecs == nil {
ruleSpecs = func() []RuleSpec { return nil }
}
if targetSpecs == nil {
targetSpecs = func() []TargetSpec { return nil }
}
if metadataSpecs == nil {
metadataSpecs = func() []MetadataSpec { return nil }
}
Expand All @@ -236,6 +247,7 @@ func NewStoreSet(
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
ruleSpecs: ruleSpecs,
targetSpecs: targetSpecs,
metadataSpecs: metadataSpecs,
dialOpts: dialOpts,
storesMetric: storesMetric,
Expand All @@ -258,6 +270,9 @@ type storeRef struct {
rule rulespb.RulesClient
metadata metadatapb.MetadataClient

// If target is not nil, then this store also supports targets API.
target targetspb.TargetsClient

// Meta (can change during runtime).
labelSets []labels.Labels
storeType component.StoreAPI
Expand All @@ -267,7 +282,7 @@ type storeRef struct {
logger log.Logger
}

func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, metadata metadatapb.MetadataClient) {
func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, target targetspb.TargetsClient, metadata metadatapb.MetadataClient) {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand All @@ -276,6 +291,7 @@ func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int6
s.minTime = minTime
s.maxTime = maxTime
s.rule = rule
s.target = target
s.metadata = metadata
}

Expand All @@ -293,6 +309,13 @@ func (s *storeRef) HasRulesAPI() bool {
return s.rule != nil
}

func (s *storeRef) HasTargetsAPI() bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

return s.target != nil
}

func (s *storeRef) HasMetadataAPI() bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
Expand Down Expand Up @@ -405,6 +428,10 @@ func (s *StoreSet) Update(ctx context.Context) {
level.Info(s.logger).Log("msg", "adding new rulesAPI to query storeset", "address", addr)
}

if st.HasTargetsAPI() {
level.Info(s.logger).Log("msg", "adding new targetsAPI to query storeset", "address", addr)
}

level.Info(s.logger).Log("msg", "adding new storeAPI to query storeset", "address", addr, "extLset", extLset)
}

Expand All @@ -425,6 +452,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store

storeAddrSet = make(map[string]struct{})
ruleAddrSet = make(map[string]struct{})
targetAddrSet = make(map[string]struct{})
metadataAddrSet = make(map[string]struct{})
)

Expand All @@ -433,6 +461,11 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
ruleAddrSet[ruleSpec.Addr()] = struct{}{}
}

// Gather active targets map concurrently. Add a new target if it does not exist already.
for _, targetSpec := range s.targetSpecs() {
targetAddrSet[targetSpec.Addr()] = struct{}{}
}

// Gather active stores map concurrently. Build new store if does not exist already.
for _, metadataSpec := range s.metadataSpecs() {
metadataAddrSet[metadataSpec.Addr()] = struct{}{}
Expand Down Expand Up @@ -473,6 +506,11 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
rule = rulespb.NewRulesClient(st.cc)
}

var target targetspb.TargetsClient
if _, ok := targetAddrSet[addr]; ok {
target = targetspb.NewTargetsClient(st.cc)
}

var metadata metadatapb.MetadataClient
if _, ok := metadataAddrSet[addr]; ok {
metadata = metadatapb.NewMetadataClient(st.cc)
Expand Down Expand Up @@ -502,7 +540,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
}

s.updateStoreStatus(st, nil)
st.Update(labelSets, minTime, maxTime, storeType, rule, metadata)
st.Update(labelSets, minTime, maxTime, storeType, rule, target, metadata)

mtx.Lock()
defer mtx.Unlock()
Expand Down Expand Up @@ -586,6 +624,20 @@ func (s *StoreSet) GetRulesClients() []rulespb.RulesClient {
return rules
}

// GetTargetsClients returns a list of all active targets clients.
func (s *StoreSet) GetTargetsClients() []targetspb.TargetsClient {
s.storesMtx.RLock()
defer s.storesMtx.RUnlock()

targets := make([]targetspb.TargetsClient, 0, len(s.stores))
for _, st := range s.stores {
if st.HasTargetsAPI() {
targets = append(targets, st.target)
}
}
return targets
}

// GetMetadataClients returns a list of all active metadata clients.
func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient {
s.storesMtx.RLock()
Expand Down
Loading

0 comments on commit 5578a59

Please sign in to comment.