diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 3fccfe37e868..f2999f093080 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -32,13 +32,16 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/pager" "k8s.io/client-go/util/retry" ) const ( - errorTTL = 24 * time.Hour + errorTTL = 24 * time.Hour + snapshotListPageSize = 20 ) var ( @@ -720,18 +723,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { return err } - // List all snapshots matching the selector snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() - esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) - if err != nil { - return err - } + snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) })) + snapshotPager.PageSize = snapshotListPageSize + // List all snapshots matching the selector // If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync. // If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes. // The one exception to the last rule is failed snapshots - these must be retained for a period of time. - for _, esf := range esfList.Items { - sfKey := generateETCDSnapshotFileConfigMapKey(esf) + if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error { + esf, ok := obj.(*k3s.ETCDSnapshotFile) + if !ok { + return errors.New("failed to convert object to ETCDSnapshotFile") + } + sfKey := generateETCDSnapshotFileConfigMapKey(*esf) logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey) if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name { // exists in both and names match, don't need to sync @@ -741,7 +746,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { if esf.Status.Error != nil && esf.Status.Error.Time != nil { expires := esf.Status.Error.Time.Add(errorTTL) if time.Now().Before(expires) { - continue + return nil } } if ok { @@ -754,6 +759,9 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err) } } + return nil + }); err != nil { + return err } // Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created @@ -794,15 +802,18 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { } // List and remove all snapshots stored on nodes that do not match the selector - esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) - if err != nil { - return err - } + if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error { + esf, ok := obj.(*k3s.ETCDSnapshotFile) + if !ok { + return errors.New("failed to convert object to ETCDSnapshotFile") + } - for _, esf := range esfList.Items { if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err) } + return nil + }); err != nil { + return err } // Update our Node object to note the timestamp of the snapshot storages that have been reconciled diff --git a/pkg/etcd/snapshot_controller.go b/pkg/etcd/snapshot_controller.go index df15c1335610..fc3c33502de8 100644 --- a/pkg/etcd/snapshot_controller.go +++ b/pkg/etcd/snapshot_controller.go @@ -9,6 +9,7 @@ import ( "time" apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" + k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" "github.com/k3s-io/k3s/pkg/etcd/snapshot" controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1" "github.com/k3s-io/k3s/pkg/util" @@ -20,7 +21,9 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/pager" "k8s.io/client-go/util/retry" "github.com/sirupsen/logrus" @@ -216,20 +219,25 @@ func (e *etcdSnapshotHandler) reconcile() error { logrus.Infof("Reconciling snapshot ConfigMap data") // Get a list of existing snapshots - snapshotList, err := e.snapshots.List(metav1.ListOptions{}) - if err != nil { - return err - } - snapshots := map[string]*apisv1.ETCDSnapshotFile{} - for i := range snapshotList.Items { - esf := &snapshotList.Items[i] + snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return e.snapshots.List(opts) })) + snapshotPager.PageSize = snapshotListPageSize + + if err := snapshotPager.EachListItem(e.ctx, metav1.ListOptions{}, func(obj k8sruntime.Object) error { + esf, ok := obj.(*k3s.ETCDSnapshotFile) + if !ok { + return errors.New("failed to convert object to ETCDSnapshotFile") + } + // Do not create entries for snapshots that have been deleted or do not have extra metadata if !esf.DeletionTimestamp.IsZero() || len(esf.Spec.Metadata) == 0 { - continue + return nil } sfKey := generateETCDSnapshotFileConfigMapKey(*esf) snapshots[sfKey] = esf + return nil + }); err != nil { + return err } snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) diff --git a/pkg/secretsencrypt/controller.go b/pkg/secretsencrypt/controller.go index 070c420ddf57..03976d7f9e02 100644 --- a/pkg/secretsencrypt/controller.go +++ b/pkg/secretsencrypt/controller.go @@ -2,6 +2,7 @@ package secretsencrypt import ( "context" + "errors" "fmt" "strings" @@ -12,7 +13,6 @@ import ( "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -30,6 +30,8 @@ const ( secretsProgressEvent string = "SecretsProgress" secretsUpdateCompleteEvent string = "SecretsUpdateComplete" secretsUpdateErrorEvent string = "SecretsUpdateError" + + secretListPageSize = 20 ) type handler struct { @@ -116,7 +118,7 @@ func (h *handler) onChangeNode(nodeName string, node *corev1.Node) (*corev1.Node return node, err } - if err := h.updateSecrets(node); err != nil { + if err := h.updateSecrets(nodeRef); err != nil { h.recorder.Event(nodeRef, corev1.EventTypeWarning, secretsUpdateErrorEvent, err.Error()) return node, err } @@ -213,36 +215,30 @@ func (h *handler) validateReencryptStage(node *corev1.Node, annotation string) ( return true, nil } -func (h *handler) updateSecrets(node *corev1.Node) error { - nodeRef := &corev1.ObjectReference{ - Kind: "Node", - Name: node.Name, - UID: types.UID(node.Name), - Namespace: "", - } +func (h *handler) updateSecrets(nodeRef *corev1.ObjectReference) error { secretPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { - return h.secrets.List("", opts) + return h.secrets.List(metav1.NamespaceAll, opts) })) - secretsList, _, err := secretPager.List(h.ctx, metav1.ListOptions{}) - if err != nil { - return err - } + secretPager.PageSize = secretListPageSize + i := 0 - err = meta.EachListItem(secretsList, func(obj runtime.Object) error { - if secret, ok := obj.(*corev1.Secret); ok { - if _, err := h.secrets.Update(secret); err != nil && !apierrors.IsConflict(err) { - return fmt.Errorf("failed to update secret: %v", err) - } - if i != 0 && i%10 == 0 { - h.recorder.Eventf(nodeRef, corev1.EventTypeNormal, secretsProgressEvent, "reencrypted %d secrets", i) - } - i++ + if err := secretPager.EachListItem(h.ctx, metav1.ListOptions{}, func(obj runtime.Object) error { + secret, ok := obj.(*corev1.Secret) + if !ok { + return errors.New("failed to convert object to Secret") + } + if _, err := h.secrets.Update(secret); err != nil && !apierrors.IsConflict(err) { + return fmt.Errorf("failed to update secret: %v", err) } + if i != 0 && i%10 == 0 { + h.recorder.Eventf(nodeRef, corev1.EventTypeNormal, secretsProgressEvent, "reencrypted %d secrets", i) + } + i++ return nil - }) - if err != nil { + }); err != nil { return err } + h.recorder.Eventf(nodeRef, corev1.EventTypeNormal, secretsUpdateCompleteEvent, "completed reencrypt of %d secrets", i) return nil }