diff --git a/cmd/cert-approver/main.go b/cmd/cert-approver/main.go
new file mode 100644
index 000000000..9e891f262
--- /dev/null
+++ b/cmd/cert-approver/main.go
@@ -0,0 +1,363 @@
+// Copyright (c) 2023 Network Plumbing Working Group
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is a Kubernetes controller that approves the CSRs submitted by multus.
+// This command is required only if multus runs with per-node certificates.
+package main
+
+// Note: cert-approver should be simple and only approve multus' CSRs; hence
+// this Go code should not depend on anything from pkg/, if possible,
+// to keep the code simple.
+import (
+	"context"
+	"crypto/x509"
+	"encoding/pem"
+	"fmt"
+	"os"
+	"os/signal"
+	"reflect"
+	"strings"
+	"syscall"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/validation"
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	certificatesv1 "k8s.io/api/certificates/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/certificate/csr"
+	"k8s.io/client-go/util/workqueue"
+)
+
+// CertController watches CertificateSigningRequests through an informer,
+// queues the ones accepted by filterCSR and approves or denies them.
+type CertController struct {
+	clientset          kubernetes.Interface
+	queue              workqueue.RateLimitingInterface
+	informer           cache.SharedIndexInformer
+	broadcaster        record.EventBroadcaster
+	recorder           record.EventRecorder
+	commonNamePrefixes string
+}
+
+const (
+	// maxDuration is the longest certificate lifetime a CSR may request.
+	maxDuration = time.Hour * 24 * 365
+	// resyncPeriod is the informer resync interval.
+	resyncPeriod time.Duration = time.Second * 3600 // resync every one hour, default is 10 hour
+	// maxRetries is how many times a failing key is re-queued before it is dropped.
+	maxRetries = 5
+)
+
+// Values a CSR is compared against before it is approved.
+var (
+	ControllerName = "csr-approver"
+	NamePrefix     = "system:multus"
+	Organization   = []string{"system:multus"}
+	Groups         = sets.New[string]("system:nodes", "system:multus", "system:authenticated")
+	UserPrefixes   = sets.New[string]("system:node", NamePrefix)
+	Usages         = sets.New[certificatesv1.KeyUsage](
+		certificatesv1.UsageDigitalSignature,
+		certificatesv1.UsageClientAuth)
+)
+
+// NewCertController builds a CertController from the in-cluster configuration.
+// It creates the CSR informer, the event recorder and the rate-limited work
+// queue, and registers an add handler that enqueues every CSR accepted by
+// filterCSR. It returns an error if the in-cluster config or the clientset
+// cannot be created.
+func NewCertController() (*CertController, error) {
+	var clientset kubernetes.Interface
+	/* setup Kubernetes API client */
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	clientset, err = kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	informer := cache.NewSharedIndexInformer(
+		cache.NewListWatchFromClient(
+			clientset.CertificatesV1().RESTClient(),
+			"certificatesigningrequests", corev1.NamespaceAll, fields.Everything()),
+		&certificatesv1.CertificateSigningRequest{},
+		resyncPeriod,
+		nil)
+
+	broadcaster := record.NewBroadcaster()
+	broadcaster.StartLogging(klog.Infof)
+	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
+	recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cert-approver"})
+	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	c := &CertController{
+		clientset:          clientset,
+		informer:           informer,
+		queue:              queue,
+		commonNamePrefixes: NamePrefix,
+		broadcaster:        broadcaster,
+		recorder:           recorder,
+	}
+
+	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			if csr, ok := obj.(*certificatesv1.CertificateSigningRequest); ok {
+				if c.filterCSR(csr) {
+					key, err := cache.MetaNamespaceKeyFunc(obj)
+					if err == nil {
+						queue.Add(key)
+					}
+				}
+			}
+		},
+	})
+
+	return c, nil
+}
+
+// Run starts the informer, waits for its cache to sync and then processes
+// queued CSRs until stopCh is closed. The work queue is shut down when Run
+// returns.
+func (c *CertController) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer c.queue.ShutDown()
+
+	klog.Info("Starting cert approver")
+
+	go c.informer.Run(stopCh)
+	if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
+		utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
+		return
+	}
+
+	klog.Info("cert approver synced and ready")
+	// runWorker blocks inside queue.Get() until the queue is shut down, and the
+	// queue is only shut down by the deferred call above. Running the worker
+	// synchronously would therefore keep Run from ever returning once stopCh
+	// is closed; run it in its own goroutine and wait for the stop signal here.
+	go wait.Until(c.runWorker, time.Second, stopCh)
+	<-stopCh
+}
+
+// HasSynced is required for the cache.Controller interface.
+func (c *CertController) HasSynced() bool {
+	return c.informer.HasSynced()
+}
+
+// LastSyncResourceVersion is required for the cache.Controller interface.
+func (c *CertController) LastSyncResourceVersion() string {
+	return c.informer.LastSyncResourceVersion()
+}
+
+// runWorker processes queue items until the queue has been shut down.
+func (c *CertController) runWorker() {
+	for c.processNextItem() {
+		// continue looping
+	}
+}
+
+// processNextItem takes one key from the queue and processes it. It returns
+// false once the queue has been shut down, which tells the worker to exit.
+func (c *CertController) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := c.queue.Get()
+	if quit {
+		return false
+	}
+	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
+	// This allows safe parallel processing because two CSRs with the same key are never processed in
+	// parallel.
+	defer c.queue.Done(key)
+
+	// Invoke the method containing the business logic
+	err := c.processItem(key.(string))
+	// Handle the error if something went wrong during the execution of the business logic
+	c.handleErr(err, key)
+	return true
+}
+
+// handleErr checks if an error happened and makes sure we will retry later.
+func (c *CertController) handleErr(err error, key interface{}) {
+	if err == nil {
+		// Forget about the #AddRateLimited history of the key on every successful synchronization.
+		// This ensures that future processing of updates for this key is not delayed because of
+		// an outdated error history.
+		c.queue.Forget(key)
+		return
+	}
+
+	// This controller retries 5 times if something goes wrong. After that, it stops trying.
+	if c.queue.NumRequeues(key) < maxRetries {
+		klog.Infof("Error syncing csr %s: %v", key, err)
+		// Re-enqueue the key rate limited. Based on the rate limiter on the
+		// queue and the re-enqueue history, the key will be processed later again.
+		c.queue.AddRateLimited(key)
+		return
+	}
+
+	c.queue.Forget(key)
+	// Report to an external entity that, even after several retries, we could not successfully process this key
+	utilruntime.HandleError(err)
+	klog.Infof("Dropping csr %q out of the queue: %v", key, err)
+}
+
+// processItem validates the CSR stored under key and approves or denies it.
+//
+// A CSR that no longer exists in the informer store, that is already signed,
+// or that already carries an Approved/Denied condition is skipped. Parse
+// failures are returned as errors (and retried by handleErr); every policy
+// violation results in the CSR being denied.
+func (c *CertController) processItem(key string) error {
+	startTime := time.Now()
+
+	obj, exists, err := c.informer.GetIndexer().GetByKey(key)
+	if err != nil {
+		return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
+	}
+	if !exists {
+		// The CSR was deleted after it was queued; there is nothing left to do.
+		klog.V(5).Infof("CSR %s no longer exists", key)
+		return nil
+	}
+
+	cached, ok := obj.(*certificatesv1.CertificateSigningRequest)
+	if !ok {
+		// Retrying cannot change the type of the stored object, so report and drop.
+		utilruntime.HandleError(fmt.Errorf("unexpected object type %T for key %s", obj, key))
+		return nil
+	}
+	// approveCSR/denyCSR append conditions to the object they are given; work
+	// on a copy so that the shared informer cache is never modified.
+	req := cached.DeepCopy()
+
+	nodeName := "unknown"
+	defer func() {
+		klog.Infof("Finished syncing CSR %s for %s node in %v", req.Name, nodeName, time.Since(startTime))
+	}()
+
+	if len(req.Status.Certificate) > 0 {
+		klog.V(5).Infof("CSR %s is already signed", req.Name)
+		return nil
+	}
+
+	if isApprovedOrDenied(&req.Status) {
+		klog.V(5).Infof("CSR %s is already approved/denied", req.Name)
+		return nil
+	}
+
+	csrPEM, _ := pem.Decode(req.Spec.Request)
+	if csrPEM == nil {
+		return fmt.Errorf("failed to PEM-parse the CSR block in .spec.request: no CSRs were found")
+	}
+
+	x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
+	if err != nil {
+		return fmt.Errorf("failed to parse the CSR bytes: %v", err)
+	}
+
+	// The username is expected to look like "<prefix>:<node name>".
+	i := strings.LastIndex(req.Spec.Username, ":")
+	if i == -1 || i == len(req.Spec.Username)-1 {
+		return fmt.Errorf("failed to parse the username: %s", req.Spec.Username)
+	}
+
+	ctx := context.Background()
+	prefix := req.Spec.Username[:i]
+	nodeName = req.Spec.Username[i+1:]
+	if !UserPrefixes.Has(prefix) {
+		return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by an unexpected user: %q", req.Name, req.Spec.Username))
+	}
+
+	if errs := validation.IsDNS1123Subdomain(nodeName); len(errs) != 0 {
+		return c.denyCSR(ctx, req, fmt.Sprintf("extracted node name %q is not a valid DNS subdomain %v", nodeName, errs))
+	}
+
+	if usages := sets.New[certificatesv1.KeyUsage](req.Spec.Usages...); !usages.Equal(Usages) {
+		return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with unexpected usages: %v", req.Name, usages.UnsortedList()))
+	}
+
+	if !Groups.HasAll(req.Spec.Groups...) {
+		return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by a user with unexpected groups: %v", req.Name, req.Spec.Groups))
+	}
+
+	expectedSubject := fmt.Sprintf("%s:%s", c.commonNamePrefixes, nodeName)
+	if x509CSR.Subject.CommonName != expectedSubject {
+		return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's commonName to be %q, but it is %q", expectedSubject, x509CSR.Subject.CommonName))
+	}
+
+	if !reflect.DeepEqual(x509CSR.Subject.Organization, Organization) {
+		return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's organization to be %v, but it is %v", Organization, x509CSR.Subject.Organization))
+	}
+
+	if req.Spec.ExpirationSeconds == nil {
+		return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created without specyfying the expirationSeconds", req.Name))
+	}
+
+	if csr.ExpirationSecondsToDuration(*req.Spec.ExpirationSeconds) > maxDuration {
+		return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with invalid expirationSeconds value: %d", req.Name, *req.Spec.ExpirationSeconds))
+	}
+
+	return c.approveCSR(ctx, req)
+}
+
+// CSR specific functions
+
+// filterCSR reports whether csr should be handled by this controller: its
+// request must parse as an x509 certificate request whose common name starts
+// with the configured prefix, and its signer must be the kube-apiserver
+// client signer. Parse failures are logged and make filterCSR return false.
+func (c *CertController) filterCSR(csr *certificatesv1.CertificateSigningRequest) bool {
+	nsName := types.NamespacedName{Namespace: csr.Namespace, Name: csr.Name}
+	csrPEM, _ := pem.Decode(csr.Spec.Request)
+	if csrPEM == nil {
+		klog.Errorf("Failed to PEM-parse the CSR block in .spec.request: no CSRs were found in %s", nsName)
+		return false
+	}
+
+	x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
+	if err != nil {
+		klog.Errorf("Failed to parse the CSR .spec.request of %q: %v", nsName, err)
+		return false
+	}
+
+	return strings.HasPrefix(x509CSR.Subject.CommonName, c.commonNamePrefixes) &&
+		csr.Spec.SignerName == certificatesv1.KubeAPIServerClientSignerName
+}
+
+// approveCSR appends an Approved condition to csr and submits it through the
+// approval subresource. The "CSRApproved" event is emitted only after the API
+// call succeeded. Note that csr is modified in place.
+func (c *CertController) approveCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) error {
+	csr.Status.Conditions = append(csr.Status.Conditions,
+		certificatesv1.CertificateSigningRequestCondition{
+			Type:    certificatesv1.CertificateApproved,
+			Status:  corev1.ConditionTrue,
+			Reason:  "AutoApproved",
+			Message: fmt.Sprintf("Auto-approved CSR %q", csr.Name),
+		})
+
+	if _, err := c.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}); err != nil {
+		return err
+	}
+	c.recorder.Eventf(csr, corev1.EventTypeNormal, "CSRApproved", "CSR %q has been approved by %s", csr.Name, ControllerName)
+	return nil
+}
+
+// denyCSR appends a Denied condition carrying message to csr and submits it
+// through the approval subresource, the same endpoint approveCSR uses. The
+// "CSRDenied" event is emitted only after the API call succeeded. Note that
+// csr is modified in place.
+func (c *CertController) denyCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest, message string) error {
+	csr.Status.Conditions = append(csr.Status.Conditions,
+		certificatesv1.CertificateSigningRequestCondition{
+			Type:    certificatesv1.CertificateDenied,
+			Status:  corev1.ConditionTrue,
+			Reason:  "CSRDenied",
+			Message: message,
+		},
+	)
+
+	if _, err := c.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}); err != nil {
+		return err
+	}
+	c.recorder.Eventf(csr, corev1.EventTypeWarning, "CSRDenied", "The CSR %q has been denied by %s: %s", csr.Name, ControllerName, message)
+	return nil
+}
+
+// isApprovedOrDenied reports whether status already carries an Approved or a
+// Denied condition.
+func isApprovedOrDenied(status *certificatesv1.CertificateSigningRequestStatus) bool {
+	for _, c := range status.Conditions {
+		if c.Type == certificatesv1.CertificateApproved || c.Type == certificatesv1.CertificateDenied {
+			return true
+		}
+	}
+	return false
+}
+
+// main starts the CSR controller and blocks until SIGINT or SIGTERM arrives.
+func main() {
+	klog.Infof("starting cert-approver")
+
+	// Start watching for CSR creations
+	certController, err := NewCertController()
+	if err != nil {
+		klog.Fatal(err)
+	}
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go certController.Run(stopCh)
+
+	// SIGKILL cannot be caught, so only SIGINT and SIGTERM are registered.
+	sigterm := make(chan os.Signal, 1)
+	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
+	<-sigterm
+}
diff --git a/cmd/kubeconfig_generator/main.go 
b/cmd/kubeconfig_generator/main.go
new file mode 100644
index 000000000..b5c18036e
--- /dev/null
+++ b/cmd/kubeconfig_generator/main.go
@@ -0,0 +1,140 @@
+// Copyright (c) 2023 Multus Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This binary submits a CSR for Kubernetes API access for the multus thin
+// plugin and generates a kubeconfig.
+package main
+
+import (
+	"encoding/base64"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+	"text/template"
+
+	"github.com/spf13/pflag"
+
+	"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient"
+
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/klog/v2"
+)
+
+// kubeConfigTemplate is the kubeconfig written for multus; client-certificate
+// and client-key both point at CERTDIR/multus-client-current.pem.
+var kubeConfigTemplate = `apiVersion: v1
+clusters:
+  - cluster:
+      certificate-authority-data: {{.CADATA}}
+      server: {{.K8S_APISERVER}}
+    name: default-cluster
+contexts:
+  - context:
+      cluster: default-cluster
+      namespace: default
+      user: default-auth
+    name: default-context
+current-context: default-context
+kind: Config
+preferences: {}
+users:
+  - name: default-auth
+    user:
+      client-certificate: {{.CERTDIR}}/multus-client-current.pem
+      client-key: {{.CERTDIR}}/multus-client-current.pem
+`
+
+// main obtains a per-node client certificate through the bootstrap
+// kubeconfig, renders the kubeconfig file, then waits for SIGINT/SIGTERM and
+// removes the generated file before exiting.
+func main() {
+	certDir := pflag.StringP("certdir", "", "/tmp", "specify cert directory")
+	bootstrapConfig := pflag.StringP("bootstrap-config", "", "/tmp/kubeconfig", "specify bootstrap kubernetes config")
+	kubeconfigPath := pflag.StringP("kubeconfig", "", "/run/multus/kubeconfig", "specify output kubeconfig path")
+	helpFlag := pflag.BoolP("help", "h", false, "show help message and quit")
+
+	pflag.Parse()
+	if *helpFlag {
+		pflag.PrintDefaults()
+		os.Exit(1)
+	}
+
+	// check variables
+	if _, err := os.Stat(*bootstrapConfig); err != nil {
+		klog.Fatalf("failed to read bootstrap config %q: %v", *bootstrapConfig, err)
+	}
+	st, err := os.Stat(*certDir)
+	if err != nil {
+		klog.Fatalf("failed to find cert directory %q: %v", *certDir, err)
+	}
+	if !st.IsDir() {
+		klog.Fatalf("cert directory %q is not directory", *certDir)
+	}
+
+	nodeName := os.Getenv("K8S_NODE")
+	if nodeName == "" {
+		klog.Fatalf("cannot identify node name from K8S_NODE env variables")
+	}
+
+	// retrieve the API server address and CA data from the bootstrap kubeconfig
+	config, err := clientcmd.BuildConfigFromFlags("", *bootstrapConfig)
+	if err != nil {
+		klog.Fatalf("cannot load bootstrap kubeconfig %q: %v", *bootstrapConfig, err)
+	}
+	apiServer := fmt.Sprintf("%s%s", config.Host, config.APIPath)
+	caData := base64.StdEncoding.EncodeToString(config.CAData)
+
+	// run certManager to create the certificate
+	if _, err = k8sclient.PerNodeK8sClient(nodeName, *bootstrapConfig, *certDir); err != nil {
+		klog.Fatalf("failed to start cert manager: %v", err)
+	}
+
+	fp, err := os.OpenFile(*kubeconfigPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
+	if err != nil {
+		klog.Fatalf("cannot create kubeconfig file %q: %v", *kubeconfigPath, err)
+	}
+
+	// render kubeconfig
+	templateKubeconfig, err := template.New("kubeconfig").Parse(kubeConfigTemplate)
+	if err != nil {
+		klog.Fatalf("template parse error: %v", err)
+	}
+	templateData := map[string]string{
+		"CADATA":        caData,
+		"CERTDIR":       *certDir,
+		"K8S_APISERVER": apiServer,
+	}
+	// generate kubeconfig from template
+	if err = templateKubeconfig.Execute(fp, templateData); err != nil {
+		klog.Fatalf("cannot create kubeconfig: %v", err)
+	}
+	if err = fp.Close(); err != nil {
+		klog.Fatalf("cannot save kubeconfig: %v", err)
+	}
+
+	klog.Infof("kubeconfig %q is saved", *kubeconfigPath)
+
+	// wait for signal
+	// SIGKILL cannot be caught, so only SIGINT and SIGTERM are registered;
+	// the kubeconfig can only be cleaned up on those two.
+	sigterm := make(chan os.Signal, 1)
+	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
+	<-sigterm
+	klog.Infof("signal received. remove kubeconfig %q and quit.", *kubeconfigPath)
+	err = os.Remove(*kubeconfigPath)
+	if err != nil {
+		klog.Errorf("failed to remove kubeconfig %q: %v", *kubeconfigPath, err)
+	}
+}
diff --git a/go.mod b/go.mod
index b4216dac0..cb6b6f361 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
 	k8s.io/apimachinery v0.27.5
 	k8s.io/client-go v1.5.2
 	k8s.io/klog v1.0.0
-	k8s.io/klog/v2 v2.90.1 // indirect
+	k8s.io/klog/v2 v2.90.1
 	k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
 	k8s.io/kubelet v0.27.5
 	sigs.k8s.io/yaml v1.3.0 // indirect
diff --git a/hack/build-go.sh b/hack/build-go.sh
index e0ecbca1e..2a0893624 100755
--- a/hack/build-go.sh
+++ b/hack/build-go.sh
@@ -76,3 +76,7 @@ echo "Building install_multus"
 go build -o "${DEST_DIR}"/install_multus ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/install_multus
 echo "Building thin_entrypoint"
 go build -o "${DEST_DIR}"/thin_entrypoint ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/thin_entrypoint
+echo "Building kubeconfig_generator"
+go build -o "${DEST_DIR}"/kubeconfig_generator ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/kubeconfig_generator
+echo "Building cert-approver"
+go build -o "${DEST_DIR}"/cert-approver ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/cert-approver
diff --git a/images/Dockerfile b/images/Dockerfile
index 3b0c17814..f55691a03 100644
--- a/images/Dockerfile
+++ b/images/Dockerfile
@@ -16,4 +16,6 @@
 WORKDIR /
 COPY --from=build /usr/src/multus-cni/bin/install_multus /
 COPY --from=build /usr/src/multus-cni/bin/thin_entrypoint /
-ENTRYPOINT ["/thin_entrypoint"]
+COPY --from=build /usr/src/multus-cni/bin/kubeconfig_generator /
+COPY --from=build /usr/src/multus-cni/bin/cert-approver /
+CMD ["/thin_entrypoint"]
diff --git a/images/Dockerfile.debug b/images/Dockerfile.debug
index 4d837bb3b..47e54611b 100644
--- a/images/Dockerfile.debug
+++ b/images/Dockerfile.debug
@@ -16,4 +16,6 @@
 WORKDIR /
 COPY --from=build 
/usr/src/multus-cni/bin/install_multus / COPY --from=build /usr/src/multus-cni/bin/thin_entrypoint / -ENTRYPOINT ["/thin_entrypoint"] +COPY --from=build /usr/src/multus-cni/bin/kubeconfig_generator / +COPY --from=build /usr/src/multus-cni/bin/cert-approver / +CMD ["/thin_entrypoint"] diff --git a/images/Dockerfile.thick b/images/Dockerfile.thick index 92c3fd8d1..ae170150a 100644 --- a/images/Dockerfile.thick +++ b/images/Dockerfile.thick @@ -11,6 +11,7 @@ FROM debian:stable-slim LABEL org.opencontainers.image.source https://github.com/k8snetworkplumbingwg/multus-cni COPY --from=build /usr/src/multus-cni/bin /usr/src/multus-cni/bin COPY --from=build /usr/src/multus-cni/LICENSE /usr/src/multus-cni/LICENSE +COPY --from=build /usr/src/multus-cni/bin/cert-approver / WORKDIR / ENTRYPOINT [ "/usr/src/multus-cni/bin/multus-daemon" ] diff --git a/pkg/k8sclient/k8sclient.go b/pkg/k8sclient/k8sclient.go index cdc209737..c1d1857a9 100644 --- a/pkg/k8sclient/k8sclient.go +++ b/pkg/k8sclient/k8sclient.go @@ -23,18 +23,12 @@ import ( "os" "regexp" "strings" - "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" - "k8s.io/klog" "github.com/containernetworking/cni/libcni" "github.com/containernetworking/cni/pkg/skel" @@ -393,87 +387,6 @@ func TryLoadPodDelegates(pod *v1.Pod, conf *types.NetConf, clientInfo *ClientInf return 0, clientInfo, err } -// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to -// the k8s API. 
-func InClusterK8sClient() (*ClientInfo, error) { - clientInfo, err := GetK8sClient("", nil) - if err != nil { - return nil, err - } - if clientInfo == nil { - return nil, fmt.Errorf("failed to create in-cluster kube client") - } - return clientInfo, err -} - -// GetK8sClient gets client info from kubeconfig -func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) { - logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient) - // If we get a valid kubeClient (eg from testcases) just return that - // one. - if kubeClient != nil { - return kubeClient, nil - } - - var err error - var config *rest.Config - - // Otherwise try to create a kubeClient from a given kubeConfig - if kubeconfig != "" { - // uses the current context in kubeconfig - config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err) - } - } else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" { - // Try in-cluster config where multus might be running in a kubernetes pod - config, err = rest.InClusterConfig() - if err != nil { - return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err) - } - } else { - // No kubernetes config; assume we shouldn't talk to Kube at all - return nil, nil - } - - // Specify that we use gRPC - config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" - config.ContentType = "application/vnd.kubernetes.protobuf" - // Set the config timeout to one minute. - config.Timeout = time.Minute - // Allow multus (especially in server mode) to make more concurrent requests - // to reduce client-side throttling - config.QPS = 50 - config.Burst = 50 - - return newClientInfo(config) -} - -// newClientInfo returns a `ClientInfo` from a configuration created from an -// existing kubeconfig file. 
-func newClientInfo(config *rest.Config) (*ClientInfo, error) { - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - netclient, err := netclient.NewForConfig(config) - if err != nil { - return nil, err - } - - broadcaster := record.NewBroadcaster() - broadcaster.StartLogging(klog.Infof) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"}) - return &ClientInfo{ - Client: client, - NetClient: netclient, - EventBroadcaster: broadcaster, - EventRecorder: recorder, - }, nil -} - // GetPodNetwork gets net-attach-def annotation from pod func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) { logging.Debugf("GetPodNetwork: %v", pod) diff --git a/pkg/k8sclient/kubeconfig.go b/pkg/k8sclient/kubeconfig.go new file mode 100644 index 000000000..9e9f2e8d0 --- /dev/null +++ b/pkg/k8sclient/kubeconfig.go @@ -0,0 +1,219 @@ +// Copyright (c) 2023 Multus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package k8sclient
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"crypto/x509/pkix"
+	"fmt"
+	"os"
+	"path"
+	"time"
+
+	certificatesv1 "k8s.io/api/certificates/v1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/transport"
+	"k8s.io/client-go/util/certificate"
+	"k8s.io/klog"
+
+	netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1"
+	"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
+)
+
+const (
+	// certNamePrefix is the file name prefix of the stored certificate/key pair.
+	certNamePrefix = "multus-client"
+	// certCommonNamePrefix is joined with the node name to form the CSR common name.
+	certCommonNamePrefix = "system:multus"
+	// certOrganization is the organization requested in the CSR subject.
+	certOrganization = "system:multus"
+)
+
+var (
+	// certUsages are the key usages requested for the client certificate.
+	certUsages = []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageClientAuth}
+)
+
+// getPerNodeKubeconfig creates a new rest.Config that reuses the host, API
+// path and CA data of bootstrap, but authenticates with the certificate/key
+// file "<certNamePrefix>-current.pem" in certDir.
+func getPerNodeKubeconfig(bootstrap *rest.Config, certDir string) *rest.Config {
+	return &rest.Config{
+		Host:    bootstrap.Host,
+		APIPath: bootstrap.APIPath,
+		ContentConfig: rest.ContentConfig{
+			AcceptContentTypes: "application/vnd.kubernetes.protobuf,application/json",
+			ContentType:        "application/vnd.kubernetes.protobuf",
+		},
+		TLSClientConfig: rest.TLSClientConfig{
+			KeyFile:  path.Join(certDir, certNamePrefix+"-current.pem"),
+			CertFile: path.Join(certDir, certNamePrefix+"-current.pem"),
+			CAData:   bootstrap.TLSClientConfig.CAData,
+		},
+		// Allow multus (especially in server mode) to make more concurrent requests
+		// to reduce client-side throttling
+		QPS:   50,
+		Burst: 50,
+		// Set the config timeout to one minute.
+		Timeout: time.Minute,
+	}
+}
+
+// PerNodeK8sClient creates (or reloads) the per-node multus client.
+//
+// It loads the bootstrap kubeconfig, starts a certificate manager that
+// requests a client certificate for "<certCommonNamePrefix>:<nodeName>" and
+// stores it in certDir, waits up to two minutes for a certificate to appear in
+// the store, and then returns a ClientInfo that authenticates with it. On
+// failure after the certificate manager has been started, the manager is
+// stopped again before the error is returned.
+func PerNodeK8sClient(nodeName, bootstrapKubeconfigFile, certDir string) (*ClientInfo, error) {
+	bootstrapKubeconfig, err := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile)
+	if err != nil {
+		return nil, logging.Errorf("failed to load bootstrap kubeconfig %s: %v", bootstrapKubeconfigFile, err)
+	}
+	config := getPerNodeKubeconfig(bootstrapKubeconfig, certDir)
+
+	// If we have a valid certificate, use that to fetch CSRs.
+	// Otherwise, use the bootstrap credentials from bootstrapKubeconfig
+	// https://github.com/kubernetes/kubernetes/blob/068ee321bc7bfe1c2cefb87fb4d9e5deea84fbc8/cmd/kubelet/app/server.go#L953-L963
+	newClientsetFn := func(current *tls.Certificate) (kubernetes.Interface, error) {
+		cfg := bootstrapKubeconfig
+		if current != nil {
+			cfg = config
+		}
+		return kubernetes.NewForConfig(cfg)
+	}
+
+	certificateStore, err := certificate.NewFileStore(certNamePrefix, certDir, certDir, "", "")
+	if err != nil {
+		return nil, logging.Errorf("failed to initialize the certificate store: %v", err)
+	}
+
+	certDuration := 10 * time.Minute
+	certManager, err := certificate.NewManager(&certificate.Config{
+		ClientsetFn: newClientsetFn,
+		Template: &x509.CertificateRequest{
+			Subject: pkix.Name{
+				CommonName:   fmt.Sprintf("%s:%s", certCommonNamePrefix, nodeName),
+				Organization: []string{certOrganization},
+			},
+		},
+		RequestedCertificateLifetime: &certDuration,
+		SignerName:                   certificatesv1.KubeAPIServerClientSignerName,
+		Usages:                       certUsages,
+		CertificateStore:             certificateStore,
+	})
+	if err != nil {
+		return nil, logging.Errorf("failed to initialize the certificate manager: %v", err)
+	}
+	if certDuration < time.Hour {
+		// the default value for CertCallbackRefreshDuration (5min) is too long for short-lived certs,
+		// set it to a more sensible value
+		transport.CertCallbackRefreshDuration = time.Second * 10
+	}
+	certManager.Start()
+
+	logging.Verbosef("Waiting for certificate")
+	var storeErr error
+	err = wait.PollWithContext(context.TODO(), time.Second, 2*time.Minute, func(_ context.Context) (bool, error) {
+		var currentCert *tls.Certificate
+		currentCert, storeErr = certificateStore.Current()
+		return currentCert != nil && storeErr == nil, nil
+	})
+	if err != nil {
+		// Do not leave the certificate manager running behind a failed call.
+		certManager.Stop()
+		return nil, logging.Errorf("certificate was not signed, last cert store err: %v err: %v", storeErr, err)
+	}
+	logging.Verbosef("Certificate found!")
+
+	clientInfo, err := newClientInfo(config)
+	if err != nil {
+		// Same as above: the caller gets no client, so stop the manager too.
+		certManager.Stop()
+		return nil, err
+	}
+	return clientInfo, nil
+}
+
+// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
+// the k8s API. It returns an error when no in-cluster client can be created.
+func InClusterK8sClient() (*ClientInfo, error) {
+	clientInfo, err := GetK8sClient("", nil)
+	if err != nil {
+		return nil, err
+	}
+	if clientInfo == nil {
+		return nil, fmt.Errorf("failed to create in-cluster kube client")
+	}
+	return clientInfo, nil
+}
+
+// GetK8sClient gets client info from kubeconfig.
+//
+// A non-nil kubeClient is returned as is. Otherwise the client is built from
+// the kubeconfig file when one is given, or from the in-cluster configuration
+// when KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT are set. When
+// neither source is available it returns (nil, nil).
+func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) {
+	logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)
+	// If we get a valid kubeClient (eg from testcases) just return that
+	// one.
+	if kubeClient != nil {
+		return kubeClient, nil
+	}
+
+	var err error
+	var config *rest.Config
+
+	// Otherwise try to create a kubeClient from a given kubeConfig
+	if kubeconfig != "" {
+		// uses the current context in kubeconfig
+		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+		if err != nil {
+			return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err)
+		}
+	} else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
+		// Try in-cluster config where multus might be running in a kubernetes pod
+		config, err = rest.InClusterConfig()
+		if err != nil {
+			return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err)
+		}
+	} else {
+		// No kubernetes config; assume we shouldn't talk to Kube at all
+		return nil, nil
+	}
+
+	// Prefer the protobuf content type, accepting JSON as a fallback
+	config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
+	config.ContentType = "application/vnd.kubernetes.protobuf"
+	// Set the config timeout to one minute.
+	config.Timeout = time.Minute
+	// Allow multus (especially in server mode) to make more concurrent requests
+	// to reduce client-side throttling
+	config.QPS = 50
+	config.Burst = 50
+
+	return newClientInfo(config)
+}
+
+// newClientInfo returns a `ClientInfo` built from config: a Kubernetes
+// clientset, a network-attachment-definition client and an event recorder
+// whose source component is "multus".
+func newClientInfo(config *rest.Config) (*ClientInfo, error) {
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// named netClient so that the netclient package is not shadowed
+	netClient, err := netclient.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	broadcaster := record.NewBroadcaster()
+	broadcaster.StartLogging(klog.Infof)
+	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
+	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
+	return &ClientInfo{
+		Client:           client,
+		NetClient:        netClient,
+		EventBroadcaster: broadcaster,
+		EventRecorder:    recorder,
+	}, nil
+}
diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go
index fca8aac5b..51c4b6e23 100644
--- a/pkg/multus/multus_cni100_test.go
+++ b/pkg/multus/multus_cni100_test.go
@@ -42,7 +42,7 @@ import (
 )
 
 func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer {
-	informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0 * time.Second)
+	informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0*time.Second)
 
 	podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
 		return v1coreinformers.NewFilteredPodInformer(
@@ -55,7 +55,7 @@ func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.Sha
 
 	informerFactory.Start(ctx.Done())
 
-	waitCtx, waitCancel := context.WithTimeout(ctx, 20 * time.Second)
+	waitCtx, waitCancel := context.WithTimeout(ctx, 20*time.Second)
 	if !cache.WaitForCacheSync(waitCtx.Done(), podInformer.HasSynced) {
 		logging.Errorf("failed to sync pod informer cache")
 	}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index fb2ddafd5..4bea6ffc5 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -170,11 +170,38 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali
 	return informerFactory, 
podInformer } +func isPerNodeCertEnabled(config *PerNodeCertificate) (bool, error) { + if config != nil && config.Enabled { + if config.BootstrapKubeconfig != "" && config.CertDir != "" { + return true, nil + } + return true, logging.Errorf("failed to configure PerNodeCertificate: enabled: %v, BootstrapKubeconfig: %q, CertDir: %q", config.Enabled, config.BootstrapKubeconfig, config.CertDir) + } + return false, nil +} + // NewCNIServer creates and returns a new Server object which will listen on a socket in the given path func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { - kubeClient, err := k8s.InClusterK8sClient() - if err != nil { - return nil, fmt.Errorf("error getting k8s client: %v", err) + var kubeClient *k8s.ClientInfo + enabled, err := isPerNodeCertEnabled(daemonConfig.PerNodeCertificate) + if enabled { + if err != nil { + return nil, err + } + perNodeCertConfig := daemonConfig.PerNodeCertificate + nodeName := os.Getenv("K8S_NODE") + if nodeName == "" { + return nil, logging.Errorf("error getting node name for perNodeCertificate") + } + kubeClient, err = k8s.PerNodeK8sClient(nodeName, perNodeCertConfig.BootstrapKubeconfig, perNodeCertConfig.CertDir) + if err != nil { + return nil, logging.Errorf("error getting perNodeClient: %v", err) + } + } else { + kubeClient, err = k8s.InClusterK8sClient() + if err != nil { + return nil, fmt.Errorf("error getting k8s client: %v", err) + } } exec := invoke.Exec(nil) diff --git a/pkg/server/types.go b/pkg/server/types.go index b7cf11d7d..731ad2862 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -56,12 +56,20 @@ type Server struct { ignoreReadinessIndicator bool } +// PerNodeCertificate for auto certificate generation for per node +type PerNodeCertificate struct { + Enabled bool `json:"enabled,omitempty"` + BootstrapKubeconfig string `json:"bootstrapKubeconfig,omitempty"` + CertDir string `json:"certDir,omitempty"` +} + // 
ControllerNetConf for the controller cni configuration type ControllerNetConf struct { - ChrootDir string `json:"chrootDir,omitempty"` - LogFile string `json:"logFile"` - LogLevel string `json:"logLevel"` - LogToStderr bool `json:"logToStderr,omitempty"` + ChrootDir string `json:"chrootDir,omitempty"` + LogFile string `json:"logFile"` + LogLevel string `json:"logLevel"` + LogToStderr bool `json:"logToStderr,omitempty"` + PerNodeCertificate *PerNodeCertificate `json:"perNodeCertificate,omitempty"` MetricsPort *int `json:"metricsPort,omitempty"` diff --git a/vendor/k8s.io/client-go/tools/watch/informerwatcher.go b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go new file mode 100644 index 000000000..5e6aad5cf --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go @@ -0,0 +1,150 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +func newEventProcessor(out chan<- watch.Event) *eventProcessor { + return &eventProcessor{ + out: out, + cond: sync.NewCond(&sync.Mutex{}), + done: make(chan struct{}), + } +} + +// eventProcessor buffers events and writes them to an out chan when a reader +// is waiting. Because of the requirement to buffer events, it synchronizes +// input with a condition, and synchronizes output with a channels. 
// eventProcessor buffers events and writes them to an out chan when a reader
// is waiting. Because of the requirement to buffer events, it synchronizes
// input with a condition, and synchronizes output with a channel. It needs to
// be able to yield while both waiting on an input condition and while blocked
// on writing to the output channel.
type eventProcessor struct {
	// out receives the buffered events, one at a time, from writeBatch.
	out chan<- watch.Event

	// cond is signalled by push and stop; its lock (cond.L) is held by push
	// and takeBatch whenever they touch buff.
	cond *sync.Cond
	// buff holds events that were pushed but not yet handed to writeBatch.
	buff []watch.Event

	// done is closed by stop; stopped and writeBatch observe it.
	done chan struct{}
}

// run moves events from the buffer to the out channel, one batch at a time,
// and returns once the processor has been stopped. The stopped check happens
// after each batch has been attempted.
func (e *eventProcessor) run() {
	for {
		batch := e.takeBatch()
		e.writeBatch(batch)
		if e.stopped() {
			return
		}
	}
}

// takeBatch blocks until the buffer is non-empty or the processor is stopped,
// then returns everything buffered so far and leaves the buffer empty. The
// returned slice may be nil when the wait ended because of a stop.
func (e *eventProcessor) takeBatch() []watch.Event {
	e.cond.L.Lock()
	defer e.cond.L.Unlock()

	// Wait releases cond.L while sleeping and re-acquires it before returning,
	// so the loop condition is always evaluated with the lock held.
	for len(e.buff) == 0 && !e.stopped() {
		e.cond.Wait()
	}

	batch := e.buff
	e.buff = nil
	return batch
}

// writeBatch sends events to out in order. It gives up, dropping the events
// not yet sent, as soon as done is closed.
func (e *eventProcessor) writeBatch(events []watch.Event) {
	for _, event := range events {
		select {
		case e.out <- event:
		case <-e.done:
			return
		}
	}
}

// push appends event to the buffer and signals the condition so a takeBatch
// call that is waiting can proceed.
func (e *eventProcessor) push(event watch.Event) {
	e.cond.L.Lock()
	defer e.cond.L.Unlock()
	// Deferred calls run last-in first-out: Signal fires before Unlock, i.e.
	// while the lock is still held.
	defer e.cond.Signal()
	e.buff = append(e.buff, event)
}

// stopped reports, without blocking, whether done has been closed.
func (e *eventProcessor) stopped() bool {
	select {
	case <-e.done:
		return true
	default:
		return false
	}
}

// stop closes done and signals the condition to wake a waiting takeBatch.
// Calling stop a second time panics, because done is closed unconditionally.
//
// NOTE(review): unlike push, stop signals without holding cond.L. If the close
// and Signal land between takeBatch's stopped() check and its cond.Wait(), the
// wake-up looks like it could be missed, leaving run blocked. Confirm whether
// that window matters before relying on run always exiting.
func (e *eventProcessor) stop() {
	close(e.done)
	e.cond.Signal()
}
+func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { + ch := make(chan watch.Event) + w := watch.NewProxyWatcher(ch) + e := newEventProcessor(ch) + + indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + e.push(watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), + }) + }, + UpdateFunc: func(old, new interface{}) { + e.push(watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), + }) + }, + DeleteFunc: func(obj interface{}) { + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using + // watch API based on watch.Event but the caller can filter such + // objects by checking if metadata.deletionTimestamp is set + obj = staleObj.Obj + } + + e.push(watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), + }) + }, + }, cache.Indexers{}) + + go e.run() + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + defer e.stop() + informer.Run(w.StopChan()) + }() + + return indexer, informer, w, doneCh +} diff --git a/vendor/k8s.io/client-go/tools/watch/retrywatcher.go b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go new file mode 100644 index 000000000..e4806d2ea --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/retrywatcher.go @@ -0,0 +1,296 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// resourceVersionGetter is an interface used to get resource version from events.
// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method
type resourceVersionGetter interface {
	GetResourceVersion() string
}

// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
// it will get restarted from the last point without the consumer even knowing about it.
// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that.
type RetryWatcher struct {
	// lastResourceVersion is the resource version passed to the next Watch
	// call; it is advanced as events are processed.
	lastResourceVersion string
	// watcherClient establishes (and re-establishes) the underlying watch.
	watcherClient cache.Watcher
	// resultChan carries events to the consumer (see ResultChan).
	resultChan chan watch.Event
	// stopChan is closed by Stop to request termination.
	stopChan chan struct{}
	// doneChan is closed once the receive loop has finished (see Done).
	doneChan chan struct{}
	// minRestartDelay is the period handed to the retry loop between
	// attempts to re-establish the watch.
	minRestartDelay time.Duration
}
// NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
// The minimum delay between restarts is fixed at one second.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
	return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
}

