From aebb15ff0be7735eaf54ef7ce662d93740710705 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Thu, 22 Sep 2022 03:35:59 +0300 Subject: [PATCH] reset drainer (#1242) (#1252) --- cmd/queue/main.go | 36 ++++++--- test/test_images/readiness/service.yaml | 5 ++ .../knative.dev/pkg/network/handlers/drain.go | 73 ++++++++++++++----- 3 files changed, 86 insertions(+), 28 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 440c10df3da1..f2b244fe9dfc 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -169,11 +169,11 @@ func main() { // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) - mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false) + mainServer, drainer := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false) httpServers := map[string]*http.Server{ "main": mainServer, "metrics": buildMetricsServer(protoStatReporter), - "admin": buildAdminServer(logger, drain), + "admin": buildAdminServer(ctx, logger, drainer), } if env.EnableProfiling { httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true)) @@ -184,10 +184,10 @@ func main() { // See also https://github.com/knative/serving/issues/12808. var tlsServers map[string]*http.Server if tlsEnabled { - mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */) + mainTLSServer, drainer := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */) tlsServers = map[string]*http.Server{ "tlsMain": mainTLSServer, - "tlsAdmin": buildAdminServer(logger, drain), + "tlsAdmin": buildAdminServer(ctx, logger, drainer), } // Drop admin http server as we Use TLS for the admin server. // TODO: The drain created with mainServer above is lost. Unify the two drain. @@ -229,7 +229,7 @@ func main() { } logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) - drain() + drainer.Drain() // Removing the main server from the shutdown logic as we've already shut it down. delete(httpServers, "main") @@ -264,7 +264,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 } func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, - ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) { + ce *queue.ConcurrencyEndpoint, enableTLS bool) (*http.Server, *pkghandler.Drainer) { // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. target := net.JoinHostPort("127.0.0.1", env.UserPort) @@ -330,10 +330,10 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st } if enableTLS { - return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer.Drain + return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer } - return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain + return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer } func buildTransport(env config, logger *zap.SugaredLogger) http.RoundTripper { @@ -396,11 +396,25 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, return true } -func buildAdminServer(logger *zap.SugaredLogger, drain func()) *http.Server { +func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server { adminMux := http.NewServeMux() adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { - logger.Info("Attached drain handler from user-container") - drain() + logger.Info("Attached drain handler from user-container", r) + + go func() { + select { + case <-ctx.Done(): + case <-time.After(time.Second): + // If the context isn't done then the queue proxy didn't + // receive a TERM signal. Thus the user-container's + // liveness probes are triggering the container to restart + // and we shouldn't block that + drainer.Reset() + } + }() + + drainer.Drain() + w.WriteHeader(http.StatusOK) }) return &http.Server{ diff --git a/test/test_images/readiness/service.yaml b/test/test_images/readiness/service.yaml index ec67500b003a..08cb112c0b69 100644 --- a/test/test_images/readiness/service.yaml +++ b/test/test_images/readiness/service.yaml @@ -8,3 +8,8 @@ spec: spec: containers: - image: ko://knative.dev/serving/test/test_images/readiness + livenessProbe: + httpGet: + path: /healthz + periodSeconds: 1 + failureThreshold: 1 \ No newline at end of file diff --git a/vendor/knative.dev/pkg/network/handlers/drain.go b/vendor/knative.dev/pkg/network/handlers/drain.go index d6ef19f4eac2..81eaeaed1613 100644 --- a/vendor/knative.dev/pkg/network/handlers/drain.go +++ b/vendor/knative.dev/pkg/network/handlers/drain.go @@ -70,12 +70,15 @@ type Drainer struct { // after Drain is called before it may return. QuietPeriod time.Duration - // once is used to initialize timer - once sync.Once - // timer is used to orchestrate the drain. timer timer + // used to synchronize callers of Drain + drainCh chan struct{} + + // used to synchronize Drain and Reset + resetCh chan struct{} + // HealthCheckUAPrefixes are the additional user agent prefixes that trigger the // drainer's health check HealthCheckUAPrefixes []string @@ -106,7 +109,7 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - d.reset() + d.resetTimer() d.Inner.ServeHTTP(w, r) } @@ -115,19 +118,36 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (d *Drainer) Drain() { // Note: until the first caller exits, the others // will wait blocked as well. - d.once.Do(func() { - t := func() timer { - d.Lock() - defer d.Unlock() - if d.QuietPeriod <= 0 { - d.QuietPeriod = network.DefaultDrainTimeout + ch := func() chan struct{} { + d.Lock() + defer d.Unlock() + if d.drainCh != nil { + return d.drainCh + } + + if d.QuietPeriod <= 0 { + d.QuietPeriod = network.DefaultDrainTimeout + } + + timer := newTimer(d.QuietPeriod) + drainCh := make(chan struct{}) + resetCh := make(chan struct{}) + + go func() { + select { + case <-resetCh: + case <-timer.tickChan(): } - d.timer = newTimer(d.QuietPeriod) - return d.timer + close(drainCh) }() - <-t.tickChan() - }) + d.drainCh = drainCh + d.resetCh = resetCh + d.timer = timer + return drainCh + }() + + <-ch } // isHealthcheckRequest validates if the request has a user agent that is for healthcheck @@ -145,8 +165,27 @@ func (d *Drainer) isHealthCheckRequest(r *http.Request) bool { return false } -// reset resets the drain timer to the full amount of time. -func (d *Drainer) reset() { +// Reset interrupts Drain and clears the drainers internal state +// Thus further calls to Drain will block and wait for the entire QuietPeriod +func (d *Drainer) Reset() { + d.Lock() + defer d.Unlock() + + if d.timer != nil { + d.timer.Stop() + d.timer = nil + } + + if d.resetCh != nil { + close(d.resetCh) + d.resetCh = nil + } + if d.drainCh != nil { + d.drainCh = nil + } +} + +func (d *Drainer) resetTimer() { if func() bool { d.RLock() defer d.RUnlock() @@ -185,4 +224,4 @@ func serveKProbe(w http.ResponseWriter, r *http.Request) { } w.Header().Set(network.HashHeaderName, hh) w.WriteHeader(http.StatusOK) -} +} \ No newline at end of file