Skip to content

Commit

Permalink
Improve shutdown logic: wait until no requests are made
Browse files Browse the repository at this point in the history
Pods in Kubernetes endpoints are expected to shut-down 'gracefully' after receiving SIGTERM -
we should keep accepting new connections for a while. This is because Kubernetes updates Service endpoints
and sends SIGTERM to pods *in parallel*.

See kubernetes/kubernetes#106476 for more detail.
  • Loading branch information
motoki317 committed Nov 28, 2024
1 parent e76ea47 commit 25f5d09
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 1 deletion.
61 changes: 60 additions & 1 deletion internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io/fs"
"k8s.io/ingress-nginx/internal/ingress/metric/collectors"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -377,6 +378,63 @@ func (n *NGINXController) Start() {
}
}

// stopWait waits until no more connections are made to nginx.
//
// This waits until all of following conditions are met:
// - No more requests are made to nginx for the last 5 seconds.
// - 'shutdown-grace-period' seconds have passed after calling this method.
//
// Pods in Kubernetes endpoints are expected to shut-down 'gracefully' after receiving SIGTERM -
// we should keep accepting new connections for a while. This is because Kubernetes updates Service endpoints
// and sends SIGTERM to pods *in parallel*.
// If we don't see new requests for 5 seconds, then we assume that this pod was removed from the upstream endpoints
// (AWS ALB endpoints for example), and proceed with shutdown.
//
// See https://github.com/kubernetes/kubernetes/issues/106476 for more detail on this issue.
func (n *NGINXController) stopWait() {
const checkFrequency = time.Second
const waitUntilNoConnectionsFor = int((5 * time.Second) / checkFrequency)
waitAtLeastUntil := time.Now().Add(time.Duration(n.cfg.ShutdownGracePeriod) * time.Second)

var scraper collectors.NginxStatusScraper
lastRequests := 0
noChangeTimes := 0

for ; ; time.Sleep(checkFrequency) {
st, err := scraper.Scrape()
if err != nil {
klog.Warningf("failed to scrape nginx status: %v", err)
noChangeTimes = 0
continue
}

diff := st.Requests - lastRequests
// We assume that there were no client requests to nginx, if and only if
// there were 0 to 2 increase in handled requests from the last scrape.
// 1 is to account for our own stub_status request from this method,
// and the other 1 is to account for the readinessProbe.
// Note that readinessProbe DO happen even when the pod is terminating.
// See: https://github.com/kubernetes/kubernetes/issues/122824#issuecomment-1899224434
noChange := 0 <= diff && diff <= 2
if noChange {
noChangeTimes++
if noChangeTimes >= waitUntilNoConnectionsFor {
// Safe to proceed shutdown, we are seeing no more client request.
break
}
} else {
noChangeTimes = 0
}
lastRequests = st.Requests
}

// Wait at least for the configured duration, if any
delay := waitAtLeastUntil.Sub(time.Now())
if delay > 0 {
time.Sleep(delay)
}
}

// Stop gracefully stops the NGINX master process.
func (n *NGINXController) Stop() error {
n.isShuttingDown = true
Expand All @@ -388,7 +446,8 @@ func (n *NGINXController) Stop() error {
return fmt.Errorf("shutdown already in progress")
}

time.Sleep(time.Duration(n.cfg.ShutdownGracePeriod) * time.Second)
klog.InfoS("Graceful shutdown - waiting until no more requests are made")
n.stopWait()

klog.InfoS("Shutting down controller queues")
close(n.stopCh)
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/framework/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ func (f *Framework) ScaleDeploymentToZero(name string) {
assert.Nil(ginkgo.GinkgoT(), err, "getting deployment")
assert.NotNil(ginkgo.GinkgoT(), d, "expected a deployment but none returned")

err = waitForPodsDeleted(f.KubeClientSet, 2*time.Minute, f.Namespace, &metav1.ListOptions{
LabelSelector: labelSelectorToString(d.Spec.Selector.MatchLabels),
})
assert.Nil(ginkgo.GinkgoT(), err, "waiting for no pods")
err = WaitForEndpoints(f.KubeClientSet, DefaultTimeout, name, f.Namespace, 0)
assert.Nil(ginkgo.GinkgoT(), err, "waiting for no endpoints")
}
Expand Down
87 changes: 87 additions & 0 deletions test/e2e/gracefulshutdown/k8s_async_shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gracefulshutdown

import (
"context"
"fmt"
"github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-nginx/test/e2e/framework"
"net/http"
"strings"
"time"
)

var _ = framework.IngressNginxDescribe("[Shutdown] Asynchronous shutdown", func() {
f := framework.NewDefaultFramework("k8s-async-shutdown", func(f *framework.Framework) {
f.Namespace = "k8s-async-shutdown"
})

host := "async-shutdown"

ginkgo.BeforeEach(func() {
f.NewSlowEchoDeployment()
})

ginkgo.It("should not shut down while still receiving traffic", func() {
defer ginkgo.GinkgoRecover()

err := f.UpdateIngressControllerDeployment(func(deployment *appsv1.Deployment) error {
// Note: e2e's default terminationGracePeriodSeconds is 1 for some reason, so extend it
grace := int64(300)
deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = &grace
_, err := f.KubeClientSet.AppsV1().Deployments(f.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
return err
})
assert.Nil(ginkgo.GinkgoT(), err, "updating ingress controller deployment")

f.EnsureIngress(framework.NewSingleIngress(host, "/", host, f.Namespace, framework.SlowEchoService, 80, nil))

f.WaitForNginxServer(host,
func(server string) bool {
return strings.Contains(server, "server_name "+host)
})

// We need to get pod IP first because after the pod becomes terminating,
// it is removed from Service endpoints, and becomes unable to be discovered by "f.HTTPTestClient()".
ip := f.GetNginxPodIP()

// Assume that the upstream takes 30 seconds to update its endpoints,
// therefore we are still receiving traffic while shutting down
go func() {
defer ginkgo.GinkgoRecover()
for i := 0; i < 120; i++ {
f.HTTPDumbTestClient().
GET("/").
WithURL(fmt.Sprintf("http://%s/", ip)).
WithHeader("Host", host).
Expect().
Status(http.StatusOK)

framework.Sleep(250 * time.Millisecond)
}
}()

start := time.Now()
f.ScaleDeploymentToZero("nginx-ingress-controller")
assert.GreaterOrEqualf(ginkgo.GinkgoT(), int(time.Since(start).Seconds()), 35,
"should take more than 30 + 5 seconds for graceful shutdown")
})
})

0 comments on commit 25f5d09

Please sign in to comment.