diff --git a/CHANGELOG.md b/CHANGELOG.md index a0bccbbaf6..43a273b81c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7011](https://github.com/thanos-io/thanos/pull/7011) Query Frontend: queries with negative offset should check whether it is cacheable or not. - [#6874](https://github.com/thanos-io/thanos/pull/6874) Sidecar: fix labels returned by 'api/v1/series' in presence of conflicting external and inner labels. - [#7009](https://github.com/thanos-io/thanos/pull/7009) Rule: Fix spacing error in URL. +- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early ### Added diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index ba4b9b94ce..e10874d749 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -98,11 +98,11 @@ type Options struct { // Handler serves a Prometheus remote write receiving HTTP endpoint. type Handler struct { - logger log.Logger - writer *Writer - router *route.Router - options *Options - listener net.Listener + logger log.Logger + writer *Writer + router *route.Router + options *Options + httpSrv *http.Server mtx sync.RWMutex hashring Hashring @@ -241,6 +241,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler { }) statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger)) + errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) + + h.httpSrv = &http.Server{ + Handler: h.router, + ErrorLog: errlog, + TLSConfig: h.options.TLSConfig, + } + return h } @@ -364,42 +372,31 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap // Close stops the Handler. func (h *Handler) Close() { - if h.listener != nil { - runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") - } + runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server") } // Run serves the HTTP endpoints. func (h *Handler) Run() error { level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) - var err error - h.listener, err = net.Listen("tcp", h.options.ListenAddress) + listener, err := net.Listen("tcp", h.options.ListenAddress) if err != nil { return err } // Monitor incoming connections with conntrack. - h.listener = conntrack.NewListener(h.listener, + listener = conntrack.NewListener(listener, conntrack.TrackWithName("http"), conntrack.TrackWithTracing()) - errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) - - httpSrv := &http.Server{ - Handler: h.router, - ErrorLog: errlog, - TLSConfig: h.options.TLSConfig, - } - if h.options.TLSConfig != nil { level.Info(h.logger).Log("msg", "Serving HTTPS", "address", h.options.ListenAddress) // Cert & Key are already being passed in via TLSConfig. - return httpSrv.ServeTLS(h.listener, "", "") + return h.httpSrv.ServeTLS(listener, "", "") } level.Info(h.logger).Log("msg", "Serving plain HTTP", "address", h.options.ListenAddress) - return httpSrv.Serve(h.listener) + return h.httpSrv.Serve(listener) } // replica encapsulates the replica number of a request and if the request is diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index a693843aff..e7e0d316c9 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -1645,3 +1645,12 @@ func TestHashringChangeCallsClose(t *testing.T) { pg := allHandlers[0].peers.(*fakePeersGroup) testutil.Assert(t, len(pg.closeCalled) > 0) } + +func TestHandlerEarlyStop(t *testing.T) { + h := NewHandler(nil, &Options{}) + h.Close() + + err := h.Run() + testutil.NotOk(t, err) + testutil.Equals(t, "http: Server closed", err.Error()) +}