From 531dec1c916d746aabf3ad800803ee0a82c8a11b Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 18 Sep 2024 20:22:53 -0400 Subject: [PATCH] This PR introduces graceful shutdown functionality to the Multus daemon by adding a /readyz endpoint alongside the existing /healthz. The /readyz endpoint starts returning HTTP 500 once a SIGTERM is received, indicating that the daemon is in shutdown mode. During this time, CNI requests can still be processed for a short window (currently 10 seconds) before the daemon actually stops. The daemonset configs have been updated to increase terminationGracePeriodSeconds from 10 to 30 seconds, ensuring we have a bit more time for these clean shutdowns. This addresses a race condition during pod transitions where the readiness check might return true, but a subsequent CNI request could fail if the daemon shuts down too quickly. By introducing the /readyz endpoint and delaying the shutdown, we can handle ongoing CNI requests more gracefully, reducing the risk of disruptions during critical transitions. Major thanks to @deads2k for the find, identification, fix, and, of course, the explanations. Appreciate it.
--- cmd/multus-daemon/main.go | 20 +++++++++++++++++--- deployments/multus-daemonset-crio.yml | 2 +- deployments/multus-daemonset-thick.yml | 2 +- deployments/multus-daemonset.yml | 2 +- pkg/server/api/api.go | 5 ++++- pkg/server/server.go | 23 ++++++++++++++++++++--- pkg/server/thick_cni_test.go | 2 +- 7 files changed, 45 insertions(+), 11 deletions(-) diff --git a/cmd/multus-daemon/main.go b/cmd/multus-daemon/main.go index 11d38f05c..48f3324ef 100644 --- a/cmd/multus-daemon/main.go +++ b/cmd/multus-daemon/main.go @@ -28,6 +28,7 @@ import ( "path/filepath" "sync" "syscall" + "time" utilwait "k8s.io/apimachinery/pkg/util/wait" @@ -41,6 +42,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +// SigtermCancelAfter sets the wait time to cancel after sig term +// TODO: This could be a configuration option +const SigTermCancelAfter = 10 * time.Second + func main() { flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) @@ -58,6 +63,13 @@ func main() { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) + sigTermCtx, sigTermCancel := context.WithCancel(ctx) + isInGracefulShutdownMode := func() bool { + if sigTermCtx.Err() == nil { + return false + } + return true + } daemonConf, err := cniServerConfig(*configFilePath) if err != nil { @@ -105,7 +117,7 @@ func main() { } } - if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator); err != nil { + if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator, isInGracefulShutdownMode); err != nil { logging.Panicf("failed start the multus thick-plugin listener: %v", err) os.Exit(3) } @@ -123,6 +135,8 @@ func main() { go func() { for sig := range signalCh { logging.Verbosef("caught %v, stopping...", sig) + sigTermCancel() + <-time.After(SigTermCancelAfter) cancel() } }() @@ -139,7 +153,7 @@ func main() { logging.Verbosef("multus daemon is exited") } -func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator 
bool) error { +func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) error { if user, err := user.Current(); err != nil || user.Uid != "0" { return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid) } @@ -148,7 +162,7 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, return fmt.Errorf("failed to prepare the cni-socket for communicating with the shim: %w", err) } - server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator) + server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator, isInGracefulShutdownMode) if err != nil { return fmt.Errorf("failed to create the server: %v", err) } diff --git a/deployments/multus-daemonset-crio.yml b/deployments/multus-daemonset-crio.yml index 84d7b4029..80118f768 100644 --- a/deployments/multus-daemonset-crio.yml +++ b/deployments/multus-daemonset-crio.yml @@ -208,7 +208,7 @@ spec: mountPath: /host/usr/libexec/cni - name: multus-cfg mountPath: /tmp/multus-conf - terminationGracePeriodSeconds: 10 + terminationGracePeriodSeconds: 30 volumes: - name: run hostPath: diff --git a/deployments/multus-daemonset-thick.yml b/deployments/multus-daemonset-thick.yml index 1995a970b..c288e89a3 100644 --- a/deployments/multus-daemonset-thick.yml +++ b/deployments/multus-daemonset-thick.yml @@ -215,7 +215,7 @@ spec: - name: cnibin mountPath: /host/opt/cni/bin mountPropagation: Bidirectional - terminationGracePeriodSeconds: 10 + terminationGracePeriodSeconds: 30 volumes: - name: cni hostPath: diff --git a/deployments/multus-daemonset.yml b/deployments/multus-daemonset.yml index 40fa51932..ae961842b 100644 --- a/deployments/multus-daemonset.yml +++ b/deployments/multus-daemonset.yml @@ -220,7 +220,7 @@ spec: - name: cnibin mountPath: /host/opt/cni/bin mountPropagation: Bidirectional - 
terminationGracePeriodSeconds: 10 + terminationGracePeriodSeconds: 30 volumes: - name: cni hostPath: diff --git a/pkg/server/api/api.go b/pkg/server/api/api.go index 995676c38..14a827ccb 100644 --- a/pkg/server/api/api.go +++ b/pkg/server/api/api.go @@ -41,6 +41,9 @@ const ( // MultusHealthAPIEndpoint is an endpoint API clients can query to know if they can communicate w/ multus server MultusHealthAPIEndpoint = "/healthz" + + // MultusReadyAPIEndpoint is like health, but starts returning status 500 once a sig-term is received. + MultusReadyAPIEndpoint = "/readyz" ) // DoCNI sends a CNI request to the CNI server via JSON + HTTP over a root-owned unix socket, @@ -100,7 +103,7 @@ func CreateDelegateRequest(cniCommand, cniContainerID, cniNetNS, cniIFName, podN // WaitUntilAPIReady checks API readiness func WaitUntilAPIReady(socketPath string) error { return utilwait.PollImmediate(APIReadyPollDuration, APIReadyPollTimeout, func() (bool, error) { - _, err := DoCNI(GetAPIEndpoint(MultusHealthAPIEndpoint), nil, SocketPath(socketPath)) + _, err := DoCNI(GetAPIEndpoint(MultusReadyAPIEndpoint), nil, SocketPath(socketPath)) return err == nil, nil }) } diff --git a/pkg/server/server.go b/pkg/server/server.go index d469af23a..442ead832 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -209,7 +209,7 @@ func isPerNodeCertEnabled(config *PerNodeCertificate) (bool, error) { } // NewCNIServer creates and returns a new Server object which will listen on a socket in the given path -func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { +func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) (*Server, error) { var kubeClient *k8s.ClientInfo enabled, err := isPerNodeCertEnabled(daemonConfig.PerNodeCertificate) if enabled { @@ -251,10 +251,10 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe 
logging.Verbosef("server configured with chroot: %s", daemonConfig.ChrootDir) } - return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator) + return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator, isInGracefulShutdownMode) } -func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { +func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) (*Server, error) { informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME")) netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient) kubeClient.SetK8sClientInformers(podInformer, netdefInformer) @@ -344,6 +344,23 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s w.Header().Set("Content-Type", "application/json") }))) + // handle for '/readyz' + router.HandleFunc(api.MultusReadyAPIEndpoint, promhttp.InstrumentHandlerCounter(s.metrics.requestCounter.MustCurryWith(prometheus.Labels{"handler": api.MultusHealthAPIEndpoint}), + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet && r.Method != http.MethodPost { + http.Error(w, fmt.Sprintf("Method not allowed"), http.StatusMethodNotAllowed) + return + } + + if !isInGracefulShutdownMode() { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + } else { + w.WriteHeader(http.StatusInternalServerError) + w.Header().Set("Content-Type", "application/json") + } + }))) + // this handle for the rest of above router.HandleFunc("/", promhttp.InstrumentHandlerCounter(s.metrics.requestCounter.MustCurryWith(prometheus.Labels{"handler": "NotFound"}), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git 
a/pkg/server/thick_cni_test.go b/pkg/server/thick_cni_test.go index e97612fa9..7612ff8ac 100644 --- a/pkg/server/thick_cni_test.go +++ b/pkg/server/thick_cni_test.go @@ -274,7 +274,7 @@ func createFakePod(k8sClient *k8s.ClientInfo, podName string) error { func startCNIServer(ctx context.Context, runDir string, k8sClient *k8s.ClientInfo, servConfig []byte) (*Server, error) { const period = 0 - cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true) + cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true, func() bool { return false }) if err != nil { return nil, err }