From f2e9557a2d34c75d553370c3b6ce13a64b334ccb Mon Sep 17 00:00:00 2001 From: Oleg Zhurakivskyy Date: Wed, 27 Mar 2024 20:40:36 +0200 Subject: [PATCH] nfd-topology-updater: Add liveness probe Signed-off-by: Oleg Zhurakivskyy --- cmd/nfd-topology-updater/main.go | 2 + .../topologyupdater-daemonset.yaml | 11 +++++ .../templates/topologyupdater.yaml | 11 +++++ .../helm/node-feature-discovery/values.yaml | 14 ++++++ .../nfd-topology-updater.go | 44 +++++++++++++++++++ 5 files changed, 82 insertions(+) diff --git a/cmd/nfd-topology-updater/main.go b/cmd/nfd-topology-updater/main.go index f758c83c11..e5b77edee4 100644 --- a/cmd/nfd-topology-updater/main.go +++ b/cmd/nfd-topology-updater/main.go @@ -36,6 +36,7 @@ const ( // ProgramName is the canonical name of this program ProgramName = "nfd-topology-updater" kubeletSecurePort = 10250 + GrpcHealthPort = 8082 ) var DefaultKubeletStateDir = path.Join(string(hostpath.VarDir), "lib", "kubelet") @@ -54,6 +55,7 @@ func main() { utils.ConfigureGrpcKlog() // Get new TopologyUpdater instance + args.GrpcHealthPort = GrpcHealthPort instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs) if err != nil { klog.ErrorS(err, "failed to initialize topology updater instance") diff --git a/deployment/base/topologyupdater-daemonset/topologyupdater-daemonset.yaml b/deployment/base/topologyupdater-daemonset/topologyupdater-daemonset.yaml index 0db740115e..5d11e9b157 100644 --- a/deployment/base/topologyupdater-daemonset/topologyupdater-daemonset.yaml +++ b/deployment/base/topologyupdater-daemonset/topologyupdater-daemonset.yaml @@ -19,6 +19,17 @@ spec: - name: nfd-topology-updater image: gcr.io/k8s-staging-nfd/node-feature-discovery:master imagePullPolicy: Always + livenessProbe: + grpc: + port: 8082 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + grpc: + port: 8082 + initialDelaySeconds: 5 + periodSeconds: 10 + failureThreshold: 10 command: - "nfd-topology-updater" args: [] diff --git 
a/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml b/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml index af62107f92..1221cfd2dc 100644 --- a/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml +++ b/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml @@ -41,6 +41,17 @@ spec: - name: topology-updater image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: "{{ .Values.image.pullPolicy }}" + livenessProbe: + grpc: + port: 8082 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + grpc: + port: 8082 + initialDelaySeconds: 5 + periodSeconds: 10 + failureThreshold: 10 env: - name: NODE_NAME valueFrom: diff --git a/deployment/helm/node-feature-discovery/values.yaml b/deployment/helm/node-feature-discovery/values.yaml index d915e35fb0..da6be9b025 100644 --- a/deployment/helm/node-feature-discovery/values.yaml +++ b/deployment/helm/node-feature-discovery/values.yaml @@ -475,6 +475,20 @@ topologyUpdater: readOnlyRootFilesystem: true runAsUser: 0 + # livenessProbe: {} + ## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation. + # grpc: + # port: 8082 + # initialDelaySeconds: 10 + # periodSeconds: 10 + # readinessProbe: {} + ## NOTE: Currently not configurable, defaults are provided for the sake of extra documentation. 
+ # grpc: + # port: 8082 + # initialDelaySeconds: 5 + # periodSeconds: 10 + # failureThreshold: 10 + resources: limits: cpu: 100m diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go index 131547b873..165b1824e3 100644 --- a/pkg/nfd-topology-updater/nfd-topology-updater.go +++ b/pkg/nfd-topology-updater/nfd-topology-updater.go @@ -18,12 +18,16 @@ package nfdtopologyupdater import ( "fmt" + "net" "net/url" "os" "path/filepath" "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -58,6 +62,7 @@ type Args struct { KubeConfigFile string ConfigFile string KubeletStateDir string + GrpcHealthPort int Klog map[string]*utils.KlogFlagVal } @@ -85,6 +90,7 @@ type nfdTopologyUpdater struct { ownerRefs []metav1.OwnerReference k8sClient k8sclient.Interface kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error) + healthServer *grpc.Server } // NewTopologyUpdater creates a new NfdTopologyUpdater instance. 
@@ -128,6 +134,37 @@ func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, err
 	return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil
 }
 
+// startGrpcHealthServer starts a gRPC server that exposes the standard gRPC
+// health checking service on args.GrpcHealthPort and stores it in
+// w.healthServer so that Run can stop it on shutdown.
+//
+// An error is returned if the port cannot be bound. Failures of the server
+// after it has been started are delivered asynchronously on errChan; a
+// deliberate Stop or GracefulStop is not reported as a failure.
+func (w *nfdTopologyUpdater) startGrpcHealthServer(errChan chan<- error) error {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", w.args.GrpcHealthPort))
+	if err != nil {
+		return fmt.Errorf("failed to listen: %w", err)
+	}
+
+	s := grpc.NewServer()
+	grpc_health_v1.RegisterHealthServer(s, health.NewServer())
+	klog.InfoS("gRPC health server serving", "port", w.args.GrpcHealthPort)
+
+	go func() {
+		// Serve takes ownership of lis and closes it before returning, so
+		// no explicit Close is needed here.
+		// grpc.ErrServerStopped only means that the server was stopped
+		// before Serve got to run, i.e. a regular shutdown.
+		if err := s.Serve(lis); err != nil && err != grpc.ErrServerStopped {
+			errChan <- fmt.Errorf("gRPC health server exited with an error: %w", err)
+		}
+		klog.InfoS("gRPC health server stopped")
+	}()
+	w.healthServer = s
+	return nil
+}
+
 // Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
 // one request if OneShot is set to 'true' in the updater args.
 func (w *nfdTopologyUpdater) Run() error {
@@ -187,8 +224,20 @@ func (w *nfdTopologyUpdater) Run() error {
 		return fmt.Errorf("failed to obtain node resource information: %w", err)
 	}
 
+	grpcErr := make(chan error, 1)
+
+	// Start gRPC server for liveness probe (at this point we're "live")
+	if w.args.GrpcHealthPort != 0 {
+		if err := w.startGrpcHealthServer(grpcErr); err != nil {
+			return fmt.Errorf("failed to start gRPC health server: %w", err)
+		}
+	}
+
 	for {
 		select {
+		case err := <-grpcErr:
+			return fmt.Errorf("error in serving gRPC: %w", err)
+
 		case info := <-w.eventSource:
 			klog.V(4).InfoS("event received, scanning...", "event", info.Event)
 			scanResponse, err := resScan.Scan()
@@ -217,6 +266,9 @@ func (w *nfdTopologyUpdater) Run() error {
 
 		case <-w.stop:
 			klog.InfoS("shutting down nfd-topology-updater")
+			if w.healthServer != nil {
+				w.healthServer.GracefulStop()
+			}
 			return nil
 		}
 	}