// newRetryWatcher is NewRetryWatcher with a configurable minimum restart
// delay. It rejects the resource versions "" and "0" with an error; otherwise
// it builds the watcher and starts its receive loop in a new goroutine before
// returning.
func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
	switch initialResourceVersion {
	case "", "0":
		// TODO: revisit this if we ever get WATCH v2 where it means start "now"
		//       without doing the synthetic list of objects at the beginning (see #74022)
		return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
	default:
		break
	}

	rw := &RetryWatcher{
		lastResourceVersion: initialResourceVersion,
		watcherClient:       watcherClient,
		stopChan:            make(chan struct{}),
		doneChan:            make(chan struct{}),
		resultChan:          make(chan watch.Event, 0),
		minRestartDelay:     minRestartDelay,
	}

	go rw.receive()
	return rw, nil
}

// send delivers event on the result channel. It returns true when the event
// was handed over and false when a stop was requested first.
func (rw *RetryWatcher) send(event watch.Event) bool {
	// Writing to an unbuffered channel is blocking operation
	// and we need to check if stop wasn't requested while doing so.
	select {
	case rw.resultChan <- event:
		return true
	case <-rw.stopChan:
		return false
	}
}

// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
//
// One call covers the lifetime of one underlying watch: it opens a watch at
// lastResourceVersion, forwards events until that watch ends, and reports
// whether the caller should re-establish it.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
		ResourceVersion:     rw.lastResourceVersion,
		AllowWatchBookmarks: true,
	})
	// We are very unlikely to hit EOF here since we are just establishing the call,
	// but it may happen that the apiserver is just shutting down (e.g. being restarted)
	// This is consistent with how it is handled for informers
	//
	// Every error branch below asks for a retry; they differ only in how
	// (and at which verbosity) the failure is logged.
	switch err {
	case nil:
		break

	case io.EOF:
		// watch closed normally
		return false, 0

	case io.ErrUnexpectedEOF:
		klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
		return false, 0

	default:
		msg := "Watch failed"
		if net.IsProbableEOF(err) || net.IsTimeout(err) {
			klog.V(5).InfoS(msg, "err", err)
			// Retry
			return false, 0
		}

		klog.ErrorS(err, msg)
		// Retry
		return false, 0
	}

	if watcher == nil {
		klog.ErrorS(nil, "Watch returned nil watcher")
		// Retry
		return false, 0
	}

	ch := watcher.ResultChan()
	defer watcher.Stop()

	for {
		select {
		case <-rw.stopChan:
			klog.V(4).InfoS("Stopping RetryWatcher.")
			return true, 0
		case event, ok := <-ch:
			if !ok {
				// The underlying watch closed its channel; ask for a restart
				// from the last resource version seen.
				klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
				return false, 0
			}

			// We need to inspect the event and get ResourceVersion out of it
			switch event.Type {
			case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
				metaObject, ok := event.Object.(resourceVersionGetter)
				if !ok {
					// Best effort: the send result is ignored because the
					// loop terminates either way.
					_ = rw.send(watch.Event{
						Type:   watch.Error,
						Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
					})
					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
					return true, 0
				}

				resourceVersion := metaObject.GetResourceVersion()
				if resourceVersion == "" {
					_ = rw.send(watch.Event{
						Type:   watch.Error,
						Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
					})
					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
					return true, 0
				}

				// All is fine; send the non-bookmark events and update resource version.
				// Bookmarks are consumed here: they only advance
				// lastResourceVersion and are not forwarded.
				if event.Type != watch.Bookmark {
					ok = rw.send(event)
					if !ok {
						return true, 0
					}
				}
				rw.lastResourceVersion = resourceVersion

				continue

			case watch.Error:
				// This round trip allows us to handle unstructured status
				errObject := apierrors.FromObject(event.Object)
				statusErr, ok := errObject.(*apierrors.StatusError)
				if !ok {
					klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
					// Retry unknown errors
					return false, 0
				}

				status := statusErr.ErrStatus

				// Use the retry delay carried in the status details, if any.
				statusDelay := time.Duration(0)
				if status.Details != nil {
					statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
				}

				switch status.Code {
				case http.StatusGone:
					// Never retry RV too old errors
					_ = rw.send(event)
					return true, 0

				case http.StatusGatewayTimeout, http.StatusInternalServerError:
					// Retry
					return false, statusDelay

				default:
					// We retry by default. RetryWatcher is meant to proceed unless it is certain
					// that it can't. If we are not certain, we proceed with retry and leave it
					// up to the user to timeout if needed.

					// Log here so we have a record of hitting the unexpected error
					// and we can whitelist some error codes if we missed any that are expected.
					klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object))

					// Retry
					return false, statusDelay
				}

			default:
				klog.Errorf("Failed to recognize Event type %q", event.Type)
				_ = rw.send(watch.Event{
					Type:   watch.Error,
					Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
				})
				// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
				return true, 0
			}
		}
	}
}
// receive reads the result from a watcher, restarting it if necessary.
//
// It runs until doReceive reports completion or Stop is called. On the way
// out the deferred calls run in reverse order of registration: the context is
// cancelled, the "Stopping" message is logged, resultChan is closed and
// finally doneChan is closed.
func (rw *RetryWatcher) receive() {
	defer close(rw.doneChan)
	defer close(rw.resultChan)

	klog.V(4).Info("Starting RetryWatcher.")
	defer klog.V(4).Info("Stopping RetryWatcher.")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Bridge stopChan to the context so that a Stop call also ends the retry
	// loop below; the goroutine exits on its own once ctx is cancelled.
	go func() {
		select {
		case <-rw.stopChan:
			cancel()
			return
		case <-ctx.Done():
			return
		}
	}()

	// We use non sliding until so we don't introduce delays on happy path when WATCH call
	// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
		done, retryAfter := rw.doReceive()
		if done {
			cancel()
			return
		}

		// Honour the delay requested by doReceive, but give up early if the
		// watcher is stopped while waiting.
		timer := time.NewTimer(retryAfter)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}

		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
	}, rw.minRestartDelay)
}

