diff --git a/CHANGELOG.md b/CHANGELOG.md index e3a45cee6a..133784cd8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6467](https://github.com/thanos-io/thanos/pull/6467) Mixin (Receive): add alert for tenant reaching head series limit. ### Fixed -- [#6496](https://github.com/thanos-io/thanos/pull/6496): *: Remove unnecessary configuration reload from `ContentPathReloader` and improve its tests. +- [#6503](https://github.com/thanos-io/thanos/pull/6503) *: Change the engine behind `ContentPathReloader` to be completely independent of any filesystem concept. This effectively fixes this configuration reload when used with Kubernetes ConfigMaps, Secrets, or other volume mounts. +- [#6496](https://github.com/thanos-io/thanos/pull/6496) *: Remove unnecessary configuration reload from `ContentPathReloader` and improve its tests. - [#6456](https://github.com/thanos-io/thanos/pull/6456) Store: fix crash when computing set matches from regex pattern - [#6427](https://github.com/thanos-io/thanos/pull/6427) Receive: increasing log level for failed uploads to error - [#6172](https://github.com/thanos-io/thanos/pull/6172) query-frontend: return JSON formatted errors for invalid PromQL expression in the split by interval middleware. diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c12390965e..ee9dbb5b16 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -226,7 +226,7 @@ func runReceive( return errors.Wrap(err, "parse limit configuration") } } - limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) + limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer) if err != nil { return errors.Wrap(err, "creating limiter") } @@ -822,8 +822,9 @@ type receiveConfig struct { reqLogConfig *extflag.PathOrContent relabelConfigPath *extflag.PathOrContent - writeLimitsConfig *extflag.PathOrContent - storeRateLimits store.SeriesSelectLimits + writeLimitsConfig *extflag.PathOrContent + storeRateLimits store.SeriesSelectLimits + limitsConfigReloadTimer time.Duration } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -953,6 +954,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) + cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads."). + Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer) } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/docs/proposals-done/202005-query-logging.md b/docs/proposals-done/202005-query-logging.md index eb17a7e4a5..9894fa2be3 100644 --- a/docs/proposals-done/202005-query-logging.md +++ b/docs/proposals-done/202005-query-logging.md @@ -131,8 +131,8 @@ For HTTP, all these nice middlewares are not present, so we need to code up our **Policy of Request Logging** : We are going to use the following middleware decider for achieving the above objectives - -* [`logging.WithDecider`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2/interceptors/logging/options.go#L51) - This `option` would help us in deciding the logic whether a given request should be logged or not. This should help in logging specific / all queries. It is all pre request interception. -* [`logging.WithLevels`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2/interceptors/logging/options.go#L58) - This `option` would help us in fixating the level of query that we might want to log. So based on a query hitting a certain criteria, we can switch the levels of it. +* [`logging.WithDecider`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2.0.0-rc.5/interceptors/logging/options.go#L51) - This `option` would help us in deciding the logic whether a given request should be logged or not. This should help in logging specific / all queries. It is all pre request interception. +* [`logging.WithLevels`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2.0.0-rc.5/interceptors/logging/options.go#L58) - This `option` would help us in fixating the level of query that we might want to log. So based on a query hitting a certain criteria, we can switch the levels of it. * For using the request-id, we can store the request-id in the metadata of the `context`, and while logging, we can use it. * Same equivalent for the HTTP can be implemented for mirroring the logic of *gRPC middlewares*. * So we would generally have request logging and options: diff --git a/pkg/extkingpin/path_content_reloader.go b/pkg/extkingpin/path_content_reloader.go index 6979424a83..e0d8fdf74b 100644 --- a/pkg/extkingpin/path_content_reloader.go +++ b/pkg/extkingpin/path_content_reloader.go @@ -5,13 +5,11 @@ package extkingpin import ( "context" - "fmt" + "crypto/sha256" "os" - "path" "path/filepath" "time" - "github.com/fsnotify/fsnotify" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -22,74 +20,61 @@ type fileContent interface { Path() string } -// PathContentReloader starts a file watcher that monitors the file indicated by fileContent.Path() and runs -// reloadFunc whenever a change is detected. -// A debounce timer can be configured via opts to handle situations where many "write" events are received together or -// a "create" event is followed up by a "write" event, for example. Files will be effectively reloaded at the latest -// after 2 times the debounce timer. By default the debouncer timer is 1 second. -// To ensure renames and deletes are properly handled, the file watcher is put at the file's parent folder. See -// https://github.com/fsnotify/fsnotify/issues/214 for more details. +// PathContentReloader runs the reloadFunc when it detects that the contents of fileContent have changed. func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error { filePath, err := filepath.Abs(fileContent.Path()) if err != nil { return errors.Wrap(err, "getting absolute file path") } - watcher, err := fsnotify.NewWatcher() - if filePath == "" { - level.Debug(logger).Log("msg", "no path detected for config reload") + engine := &pollingEngine{ + filePath: filePath, + logger: logger, + debounce: debounceTime, + reloadFunc: reloadFunc, } - if err != nil { - return errors.Wrap(err, "creating file watcher") + return engine.start(ctx) +} + +// pollingEngine keeps rereading the contents at filePath and when its checksum changes it runs the reloadFunc. +type pollingEngine struct { + filePath string + logger log.Logger + debounce time.Duration + reloadFunc func() + previousChecksum [sha256.Size]byte +} + +func (p *pollingEngine) start(ctx context.Context) error { + configReader := func() { + // check if file still exists + if _, err := os.Stat(p.filePath); os.IsNotExist(err) { + level.Error(p.logger).Log("msg", "file does not exist", "error", err) + return + } + file, err := os.ReadFile(p.filePath) + if err != nil { + level.Error(p.logger).Log("msg", "error opening file", "error", err) + return + } + checksum := sha256.Sum256(file) + if checksum == p.previousChecksum { + return + } + p.reloadFunc() + p.previousChecksum = checksum + level.Debug(p.logger).Log("msg", "configuration reloaded", "path", p.filePath) } go func() { - var reloadTimer *time.Timer - if debounceTime != 0 { - reloadTimer = time.AfterFunc(debounceTime, func() { - reloadFunc() - level.Debug(logger).Log("msg", "configuration reloaded after debouncing") - }) - reloadTimer.Stop() - } - defer watcher.Close() for { select { case <-ctx.Done(): - if reloadTimer != nil { - reloadTimer.Stop() - } return - case event := <-watcher.Events: - // fsnotify sometimes sends a bunch of events without name or operation. - // It's unclear what they are and why they are sent - filter them out. - if event.Name == "" { - break - } - // We are watching the file's parent folder (more details on this is done can be found below), but are - // only interested in changed to the target file. Discard every other file as quickly as possible. - if event.Name != filePath { - break - } - // We only react to files being written or created. - // On chmod or remove we have nothing to do. - // On rename we have the old file name (not useful). A create event for the new file will come later. - if event.Op&fsnotify.Write == 0 && event.Op&fsnotify.Create == 0 { - break - } - level.Debug(logger).Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op) - if reloadTimer != nil { - reloadTimer.Reset(debounceTime) - } - case err := <-watcher.Errors: - level.Error(logger).Log("msg", "watcher error", "error", err) + case <-time.After(p.debounce): + configReader() } } }() - // We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. Check - // https://github.com/fsnotify/fsnotify/issues/214 for more details. - if err := watcher.Add(path.Dir(filePath)); err != nil { - return errors.Wrapf(err, "adding path %s to file watcher", filePath) - } return nil } diff --git a/pkg/extkingpin/path_content_reloader_test.go b/pkg/extkingpin/path_content_reloader_test.go index 7ab9a93b9f..2725e45883 100644 --- a/pkg/extkingpin/path_content_reloader_test.go +++ b/pkg/extkingpin/path_content_reloader_test.go @@ -11,13 +11,18 @@ import ( "testing" "time" + "github.com/pkg/errors" + + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" ) func TestPathContentReloader(t *testing.T) { + t.Parallel() type args struct { - runSteps func(t *testing.T, testFile string, pathContent *staticPathContent) + runSteps func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) } tests := []struct { name string @@ -25,59 +30,48 @@ func TestPathContentReloader(t *testing.T) { wantReloads int }{ { - name: "Many operations, only rewrite triggers one reload", + name: "Many operations, only rewrite triggers one reload + plus the initial reload", args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) { testutil.Ok(t, os.Chmod(testFile, 0777)) testutil.Ok(t, os.Remove(testFile)) testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) }, }, - wantReloads: 1, - }, - { - name: "Many operations, only rename triggers one reload", - args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { - testutil.Ok(t, os.Chmod(testFile, 0777)) - testutil.Ok(t, os.Rename(testFile, testFile+".tmp")) - testutil.Ok(t, os.Rename(testFile+".tmp", testFile)) - }, - }, - wantReloads: 1, + wantReloads: 2, }, { - name: "Many operations, two rewrites trigger two reloads", + name: "Many operations, two rewrites trigger two reloads + the initial reload", args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) { testutil.Ok(t, os.Chmod(testFile, 0777)) testutil.Ok(t, os.Remove(testFile)) testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) - time.Sleep(2 * time.Second) + time.Sleep(2 * reloadTime) testutil.Ok(t, pathContent.Rewrite([]byte("test modified again"))) }, }, - wantReloads: 2, + wantReloads: 3, }, { - name: "Chmod doesn't trigger reload", + name: "Chmod doesn't trigger reload, we have only the initial reload", args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) { testutil.Ok(t, os.Chmod(testFile, 0777)) testutil.Ok(t, os.Chmod(testFile, 0666)) testutil.Ok(t, os.Chmod(testFile, 0777)) }, }, - wantReloads: 0, + wantReloads: 1, }, { - name: "Remove doesn't trigger reload", + name: "Remove doesn't trigger reload, we have only the initial reload", args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) { testutil.Ok(t, os.Remove(testFile)) }, }, - wantReloads: 0, + wantReloads: 1, }, } for _, tt := range tests { @@ -86,28 +80,40 @@ func TestPathContentReloader(t *testing.T) { t.Parallel() testFile := path.Join(t.TempDir(), "test") testutil.Ok(t, os.WriteFile(testFile, []byte("test"), 0666)) + pathContent, err := NewStaticPathContent(testFile) testutil.Ok(t, err) wg := &sync.WaitGroup{} wg.Add(tt.wantReloads) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() reloadCount := 0 + configReloadTime := 500 * time.Millisecond err = PathContentReloader(ctx, pathContent, log.NewLogfmtLogger(os.Stdout), func() { reloadCount++ wg.Done() - }, 100*time.Millisecond) + }, configReloadTime) testutil.Ok(t, err) + // wait for the initial reload + testutil.NotOk(t, runutil.Repeat(configReloadTime, ctx.Done(), func() error { + if reloadCount != 1 { + return nil + } + return errors.New("reload count matched") + })) - tt.args.runSteps(t, testFile, pathContent) - if tt.wantReloads == 0 { - // Give things a little time to sync. The fs watcher events are heavily async and could be delayed. - time.Sleep(1 * time.Second) - } + tt.args.runSteps(t, testFile, pathContent, configReloadTime) wg.Wait() - testutil.Equals(t, tt.wantReloads, reloadCount) + + // wait for final reload + testutil.NotOk(t, runutil.Repeat(2*configReloadTime, ctx.Done(), func() error { + if reloadCount != tt.wantReloads { + return nil + } + return errors.New("reload count matched") + })) }) } } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 7d40d83136..b2bb5c043e 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -183,7 +183,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin } ag := addrGen{} - limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger()) + limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger(), 1*time.Second) for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: tenancy.DefaultTenantHeader, @@ -741,7 +741,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) { testutil.Ok(t, os.WriteFile(tmpLimitsPath, tenantConfig, 0666)) limitConfig, _ := extkingpin.NewStaticPathContent(tmpLimitsPath) handler.Limiter, _ = NewLimiter( - limitConfig, nil, RouterIngestor, log.NewNopLogger(), + limitConfig, nil, RouterIngestor, log.NewNopLogger(), 1*time.Second, ) wreq := &prompb.WriteRequest{ diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index ac79b4c5bd..f5b0713e20 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -33,6 +33,7 @@ type Limiter struct { configReloadCounter prometheus.Counter configReloadFailedCounter prometheus.Counter receiverMode ReceiverMode + configReloadTimer time.Duration } // headSeriesLimiter encompasses active/head series limiting logic. @@ -55,13 +56,14 @@ type fileContent interface { // NewLimiter creates a new *Limiter given a configuration and prometheus // registerer. -func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) (*Limiter, error) { +func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) { limiter := &Limiter{ writeGate: gate.NewNoop(), requestLimiter: &noopRequestLimiter{}, HeadSeriesLimiter: NewNopSeriesLimit(), logger: logger, receiverMode: r, + configReloadTimer: configReloadTimer, } if reg != nil { @@ -116,7 +118,7 @@ func (l *Limiter) StartConfigReloader(ctx context.Context) error { if reloadCounter := l.configReloadCounter; reloadCounter != nil { reloadCounter.Inc() } - }, 1*time.Second) + }, l.configReloadTimer) } func (l *Limiter) CanReload() bool { diff --git a/pkg/receive/limiter_test.go b/pkg/receive/limiter_test.go index 850124c7d2..960e567a17 100644 --- a/pkg/receive/limiter_test.go +++ b/pkg/receive/limiter_test.go @@ -32,7 +32,7 @@ func TestLimiter_StartConfigReloader(t *testing.T) { t.Fatalf("could not load test content at %s: %s", invalidLimitsPath, err) } - limiter, err := NewLimiter(goodLimits, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout)) + limiter, err := NewLimiter(goodLimits, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout), 1*time.Second) testutil.Ok(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -88,7 +88,7 @@ func TestLimiter_CanReload(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { configFile := tt.args.configFilePath - limiter, err := NewLimiter(configFile, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout)) + limiter, err := NewLimiter(configFile, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout), 1*time.Second) testutil.Ok(t, err) if tt.wantReload { testutil.Assert(t, limiter.CanReload())