-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathtoken_watch.go
282 lines (253 loc) · 8.7 KB
/
token_watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package cni
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
authenticationv1 "k8s.io/api/authentication/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"github.com/projectcalico/calico/libcalico-go/lib/winutils"
)
const (
defaultServiceAccountName = "calico-cni-plugin"
serviceAccountNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
defaultCNITokenValiditySeconds = 24 * 60 * 60
minTokenRetryDuration = 5 * time.Second
defaultRefreshFraction = 4
kubeconfigPath = "/host/etc/cni/net.d/calico-kubeconfig"
)
type TokenRefresher struct {
tokenSupported bool
tokenOnce *sync.Once
tokenValiditySeconds int64
minTokenRetryDuration time.Duration
defaultRefreshFraction time.Duration
clientset *kubernetes.Clientset
namespace string
serviceAccountName string
tokenChan chan TokenUpdate
stopChan chan struct{}
}
type TokenUpdate struct {
Token string
ExpirationTime time.Time
}
func NamespaceOfUsedServiceAccount() string {
namespace, err := os.ReadFile(winutils.GetHostPath(serviceAccountNamespace))
if err != nil {
logrus.WithError(err).Fatal("Failed to read service account namespace file")
}
return string(namespace)
}
func BuildClientSet() (*kubernetes.Clientset, error) {
kubeconfig := os.Getenv("KUBECONFIG")
cfg, err := winutils.BuildConfigFromFlags("", kubeconfig)
logrus.WithFields(logrus.Fields{"KUBECONFIG": kubeconfig, "cfg": cfg}).Debug("running cni.BuildClientSet")
if err != nil {
return nil, err
}
return kubernetes.NewForConfig(cfg)
}
func NewTokenRefresher(clientset *kubernetes.Clientset, namespace string, serviceAccountName string) *TokenRefresher {
return NewTokenRefresherWithCustomTiming(clientset, namespace, serviceAccountName, defaultCNITokenValiditySeconds, minTokenRetryDuration, defaultRefreshFraction)
}
func NewTokenRefresherWithCustomTiming(clientset *kubernetes.Clientset, namespace string, serviceAccountName string, tokenValiditySeconds int64, minTokenRetryDuration time.Duration, defaultRefreshFraction time.Duration) *TokenRefresher {
return &TokenRefresher{
tokenSupported: false,
tokenOnce: &sync.Once{},
tokenValiditySeconds: tokenValiditySeconds,
minTokenRetryDuration: minTokenRetryDuration,
defaultRefreshFraction: defaultRefreshFraction,
clientset: clientset,
namespace: namespace,
serviceAccountName: serviceAccountName,
tokenChan: make(chan TokenUpdate),
stopChan: make(chan struct{}),
}
}
func (t *TokenRefresher) UpdateToken() (TokenUpdate, error) {
validity := t.tokenValiditySeconds
tr := &authenticationv1.TokenRequest{
Spec: authenticationv1.TokenRequestSpec{
Audiences: []string{},
ExpirationSeconds: &validity,
},
}
tokenRequest, err := t.clientset.CoreV1().ServiceAccounts(t.namespace).CreateToken(context.TODO(), t.serviceAccountName, tr, metav1.CreateOptions{})
if apierrors.IsNotFound(err) && !t.tokenRequestSupported(t.clientset) {
logrus.WithError(err).Debug("Unable to create token for CNI kubeconfig as token request api is not supported, falling back to local service account token")
return tokenUpdateFromFile()
}
if err != nil {
logrus.WithError(err).Error("Unable to create token for CNI kubeconfig")
return TokenUpdate{}, err
}
return TokenUpdate{
Token: tokenRequest.Status.Token,
ExpirationTime: tokenRequest.Status.ExpirationTimestamp.Time,
}, nil
}
func (t *TokenRefresher) TokenChan() <-chan TokenUpdate {
return t.tokenChan
}
func (t *TokenRefresher) Stop() {
close(t.stopChan)
}
func (t *TokenRefresher) Run() {
var nextExpiration time.Time
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
tu, err := t.UpdateToken()
if err != nil {
logrus.WithError(err).Error("Failed to update CNI token, retrying...")
// Reset nextExpiration to retry directly
nextExpiration = time.Time{}
} else {
nextExpiration = tu.ExpirationTime
select {
case t.tokenChan <- tu:
case <-t.stopChan:
return
}
}
now := time.Now()
var sleepTime time.Duration
// Do some basic rate limiting to prevent flooding the kube apiserver with requests
if nextExpiration.Before(now.Add(t.minTokenRetryDuration * t.defaultRefreshFraction)) {
sleepTime = t.minTokenRetryDuration
} else {
sleepTime = nextExpiration.Sub(now) / t.defaultRefreshFraction
}
jitter := rand.Float32() * float32(sleepTime)
sleepTime += time.Duration(jitter)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debugf("Going to sleep for %s", sleepTime.String())
}
select {
case <-time.After(sleepTime):
case <-t.stopChan:
return
}
}
}
func (t *TokenRefresher) tokenRequestSupported(clientset *kubernetes.Clientset) bool {
t.tokenOnce.Do(func() {
resources, err := clientset.Discovery().ServerResourcesForGroupVersion("v1")
if err != nil {
return
}
for _, resource := range resources.APIResources {
if resource.Name == "serviceaccounts/token" {
t.tokenSupported = true
return
}
}
})
return t.tokenSupported
}
func tokenUpdateFromFile() (TokenUpdate, error) {
tokenBytes, err := os.ReadFile(winutils.GetHostPath(tokenFile))
if err != nil {
logrus.WithError(err).Error("Failed to read service account token file")
return TokenUpdate{}, err
}
token := string(tokenBytes)
tokenSegments := strings.Split(token, ".")
if len(tokenSegments) != 3 {
err := fmt.Errorf("invalid token segment size: %d", len(tokenSegments))
logrus.WithError(err).Error("Failed parsing service account token")
return TokenUpdate{}, err
}
unparsedClaims := tokenSegments[1]
// Padding may be missing, hence check and append it
if l := len(unparsedClaims) % 4; l > 0 {
unparsedClaims += strings.Repeat("=", 4-l)
}
decodedClaims, err := base64.URLEncoding.DecodeString(unparsedClaims)
if err != nil {
logrus.WithError(err).Error("Failed to decode service account token claims")
return TokenUpdate{}, err
}
var claimMap map[string]interface{}
err = json.Unmarshal(decodedClaims, &claimMap)
if err != nil {
logrus.WithError(err).Error("Failed to unmarshal service account token claims")
return TokenUpdate{}, err
}
return TokenUpdate{
Token: token,
ExpirationTime: time.Unix(int64(claimMap["exp"].(float64)), 0),
}, nil
}
func Run() {
clientset, err := BuildClientSet()
if err != nil {
logrus.WithError(err).Fatal("Failed to create in cluster client set")
}
tr := NewTokenRefresher(clientset, NamespaceOfUsedServiceAccount(), CNIServiceAccountName())
tokenChan := tr.TokenChan()
go tr.Run()
for tu := range tokenChan {
logrus.Info("Update of CNI kubeconfig triggered based on elapsed time.")
kubeconfig := os.Getenv("KUBECONFIG")
cfg, err := winutils.BuildConfigFromFlags("", kubeconfig)
if err != nil {
logrus.WithError(err).Error("Error generating kube config.")
continue
}
err = rest.LoadTLSFiles(cfg)
if err != nil {
logrus.WithError(err).Error("Error loading TLS files.")
continue
}
writeKubeconfig(cfg, tu.Token)
}
}
// CNIServiceAccountName returns the name of the serviceaccount to use for the CNI plugin token request.
// This can be set via the CALICO_CNI_SERVICE_ACCOUNT environment variable, and defaults to "calico-cni-plugin" (on Linux, "calico-cni-plugin-windows" on Windows) otherwise.
func CNIServiceAccountName() string {
if sa := os.Getenv("CALICO_CNI_SERVICE_ACCOUNT"); sa != "" {
logrus.WithField("name", sa).Debug("Using service account from CALICO_CNI_SERVICE_ACCOUNT")
return sa
}
return defaultServiceAccountName
}
// writeKubeconfig writes an updated kubeconfig file to disk that the CNI plugin can use to access the Kubernetes API.
func writeKubeconfig(cfg *rest.Config, token string) {
template := `# Kubeconfig file for Calico CNI plugin. Installed by calico/node.
apiVersion: v1
kind: Config
clusters:
- name: local
cluster:
server: %s
certificate-authority-data: "%s"
users:
- name: calico
user:
token: %s
contexts:
- name: calico-context
context:
cluster: local
user: calico
current-context: calico-context`
// Replace the placeholders.
data := fmt.Sprintf(template, cfg.Host, base64.StdEncoding.EncodeToString(cfg.CAData), token)
// Write the filled out config to disk.
if err := os.WriteFile(winutils.GetHostPath(kubeconfigPath), []byte(data), 0600); err != nil {
logrus.WithError(err).Error("Failed to write CNI plugin kubeconfig file")
return
}
logrus.WithField("path", winutils.GetHostPath(kubeconfigPath)).Info("Wrote updated CNI kubeconfig file.")
}