Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thick plugin graceful termination #1338

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions cmd/multus-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"path/filepath"
"sync"
"syscall"
"time"

utilwait "k8s.io/apimachinery/pkg/util/wait"

Expand All @@ -41,6 +42,10 @@
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// SigtermCancelAfter sets the wait time to cancel after sig term

Check warning on line 45 in cmd/multus-daemon/main.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

comment on exported const SigTermCancelAfter should be of the form "SigTermCancelAfter ..."

Check warning on line 45 in cmd/multus-daemon/main.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

comment on exported const SigTermCancelAfter should be of the form "SigTermCancelAfter ..."
// TODO: This could be a configuration option
const SigTermCancelAfter = 10 * time.Second

func main() {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)

Expand All @@ -58,6 +63,13 @@

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sigTermCtx, sigTermCancel := context.WithCancel(ctx)
isInGracefulShutdownMode := func() bool {
if sigTermCtx.Err() == nil {
return false
}
return true
}

daemonConf, err := cniServerConfig(*configFilePath)
if err != nil {
Expand Down Expand Up @@ -105,7 +117,7 @@
}
}

if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator); err != nil {
if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator, isInGracefulShutdownMode); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}
Expand All @@ -123,6 +135,8 @@
go func() {
for sig := range signalCh {
logging.Verbosef("caught %v, stopping...", sig)
sigTermCancel()
<-time.After(SigTermCancelAfter)
cancel()
}
}()
Expand All @@ -139,7 +153,7 @@
logging.Verbosef("multus daemon is exited")
}

func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool) error {
func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) error {
if user, err := user.Current(); err != nil || user.Uid != "0" {
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
}
Expand All @@ -148,7 +162,7 @@
return fmt.Errorf("failed to prepare the cni-socket for communicating with the shim: %w", err)
}

server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator)
server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator, isInGracefulShutdownMode)
if err != nil {
return fmt.Errorf("failed to create the server: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion deployments/multus-daemonset-crio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ spec:
mountPath: /host/usr/libexec/cni
- name: multus-cfg
mountPath: /tmp/multus-conf
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 30
volumes:
- name: run
hostPath:
Expand Down
2 changes: 1 addition & 1 deletion deployments/multus-daemonset-thick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ spec:
- name: cnibin
mountPath: /host/opt/cni/bin
mountPropagation: Bidirectional
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 30
volumes:
- name: cni
hostPath:
Expand Down
2 changes: 1 addition & 1 deletion deployments/multus-daemonset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ spec:
- name: cnibin
mountPath: /host/opt/cni/bin
mountPropagation: Bidirectional
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 30
volumes:
- name: cni
hostPath:
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (

// MultusHealthAPIEndpoint is an endpoint API clients can query to know if they can communicate w/ multus server
MultusHealthAPIEndpoint = "/healthz"

// MultusReadyAPIEndpoint is like health, but starts returning status 500 once a sig-term is received.
MultusReadyAPIEndpoint = "/readyz"
)

// DoCNI sends a CNI request to the CNI server via JSON + HTTP over a root-owned unix socket,
Expand Down Expand Up @@ -100,7 +103,7 @@ func CreateDelegateRequest(cniCommand, cniContainerID, cniNetNS, cniIFName, podN
// WaitUntilAPIReady checks API readiness
func WaitUntilAPIReady(socketPath string) error {
return utilwait.PollImmediate(APIReadyPollDuration, APIReadyPollTimeout, func() (bool, error) {
_, err := DoCNI(GetAPIEndpoint(MultusHealthAPIEndpoint), nil, SocketPath(socketPath))
_, err := DoCNI(GetAPIEndpoint(MultusReadyAPIEndpoint), nil, SocketPath(socketPath))
return err == nil, nil
})
}
Expand Down
23 changes: 20 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func isPerNodeCertEnabled(config *PerNodeCertificate) (bool, error) {
}

// NewCNIServer creates and returns a new Server object which will listen on a socket in the given path
func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) (*Server, error) {
var kubeClient *k8s.ClientInfo
enabled, err := isPerNodeCertEnabled(daemonConfig.PerNodeCertificate)
if enabled {
Expand Down Expand Up @@ -251,10 +251,10 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe
logging.Verbosef("server configured with chroot: %s", daemonConfig.ChrootDir)
}

return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator)
return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator, isInGracefulShutdownMode)
}

func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) (*Server, error) {
informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME"))
netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient)
kubeClient.SetK8sClientInformers(podInformer, netdefInformer)
Expand Down Expand Up @@ -344,6 +344,23 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
w.Header().Set("Content-Type", "application/json")
})))

// handle for '/readyz'
router.HandleFunc(api.MultusReadyAPIEndpoint, promhttp.InstrumentHandlerCounter(s.metrics.requestCounter.MustCurryWith(prometheus.Labels{"handler": api.MultusHealthAPIEndpoint}),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet && r.Method != http.MethodPost {
http.Error(w, fmt.Sprintf("Method not allowed"), http.StatusMethodNotAllowed)
return
}

if !isInGracefulShutdownMode() {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
} else {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/json")
}
})))

// this handle for the rest of above
router.HandleFunc("/", promhttp.InstrumentHandlerCounter(s.metrics.requestCounter.MustCurryWith(prometheus.Labels{"handler": "NotFound"}),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/thick_cni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func createFakePod(k8sClient *k8s.ClientInfo, podName string) error {
func startCNIServer(ctx context.Context, runDir string, k8sClient *k8s.ClientInfo, servConfig []byte) (*Server, error) {
const period = 0

cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true)
cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true, func() bool { return false })
if err != nil {
return nil, err
}
Expand Down
Loading