From aabc4c748576086e37129c04186c9bf8bb2776cf Mon Sep 17 00:00:00 2001 From: Tomofumi Hayashi Date: Mon, 18 Sep 2023 23:46:07 +0900 Subject: [PATCH] Add per-node-certification support This change introduces per-node certification for multus pods. Once multus pod is launched, then specified bootstrap kubeconfig is used for initial access, then multus sends CSR request to kube API to get original certs for kube API access. Once it is accepted then the multus pod uses generated certs for kube access. --- cmd/cert-approver/main.go | 363 ++++++++ cmd/kubeconfig_generator/main.go | 133 +++ go.mod | 2 +- hack/build-go.sh | 4 + images/Dockerfile | 3 +- images/Dockerfile.debug | 3 +- pkg/k8sclient/k8sclient.go | 87 -- pkg/k8sclient/kubeconfig.go | 219 +++++ pkg/multus/multus_cni100_test.go | 4 +- pkg/server/server.go | 33 +- pkg/server/types.go | 16 +- .../client-go/tools/watch/informerwatcher.go | 150 ++++ .../client-go/tools/watch/retrywatcher.go | 296 +++++++ vendor/k8s.io/client-go/tools/watch/until.go | 168 ++++ .../k8s.io/client-go/util/certificate/OWNERS | 8 + .../util/certificate/certificate_manager.go | 775 ++++++++++++++++++ .../util/certificate/certificate_store.go | 318 +++++++ .../client-go/util/certificate/csr/csr.go | 364 ++++++++ vendor/modules.txt | 5 +- 19 files changed, 2850 insertions(+), 101 deletions(-) create mode 100644 cmd/cert-approver/main.go create mode 100644 cmd/kubeconfig_generator/main.go create mode 100644 pkg/k8sclient/kubeconfig.go create mode 100644 vendor/k8s.io/client-go/tools/watch/informerwatcher.go create mode 100644 vendor/k8s.io/client-go/tools/watch/retrywatcher.go create mode 100644 vendor/k8s.io/client-go/tools/watch/until.go create mode 100644 vendor/k8s.io/client-go/util/certificate/OWNERS create mode 100644 vendor/k8s.io/client-go/util/certificate/certificate_manager.go create mode 100644 vendor/k8s.io/client-go/util/certificate/certificate_store.go create mode 100644 vendor/k8s.io/client-go/util/certificate/csr/csr.go diff --git a/cmd/cert-approver/main.go b/cmd/cert-approver/main.go new file mode 100644 index 000000000..9e891f262 --- /dev/null +++ b/cmd/cert-approver/main.go @@ -0,0 +1,363 @@ +// Copyright (c) 2023 Network Plumbing Working Group +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is Kubernetes controller which approves CSR submitted by multus. +// This command is required only if multus runs with per-node certificate. +package main + +// Note: cert-approver should be simple, just approve multus' CSR, hence +// this go code should not have any dependencies from pkg/, if possible, +// to keep its code simplicity. +import ( + "context" + "crypto/x509" + "encoding/pem" + "fmt" + "os" + "os/signal" + "reflect" + "strings" + "syscall" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/apimachinery/pkg/util/wait" + + certificatesv1 "k8s.io/api/certificates/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/certificate/csr" + "k8s.io/client-go/util/workqueue" +) + +// CertController object +type CertController struct { + clientset kubernetes.Interface + queue workqueue.RateLimitingInterface + informer cache.SharedIndexInformer + broadcaster record.EventBroadcaster + recorder record.EventRecorder + commonNamePrefixes string +} + +const ( + maxDuration = time.Hour * 24 * 365 + resyncPeriod time.Duration = time.Second * 3600 // resync every one hour, default is 10 hour + maxRetries = 5 +) + +var ( + ControllerName = "csr-approver" + NamePrefix = "system:multus" + Organization = []string{"system:multus"} + Groups = sets.New[string]("system:nodes", "system:multus", "system:authenticated") + UserPrefixes = sets.New[string]("system:node", NamePrefix) + Usages = sets.New[certificatesv1.KeyUsage]( + certificatesv1.UsageDigitalSignature, + certificatesv1.UsageClientAuth) +) + +func NewCertController() (*CertController, error) { + var clientset kubernetes.Interface + /* setup Kubernetes API client */ + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + informer := cache.NewSharedIndexInformer( + cache.NewListWatchFromClient( + clientset.CertificatesV1().RESTClient(), + "certificatesigningrequests", corev1.NamespaceAll, fields.Everything()), + &certificatesv1.CertificateSigningRequest{}, + resyncPeriod, + nil) + + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(klog.Infof) + broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")}) + recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cert-approver"}) + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + c := &CertController{ + clientset: clientset, + informer: informer, + queue: queue, + commonNamePrefixes: NamePrefix, + broadcaster: broadcaster, + recorder: recorder, + } + + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if csr, ok := obj.(*certificatesv1.CertificateSigningRequest); ok { + if c.filterCSR(csr) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + queue.Add(key) + } + } + } + }, + }) + + return c, nil +} + +func (c *CertController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Info("Starting cert approver") + + go c.informer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.HasSynced) { + utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + return + } + + klog.Info("cert approver synced and ready") + wait.Until(c.runWorker, time.Second, stopCh) +} + +// HasSynced is required for the cache.Controller interface. +func (c *CertController) HasSynced() bool { + return c.informer.HasSynced() +} + +// LastSyncResourceVersion is required for the cache.Controller interface. +func (c *CertController) LastSyncResourceVersion() string { + return c.informer.LastSyncResourceVersion() +} + +func (c *CertController) runWorker() { + for c.processNextItem() { + // continue looping + } +} + +func (c *CertController) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.queue.Get() + if quit { + return false + } + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. + defer c.queue.Done(key) + + // Invoke the method containing the business logic + err := c.processItem(key.(string)) + // Handle the error if something went wrong during the execution of the business logic + c.handleErr(err, key) + return true + +} + +// handleErr checks if an error happened and makes sure we will retry later. +func (c *CertController) handleErr(err error, key interface{}) { + if err == nil { + // Forget about the #AddRateLimited history of the key on every successful synchronization. + // This ensures that future processing of updates for this key is not delayed because of + // an outdated error history. + c.queue.Forget(key) + return + } + + // This controller retries 5 times if something goes wrong. After that, it stops trying. + if c.queue.NumRequeues(key) < maxRetries { + klog.Infof("Error syncing csr %s: %v", key, err) + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + c.queue.AddRateLimited(key) + return + } + + c.queue.Forget(key) + // Report to an external entity that, even after several retries, we could not successfully process this key + utilruntime.HandleError(err) + klog.Infof("Dropping csr %q out of the queue: %v", key, err) +} + +func (c *CertController) processItem(key string) error { + startTime := time.Now() + + obj, _, err := c.informer.GetIndexer().GetByKey(key) + if err != nil { + return fmt.Errorf("Error fetching object with key %s from store: %v", key, err) + } + + req, _ := obj.(*certificatesv1.CertificateSigningRequest) + + nodeName := "unknown" + defer func() { + klog.Infof("Finished syncing CSR %s for %s node in %v", req.Name, nodeName, time.Since(startTime)) + }() + + if len(req.Status.Certificate) > 0 { + klog.V(5).Infof("CSR %s is already signed", req.Name) + return nil + } + + if isApprovedOrDenied(&req.Status) { + klog.V(5).Infof("CSR %s is already approved/denied", req.Name) + return nil + } + + csrPEM, _ := pem.Decode(req.Spec.Request) + if csrPEM == nil { + return fmt.Errorf("failed to PEM-parse the CSR block in .spec.request: no CSRs were found") + } + + x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes) + if err != nil { + return fmt.Errorf("failed to parse the CSR bytes: %v", err) + } + + i := strings.LastIndex(req.Spec.Username, ":") + if i == -1 || i == len(req.Spec.Username)-1 { + return fmt.Errorf("failed to parse the username: %s", req.Spec.Username) + } + + ctx := context.Background() + prefix := req.Spec.Username[:i] + nodeName = req.Spec.Username[i+1:] + if !UserPrefixes.Has(prefix) { + return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by an unexpected user: %q", req.Name, req.Spec.Username)) + } + + if errs := validation.IsDNS1123Subdomain(nodeName); len(errs) != 0 { + return c.denyCSR(ctx, req, fmt.Sprintf("extracted node name %q is not a valid DNS subdomain %v", nodeName, errs)) + } + + if usages := sets.New[certificatesv1.KeyUsage](req.Spec.Usages...); !usages.Equal(Usages) { + return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with unexpected usages: %v", req.Name, usages.UnsortedList())) + } + + if !Groups.HasAll(req.Spec.Groups...) { + return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by a user with unexpected groups: %v", req.Name, req.Spec.Groups)) + } + + expectedSubject := fmt.Sprintf("%s:%s", c.commonNamePrefixes, nodeName) + if x509CSR.Subject.CommonName != expectedSubject { + return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's commonName to be %q, but it is %q", expectedSubject, x509CSR.Subject.CommonName)) + } + + if !reflect.DeepEqual(x509CSR.Subject.Organization, Organization) { + return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's organization to be %v, but it is %v", Organization, x509CSR.Subject.Organization)) + } + + if req.Spec.ExpirationSeconds == nil { + return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created without specyfying the expirationSeconds", req.Name)) + } + + if csr.ExpirationSecondsToDuration(*req.Spec.ExpirationSeconds) > maxDuration { + return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with invalid expirationSeconds value: %d", req.Name, *req.Spec.ExpirationSeconds)) + } + + return c.approveCSR(ctx, req) +} + +// CSR specific functions + +func (c *CertController) filterCSR(csr *certificatesv1.CertificateSigningRequest) bool { + nsName := types.NamespacedName{Namespace: csr.Namespace, Name: csr.Name} + csrPEM, _ := pem.Decode(csr.Spec.Request) + if csrPEM == nil { + klog.Errorf("Failed to PEM-parse the CSR block in .spec.request: no CSRs were found in %s", nsName) + return false + } + + x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes) + if err != nil { + klog.Errorf("Failed to parse the CSR .spec.request of %q: %v", nsName, err) + return false + } + + return strings.HasPrefix(x509CSR.Subject.CommonName, c.commonNamePrefixes) && + csr.Spec.SignerName == certificatesv1.KubeAPIServerClientSignerName +} + +func (c *CertController) approveCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) error { + csr.Status.Conditions = append(csr.Status.Conditions, + certificatesv1.CertificateSigningRequestCondition{ + Type: certificatesv1.CertificateApproved, + Status: corev1.ConditionTrue, + Reason: "AutoApproved", + Message: fmt.Sprintf("Auto-approved CSR %q", csr.Name), + }) + + c.recorder.Eventf(csr, corev1.EventTypeNormal, "CSRApproved", "CSR %q has been approved by %s", csr.Name, ControllerName) + _, err := c.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) + return err +} + +func (c *CertController) denyCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest, message string) error { + csr.Status.Conditions = append(csr.Status.Conditions, + certificatesv1.CertificateSigningRequestCondition{ + Type: certificatesv1.CertificateDenied, + Status: corev1.ConditionTrue, + Reason: "CSRDenied", + Message: message, + }, + ) + + c.recorder.Eventf(csr, corev1.EventTypeWarning, "CSRDenied", "The CSR %q has been denied by: %s", csr.Name, ControllerName, message) + _, err := c.clientset.CertificatesV1().CertificateSigningRequests().Update(ctx, csr, metav1.UpdateOptions{}) + return err +} + +func isApprovedOrDenied(status *certificatesv1.CertificateSigningRequestStatus) bool { + for _, c := range status.Conditions { + if c.Type == certificatesv1.CertificateApproved || c.Type == certificatesv1.CertificateDenied { + return true + } + } + return false +} + +func main() { + klog.Infof("starting cert-approver") + + //Start watching for pod creations + certController, err := NewCertController() + if err != nil { + klog.Fatal(err) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go certController.Run(stopCh) + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) + <-sigterm +} diff --git a/cmd/kubeconfig_generator/main.go b/cmd/kubeconfig_generator/main.go new file mode 100644 index 000000000..b5c18036e --- /dev/null +++ b/cmd/kubeconfig_generator/main.go @@ -0,0 +1,133 @@ +// Copyright (c) 2023 Multus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This binary submit CSR for kube controll access for multus thin plugin +// and generate Kubeconfig +package main + +import ( + "encoding/base64" + "fmt" + "os" + "os/signal" + "syscall" + "text/template" + + "github.com/spf13/pflag" + + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" + + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" +) + +var kubeConfigTemplate = `apiVersion: v1 +clusters: + - cluster: + certificate-authority-data: {{.CADATA}} + server: {{.K8S_APISERVER}} + name: default-cluster +contexts: + - context: + cluster: default-cluster + namespace: default + user: default-auth + name: default-context +current-context: default-context +kind: Config +preferences: {} +users: + - name: default-auth + user: + client-certificate: {{.CERTDIR}}/multus-client-current.pem + client-key: {{.CERTDIR}}/multus-client-current.pem +` + +func main() { + certDir := pflag.StringP("certdir", "", "/tmp", "specify cert directory") + bootstrapConfig := pflag.StringP("bootstrap-config", "", "/tmp/kubeconfig", "specify bootstrap kubernetes config") + kubeconfigPath := pflag.StringP("kubeconfig", "", "/run/multus/kubeconfig", "specify output kubeconfig path") + helpFlag := pflag.BoolP("help", "h", false, "show help message and quit") + + pflag.Parse() + if *helpFlag { + pflag.PrintDefaults() + os.Exit(1) + } + + // check variables + if _, err := os.Stat(*bootstrapConfig); err != nil { + klog.Fatalf("failed to read bootstrap config %q", *bootstrapConfig) + } + st, err := os.Stat(*certDir) + if err != nil { + klog.Fatalf("failed to find cert directory %q", *certDir) + } + if !st.IsDir() { + klog.Fatalf("cert directory %q is not directory", *certDir) + } + + nodeName := os.Getenv("K8S_NODE") + if nodeName == "" { + klog.Fatalf("cannot identify node name from K8S_NODE env variables") + } + + // retrieve API server from bootstrapConfig() + config, err := clientcmd.BuildConfigFromFlags("", *bootstrapConfig) + if err != nil { + klog.Fatalf("cannot get in-cluster config: %v", err) + } + apiServer := fmt.Sprintf("%s%s", config.Host, config.APIPath) + caData := base64.StdEncoding.EncodeToString(config.CAData) + + // run certManager to create certification + if _, err = k8sclient.PerNodeK8sClient(nodeName, *bootstrapConfig, *certDir); err != nil { + klog.Fatalf("failed to start cert manager: %v", err) + } + + fp, err := os.OpenFile(*kubeconfigPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + klog.Fatalf("cannot create kubeconfig file %q: %v", *kubeconfigPath, err) + } + + // render kubeconfig + templateKubeconfig, err := template.New("kubeconfig").Parse(kubeConfigTemplate) + if err != nil { + klog.Fatalf("template parse error: %v", err) + } + templateData := map[string]string{ + "CADATA": caData, + "CERTDIR": *certDir, + "K8S_APISERVER": apiServer, + } + // genearate kubeconfig from template + if err = templateKubeconfig.Execute(fp, templateData); err != nil { + klog.Fatalf("cannot create kubeconfig: %v", err) + } + if err = fp.Close(); err != nil { + klog.Fatalf("cannot save kubeconfig: %v", err) + } + + klog.Infof("kubeconfig %q is saved", *kubeconfigPath) + + // wait for signal + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) + <-sigterm + klog.Infof("signal received. remove kubeconfig %q and quit.", *kubeconfigPath) + err = os.Remove(*kubeconfigPath) + if err != nil { + klog.Errorf("failed to remove kubeconfig %q: %v", *kubeconfigPath, err) + } +} diff --git a/go.mod b/go.mod index b4216dac0..cb6b6f361 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( k8s.io/apimachinery v0.27.5 k8s.io/client-go v1.5.2 k8s.io/klog v1.0.0 - k8s.io/klog/v2 v2.90.1 // indirect + k8s.io/klog/v2 v2.90.1 k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect k8s.io/kubelet v0.27.5 sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/hack/build-go.sh b/hack/build-go.sh index e0ecbca1e..2a0893624 100755 --- a/hack/build-go.sh +++ b/hack/build-go.sh @@ -76,3 +76,7 @@ echo "Building install_multus" go build -o "${DEST_DIR}"/install_multus ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/install_multus echo "Building thin_entrypoint" go build -o "${DEST_DIR}"/thin_entrypoint ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/thin_entrypoint +echo "Building kubeconfig_generator" +go build -o "${DEST_DIR}"/kubeconfig_generator ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/kubeconfig_generator +echo "Building cert-approver" +go build -o "${DEST_DIR}"/cert-approver ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/cert-approver diff --git a/images/Dockerfile b/images/Dockerfile index 3b0c17814..5c9d46752 100644 --- a/images/Dockerfile +++ b/images/Dockerfile @@ -16,4 +16,5 @@ WORKDIR / COPY --from=build /usr/src/multus-cni/bin/install_multus / COPY --from=build /usr/src/multus-cni/bin/thin_entrypoint / -ENTRYPOINT ["/thin_entrypoint"] +COPY --from=build /usr/src/multus-cni/bin/kubeconfig_generator / +CMD ["/thin_entrypoint"] diff --git a/images/Dockerfile.debug b/images/Dockerfile.debug index 4d837bb3b..4d30ae4a4 100644 --- a/images/Dockerfile.debug +++ b/images/Dockerfile.debug @@ -16,4 +16,5 @@ WORKDIR / COPY --from=build /usr/src/multus-cni/bin/install_multus / COPY --from=build /usr/src/multus-cni/bin/thin_entrypoint / -ENTRYPOINT ["/thin_entrypoint"] +COPY --from=build /usr/src/multus-cni/bin/kubeconfig_generator / +CMD ["/thin_entrypoint"] diff --git a/pkg/k8sclient/k8sclient.go b/pkg/k8sclient/k8sclient.go index cdc209737..c1d1857a9 100644 --- a/pkg/k8sclient/k8sclient.go +++ b/pkg/k8sclient/k8sclient.go @@ -23,18 +23,12 @@ import ( "os" "regexp" "strings" - "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" - "k8s.io/klog" "github.com/containernetworking/cni/libcni" "github.com/containernetworking/cni/pkg/skel" @@ -393,87 +387,6 @@ func TryLoadPodDelegates(pod *v1.Pod, conf *types.NetConf, clientInfo *ClientInf return 0, clientInfo, err } -// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to -// the k8s API. -func InClusterK8sClient() (*ClientInfo, error) { - clientInfo, err := GetK8sClient("", nil) - if err != nil { - return nil, err - } - if clientInfo == nil { - return nil, fmt.Errorf("failed to create in-cluster kube client") - } - return clientInfo, err -} - -// GetK8sClient gets client info from kubeconfig -func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) { - logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient) - // If we get a valid kubeClient (eg from testcases) just return that - // one. - if kubeClient != nil { - return kubeClient, nil - } - - var err error - var config *rest.Config - - // Otherwise try to create a kubeClient from a given kubeConfig - if kubeconfig != "" { - // uses the current context in kubeconfig - config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err) - } - } else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" { - // Try in-cluster config where multus might be running in a kubernetes pod - config, err = rest.InClusterConfig() - if err != nil { - return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err) - } - } else { - // No kubernetes config; assume we shouldn't talk to Kube at all - return nil, nil - } - - // Specify that we use gRPC - config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" - config.ContentType = "application/vnd.kubernetes.protobuf" - // Set the config timeout to one minute. - config.Timeout = time.Minute - // Allow multus (especially in server mode) to make more concurrent requests - // to reduce client-side throttling - config.QPS = 50 - config.Burst = 50 - - return newClientInfo(config) -} - -// newClientInfo returns a `ClientInfo` from a configuration created from an -// existing kubeconfig file. -func newClientInfo(config *rest.Config) (*ClientInfo, error) { - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - netclient, err := netclient.NewForConfig(config) - if err != nil { - return nil, err - } - - broadcaster := record.NewBroadcaster() - broadcaster.StartLogging(klog.Infof) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"}) - return &ClientInfo{ - Client: client, - NetClient: netclient, - EventBroadcaster: broadcaster, - EventRecorder: recorder, - }, nil -} - // GetPodNetwork gets net-attach-def annotation from pod func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) { logging.Debugf("GetPodNetwork: %v", pod) diff --git a/pkg/k8sclient/kubeconfig.go b/pkg/k8sclient/kubeconfig.go new file mode 100644 index 000000000..9e9f2e8d0 --- /dev/null +++ b/pkg/k8sclient/kubeconfig.go @@ -0,0 +1,219 @@ +// Copyright (c) 2023 Multus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8sclient + +import ( + "context" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "fmt" + "os" + "path" + "time" + + certificatesv1 "k8s.io/api/certificates/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/transport" + "k8s.io/client-go/util/certificate" + "k8s.io/klog" + + netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1" + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging" +) + +const ( + certNamePrefix = "multus-client" + certCommonNamePrefix = "system:multus" + certOrganization = "system:multus" +) + +var ( + certUsages = []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageClientAuth} +) + +// getPerNodeKubeconfig creates new kubeConfig, based on bootstrap, with new certDir +func getPerNodeKubeconfig(bootstrap *rest.Config, certDir string) *rest.Config { + return &rest.Config{ + Host: bootstrap.Host, + APIPath: bootstrap.APIPath, + ContentConfig: rest.ContentConfig{ + AcceptContentTypes: "application/vnd.kubernetes.protobuf,application/json", + ContentType: "application/vnd.kubernetes.protobuf", + }, + TLSClientConfig: rest.TLSClientConfig{ + KeyFile: path.Join(certDir, certNamePrefix+"-current.pem"), + CertFile: path.Join(certDir, certNamePrefix+"-current.pem"), + CAData: bootstrap.TLSClientConfig.CAData, + }, + // Allow multus (especially in server mode) to make more concurrent requests + // to reduce client-side throttling + QPS: 50, + Burst: 50, + // Set the config timeout to one minute. + Timeout: time.Minute, + } +} + +// PerNodeK8sClient creates/reload new multus kubeconfig per-node. +func PerNodeK8sClient(nodeName, bootstrapKubeconfigFile, certDir string) (*ClientInfo, error) { + bootstrapKubeconfig, err := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile) + if err != nil { + return nil, logging.Errorf("failed to load bootstrap kubeconfig %s: %v", bootstrapKubeconfigFile, err) + } + config := getPerNodeKubeconfig(bootstrapKubeconfig, certDir) + + // If we have a valid certificate, user that to fetch CSRs. + // Otherwise, use the bootstrap credentials from bootstrapKubeconfig + // https://github.com/kubernetes/kubernetes/blob/068ee321bc7bfe1c2cefb87fb4d9e5deea84fbc8/cmd/kubelet/app/server.go#L953-L963 + newClientsetFn := func(current *tls.Certificate) (kubernetes.Interface, error) { + cfg := bootstrapKubeconfig + if current != nil { + cfg = config + } + return kubernetes.NewForConfig(cfg) + } + + certificateStore, err := certificate.NewFileStore(certNamePrefix, certDir, certDir, "", "") + if err != nil { + return nil, logging.Errorf("failed to initialize the certificate store: %v", err) + } + + certDuration := 10 * time.Minute + certManager, err := certificate.NewManager(&certificate.Config{ + ClientsetFn: newClientsetFn, + Template: &x509.CertificateRequest{ + Subject: pkix.Name{ + CommonName: fmt.Sprintf("%s:%s", certCommonNamePrefix, nodeName), + Organization: []string{certOrganization}, + }, + }, + RequestedCertificateLifetime: &certDuration, + SignerName: certificatesv1.KubeAPIServerClientSignerName, + Usages: certUsages, + CertificateStore: certificateStore, + }) + if err != nil { + return nil, logging.Errorf("failed to initialize the certificate manager: %v", err) + } + if certDuration < time.Hour { + // the default value for CertCallbackRefreshDuration (5min) is too long for short-lived certs, + // set it to a more sensible value + transport.CertCallbackRefreshDuration = time.Second * 10 + } + certManager.Start() + + logging.Verbosef("Waiting for certificate") + var storeErr error + err = wait.PollWithContext(context.TODO(), time.Second, 2*time.Minute, func(_ context.Context) (bool, error) { + var currentCert *tls.Certificate + currentCert, storeErr = certificateStore.Current() + return currentCert != nil && storeErr == nil, nil + }) + if err != nil { + return nil, logging.Errorf("certificate was not signed, last cert store err: %v err: %v", storeErr, err) + } + logging.Verbosef("Certificate found!") + + return newClientInfo(config) +} + +// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to +// the k8s API. +func InClusterK8sClient() (*ClientInfo, error) { + clientInfo, err := GetK8sClient("", nil) + if err != nil { + return nil, err + } + if clientInfo == nil { + return nil, fmt.Errorf("failed to create in-cluster kube client") + } + return clientInfo, err +} + +// GetK8sClient gets client info from kubeconfig +func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) { + logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient) + // If we get a valid kubeClient (eg from testcases) just return that + // one. + if kubeClient != nil { + return kubeClient, nil + } + + var err error + var config *rest.Config + + // Otherwise try to create a kubeClient from a given kubeConfig + if kubeconfig != "" { + // uses the current context in kubeconfig + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err) + } + } else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" { + // Try in-cluster config where multus might be running in a kubernetes pod + config, err = rest.InClusterConfig() + if err != nil { + return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err) + } + } else { + // No kubernetes config; assume we shouldn't talk to Kube at all + return nil, nil + } + + // Specify that we use gRPC + config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" + config.ContentType = "application/vnd.kubernetes.protobuf" + // Set the config timeout to one minute. + config.Timeout = time.Minute + // Allow multus (especially in server mode) to make more concurrent requests + // to reduce client-side throttling + config.QPS = 50 + config.Burst = 50 + + return newClientInfo(config) +} + +// newClientInfo returns a `ClientInfo` from a configuration created from an +// existing kubeconfig file. +func newClientInfo(config *rest.Config) (*ClientInfo, error) { + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + netclient, err := netclient.NewForConfig(config) + if err != nil { + return nil, err + } + + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(klog.Infof) + broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"}) + return &ClientInfo{ + Client: client, + NetClient: netclient, + EventBroadcaster: broadcaster, + EventRecorder: recorder, + }, nil +} diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index fca8aac5b..51c4b6e23 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -42,7 +42,7 @@ import ( ) func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer { - informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0 * time.Second) + informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0*time.Second) podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return v1coreinformers.NewFilteredPodInformer( @@ -55,7 +55,7 @@ func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.Sha informerFactory.Start(ctx.Done()) - waitCtx, waitCancel := context.WithTimeout(ctx, 20 * time.Second) + waitCtx, waitCancel := context.WithTimeout(ctx, 20*time.Second) if !cache.WaitForCacheSync(waitCtx.Done(), podInformer.HasSynced) { logging.Errorf("failed to sync pod informer cache") } diff --git a/pkg/server/server.go b/pkg/server/server.go index fb2ddafd5..bf83842c9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -170,11 +170,38 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali return informerFactory, podInformer } +func isPerNodeCertEnabled(config *PerNodeCertificate) (bool, error) { + if config.Enabled { + if config.BootstrapKubeconfig != "" && config.CertDir != "" { + return true, nil + } + return true, logging.Errorf("failed to configure PerNodeCertificate: enabled: %v, BootstrapKubeconfig: %q, CertDir: %q", config.Enabled, config.BootstrapKubeconfig, config.CertDir) + } + return false, nil +} + // NewCNIServer creates and returns a new Server object which will listen on a socket in the given path func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { - kubeClient, err := k8s.InClusterK8sClient() - if err != nil { - return nil, fmt.Errorf("error getting k8s client: %v", err) + var kubeClient *k8s.ClientInfo + enabled, err := isPerNodeCertEnabled(daemonConfig.PerNodeCertificate) + if enabled { + if err != nil { + return nil, err + } + perNodeCertConfig := daemonConfig.PerNodeCertificate + nodeName := os.Getenv("K8S_NODE") + if nodeName == "" { + return nil, logging.Errorf("error getting node name for perNodeCertificate") + } + kubeClient, err = k8s.PerNodeK8sClient(nodeName, perNodeCertConfig.BootstrapKubeconfig, perNodeCertConfig.CertDir) + if err != nil { + return nil, logging.Errorf("error getting perNodeClient: %v", err) + } + } else { + kubeClient, err = k8s.InClusterK8sClient() + if err != nil { + return nil, fmt.Errorf("error getting k8s client: %v", err) + } } exec := invoke.Exec(nil) diff --git a/pkg/server/types.go b/pkg/server/types.go index b7cf11d7d..731ad2862 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -56,12 +56,20 @@ type Server struct { ignoreReadinessIndicator bool } +// PerNodeCertificate for auto certificate generation for per node +type PerNodeCertificate struct { + Enabled bool `json:"enabled,omitempty"` + BootstrapKubeconfig string `json:"bootstrapKubeconfig,omitempty"` + CertDir string `json:"certDir,omitempty"` +} + // ControllerNetConf for the controller cni configuration type ControllerNetConf struct { - ChrootDir string `json:"chrootDir,omitempty"` - LogFile string `json:"logFile"` - LogLevel string `json:"logLevel"` - LogToStderr bool `json:"logToStderr,omitempty"` + ChrootDir string `json:"chrootDir,omitempty"` + LogFile string `json:"logFile"` + LogLevel string `json:"logLevel"` + LogToStderr bool `json:"logToStderr,omitempty"` + PerNodeCertificate *PerNodeCertificate `json:"perNodeCertificate,omitempty"` MetricsPort *int `json:"metricsPort,omitempty"` diff --git a/vendor/k8s.io/client-go/tools/watch/informerwatcher.go b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go new file mode 100644 index 000000000..5e6aad5cf --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go @@ -0,0 +1,150 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +func newEventProcessor(out chan<- watch.Event) *eventProcessor { + return &eventProcessor{ + out: out, + cond: sync.NewCond(&sync.Mutex{}), + done: make(chan struct{}), + } +} + +// eventProcessor buffers events and writes them to an out chan when a reader +// is waiting. Because of the requirement to buffer events, it synchronizes +// input with a condition, and synchronizes output with a channels. It needs to +// be able to yield while both waiting on an input condition and while blocked +// on writing to the output channel. +type eventProcessor struct { + out chan<- watch.Event + + cond *sync.Cond + buff []watch.Event + + done chan struct{} +} + +func (e *eventProcessor) run() { + for { + batch := e.takeBatch() + e.writeBatch(batch) + if e.stopped() { + return + } + } +} + +func (e *eventProcessor) takeBatch() []watch.Event { + e.cond.L.Lock() + defer e.cond.L.Unlock() + + for len(e.buff) == 0 && !e.stopped() { + e.cond.Wait() + } + + batch := e.buff + e.buff = nil + return batch +} + +func (e *eventProcessor) writeBatch(events []watch.Event) { + for _, event := range events { + select { + case e.out <- event: + case <-e.done: + return + } + } +} + +func (e *eventProcessor) push(event watch.Event) { + e.cond.L.Lock() + defer e.cond.L.Unlock() + defer e.cond.Signal() + e.buff = append(e.buff, event) +} + +func (e *eventProcessor) stopped() bool { + select { + case <-e.done: + return true + default: + return false + } +} + +func (e *eventProcessor) stop() { + close(e.done) + e.cond.Signal() +} + +// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. +// it also returns a channel you can use to wait for the informers to fully shutdown. +func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { + ch := make(chan watch.Event) + w := watch.NewProxyWatcher(ch) + e := newEventProcessor(ch) + + indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + e.push(watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), + }) + }, + UpdateFunc: func(old, new interface{}) { + e.push(watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), + }) + }, + DeleteFunc: func(obj interface{}) { + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using + // watch API based on watch.Event but the caller can filter such + // objects by checking if metadata.deletionTimestamp is set + obj = staleObj.Obj + } + + e.push(watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), + }) + }, + }, cache.Indexers{}) + + go e.run() + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + defer e.stop() + informer.Run(w.StopChan()) + }() + + return indexer, informer, w, doneCh +} diff --git a/vendor/k8s.io/client-go/tools/watch/retrywatcher.go b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go new file mode 100644 index 000000000..e4806d2ea --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go @@ -0,0 +1,296 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/davecgh/go-spew/spew" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// resourceVersionGetter is an interface used to get resource version from events. +// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method +type resourceVersionGetter interface { + GetResourceVersion() string +} + +// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout) +// it will get restarted from the last point without the consumer even knowing about it. +// RetryWatcher does that by inspecting events and keeping track of resourceVersion. +// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes. +// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to +// use Informers for that. +type RetryWatcher struct { + lastResourceVersion string + watcherClient cache.Watcher + resultChan chan watch.Event + stopChan chan struct{} + doneChan chan struct{} + minRestartDelay time.Duration +} + +// NewRetryWatcher creates a new RetryWatcher. +// It will make sure that watches gets restarted in case of recoverable errors. +// The initialResourceVersion will be given to watch method when first called. +func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) { + return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second) +} + +func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) { + switch initialResourceVersion { + case "", "0": + // TODO: revisit this if we ever get WATCH v2 where it means start "now" + // without doing the synthetic list of objects at the beginning (see #74022) + return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion) + default: + break + } + + rw := &RetryWatcher{ + lastResourceVersion: initialResourceVersion, + watcherClient: watcherClient, + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + resultChan: make(chan watch.Event, 0), + minRestartDelay: minRestartDelay, + } + + go rw.receive() + return rw, nil +} + +func (rw *RetryWatcher) send(event watch.Event) bool { + // Writing to an unbuffered channel is blocking operation + // and we need to check if stop wasn't requested while doing so. + select { + case rw.resultChan <- event: + return true + case <-rw.stopChan: + return false + } +} + +// doReceive returns true when it is done, false otherwise. +// If it is not done the second return value holds the time to wait before calling it again. +func (rw *RetryWatcher) doReceive() (bool, time.Duration) { + watcher, err := rw.watcherClient.Watch(metav1.ListOptions{ + ResourceVersion: rw.lastResourceVersion, + AllowWatchBookmarks: true, + }) + // We are very unlikely to hit EOF here since we are just establishing the call, + // but it may happen that the apiserver is just shutting down (e.g. being restarted) + // This is consistent with how it is handled for informers + switch err { + case nil: + break + + case io.EOF: + // watch closed normally + return false, 0 + + case io.ErrUnexpectedEOF: + klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err) + return false, 0 + + default: + msg := "Watch failed" + if net.IsProbableEOF(err) || net.IsTimeout(err) { + klog.V(5).InfoS(msg, "err", err) + // Retry + return false, 0 + } + + klog.ErrorS(err, msg) + // Retry + return false, 0 + } + + if watcher == nil { + klog.ErrorS(nil, "Watch returned nil watcher") + // Retry + return false, 0 + } + + ch := watcher.ResultChan() + defer watcher.Stop() + + for { + select { + case <-rw.stopChan: + klog.V(4).InfoS("Stopping RetryWatcher.") + return true, 0 + case event, ok := <-ch: + if !ok { + klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion) + return false, 0 + } + + // We need to inspect the event and get ResourceVersion out of it + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: + metaObject, ok := event.Object.(resourceVersionGetter) + if !ok { + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus, + }) + // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true, 0 + } + + resourceVersion := metaObject.GetResourceVersion() + if resourceVersion == "" { + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus, + }) + // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true, 0 + } + + // All is fine; send the non-bookmark events and update resource version. + if event.Type != watch.Bookmark { + ok = rw.send(event) + if !ok { + return true, 0 + } + } + rw.lastResourceVersion = resourceVersion + + continue + + case watch.Error: + // This round trip allows us to handle unstructured status + errObject := apierrors.FromObject(event.Object) + statusErr, ok := errObject.(*apierrors.StatusError) + if !ok { + klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object)) + // Retry unknown errors + return false, 0 + } + + status := statusErr.ErrStatus + + statusDelay := time.Duration(0) + if status.Details != nil { + statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second + } + + switch status.Code { + case http.StatusGone: + // Never retry RV too old errors + _ = rw.send(event) + return true, 0 + + case http.StatusGatewayTimeout, http.StatusInternalServerError: + // Retry + return false, statusDelay + + default: + // We retry by default. RetryWatcher is meant to proceed unless it is certain + // that it can't. If we are not certain, we proceed with retry and leave it + // up to the user to timeout if needed. + + // Log here so we have a record of hitting the unexpected error + // and we can whitelist some error codes if we missed any that are expected. + klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object)) + + // Retry + return false, statusDelay + } + + default: + klog.Errorf("Failed to recognize Event type %q", event.Type) + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus, + }) + // We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true, 0 + } + } + } +} + +// receive reads the result from a watcher, restarting it if necessary. +func (rw *RetryWatcher) receive() { + defer close(rw.doneChan) + defer close(rw.resultChan) + + klog.V(4).Info("Starting RetryWatcher.") + defer klog.V(4).Info("Stopping RetryWatcher.") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + select { + case <-rw.stopChan: + cancel() + return + case <-ctx.Done(): + return + } + }() + + // We use non sliding until so we don't introduce delays on happy path when WATCH call + // timeouts or gets closed and we need to reestablish it while also avoiding hot loops. + wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) { + done, retryAfter := rw.doReceive() + if done { + cancel() + return + } + + timer := time.NewTimer(retryAfter) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + + klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion) + }, rw.minRestartDelay) +} + +// ResultChan implements Interface. +func (rw *RetryWatcher) ResultChan() <-chan watch.Event { + return rw.resultChan +} + +// Stop implements Interface. +func (rw *RetryWatcher) Stop() { + close(rw.stopChan) +} + +// Done allows the caller to be notified when Retry watcher stops. +func (rw *RetryWatcher) Done() <-chan struct{} { + return rw.doneChan +} diff --git a/vendor/k8s.io/client-go/tools/watch/until.go b/vendor/k8s.io/client-go/tools/watch/until.go new file mode 100644 index 000000000..a2474556b --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/until.go @@ -0,0 +1,168 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition failed or detected an error state. +type PreconditionFunc func(store cache.Store) (bool, error) + +// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition cannot be checked and should terminate. In general, it is better to define +// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed +// from false to true). +type ConditionFunc func(event watch.Event) (bool, error) + +// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry. +var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout") + +// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// Waits until context deadline or until context is canceled. +// +// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!! +// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error. +// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below, +// Warning: solving such issues. +// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone. +func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var lastEvent *watch.Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, ErrWatchClosed + } + lastEvent = &event + + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-ctx.Done(): + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} + +// Until wraps the watcherClient's watch function with RetryWatcher making sure that watcher gets restarted in case of errors. +// The initialResourceVersion will be given to watch method when first called. It shall not be "" or "0" +// given the underlying WATCH call issues (#74022). +// Remaining behaviour is identical to function UntilWithoutRetry. (See above.) +// Until can deal with API timeouts and lost connections. +// It guarantees you to see all events and in the order they happened. +// Due to this guarantee there is no way it can deal with 'Resource version too old error'. It will fail in this case. +// (See `UntilWithSync` if you'd prefer to recover from all the errors including RV too old by re-listing +// those items. In normal code you should care about being level driven so you'd not care about not seeing all the edges.) +// +// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges"). +func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) { + w, err := NewRetryWatcher(initialResourceVersion, watcherClient) + if err != nil { + return nil, err + } + + return UntilWithoutRetry(ctx, w, conditions...) +} + +// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced, +// and watches the output until each provided condition succeeds, in a way that is identical +// to function UntilWithoutRetry. (See above.) +// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'. +// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will +// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple +// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will +// re-list to recover and you always get an event, if there has been a change, after recovery. +// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for +// particular object, not between more of them even it's the same resource. +// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: +// waiting for object reaching a state, "small" controllers, ... +func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { + indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType) + // We need to wait for the internal informers to fully stop so it's easier to reason about + // and it works with non-thread safe clients. + defer func() { <-done }() + // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and + // let UntilWithoutRetry to stop it + defer watcher.Stop() + + if precondition != nil { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %w", ctx.Err()) + } + + done, err := precondition(indexer) + if err != nil { + return nil, err + } + + if done { + return nil, nil + } + } + + return UntilWithoutRetry(ctx, watcher, conditions...) +} + +// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. +func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout < 0 { + // This should be handled in validation + klog.Errorf("Timeout for context shall not be negative!") + timeout = 0 + } + + if timeout == 0 { + return context.WithCancel(parent) + } + + return context.WithTimeout(parent, timeout) +} diff --git a/vendor/k8s.io/client-go/util/certificate/OWNERS b/vendor/k8s.io/client-go/util/certificate/OWNERS new file mode 100644 index 000000000..3c3b94c58 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/OWNERS @@ -0,0 +1,8 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-auth-certificates-approvers +reviewers: + - sig-auth-certificates-reviewers +labels: + - sig/auth diff --git a/vendor/k8s.io/client-go/util/certificate/certificate_manager.go b/vendor/k8s.io/client-go/util/certificate/certificate_manager.go new file mode 100644 index 000000000..b4dcb0b84 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/certificate_manager.go @@ -0,0 +1,775 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package certificate + +import ( + "context" + "crypto/ecdsa" + "crypto/elliptic" + cryptorand "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "reflect" + "sync" + "time" + + "k8s.io/klog/v2" + + certificates "k8s.io/api/certificates/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/cert" + "k8s.io/client-go/util/certificate/csr" + "k8s.io/client-go/util/keyutil" +) + +var ( + // certificateWaitTimeout controls the amount of time we wait for certificate + // approval in one iteration. + certificateWaitTimeout = 15 * time.Minute + + kubeletServingUsagesWithEncipherment = []certificates.KeyUsage{ + // https://tools.ietf.org/html/rfc5280#section-4.2.1.3 + // + // Digital signature allows the certificate to be used to verify + // digital signatures used during TLS negotiation. + certificates.UsageDigitalSignature, + // KeyEncipherment allows the cert/key pair to be used to encrypt + // keys, including the symmetric keys negotiated during TLS setup + // and used for data transfer. + certificates.UsageKeyEncipherment, + // ServerAuth allows the cert to be used by a TLS server to + // authenticate itself to a TLS client. + certificates.UsageServerAuth, + } + kubeletServingUsagesNoEncipherment = []certificates.KeyUsage{ + // https://tools.ietf.org/html/rfc5280#section-4.2.1.3 + // + // Digital signature allows the certificate to be used to verify + // digital signatures used during TLS negotiation. + certificates.UsageDigitalSignature, + // ServerAuth allows the cert to be used by a TLS server to + // authenticate itself to a TLS client. + certificates.UsageServerAuth, + } + DefaultKubeletServingGetUsages = func(privateKey interface{}) []certificates.KeyUsage { + switch privateKey.(type) { + case *rsa.PrivateKey: + return kubeletServingUsagesWithEncipherment + default: + return kubeletServingUsagesNoEncipherment + } + } + kubeletClientUsagesWithEncipherment = []certificates.KeyUsage{ + // https://tools.ietf.org/html/rfc5280#section-4.2.1.3 + // + // Digital signature allows the certificate to be used to verify + // digital signatures used during TLS negotiation. + certificates.UsageDigitalSignature, + // KeyEncipherment allows the cert/key pair to be used to encrypt + // keys, including the symmetric keys negotiated during TLS setup + // and used for data transfer. + certificates.UsageKeyEncipherment, + // ClientAuth allows the cert to be used by a TLS client to + // authenticate itself to the TLS server. + certificates.UsageClientAuth, + } + kubeletClientUsagesNoEncipherment = []certificates.KeyUsage{ + // https://tools.ietf.org/html/rfc5280#section-4.2.1.3 + // + // Digital signature allows the certificate to be used to verify + // digital signatures used during TLS negotiation. + certificates.UsageDigitalSignature, + // ClientAuth allows the cert to be used by a TLS client to + // authenticate itself to the TLS server. + certificates.UsageClientAuth, + } + DefaultKubeletClientGetUsages = func(privateKey interface{}) []certificates.KeyUsage { + switch privateKey.(type) { + case *rsa.PrivateKey: + return kubeletClientUsagesWithEncipherment + default: + return kubeletClientUsagesNoEncipherment + } + } +) + +// Manager maintains and updates the certificates in use by this certificate +// manager. In the background it communicates with the API server to get new +// certificates for certificates about to expire. +type Manager interface { + // Start the API server status sync loop. + Start() + // Stop the cert manager loop. + Stop() + // Current returns the currently selected certificate from the + // certificate manager, as well as the associated certificate and key data + // in PEM format. + Current() *tls.Certificate + // ServerHealthy returns true if the manager is able to communicate with + // the server. This allows a caller to determine whether the cert manager + // thinks it can potentially talk to the API server. The cert manager may + // be very conservative and only return true if recent communication has + // occurred with the server. + ServerHealthy() bool +} + +// Config is the set of configuration parameters available for a new Manager. +type Config struct { + // ClientsetFn will be used to create a clientset for + // creating/fetching new certificate requests generated when a key rotation occurs. + // The function will never be invoked in parallel. + // It is passed the current client certificate if one exists. + ClientsetFn ClientsetFunc + // Template is the CertificateRequest that will be used as a template for + // generating certificate signing requests for all new keys generated as + // part of rotation. It follows the same rules as the template parameter of + // crypto.x509.CreateCertificateRequest in the Go standard libraries. + Template *x509.CertificateRequest + // GetTemplate returns the CertificateRequest that will be used as a template for + // generating certificate signing requests for all new keys generated as + // part of rotation. It follows the same rules as the template parameter of + // crypto.x509.CreateCertificateRequest in the Go standard libraries. + // If no template is available, nil may be returned, and no certificate will be requested. + // If specified, takes precedence over Template. + GetTemplate func() *x509.CertificateRequest + // SignerName is the name of the certificate signer that should sign certificates + // generated by the manager. + SignerName string + // RequestedCertificateLifetime is the requested lifetime length for certificates generated by the manager. + // Optional. + // This will set the spec.expirationSeconds field on the CSR. Controlling the lifetime of + // the issued certificate is not guaranteed as the signer may choose to ignore the request. + RequestedCertificateLifetime *time.Duration + // Usages is the types of usages that certificates generated by the manager + // can be used for. It is mutually exclusive with GetUsages. + Usages []certificates.KeyUsage + // GetUsages is dynamic way to get the types of usages that certificates generated by the manager + // can be used for. If Usages is not nil, GetUsages has to be nil, vice versa. + // It is mutually exclusive with Usages. + GetUsages func(privateKey interface{}) []certificates.KeyUsage + // CertificateStore is a persistent store where the current cert/key is + // kept and future cert/key pairs will be persisted after they are + // generated. + CertificateStore Store + // BootstrapCertificatePEM is the certificate data that will be returned + // from the Manager if the CertificateStore doesn't have any cert/key pairs + // currently available and has not yet had a chance to get a new cert/key + // pair from the API. If the CertificateStore does have a cert/key pair, + // this will be ignored. If there is no cert/key pair available in the + // CertificateStore, as soon as Start is called, it will request a new + // cert/key pair from the CertificateSigningRequestClient. This is intended + // to allow the first boot of a component to be initialized using a + // generic, multi-use cert/key pair which will be quickly replaced with a + // unique cert/key pair. + BootstrapCertificatePEM []byte + // BootstrapKeyPEM is the key data that will be returned from the Manager + // if the CertificateStore doesn't have any cert/key pairs currently + // available. If the CertificateStore does have a cert/key pair, this will + // be ignored. If the bootstrap cert/key pair are used, they will be + // rotated at the first opportunity, possibly well in advance of expiring. + // This is intended to allow the first boot of a component to be + // initialized using a generic, multi-use cert/key pair which will be + // quickly replaced with a unique cert/key pair. + BootstrapKeyPEM []byte `datapolicy:"security-key"` + // CertificateRotation will record a metric showing the time in seconds + // that certificates lived before being rotated. This metric is a histogram + // because there is value in keeping a history of rotation cadences. It + // allows one to setup monitoring and alerting of unexpected rotation + // behavior and track trends in rotation frequency. + CertificateRotation Histogram + // CertifcateRenewFailure will record a metric that keeps track of + // certificate renewal failures. + CertificateRenewFailure Counter + // Name is an optional string that will be used when writing log output + // or returning errors from manager methods. If not set, SignerName will + // be used, if SignerName is not set, if Usages includes client auth the + // name will be "client auth", otherwise the value will be "server". + Name string + // Logf is an optional function that log output will be sent to from the + // certificate manager. If not set it will use klog.V(2) + Logf func(format string, args ...interface{}) +} + +// Store is responsible for getting and updating the current certificate. +// Depending on the concrete implementation, the backing store for this +// behavior may vary. +type Store interface { + // Current returns the currently selected certificate, as well as the + // associated certificate and key data in PEM format. If the Store doesn't + // have a cert/key pair currently, it should return a NoCertKeyError so + // that the Manager can recover by using bootstrap certificates to request + // a new cert/key pair. + Current() (*tls.Certificate, error) + // Update accepts the PEM data for the cert/key pair and makes the new + // cert/key pair the 'current' pair, that will be returned by future calls + // to Current(). + Update(cert, key []byte) (*tls.Certificate, error) +} + +// Gauge will record the remaining lifetime of the certificate each time it is +// updated. +type Gauge interface { + Set(float64) +} + +// Histogram will record the time a rotated certificate was used before being +// rotated. +type Histogram interface { + Observe(float64) +} + +// Counter will wrap a counter with labels +type Counter interface { + Inc() +} + +// NoCertKeyError indicates there is no cert/key currently available. +type NoCertKeyError string + +// ClientsetFunc returns a new clientset for discovering CSR API availability and requesting CSRs. +// It is passed the current certificate if one is available and valid. +type ClientsetFunc func(current *tls.Certificate) (clientset.Interface, error) + +func (e *NoCertKeyError) Error() string { return string(*e) } + +type manager struct { + getTemplate func() *x509.CertificateRequest + + // lastRequestLock guards lastRequestCancel and lastRequest + lastRequestLock sync.Mutex + lastRequestCancel context.CancelFunc + lastRequest *x509.CertificateRequest + + dynamicTemplate bool + signerName string + requestedCertificateLifetime *time.Duration + getUsages func(privateKey interface{}) []certificates.KeyUsage + forceRotation bool + + certStore Store + + certificateRotation Histogram + certificateRenewFailure Counter + + // the following variables must only be accessed under certAccessLock + certAccessLock sync.RWMutex + cert *tls.Certificate + serverHealth bool + + // the clientFn must only be accessed under the clientAccessLock + clientAccessLock sync.Mutex + clientsetFn ClientsetFunc + stopCh chan struct{} + stopped bool + + // Set to time.Now but can be stubbed out for testing + now func() time.Time + + name string + logf func(format string, args ...interface{}) +} + +// NewManager returns a new certificate manager. A certificate manager is +// responsible for being the authoritative source of certificates in the +// Kubelet and handling updates due to rotation. +func NewManager(config *Config) (Manager, error) { + cert, forceRotation, err := getCurrentCertificateOrBootstrap( + config.CertificateStore, + config.BootstrapCertificatePEM, + config.BootstrapKeyPEM) + if err != nil { + return nil, err + } + + getTemplate := config.GetTemplate + if getTemplate == nil { + getTemplate = func() *x509.CertificateRequest { return config.Template } + } + + if config.GetUsages != nil && config.Usages != nil { + return nil, errors.New("cannot specify both GetUsages and Usages") + } + if config.GetUsages == nil && config.Usages == nil { + return nil, errors.New("either GetUsages or Usages should be specified") + } + var getUsages func(interface{}) []certificates.KeyUsage + if config.GetUsages != nil { + getUsages = config.GetUsages + } else { + getUsages = func(interface{}) []certificates.KeyUsage { return config.Usages } + } + m := manager{ + stopCh: make(chan struct{}), + clientsetFn: config.ClientsetFn, + getTemplate: getTemplate, + dynamicTemplate: config.GetTemplate != nil, + signerName: config.SignerName, + requestedCertificateLifetime: config.RequestedCertificateLifetime, + getUsages: getUsages, + certStore: config.CertificateStore, + cert: cert, + forceRotation: forceRotation, + certificateRotation: config.CertificateRotation, + certificateRenewFailure: config.CertificateRenewFailure, + now: time.Now, + } + + name := config.Name + if len(name) == 0 { + name = m.signerName + } + if len(name) == 0 { + usages := getUsages(nil) + switch { + case hasKeyUsage(usages, certificates.UsageClientAuth): + name = string(certificates.UsageClientAuth) + default: + name = "certificate" + } + } + + m.name = name + m.logf = config.Logf + if m.logf == nil { + m.logf = func(format string, args ...interface{}) { klog.V(2).Infof(format, args...) } + } + + return &m, nil +} + +// Current returns the currently selected certificate from the certificate +// manager. This can be nil if the manager was initialized without a +// certificate and has not yet received one from the +// CertificateSigningRequestClient, or if the current cert has expired. +func (m *manager) Current() *tls.Certificate { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + if m.cert != nil && m.cert.Leaf != nil && m.now().After(m.cert.Leaf.NotAfter) { + m.logf("%s: Current certificate is expired", m.name) + return nil + } + return m.cert +} + +// ServerHealthy returns true if the cert manager believes the server +// is currently alive. +func (m *manager) ServerHealthy() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.serverHealth +} + +// Stop terminates the manager. +func (m *manager) Stop() { + m.clientAccessLock.Lock() + defer m.clientAccessLock.Unlock() + if m.stopped { + return + } + close(m.stopCh) + m.stopped = true +} + +// Start will start the background work of rotating the certificates. +func (m *manager) Start() { + // Certificate rotation depends on access to the API server certificate + // signing API, so don't start the certificate manager if we don't have a + // client. + if m.clientsetFn == nil { + m.logf("%s: Certificate rotation is not enabled, no connection to the apiserver", m.name) + return + } + m.logf("%s: Certificate rotation is enabled", m.name) + + templateChanged := make(chan struct{}) + go wait.Until(func() { + deadline := m.nextRotationDeadline() + if sleepInterval := deadline.Sub(m.now()); sleepInterval > 0 { + m.logf("%s: Waiting %v for next certificate rotation", m.name, sleepInterval) + + timer := time.NewTimer(sleepInterval) + defer timer.Stop() + + select { + case <-timer.C: + // unblock when deadline expires + case <-templateChanged: + _, lastRequestTemplate := m.getLastRequest() + if reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) { + // if the template now matches what we last requested, restart the rotation deadline loop + return + } + m.logf("%s: Certificate template changed, rotating", m.name) + } + } + + // Don't enter rotateCerts and trigger backoff if we don't even have a template to request yet + if m.getTemplate() == nil { + return + } + + backoff := wait.Backoff{ + Duration: 2 * time.Second, + Factor: 2, + Jitter: 0.1, + Steps: 5, + } + if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Reached backoff limit, still unable to rotate certs: %v", m.name, err)) + wait.PollInfinite(32*time.Second, m.rotateCerts) + } + }, time.Second, m.stopCh) + + if m.dynamicTemplate { + go wait.Until(func() { + // check if the current template matches what we last requested + lastRequestCancel, lastRequestTemplate := m.getLastRequest() + + if !m.certSatisfiesTemplate() && !reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) { + // if the template is different, queue up an interrupt of the rotation deadline loop. + // if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded. + if lastRequestCancel != nil { + // if we're currently waiting on a submitted request that no longer matches what we want, stop waiting + lastRequestCancel() + } + select { + case templateChanged <- struct{}{}: + case <-m.stopCh: + } + } + }, time.Second, m.stopCh) + } +} + +func getCurrentCertificateOrBootstrap( + store Store, + bootstrapCertificatePEM []byte, + bootstrapKeyPEM []byte) (cert *tls.Certificate, shouldRotate bool, errResult error) { + + currentCert, err := store.Current() + if err == nil { + // if the current cert is expired, fall back to the bootstrap cert + if currentCert.Leaf != nil && time.Now().Before(currentCert.Leaf.NotAfter) { + return currentCert, false, nil + } + } else { + if _, ok := err.(*NoCertKeyError); !ok { + return nil, false, err + } + } + + if bootstrapCertificatePEM == nil || bootstrapKeyPEM == nil { + return nil, true, nil + } + + bootstrapCert, err := tls.X509KeyPair(bootstrapCertificatePEM, bootstrapKeyPEM) + if err != nil { + return nil, false, err + } + if len(bootstrapCert.Certificate) < 1 { + return nil, false, fmt.Errorf("no cert/key data found") + } + + certs, err := x509.ParseCertificates(bootstrapCert.Certificate[0]) + if err != nil { + return nil, false, fmt.Errorf("unable to parse certificate data: %v", err) + } + if len(certs) < 1 { + return nil, false, fmt.Errorf("no cert data found") + } + bootstrapCert.Leaf = certs[0] + + if _, err := store.Update(bootstrapCertificatePEM, bootstrapKeyPEM); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to set the cert/key pair to the bootstrap certificate: %v", err)) + } + + return &bootstrapCert, true, nil +} + +func (m *manager) getClientset() (clientset.Interface, error) { + current := m.Current() + m.clientAccessLock.Lock() + defer m.clientAccessLock.Unlock() + return m.clientsetFn(current) +} + +// RotateCerts is exposed for testing only and is not a part of the public interface. +// Returns true if it changed the cert, false otherwise. Error is only returned in +// exceptional cases. +func (m *manager) RotateCerts() (bool, error) { + return m.rotateCerts() +} + +// rotateCerts attempts to request a client cert from the server, wait a reasonable +// period of time for it to be signed, and then update the cert on disk. If it cannot +// retrieve a cert, it will return false. It will only return error in exceptional cases. +// This method also keeps track of "server health" by interpreting the responses it gets +// from the server on the various calls it makes. +// TODO: return errors, have callers handle and log them correctly +func (m *manager) rotateCerts() (bool, error) { + m.logf("%s: Rotating certificates", m.name) + + template, csrPEM, keyPEM, privateKey, err := m.generateCSR() + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Unable to generate a certificate signing request: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + // request the client each time + clientSet, err := m.getClientset() + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Unable to load a client to request certificates: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + getUsages := m.getUsages + if m.getUsages == nil { + getUsages = DefaultKubeletClientGetUsages + } + usages := getUsages(privateKey) + // Call the Certificate Signing Request API to get a certificate for the + // new private key + reqName, reqUID, err := csr.RequestCertificate(clientSet, csrPEM, "", m.signerName, m.requestedCertificateLifetime, usages, privateKey) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Failed while requesting a signed certificate from the control plane: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, m.updateServerError(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), certificateWaitTimeout) + defer cancel() + + // Once we've successfully submitted a CSR for this template, record that we did so + m.setLastRequest(cancel, template) + + // Wait for the certificate to be signed. This interface and internal timout + // is a remainder after the old design using raw watch wrapped with backoff. + crtPEM, err := csr.WaitForCertificate(ctx, clientSet, reqName, reqUID) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: certificate request was not signed: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + cert, err := m.certStore.Update(crtPEM, keyPEM) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Unable to store the new cert/key pair: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + if old := m.updateCached(cert); old != nil && m.certificateRotation != nil { + m.certificateRotation.Observe(m.now().Sub(old.Leaf.NotBefore).Seconds()) + } + + return true, nil +} + +// Check that the current certificate on disk satisfies the requests from the +// current template. +// +// Note that extra items in the certificate's SAN or orgs that don't exist in +// the template will not trigger a renewal. +// +// Requires certAccessLock to be locked. +func (m *manager) certSatisfiesTemplateLocked() bool { + if m.cert == nil { + return false + } + + if template := m.getTemplate(); template != nil { + if template.Subject.CommonName != m.cert.Leaf.Subject.CommonName { + m.logf("%s: Current certificate CN (%s) does not match requested CN (%s)", m.name, m.cert.Leaf.Subject.CommonName, template.Subject.CommonName) + return false + } + + currentDNSNames := sets.NewString(m.cert.Leaf.DNSNames...) + desiredDNSNames := sets.NewString(template.DNSNames...) + missingDNSNames := desiredDNSNames.Difference(currentDNSNames) + if len(missingDNSNames) > 0 { + m.logf("%s: Current certificate is missing requested DNS names %v", m.name, missingDNSNames.List()) + return false + } + + currentIPs := sets.NewString() + for _, ip := range m.cert.Leaf.IPAddresses { + currentIPs.Insert(ip.String()) + } + desiredIPs := sets.NewString() + for _, ip := range template.IPAddresses { + desiredIPs.Insert(ip.String()) + } + missingIPs := desiredIPs.Difference(currentIPs) + if len(missingIPs) > 0 { + m.logf("%s: Current certificate is missing requested IP addresses %v", m.name, missingIPs.List()) + return false + } + + currentOrgs := sets.NewString(m.cert.Leaf.Subject.Organization...) + desiredOrgs := sets.NewString(template.Subject.Organization...) + missingOrgs := desiredOrgs.Difference(currentOrgs) + if len(missingOrgs) > 0 { + m.logf("%s: Current certificate is missing requested orgs %v", m.name, missingOrgs.List()) + return false + } + } + + return true +} + +func (m *manager) certSatisfiesTemplate() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.certSatisfiesTemplateLocked() +} + +// nextRotationDeadline returns a value for the threshold at which the +// current certificate should be rotated, 80%+/-10% of the expiration of the +// certificate. +func (m *manager) nextRotationDeadline() time.Time { + // forceRotation is not protected by locks + if m.forceRotation { + m.forceRotation = false + return m.now() + } + + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + + if !m.certSatisfiesTemplateLocked() { + return m.now() + } + + notAfter := m.cert.Leaf.NotAfter + totalDuration := float64(notAfter.Sub(m.cert.Leaf.NotBefore)) + deadline := m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration)) + + m.logf("%s: Certificate expiration is %v, rotation deadline is %v", m.name, notAfter, deadline) + return deadline +} + +// jitteryDuration uses some jitter to set the rotation threshold so each node +// will rotate at approximately 70-90% of the total lifetime of the +// certificate. With jitter, if a number of nodes are added to a cluster at +// approximately the same time (such as cluster creation time), they won't all +// try to rotate certificates at the same time for the rest of the life of the +// cluster. +// +// This function is represented as a variable to allow replacement during testing. +var jitteryDuration = func(totalDuration float64) time.Duration { + return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3) +} + +// updateCached sets the most recent retrieved cert and returns the old cert. +// It also sets the server as assumed healthy. +func (m *manager) updateCached(cert *tls.Certificate) *tls.Certificate { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + m.serverHealth = true + old := m.cert + m.cert = cert + return old +} + +// updateServerError takes an error returned by the server and infers +// the health of the server based on the error. It will return nil if +// the error does not require immediate termination of any wait loops, +// and otherwise it will return the error. +func (m *manager) updateServerError(err error) error { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + switch { + case apierrors.IsUnauthorized(err): + // SSL terminating proxies may report this error instead of the master + m.serverHealth = true + case apierrors.IsUnexpectedServerError(err): + // generally indicates a proxy or other load balancer problem, rather than a problem coming + // from the master + m.serverHealth = false + default: + // Identify known errors that could be expected for a cert request that + // indicate everything is working normally + m.serverHealth = apierrors.IsNotFound(err) || apierrors.IsForbidden(err) + } + return nil +} + +func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byte, keyPEM []byte, key interface{}, err error) { + // Generate a new private key. + privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to generate a new private key: %v", m.name, err) + } + der, err := x509.MarshalECPrivateKey(privateKey) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to marshal the new key to DER: %v", m.name, err) + } + + keyPEM = pem.EncodeToMemory(&pem.Block{Type: keyutil.ECPrivateKeyBlockType, Bytes: der}) + + template = m.getTemplate() + if template == nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to create a csr, no template available", m.name) + } + csrPEM, err = cert.MakeCSRFromTemplate(privateKey, template) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to create a csr from the private key: %v", m.name, err) + } + return template, csrPEM, keyPEM, privateKey, nil +} + +func (m *manager) getLastRequest() (context.CancelFunc, *x509.CertificateRequest) { + m.lastRequestLock.Lock() + defer m.lastRequestLock.Unlock() + return m.lastRequestCancel, m.lastRequest +} + +func (m *manager) setLastRequest(cancel context.CancelFunc, r *x509.CertificateRequest) { + m.lastRequestLock.Lock() + defer m.lastRequestLock.Unlock() + m.lastRequestCancel = cancel + m.lastRequest = r +} + +func hasKeyUsage(usages []certificates.KeyUsage, usage certificates.KeyUsage) bool { + for _, u := range usages { + if u == usage { + return true + } + } + return false +} diff --git a/vendor/k8s.io/client-go/util/certificate/certificate_store.go b/vendor/k8s.io/client-go/util/certificate/certificate_store.go new file mode 100644 index 000000000..e7ed58ee8 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/certificate_store.go @@ -0,0 +1,318 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package certificate + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "os" + "path/filepath" + "time" + + certutil "k8s.io/client-go/util/cert" + "k8s.io/klog/v2" +) + +const ( + keyExtension = ".key" + certExtension = ".crt" + pemExtension = ".pem" + currentPair = "current" + updatedPair = "updated" +) + +type fileStore struct { + pairNamePrefix string + certDirectory string + keyDirectory string + certFile string + keyFile string +} + +// FileStore is a store that provides certificate retrieval as well as +// the path on disk of the current PEM. +type FileStore interface { + Store + // CurrentPath returns the path on disk of the current certificate/key + // pair encoded as PEM files. + CurrentPath() string +} + +// NewFileStore returns a concrete implementation of a Store that is based on +// storing the cert/key pairs in a single file per pair on disk in the +// designated directory. When starting up it will look for the currently +// selected cert/key pair in: +// +// 1. ${certDirectory}/${pairNamePrefix}-current.pem - both cert and key are in the same file. +// 2. ${certFile}, ${keyFile} +// 3. ${certDirectory}/${pairNamePrefix}.crt, ${keyDirectory}/${pairNamePrefix}.key +// +// The first one found will be used. If rotation is enabled, future cert/key +// updates will be written to the ${certDirectory} directory and +// ${certDirectory}/${pairNamePrefix}-current.pem will be created as a soft +// link to the currently selected cert/key pair. +func NewFileStore( + pairNamePrefix string, + certDirectory string, + keyDirectory string, + certFile string, + keyFile string) (FileStore, error) { + + s := fileStore{ + pairNamePrefix: pairNamePrefix, + certDirectory: certDirectory, + keyDirectory: keyDirectory, + certFile: certFile, + keyFile: keyFile, + } + if err := s.recover(); err != nil { + return nil, err + } + return &s, nil +} + +// CurrentPath returns the path to the current version of these certificates. +func (s *fileStore) CurrentPath() string { + return filepath.Join(s.certDirectory, s.filename(currentPair)) +} + +// recover checks if there is a certificate rotation that was interrupted while +// progress, and if so, attempts to recover to a good state. +func (s *fileStore) recover() error { + // If the 'current' file doesn't exist, continue on with the recovery process. + currentPath := filepath.Join(s.certDirectory, s.filename(currentPair)) + if exists, err := fileExists(currentPath); err != nil { + return err + } else if exists { + return nil + } + + // If the 'updated' file exists, and it is a symbolic link, continue on + // with the recovery process. + updatedPath := filepath.Join(s.certDirectory, s.filename(updatedPair)) + if fi, err := os.Lstat(updatedPath); err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } else if fi.Mode()&os.ModeSymlink != os.ModeSymlink { + return fmt.Errorf("expected %q to be a symlink but it is a file", updatedPath) + } + + // Move the 'updated' symlink to 'current'. + if err := os.Rename(updatedPath, currentPath); err != nil { + return fmt.Errorf("unable to rename %q to %q: %v", updatedPath, currentPath, err) + } + return nil +} + +func (s *fileStore) Current() (*tls.Certificate, error) { + pairFile := filepath.Join(s.certDirectory, s.filename(currentPair)) + if pairFileExists, err := fileExists(pairFile); err != nil { + return nil, err + } else if pairFileExists { + klog.Infof("Loading cert/key pair from %q.", pairFile) + return loadFile(pairFile) + } + + certFileExists, err := fileExists(s.certFile) + if err != nil { + return nil, err + } + keyFileExists, err := fileExists(s.keyFile) + if err != nil { + return nil, err + } + if certFileExists && keyFileExists { + klog.Infof("Loading cert/key pair from (%q, %q).", s.certFile, s.keyFile) + return loadX509KeyPair(s.certFile, s.keyFile) + } + + c := filepath.Join(s.certDirectory, s.pairNamePrefix+certExtension) + k := filepath.Join(s.keyDirectory, s.pairNamePrefix+keyExtension) + certFileExists, err = fileExists(c) + if err != nil { + return nil, err + } + keyFileExists, err = fileExists(k) + if err != nil { + return nil, err + } + if certFileExists && keyFileExists { + klog.Infof("Loading cert/key pair from (%q, %q).", c, k) + return loadX509KeyPair(c, k) + } + + noKeyErr := NoCertKeyError( + fmt.Sprintf("no cert/key files read at %q, (%q, %q) or (%q, %q)", + pairFile, + s.certFile, + s.keyFile, + s.certDirectory, + s.keyDirectory)) + return nil, &noKeyErr +} + +func loadFile(pairFile string) (*tls.Certificate, error) { + // LoadX509KeyPair knows how to parse combined cert and private key from + // the same file. + cert, err := tls.LoadX509KeyPair(pairFile, pairFile) + if err != nil { + return nil, fmt.Errorf("could not convert data from %q into cert/key pair: %v", pairFile, err) + } + certs, err := x509.ParseCertificates(cert.Certificate[0]) + if err != nil { + return nil, fmt.Errorf("unable to parse certificate data: %v", err) + } + cert.Leaf = certs[0] + return &cert, nil +} + +func (s *fileStore) Update(certData, keyData []byte) (*tls.Certificate, error) { + ts := time.Now().Format("2006-01-02-15-04-05") + pemFilename := s.filename(ts) + + if err := os.MkdirAll(s.certDirectory, 0755); err != nil { + return nil, fmt.Errorf("could not create directory %q to store certificates: %v", s.certDirectory, err) + } + certPath := filepath.Join(s.certDirectory, pemFilename) + + f, err := os.OpenFile(certPath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0600) + if err != nil { + return nil, fmt.Errorf("could not open %q: %v", certPath, err) + } + defer f.Close() + + // First cert is leaf, remainder are intermediates + certs, err := certutil.ParseCertsPEM(certData) + if err != nil { + return nil, fmt.Errorf("invalid certificate data: %v", err) + } + for _, c := range certs { + pem.Encode(f, &pem.Block{Type: "CERTIFICATE", Bytes: c.Raw}) + } + + keyBlock, _ := pem.Decode(keyData) + if keyBlock == nil { + return nil, fmt.Errorf("invalid key data") + } + pem.Encode(f, keyBlock) + + cert, err := loadFile(certPath) + if err != nil { + return nil, err + } + + if err := s.updateSymlink(certPath); err != nil { + return nil, err + } + return cert, nil +} + +// updateSymLink updates the current symlink to point to the file that is +// passed it. It will fail if there is a non-symlink file exists where the +// symlink is expected to be. +func (s *fileStore) updateSymlink(filename string) error { + // If the 'current' file either doesn't exist, or is already a symlink, + // proceed. Otherwise, this is an unrecoverable error. + currentPath := filepath.Join(s.certDirectory, s.filename(currentPair)) + currentPathExists := false + if fi, err := os.Lstat(currentPath); err != nil { + if !os.IsNotExist(err) { + return err + } + } else if fi.Mode()&os.ModeSymlink != os.ModeSymlink { + return fmt.Errorf("expected %q to be a symlink but it is a file", currentPath) + } else { + currentPathExists = true + } + + // If the 'updated' file doesn't exist, proceed. If it exists but it is a + // symlink, delete it. Otherwise, this is an unrecoverable error. + updatedPath := filepath.Join(s.certDirectory, s.filename(updatedPair)) + if fi, err := os.Lstat(updatedPath); err != nil { + if !os.IsNotExist(err) { + return err + } + } else if fi.Mode()&os.ModeSymlink != os.ModeSymlink { + return fmt.Errorf("expected %q to be a symlink but it is a file", updatedPath) + } else { + if err := os.Remove(updatedPath); err != nil { + return fmt.Errorf("unable to remove %q: %v", updatedPath, err) + } + } + + // Check that the new cert/key pair file exists to avoid rotating to an + // invalid cert/key. + if filenameExists, err := fileExists(filename); err != nil { + return err + } else if !filenameExists { + return fmt.Errorf("file %q does not exist so it can not be used as the currently selected cert/key", filename) + } + + // Ensure the source path is absolute to ensure the symlink target is + // correct when certDirectory is a relative path. + filename, err := filepath.Abs(filename) + if err != nil { + return err + } + + // Create the 'updated' symlink pointing to the requested file name. + if err := os.Symlink(filename, updatedPath); err != nil { + return fmt.Errorf("unable to create a symlink from %q to %q: %v", updatedPath, filename, err) + } + + // Replace the 'current' symlink. + if currentPathExists { + if err := os.Remove(currentPath); err != nil { + return fmt.Errorf("unable to remove %q: %v", currentPath, err) + } + } + if err := os.Rename(updatedPath, currentPath); err != nil { + return fmt.Errorf("unable to rename %q to %q: %v", updatedPath, currentPath, err) + } + return nil +} + +func (s *fileStore) filename(qualifier string) string { + return s.pairNamePrefix + "-" + qualifier + pemExtension +} + +func loadX509KeyPair(certFile, keyFile string) (*tls.Certificate, error) { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + certs, err := x509.ParseCertificates(cert.Certificate[0]) + if err != nil { + return nil, fmt.Errorf("unable to parse certificate data: %v", err) + } + cert.Leaf = certs[0] + return &cert, nil +} + +// FileExists checks if specified file exists. +func fileExists(filename string) (bool, error) { + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} diff --git a/vendor/k8s.io/client-go/util/certificate/csr/csr.go b/vendor/k8s.io/client-go/util/certificate/csr/csr.go new file mode 100644 index 000000000..0390d1c02 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/csr/csr.go @@ -0,0 +1,364 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csr + +import ( + "context" + "crypto" + "crypto/x509" + "encoding/pem" + "fmt" + "reflect" + "time" + + certificatesv1 "k8s.io/api/certificates/v1" + certificatesv1beta1 "k8s.io/api/certificates/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" + certutil "k8s.io/client-go/util/cert" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" +) + +// RequestCertificate will either use an existing (if this process has run +// before but not to completion) or create a certificate signing request using the +// PEM encoded CSR and send it to API server. An optional requestedDuration may be passed +// to set the spec.expirationSeconds field on the CSR to control the lifetime of the issued +// certificate. This is not guaranteed as the signer may choose to ignore the request. +func RequestCertificate(client clientset.Interface, csrData []byte, name, signerName string, requestedDuration *time.Duration, usages []certificatesv1.KeyUsage, privateKey interface{}) (reqName string, reqUID types.UID, err error) { + csr := &certificatesv1.CertificateSigningRequest{ + // Username, UID, Groups will be injected by API server. + TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: certificatesv1.CertificateSigningRequestSpec{ + Request: csrData, + Usages: usages, + SignerName: signerName, + }, + } + if len(csr.Name) == 0 { + csr.GenerateName = "csr-" + } + if requestedDuration != nil { + csr.Spec.ExpirationSeconds = DurationToExpirationSeconds(*requestedDuration) + } + + reqName, reqUID, err = create(client, csr) + switch { + case err == nil: + return reqName, reqUID, err + + case apierrors.IsAlreadyExists(err) && len(name) > 0: + klog.Infof("csr for this node already exists, reusing") + req, err := get(client, name) + if err != nil { + return "", "", formatError("cannot retrieve certificate signing request: %v", err) + } + if err := ensureCompatible(req, csr, privateKey); err != nil { + return "", "", fmt.Errorf("retrieved csr is not compatible: %v", err) + } + klog.Infof("csr for this node is still valid") + return req.Name, req.UID, nil + + default: + return "", "", formatError("cannot create certificate signing request: %v", err) + } +} + +func DurationToExpirationSeconds(duration time.Duration) *int32 { + return pointer.Int32(int32(duration / time.Second)) +} + +func ExpirationSecondsToDuration(expirationSeconds int32) time.Duration { + return time.Duration(expirationSeconds) * time.Second +} + +func get(client clientset.Interface, name string) (*certificatesv1.CertificateSigningRequest, error) { + v1req, v1err := client.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), name, metav1.GetOptions{}) + if v1err == nil || !apierrors.IsNotFound(v1err) { + return v1req, v1err + } + + v1beta1req, v1beta1err := client.CertificatesV1beta1().CertificateSigningRequests().Get(context.TODO(), name, metav1.GetOptions{}) + if v1beta1err != nil { + return nil, v1beta1err + } + + v1req = &certificatesv1.CertificateSigningRequest{ + ObjectMeta: v1beta1req.ObjectMeta, + Spec: certificatesv1.CertificateSigningRequestSpec{ + Request: v1beta1req.Spec.Request, + }, + } + if v1beta1req.Spec.SignerName != nil { + v1req.Spec.SignerName = *v1beta1req.Spec.SignerName + } + for _, usage := range v1beta1req.Spec.Usages { + v1req.Spec.Usages = append(v1req.Spec.Usages, certificatesv1.KeyUsage(usage)) + } + return v1req, nil +} + +func create(client clientset.Interface, csr *certificatesv1.CertificateSigningRequest) (reqName string, reqUID types.UID, err error) { + // only attempt a create via v1 if we specified signerName and usages and are not using the legacy unknown signerName + if len(csr.Spec.Usages) > 0 && len(csr.Spec.SignerName) > 0 && csr.Spec.SignerName != "kubernetes.io/legacy-unknown" { + v1req, v1err := client.CertificatesV1().CertificateSigningRequests().Create(context.TODO(), csr, metav1.CreateOptions{}) + switch { + case v1err != nil && apierrors.IsNotFound(v1err): + // v1 CSR API was not found, continue to try v1beta1 + + case v1err != nil: + // other creation error + return "", "", v1err + + default: + // success + return v1req.Name, v1req.UID, v1err + } + } + + // convert relevant bits to v1beta1 + v1beta1csr := &certificatesv1beta1.CertificateSigningRequest{ + ObjectMeta: csr.ObjectMeta, + Spec: certificatesv1beta1.CertificateSigningRequestSpec{ + SignerName: &csr.Spec.SignerName, + Request: csr.Spec.Request, + }, + } + for _, usage := range csr.Spec.Usages { + v1beta1csr.Spec.Usages = append(v1beta1csr.Spec.Usages, certificatesv1beta1.KeyUsage(usage)) + } + + // create v1beta1 + v1beta1req, v1beta1err := client.CertificatesV1beta1().CertificateSigningRequests().Create(context.TODO(), v1beta1csr, metav1.CreateOptions{}) + if v1beta1err != nil { + return "", "", v1beta1err + } + return v1beta1req.Name, v1beta1req.UID, nil +} + +// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error. +func WaitForCertificate(ctx context.Context, client clientset.Interface, reqName string, reqUID types.UID) (certData []byte, err error) { + fieldSelector := fields.OneTermEqualSelector("metadata.name", reqName).String() + + var lw *cache.ListWatch + var obj runtime.Object + for { + // see if the v1 API is available + if _, err := client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}); err == nil { + // watch v1 objects + obj = &certificatesv1.CertificateSigningRequest{} + lw = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return client.CertificatesV1().CertificateSigningRequests().List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return client.CertificatesV1().CertificateSigningRequests().Watch(ctx, options) + }, + } + break + } else { + klog.V(2).Infof("error fetching v1 certificate signing request: %v", err) + } + + // return if we've timed out + if err := ctx.Err(); err != nil { + return nil, wait.ErrWaitTimeout + } + + // see if the v1beta1 API is available + if _, err := client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}); err == nil { + // watch v1beta1 objects + obj = &certificatesv1beta1.CertificateSigningRequest{} + lw = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, options) + }, + } + break + } else { + klog.V(2).Infof("error fetching v1beta1 certificate signing request: %v", err) + } + + // return if we've timed out + if err := ctx.Err(); err != nil { + return nil, wait.ErrWaitTimeout + } + + // wait and try again + time.Sleep(time.Second) + } + + var issuedCertificate []byte + _, err = watchtools.UntilWithSync( + ctx, + lw, + obj, + nil, + func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Modified, watch.Added: + case watch.Deleted: + return false, fmt.Errorf("csr %q was deleted", reqName) + default: + return false, nil + } + + switch csr := event.Object.(type) { + case *certificatesv1.CertificateSigningRequest: + if csr.UID != reqUID { + return false, fmt.Errorf("csr %q changed UIDs", csr.Name) + } + approved := false + for _, c := range csr.Status.Conditions { + if c.Type == certificatesv1.CertificateDenied { + return false, fmt.Errorf("certificate signing request is denied, reason: %v, message: %v", c.Reason, c.Message) + } + if c.Type == certificatesv1.CertificateFailed { + return false, fmt.Errorf("certificate signing request failed, reason: %v, message: %v", c.Reason, c.Message) + } + if c.Type == certificatesv1.CertificateApproved { + approved = true + } + } + if approved { + if len(csr.Status.Certificate) > 0 { + klog.V(2).Infof("certificate signing request %s is issued", csr.Name) + issuedCertificate = csr.Status.Certificate + return true, nil + } + klog.V(2).Infof("certificate signing request %s is approved, waiting to be issued", csr.Name) + } + + case *certificatesv1beta1.CertificateSigningRequest: + if csr.UID != reqUID { + return false, fmt.Errorf("csr %q changed UIDs", csr.Name) + } + approved := false + for _, c := range csr.Status.Conditions { + if c.Type == certificatesv1beta1.CertificateDenied { + return false, fmt.Errorf("certificate signing request is denied, reason: %v, message: %v", c.Reason, c.Message) + } + if c.Type == certificatesv1beta1.CertificateFailed { + return false, fmt.Errorf("certificate signing request failed, reason: %v, message: %v", c.Reason, c.Message) + } + if c.Type == certificatesv1beta1.CertificateApproved { + approved = true + } + } + if approved { + if len(csr.Status.Certificate) > 0 { + klog.V(2).Infof("certificate signing request %s is issued", csr.Name) + issuedCertificate = csr.Status.Certificate + return true, nil + } + klog.V(2).Infof("certificate signing request %s is approved, waiting to be issued", csr.Name) + } + + default: + return false, fmt.Errorf("unexpected type received: %T", event.Object) + } + + return false, nil + }, + ) + if err == wait.ErrWaitTimeout { + return nil, wait.ErrWaitTimeout + } + if err != nil { + return nil, formatError("cannot watch on the certificate signing request: %v", err) + } + + return issuedCertificate, nil +} + +// ensureCompatible ensures that a CSR object is compatible with an original CSR +func ensureCompatible(new, orig *certificatesv1.CertificateSigningRequest, privateKey interface{}) error { + newCSR, err := parseCSR(new.Spec.Request) + if err != nil { + return fmt.Errorf("unable to parse new csr: %v", err) + } + origCSR, err := parseCSR(orig.Spec.Request) + if err != nil { + return fmt.Errorf("unable to parse original csr: %v", err) + } + if !reflect.DeepEqual(newCSR.Subject, origCSR.Subject) { + return fmt.Errorf("csr subjects differ: new: %#v, orig: %#v", newCSR.Subject, origCSR.Subject) + } + if len(new.Spec.SignerName) > 0 && len(orig.Spec.SignerName) > 0 && new.Spec.SignerName != orig.Spec.SignerName { + return fmt.Errorf("csr signerNames differ: new %q, orig: %q", new.Spec.SignerName, orig.Spec.SignerName) + } + signer, ok := privateKey.(crypto.Signer) + if !ok { + return fmt.Errorf("privateKey is not a signer") + } + newCSR.PublicKey = signer.Public() + if err := newCSR.CheckSignature(); err != nil { + return fmt.Errorf("error validating signature new CSR against old key: %v", err) + } + if len(new.Status.Certificate) > 0 { + certs, err := certutil.ParseCertsPEM(new.Status.Certificate) + if err != nil { + return fmt.Errorf("error parsing signed certificate for CSR: %v", err) + } + now := time.Now() + for _, cert := range certs { + if now.After(cert.NotAfter) { + return fmt.Errorf("one of the certificates for the CSR has expired: %s", cert.NotAfter) + } + } + } + return nil +} + +// formatError preserves the type of an API message but alters the message. Expects +// a single argument format string, and returns the wrapped error. +func formatError(format string, err error) error { + if s, ok := err.(apierrors.APIStatus); ok { + se := &apierrors.StatusError{ErrStatus: s.Status()} + se.ErrStatus.Message = fmt.Sprintf(format, se.ErrStatus.Message) + return se + } + return fmt.Errorf(format, err) +} + +// parseCSR extracts the CSR from the API object and decodes it. +func parseCSR(pemData []byte) (*x509.CertificateRequest, error) { + // extract PEM from request object + block, _ := pem.Decode(pemData) + if block == nil || block.Type != "CERTIFICATE REQUEST" { + return nil, fmt.Errorf("PEM block type must be CERTIFICATE REQUEST") + } + return x509.ParseCertificateRequest(block.Bytes) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index bd0b303ec..bec9593ba 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -94,8 +94,6 @@ github.com/google/pprof/profile # github.com/google/uuid v1.3.0 ## explicit github.com/google/uuid -# github.com/googleapis/gnostic v0.5.5 -## explicit; go 1.12 # github.com/imdario/mergo v0.3.11 ## explicit; go 1.13 github.com/imdario/mergo @@ -755,8 +753,11 @@ k8s.io/client-go/tools/pager k8s.io/client-go/tools/record k8s.io/client-go/tools/record/util k8s.io/client-go/tools/reference +k8s.io/client-go/tools/watch k8s.io/client-go/transport k8s.io/client-go/util/cert +k8s.io/client-go/util/certificate +k8s.io/client-go/util/certificate/csr k8s.io/client-go/util/connrotation k8s.io/client-go/util/flowcontrol k8s.io/client-go/util/homedir