Skip to content

Commit

Permalink
Use TokenRequest API instead of calico-nodes service account token fo…
Browse files Browse the repository at this point in the history
…r CNI kubeconfig.

With projected service account tokens in kubernetes, the service account tokens of pods
are bound to the lifetime of the corresponding pod. Therefore, it may lead to problems
if an external process re-uses the token of a pod.
The CNI binaries used the token of calico-node. However, when calico-node was stopped,
the corresponding token lost its validity and hence could no longer be used for CNI
operations. Usually, this resolves itself over time, but there are some edge cases
where this is not possible. For example, if calico-node is autoscaled in terms of resources
and the new resource requests require preemption/eviction of an existing pod, the CNI
operation to delete the network sandbox will fail because the token is no longer valid
(as calico-node was stopped beforehand).
This change switches over to using the TokenRequest API instead, i.e. creating new
tokens with limited validity. It would have been good to bind the token to an object,
e.g. to the corresponding node, but as of now only secret and pod are supported types
for binding tokens. Hence, the tokens are only limited in time and not bound to any
other kubernetes object.
  • Loading branch information
ScheererJ committed Apr 27, 2022
1 parent de497af commit e375136
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 35 deletions.
7 changes: 7 additions & 0 deletions app-policy/config/install/05-calico.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ rules:
- get
- list
- watch
# Used for creating service account tokens to be used by the CNI plugin
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
---

apiVersion: rbac.authorization.k8s.io/v1beta1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ rules:
- pods/status
verbs:
- patch
# Used for creating service account tokens to be used by the CNI plugin
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
# Calico monitors various CRDs for config.
- apiGroups: ["crd.projectcalico.org"]
resources:
Expand Down
8 changes: 8 additions & 0 deletions calico/getting-started/kubernetes/hardway/install-node.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ rules:
- pods/status
verbs:
- patch
# Used for creating service account tokens to be used by the CNI plugin
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
# Calico monitors various CRDs for config.
- apiGroups: ["crd.projectcalico.org"]
resources:
Expand Down
12 changes: 11 additions & 1 deletion cni-plugin/pkg/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/rest"

"github.com/projectcalico/calico/libcalico-go/lib/names"
"github.com/projectcalico/calico/node/pkg/cni"
)