// ResultChan implements Interface.
// The returned channel is closed when the receive loop exits.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
	return rw.resultChan
}

// Stop implements Interface.
// It closes stopChan unconditionally, so calling it a second time panics.
func (rw *RetryWatcher) Stop() {
	close(rw.stopChan)
}

// Done allows the caller to be notified when Retry watcher stops.
// The returned channel is closed after the result channel has been closed.
func (rw *RetryWatcher) Done() <-chan struct{} {
	return rw.doneChan
}
// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition failed or detected an error state.
// It is evaluated against a cache.Store rather than against a single event.
type PreconditionFunc func(store cache.Store) (bool, error)

// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event watch.Event) (bool, error)

// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
+// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone. +func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var lastEvent *watch.Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, ErrWatchClosed + } + lastEvent = &event + + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-ctx.Done(): + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} + +// Until wraps the watcherClient's watch function with RetryWatcher making sure that watcher gets restarted in case of errors. +// The initialResourceVersion will be given to watch method when first called. It shall not be "" or "0" +// given the underlying WATCH call issues (#74022). +// Remaining behaviour is identical to function UntilWithoutRetry. (See above.) +// Until can deal with API timeouts and lost connections. +// It guarantees you to see all events and in the order they happened. +// Due to this guarantee there is no way it can deal with 'Resource version too old error'. It will fail in this case. +// (See `UntilWithSync` if you'd prefer to recover from all the errors including RV too old by re-listing +// those items. In normal code you should care about being level driven so you'd not care about not seeing all the edges.) +// +// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges"). 
+func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) { + w, err := NewRetryWatcher(initialResourceVersion, watcherClient) + if err != nil { + return nil, err + } + + return UntilWithoutRetry(ctx, w, conditions...) +} + +// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced, +// and watches the output until each provided condition succeeds, in a way that is identical +// to function UntilWithoutRetry. (See above.) +// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'. +// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will +// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple +// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will +// re-list to recover and you always get an event, if there has been a change, after recovery. +// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for +// particular object, not between more of them even it's the same resource. +// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: +// waiting for object reaching a state, "small" controllers, ... +func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { + indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType) + // We need to wait for the internal informers to fully stop so it's easier to reason about + // and it works with non-thread safe clients. 
+ defer func() { <-done }() + // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and + // let UntilWithoutRetry to stop it + defer watcher.Stop() + + if precondition != nil { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %w", ctx.Err()) + } + + done, err := precondition(indexer) + if err != nil { + return nil, err + } + + if done { + return nil, nil + } + } + + return UntilWithoutRetry(ctx, watcher, conditions...) +} + +// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. +func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout < 0 { + // This should be handled in validation + klog.Errorf("Timeout for context shall not be negative!") + timeout = 0 + } + + if timeout == 0 { + return context.WithCancel(parent) + } + + return context.WithTimeout(parent, timeout) +} diff --git a/vendor/k8s.io/client-go/util/certificate/OWNERS b/vendor/k8s.io/client-go/util/certificate/OWNERS new file mode 100644 index 000000000..3c3b94c58 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/OWNERS @@ -0,0 +1,8 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-auth-certificates-approvers +reviewers: + - sig-auth-certificates-reviewers +labels: + - sig/auth diff --git a/vendor/k8s.io/client-go/util/certificate/certificate_manager.go b/vendor/k8s.io/client-go/util/certificate/certificate_manager.go new file mode 100644 index 000000000..b4dcb0b84 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/certificate_manager.go @@ -0,0 +1,775 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
var (
	// certificateWaitTimeout controls the amount of time we wait for certificate
	// approval in one iteration.
	certificateWaitTimeout = 15 * time.Minute

	// kubeletServingUsagesWithEncipherment is the serving usage set that
	// includes KeyEncipherment.
	kubeletServingUsagesWithEncipherment = []certificates.KeyUsage{
		// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
		//
		// Digital signature allows the certificate to be used to verify
		// digital signatures used during TLS negotiation.
		certificates.UsageDigitalSignature,
		// KeyEncipherment allows the cert/key pair to be used to encrypt
		// keys, including the symmetric keys negotiated during TLS setup
		// and used for data transfer.
		certificates.UsageKeyEncipherment,
		// ServerAuth allows the cert to be used by a TLS server to
		// authenticate itself to a TLS client.
		certificates.UsageServerAuth,
	}
	// kubeletServingUsagesNoEncipherment is the serving usage set without
	// KeyEncipherment.
	kubeletServingUsagesNoEncipherment = []certificates.KeyUsage{
		// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
		//
		// Digital signature allows the certificate to be used to verify
		// digital signatures used during TLS negotiation.
		certificates.UsageDigitalSignature,
		// ServerAuth allows the cert to be used by a TLS server to
		// authenticate itself to a TLS client.
		certificates.UsageServerAuth,
	}
	// DefaultKubeletServingGetUsages picks the serving usages for a private
	// key: the set with KeyEncipherment for *rsa.PrivateKey, the set without
	// it for every other key type.
	DefaultKubeletServingGetUsages = func(privateKey interface{}) []certificates.KeyUsage {
		switch privateKey.(type) {
		case *rsa.PrivateKey:
			return kubeletServingUsagesWithEncipherment
		default:
			return kubeletServingUsagesNoEncipherment
		}
	}
	// kubeletClientUsagesWithEncipherment is the client usage set that
	// includes KeyEncipherment.
	kubeletClientUsagesWithEncipherment = []certificates.KeyUsage{
		// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
		//
		// Digital signature allows the certificate to be used to verify
		// digital signatures used during TLS negotiation.
		certificates.UsageDigitalSignature,
		// KeyEncipherment allows the cert/key pair to be used to encrypt
		// keys, including the symmetric keys negotiated during TLS setup
		// and used for data transfer.
		certificates.UsageKeyEncipherment,
		// ClientAuth allows the cert to be used by a TLS client to
		// authenticate itself to the TLS server.
		certificates.UsageClientAuth,
	}
	// kubeletClientUsagesNoEncipherment is the client usage set without
	// KeyEncipherment.
	kubeletClientUsagesNoEncipherment = []certificates.KeyUsage{
		// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
		//
		// Digital signature allows the certificate to be used to verify
		// digital signatures used during TLS negotiation.
		certificates.UsageDigitalSignature,
		// ClientAuth allows the cert to be used by a TLS client to
		// authenticate itself to the TLS server.
		certificates.UsageClientAuth,
	}
	// DefaultKubeletClientGetUsages picks the client usages for a private
	// key: the set with KeyEncipherment for *rsa.PrivateKey, the set without
	// it for every other key type.
	DefaultKubeletClientGetUsages = func(privateKey interface{}) []certificates.KeyUsage {
		switch privateKey.(type) {
		case *rsa.PrivateKey:
			return kubeletClientUsagesWithEncipherment
		default:
			return kubeletClientUsagesNoEncipherment
		}
	}
)

