From 278fab044355168c8ef8c2f91169fcc18064ce5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 19 Oct 2023 17:23:37 +0300 Subject: [PATCH] receive: fix limits reloading race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We are re-reading the limits configuration periodically and also reading it at the same time hence we need a lock around it. Thus, let's make that struct member private and add a getter that returns the limiter under a mutex lock. Fixes: ``` 17:14:45 receive-i3: WARNING: DATA RACE 17:14:45 receive-i3: Read at 0x00c00090aec0 by goroutine 131: 17:14:45 receive-i3: github.com/thanos-io/thanos/pkg/receive.(*headSeriesLimit).QueryMetaMonitoring() 17:14:45 receive-i3: /go/src/github.com/thanos-io/thanos/pkg/receive/head_series_limiter.go:109 +0x2fb 17:14:45 receive-i3: main.runReceive.func9.1() 17:14:45 receive-i3: /go/src/github.com/thanos-io/thanos/cmd/thanos/receive.go:402 +0x9b 17:14:45 receive-i3: github.com/thanos-io/thanos/pkg/runutil.Repeat() 17:14:45 receive-i3: /go/src/github.com/thanos-io/thanos/pkg/runutil/runutil.go:74 +0xc3 17:14:45 receive-i3: Previous write at 0x00c00090aec0 by goroutine 138: 17:14:45 receive-i3: github.com/thanos-io/thanos/pkg/receive.NewHeadSeriesLimit() 17:14:45 receive-i3: /go/src/github.com/thanos-io/thanos/pkg/receive/head_series_limiter.go:41 +0x316 17:14:45 receive-i3: github.com/thanos-io/thanos/pkg/receive.(*Limiter).loadConfig() 17:14:45 receive-i3: /go/src/github.com/thanos-io/thanos/pkg/receive/limiter.go:168 +0xd0d 17:14:45 receive-i3: github.com/thanos-io/thanos/pkg/receive.(*Limiter).StartConfigReloader.func1() 17:14:45 receive-i3: /go/src/github.com/thanos-io/thanos/pkg/receive/limiter.go:111 +0x207 17:14:45 receive-i3: github.com/thanos-io/thanos/pkg/extkingpin.(*pollingEngine).start.func1() ``` Signed-off-by: Giedrius Statkevičius --- cmd/thanos/receive.go | 2 +- pkg/receive/handler.go | 2 +- pkg/receive/limiter.go | 16 +++++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5b699d264d..1c21cc39b3 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -399,7 +399,7 @@ func runReceive( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(15*time.Second, ctx.Done(), func() error { - if err := limiter.HeadSeriesLimiter.QueryMetaMonitoring(ctx); err != nil { + if err := limiter.HeadSeriesLimiter().QueryMetaMonitoring(ctx); err != nil { level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error()) } return nil diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index cc89748726..f3ab89d4fe 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -442,7 +442,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } - under, err := h.Limiter.HeadSeriesLimiter.isUnderLimit(tenant) + under, err := h.Limiter.HeadSeriesLimiter().isUnderLimit(tenant) if err != nil { level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) } diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index 860eddc1ec..46055b3001 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -25,7 +25,8 @@ import ( type Limiter struct { sync.RWMutex requestLimiter requestLimiter - HeadSeriesLimiter headSeriesLimiter + headSeriesLimiterMtx sync.Mutex + headSeriesLimiter headSeriesLimiter writeGate gate.Gate registerer prometheus.Registerer configPathOrContent fileContent @@ -54,13 +55,20 @@ type fileContent interface { Path() string } +func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter { + l.headSeriesLimiterMtx.Lock() + defer l.headSeriesLimiterMtx.Unlock() + + return l.headSeriesLimiter +} + // NewLimiter creates a new *Limiter given a configuration and prometheus // registerer. func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) { limiter := &Limiter{ writeGate: gate.NewNoop(), requestLimiter: &noopRequestLimiter{}, - HeadSeriesLimiter: NewNopSeriesLimit(), + headSeriesLimiter: NewNopSeriesLimit(), logger: logger, receiverMode: r, configReloadTimer: configReloadTimer, @@ -165,7 +173,9 @@ func (l *Limiter) loadConfig() error { return false } if (l.receiverMode == RouterOnly || l.receiverMode == RouterIngestor) && seriesLimitIsActivated() { - l.HeadSeriesLimiter = NewHeadSeriesLimit(config.WriteLimits, l.registerer, l.logger) + l.headSeriesLimiterMtx.Lock() + l.headSeriesLimiter = NewHeadSeriesLimit(config.WriteLimits, l.registerer, l.logger) + l.headSeriesLimiterMtx.Unlock() } return nil }