Skip to content

Commit

Permalink
*: Handle SIGHUP (thanos-io#2139)
Browse files Browse the repository at this point in the history
* Handle SIGHUP

Signed-off-by: Kemal Akkoyun <[email protected]>

* Only log when signal received

Signed-off-by: Kemal Akkoyun <[email protected]>

* Use a buffered channel

Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun authored and vankop committed Feb 28, 2020
1 parent 850e3aa commit 67fe678
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 38 deletions.
8 changes: 4 additions & 4 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -167,7 +167,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
cmd := root.Command("ls", "List all blocks in the bucket")
output := cmd.Flag("output", "Optional format in which to print each block's information. Options are 'json', 'wide' or a custom template.").
Short('o').Default("").String()
m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -260,7 +260,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
Default("FROM", "UNTIL").Enums(inspectColumns...)
timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration()

m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {

// Parse selector.
selectorLabels, err := parseFlagLabels(*selector)
Expand Down Expand Up @@ -316,7 +316,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration()
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

comp := component.Bucket
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func registerCheckRules(m map[string]setupFunc, root *kingpin.CmdClause, name st
"The rule files to check.",
).Required().ExistingFiles()

m[name+" rules"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" rules"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
// Dummy actor to immediately kill the group after the run function returns.
g.Add(func() error { return nil }, func(error) {})
return checkRulesFiles(logger, ruleFiles)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
time.Duration(*httpGracePeriod),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp)
}
}
Expand Down
35 changes: 33 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
logFormatJson = "json"
)

type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, bool) error
type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, <-chan struct{}, bool) error

func main() {
if os.Getenv("DEBUG") != "" {
Expand Down Expand Up @@ -178,7 +178,10 @@ func main() {
})
}

if err := cmds[cmd](&g, logger, metrics, tracer, *logLevel == "debug"); err != nil {
// Create a signal channel to dispatch reload events to sub-commands.
reloadCh := make(chan struct{}, 1)

if err := cmds[cmd](&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
level.Error(logger).Log("err", errors.Wrapf(err, "%s command failed", cmd))
os.Exit(1)
}
Expand All @@ -193,6 +196,16 @@ func main() {
})
}

// Listen for reload signals.
{
cancel := make(chan struct{})
g.Add(func() error {
return reload(logger, cancel, reloadCh)
}, func(error) {
close(cancel)
})
}

if err := g.Run(); err != nil {
level.Error(logger).Log("msg", "running command failed", "err", err)
os.Exit(1)
Expand All @@ -211,3 +224,21 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error {
return errors.New("canceled")
}
}

// reload listens for SIGHUP and forwards each one as a reload request on r.
//
// It blocks until cancel is closed (or receives a value), at which point it
// returns a "canceled" error, so it is meant to be run as an actor that is
// stopped by closing cancel.
//
// The send on r is non-blocking: if r cannot accept a value right now (for
// example its buffer already holds a pending reload request), the signal is
// dropped instead of stalling this loop.
func reload(logger log.Logger, cancel <-chan struct{}, r chan<- struct{}) error {
	// Buffered because the signal package does not block when sending to c;
	// an unbuffered channel could miss a SIGHUP that arrives while this
	// goroutine is not parked in the receive below.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP)
	for {
		select {
		case s := <-c:
			level.Info(logger).Log("msg", "caught signal. Reloading.", "signal", s)
			select {
			case r <- struct{}{}:
				level.Info(logger).Log("msg", "reload dispatched.")
			default:
				// Receiver is not ready; drop this request rather than block.
			}
		case <-cancel:
			return errors.New("canceled")
		}
	}
}
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down
29 changes: 4 additions & 25 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ import (
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -116,7 +113,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
dnsSDResolver := cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]").
Default("golang").Hidden().String()

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down Expand Up @@ -168,6 +165,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
logger,
reg,
tracer,
reload,
lset,
*alertmgrs,
*alertmgrsTimeout,
Expand Down Expand Up @@ -209,6 +207,7 @@ func runRule(
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
reloadSignal <-chan struct{},
lset labels.Labels,
alertmgrURLs []string,
alertmgrsTimeout time.Duration,
Expand Down Expand Up @@ -484,6 +483,7 @@ func runRule(
case <-cancel:
return errors.New("canceled")
case <-reload:
case <-reloadSignal:
}

level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
Expand Down Expand Up @@ -520,27 +520,6 @@ func runRule(
close(cancel)
})
}
{
cancel := make(chan struct{})

g.Add(func() error {
c := make(chan os.Signal, 1)
for {
signal.Notify(c, syscall.SIGHUP)
select {
case <-c:
select {
case reload <- struct{}{}:
default:
}
case <-cancel:
return errors.New("canceled")
}
}
}, func(error) {
close(cancel)
})
}

grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
reloader.ReloadURLFromBase(*promURL),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
Expand Down

0 comments on commit 67fe678

Please sign in to comment.