type config struct {
Expand Down Expand Up @@ -466,7 +467,16 @@ contexts:
user: calico
current-context: calico-context`

data = strings.Replace(data, "TOKEN", kubecfg.BearerToken, 1)
clientset, err := cni.InClusterClientSet()
if err != nil {
logrus.WithError(err).Fatal("Unable to create client for generating CNI token")
}
tr := cni.NewTokenRefresher(clientset, cni.NamespaceOfUsedServiceAccount(), cni.DefaultServiceAccountName)
tu, err := tr.UpdateToken()
if err != nil {
logrus.WithError(err).Fatal("Unable to create token for CNI kubeconfig")
}
data = strings.Replace(data, "TOKEN", tu.Token, 1)
data = strings.Replace(data, "__KUBERNETES_SERVICE_PROTOCOL__", getEnv("KUBERNETES_SERVICE_PROTOCOL", "https"), -1)
data = strings.Replace(data, "__KUBERNETES_SERVICE_HOST__", getEnv("KUBERNETES_SERVICE_HOST", ""), -1)
data = strings.Replace(data, "__KUBERNETES_SERVICE_PORT__", getEnv("KUBERNETES_SERVICE_PORT", ""), -1)
Expand Down
225 changes: 191 additions & 34 deletions node/pkg/cni/token_watch.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,217 @@
package cni

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
"gopkg.in/fsnotify/fsnotify.v1"
authenticationv1 "k8s.io/api/authentication/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var serviceaccountDirectory string = "/var/run/secrets/kubernetes.io/serviceaccount/"
// DefaultServiceAccountName is the service account for which CNI tokens are
// requested.
const DefaultServiceAccountName = "calico-node"

// serviceAccountNamespace is the file from which the namespace of the
// service account in use is read.
const serviceAccountNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

// tokenFile is the locally available service account token, used as a
// fallback when the token request API is not supported.
const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"

// defaultCNITokenValiditySeconds is the expiration requested for each newly
// created CNI token.
const defaultCNITokenValiditySeconds = 3600

// minTokenRetryDuration is the base sleep between token requests when the
// current token is missing or close to expiry (before jitter is added).
const minTokenRetryDuration = 5 * time.Second

// defaultRefreshFraction divides the remaining token lifetime to obtain the
// sleep time until the next refresh.
const defaultRefreshFraction = 4

var kubeconfigPath string = "/host/etc/cni/net.d/calico-kubeconfig"

func Run() {
// Log to stdout. this prevents our logs from being interpreted as errors by, for example,
// fluentd's default configuration.
logrus.SetOutput(os.Stdout)
// TokenRefresher periodically requests service account tokens and publishes
// them on a channel so that the CNI kubeconfig can be rewritten.
type TokenRefresher struct {
	// tokenSupported records the outcome of the one-time discovery check for
	// the "serviceaccounts/token" resource; tokenOnce guards that check.
	tokenSupported bool
	tokenOnce      *sync.Once
	// clientset is the Kubernetes client used to create tokens and to run
	// the discovery check.
	clientset *kubernetes.Clientset
	// namespace and serviceAccountName identify the service account for
	// which tokens are requested.
	namespace          string
	serviceAccountName string
	// tokenChan carries every successfully obtained token to the consumer.
	tokenChan chan TokenUpdate
}

// TokenUpdate is a single token together with the point in time at which it
// stops being valid.
type TokenUpdate struct {
	// Token is the bearer token string.
	Token string
	// ExpirationTime is when Token expires.
	ExpirationTime time.Time
}

// NamespaceOfUsedServiceAccount returns the contents of the mounted service
// account namespace file. The process is terminated via a fatal log entry if
// the file cannot be read.
func NamespaceOfUsedServiceAccount() string {
	contents, readErr := ioutil.ReadFile(serviceAccountNamespace)
	if readErr != nil {
		logrus.WithError(readErr).Fatal("Failed to read service account namespace file")
	}
	return string(contents)
}

// Create a watcher for file changes.
watcher, err := fsnotify.NewWatcher()
// InClusterClientSet builds a Kubernetes clientset from the in-cluster
// configuration, loading the referenced TLS files first.
//
// Every failure is logged and returned to the caller; the function never
// panics and returns a nil clientset together with a non-nil error.
func InClusterClientSet() (*kubernetes.Clientset, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		logrus.WithError(err).Error("Error generating kube config.")
		return nil, err
	}
	if err = rest.LoadTLSFiles(cfg); err != nil {
		logrus.WithError(err).Error("Error loading TLS files.")
		return nil, err
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Error("Failed to create clientset for CNI kubeconfig")
		return nil, err
	}
	return clientset, nil
}

// NewTokenRefresher returns a TokenRefresher that requests tokens for the
// given service account in the given namespace using clientset. The token
// channel is unbuffered and token request support is initially assumed to be
// absent until the one-time check has run.
func NewTokenRefresher(clientset *kubernetes.Clientset, namespace string, serviceAccountName string) *TokenRefresher {
	refresher := new(TokenRefresher)
	refresher.tokenSupported = false
	refresher.tokenOnce = new(sync.Once)
	refresher.clientset = clientset
	refresher.namespace = namespace
	refresher.serviceAccountName = serviceAccountName
	refresher.tokenChan = make(chan TokenUpdate)
	return refresher
}

// UpdateToken requests a fresh token for the configured service account and
// returns it together with its expiration time.
//
// If the request fails with "not found" and the API server does not offer the
// token request resource, the locally mounted service account token is used
// instead. Any other failure is logged and returned.
func (t *TokenRefresher) UpdateToken() (TokenUpdate, error) {
	expirationSeconds := int64(defaultCNITokenValiditySeconds)
	request := &authenticationv1.TokenRequest{}
	request.Spec.Audiences = []string{"kubernetes"}
	request.Spec.ExpirationSeconds = &expirationSeconds

	serviceAccounts := t.clientset.CoreV1().ServiceAccounts(t.namespace)
	response, err := serviceAccounts.CreateToken(context.TODO(), t.serviceAccountName, request, metav1.CreateOptions{})

	switch {
	case apierrors.IsNotFound(err) && !t.tokenRequestSupported(t.clientset):
		// The support check only runs for "not found" errors.
		logrus.WithError(err).Debug("Unable to create token for CNI kubeconfig as token request api is not supported, falling back to local service account token")
		return tokenUpdateFromFile()
	case err != nil:
		logrus.WithError(err).Error("Unable to create token for CNI kubeconfig")
		return tokenUpdateError(err)
	}

	update := TokenUpdate{
		Token:          response.Status.Token,
		ExpirationTime: response.Status.ExpirationTimestamp.Time,
	}
	return update, nil
}

// TokenChan returns the receive-only view of the channel on which Run
// publishes token updates.
func (t *TokenRefresher) TokenChan() <-chan TokenUpdate {
	var updates <-chan TokenUpdate = t.tokenChan
	return updates
}

// Watch for changes to the serviceaccount directory. Rety if necessary.
func (t *TokenRefresher) Run() {
var nextExpiration time.Time
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
if err := watcher.Add(serviceaccountDirectory); err != nil {
// Error watching the file - retry
logrus.WithError(err).Error("Failed to watch Kubernetes serviceaccount files.")
time.Sleep(5 * time.Second)
continue
tu, err := t.UpdateToken()
if err != nil {
logrus.WithError(err).Error("Failed to update CNI token, retrying...")
// Reset nextExpiration to retry directly
nextExpiration = time.Time{}
} else {
nextExpiration = tu.ExpirationTime
t.tokenChan <- tu
}
now := time.Now()
var sleepTime time.Duration
// Do some basic rate limiting to prevent flooding the kube apiserver with requests
if nextExpiration.Before(now.Add(minTokenRetryDuration * defaultRefreshFraction)) {
sleepTime = minTokenRetryDuration
} else {
sleepTime = nextExpiration.Sub(now) / defaultRefreshFraction
}
jitter := rand.Float32() * float32(sleepTime)
sleepTime += time.Duration(jitter)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debugf("Going to sleep for %s", sleepTime.String())
}
time.Sleep(sleepTime)
}
}

// Successfully watched the file. Break from the retry loop.
logrus.WithField("directory", serviceaccountDirectory).Info("Watching contents for changes.")
break
// tokenRequestSupported reports whether the API server lists the
// "serviceaccounts/token" resource in group version "v1". The discovery call
// is made at most once; its outcome (false when discovery fails) is cached
// and returned on every later call.
func (t *TokenRefresher) tokenRequestSupported(clientset *kubernetes.Clientset) bool {
	probe := func() {
		list, err := clientset.Discovery().ServerResourcesForGroupVersion("v1")
		if err != nil {
			return
		}
		for i := range list.APIResources {
			if list.APIResources[i].Name != "serviceaccounts/token" {
				continue
			}
			t.tokenSupported = true
			return
		}
	}
	t.tokenOnce.Do(probe)
	return t.tokenSupported
}

// tokenUpdateError pairs the given error with an empty TokenUpdate so that
// failure paths can return in a single statement.
func tokenUpdateError(err error) (TokenUpdate, error) {
	var empty TokenUpdate
	return empty, err
}

// tokenUpdateFromFile reads the locally mounted service account token and
// derives its expiration time from the "exp" claim of the token payload.
//
// The token must consist of three dot-separated segments; the middle segment
// is base64url-decoded and parsed as JSON. An error is returned (and logged)
// if the file cannot be read, the token is malformed, or the payload carries
// no numeric "exp" claim.
func tokenUpdateFromFile() (TokenUpdate, error) {
	tokenBytes, err := ioutil.ReadFile(tokenFile)
	if err != nil {
		logrus.WithError(err).Error("Failed to read service account token file")
		return tokenUpdateError(err)
	}
	token := string(tokenBytes)
	tokenSegments := strings.Split(token, ".")
	if len(tokenSegments) != 3 {
		err := fmt.Errorf("invalid token segment size: %d", len(tokenSegments))
		logrus.WithError(err).Error("Failed parsing service account token")
		return tokenUpdateError(err)
	}
	unparsedClaims := tokenSegments[1]
	// Padding may be missing, hence check and append it
	if l := len(unparsedClaims) % 4; l > 0 {
		unparsedClaims += strings.Repeat("=", 4-l)
	}
	decodedClaims, err := base64.URLEncoding.DecodeString(unparsedClaims)
	if err != nil {
		logrus.WithError(err).Error("Failed to decode service account token claims")
		return tokenUpdateError(err)
	}
	var claimMap map[string]interface{}
	err = json.Unmarshal(decodedClaims, &claimMap)
	if err != nil {
		logrus.WithError(err).Error("Failed to unmarshal service account token claims")
		return tokenUpdateError(err)
	}
	// Use a checked assertion: a token without a numeric "exp" claim must
	// produce an error rather than a panic.
	expiration, ok := claimMap["exp"].(float64)
	if !ok {
		err := fmt.Errorf("service account token has no numeric exp claim")
		logrus.WithError(err).Error("Failed parsing service account token")
		return tokenUpdateError(err)
	}
	return TokenUpdate{
		Token:          token,
		ExpirationTime: time.Unix(int64(expiration), 0),
	}, nil
}

// Handle events from the watcher.
for {
// To prevent tight looping, add a sleep here.
time.Sleep(1 * time.Second)
func Run() {
// Log to stdout. this prevents our logs from being interpreted as errors by, for example,
// fluentd's default configuration.
logrus.SetOutput(os.Stdout)

clientset, _ := InClusterClientSet()
tr := NewTokenRefresher(clientset, NamespaceOfUsedServiceAccount(), DefaultServiceAccountName)
tokenChan := tr.TokenChan()
go tr.Run()

for {
select {
case event := <-watcher.Events:
// We've received a notification that the Kubernetes secrets files have changed.
// Update the kubeconfig file on disk to match.
logrus.WithField("event", event).Info("Notified of change to serviceaccount files.")
case tu := <-tokenChan:
logrus.Info("Update of CNI kubeconfig triggered based on elapsed time.")
cfg, err := rest.InClusterConfig()
if err != nil {
logrus.WithError(err).Error("Error generating kube config.")
Expand All @@ -61,17 +222,13 @@ func Run() {
logrus.WithError(err).Error("Error loading TLS files.")
continue
}
writeKubeconfig(cfg)

case err := <-watcher.Errors:
// We've received an error - log it out but don't exit.
logrus.WithError(err).Error("Error watching serviceaccount files.")
writeKubeconfig(cfg, tu.Token)
}
}
}

// writeKubeconfig writes an updated kubeconfig file to disk that the CNI plugin can use to access the Kubernetes API.
func writeKubeconfig(cfg *rest.Config) {
func writeKubeconfig(cfg *rest.Config, token string) {
template := `# Kubeconfig file for Calico CNI plugin. Installed by calico/node.
apiVersion: v1
kind: Config
Expand All @@ -92,7 +249,7 @@ contexts:
current-context: calico-context`

// Replace the placeholders.
data := fmt.Sprintf(template, cfg.Host, base64.StdEncoding.EncodeToString(cfg.CAData), cfg.BearerToken)
data := fmt.Sprintf(template, cfg.Host, base64.StdEncoding.EncodeToString(cfg.CAData), token)

// Write the filled out config to disk.
if err := ioutil.WriteFile(kubeconfigPath, []byte(data), 0600); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions node/tests/k8st/infra/calico-kdd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ rules:
- pods/status
verbs:
- patch
# Used for creating service account tokens to be used by the CNI plugin
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
# Calico monitors various CRDs for config.
- apiGroups: ["crd.projectcalico.org"]
resources:
Expand Down

0 comments on commit e375136

Please sign in to comment.