Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pagination when listing large numbers of resources #10527

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/pager"
"k8s.io/client-go/util/retry"
)

const (
errorTTL = 24 * time.Hour
errorTTL = 24 * time.Hour
snapshotListPageSize = 20
)

var (
Expand Down Expand Up @@ -720,18 +723,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
return err
}

// List all snapshots matching the selector
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize

// List all snapshots matching the selector
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
// If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes.
// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
for _, esf := range esfList.Items {
sfKey := generateETCDSnapshotFileConfigMapKey(esf)
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
esf, ok := obj.(*k3s.ETCDSnapshotFile)
if !ok {
return errors.New("failed to convert object to ETCDSnapshotFile")
}
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
// exists in both and names match, don't need to sync
Expand All @@ -741,7 +746,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
expires := esf.Status.Error.Time.Add(errorTTL)
if time.Now().Before(expires) {
continue
return nil
}
}
if ok {
Expand All @@ -754,6 +759,9 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
}
}
return nil
}); err != nil {
return err
}

// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
Expand Down Expand Up @@ -794,15 +802,18 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}

// List and remove all snapshots stored on nodes that do not match the selector
esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
esf, ok := obj.(*k3s.ETCDSnapshotFile)
if !ok {
return errors.New("failed to convert object to ETCDSnapshotFile")
}

for _, esf := range esfList.Items {
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
}
return nil
}); err != nil {
return err
}

// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
Expand Down
24 changes: 16 additions & 8 deletions pkg/etcd/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/util"
Expand All @@ -20,7 +21,9 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/pager"
"k8s.io/client-go/util/retry"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -216,20 +219,25 @@ func (e *etcdSnapshotHandler) reconcile() error {
logrus.Infof("Reconciling snapshot ConfigMap data")

// Get a list of existing snapshots
snapshotList, err := e.snapshots.List(metav1.ListOptions{})
if err != nil {
return err
}

snapshots := map[string]*apisv1.ETCDSnapshotFile{}
for i := range snapshotList.Items {
esf := &snapshotList.Items[i]
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return e.snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize

if err := snapshotPager.EachListItem(e.ctx, metav1.ListOptions{}, func(obj k8sruntime.Object) error {
esf, ok := obj.(*k3s.ETCDSnapshotFile)
if !ok {
return errors.New("failed to convert object to ETCDSnapshotFile")
}

// Do not create entries for snapshots that have been deleted or do not have extra metadata
if !esf.DeletionTimestamp.IsZero() || len(esf.Spec.Metadata) == 0 {
continue
return nil
}
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
snapshots[sfKey] = esf
return nil
}); err != nil {
return err
}

snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
Expand Down
46 changes: 21 additions & 25 deletions pkg/secretsencrypt/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package secretsencrypt

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -12,7 +13,6 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -30,6 +30,8 @@ const (
secretsProgressEvent string = "SecretsProgress"
secretsUpdateCompleteEvent string = "SecretsUpdateComplete"
secretsUpdateErrorEvent string = "SecretsUpdateError"

secretListPageSize = 20
dereknola marked this conversation as resolved.
Show resolved Hide resolved
)

type handler struct {
Expand Down Expand Up @@ -116,7 +118,7 @@ func (h *handler) onChangeNode(nodeName string, node *corev1.Node) (*corev1.Node
return node, err
}

if err := h.updateSecrets(node); err != nil {
if err := h.updateSecrets(nodeRef); err != nil {
h.recorder.Event(nodeRef, corev1.EventTypeWarning, secretsUpdateErrorEvent, err.Error())
return node, err
}
Expand Down Expand Up @@ -213,36 +215,30 @@ func (h *handler) validateReencryptStage(node *corev1.Node, annotation string) (
return true, nil
}

func (h *handler) updateSecrets(node *corev1.Node) error {
nodeRef := &corev1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.Name),
Namespace: "",
}
func (h *handler) updateSecrets(nodeRef *corev1.ObjectReference) error {
secretPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return h.secrets.List("", opts)
return h.secrets.List(metav1.NamespaceAll, opts)
}))
secretsList, _, err := secretPager.List(h.ctx, metav1.ListOptions{})
if err != nil {
return err
}
secretPager.PageSize = secretListPageSize

i := 0
err = meta.EachListItem(secretsList, func(obj runtime.Object) error {
if secret, ok := obj.(*corev1.Secret); ok {
if _, err := h.secrets.Update(secret); err != nil && !apierrors.IsConflict(err) {
return fmt.Errorf("failed to update secret: %v", err)
}
if i != 0 && i%10 == 0 {
h.recorder.Eventf(nodeRef, corev1.EventTypeNormal, secretsProgressEvent, "reencrypted %d secrets", i)
}
i++
if err := secretPager.EachListItem(h.ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
secret, ok := obj.(*corev1.Secret)
if !ok {
return errors.New("failed to convert object to Secret")
}
if _, err := h.secrets.Update(secret); err != nil && !apierrors.IsConflict(err) {
return fmt.Errorf("failed to update secret: %v", err)
}
if i != 0 && i%10 == 0 {
h.recorder.Eventf(nodeRef, corev1.EventTypeNormal, secretsProgressEvent, "reencrypted %d secrets", i)
}
i++
return nil
})
if err != nil {
}); err != nil {
return err
}

h.recorder.Eventf(nodeRef, corev1.EventTypeNormal, secretsUpdateCompleteEvent, "completed reencrypt of %d secrets", i)
return nil
}
Loading