diff --git a/app-policy/config/install/05-calico.yaml b/app-policy/config/install/05-calico.yaml
index 9604987f11d..75f02ee0046 100644
--- a/app-policy/config/install/05-calico.yaml
+++ b/app-policy/config/install/05-calico.yaml
@@ -96,6 +96,13 @@ rules:
       - get
       - list
       - watch
+  - apiGroups: [""]
+    resources:
+      - serviceaccounts/token
+    resourceNames:
+      - calico-node
+    verbs:
+      - create
 ---
 apiVersion: rbac.authorization.k8s.io/v1beta1
diff --git a/calico/_includes/charts/calico/templates/calico-node-rbac.yaml b/calico/_includes/charts/calico/templates/calico-node-rbac.yaml
index 5d39ecd0f53..3aa1ecae5d0 100644
--- a/calico/_includes/charts/calico/templates/calico-node-rbac.yaml
+++ b/calico/_includes/charts/calico/templates/calico-node-rbac.yaml
@@ -70,6 +70,14 @@ rules:
       - pods/status
     verbs:
       - patch
+  # Used for creating service account tokens to be used by the CNI plugin
+  - apiGroups: [""]
+    resources:
+      - serviceaccounts/token
+    resourceNames:
+      - calico-node
+    verbs:
+      - create
   # Calico monitors various CRDs for config.
   - apiGroups: ["crd.projectcalico.org"]
     resources:
diff --git a/calico/getting-started/kubernetes/hardway/install-node.md b/calico/getting-started/kubernetes/hardway/install-node.md
index d2dc657a8a1..6051d9128f1 100644
--- a/calico/getting-started/kubernetes/hardway/install-node.md
+++ b/calico/getting-started/kubernetes/hardway/install-node.md
@@ -121,6 +121,14 @@ rules:
       - pods/status
     verbs:
       - patch
+  # Used for creating service account tokens to be used by the CNI plugin
+  - apiGroups: [""]
+    resources:
+      - serviceaccounts/token
+    resourceNames:
+      - calico-node
+    verbs:
+      - create
   # Calico monitors various CRDs for config.
   - apiGroups: ["crd.projectcalico.org"]
     resources:
diff --git a/cni-plugin/pkg/install/install.go b/cni-plugin/pkg/install/install.go
index f6fc2f71dbd..ade37fd283e 100644
--- a/cni-plugin/pkg/install/install.go
+++ b/cni-plugin/pkg/install/install.go
@@ -34,6 +34,7 @@ import (
     "k8s.io/client-go/rest"
 
     "github.com/projectcalico/calico/libcalico-go/lib/names"
+    "github.com/projectcalico/calico/node/pkg/cni"
 )
 
 type config struct {
@@ -466,7 +467,16 @@ contexts:
     user: calico
 current-context: calico-context`
 
-    data = strings.Replace(data, "TOKEN", kubecfg.BearerToken, 1)
+    clientset, err := cni.InClusterClientSet()
+    if err != nil {
+        logrus.WithError(err).Fatal("Unable to create client for generating CNI token")
+    }
+    tr := cni.NewTokenRefresher(clientset, cni.NamespaceOfUsedServiceAccount(), cni.DefaultServiceAccountName)
+    tu, err := tr.UpdateToken()
+    if err != nil {
+        logrus.WithError(err).Fatal("Unable to create token for CNI kubeconfig")
+    }
+    data = strings.Replace(data, "TOKEN", tu.Token, 1)
     data = strings.Replace(data, "__KUBERNETES_SERVICE_PROTOCOL__", getEnv("KUBERNETES_SERVICE_PROTOCOL", "https"), -1)
     data = strings.Replace(data, "__KUBERNETES_SERVICE_HOST__", getEnv("KUBERNETES_SERVICE_HOST", ""), -1)
     data = strings.Replace(data, "__KUBERNETES_SERVICE_PORT__", getEnv("KUBERNETES_SERVICE_PORT", ""), -1)
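Illustrative sketch (not part of the change): with the hunk above, the CNI installer no longer copies the mounted service-account token into the kubeconfig; it asks the kube-apiserver to mint a fresh, time-bound token through the TokenRequest API via cni.TokenRefresher.UpdateToken(), which is defined later in this diff. The standalone program below shows the underlying client-go call; the "kube-system" namespace and the 24-hour expiry are illustrative assumptions mirroring the defaults in node/pkg/cni/token_watch.go.

// Sketch only: the TokenRequest call that cni.TokenRefresher.UpdateToken() wraps.
package main

import (
    "context"
    "fmt"

    authenticationv1 "k8s.io/api/authentication/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    expiry := int64(24 * 60 * 60) // assumption: matches defaultCNITokenValiditySeconds below
    req := &authenticationv1.TokenRequest{
        Spec: authenticationv1.TokenRequestSpec{
            Audiences:         []string{"kubernetes"},
            ExpirationSeconds: &expiry,
        },
    }
    // Requires the "create serviceaccounts/token" RBAC rule added in this change.
    // "kube-system" is an assumption; calico-node may run in a different namespace.
    resp, err := clientset.CoreV1().ServiceAccounts("kube-system").CreateToken(
        context.TODO(), "calico-node", req, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("token expires at", resp.Status.ExpirationTimestamp.Time)
}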
diff --git a/node/Makefile b/node/Makefile
index 70fe6e6ebc0..17cccf1a37e 100644
--- a/node/Makefile
+++ b/node/Makefile
@@ -281,7 +281,7 @@ fv: run-k8s-apiserver
     -v $(CERTS_PATH):/home/user/certs \
     -e KUBECONFIG=/go/src/github.com/projectcalico/calico/hack/test/certs/kubeconfig \
     -e ETCD_ENDPOINTS=http://$(LOCAL_IP_ENV):2379 \
-    $(CALICO_BUILD) ginkgo -cover -r -skipPackage vendor pkg/lifecycle/startup pkg/allocateip $(GINKGO_ARGS)
+    $(CALICO_BUILD) ginkgo -cover -r -skipPackage vendor pkg/lifecycle/startup pkg/allocateip pkg/cni $(GINKGO_ARGS)
 
 # Skip packages containing FV tests.
 UT_PACKAGES_TO_SKIP?=pkg/lifecycle/startup,pkg/allocateip,pkg/status
diff --git a/node/pkg/cni/token_watch.go b/node/pkg/cni/token_watch.go
index d78db589c36..f1bbf6f5ef1 100644
--- a/node/pkg/cni/token_watch.go
+++ b/node/pkg/cni/token_watch.go
@@ -1,56 +1,231 @@
 package cni
 
 import (
+    "context"
     "encoding/base64"
+    "encoding/json"
     "fmt"
     "io/ioutil"
+    "math/rand"
     "os"
+    "strings"
+    "sync"
     "time"
 
     "github.com/sirupsen/logrus"
-    "gopkg.in/fsnotify/fsnotify.v1"
+    authenticationv1 "k8s.io/api/authentication/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
 )
 
-var serviceaccountDirectory string = "/var/run/secrets/kubernetes.io/serviceaccount/"
+const DefaultServiceAccountName = "calico-node"
+const serviceAccountNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+const defaultCNITokenValiditySeconds = 24 * 60 * 60
+const minTokenRetryDuration = 5 * time.Second
+const defaultRefreshFraction = 4
+
 var kubeconfigPath string = "/host/etc/cni/net.d/calico-kubeconfig"
 
-func Run() {
-    // Log to stdout. this prevents our logs from being interpreted as errors by, for example,
-    // fluentd's default configuration.
-    logrus.SetOutput(os.Stdout)
+type TokenRefresher struct {
+    tokenSupported bool
+    tokenOnce      *sync.Once
 
-    // Create a watcher for file changes.
-    watcher, err := fsnotify.NewWatcher()
+    tokenValiditySeconds   int64
+    minTokenRetryDuration  time.Duration
+    defaultRefreshFraction time.Duration
+
+    clientset *kubernetes.Clientset
+
+    namespace          string
+    serviceAccountName string
+
+    tokenChan chan TokenUpdate
+}
+
+type TokenUpdate struct {
+    Token          string
+    ExpirationTime time.Time
+}
+
+func NamespaceOfUsedServiceAccount() string {
+    namespace, err := ioutil.ReadFile(serviceAccountNamespace)
     if err != nil {
-        panic(err)
+        logrus.WithError(err).Fatal("Failed to read service account namespace file")
     }
-    defer watcher.Close()
+    return string(namespace)
+}
 
-    // Watch for changes to the serviceaccount directory. Rety if necessary.
+func InClusterClientSet() (*kubernetes.Clientset, error) {
+    cfg, err := rest.InClusterConfig()
+    if err != nil {
+        logrus.WithError(err).Error("Error generating kube config.")
+        return nil, err
+    }
+    err = rest.LoadTLSFiles(cfg)
+    if err != nil {
+        logrus.WithError(err).Error("Error loading TLS files.")
+        return nil, err
+    }
+    clientset, err := kubernetes.NewForConfig(cfg)
+    if err != nil {
+        logrus.WithError(err).Error("Failed to create clientset for CNI kubeconfig")
+        return nil, err
+    }
+    return clientset, nil
+}
+
+func NewTokenRefresher(clientset *kubernetes.Clientset, namespace string, serviceAccountName string) *TokenRefresher {
+    return NewTokenRefresherWithCustomTiming(clientset, namespace, serviceAccountName, defaultCNITokenValiditySeconds, minTokenRetryDuration, defaultRefreshFraction)
+}
+
+func NewTokenRefresherWithCustomTiming(clientset *kubernetes.Clientset, namespace string, serviceAccountName string, tokenValiditySeconds int64, minTokenRetryDuration time.Duration, defaultRefreshFraction time.Duration) *TokenRefresher {
+    return &TokenRefresher{
+        tokenSupported:         false,
+        tokenOnce:              &sync.Once{},
+        tokenValiditySeconds:   tokenValiditySeconds,
+        minTokenRetryDuration:  minTokenRetryDuration,
+        defaultRefreshFraction: defaultRefreshFraction,
+        clientset:              clientset,
+        namespace:              namespace,
+        serviceAccountName:     serviceAccountName,
+        tokenChan:              make(chan TokenUpdate),
+    }
+}
+
+func (t *TokenRefresher) UpdateToken() (TokenUpdate, error) {
+    validity := t.tokenValiditySeconds
+    tr := &authenticationv1.TokenRequest{
+        Spec: authenticationv1.TokenRequestSpec{
+            Audiences:         []string{"kubernetes"},
+            ExpirationSeconds: &validity,
+        },
+    }
+
+    tokenRequest, err := t.clientset.CoreV1().ServiceAccounts(t.namespace).CreateToken(context.TODO(), t.serviceAccountName, tr, metav1.CreateOptions{})
+    if apierrors.IsNotFound(err) && !t.tokenRequestSupported(t.clientset) {
+        logrus.WithError(err).Debug("Unable to create token for CNI kubeconfig as token request api is not supported, falling back to local service account token")
+        return tokenUpdateFromFile()
+    }
+    if err != nil {
+        logrus.WithError(err).Error("Unable to create token for CNI kubeconfig")
+        return tokenUpdateError(err)
+    }
+
+    return TokenUpdate{
+        Token:          tokenRequest.Status.Token,
+        ExpirationTime: tokenRequest.Status.ExpirationTimestamp.Time,
+    }, nil
+}
+
+func (t *TokenRefresher) TokenChan() <-chan TokenUpdate {
+    return t.tokenChan
+}
+
+func (t *TokenRefresher) Run() {
+    var nextExpiration time.Time
+    rand := rand.New(rand.NewSource(time.Now().UnixNano()))
     for {
-        if err := watcher.Add(serviceaccountDirectory); err != nil {
-            // Error watching the file - retry
-            logrus.WithError(err).Error("Failed to watch Kubernetes serviceaccount files.")
-            time.Sleep(5 * time.Second)
-            continue
+        tu, err := t.UpdateToken()
+        if err != nil {
+            logrus.WithError(err).Error("Failed to update CNI token, retrying...")
+            // Reset nextExpiration to retry directly
+            nextExpiration = time.Time{}
+        } else {
+            nextExpiration = tu.ExpirationTime
+            t.tokenChan <- tu
+        }
+        now := time.Now()
+        var sleepTime time.Duration
+        // Do some basic rate limiting to prevent flooding the kube apiserver with requests
+        if nextExpiration.Before(now.Add(t.minTokenRetryDuration * t.defaultRefreshFraction)) {
+            sleepTime = t.minTokenRetryDuration
+        } else {
+            sleepTime = nextExpiration.Sub(now) / t.defaultRefreshFraction
+        }
+        jitter := rand.Float32() * float32(sleepTime)
+        sleepTime += time.Duration(jitter)
+        if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debugf("Going to sleep for %s", sleepTime.String()) + } + time.Sleep(sleepTime) + } +} + +func (t *TokenRefresher) tokenRequestSupported(clientset *kubernetes.Clientset) bool { + t.tokenOnce.Do(func() { + resources, err := clientset.Discovery().ServerResourcesForGroupVersion("v1") + if err != nil { + return + } + for _, resource := range resources.APIResources { + if resource.Name == "serviceaccounts/token" { + t.tokenSupported = true + return + } } + }) + return t.tokenSupported +} + +func tokenUpdateError(err error) (TokenUpdate, error) { + return TokenUpdate{}, err +} + +func tokenUpdateFromFile() (TokenUpdate, error) { + tokenBytes, err := ioutil.ReadFile(tokenFile) + if err != nil { + logrus.WithError(err).Error("Failed to read service account token file") + return tokenUpdateError(err) + } + token := string(tokenBytes) + tokenSegments := strings.Split(token, ".") + if len(tokenSegments) != 3 { + err := fmt.Errorf("invalid token segment size: %d", len(tokenSegments)) + logrus.WithError(err).Error("Failed parsing service account token") + return tokenUpdateError(err) + } + unparsedClaims := tokenSegments[1] + // Padding may be missing, hence check and append it + if l := len(unparsedClaims) % 4; l > 0 { + unparsedClaims += strings.Repeat("=", 4-l) + } + decodedClaims, err := base64.URLEncoding.DecodeString(unparsedClaims) + if err != nil { + logrus.WithError(err).Error("Failed to decode service account token claims") + return tokenUpdateError(err) + } + var claimMap map[string]interface{} + err = json.Unmarshal(decodedClaims, &claimMap) + if err != nil { + logrus.WithError(err).Error("Failed to unmarshal service account token claims") + return tokenUpdateError(err) + } + return TokenUpdate{ + Token: token, + ExpirationTime: time.Unix(int64(claimMap["exp"].(float64)), 0), + }, nil +} - // Successfully watched the file. Break from the retry loop. - logrus.WithField("directory", serviceaccountDirectory).Info("Watching contents for changes.") - break +func Run() { + // Log to stdout. this prevents our logs from being interpreted as errors by, for example, + // fluentd's default configuration. + logrus.SetOutput(os.Stdout) + + clientset, err := InClusterClientSet() + if err != nil { + logrus.WithError(err).Fatal("Failed to create in cluster client set") } + tr := NewTokenRefresher(clientset, NamespaceOfUsedServiceAccount(), DefaultServiceAccountName) + tokenChan := tr.TokenChan() + go tr.Run() - // Handle events from the watcher. for { - // To prevent tight looping, add a sleep here. - time.Sleep(1 * time.Second) - select { - case event := <-watcher.Events: - // We've received a notification that the Kubernetes secrets files have changed. - // Update the kubeconfig file on disk to match. - logrus.WithField("event", event).Info("Notified of change to serviceaccount files.") + case tu := <-tokenChan: + logrus.Info("Update of CNI kubeconfig triggered based on elapsed time.") cfg, err := rest.InClusterConfig() if err != nil { logrus.WithError(err).Error("Error generating kube config.") @@ -61,17 +236,13 @@ func Run() { logrus.WithError(err).Error("Error loading TLS files.") continue } - writeKubeconfig(cfg) - - case err := <-watcher.Errors: - // We've received an error - log it out but don't exit. - logrus.WithError(err).Error("Error watching serviceaccount files.") + writeKubeconfig(cfg, tu.Token) } } } // writeKubeconfig writes an updated kubeconfig file to disk that the CNI plugin can use to access the Kubernetes API. 
diff --git a/node/pkg/cni/token_watch_suite_test.go b/node/pkg/cni/token_watch_suite_test.go
new file mode 100644
index 00000000000..1f0a4080ccd
--- /dev/null
+++ b/node/pkg/cni/token_watch_suite_test.go
@@ -0,0 +1,22 @@
+package cni_test
+
+import (
+    . "github.com/onsi/ginkgo"
+    . "github.com/onsi/gomega"
+
+    "testing"
+
+    "github.com/onsi/ginkgo/reporters"
+
+    "github.com/projectcalico/calico/libcalico-go/lib/testutils"
+)
+
+func init() {
+    testutils.HookLogrusForGinkgo()
+}
+
+func TestCommands(t *testing.T) {
+    RegisterFailHandler(Fail)
+    junitReporter := reporters.NewJUnitReporter("../../report/cnitokenwatch_suite.xml")
+    RunSpecsWithDefaultAndCustomReporters(t, "CNITokenWatch Suite", []Reporter{junitReporter})
+}
diff --git a/node/pkg/cni/token_watch_test.go b/node/pkg/cni/token_watch_test.go
new file mode 100644
index 00000000000..1d64811a425
--- /dev/null
+++ b/node/pkg/cni/token_watch_test.go
@@ -0,0 +1,90 @@
+package cni_test
+
+import (
+    "context"
+    "fmt"
+    "os"
+    "time"
+
+    . "github.com/onsi/ginkgo"
+    . "github.com/onsi/gomega"
+    "github.com/projectcalico/calico/node/pkg/cni"
+    v1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/clientcmd"
+)
+
+var _ = Describe("FV tests", func() {
+    const serviceAccountName = "cni-token-watch-test"
+    const namespace = metav1.NamespaceSystem
+
+    It("should create a token successfully", func() {
+        clientset := createKubernetesClient()
+        setupServiceAccount(clientset, serviceAccountName, namespace)
+        defer cleanupServiceAccount(clientset, serviceAccountName, namespace)
+
+        tr := cni.NewTokenRefresher(clientset, namespace, serviceAccountName)
+        tu, err := tr.UpdateToken()
+        Expect(err).ShouldNot(HaveOccurred())
+        Expect(tu.Token).NotTo(BeEmpty())
+    })
+
+    It("should create a token successfully and deliver it through the channel", func() {
+        clientset := createKubernetesClient()
+        setupServiceAccount(clientset, serviceAccountName, namespace)
+        defer cleanupServiceAccount(clientset, serviceAccountName, namespace)
+
+        tr := cni.NewTokenRefresher(clientset, namespace, serviceAccountName)
+        tokenChan := tr.TokenChan()
+        go tr.Run()
+        tu := <-tokenChan
+        Expect(tu.Token).NotTo(BeEmpty())
+    })
+
+    It("should create multiple tokens successfully and deliver them through the channel", func() {
+        const iterations = 5
+        // kube-apiserver does not allow smaller validity periods than 10 minutes
+        const tokenValiditySeconds = 600
+
+        clientset := createKubernetesClient()
+        setupServiceAccount(clientset, serviceAccountName, namespace)
+        defer cleanupServiceAccount(clientset, serviceAccountName, namespace)
+
+        tr := cni.NewTokenRefresherWithCustomTiming(clientset, namespace, serviceAccountName, tokenValiditySeconds, 1*time.Nanosecond, 600000)
+        tokenChan := tr.TokenChan()
+        go tr.Run()
+        for i := 0; i < iterations; i++ {
+            tu := <-tokenChan
+            Expect(tu.Token).NotTo(BeEmpty())
+        }
+    })
+})
+
+func createKubernetesClient() *kubernetes.Clientset {
+    kubeconfigPath := os.Getenv("KUBECONFIG")
+    kubeconfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+    if err != nil {
+        Fail(fmt.Sprintf("Failed to create kubernetes config: %v", err))
+    }
+    clientset, err := kubernetes.NewForConfig(kubeconfig)
+    if err != nil {
+        Fail(fmt.Sprintf("Could not create kubernetes client: %v", err))
+    }
+    return clientset
+}
+
+func setupServiceAccount(clientset *kubernetes.Clientset, name string, namespace string) {
+    serviceAccount := &v1.ServiceAccount{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: name,
+        },
+    }
+    _, err := clientset.CoreV1().ServiceAccounts(namespace).Create(context.Background(), serviceAccount, metav1.CreateOptions{})
+    Expect(err).ShouldNot(HaveOccurred())
+}
+
+func cleanupServiceAccount(clientset *kubernetes.Clientset, name string, namespace string) {
+    err := clientset.CoreV1().ServiceAccounts(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
+    Expect(err).ShouldNot(HaveOccurred())
+}
diff --git a/node/tests/k8st/infra/calico-kdd.yaml b/node/tests/k8st/infra/calico-kdd.yaml
index 84935480eda..0cae02133e4 100644
--- a/node/tests/k8st/infra/calico-kdd.yaml
+++ b/node/tests/k8st/infra/calico-kdd.yaml
@@ -228,6 +228,14 @@ rules:
       - pods/status
     verbs:
      - patch
+  # Used for creating service account tokens to be used by the CNI plugin
+  - apiGroups: [""]
+    resources:
+      - serviceaccounts/token
+    resourceNames:
+      - calico-node
+    verbs:
+      - create
   # Calico monitors various CRDs for config.
   - apiGroups: ["crd.projectcalico.org"]
     resources:
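Closing note (not part of the diff): one way to confirm the new RBAC rule is in effect from inside the calico-node pod is a SelfSubjectAccessReview against the serviceaccounts/token subresource. The sketch below is illustrative only; it assumes calico-node runs in kube-system, so adjust the namespace for other install layouts.

// Sketch only: check that the current identity may create tokens for calico-node.
package main

import (
    "context"
    "fmt"

    authorizationv1 "k8s.io/api/authorization/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    sar := &authorizationv1.SelfSubjectAccessReview{
        Spec: authorizationv1.SelfSubjectAccessReviewSpec{
            ResourceAttributes: &authorizationv1.ResourceAttributes{
                Namespace:   "kube-system", // assumption: namespace of the calico-node service account
                Verb:        "create",
                Resource:    "serviceaccounts",
                Subresource: "token",
                Name:        "calico-node",
            },
        },
    }
    resp, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("allowed=%v reason=%q\n", resp.Status.Allowed, resp.Status.Reason)
}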