From 5ef0a8b8484d76ff88ffca75b9b416fae5a5096b Mon Sep 17 00:00:00 2001 From: Will Date: Thu, 18 Jul 2024 20:45:19 +0100 Subject: [PATCH 1/3] remove deprecated use of wait functions Signed-off-by: Will --- pkg/agent/config/config.go | 6 ++++-- pkg/agent/netpol/netpol.go | 2 +- pkg/agent/tunnel/tunnel.go | 2 +- pkg/cluster/cluster.go | 2 +- pkg/cluster/managed.go | 2 +- pkg/cluster/storage.go | 6 +++--- pkg/etcd/etcd.go | 2 +- pkg/secretsencrypt/config.go | 6 ++++-- pkg/server/router.go | 2 +- pkg/spegel/bootstrap.go | 2 +- pkg/spegel/spegel.go | 2 +- pkg/util/api.go | 4 ++-- 12 files changed, 21 insertions(+), 17 deletions(-) diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index 8cfd815e7e7b..f1850c5d105c 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -59,6 +59,8 @@ func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) (*config.Node // does not support jittering, so we instead use wait.JitterUntilWithContext, and cancel // the context on success. ctx, cancel := context.WithCancel(ctx) + defer cancel() + wait.JitterUntilWithContext(ctx, func(ctx context.Context) { agentConfig, err = get(ctx, &agent, proxy) if err != nil { @@ -78,7 +80,7 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy var disabled bool var err error - wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { + _ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { disabled, err = getKubeProxyDisabled(ctx, node, proxy) if err != nil { logrus.Infof("Waiting to retrieve kube-proxy configuration; server is not ready: %v", err) @@ -96,7 +98,7 @@ func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []str var addresses []string var err error - wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { + _ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { addresses, err = getAPIServers(ctx, node, proxy) if err != nil { logrus.Infof("Failed to retrieve list of apiservers from server: %v", err) diff --git a/pkg/agent/netpol/netpol.go b/pkg/agent/netpol/netpol.go index f09d47d11e5b..5c892a668f36 100644 --- a/pkg/agent/netpol/netpol.go +++ b/pkg/agent/netpol/netpol.go @@ -70,7 +70,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error { // kube-router netpol requires addresses to be available in the node object. // Wait until the uninitialized taint has been removed, at which point the addresses should be set. // TODO: Replace with non-deprecated PollUntilContextTimeout when our and Kubernetes code migrate to it - if err := wait.PollImmediateInfiniteWithContext(ctx, 2*time.Second, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { // Get the node object node, err := client.CoreV1().Nodes().Get(ctx, nodeConfig.AgentConfig.NodeName, metav1.GetOptions{}) if err != nil { diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index 479288e0fb28..a5df415c7343 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -181,7 +181,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) { <-apiServerReady - wait.PollImmediateWithContext(ctx, time.Second, util.DefaultAPIServerReadyTimeout, func(ctx context.Context) (bool, error) { + wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) { var readyTime metav1.Time nodeName := os.Getenv("NODE_NAME") node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 66f98ca165d0..365fd3568868 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -123,7 +123,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { } if !c.config.EtcdDisableSnapshots { - wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) { + _ = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { err := c.managedDB.ReconcileSnapshotData(ctx) if err != nil { logrus.Errorf("Failed to record snapshots for cluster: %v", err) diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go index b0e6f71861dc..f3065cf1e191 100644 --- a/pkg/cluster/managed.go +++ b/pkg/cluster/managed.go @@ -136,7 +136,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { return } // We use Poll here instead of Until because we want to wait the interval before running the function. - go wait.PollUntilWithContext(ctx, 30*time.Second, func(ctx context.Context) (bool, error) { + go wait.PollUntilContextCancel(ctx, 30*time.Second, true, func(ctx context.Context) (bool, error) { clientURLs, err := c.managedDB.GetMembersClientURLs(ctx) if err != nil { logrus.Warnf("Failed to get etcd ClientURLs: %v", err) diff --git a/pkg/cluster/storage.go b/pkg/cluster/storage.go index e99766152631..b555ac976349 100644 --- a/pkg/cluster/storage.go +++ b/pkg/cluster/storage.go @@ -39,7 +39,7 @@ func RotateBootstrapToken(ctx context.Context, config *config.Control, oldToken tokenKey := storageKey(normalizedToken) var bootstrapList []client.Value - if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0) if err != nil { return false, err @@ -198,7 +198,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error { attempts := 0 tokenKey := storageKey(normalizedToken) - return wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) { + return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { attempts++ value, saveBootstrap, err := getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token) c.saveBootstrap = saveBootstrap @@ -258,7 +258,7 @@ func getBootstrapKeyFromStorage(ctx context.Context, storageClient client.Client var bootstrapList []client.Value var err error - if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0) if err != nil { if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) { diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index da8d0b503592..4125680e66ef 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -482,7 +482,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e case <-time.After(30 * time.Second): logrus.Infof("Waiting for container runtime to become ready before joining etcd cluster") case <-e.config.Runtime.ContainerRuntimeReady: - if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { if err := e.join(ctx, clientAccessInfo); err != nil { // Retry the join if waiting for another member to be promoted, or waiting for peers to connect after promotion if errors.Is(err, rpctypes.ErrTooManyLearners) || errors.Is(err, rpctypes.ErrUnhealthy) { diff --git a/pkg/secretsencrypt/config.go b/pkg/secretsencrypt/config.go index c95f856c798e..78213c19af85 100644 --- a/pkg/secretsencrypt/config.go +++ b/pkg/secretsencrypt/config.go @@ -198,8 +198,9 @@ func WriteEncryptionHashAnnotation(runtime *config.ControlRuntime, node *corev1. // WaitForEncryptionConfigReload watches the metrics API, polling the latest time the encryption config was reloaded. func WaitForEncryptionConfigReload(runtime *config.ControlRuntime, reloadSuccesses, reloadTime int64) error { var lastFailure string - err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) { + ctx := context.Background() + err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { newReloadTime, newReloadSuccess, err := GetEncryptionConfigMetrics(runtime, false) if err != nil { return true, err @@ -238,7 +239,8 @@ func GetEncryptionConfigMetrics(runtime *config.ControlRuntime, initialMetrics b // This is wrapped in a poller because on startup no metrics exist. Its only after the encryption config // is modified and the first reload occurs that the metrics are available. - err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) { + ctx := context.Background() + err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { data, err := restClient.Get().AbsPath("/metrics").DoRaw(context.TODO()) if err != nil { return true, err diff --git a/pkg/server/router.go b/pkg/server/router.go index 98ed472963ce..ec60d5f3d9c9 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -502,7 +502,7 @@ func verifyNode(ctx context.Context, nodeClient coreclient.NodeController, node func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) { runtime := config.ControlConfig.Runtime - wait.PollImmediateUntilWithContext(ctx, time.Second*5, func(ctx context.Context) (bool, error) { + _ = wait.PollUntilContextCancel(ctx, time.Second*5, true, func(ctx context.Context) (bool, error) { if runtime.Core != nil { secretClient := runtime.Core.Core().V1().Secret() // This is consistent with events attached to the node generated by the kubelet diff --git a/pkg/spegel/bootstrap.go b/pkg/spegel/bootstrap.go index 3263bc65c647..b88e4040dfe5 100644 --- a/pkg/spegel/bootstrap.go +++ b/pkg/spegel/bootstrap.go @@ -103,7 +103,7 @@ func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper { func (s *serverBootstrapper) Run(_ context.Context, id string) error { s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) { nodes := s.controlConfig.Runtime.Core.Core().V1().Node() - wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) { + _ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { return false, nil diff --git a/pkg/spegel/spegel.go b/pkg/spegel/spegel.go index 785a31c915a4..aacdadca84c7 100644 --- a/pkg/spegel/spegel.go +++ b/pkg/spegel/spegel.go @@ -236,7 +236,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error { // Wait up to 5 seconds for the p2p network to find peers. This will return // immediately if the node is bootstrapping from itself. - wait.PollImmediateWithContext(ctx, time.Second, resolveTimeout, func(_ context.Context) (bool, error) { + _ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) { return router.Ready() }) diff --git a/pkg/util/api.go b/pkg/util/api.go index eb5acb7dbba8..5ce53c49ba48 100644 --- a/pkg/util/api.go +++ b/pkg/util/api.go @@ -80,7 +80,7 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout t return err } - err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) { + err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) { healthStatus := 0 result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus) if rerr := result.Error(); rerr != nil { @@ -128,7 +128,7 @@ func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.D reviewFunc = subjectAccessReview(authClient, ra, user, groups) } - err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) { + err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) { status, rerr := reviewFunc(ctx) if rerr != nil { lastErr = rerr From 2798e74ce00c82e94f82129a2d3a68f8c15e08ae Mon Sep 17 00:00:00 2001 From: Will Andrews Date: Tue, 23 Jul 2024 10:28:16 +0100 Subject: [PATCH 2/3] Update pkg/secretsencrypt/config.go Co-authored-by: Brad Davidson Signed-off-by: Will Andrews --- pkg/secretsencrypt/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/secretsencrypt/config.go b/pkg/secretsencrypt/config.go index 78213c19af85..382a66731142 100644 --- a/pkg/secretsencrypt/config.go +++ b/pkg/secretsencrypt/config.go @@ -241,7 +241,7 @@ func GetEncryptionConfigMetrics(runtime *config.ControlRuntime, initialMetrics b // is modified and the first reload occurs that the metrics are available. ctx := context.Background() err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { - data, err := restClient.Get().AbsPath("/metrics").DoRaw(context.TODO()) + data, err := restClient.Get().AbsPath("/metrics").DoRaw(ctx) if err != nil { return true, err } From 2024e59d93a4249eee26a1e90b2a1070a8683893 Mon Sep 17 00:00:00 2001 From: Will Andrews Date: Thu, 25 Jul 2024 18:37:11 +0100 Subject: [PATCH 3/3] Update pkg/cluster/managed.go Co-authored-by: Derek Nola Signed-off-by: Will Andrews --- pkg/cluster/managed.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go index f3065cf1e191..dc206ff8c396 100644 --- a/pkg/cluster/managed.go +++ b/pkg/cluster/managed.go @@ -136,7 +136,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { return } // We use Poll here instead of Until because we want to wait the interval before running the function. - go wait.PollUntilContextCancel(ctx, 30*time.Second, true, func(ctx context.Context) (bool, error) { + go wait.PollUntilContextCancel(ctx, 30*time.Second, false, func(ctx context.Context) (bool, error) { clientURLs, err := c.managedDB.GetMembersClientURLs(ctx) if err != nil { logrus.Warnf("Failed to get etcd ClientURLs: %v", err)