Skip to content

Commit

Permalink
feat store: added readiness and liveness prober (#1460)
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <[email protected]>
Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
FUSAKLA authored and GiedriusS committed Oct 28, 2019
1 parent c820b36 commit f0b9d09
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 110 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel
## Unreleased

## Added
- [#1540](https://github.com/thanos-io/thanos/pull/1540) Added `/-/ready` and `/-/healthy` endpoints to Thanos Downsample.
- [#1538](https://github.com/thanos-io/thanos/pull/1538) Added `/-/ready` and `/-/healthy` endpoints to Thanos Rule.
- [#1537](https://github.com/thanos-io/thanos/pull/1537) Added `/-/ready` and `/-/healthy` endpoints to Thanos Receive.
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Added `/-/ready` endpoint to Thanos Query.
- [#1540](https://github.com/thanos-io/thanos/pull/1540) Thanos Downsample added `/-/ready` and `/-/healthy` endpoints.
- [#1538](https://github.com/thanos-io/thanos/pull/1538) Thanos Rule added `/-/ready` and `/-/healthy` endpoints.
- [#1537](https://github.com/thanos-io/thanos/pull/1537) Thanos Receive added `/-/ready` and `/-/healthy` endpoints.
- [#1460](https://github.com/thanos-io/thanos/pull/1460) Thanos Store added `/-/ready` and `/-/healthy` endpoints.
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Thanos Query added `/-/ready` and `/-/healthy` endpoints.
- [#1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag.

### Fixed
Expand Down
23 changes: 1 addition & 22 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {

cmds := map[string]setupFunc{}
registerSidecar(cmds, app)
registerStore(cmds, app, "store")
registerStore(cmds, app)
registerQuery(cmds, app)
registerRule(cmds, app)
registerCompact(cmds, app)
Expand Down Expand Up @@ -333,27 +333,6 @@ func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer open
return s
}

// TODO Remove once all components are migrated to the new scheduleHTTPServer.
// metricHTTPListenGroup adds an actor to the run.Group g that serves an HTTP endpoint on httpBindAddr.
// The served mux is populated by registerMetrics (using reg) and registerProfile.
// It returns a wrapped error, without adding anything to g, when listening on httpBindAddr fails.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)

// Bind the listener before scheduling the actor so that a listen failure is returned to the caller directly.
l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

// Execute function: serve until http.Serve returns; its error is wrapped and becomes the actor's result.
g.Add(func() error {
level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr)
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(error) {
// Interrupt function: close the listener, logging any close error.
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
}

// scheduleHTTPServer starts a run.Group that serves an HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error {
Expand Down
172 changes: 88 additions & 84 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
func registerStore(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")

grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd)

Expand Down Expand Up @@ -54,7 +56,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
Expand All @@ -75,7 +77,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
component.Store,
debugLogging,
*syncInterval,
*blockSyncConcurrency,
Expand Down Expand Up @@ -104,102 +106,104 @@ func runStore(
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
component component.Component,
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return errors.Wrap(err, "create bucket client")
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
// Ensure we close up everything properly.
defer func() {
if err != nil {
return errors.Wrap(err, "create bucket client")
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()
// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2
indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}
bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}
begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})

runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})

l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
})

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
})
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
})

if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil {
return errors.Wrap(err, "schedule HTTP server")
}

level.Info(logger).Log("msg", "starting store node")
Expand Down
8 changes: 8 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-store-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ spec:
containerPort: 10902
- name: grpc
containerPort: 10901
livenessProbe:
httpGet:
port: 10902
path: /-/healthy
readinessProbe:
httpGet:
port: 10902
path: /-/ready
resources:
limits:
cpu: "1"
Expand Down

0 comments on commit f0b9d09

Please sign in to comment.