Skip to content

Commit

Permalink
Use TokenRequest API instead of calico-nodes service account token fo…
Browse files Browse the repository at this point in the history
…r CNI kubeconfig.

With projected service account tokens in kubernetes, the service account tokens of pods
are bound to the lifetime of the corresponding pod. Therefore, it may lead to problems
if an external process re-uses the token of a pod.
The CNI binaries used the token of calico-node. However, in case calico-node got stopped
the corresponding token lost its validity and hence could no longer be used for CNI
operations. Usually, this automatically resolves over time, but there are some edge cases
where this is not possible, e.g. if calico-node is autoscaled in terms of resources and
the new resource requests would require preemption/eviction of an existing pod the CNI
operation to delete the network sandbox will fail due to the no longer valid token
(as calico-node was stopped beforehand).
This change switches over to using the TokenRequest API instead, i.e. creating new
tokens with limited validity. It would have been good to bind the token to an object,
e.g. to the corresponding node, but as of now only secret and pod are supported types
for binding tokens. Hence, the tokens are only limited in time and not bound to any
other kubernetes object.
  • Loading branch information
ScheererJ committed Apr 28, 2022
1 parent de497af commit af199c8
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 36 deletions.
7 changes: 7 additions & 0 deletions app-policy/config/install/05-calico.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ rules:
- get
- list
- watch
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
---

apiVersion: rbac.authorization.k8s.io/v1beta1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ rules:
- pods/status
verbs:
- patch
# Used for creating service account tokens to be used by the CNI plugin
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
# Calico monitors various CRDs for config.
- apiGroups: ["crd.projectcalico.org"]
resources:
Expand Down
8 changes: 8 additions & 0 deletions calico/getting-started/kubernetes/hardway/install-node.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ rules:
- pods/status
verbs:
- patch
# Used for creating service account tokens to be used by the CNI plugin
- apiGroups: [""]
resources:
- serviceaccounts/token
resourceNames:
- calico-node
verbs:
- create
# Calico monitors various CRDs for config.
- apiGroups: ["crd.projectcalico.org"]
resources:
Expand Down
12 changes: 11 additions & 1 deletion cni-plugin/pkg/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/rest"

"github.com/projectcalico/calico/libcalico-go/lib/names"
"github.com/projectcalico/calico/node/pkg/cni"
)

