From 91fcb8421ee05919642f66ff18c268613aa0a9c3 Mon Sep 17 00:00:00 2001 From: fabriziopandini Date: Fri, 24 Jun 2022 22:21:48 +0200 Subject: [PATCH] clusterctl: migrate CRDs during upgrade Co-authored-by: sbueringer --- cmd/clusterctl/client/cluster/cert_manager.go | 10 + cmd/clusterctl/client/cluster/components.go | 10 + .../client/cluster/crd_migration.go | 224 +++++++++++++++ .../client/cluster/crd_migration_test.go | 254 ++++++++++++++++++ cmd/clusterctl/main.go | 5 + 5 files changed, 503 insertions(+) create mode 100644 cmd/clusterctl/client/cluster/crd_migration.go create mode 100644 cmd/clusterctl/client/cluster/crd_migration_test.go diff --git a/cmd/clusterctl/client/cluster/cert_manager.go b/cmd/clusterctl/client/cluster/cert_manager.go index bdd5e12a8dd1..d1ca26024e8e 100644 --- a/cmd/clusterctl/client/cluster/cert_manager.go +++ b/cmd/clusterctl/client/cluster/cert_manager.go @@ -179,6 +179,16 @@ func (cm *certManagerClient) install() error { return err } + c, err := cm.proxy.NewClient() + if err != nil { + return err + } + + // Migrate CRs to latest CRD storage version, if necessary. + if err := newCRDMigrator(c).Run(ctx, objs); err != nil { + return err + } + // Install all cert-manager manifests createCertManagerBackoff := newWriteBackoff() objs = utilresource.SortForCreate(objs) diff --git a/cmd/clusterctl/client/cluster/components.go b/cmd/clusterctl/client/cluster/components.go index 09be0b8275dc..142e3b4c65b8 100644 --- a/cmd/clusterctl/client/cluster/components.go +++ b/cmd/clusterctl/client/cluster/components.go @@ -73,6 +73,16 @@ type providerComponents struct { } func (p *providerComponents) Create(objs []unstructured.Unstructured) error { + c, err := p.proxy.NewClient() + if err != nil { + return err + } + + // Migrate CRs to latest CRD storage version, if necessary. + if err := newCRDMigrator(c).Run(ctx, objs); err != nil { + return err + } + createComponentObjectBackoff := newWriteBackoff() for i := range objs { obj := objs[i] diff --git a/cmd/clusterctl/client/cluster/crd_migration.go b/cmd/clusterctl/client/cluster/crd_migration.go new file mode 100644 index 000000000000..add1ec0a21fa --- /dev/null +++ b/cmd/clusterctl/client/cluster/crd_migration.go @@ -0,0 +1,224 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/cluster-api/cmd/clusterctl/internal/scheme" + logf "sigs.k8s.io/cluster-api/cmd/clusterctl/log" +) + +// crdMigrator migrates CRs to the storage version of new CRDs. +// This is necessary when the new CRD drops a version which +// was previously used as a storage version. +type crdMigrator struct { + Client client.Client +} + +// newCRDMigrator creates a new CRD migrator. +func newCRDMigrator(client client.Client) *crdMigrator { + return &crdMigrator{ + Client: client, + } +} + +// Run migrates CRs to the storage version of new CRDs. +// This is necessary when the new CRD drops a version which +// was previously used as a storage version. +func (m *crdMigrator) Run(ctx context.Context, objs []unstructured.Unstructured) error { + for i := range objs { + obj := objs[i] + + if obj.GetKind() == "CustomResourceDefinition" { + crd := &apiextensionsv1.CustomResourceDefinition{} + if err := scheme.Scheme.Convert(&obj, crd, nil); err != nil { + return errors.Wrapf(err, "failed to convert CRD %q", obj.GetName()) + } + + if _, err := m.run(ctx, crd); err != nil { + return err + } + } + } + return nil +} + +// Run migrates CRs of a new CRD. +// This is necessary when the new CRD drops a version which +// was previously used as a storage version. +func (m *crdMigrator) run(ctx context.Context, newCRD *apiextensionsv1.CustomResourceDefinition) (bool, error) { + log := logf.Log + + // Gets the list of version supported by the new CRD + newVersions := sets.NewString() + for _, version := range newCRD.Spec.Versions { + newVersions.Insert(version.Name) + } + + // Get the current CRD. + currentCRD := &apiextensionsv1.CustomResourceDefinition{} + if err := retryWithExponentialBackoff(newReadBackoff(), func() error { + return m.Client.Get(ctx, client.ObjectKeyFromObject(newCRD), currentCRD) + }); err != nil { + // Return if the CRD doesn't exist yet. We only have to migrate if the CRD exists already. + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + + // Get the storage version of the current CRD. + currentStorageVersion, err := storageVersionForCRD(currentCRD) + if err != nil { + return false, err + } + + // Return an error, if the current storage version has been dropped in the new CRD. + if !newVersions.Has(currentStorageVersion) { + return false, errors.Errorf("unable to upgrade CRD %q because the new CRD does not contain the storage version %q of the current CRD, thus not allowing CR migration", newCRD.Name, currentStorageVersion) + } + + currentStatusStoredVersions := sets.NewString(currentCRD.Status.StoredVersions...) + + // If the new CRD still contains all current stored versions, nothing to do + // as no previous storage version will be dropped. + if newVersions.HasAll(currentStatusStoredVersions.List()...) { + log.V(2).Info("CRD migration check passed", "name", newCRD.Name) + return false, nil + } + + // Otherwise a version that has been used as storage version will be dropped, so it is necessary to migrate all the + // objects and drop the storage version from the current CRD status before installing the new CRD. + // Rif https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definition-versioning/#writing-reading-and-updating-versioned-customresourcedefinition-objects + // Note: We are simply migrating all CR objects independent of the version in which they are actually stored in etcd. + // This way we can make sure that all CR objects are now stored in the current storage version. + // Alternatively, we would have to figure out which objects are stored in which version but this information is not + // exposed by the apiserver. + storedVersionsToDelete := currentStatusStoredVersions.Difference(newVersions) + storedVersionsToPreserve := currentStatusStoredVersions.Intersection(newVersions) + log.Info("CR migration required", "kind", newCRD.Spec.Names.Kind, "storedVersionsToDelete", strings.Join(storedVersionsToDelete.List(), ","), "storedVersionsToPreserve", strings.Join(storedVersionsToPreserve.List(), ",")) + + if err := m.migrateResourcesForCRD(ctx, currentCRD, currentStorageVersion); err != nil { + return false, err + } + + if err := m.patchCRDStoredVersions(ctx, currentCRD, storedVersionsToPreserve.List()); err != nil { + return false, err + } + + return true, nil +} + +func (m *crdMigrator) migrateResourcesForCRD(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, currentStorageVersion string) error { + log := logf.Log + log.Info("Migrating CRs, this operation may take a while...", "kind", crd.Spec.Names.Kind) + + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(schema.GroupVersionKind{ + Group: crd.Spec.Group, + Version: currentStorageVersion, + Kind: crd.Spec.Names.ListKind, + }) + + var i int + for { + if err := retryWithExponentialBackoff(newReadBackoff(), func() error { + return m.Client.List(ctx, list, client.Continue(list.GetContinue())) + }); err != nil { + return errors.Wrapf(err, "failed to list %q", list.GetKind()) + } + + for i := range list.Items { + obj := list.Items[i] + + log.V(5).Info("Migrating", logf.UnstructuredToValues(obj)...) + if err := retryWithExponentialBackoff(newWriteBackoff(), func() error { + return handleMigrateErr(m.Client.Update(ctx, &obj)) + }); err != nil { + return errors.Wrapf(err, "failed to migrate %s/%s", obj.GetNamespace(), obj.GetName()) + } + + // Add some random delays to avoid pressure on the API server. + i++ + if i%10 == 0 { + log.V(2).Info(fmt.Sprintf("%d objects migrated", i)) + time.Sleep(time.Duration(rand.IntnRange(50*int(time.Millisecond), 250*int(time.Millisecond)))) + } + } + + if list.GetContinue() == "" { + break + } + } + + log.V(2).Info(fmt.Sprintf("CR migration completed: migrated %d objects", i), "kind", crd.Spec.Names.Kind) + return nil +} + +func (m *crdMigrator) patchCRDStoredVersions(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, storedVersionsToPreserve []string) error { + crd.Status.StoredVersions = storedVersionsToPreserve + if err := retryWithExponentialBackoff(newWriteBackoff(), func() error { + return m.Client.Status().Update(ctx, crd) + }); err != nil { + return errors.Wrapf(err, "failed to update status.storedVersions for CRD %q", crd.Name) + } + return nil +} + +// handleMigrateErr will absorb certain types of errors that we know can be skipped/passed on +// during a migration of a particular object. +func handleMigrateErr(err error) error { + if err == nil { + return nil + } + + // If the resource no longer exists, don't return the error as the object no longer + // needs updating to the new API version. + if apierrors.IsNotFound(err) { + return nil + } + + // If there was a conflict, another client must have written the object already which + // means we don't need to force an update. + if apierrors.IsConflict(err) { + return nil + } + return err +} + +// storageVersionForCRD discovers the storage version for a given CRD. +func storageVersionForCRD(crd *apiextensionsv1.CustomResourceDefinition) (string, error) { + for _, v := range crd.Spec.Versions { + if v.Storage { + return v.Name, nil + } + } + return "", errors.Errorf("could not find storage version for CRD %q", crd.Name) +} diff --git a/cmd/clusterctl/client/cluster/crd_migration_test.go b/cmd/clusterctl/client/cluster/crd_migration_test.go new file mode 100644 index 000000000000..0813751443a5 --- /dev/null +++ b/cmd/clusterctl/client/cluster/crd_migration_test.go @@ -0,0 +1,254 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "testing" + + . "github.com/onsi/gomega" + "golang.org/x/net/context" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/cluster-api/cmd/clusterctl/internal/test" +) + +func Test_CRDMigrator(t *testing.T) { + tests := []struct { + name string + CRs []unstructured.Unstructured + currentCRD *apiextensionsv1.CustomResourceDefinition + newCRD *apiextensionsv1.CustomResourceDefinition + wantIsMigrated bool + wantStoredVersions []string + wantErr bool + }{ + { + name: "No-op if current CRD does not exists", + currentCRD: &apiextensionsv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "something else"}}, // There is currently no "foo" CRD + newCRD: &apiextensionsv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantIsMigrated: false, + }, + { + name: "Error if current CRD does not have a storage version", + currentCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1"}, // No storage version as storage is not set. + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: []string{"v1alpha1"}}, + }, + newCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1"}, + }, + }, + }, + wantErr: true, + wantIsMigrated: false, + }, + { + name: "No-op if new CRD supports same versions", + currentCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: true}, + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: []string{"v1alpha1"}}, + }, + newCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: true}, + }, + }, + }, + wantIsMigrated: false, + }, + { + name: "No-op if new CRD adds a new versions", + currentCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: true}, + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: []string{"v1alpha1"}}, + }, + newCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1beta1", Storage: true}, // v1beta1 is being added + {Name: "v1alpha1"}, // v1alpha1 still exists + }, + }, + }, + wantIsMigrated: false, + }, + { + name: "Fails if new CRD drops current storage version", + currentCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: true}, + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: []string{"v1alpha1"}}, + }, + newCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1", Storage: true}, // CRD is jumping to v1, but dropping current storage version without allowing migration. + }, + }, + }, + wantErr: true, + }, + { + name: "Migrate", + CRs: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "foo/v1beta1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "cr1", + "namespace": metav1.NamespaceDefault, + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "foo/v1beta1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "cr2", + "namespace": metav1.NamespaceDefault, + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "foo/v1beta1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "cr3", + "namespace": metav1.NamespaceDefault, + }, + }, + }, + }, + currentCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "foo", + Names: apiextensionsv1.CustomResourceDefinitionNames{Kind: "Foo", ListKind: "FooList"}, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1beta1", Storage: true}, + {Name: "v1alpha1"}, + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: []string{"v1beta1", "v1alpha1"}}, + }, + newCRD: &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "foo", + Names: apiextensionsv1.CustomResourceDefinitionNames{Kind: "Foo", ListKind: "FooList"}, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1", Storage: true}, // v1 is being added + {Name: "v1beta1"}, // v1beta1 still there (required for migration) + // v1alpha1 is being dropped + }, + }, + }, + wantStoredVersions: []string{"v1beta1"}, // v1alpha1 should be dropped from current CRD's stored versions + wantIsMigrated: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + objs := []client.Object{tt.currentCRD} + for i := range tt.CRs { + objs = append(objs, &tt.CRs[i]) + } + + c, err := test.NewFakeProxy().WithObjs(objs...).NewClient() + g.Expect(err).ToNot(HaveOccurred()) + countingClient := newUpgradeCountingClient(c) + + m := crdMigrator{ + Client: countingClient, + } + + isMigrated, err := m.run(ctx, tt.newCRD) + if tt.wantErr { + g.Expect(err).To(HaveOccurred()) + } else { + g.Expect(err).ToNot(HaveOccurred()) + } + g.Expect(isMigrated).To(Equal(tt.wantIsMigrated)) + + if isMigrated { + storageVersion, err := storageVersionForCRD(tt.currentCRD) + g.Expect(err).ToNot(HaveOccurred()) + + // Check all the objects has been migrated. + g.Expect(countingClient.count).To(HaveKeyWithValue(fmt.Sprintf("%s/%s, Kind=%s", tt.currentCRD.Spec.Group, storageVersion, tt.currentCRD.Spec.Names.Kind), len(tt.CRs))) + + // Check storage versions has been cleaned up. + currentCRD := &apiextensionsv1.CustomResourceDefinition{} + err = c.Get(ctx, client.ObjectKeyFromObject(tt.newCRD), currentCRD) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(currentCRD.Status.StoredVersions).To(Equal(tt.wantStoredVersions)) + } + }) + } +} + +type UpgradeCountingClient struct { + count map[string]int + client.Client +} + +func newUpgradeCountingClient(inner client.Client) UpgradeCountingClient { + return UpgradeCountingClient{ + count: map[string]int{}, + Client: inner, + } +} + +func (u UpgradeCountingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + u.count[obj.GetObjectKind().GroupVersionKind().String()]++ + return u.Client.Update(ctx, obj, opts...) +} diff --git a/cmd/clusterctl/main.go b/cmd/clusterctl/main.go index 8179fe8942fd..d546bcc9bfa6 100644 --- a/cmd/clusterctl/main.go +++ b/cmd/clusterctl/main.go @@ -17,11 +17,16 @@ limitations under the License. package main import ( + "math/rand" + "time" + _ "k8s.io/client-go/plugin/pkg/client/auth" "sigs.k8s.io/cluster-api/cmd/clusterctl/cmd" ) func main() { + rand.Seed(time.Now().UnixNano()) + cmd.Execute() }