// Manager maintains and updates the certificates in use by this certificate
// manager. In the background it communicates with the API server to get new
// certificates for certificates about to expire.
type Manager interface {
	// Start the API server status sync loop.
	Start()
	// Stop the cert manager loop.
	Stop()
	// Current returns the currently selected certificate from the
	// certificate manager, as well as the associated certificate and key data
	// in PEM format.
	Current() *tls.Certificate
	// ServerHealthy returns true if the manager is able to communicate with
	// the server. This allows a caller to determine whether the cert manager
	// thinks it can potentially talk to the API server. The cert manager may
	// be very conservative and only return true if recent communication has
	// occurred with the server.
	ServerHealthy() bool
}

// Config is the set of configuration parameters available for a new Manager.
type Config struct {
	// ClientsetFn will be used to create a clientset for
	// creating/fetching new certificate requests generated when a key rotation occurs.
	// The function will never be invoked in parallel.
	// It is passed the current client certificate if one exists.
	ClientsetFn ClientsetFunc
	// Template is the CertificateRequest that will be used as a template for
	// generating certificate signing requests for all new keys generated as
	// part of rotation. It follows the same rules as the template parameter of
	// crypto.x509.CreateCertificateRequest in the Go standard libraries.
	Template *x509.CertificateRequest
	// GetTemplate returns the CertificateRequest that will be used as a template for
	// generating certificate signing requests for all new keys generated as
	// part of rotation. It follows the same rules as the template parameter of
	// crypto.x509.CreateCertificateRequest in the Go standard libraries.
	// If no template is available, nil may be returned, and no certificate will be requested.
	// If specified, takes precedence over Template.
	GetTemplate func() *x509.CertificateRequest
	// SignerName is the name of the certificate signer that should sign certificates
	// generated by the manager.
	SignerName string
	// RequestedCertificateLifetime is the requested lifetime length for certificates generated by the manager.
	// Optional.
	// This will set the spec.expirationSeconds field on the CSR.  Controlling the lifetime of
	// the issued certificate is not guaranteed as the signer may choose to ignore the request.
	RequestedCertificateLifetime *time.Duration
	// Usages is the types of usages that certificates generated by the manager
	// can be used for. It is mutually exclusive with GetUsages.
	Usages []certificates.KeyUsage
	// GetUsages is dynamic way to get the types of usages that certificates generated by the manager
	// can be used for. If Usages is not nil, GetUsages has to be nil, vice versa.
	// It is mutually exclusive with Usages.
	GetUsages func(privateKey interface{}) []certificates.KeyUsage
	// CertificateStore is a persistent store where the current cert/key is
	// kept and future cert/key pairs will be persisted after they are
	// generated.
	CertificateStore Store
	// BootstrapCertificatePEM is the certificate data that will be returned
	// from the Manager if the CertificateStore doesn't have any cert/key pairs
	// currently available and has not yet had a chance to get a new cert/key
	// pair from the API. If the CertificateStore does have a cert/key pair,
	// this will be ignored. If there is no cert/key pair available in the
	// CertificateStore, as soon as Start is called, it will request a new
	// cert/key pair from the CertificateSigningRequestClient. This is intended
	// to allow the first boot of a component to be initialized using a
	// generic, multi-use cert/key pair which will be quickly replaced with a
	// unique cert/key pair.
	BootstrapCertificatePEM []byte
	// BootstrapKeyPEM is the key data that will be returned from the Manager
	// if the CertificateStore doesn't have any cert/key pairs currently
	// available. If the CertificateStore does have a cert/key pair, this will
	// be ignored. If the bootstrap cert/key pair are used, they will be
	// rotated at the first opportunity, possibly well in advance of expiring.
	// This is intended to allow the first boot of a component to be
	// initialized using a generic, multi-use cert/key pair which will be
	// quickly replaced with a unique cert/key pair.
	BootstrapKeyPEM []byte `datapolicy:"security-key"`
	// CertificateRotation will record a metric showing the time in seconds
	// that certificates lived before being rotated. This metric is a histogram
	// because there is value in keeping a history of rotation cadences. It
	// allows one to setup monitoring and alerting of unexpected rotation
	// behavior and track trends in rotation frequency.
	CertificateRotation Histogram
	// CertificateRenewFailure will record a metric that keeps track of
	// certificate renewal failures.
	CertificateRenewFailure Counter
	// Name is an optional string that will be used when writing log output
	// or returning errors from manager methods. If not set, SignerName will
	// be used, if SignerName is not set, if Usages includes client auth the
	// name will be "client auth", otherwise the value will be "server".
	Name string
	// Logf is an optional function that log output will be sent to from the
	// certificate manager. If not set it will use klog.V(2)
	Logf func(format string, args ...interface{})
}

