Skip to content

Commit

Permalink
federated targets functionality (#1375)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Tunik <[email protected]>
  • Loading branch information
2nick committed Oct 21, 2020
1 parent 7ec45ef commit 2f338c9
Show file tree
Hide file tree
Showing 27 changed files with 3,059 additions and 18 deletions.
Empty file modified .bingo/go.mod
100755 → 100644
Empty file.
33 changes: 33 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -94,6 +95,10 @@ func registerQuery(app *extkingpin.App) {
ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<rule>").Strings()

// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured targets API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<target>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -139,6 +144,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
}

if dup := firstDuplicate(*targetEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Expand Down Expand Up @@ -188,6 +197,7 @@ func registerQuery(app *extkingpin.App) {
getFlagsMap(cmd.Flags()),
*stores,
*ruleEndpoints,
*targetEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
*enableRulePartialResponse,
Expand Down Expand Up @@ -237,6 +247,7 @@ func runQuery(
flagsMap map[string]string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
Expand Down Expand Up @@ -279,6 +290,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsTargetProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_target_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
Expand All @@ -305,11 +322,22 @@ func runQuery(

return specs
},
func() (specs []query.TargetSpec) {
for _, addr := range dnsTargetProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

// NOTE(s-urbaniak): No need to remove duplicates, as target apis are a subset of store apis.
// hence, any duplicates will be tracked in the store api set.

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand Down Expand Up @@ -394,6 +422,9 @@ func runQuery(
if err := dnsRuleProvider.Resolve(ctx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
return nil
})
}, func(error) {
Expand Down Expand Up @@ -444,6 +475,7 @@ func runQuery(
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
Expand Down Expand Up @@ -486,6 +518,7 @@ func runQuery(
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -218,6 +219,7 @@ func runSidecar(
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.1.0
github.com/felixge/fgprof v0.9.1
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.9
github.com/go-kit/kit v0.10.0
github.com/go-openapi/strfmt v0.19.5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ github.com/felixge/fgprof v0.9.1/go.mod h1:7/HK6JFtFaARhIljgP2IV8rJLIoHDoOYoUphs
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
Expand Down
37 changes: 37 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -67,6 +69,7 @@ type QueryAPI struct {
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
ruleGroups rules.UnaryClient
targets targets.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
Expand All @@ -86,6 +89,7 @@ func NewQueryAPI(
qe *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
targets targets.UnaryClient,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
Expand All @@ -102,6 +106,7 @@ func NewQueryAPI(
queryableCreate: c,
gate: gate,
ruleGroups: ruleGroups,
targets: targets,

enableAutodownsampling: enableAutodownsampling,
enableQueryPartialResponse: enableQueryPartialResponse,
Expand Down Expand Up @@ -136,6 +141,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
r.Get("/stores", instr("stores", qapi.stores))

r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, true)))
}

type queryData struct {
Expand Down Expand Up @@ -562,6 +569,36 @@ func (qapi *QueryAPI) stores(r *http.Request) (interface{}, []error, *api.ApiErr
return statuses, nil, nil
}

func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
stateParam := r.URL.Query().Get("state")
state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)]
if !ok {
if stateParam != "" {
return nil, nil, &api.ApiError{api.ErrorBadData, errors.Errorf("invalid targets parameter state='%v'", stateParam)}
}
state = int32(targetspb.TargetsRequest_ANY)
}

req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_State(state),
PartialResponseStrategy: ps,
}

t, warnings, err := client.Targets(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{api.ErrorInternal, errors.Errorf("error retrieving targets: %v", err)}
}

return t, warnings, nil
}
}

// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
// which uses gRPC Unary Rules API.
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
Expand Down
33 changes: 33 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc/codes"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -708,3 +709,35 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
}
return m.Data.Groups, nil
}

func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/targets")

if stateTargets != "" {
q := u.Query()
q.Add("state", stateTargets)
u.RawQuery = q.Encode()
}

level.Debug(c.logger).Log("msg", "getting targets", "url", u.String())

span, ctx := tracing.StartSpan(ctx, "/targets HTTP[client]")
defer span.Finish()

body, _, err := c.get2xx(ctx, &u)
if err != nil {
return nil, err
}

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
var v struct {
Data *targetspb.TargetDiscovery `json:"data"`
}
if err = json.Unmarshal(body, &v); err != nil {
return nil, errors.Wrap(err, "unmarshal targets API response")
}

return v.Data, nil
}
Loading

0 comments on commit 2f338c9

Please sign in to comment.