Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier, receiver, sidecar, store: Add gRPC health check endpoints #2008

Merged
merged 10 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`.
- [#1939](https://github.com/thanos-io/thanos/pull/1939) Ruler: Add TLS and authentication support for query endpoints with the `--query.config` and `--query.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints.
- #2030 Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.
- [#2030](https://github.com/thanos-io/thanos/pull/2030) Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.
- [#2049](https://github.com/thanos-io/thanos/pull/2049) Tracing: Support sampling on Elastic APM with new sample_rate setting.
- [#2008](https://github.com/thanos-io/thanos/pull/2008) Querier, Receiver, Sidecar, Store: Add gRPC [health check](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) endpoints.

### Changed

Expand Down
11 changes: 8 additions & 3 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,14 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

statusProber := prober.New(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
comp := component.Bucket
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(*httpBindAddr),
httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)),
)
Expand Down
10 changes: 7 additions & 3 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,13 @@ func runCompact(

downsampleMetrics := newDownsampleMetrics(reg)

statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component, statusProber,
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, component, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down
11 changes: 8 additions & 3 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,13 @@ func runDownsample(
}
}()

httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

metrics := newDownsampleMetrics(reg)
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Start cycle of syncing blocks from the bucket and garbage collecting the bucket.
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -129,11 +134,11 @@ func runDownsample(
})
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(func() error {
statusProber.Healthy()

Expand Down
16 changes: 11 additions & 5 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,16 @@ func runQuery(
cancel()
})
}
// Start query API + UI HTTP server.

statusProber := prober.New(comp, logger, reg)
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, reg),
)

// Start query API + UI HTTP server.
{
router := route.New()

Expand All @@ -334,8 +341,7 @@ func runQuery(

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand All @@ -359,7 +365,7 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, proxy,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
14 changes: 10 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ func runReceive(
DialOpts: dialOpts,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -351,8 +358,7 @@ func runReceive(
}

level.Debug(logger).Log("msg", "setting up http server")
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down Expand Up @@ -389,7 +395,7 @@ func runReceive(
WriteableStoreServer: webHandler,
}

s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw,
s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, grpcProbe, rw,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
15 changes: 11 additions & 4 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,15 @@ func runRule(
close(cancel)
})
}
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

// Start gRPC server.
{
store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
Expand All @@ -548,7 +556,7 @@ func runRule(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, store,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -590,8 +598,7 @@ func runRule(
api := v1.NewAPI(logger, reg, ruleMgr)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down
14 changes: 10 additions & 4 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,15 @@ func runSidecar(
uploads = false
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, comp, statusProber,
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down Expand Up @@ -269,7 +275,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, promStore,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
14 changes: 10 additions & 4 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,15 @@ func runStore(
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
) error {
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, component, statusProber,
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, component, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down Expand Up @@ -278,7 +284,7 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, component, bs,
s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
53 changes: 53 additions & 0 deletions pkg/prober/combiner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package prober

import "sync"

// combined is a Probe implementation that forwards every status change
// to each of the probes it wraps.
type combined struct {
// mu is held by each status-setting method while it walks over probes.
mu sync.Mutex
// probes holds the wrapped probes exactly as they were handed to Combine.
probes []Probe
}

// Combine bundles the given probes behind a single Probe. Each status
// change made on the returned Probe is forwarded to every wrapped probe
// while an internal mutex is held.
func Combine(probes ...Probe) Probe {
	c := new(combined)
	c.probes = probes
	return c
}

// Ready marks every wrapped probe as ready. The combiner's mutex is held
// for the whole pass over the probes.
func (p *combined) Ready() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i := 0; i < len(p.probes); i++ {
		p.probes[i].Ready()
	}
}

// NotReady marks every wrapped probe as not ready, handing each of them
// the same err as the cause. The combiner's mutex is held for the whole
// pass over the probes.
func (p *combined) NotReady(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i := 0; i < len(p.probes); i++ {
		p.probes[i].NotReady(err)
	}
}

// Healthy marks every wrapped probe as healthy. The combiner's mutex is
// held for the whole pass over the probes.
func (p *combined) Healthy() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i := 0; i < len(p.probes); i++ {
		p.probes[i].Healthy()
	}
}

// NotHealthy marks every wrapped probe as not healthy, handing each of
// them the same err as the cause. The combiner's mutex is held for the
// whole pass over the probes.
func (p *combined) NotHealthy(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i := 0; i < len(p.probes); i++ {
		p.probes[i].NotHealthy(err)
	}
}
44 changes: 44 additions & 0 deletions pkg/prober/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package prober

import (
"google.golang.org/grpc/health"
grpc_health "google.golang.org/grpc/health/grpc_health_v1"
)

// GRPCProbe represents health and readiness status of a given component, and provides gRPC integration.
// It wraps a grpc health.Server, which callers can obtain through HealthServer.
type GRPCProbe struct {
// h is the wrapped gRPC health server whose serving status this probe drives.
h *health.Server
}

// NewGRPC creates a Probe wrapped around a grpc health.Server. The status
// for the empty service name is explicitly set to NOT_SERVING before the
// probe is returned.
func NewGRPC() *GRPCProbe {
	probe := &GRPCProbe{h: health.NewServer()}

	// Report NOT_SERVING from the start, until a later call changes it.
	const service = ""
	probe.h.SetServingStatus(service, grpc_health.HealthCheckResponse_NOT_SERVING)

	return probe
}

// HealthServer returns the underlying gRPC health server whose serving
// status this probe updates; it is the server that responds to readiness
// and liveness checks.
func (p *GRPCProbe) HealthServer() *health.Server {
return p.h
}

// Ready marks the component as ready by setting the serving status of
// the empty service name to SERVING on the wrapped health server.
func (p *GRPCProbe) Ready() {
	const service = ""
	status := grpc_health.HealthCheckResponse_SERVING
	p.h.SetServingStatus(service, status)
}

// NotReady marks the component as not ready by setting the serving status
// of the empty service name to NOT_SERVING on the wrapped health server.
// The err cause is accepted to satisfy the probe contract but is not used here.
func (p *GRPCProbe) NotReady(err error) {
	const service = ""
	status := grpc_health.HealthCheckResponse_NOT_SERVING
	p.h.SetServingStatus(service, status)
}

// Healthy sets the component's status to healthy by resuming the wrapped
// gRPC health server.
//
// NOTE(review): per the grpc-go health package docs, Server.Resume sets every
// serving status to SERVING and re-enables status changes. If so, calling
// Healthy before Ready would presumably already report SERVING — confirm
// that this is intended.
func (p *GRPCProbe) Healthy() {
p.h.Resume()
}

// NotHealthy sets the component's status to not healthy by shutting down
// the wrapped gRPC health server. The err cause is not used by this probe.
//
// NOTE(review): per the grpc-go health package docs, Server.Shutdown sets
// every serving status to NOT_SERVING and ignores later status changes until
// Resume is called; presumably Ready has no effect until Healthy is called
// again — confirm.
func (p *GRPCProbe) NotHealthy(err error) {
p.h.Shutdown()
}
Loading