type config struct {
Expand Down Expand Up @@ -466,7 +467,16 @@ contexts:
user: calico
current-context: calico-context`

data = strings.Replace(data, "TOKEN", kubecfg.BearerToken, 1)
clientset, err := cni.InClusterClientSet()
if err != nil {
logrus.WithError(err).Fatal("Unable to create client for generating CNI token")
}
tr := cni.NewTokenRefresher(clientset, cni.NamespaceOfUsedServiceAccount(), cni.DefaultServiceAccountName)
tu, err := tr.UpdateToken()
if err != nil {
logrus.WithError(err).Fatal("Unable to create token for CNI kubeconfig")
}
data = strings.Replace(data, "TOKEN", tu.Token, 1)
data = strings.Replace(data, "__KUBERNETES_SERVICE_PROTOCOL__", getEnv("KUBERNETES_SERVICE_PROTOCOL", "https"), -1)
data = strings.Replace(data, "__KUBERNETES_SERVICE_HOST__", getEnv("KUBERNETES_SERVICE_HOST", ""), -1)
data = strings.Replace(data, "__KUBERNETES_SERVICE_PORT__", getEnv("KUBERNETES_SERVICE_PORT", ""), -1)
Expand Down
2 changes: 1 addition & 1 deletion node/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ fv: run-k8s-apiserver
-v $(CERTS_PATH):/home/user/certs \
-e KUBECONFIG=/go/src/github.com/projectcalico/calico/hack/test/certs/kubeconfig \
-e ETCD_ENDPOINTS=http://$(LOCAL_IP_ENV):2379 \
$(CALICO_BUILD) ginkgo -cover -r -skipPackage vendor pkg/lifecycle/startup pkg/allocateip $(GINKGO_ARGS)
$(CALICO_BUILD) ginkgo -cover -r -skipPackage vendor pkg/lifecycle/startup pkg/allocateip pkg/cni $(GINKGO_ARGS)

# Skip packages containing FV tests.
UT_PACKAGES_TO_SKIP?=pkg/lifecycle/startup,pkg/allocateip,pkg/status
Expand Down
239 changes: 205 additions & 34 deletions node/pkg/cni/token_watch.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,231 @@
package cni

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
"gopkg.in/fsnotify/fsnotify.v1"
authenticationv1 "k8s.io/api/authentication/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var serviceaccountDirectory string = "/var/run/secrets/kubernetes.io/serviceaccount/"
const DefaultServiceAccountName = "calico-node"
const serviceAccountNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
const defaultCNITokenValiditySeconds = 24 * 60 * 60
const minTokenRetryDuration = 5 * time.Second
const defaultRefreshFraction = 4

var kubeconfigPath string = "/host/etc/cni/net.d/calico-kubeconfig"

func Run() {
// Log to stdout. this prevents our logs from being interpreted as errors by, for example,
// fluentd's default configuration.
logrus.SetOutput(os.Stdout)
type TokenRefresher struct {
tokenSupported bool
tokenOnce *sync.Once

// Create a watcher for file changes.
watcher, err := fsnotify.NewWatcher()
tokenValiditySeconds int64
minTokenRetryDuration time.Duration
defaultRefreshFraction time.Duration

clientset *kubernetes.Clientset

namespace string
serviceAccountName string

tokenChan chan TokenUpdate
}

type TokenUpdate struct {
Token string
ExpirationTime time.Time
}

func NamespaceOfUsedServiceAccount() string {
namespace, err := ioutil.ReadFile(serviceAccountNamespace)
if err != nil {
panic(err)
logrus.WithError(err).Fatal("Failed to read service account namespace file")
}
defer watcher.Close()
return string(namespace)
}

// Watch for changes to the serviceaccount directory. Rety if necessary.
func InClusterClientSet() (*kubernetes.Clientset, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
logrus.WithError(err).Error("Error generating kube config.")
return nil, err
}
err = rest.LoadTLSFiles(cfg)
if err != nil {
logrus.WithError(err).Error("Error loading TLS files.")
return nil, err
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logrus.WithError(err).Error("Failed to create clientset for CNI kubeconfig")
return nil, err
}
return clientset, nil
}

func NewTokenRefresher(clientset *kubernetes.Clientset, namespace string, serviceAccountName string) *TokenRefresher {
return NewTokenRefresherWithCustomTiming(clientset, namespace, serviceAccountName, defaultCNITokenValiditySeconds, minTokenRetryDuration, defaultRefreshFraction)
}

func NewTokenRefresherWithCustomTiming(clientset *kubernetes.Clientset, namespace string, serviceAccountName string, tokenValiditySeconds int64, minTokenRetryDuration time.Duration, defaultRefreshFraction time.Duration) *TokenRefresher {
return &TokenRefresher{
tokenSupported: false,
tokenOnce: &sync.Once{},
tokenValiditySeconds: tokenValiditySeconds,
minTokenRetryDuration: minTokenRetryDuration,
defaultRefreshFraction: defaultRefreshFraction,
clientset: clientset,
namespace: namespace,
serviceAccountName: serviceAccountName,
tokenChan: make(chan TokenUpdate),
}
}

func (t *TokenRefresher) UpdateToken() (TokenUpdate, error) {
validity := t.tokenValiditySeconds
tr := &authenticationv1.TokenRequest{
Spec: authenticationv1.TokenRequestSpec{
Audiences: []string{"kubernetes"},
ExpirationSeconds: &validity,
},
}

tokenRequest, err := t.clientset.CoreV1().ServiceAccounts(t.namespace).CreateToken(context.TODO(), t.serviceAccountName, tr, metav1.CreateOptions{})
if apierrors.IsNotFound(err) && !t.tokenRequestSupported(t.clientset) {
logrus.WithError(err).Debug("Unable to create token for CNI kubeconfig as token request api is not supported, falling back to local service account token")
return tokenUpdateFromFile()
}
if err != nil {
logrus.WithError(err).Error("Unable to create token for CNI kubeconfig")
return tokenUpdateError(err)
}

return TokenUpdate{
Token: tokenRequest.Status.Token,
ExpirationTime: tokenRequest.Status.ExpirationTimestamp.Time,
}, nil
}

func (t *TokenRefresher) TokenChan() <-chan TokenUpdate {
return t.tokenChan
}

func (t *TokenRefresher) Run() {
var nextExpiration time.Time
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
if err := watcher.Add(serviceaccountDirectory); err != nil {
// Error watching the file - retry
logrus.WithError(err).Error("Failed to watch Kubernetes serviceaccount files.")
time.Sleep(5 * time.Second)
continue
tu, err := t.UpdateToken()
if err != nil {
logrus.WithError(err).Error("Failed to update CNI token, retrying...")
// Reset nextExpiration to retry directly
nextExpiration = time.Time{}
} else {
nextExpiration = tu.ExpirationTime
t.tokenChan <- tu
}
now := time.Now()
var sleepTime time.Duration
// Do some basic rate limiting to prevent flooding the kube apiserver with requests
if nextExpiration.Before(now.Add(t.minTokenRetryDuration * t.defaultRefreshFraction)) {
sleepTime = t.minTokenRetryDuration
} else {
sleepTime = nextExpiration.Sub(now) / t.defaultRefreshFraction
}
jitter := rand.Float32() * float32(sleepTime)
sleepTime += time.Duration(jitter)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debugf("Going to sleep for %s", sleepTime.String())
}
time.Sleep(sleepTime)
}
}

func (t *TokenRefresher) tokenRequestSupported(clientset *kubernetes.Clientset) bool {
t.tokenOnce.Do(func() {
resources, err := clientset.Discovery().ServerResourcesForGroupVersion("v1")
if err != nil {
return
}
for _, resource := range resources.APIResources {
if resource.Name == "serviceaccounts/token" {
t.tokenSupported = true
return
}
}
})
return t.tokenSupported
}

func tokenUpdateError(err error) (TokenUpdate, error) {
return TokenUpdate{}, err
}

func tokenUpdateFromFile() (TokenUpdate, error) {
tokenBytes, err := ioutil.ReadFile(tokenFile)
if err != nil {
logrus.WithError(err).Error("Failed to read service account token file")
return tokenUpdateError(err)
}
token := string(tokenBytes)
tokenSegments := strings.Split(token, ".")
if len(tokenSegments) != 3 {
err := fmt.Errorf("invalid token segment size: %d", len(tokenSegments))
logrus.WithError(err).Error("Failed parsing service account token")
return tokenUpdateError(err)
}
unparsedClaims := tokenSegments[1]
// Padding may be missing, hence check and append it
if l := len(unparsedClaims) % 4; l > 0 {
unparsedClaims += strings.Repeat("=", 4-l)
}
decodedClaims, err := base64.URLEncoding.DecodeString(unparsedClaims)
if err != nil {
logrus.WithError(err).Error("Failed to decode service account token claims")
return tokenUpdateError(err)
}
var claimMap map[string]interface{}
err = json.Unmarshal(decodedClaims, &claimMap)
if err != nil {
logrus.WithError(err).Error("Failed to unmarshal service account token claims")
return tokenUpdateError(err)
}
return TokenUpdate{
Token: token,
ExpirationTime: time.Unix(int64(claimMap["exp"].(float64)), 0),
}, nil
}

// Successfully watched the file. Break from the retry loop.
logrus.WithField("directory", serviceaccountDirectory).Info("Watching contents for changes.")
break
func Run() {
// Log to stdout. this prevents our logs from being interpreted as errors by, for example,
// fluentd's default configuration.
logrus.SetOutput(os.Stdout)

clientset, err := InClusterClientSet()
if err != nil {
logrus.WithError(err).Fatal("Failed to create in cluster client set")
}
tr := NewTokenRefresher(clientset, NamespaceOfUsedServiceAccount(), DefaultServiceAccountName)
tokenChan := tr.TokenChan()
go tr.Run()

// Handle events from the watcher.
for {
// To prevent tight looping, add a sleep here.
time.Sleep(1 * time.Second)

select {
case event := <-watcher.Events:
// We've received a notification that the Kubernetes secrets files have changed.
// Update the kubeconfig file on disk to match.
logrus.WithField("event", event).Info("Notified of change to serviceaccount files.")
case tu := <-tokenChan:
logrus.Info("Update of CNI kubeconfig triggered based on elapsed time.")
cfg, err := rest.InClusterConfig()
if err != nil {
logrus.WithError(err).Error("Error generating kube config.")
Expand All @@ -61,17 +236,13 @@ func Run() {
logrus.WithError(err).Error("Error loading TLS files.")
continue
}
writeKubeconfig(cfg)

case err := <-watcher.Errors:
// We've received an error - log it out but don't exit.
logrus.WithError(err).Error("Error watching serviceaccount files.")
writeKubeconfig(cfg, tu.Token)
}
}
}

// writeKubeconfig writes an updated kubeconfig file to disk that the CNI plugin can use to access the Kubernetes API.
func writeKubeconfig(cfg *rest.Config) {
func writeKubeconfig(cfg *rest.Config, token string) {
template := `# Kubeconfig file for Calico CNI plugin. Installed by calico/node.
apiVersion: v1
kind: Config
Expand All @@ -92,7 +263,7 @@ contexts:
current-context: calico-context`

// Replace the placeholders.
data := fmt.Sprintf(template, cfg.Host, base64.StdEncoding.EncodeToString(cfg.CAData), cfg.BearerToken)
data := fmt.Sprintf(template, cfg.Host, base64.StdEncoding.EncodeToString(cfg.CAData), token)

// Write the filled out config to disk.
if err := ioutil.WriteFile(kubeconfigPath, []byte(data), 0600); err != nil {
Expand Down
22 changes: 22 additions & 0 deletions node/pkg/cni/token_watch_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cni_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"

"github.com/onsi/ginkgo/reporters"

"github.com/projectcalico/calico/libcalico-go/lib/testutils"
)

func init() {
testutils.HookLogrusForGinkgo()
}

func TestCommands(t *testing.T) {
RegisterFailHandler(Fail)
junitReporter := reporters.NewJUnitReporter("../../report/cnitokenwatch_suite.xml")
RunSpecsWithDefaultAndCustomReporters(t, "CNITokenWatch Suite", []Reporter{junitReporter})
}
Loading

0 comments on commit af199c8

Please sign in to comment.