Skip to content

Commit

Permalink
reset drainer (knative#1242) (knative#1252)
Browse files Browse the repository at this point in the history
  • Loading branch information
skonto authored Sep 22, 2022
1 parent aba172f commit 57f2a9a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 28 deletions.
36 changes: 25 additions & 11 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ func main() {
// Enable TLS when certificate is mounted.
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)

mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
mainServer, drainer := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
httpServers := map[string]*http.Server{
"main": mainServer,
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
"admin": buildAdminServer(logger, drain),
"admin": buildAdminServer(ctx, logger, drainer),
}
if env.EnableProfiling {
httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
Expand All @@ -193,10 +193,10 @@ func main() {
// See also https://github.com/knative/serving/issues/12808.
var tlsServers map[string]*http.Server
if tlsEnabled {
mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
mainTLSServer, drainer := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
tlsServers = map[string]*http.Server{
"tlsMain": mainTLSServer,
"tlsAdmin": buildAdminServer(logger, drain),
"tlsAdmin": buildAdminServer(ctx, logger, drainer),
}
// Drop admin http server as we Use TLS for the admin server.
// TODO: The drain created with mainServer above is lost. Unify the two drain.
Expand Down Expand Up @@ -238,7 +238,7 @@ func main() {
}
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
drain()
drainer.Drain()

// Removing the main server from the shutdown logic as we've already shut it down.
delete(httpServers, "main")
Expand Down Expand Up @@ -273,7 +273,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
}

func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger,
ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) {
ce *queue.ConcurrencyEndpoint, enableTLS bool) (*http.Server, *pkghandler.Drainer) {
// TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health.

target := net.JoinHostPort("127.0.0.1", env.UserPort)
Expand Down Expand Up @@ -339,10 +339,10 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st
}

if enableTLS {
return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer.Drain
return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer
}

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain
return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer
}

func buildTransport(env config, logger *zap.SugaredLogger) http.RoundTripper {
Expand Down Expand Up @@ -405,11 +405,25 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config,
return true
}

func buildAdminServer(logger *zap.SugaredLogger, drain func()) *http.Server {
func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server {
adminMux := http.NewServeMux()
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
logger.Info("Attached drain handler from user-container")
drain()
logger.Info("Attached drain handler from user-container", r)

go func() {
select {
case <-ctx.Done():
case <-time.After(time.Second):
// If the context isn't done then the queue proxy didn't
// receive a TERM signal. Thus the user-container's
// liveness probes are triggering the container to restart
// and we shouldn't block that
drainer.Reset()
}
}()

drainer.Drain()
w.WriteHeader(http.StatusOK)
})

return &http.Server{
Expand Down
5 changes: 5 additions & 0 deletions test/test_images/readiness/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ spec:
spec:
containers:
- image: ko://knative.dev/serving/test/test_images/readiness
livenessProbe:
httpGet:
path: /healthz
periodSeconds: 1
failureThreshold: 1
73 changes: 56 additions & 17 deletions vendor/knative.dev/pkg/network/handlers/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@ type Drainer struct {
// after Drain is called before it may return.
QuietPeriod time.Duration

// once is used to initialize timer
once sync.Once

// timer is used to orchestrate the drain.
timer timer

// used to synchronize callers of Drain
drainCh chan struct{}

// used to synchronize Drain and Reset
resetCh chan struct{}

// HealthCheckUAPrefixes are the additional user agent prefixes that trigger the
// drainer's health check
HealthCheckUAPrefixes []string
Expand Down Expand Up @@ -106,7 +109,7 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

d.reset()
d.resetTimer()
d.Inner.ServeHTTP(w, r)
}

Expand All @@ -115,19 +118,36 @@ func (d *Drainer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (d *Drainer) Drain() {
// Note: until the first caller exits, the others
// will wait blocked as well.
d.once.Do(func() {
t := func() timer {
d.Lock()
defer d.Unlock()
if d.QuietPeriod <= 0 {
d.QuietPeriod = network.DefaultDrainTimeout
ch := func() chan struct{} {
d.Lock()
defer d.Unlock()
if d.drainCh != nil {
return d.drainCh
}

if d.QuietPeriod <= 0 {
d.QuietPeriod = network.DefaultDrainTimeout
}

timer := newTimer(d.QuietPeriod)
drainCh := make(chan struct{})
resetCh := make(chan struct{})

go func() {
select {
case <-resetCh:
case <-timer.tickChan():
}
d.timer = newTimer(d.QuietPeriod)
return d.timer
close(drainCh)
}()

<-t.tickChan()
})
d.drainCh = drainCh
d.resetCh = resetCh
d.timer = timer
return drainCh
}()

<-ch
}

// isHealthcheckRequest validates if the request has a user agent that is for healthcheck
Expand All @@ -145,8 +165,27 @@ func (d *Drainer) isHealthCheckRequest(r *http.Request) bool {
return false
}

// reset resets the drain timer to the full amount of time.
func (d *Drainer) reset() {
// Reset interrupts Drain and clears the drainers internal state
// Thus further calls to Drain will block and wait for the entire QuietPeriod
func (d *Drainer) Reset() {
d.Lock()
defer d.Unlock()

if d.timer != nil {
d.timer.Stop()
d.timer = nil
}

if d.resetCh != nil {
close(d.resetCh)
d.resetCh = nil
}
if d.drainCh != nil {
d.drainCh = nil
}
}

func (d *Drainer) resetTimer() {
if func() bool {
d.RLock()
defer d.RUnlock()
Expand Down Expand Up @@ -185,4 +224,4 @@ func serveKProbe(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set(network.HashHeaderName, hh)
w.WriteHeader(http.StatusOK)
}
}

0 comments on commit 57f2a9a

Please sign in to comment.