// Store is responsible for getting and updating the current certificate.
// Depending on the concrete implementation, the backing store for this
// behavior may vary.
type Store interface {
	// Current returns the currently selected certificate, as well as the
	// associated certificate and key data in PEM format. If the Store doesn't
	// have a cert/key pair currently, it should return a NoCertKeyError so
	// that the Manager can recover by using bootstrap certificates to request
	// a new cert/key pair.
	Current() (*tls.Certificate, error)
	// Update accepts the PEM data for the cert/key pair and makes the new
	// cert/key pair the 'current' pair, that will be returned by future calls
	// to Current().
	Update(cert, key []byte) (*tls.Certificate, error)
}

// Gauge will record the remaining lifetime of the certificate each time it is
// updated.
type Gauge interface {
	Set(float64)
}

// Histogram will record the time a rotated certificate was used before being
// rotated.
type Histogram interface {
	Observe(float64)
}

// Counter will wrap a counter with labels
type Counter interface {
	Inc()
}

// NoCertKeyError indicates there is no cert/key currently available.
// The string value is the error message.
type NoCertKeyError string
type ClientsetFunc func(current *tls.Certificate) (clientset.Interface, error)

// Error implements the error interface; the message is the string value of
// the NoCertKeyError itself.
func (e *NoCertKeyError) Error() string { return string(*e) }

// manager is the concrete certificate manager built by NewManager. It caches
// the current certificate and holds everything needed to request a new one.
type manager struct {
	// getTemplate returns the CSR template to request. NewManager sets it to
	// Config.GetTemplate, or to a closure returning Config.Template when no
	// GetTemplate was supplied.
	getTemplate func() *x509.CertificateRequest

	// lastRequestLock guards lastRequestCancel and lastRequest
	lastRequestLock sync.Mutex
	// lastRequestCancel cancels the wait on the most recently submitted CSR.
	lastRequestCancel context.CancelFunc
	// lastRequest is the template used for the most recently submitted CSR.
	lastRequest *x509.CertificateRequest

	// dynamicTemplate is true when Config.GetTemplate was supplied; Start then
	// launches a second loop that watches for template changes.
	dynamicTemplate bool
	// signerName and requestedCertificateLifetime are passed through to
	// csr.RequestCertificate on every rotation.
	signerName                   string
	requestedCertificateLifetime *time.Duration
	// getUsages yields the key usages requested for a given private key.
	getUsages func(privateKey interface{}) []certificates.KeyUsage
	// forceRotation makes the next rotation deadline "now". It is cleared the
	// first time nextRotationDeadline observes it and is not guarded by a lock.
	forceRotation bool

	// certStore persists each newly issued cert/key pair.
	certStore Store

	// Optional metrics; both are nil-checked before use.
	certificateRotation     Histogram
	certificateRenewFailure Counter

	// the following variables must only be accessed under certAccessLock
	certAccessLock sync.RWMutex
	cert           *tls.Certificate
	serverHealth   bool

	// the clientFn must only be accessed under the clientAccessLock
	clientAccessLock sync.Mutex
	clientsetFn      ClientsetFunc
	// stopCh is closed by Stop; stopped records that this already happened so
	// that Stop can be called more than once.
	stopCh  chan struct{}
	stopped bool

	// Set to time.Now but can be stubbed out for testing
	now func() time.Time

	// name prefixes every log and error message emitted by the manager.
	name string
	// logf receives log output; NewManager defaults it to klog.V(2).Infof.
	logf func(format string, args ...interface{})
}

