From 0f0f0811b664eb00fccbb9db5c73edcd15679dfd Mon Sep 17 00:00:00 2001 From: Mikhail Nozdrachev Date: Tue, 23 Jan 2024 11:37:46 +0100 Subject: [PATCH] receive: race condition in handler Close() when stopped early Receiver hangs waiting for the HTTP Hander to shutdown if an error occurs before Handler is initialized. This might happen, for example, if the hashring is too small for a given replication factor. Signed-off-by: Mikhail Nozdrachev --- CHANGELOG.md | 1 + pkg/receive/handler.go | 39 +++++++++++++++++-------------------- pkg/receive/handler_test.go | 9 +++++++++ 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 264a6351ea..50991c7b37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7011](https://github.com/thanos-io/thanos/pull/7011) Query Frontend: queries with negative offset should check whether it is cacheable or not. - [#6874](https://github.com/thanos-io/thanos/pull/6874) Sidecar: fix labels returned by 'api/v1/series' in presence of conflicting external and inner labels. - [#7009](https://github.com/thanos-io/thanos/pull/7009) Rule: Fix spacing error in URL. +- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early ### Added diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index ba4b9b94ce..e10874d749 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -98,11 +98,11 @@ type Options struct { // Handler serves a Prometheus remote write receiving HTTP endpoint. type Handler struct { - logger log.Logger - writer *Writer - router *route.Router - options *Options - listener net.Listener + logger log.Logger + writer *Writer + router *route.Router + options *Options + httpSrv *http.Server mtx sync.RWMutex hashring Hashring @@ -241,6 +241,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler { }) statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger)) + errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) + + h.httpSrv = &http.Server{ + Handler: h.router, + ErrorLog: errlog, + TLSConfig: h.options.TLSConfig, + } + return h } @@ -364,42 +372,31 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap // Close stops the Handler. func (h *Handler) Close() { - if h.listener != nil { - runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") - } + runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server") } // Run serves the HTTP endpoints. func (h *Handler) Run() error { level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) - var err error - h.listener, err = net.Listen("tcp", h.options.ListenAddress) + listener, err := net.Listen("tcp", h.options.ListenAddress) if err != nil { return err } // Monitor incoming connections with conntrack. - h.listener = conntrack.NewListener(h.listener, + listener = conntrack.NewListener(listener, conntrack.TrackWithName("http"), conntrack.TrackWithTracing()) - errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) - - httpSrv := &http.Server{ - Handler: h.router, - ErrorLog: errlog, - TLSConfig: h.options.TLSConfig, - } - if h.options.TLSConfig != nil { level.Info(h.logger).Log("msg", "Serving HTTPS", "address", h.options.ListenAddress) // Cert & Key are already being passed in via TLSConfig. - return httpSrv.ServeTLS(h.listener, "", "") + return h.httpSrv.ServeTLS(listener, "", "") } level.Info(h.logger).Log("msg", "Serving plain HTTP", "address", h.options.ListenAddress) - return httpSrv.Serve(h.listener) + return h.httpSrv.Serve(listener) } // replica encapsulates the replica number of a request and if the request is diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index a693843aff..e7e0d316c9 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -1645,3 +1645,12 @@ func TestHashringChangeCallsClose(t *testing.T) { pg := allHandlers[0].peers.(*fakePeersGroup) testutil.Assert(t, len(pg.closeCalled) > 0) } + +func TestHandlerEarlyStop(t *testing.T) { + h := NewHandler(nil, &Options{}) + h.Close() + + err := h.Run() + testutil.NotOk(t, err) + testutil.Equals(t, "http: Server closed", err.Error()) +}