-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
upgrader.go
545 lines (458 loc) · 20 KB
/
upgrader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"context"
"sort"
"time"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
clusterctlv1 "sigs.k8s.io/cluster-api/cmd/clusterctl/api/v1alpha3"
"sigs.k8s.io/cluster-api/cmd/clusterctl/client/config"
"sigs.k8s.io/cluster-api/cmd/clusterctl/client/repository"
logf "sigs.k8s.io/cluster-api/cmd/clusterctl/log"
)
// ProviderUpgrader defines methods for supporting provider upgrade.
type ProviderUpgrader interface {
// Plan returns a set of suggested Upgrade plans for the management cluster, and more specifically:
// - Upgrade to the latest version in the the v1alpha3 series: ....
// - Upgrade to the latest version in the the v1alpha4 series: ....
Plan() ([]UpgradePlan, error)
// ApplyPlan executes an upgrade following an UpgradePlan generated by clusterctl.
ApplyPlan(opts UpgradeOptions, clusterAPIVersion string) error
// ApplyCustomPlan plan executes an upgrade using the UpgradeItems provided by the user.
ApplyCustomPlan(opts UpgradeOptions, providersToUpgrade ...UpgradeItem) error
}
// UpgradePlan defines a list of possible upgrade targets for a management cluster.
type UpgradePlan struct {
Contract string
Providers []UpgradeItem
}
// UpgradeOptions defines the options used to upgrade installation.
type UpgradeOptions struct {
WaitProviders bool
WaitProviderTimeout time.Duration
}
// isPartialUpgrade returns true if at least one upgradeItem in the plan does not have a target version.
func (u *UpgradePlan) isPartialUpgrade() bool {
for _, i := range u.Providers {
if i.NextVersion == "" {
return true
}
}
return false
}
// UpgradeItem defines a possible upgrade target for a provider in the management cluster.
type UpgradeItem struct {
clusterctlv1.Provider
NextVersion string
}
// UpgradeRef returns a string identifying the upgrade item; this string is derived by the provider.
func (u *UpgradeItem) UpgradeRef() string {
return u.InstanceName()
}
type providerUpgrader struct {
configClient config.Client
proxy Proxy
repositoryClientFactory RepositoryClientFactory
providerInventory InventoryClient
providerComponents ComponentsClient
}
var _ ProviderUpgrader = &providerUpgrader{}
func (u *providerUpgrader) Plan() ([]UpgradePlan, error) {
log := logf.Log
log.Info("Checking new release availability...")
providerList, err := u.providerInventory.List()
if err != nil {
return nil, err
}
// The core provider is driving all the plan logic for entire management cluster, because all the providers
// are expected to support the same API Version of Cluster API (contract).
// e.g if the core provider supports v1alpha4, all the providers in the same management cluster should support v1alpha4 as well;
// all the providers in the management cluster can upgrade to the latest release supporting v1alpha4, or if available,
// all the providers can upgrade to the latest release supporting v1alpha5 (not supported in current clusterctl release,
// but upgrade plan should report these options)
// Please note that upgrade plan also works on management cluster still in v1alpha3. In this case upgrade plan is shown, but
// upgrade to latest version in the v1alpha3 series are not supported using clusterctl v1alpha4 (use older releases).
// Gets the upgrade info for the core provider.
coreProviders := providerList.FilterCore()
if len(coreProviders) != 1 {
return nil, errors.Errorf("invalid management cluster: there should a core provider, found %d", len(coreProviders))
}
coreProvider := coreProviders[0]
coreUpgradeInfo, err := u.getUpgradeInfo(coreProvider)
if err != nil {
return nil, err
}
// Identifies the API Version of Cluster API (contract) that we should consider for the management cluster update (Nb. the core provider is driving the entire management cluster).
// This includes the current contract and the new ones available, if any.
contractsForUpgrade := coreUpgradeInfo.getContractsForUpgrade()
if len(contractsForUpgrade) == 0 {
return nil, errors.Wrapf(err, "invalid metadata: unable to find the API Version of Cluster API (contract) supported by the %s provider", coreProvider.InstanceName())
}
// Creates an UpgradePlan for each contract considered for upgrades; each upgrade plans contains
// an UpgradeItem for each provider defining the next available version with the target contract, if available.
// e.g. v1alpha3, cluster-api --> v0.3.2, kubeadm bootstrap --> v0.3.2, aws --> v0.5.4 (not supported in current clusterctl release, but upgrade plan should report these options).
// e.g. v1alpha4, cluster-api --> v0.4.1, kubeadm bootstrap --> v0.4.1, aws --> v0.X.2
// e.g. v1alpha4, cluster-api --> v0.5.1, kubeadm bootstrap --> v0.5.1, aws --> v0.Y.4 (not supported in current clusterctl release, but upgrade plan should report these options).
ret := make([]UpgradePlan, 0)
for _, contract := range contractsForUpgrade {
upgradePlan, err := u.getUpgradePlan(providerList.Items, contract)
if err != nil {
return nil, err
}
// If the upgrade plan is partial (at least one upgradeItem in the plan does not have a target version) and
// the upgrade plan requires a change of the contract for this management cluster, then drop it
// (all the provider in a management cluster are required to change contract at the same time).
if upgradePlan.isPartialUpgrade() && coreUpgradeInfo.currentContract != contract {
continue
}
ret = append(ret, *upgradePlan)
}
return ret, nil
}
func (u *providerUpgrader) ApplyPlan(opts UpgradeOptions, contract string) error {
if contract != clusterv1.GroupVersion.Version {
return errors.Errorf("current version of clusterctl could only upgrade to %s contract, requested %s", clusterv1.GroupVersion.Version, contract)
}
log := logf.Log
log.Info("Performing upgrade...")
// Gets the upgrade plan for the selected API Version of Cluster API (contract).
providerList, err := u.providerInventory.List()
if err != nil {
return err
}
upgradePlan, err := u.getUpgradePlan(providerList.Items, contract)
if err != nil {
return err
}
// Do the upgrade
return u.doUpgrade(upgradePlan, opts)
}
func (u *providerUpgrader) ApplyCustomPlan(opts UpgradeOptions, upgradeItems ...UpgradeItem) error {
log := logf.Log
log.Info("Performing upgrade...")
// Create a custom upgrade plan from the upgrade items, taking care of ensuring all the providers in a management
// cluster are consistent with the API Version of Cluster API (contract).
upgradePlan, err := u.createCustomPlan(upgradeItems)
if err != nil {
return err
}
// Do the upgrade
return u.doUpgrade(upgradePlan, opts)
}
// getUpgradePlan returns the upgrade plan for a specific set of providers/contract
// NB. this function is used both for upgrade plan and upgrade apply.
func (u *providerUpgrader) getUpgradePlan(providers []clusterctlv1.Provider, contract string) (*UpgradePlan, error) {
upgradeItems := []UpgradeItem{}
for _, provider := range providers {
// Gets the upgrade info for the provider.
providerUpgradeInfo, err := u.getUpgradeInfo(provider)
if err != nil {
return nil, err
}
// Identifies the next available version with the target contract for the provider, if available.
nextVersion := providerUpgradeInfo.getLatestNextVersion(contract)
// Append the upgrade item for the provider/with the target contract.
upgradeItems = append(upgradeItems, UpgradeItem{
Provider: provider,
NextVersion: versionTag(nextVersion),
})
}
return &UpgradePlan{
Contract: contract,
Providers: upgradeItems,
}, nil
}
// createCustomPlan creates a custom upgrade plan from a set of upgrade items, taking care of ensuring all the providers
// in a management cluster are consistent with the API Version of Cluster API (contract).
func (u *providerUpgrader) createCustomPlan(upgradeItems []UpgradeItem) (*UpgradePlan, error) {
// Gets the API Version of Cluster API (contract).
// The this is required to ensure all the providers in a management cluster are consistent with the contract supported by the core provider.
// e.g if the core provider is v1alpha3, all the provider should be v1alpha3 as well.
// The target contract is derived from the current version of the core provider, or, if the core provider is included in the upgrade list,
// from its target version.
providerList, err := u.providerInventory.List()
if err != nil {
return nil, err
}
coreProviders := providerList.FilterCore()
if len(coreProviders) != 1 {
return nil, errors.Errorf("invalid management cluster: there should a core provider, found %d", len(coreProviders))
}
coreProvider := coreProviders[0]
targetCoreProviderVersion := coreProvider.Version
for _, providerToUpgrade := range upgradeItems {
if providerToUpgrade.InstanceName() == coreProvider.InstanceName() {
targetCoreProviderVersion = providerToUpgrade.NextVersion
break
}
}
targetContract, err := u.getProviderContractByVersion(coreProvider, targetCoreProviderVersion)
if err != nil {
return nil, err
}
if targetContract != clusterv1.GroupVersion.Version {
return nil, errors.Errorf("current version of clusterctl could only upgrade to %s contract, requested %s", clusterv1.GroupVersion.Version, targetContract)
}
// Builds the custom upgrade plan, by adding all the upgrade items after checking consistency with the targetContract.
upgradeInstanceNames := sets.NewString()
upgradePlan := &UpgradePlan{
Contract: targetContract,
}
for _, upgradeItem := range upgradeItems {
// Match the upgrade item with the corresponding provider in the management cluster
var provider *clusterctlv1.Provider
for i := range providerList.Items {
if providerList.Items[i].InstanceName() == upgradeItem.InstanceName() {
provider = &providerList.Items[i]
break
}
}
if provider == nil {
return nil, errors.Errorf("unable to complete that upgrade: the provider %s in not part of the management cluster", upgradeItem.InstanceName())
}
// Retrieves the contract that is supported by the target version of the provider.
contract, err := u.getProviderContractByVersion(*provider, upgradeItem.NextVersion)
if err != nil {
return nil, err
}
if contract != targetContract {
return nil, errors.Errorf("unable to complete that upgrade: the target version for the provider %s supports the %s API Version of Cluster API (contract), while the management cluster is using %s", upgradeItem.InstanceName(), contract, targetContract)
}
upgradePlan.Providers = append(upgradePlan.Providers, upgradeItem)
upgradeInstanceNames.Insert(upgradeItem.InstanceName())
}
// Before doing upgrades, checks if other providers in the management cluster are lagging behind the target contract.
for _, provider := range providerList.Items {
// skip providers already included in the upgrade plan
if upgradeInstanceNames.Has(provider.InstanceName()) {
continue
}
// Retrieves the contract that is supported by the current version of the provider.
contract, err := u.getProviderContractByVersion(provider, provider.Version)
if err != nil {
return nil, err
}
if contract != targetContract {
return nil, errors.Errorf("unable to complete that upgrade: the provider %s supports the %s API Version of Cluster API (contract), while the management cluster is being updated to %s. Please include the %[1]s provider in the upgrade", provider.InstanceName(), contract, targetContract)
}
}
return upgradePlan, nil
}
// getProviderContractByVersion returns the contract that a provider will support if updated to the given target version.
func (u *providerUpgrader) getProviderContractByVersion(provider clusterctlv1.Provider, targetVersion string) (string, error) {
targetSemVersion, err := version.ParseSemantic(targetVersion)
if err != nil {
return "", errors.Wrapf(err, "failed to parse target version for the %s provider", provider.InstanceName())
}
// Gets the metadata for the core Provider
upgradeInfo, err := u.getUpgradeInfo(provider)
if err != nil {
return "", err
}
releaseSeries := upgradeInfo.metadata.GetReleaseSeriesForVersion(targetSemVersion)
if releaseSeries == nil {
return "", errors.Errorf("invalid target version: version %s for the provider %s does not match any release series", targetVersion, provider.InstanceName())
}
return releaseSeries.Contract, nil
}
// getUpgradeComponents returns the provider components for the selected target version.
func (u *providerUpgrader) getUpgradeComponents(provider UpgradeItem) (repository.Components, error) {
configRepository, err := u.configClient.Providers().Get(provider.ProviderName, provider.GetProviderType())
if err != nil {
return nil, err
}
providerRepository, err := u.repositoryClientFactory(configRepository, u.configClient)
if err != nil {
return nil, err
}
options := repository.ComponentsOptions{
Version: provider.NextVersion,
TargetNamespace: provider.Namespace,
}
components, err := providerRepository.Components().Get(options)
if err != nil {
return nil, err
}
return components, nil
}
func (u *providerUpgrader) doUpgrade(upgradePlan *UpgradePlan, opts UpgradeOptions) error {
// Check for multiple instances of the same provider if current contract is v1alpha3.
if upgradePlan.Contract == clusterv1.GroupVersion.Version {
if err := u.providerInventory.CheckSingleProviderInstance(); err != nil {
return err
}
}
// Ensure Providers are updated in the following order: Core, Bootstrap, ControlPlane, Infrastructure.
providers := upgradePlan.Providers
sort.Slice(providers, func(a, b int) bool {
return providers[a].GetProviderType().Order() < providers[b].GetProviderType().Order()
})
// Migrate CRs to latest CRD storage version, if necessary.
// Note: We have to do this before the providers are scaled down or deleted
// so conversion webhooks still work.
for _, upgradeItem := range providers {
// If there is not a specified next version, skip it (we are already up-to-date).
if upgradeItem.NextVersion == "" {
continue
}
// Gets the provider components for the target version.
components, err := u.getUpgradeComponents(upgradeItem)
if err != nil {
return err
}
c, err := u.proxy.NewClient()
if err != nil {
return err
}
if err := newCRDMigrator(c).Run(ctx, components.Objs()); err != nil {
return err
}
}
// Scale down all providers.
// This is done to ensure all Pods of all "old" provider Deployments have been deleted.
// Otherwise it can happen that a provider Pod survives the upgrade because we create
// a new Deployment with the same selector directly after `Delete`.
// This can lead to a failed upgrade because:
// * new provider Pods fail to startup because they try to list resources.
// * list resources fails, because the API server hits the old provider Pod when trying to
// call the conversion webhook for those resources.
for _, upgradeItem := range providers {
// If there is not a specified next version, skip it (we are already up-to-date).
if upgradeItem.NextVersion == "" {
continue
}
// Scale down provider.
if err := u.scaleDownProvider(upgradeItem.Provider); err != nil {
return err
}
}
installQueue := []repository.Components{}
// Delete old providers and deploy new ones if necessary, i.e. there is a NextVersion.
for _, upgradeItem := range providers {
// If there is not a specified next version, skip it (we are already up-to-date).
if upgradeItem.NextVersion == "" {
continue
}
// Gets the provider components for the target version.
components, err := u.getUpgradeComponents(upgradeItem)
if err != nil {
return err
}
installQueue = append(installQueue, components)
// Delete the provider, preserving CRD, namespace and the inventory.
if err := u.providerComponents.Delete(DeleteOptions{
Provider: upgradeItem.Provider,
IncludeNamespace: false,
IncludeCRDs: false,
SkipInventory: true,
}); err != nil {
return err
}
// Install the new version of the provider components.
if err := installComponentsAndUpdateInventory(components, u.providerComponents, u.providerInventory); err != nil {
return err
}
}
// Delete webhook namespace since it's not needed from v1alpha4.
if upgradePlan.Contract == clusterv1.GroupVersion.Version {
if err := u.providerComponents.DeleteWebhookNamespace(); err != nil {
return err
}
}
return waitForProvidersReady(InstallOptions(opts), installQueue, u.proxy)
}
func (u *providerUpgrader) scaleDownProvider(provider clusterctlv1.Provider) error {
log := logf.Log
log.Info("Scaling down", "Provider", provider.Name, "Version", provider.Version, "Namespace", provider.Namespace)
cs, err := u.proxy.NewClient()
if err != nil {
return err
}
// Fetch all Deployments belonging to a provider.
deploymentList := &appsv1.DeploymentList{}
if err := cs.List(ctx,
deploymentList,
client.InNamespace(provider.Namespace),
client.MatchingLabels{
clusterctlv1.ClusterctlLabelName: "",
clusterv1.ProviderLabelName: provider.ManifestLabel(),
}); err != nil {
return errors.Wrapf(err, "failed to list Deployments for provider %s", provider.Name)
}
// Scale down provider Deployments.
for _, deployment := range deploymentList.Items {
log.V(5).Info("Scaling down", "Deployment", deployment.Name, "Namespace", deployment.Namespace)
if err := scaleDownDeployment(ctx, cs, deployment); err != nil {
return err
}
}
return nil
}
// scaleDownDeployment scales down a Deployment to 0 and waits until all replicas have been deleted.
func scaleDownDeployment(ctx context.Context, c client.Client, deploy appsv1.Deployment) error {
if err := retryWithExponentialBackoff(newWriteBackoff(), func() error {
deployment := &appsv1.Deployment{}
if err := c.Get(ctx, client.ObjectKeyFromObject(&deploy), deployment); err != nil {
return errors.Wrapf(err, "failed to get Deployment/%s", deploy.GetName())
}
// Deployment already scaled down, return early.
if deployment.Spec.Replicas != nil && *deployment.Spec.Replicas == 0 {
return nil
}
// Scale down.
deployment.Spec.Replicas = pointer.Int32Ptr(0)
if err := c.Update(ctx, deployment); err != nil {
return errors.Wrapf(err, "failed to update Deployment/%s", deploy.GetName())
}
return nil
}); err != nil {
return errors.Wrapf(err, "failed to scale down Deployment")
}
deploymentScaleToZeroBackOff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1,
Steps: 60,
Jitter: 0.4,
}
if err := retryWithExponentialBackoff(deploymentScaleToZeroBackOff, func() error {
deployment := &appsv1.Deployment{}
if err := c.Get(ctx, client.ObjectKeyFromObject(&deploy), deployment); err != nil {
return errors.Wrapf(err, "failed to get Deployment/%s", deploy.GetName())
}
// Deployment is scaled down.
if deployment.Status.Replicas == 0 {
return nil
}
return errors.Errorf("Deployment still has %d replicas", deployment.Status.Replicas)
}); err != nil {
return errors.Wrapf(err, "failed to wait until Deployment is scaled down")
}
return nil
}
func newProviderUpgrader(configClient config.Client, proxy Proxy, repositoryClientFactory RepositoryClientFactory, providerInventory InventoryClient, providerComponents ComponentsClient) *providerUpgrader {
return &providerUpgrader{
configClient: configClient,
proxy: proxy,
repositoryClientFactory: repositoryClientFactory,
providerInventory: providerInventory,
providerComponents: providerComponents,
}
}