// NewManager returns a new certificate manager. A certificate manager is
// responsible for being the authoritative source of certificates in the
// Kubelet and handling updates due to rotation.
+func NewManager(config *Config) (Manager, error) { + cert, forceRotation, err := getCurrentCertificateOrBootstrap( + config.CertificateStore, + config.BootstrapCertificatePEM, + config.BootstrapKeyPEM) + if err != nil { + return nil, err + } + + getTemplate := config.GetTemplate + if getTemplate == nil { + getTemplate = func() *x509.CertificateRequest { return config.Template } + } + + if config.GetUsages != nil && config.Usages != nil { + return nil, errors.New("cannot specify both GetUsages and Usages") + } + if config.GetUsages == nil && config.Usages == nil { + return nil, errors.New("either GetUsages or Usages should be specified") + } + var getUsages func(interface{}) []certificates.KeyUsage + if config.GetUsages != nil { + getUsages = config.GetUsages + } else { + getUsages = func(interface{}) []certificates.KeyUsage { return config.Usages } + } + m := manager{ + stopCh: make(chan struct{}), + clientsetFn: config.ClientsetFn, + getTemplate: getTemplate, + dynamicTemplate: config.GetTemplate != nil, + signerName: config.SignerName, + requestedCertificateLifetime: config.RequestedCertificateLifetime, + getUsages: getUsages, + certStore: config.CertificateStore, + cert: cert, + forceRotation: forceRotation, + certificateRotation: config.CertificateRotation, + certificateRenewFailure: config.CertificateRenewFailure, + now: time.Now, + } + + name := config.Name + if len(name) == 0 { + name = m.signerName + } + if len(name) == 0 { + usages := getUsages(nil) + switch { + case hasKeyUsage(usages, certificates.UsageClientAuth): + name = string(certificates.UsageClientAuth) + default: + name = "certificate" + } + } + + m.name = name + m.logf = config.Logf + if m.logf == nil { + m.logf = func(format string, args ...interface{}) { klog.V(2).Infof(format, args...) } + } + + return &m, nil +} + +// Current returns the currently selected certificate from the certificate +// manager. 
This can be nil if the manager was initialized without a +// certificate and has not yet received one from the +// CertificateSigningRequestClient, or if the current cert has expired. +func (m *manager) Current() *tls.Certificate { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + if m.cert != nil && m.cert.Leaf != nil && m.now().After(m.cert.Leaf.NotAfter) { + m.logf("%s: Current certificate is expired", m.name) + return nil + } + return m.cert +} + +// ServerHealthy returns true if the cert manager believes the server +// is currently alive. +func (m *manager) ServerHealthy() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.serverHealth +} + +// Stop terminates the manager. +func (m *manager) Stop() { + m.clientAccessLock.Lock() + defer m.clientAccessLock.Unlock() + if m.stopped { + return + } + close(m.stopCh) + m.stopped = true +} + +// Start will start the background work of rotating the certificates. +func (m *manager) Start() { + // Certificate rotation depends on access to the API server certificate + // signing API, so don't start the certificate manager if we don't have a + // client. 
+ if m.clientsetFn == nil { + m.logf("%s: Certificate rotation is not enabled, no connection to the apiserver", m.name) + return + } + m.logf("%s: Certificate rotation is enabled", m.name) + + templateChanged := make(chan struct{}) + go wait.Until(func() { + deadline := m.nextRotationDeadline() + if sleepInterval := deadline.Sub(m.now()); sleepInterval > 0 { + m.logf("%s: Waiting %v for next certificate rotation", m.name, sleepInterval) + + timer := time.NewTimer(sleepInterval) + defer timer.Stop() + + select { + case <-timer.C: + // unblock when deadline expires + case <-templateChanged: + _, lastRequestTemplate := m.getLastRequest() + if reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) { + // if the template now matches what we last requested, restart the rotation deadline loop + return + } + m.logf("%s: Certificate template changed, rotating", m.name) + } + } + + // Don't enter rotateCerts and trigger backoff if we don't even have a template to request yet + if m.getTemplate() == nil { + return + } + + backoff := wait.Backoff{ + Duration: 2 * time.Second, + Factor: 2, + Jitter: 0.1, + Steps: 5, + } + if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Reached backoff limit, still unable to rotate certs: %v", m.name, err)) + wait.PollInfinite(32*time.Second, m.rotateCerts) + } + }, time.Second, m.stopCh) + + if m.dynamicTemplate { + go wait.Until(func() { + // check if the current template matches what we last requested + lastRequestCancel, lastRequestTemplate := m.getLastRequest() + + if !m.certSatisfiesTemplate() && !reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) { + // if the template is different, queue up an interrupt of the rotation deadline loop. + // if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded. 
+ if lastRequestCancel != nil { + // if we're currently waiting on a submitted request that no longer matches what we want, stop waiting + lastRequestCancel() + } + select { + case templateChanged <- struct{}{}: + case <-m.stopCh: + } + } + }, time.Second, m.stopCh) + } +} + +func getCurrentCertificateOrBootstrap( + store Store, + bootstrapCertificatePEM []byte, + bootstrapKeyPEM []byte) (cert *tls.Certificate, shouldRotate bool, errResult error) { + + currentCert, err := store.Current() + if err == nil { + // if the current cert is expired, fall back to the bootstrap cert + if currentCert.Leaf != nil && time.Now().Before(currentCert.Leaf.NotAfter) { + return currentCert, false, nil + } + } else { + if _, ok := err.(*NoCertKeyError); !ok { + return nil, false, err + } + } + + if bootstrapCertificatePEM == nil || bootstrapKeyPEM == nil { + return nil, true, nil + } + + bootstrapCert, err := tls.X509KeyPair(bootstrapCertificatePEM, bootstrapKeyPEM) + if err != nil { + return nil, false, err + } + if len(bootstrapCert.Certificate) < 1 { + return nil, false, fmt.Errorf("no cert/key data found") + } + + certs, err := x509.ParseCertificates(bootstrapCert.Certificate[0]) + if err != nil { + return nil, false, fmt.Errorf("unable to parse certificate data: %v", err) + } + if len(certs) < 1 { + return nil, false, fmt.Errorf("no cert data found") + } + bootstrapCert.Leaf = certs[0] + + if _, err := store.Update(bootstrapCertificatePEM, bootstrapKeyPEM); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to set the cert/key pair to the bootstrap certificate: %v", err)) + } + + return &bootstrapCert, true, nil +} + +func (m *manager) getClientset() (clientset.Interface, error) { + current := m.Current() + m.clientAccessLock.Lock() + defer m.clientAccessLock.Unlock() + return m.clientsetFn(current) +} + +// RotateCerts is exposed for testing only and is not a part of the public interface. +// Returns true if it changed the cert, false otherwise. 
Error is only returned in +// exceptional cases. +func (m *manager) RotateCerts() (bool, error) { + return m.rotateCerts() +} + +// rotateCerts attempts to request a client cert from the server, wait a reasonable +// period of time for it to be signed, and then update the cert on disk. If it cannot +// retrieve a cert, it will return false. It will only return error in exceptional cases. +// This method also keeps track of "server health" by interpreting the responses it gets +// from the server on the various calls it makes. +// TODO: return errors, have callers handle and log them correctly +func (m *manager) rotateCerts() (bool, error) { + m.logf("%s: Rotating certificates", m.name) + + template, csrPEM, keyPEM, privateKey, err := m.generateCSR() + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Unable to generate a certificate signing request: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + // request the client each time + clientSet, err := m.getClientset() + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Unable to load a client to request certificates: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + getUsages := m.getUsages + if m.getUsages == nil { + getUsages = DefaultKubeletClientGetUsages + } + usages := getUsages(privateKey) + // Call the Certificate Signing Request API to get a certificate for the + // new private key + reqName, reqUID, err := csr.RequestCertificate(clientSet, csrPEM, "", m.signerName, m.requestedCertificateLifetime, usages, privateKey) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Failed while requesting a signed certificate from the control plane: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, m.updateServerError(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 
certificateWaitTimeout) + defer cancel() + + // Once we've successfully submitted a CSR for this template, record that we did so + m.setLastRequest(cancel, template) + + // Wait for the certificate to be signed. This interface and internal timout + // is a remainder after the old design using raw watch wrapped with backoff. + crtPEM, err := csr.WaitForCertificate(ctx, clientSet, reqName, reqUID) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: certificate request was not signed: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + cert, err := m.certStore.Update(crtPEM, keyPEM) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: Unable to store the new cert/key pair: %v", m.name, err)) + if m.certificateRenewFailure != nil { + m.certificateRenewFailure.Inc() + } + return false, nil + } + + if old := m.updateCached(cert); old != nil && m.certificateRotation != nil { + m.certificateRotation.Observe(m.now().Sub(old.Leaf.NotBefore).Seconds()) + } + + return true, nil +} + +// Check that the current certificate on disk satisfies the requests from the +// current template. +// +// Note that extra items in the certificate's SAN or orgs that don't exist in +// the template will not trigger a renewal. +// +// Requires certAccessLock to be locked. +func (m *manager) certSatisfiesTemplateLocked() bool { + if m.cert == nil { + return false + } + + if template := m.getTemplate(); template != nil { + if template.Subject.CommonName != m.cert.Leaf.Subject.CommonName { + m.logf("%s: Current certificate CN (%s) does not match requested CN (%s)", m.name, m.cert.Leaf.Subject.CommonName, template.Subject.CommonName) + return false + } + + currentDNSNames := sets.NewString(m.cert.Leaf.DNSNames...) + desiredDNSNames := sets.NewString(template.DNSNames...) 
+ missingDNSNames := desiredDNSNames.Difference(currentDNSNames) + if len(missingDNSNames) > 0 { + m.logf("%s: Current certificate is missing requested DNS names %v", m.name, missingDNSNames.List()) + return false + } + + currentIPs := sets.NewString() + for _, ip := range m.cert.Leaf.IPAddresses { + currentIPs.Insert(ip.String()) + } + desiredIPs := sets.NewString() + for _, ip := range template.IPAddresses { + desiredIPs.Insert(ip.String()) + } + missingIPs := desiredIPs.Difference(currentIPs) + if len(missingIPs) > 0 { + m.logf("%s: Current certificate is missing requested IP addresses %v", m.name, missingIPs.List()) + return false + } + + currentOrgs := sets.NewString(m.cert.Leaf.Subject.Organization...) + desiredOrgs := sets.NewString(template.Subject.Organization...) + missingOrgs := desiredOrgs.Difference(currentOrgs) + if len(missingOrgs) > 0 { + m.logf("%s: Current certificate is missing requested orgs %v", m.name, missingOrgs.List()) + return false + } + } + + return true +} + +func (m *manager) certSatisfiesTemplate() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.certSatisfiesTemplateLocked() +} + +// nextRotationDeadline returns a value for the threshold at which the +// current certificate should be rotated, 80%+/-10% of the expiration of the +// certificate. 
+func (m *manager) nextRotationDeadline() time.Time { + // forceRotation is not protected by locks + if m.forceRotation { + m.forceRotation = false + return m.now() + } + + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + + if !m.certSatisfiesTemplateLocked() { + return m.now() + } + + notAfter := m.cert.Leaf.NotAfter + totalDuration := float64(notAfter.Sub(m.cert.Leaf.NotBefore)) + deadline := m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration)) + + m.logf("%s: Certificate expiration is %v, rotation deadline is %v", m.name, notAfter, deadline) + return deadline +} + +// jitteryDuration uses some jitter to set the rotation threshold so each node +// will rotate at approximately 70-90% of the total lifetime of the +// certificate. With jitter, if a number of nodes are added to a cluster at +// approximately the same time (such as cluster creation time), they won't all +// try to rotate certificates at the same time for the rest of the life of the +// cluster. +// +// This function is represented as a variable to allow replacement during testing. +var jitteryDuration = func(totalDuration float64) time.Duration { + return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3) +} + +// updateCached sets the most recent retrieved cert and returns the old cert. +// It also sets the server as assumed healthy. +func (m *manager) updateCached(cert *tls.Certificate) *tls.Certificate { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + m.serverHealth = true + old := m.cert + m.cert = cert + return old +} + +// updateServerError takes an error returned by the server and infers +// the health of the server based on the error. It will return nil if +// the error does not require immediate termination of any wait loops, +// and otherwise it will return the error. 
+func (m *manager) updateServerError(err error) error { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + switch { + case apierrors.IsUnauthorized(err): + // SSL terminating proxies may report this error instead of the master + m.serverHealth = true + case apierrors.IsUnexpectedServerError(err): + // generally indicates a proxy or other load balancer problem, rather than a problem coming + // from the master + m.serverHealth = false + default: + // Identify known errors that could be expected for a cert request that + // indicate everything is working normally + m.serverHealth = apierrors.IsNotFound(err) || apierrors.IsForbidden(err) + } + return nil +} + +func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byte, keyPEM []byte, key interface{}, err error) { + // Generate a new private key. + privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to generate a new private key: %v", m.name, err) + } + der, err := x509.MarshalECPrivateKey(privateKey) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to marshal the new key to DER: %v", m.name, err) + } + + keyPEM = pem.EncodeToMemory(&pem.Block{Type: keyutil.ECPrivateKeyBlockType, Bytes: der}) + + template = m.getTemplate() + if template == nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to create a csr, no template available", m.name) + } + csrPEM, err = cert.MakeCSRFromTemplate(privateKey, template) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("%s: unable to create a csr from the private key: %v", m.name, err) + } + return template, csrPEM, keyPEM, privateKey, nil +} + +func (m *manager) getLastRequest() (context.CancelFunc, *x509.CertificateRequest) { + m.lastRequestLock.Lock() + defer m.lastRequestLock.Unlock() + return m.lastRequestCancel, m.lastRequest +} + +func (m *manager) setLastRequest(cancel context.CancelFunc, r *x509.CertificateRequest) { + 
m.lastRequestLock.Lock() + defer m.lastRequestLock.Unlock() + m.lastRequestCancel = cancel + m.lastRequest = r +} + +func hasKeyUsage(usages []certificates.KeyUsage, usage certificates.KeyUsage) bool { + for _, u := range usages { + if u == usage { + return true + } + } + return false +} diff --git a/vendor/k8s.io/client-go/util/certificate/certificate_store.go b/vendor/k8s.io/client-go/util/certificate/certificate_store.go new file mode 100644 index 000000000..e7ed58ee8 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/certificate_store.go @@ -0,0 +1,318 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package certificate + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "os" + "path/filepath" + "time" + + certutil "k8s.io/client-go/util/cert" + "k8s.io/klog/v2" +) + +const ( + keyExtension = ".key" + certExtension = ".crt" + pemExtension = ".pem" + currentPair = "current" + updatedPair = "updated" +) + +type fileStore struct { + pairNamePrefix string + certDirectory string + keyDirectory string + certFile string + keyFile string +} + +// FileStore is a store that provides certificate retrieval as well as +// the path on disk of the current PEM. +type FileStore interface { + Store + // CurrentPath returns the path on disk of the current certificate/key + // pair encoded as PEM files. 
+ CurrentPath() string +} + +// NewFileStore returns a concrete implementation of a Store that is based on +// storing the cert/key pairs in a single file per pair on disk in the +// designated directory. When starting up it will look for the currently +// selected cert/key pair in: +// +// 1. ${certDirectory}/${pairNamePrefix}-current.pem - both cert and key are in the same file. +// 2. ${certFile}, ${keyFile} +// 3. ${certDirectory}/${pairNamePrefix}.crt, ${keyDirectory}/${pairNamePrefix}.key +// +// The first one found will be used. If rotation is enabled, future cert/key +// updates will be written to the ${certDirectory} directory and +// ${certDirectory}/${pairNamePrefix}-current.pem will be created as a soft +// link to the currently selected cert/key pair. +func NewFileStore( + pairNamePrefix string, + certDirectory string, + keyDirectory string, + certFile string, + keyFile string) (FileStore, error) { + + s := fileStore{ + pairNamePrefix: pairNamePrefix, + certDirectory: certDirectory, + keyDirectory: keyDirectory, + certFile: certFile, + keyFile: keyFile, + } + if err := s.recover(); err != nil { + return nil, err + } + return &s, nil +} + +// CurrentPath returns the path to the current version of these certificates. +func (s *fileStore) CurrentPath() string { + return filepath.Join(s.certDirectory, s.filename(currentPair)) +} + +// recover checks if there is a certificate rotation that was interrupted while +// progress, and if so, attempts to recover to a good state. +func (s *fileStore) recover() error { + // If the 'current' file doesn't exist, continue on with the recovery process. + currentPath := filepath.Join(s.certDirectory, s.filename(currentPair)) + if exists, err := fileExists(currentPath); err != nil { + return err + } else if exists { + return nil + } + + // If the 'updated' file exists, and it is a symbolic link, continue on + // with the recovery process. 
+ updatedPath := filepath.Join(s.certDirectory, s.filename(updatedPair)) + if fi, err := os.Lstat(updatedPath); err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } else if fi.Mode()&os.ModeSymlink != os.ModeSymlink { + return fmt.Errorf("expected %q to be a symlink but it is a file", updatedPath) + } + + // Move the 'updated' symlink to 'current'. + if err := os.Rename(updatedPath, currentPath); err != nil { + return fmt.Errorf("unable to rename %q to %q: %v", updatedPath, currentPath, err) + } + return nil +} + +func (s *fileStore) Current() (*tls.Certificate, error) { + pairFile := filepath.Join(s.certDirectory, s.filename(currentPair)) + if pairFileExists, err := fileExists(pairFile); err != nil { + return nil, err + } else if pairFileExists { + klog.Infof("Loading cert/key pair from %q.", pairFile) + return loadFile(pairFile) + } + + certFileExists, err := fileExists(s.certFile) + if err != nil { + return nil, err + } + keyFileExists, err := fileExists(s.keyFile) + if err != nil { + return nil, err + } + if certFileExists && keyFileExists { + klog.Infof("Loading cert/key pair from (%q, %q).", s.certFile, s.keyFile) + return loadX509KeyPair(s.certFile, s.keyFile) + } + + c := filepath.Join(s.certDirectory, s.pairNamePrefix+certExtension) + k := filepath.Join(s.keyDirectory, s.pairNamePrefix+keyExtension) + certFileExists, err = fileExists(c) + if err != nil { + return nil, err + } + keyFileExists, err = fileExists(k) + if err != nil { + return nil, err + } + if certFileExists && keyFileExists { + klog.Infof("Loading cert/key pair from (%q, %q).", c, k) + return loadX509KeyPair(c, k) + } + + noKeyErr := NoCertKeyError( + fmt.Sprintf("no cert/key files read at %q, (%q, %q) or (%q, %q)", + pairFile, + s.certFile, + s.keyFile, + s.certDirectory, + s.keyDirectory)) + return nil, &noKeyErr +} + +func loadFile(pairFile string) (*tls.Certificate, error) { + // LoadX509KeyPair knows how to parse combined cert and private key from + // the same file. 
+ cert, err := tls.LoadX509KeyPair(pairFile, pairFile) + if err != nil { + return nil, fmt.Errorf("could not convert data from %q into cert/key pair: %v", pairFile, err) + } + certs, err := x509.ParseCertificates(cert.Certificate[0]) + if err != nil { + return nil, fmt.Errorf("unable to parse certificate data: %v", err) + } + cert.Leaf = certs[0] + return &cert, nil +} + +func (s *fileStore) Update(certData, keyData []byte) (*tls.Certificate, error) { + ts := time.Now().Format("2006-01-02-15-04-05") + pemFilename := s.filename(ts) + + if err := os.MkdirAll(s.certDirectory, 0755); err != nil { + return nil, fmt.Errorf("could not create directory %q to store certificates: %v", s.certDirectory, err) + } + certPath := filepath.Join(s.certDirectory, pemFilename) + + f, err := os.OpenFile(certPath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0600) + if err != nil { + return nil, fmt.Errorf("could not open %q: %v", certPath, err) + } + defer f.Close() + + // First cert is leaf, remainder are intermediates + certs, err := certutil.ParseCertsPEM(certData) + if err != nil { + return nil, fmt.Errorf("invalid certificate data: %v", err) + } + for _, c := range certs { + pem.Encode(f, &pem.Block{Type: "CERTIFICATE", Bytes: c.Raw}) + } + + keyBlock, _ := pem.Decode(keyData) + if keyBlock == nil { + return nil, fmt.Errorf("invalid key data") + } + pem.Encode(f, keyBlock) + + cert, err := loadFile(certPath) + if err != nil { + return nil, err + } + + if err := s.updateSymlink(certPath); err != nil { + return nil, err + } + return cert, nil +} + +// updateSymLink updates the current symlink to point to the file that is +// passed it. It will fail if there is a non-symlink file exists where the +// symlink is expected to be. +func (s *fileStore) updateSymlink(filename string) error { + // If the 'current' file either doesn't exist, or is already a symlink, + // proceed. Otherwise, this is an unrecoverable error. 
+ currentPath := filepath.Join(s.certDirectory, s.filename(currentPair)) + currentPathExists := false + if fi, err := os.Lstat(currentPath); err != nil { + if !os.IsNotExist(err) { + return err + } + } else if fi.Mode()&os.ModeSymlink != os.ModeSymlink { + return fmt.Errorf("expected %q to be a symlink but it is a file", currentPath) + } else { + currentPathExists = true + } + + // If the 'updated' file doesn't exist, proceed. If it exists but it is a + // symlink, delete it. Otherwise, this is an unrecoverable error. + updatedPath := filepath.Join(s.certDirectory, s.filename(updatedPair)) + if fi, err := os.Lstat(updatedPath); err != nil { + if !os.IsNotExist(err) { + return err + } + } else if fi.Mode()&os.ModeSymlink != os.ModeSymlink { + return fmt.Errorf("expected %q to be a symlink but it is a file", updatedPath) + } else { + if err := os.Remove(updatedPath); err != nil { + return fmt.Errorf("unable to remove %q: %v", updatedPath, err) + } + } + + // Check that the new cert/key pair file exists to avoid rotating to an + // invalid cert/key. + if filenameExists, err := fileExists(filename); err != nil { + return err + } else if !filenameExists { + return fmt.Errorf("file %q does not exist so it can not be used as the currently selected cert/key", filename) + } + + // Ensure the source path is absolute to ensure the symlink target is + // correct when certDirectory is a relative path. + filename, err := filepath.Abs(filename) + if err != nil { + return err + } + + // Create the 'updated' symlink pointing to the requested file name. + if err := os.Symlink(filename, updatedPath); err != nil { + return fmt.Errorf("unable to create a symlink from %q to %q: %v", updatedPath, filename, err) + } + + // Replace the 'current' symlink. 
+ if currentPathExists { + if err := os.Remove(currentPath); err != nil { + return fmt.Errorf("unable to remove %q: %v", currentPath, err) + } + } + if err := os.Rename(updatedPath, currentPath); err != nil { + return fmt.Errorf("unable to rename %q to %q: %v", updatedPath, currentPath, err) + } + return nil +} + +func (s *fileStore) filename(qualifier string) string { + return s.pairNamePrefix + "-" + qualifier + pemExtension +} + +func loadX509KeyPair(certFile, keyFile string) (*tls.Certificate, error) { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + certs, err := x509.ParseCertificates(cert.Certificate[0]) + if err != nil { + return nil, fmt.Errorf("unable to parse certificate data: %v", err) + } + cert.Leaf = certs[0] + return &cert, nil +} + +// FileExists checks if specified file exists. +func fileExists(filename string) (bool, error) { + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} diff --git a/vendor/k8s.io/client-go/util/certificate/csr/csr.go b/vendor/k8s.io/client-go/util/certificate/csr/csr.go new file mode 100644 index 000000000..0390d1c02 --- /dev/null +++ b/vendor/k8s.io/client-go/util/certificate/csr/csr.go @@ -0,0 +1,364 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package csr
+
+import (
+	"context"
+	"crypto"
+	"crypto/x509"
+	"encoding/pem"
+	"errors"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	certificatesv1 "k8s.io/api/certificates/v1"
+	certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	watchtools "k8s.io/client-go/tools/watch"
+	certutil "k8s.io/client-go/util/cert"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/pointer"
+)
+
+// RequestCertificate will either use an existing (if this process has run
+// before but not to completion) or create a certificate signing request using the
+// PEM encoded CSR and send it to API server. An optional requestedDuration may be passed
+// to set the spec.expirationSeconds field on the CSR to control the lifetime of the issued
+// certificate. This is not guaranteed as the signer may choose to ignore the request.
+//
+// When name is empty the object is created with the generateName prefix "csr-".
+// When name is non-empty and an object with that name already exists, the
+// existing object is fetched and reused, provided ensureCompatible accepts it
+// for the given privateKey.
+//
+// It returns the name and UID of the created (or reused) request. API status
+// errors from create/get keep their type and only have their message
+// rewritten (see formatError); an incompatibility error wraps the underlying
+// cause so callers can inspect it with errors.Is / errors.As.
+func RequestCertificate(client clientset.Interface, csrData []byte, name, signerName string, requestedDuration *time.Duration, usages []certificatesv1.KeyUsage, privateKey interface{}) (reqName string, reqUID types.UID, err error) {
+	csr := &certificatesv1.CertificateSigningRequest{
+		// Username, UID, Groups will be injected by API server.
+		TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: certificatesv1.CertificateSigningRequestSpec{
+			Request:    csrData,
+			Usages:     usages,
+			SignerName: signerName,
+		},
+	}
+	if len(csr.Name) == 0 {
+		csr.GenerateName = "csr-"
+	}
+	if requestedDuration != nil {
+		csr.Spec.ExpirationSeconds = DurationToExpirationSeconds(*requestedDuration)
+	}
+
+	reqName, reqUID, err = create(client, csr)
+	switch {
+	case err == nil:
+		return reqName, reqUID, nil
+
+	case apierrors.IsAlreadyExists(err) && len(name) > 0:
+		// Only a caller-chosen name can collide with an earlier run; reuse
+		// that object if it still matches what we would have submitted.
+		klog.Infof("csr for this node already exists, reusing")
+		req, err := get(client, name)
+		if err != nil {
+			return "", "", formatError("cannot retrieve certificate signing request: %v", err)
+		}
+		if err := ensureCompatible(req, csr, privateKey); err != nil {
+			return "", "", fmt.Errorf("retrieved csr is not compatible: %w", err)
+		}
+		klog.Infof("csr for this node is still valid")
+		return req.Name, req.UID, nil
+
+	default:
+		return "", "", formatError("cannot create certificate signing request: %v", err)
+	}
+}
+
+// DurationToExpirationSeconds converts duration to a whole number of seconds
+// (truncating any sub-second remainder) and returns a pointer to it, in the
+// form used for spec.expirationSeconds. Durations whose second count does not
+// fit in an int32 are clamped to the int32 range instead of silently wrapping.
+func DurationToExpirationSeconds(duration time.Duration) *int32 {
+	seconds := duration / time.Second
+	switch {
+	case seconds > math.MaxInt32:
+		seconds = math.MaxInt32
+	case seconds < math.MinInt32:
+		seconds = math.MinInt32
+	}
+	return pointer.Int32(int32(seconds))
+}
+
+// ExpirationSecondsToDuration is the inverse of DurationToExpirationSeconds:
+// it converts a number of seconds into a time.Duration.
+func ExpirationSecondsToDuration(expirationSeconds int32) time.Duration {
+	return time.Duration(expirationSeconds) * time.Second
+}
+
+// get fetches the named CSR through the certificates v1 API and, only when
+// that call reports NotFound, retries through v1beta1. A v1beta1 result is
+// converted to a v1 object carrying ObjectMeta, Request, SignerName and
+// Usages; Status is not copied, so the returned object has an empty
+// Status.Certificate in that case.
+func get(client clientset.Interface, name string) (*certificatesv1.CertificateSigningRequest, error) {
+	v1req, v1err := client.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), name, metav1.GetOptions{})
+	if v1err == nil || !apierrors.IsNotFound(v1err) {
+		return v1req, v1err
+	}
+
+	v1beta1req, v1beta1err := client.CertificatesV1beta1().CertificateSigningRequests().Get(context.TODO(), name, metav1.GetOptions{})
+	if v1beta1err != nil {
+		return nil, v1beta1err
+	}
+
+	v1req = &certificatesv1.CertificateSigningRequest{
+		ObjectMeta: v1beta1req.ObjectMeta,
+		Spec: certificatesv1.CertificateSigningRequestSpec{
+			Request: v1beta1req.Spec.Request,
+		},
+	}
+	if v1beta1req.Spec.SignerName != nil {
+		v1req.Spec.SignerName = *v1beta1req.Spec.SignerName
+	}
+	for _, usage := range v1beta1req.Spec.Usages {
+		v1req.Spec.Usages = append(v1req.Spec.Usages, certificatesv1.KeyUsage(usage))
+	}
+	return v1req, nil
+}
+
+// create submits csr to the API server and returns the name and UID of the
+// created object. The v1 API is tried first when the request is expressible
+// in v1; if v1 is skipped or reports NotFound, the request is converted and
+// submitted through v1beta1 instead.
+func create(client clientset.Interface, csr *certificatesv1.CertificateSigningRequest) (reqName string, reqUID types.UID, err error) {
+	// only attempt a create via v1 if we specified signerName and usages and are not using the legacy unknown signerName
+	if len(csr.Spec.Usages) > 0 && len(csr.Spec.SignerName) > 0 && csr.Spec.SignerName != "kubernetes.io/legacy-unknown" {
+		v1req, v1err := client.CertificatesV1().CertificateSigningRequests().Create(context.TODO(), csr, metav1.CreateOptions{})
+		switch {
+		case v1err != nil && apierrors.IsNotFound(v1err):
+			// v1 CSR API was not found, continue to try v1beta1
+
+		case v1err != nil:
+			// other creation error
+			return "", "", v1err
+
+		default:
+			// success
+			return v1req.Name, v1req.UID, nil
+		}
+	}
+
+	// convert relevant bits to v1beta1
+	v1beta1csr := &certificatesv1beta1.CertificateSigningRequest{
+		ObjectMeta: csr.ObjectMeta,
+		Spec: certificatesv1beta1.CertificateSigningRequestSpec{
+			Request: csr.Spec.Request,
+		},
+	}
+	// Leave SignerName nil when none was requested so the field is omitted
+	// rather than sent as an explicit empty string; an unset signerName is
+	// what lets the v1beta1 API apply its own default.
+	if len(csr.Spec.SignerName) > 0 {
+		v1beta1csr.Spec.SignerName = &csr.Spec.SignerName
+	}
+	for _, usage := range csr.Spec.Usages {
+		v1beta1csr.Spec.Usages = append(v1beta1csr.Spec.Usages, certificatesv1beta1.KeyUsage(usage))
+	}
+
+	// create v1beta1
+	v1beta1req, v1beta1err := client.CertificatesV1beta1().CertificateSigningRequests().Create(context.TODO(), v1beta1csr, metav1.CreateOptions{})
+	if v1beta1err != nil {
+		return "", "", v1beta1err
+	}
+	return v1beta1req.Name, v1beta1req.UID, nil
+}
+
+// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
+//
+// It first probes, once per second, for a CSR API version (v1, then v1beta1)
+// that can be listed, then watches the request named reqName until it carries
+// an Approved condition and a non-empty certificate, which is returned.
+//
+// It returns wait.ErrWaitTimeout when ctx is done before a certificate is
+// issued. It returns an error if the request is deleted, its UID no longer
+// matches reqUID, or it carries a Denied or Failed condition.
+func WaitForCertificate(ctx context.Context, client clientset.Interface, reqName string, reqUID types.UID) (certData []byte, err error) {
+	fieldSelector := fields.OneTermEqualSelector("metadata.name", reqName).String()
+
+	var lw *cache.ListWatch
+	var obj runtime.Object
+	for {
+		// see if the v1 API is available
+		if _, err := client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}); err == nil {
+			// watch v1 objects
+			obj = &certificatesv1.CertificateSigningRequest{}
+			lw = &cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					options.FieldSelector = fieldSelector
+					return client.CertificatesV1().CertificateSigningRequests().List(ctx, options)
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					options.FieldSelector = fieldSelector
+					return client.CertificatesV1().CertificateSigningRequests().Watch(ctx, options)
+				},
+			}
+			break
+		} else {
+			klog.V(2).Infof("error fetching v1 certificate signing request: %v", err)
+		}
+
+		// return if we've timed out
+		if err := ctx.Err(); err != nil {
+			return nil, wait.ErrWaitTimeout
+		}
+
+		// see if the v1beta1 API is available
+		if _, err := client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}); err == nil {
+			// watch v1beta1 objects
+			obj = &certificatesv1beta1.CertificateSigningRequest{}
+			lw = &cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					options.FieldSelector = fieldSelector
+					return client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, options)
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					options.FieldSelector = fieldSelector
+					return client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, options)
+				},
+			}
+			break
+		} else {
+			klog.V(2).Infof("error fetching v1beta1 certificate signing request: %v", err)
+		}
+
+		// return if we've timed out
+		if err := ctx.Err(); err != nil {
+			return nil, wait.ErrWaitTimeout
+		}
+
+		// wait and try again, but give up immediately once ctx is done
+		// instead of sleeping through the cancellation
+		select {
+		case <-ctx.Done():
+			return nil, wait.ErrWaitTimeout
+		case <-time.After(time.Second):
+		}
+	}
+
+	var issuedCertificate []byte
+	_, err = watchtools.UntilWithSync(
+		ctx,
+		lw,
+		obj,
+		nil,
+		func(event watch.Event) (bool, error) {
+			switch event.Type {
+			case watch.Modified, watch.Added:
+			case watch.Deleted:
+				return false, fmt.Errorf("csr %q was deleted", reqName)
+			default:
+				return false, nil
+			}
+
+			// The two API versions use distinct Go types, so the same
+			// condition scan is spelled out once per version.
+			switch csr := event.Object.(type) {
+			case *certificatesv1.CertificateSigningRequest:
+				if csr.UID != reqUID {
+					return false, fmt.Errorf("csr %q changed UIDs", csr.Name)
+				}
+				approved := false
+				for _, c := range csr.Status.Conditions {
+					if c.Type == certificatesv1.CertificateDenied {
+						return false, fmt.Errorf("certificate signing request is denied, reason: %v, message: %v", c.Reason, c.Message)
+					}
+					if c.Type == certificatesv1.CertificateFailed {
+						return false, fmt.Errorf("certificate signing request failed, reason: %v, message: %v", c.Reason, c.Message)
+					}
+					if c.Type == certificatesv1.CertificateApproved {
+						approved = true
+					}
+				}
+				if approved {
+					if len(csr.Status.Certificate) > 0 {
+						klog.V(2).Infof("certificate signing request %s is issued", csr.Name)
+						issuedCertificate = csr.Status.Certificate
+						return true, nil
+					}
+					klog.V(2).Infof("certificate signing request %s is approved, waiting to be issued", csr.Name)
+				}
+
+			case *certificatesv1beta1.CertificateSigningRequest:
+				if csr.UID != reqUID {
+					return false, fmt.Errorf("csr %q changed UIDs", csr.Name)
+				}
+				approved := false
+				for _, c := range csr.Status.Conditions {
+					if c.Type == certificatesv1beta1.CertificateDenied {
+						return false, fmt.Errorf("certificate signing request is denied, reason: %v, message: %v", c.Reason, c.Message)
+					}
+					if c.Type == certificatesv1beta1.CertificateFailed {
+						return false, fmt.Errorf("certificate signing request failed, reason: %v, message: %v", c.Reason, c.Message)
+					}
+					if c.Type == certificatesv1beta1.CertificateApproved {
+						approved = true
+					}
+				}
+				if approved {
+					if len(csr.Status.Certificate) > 0 {
+						klog.V(2).Infof("certificate signing request %s is issued", csr.Name)
+						issuedCertificate = csr.Status.Certificate
+						return true, nil
+					}
+					klog.V(2).Infof("certificate signing request %s is approved, waiting to be issued", csr.Name)
+				}
+
+			default:
+				return false, fmt.Errorf("unexpected type received: %T", event.Object)
+			}
+
+			return false, nil
+		},
+	)
+	// Pass the timeout sentinel through untouched so callers can keep
+	// comparing against wait.ErrWaitTimeout.
+	if errors.Is(err, wait.ErrWaitTimeout) {
+		return nil, wait.ErrWaitTimeout
+	}
+	if err != nil {
+		return nil, formatError("cannot watch on the certificate signing request: %v", err)
+	}
+
+	return issuedCertificate, nil
+}
+
+// ensureCompatible ensures that a CSR object is compatible with an original CSR
+//
+// As called from RequestCertificate, new is the object retrieved from the API
+// server and orig is the request that was just built locally. The two are
+// compatible when their X.509 subjects are deeply equal, their signer names
+// match (if both are set), the signature on new verifies against the public
+// half of privateKey, and no certificate already present in new's status has
+// expired. privateKey must implement crypto.Signer.
+func ensureCompatible(new, orig *certificatesv1.CertificateSigningRequest, privateKey interface{}) error {
+	newCSR, err := parseCSR(new.Spec.Request)
+	if err != nil {
+		return fmt.Errorf("unable to parse new csr: %w", err)
+	}
+	origCSR, err := parseCSR(orig.Spec.Request)
+	if err != nil {
+		return fmt.Errorf("unable to parse original csr: %w", err)
+	}
+	if !reflect.DeepEqual(newCSR.Subject, origCSR.Subject) {
+		return fmt.Errorf("csr subjects differ: new: %#v, orig: %#v", newCSR.Subject, origCSR.Subject)
+	}
+	if len(new.Spec.SignerName) > 0 && len(orig.Spec.SignerName) > 0 && new.Spec.SignerName != orig.Spec.SignerName {
+		return fmt.Errorf("csr signerNames differ: new %q, orig: %q", new.Spec.SignerName, orig.Spec.SignerName)
+	}
+	signer, ok := privateKey.(crypto.Signer)
+	if !ok {
+		return fmt.Errorf("privateKey is not a signer")
+	}
+	// Substitute our own public key before verifying: the check then passes
+	// only if the stored request was signed with privateKey.
+	newCSR.PublicKey = signer.Public()
+	if err := newCSR.CheckSignature(); err != nil {
+		return fmt.Errorf("error validating signature new CSR against old key: %w", err)
+	}
+	if len(new.Status.Certificate) > 0 {
+		certs, err := certutil.ParseCertsPEM(new.Status.Certificate)
+		if err != nil {
+			return fmt.Errorf("error parsing signed certificate for CSR: %w", err)
+		}
+		now := time.Now()
+		for _, cert := range certs {
+			if now.After(cert.NotAfter) {
+				return fmt.Errorf("one of the certificates for the CSR has expired: %s", cert.NotAfter)
+			}
+		}
+	}
+	return nil
+}
+
+// formatError preserves the type of an API message but alters the message. Expects
+// a single argument format string, and returns the wrapped error.
+//
+// For errors implementing apierrors.APIStatus the result is a new
+// *apierrors.StatusError whose message is format applied to the original
+// status message; any other error is formatted with fmt.Errorf. Because the
+// same format string is also fed to fmt.Sprintf with a string argument,
+// callers must use a verb such as %v rather than %w.
+func formatError(format string, err error) error {
+	if s, ok := err.(apierrors.APIStatus); ok {
+		se := &apierrors.StatusError{ErrStatus: s.Status()}
+		se.ErrStatus.Message = fmt.Sprintf(format, se.ErrStatus.Message)
+		return se
+	}
+	return fmt.Errorf(format, err)
+}
+
+// parseCSR extracts the CSR from the API object and decodes it.
+//
+// pemData must begin with a PEM block of type "CERTIFICATE REQUEST"; only the
+// first block is examined and any trailing data is ignored.
+func parseCSR(pemData []byte) (*x509.CertificateRequest, error) {
+	// extract PEM from request object
+	block, _ := pem.Decode(pemData)
+	if block == nil || block.Type != "CERTIFICATE REQUEST" {
+		return nil, fmt.Errorf("PEM block type must be CERTIFICATE REQUEST")
+	}
+	return x509.ParseCertificateRequest(block.Bytes)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index bd0b303ec..bec9593ba 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -94,8 +94,6 @@ github.com/google/pprof/profile
 # github.com/google/uuid v1.3.0
 ## explicit
 github.com/google/uuid
-# github.com/googleapis/gnostic v0.5.5
-## explicit; go 1.12
 # github.com/imdario/mergo v0.3.11
 ## explicit; go 1.13
 github.com/imdario/mergo
@@ -755,8 +753,11 @@ k8s.io/client-go/tools/pager
 k8s.io/client-go/tools/record
 k8s.io/client-go/tools/record/util
 k8s.io/client-go/tools/reference
+k8s.io/client-go/tools/watch
 k8s.io/client-go/transport
 k8s.io/client-go/util/cert
+k8s.io/client-go/util/certificate
+k8s.io/client-go/util/certificate/csr
 k8s.io/client-go/util/connrotation
 k8s.io/client-go/util/flowcontrol
 k8s.io/client-go/util/homedir