Remove deprecated use of wait. functions #10546

Merged
merged 3 commits on Jul 29, 2024
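Every change in this PR follows the same pattern: a deprecated poller from k8s.io/apimachinery/pkg/util/wait is swapped for its context-aware successor, with the old Immediate/non-Immediate split collapsing into a boolean `immediate` argument. A minimal sketch of the mapping (the `check` condition function is a hypothetical stand-in, not code from this repository):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// check is a stand-in condition: return true to stop polling,
// false to poll again, or a non-nil error to abort.
func check(ctx context.Context) (bool, error) {
	return true, nil
}

func main() {
	ctx := context.Background()

	// Deprecated: wait.PollImmediateUntilWithContext(ctx, 5*time.Second, check)
	// The third argument is `immediate`: true polls right away, false
	// sleeps one interval first (the old Poll vs. PollImmediate split).
	err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, check)
	fmt.Println(err)

	// Deprecated: wait.PollImmediateWithContext(ctx, time.Second, 30*time.Second, check)
	// The timeout variant bounds the poll with an overall deadline.
	err = wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, check)
	fmt.Println(err)
}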
6 changes: 4 additions & 2 deletions pkg/agent/config/config.go
@@ -59,6 +59,8 @@ func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) (*config.Node
// does not support jittering, so we instead use wait.JitterUntilWithContext, and cancel
// the context on success.
ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
agentConfig, err = get(ctx, &agent, proxy)
if err != nil {
@@ -78,7 +80,7 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
var disabled bool
var err error

- wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+ _ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
disabled, err = getKubeProxyDisabled(ctx, node, proxy)
if err != nil {
logrus.Infof("Waiting to retrieve kube-proxy configuration; server is not ready: %v", err)
@@ -96,7 +98,7 @@ func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []str
var addresses []string
var err error

- wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+ _ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy)
if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
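The first hunk above keeps the jittered-retry idiom its comment describes: wait.JitterUntilWithContext has no built-in success signal, so the loop body cancels the context once the call succeeds. A standalone sketch of that idiom, assuming a hypothetical fetchConfig in place of the repository's get function:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// fetchConfig is a hypothetical stand-in for a call that may fail
// and should be retried with jitter until it succeeds.
func fetchConfig(ctx context.Context) (string, error) {
	return "config", nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var cfg string
	var err error
	// Runs fetchConfig immediately, then every ~5s (with up to 1.0x jitter),
	// until the context is cancelled, which the body does on success.
	wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
		if cfg, err = fetchConfig(ctx); err == nil {
			cancel()
		}
	}, 5*time.Second, 1.0, true)
	fmt.Println(cfg, err)
}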
2 changes: 1 addition & 1 deletion pkg/agent/netpol/netpol.go
@@ -70,7 +70,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
// kube-router netpol requires addresses to be available in the node object.
// Wait until the uninitialized taint has been removed, at which point the addresses should be set.
// TODO: Replace with non-deprecated PollUntilContextTimeout when our and Kubernetes code migrate to it
- if err := wait.PollImmediateInfiniteWithContext(ctx, 2*time.Second, func(ctx context.Context) (bool, error) {
+ if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
// Get the node object
node, err := client.CoreV1().Nodes().Get(ctx, nodeConfig.AgentConfig.NodeName, metav1.GetOptions{})
if err != nil {
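Worth noting: the TODO above suggested PollUntilContextTimeout, but the hunk switches to PollUntilContextCancel instead, which matches the semantics of the old PollImmediateInfiniteWithContext: poll forever (here, until the uninitialized taint is gone) rather than give up after a deadline.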
2 changes: 1 addition & 1 deletion pkg/agent/tunnel/tunnel.go
@@ -181,7 +181,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
<-apiServerReady

- wait.PollImmediateWithContext(ctx, time.Second, util.DefaultAPIServerReadyTimeout, func(ctx context.Context) (bool, error) {
+ wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
var readyTime metav1.Time
nodeName := os.Getenv("NODE_NAME")
node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
@@ -123,7 +123,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
}

if !c.config.EtcdDisableSnapshots {
- wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+ _ = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
err := c.managedDB.ReconcileSnapshotData(ctx)
if err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
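As at several other call sites in this PR, the poller's result is now assigned to the blank identifier. The deprecated helper's error was simply ignored here; the explicit `_ =` keeps that fire-and-forget behavior while making the discard visible (and, presumably, keeping errcheck-style linters quiet).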
2 changes: 1 addition & 1 deletion pkg/cluster/managed.go
@@ -136,7 +136,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
return
}
// We use Poll here instead of Until because we want to wait the interval before running the function.
- go wait.PollUntilWithContext(ctx, 30*time.Second, func(ctx context.Context) (bool, error) {
+ go wait.PollUntilContextCancel(ctx, 30*time.Second, false, func(ctx context.Context) (bool, error) {
clientURLs, err := c.managedDB.GetMembersClientURLs(ctx)
if err != nil {
logrus.Warnf("Failed to get etcd ClientURLs: %v", err)
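This is the only call site in the PR that passes false for the immediate argument: PollUntilContextCancel then sleeps one 30-second interval before the first GetMembersClientURLs call, preserving the wait-before-first-run behavior the comment above describes.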
6 changes: 3 additions & 3 deletions pkg/cluster/storage.go
@@ -39,7 +39,7 @@ func RotateBootstrapToken(ctx context.Context, config *config.Control, oldToken
tokenKey := storageKey(normalizedToken)

var bootstrapList []client.Value
- if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+ if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
if err != nil {
return false, err
@@ -198,7 +198,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error {

attempts := 0
tokenKey := storageKey(normalizedToken)
- return wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+ return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
attempts++
value, saveBootstrap, err := getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token)
c.saveBootstrap = saveBootstrap
@@ -258,7 +258,7 @@ func getBootstrapKeyFromStorage(ctx context.Context, storageClient client.Client
var bootstrapList []client.Value
var err error

- if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+ if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
if err != nil {
if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) {
2 changes: 1 addition & 1 deletion pkg/etcd/etcd.go
@@ -482,7 +482,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for container runtime to become ready before joining etcd cluster")
case <-e.config.Runtime.ContainerRuntimeReady:
- if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+ if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
if err := e.join(ctx, clientAccessInfo); err != nil {
// Retry the join if waiting for another member to be promoted, or waiting for peers to connect after promotion
if errors.Is(err, rpctypes.ErrTooManyLearners) || errors.Is(err, rpctypes.ErrUnhealthy) {
8 changes: 5 additions & 3 deletions pkg/secretsencrypt/config.go
@@ -198,8 +198,9 @@ func WriteEncryptionHashAnnotation(runtime *config.ControlRuntime, node *corev1.
// WaitForEncryptionConfigReload watches the metrics API, polling the latest time the encryption config was reloaded.
func WaitForEncryptionConfigReload(runtime *config.ControlRuntime, reloadSuccesses, reloadTime int64) error {
var lastFailure string
- err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {

+ ctx := context.Background()
+ err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
newReloadTime, newReloadSuccess, err := GetEncryptionConfigMetrics(runtime, false)
if err != nil {
return true, err
@@ -238,8 +239,9 @@ func GetEncryptionConfigMetrics(runtime *config.ControlRuntime, initialMetrics b

// This is wrapped in a poller because on startup no metrics exist. Its only after the encryption config
// is modified and the first reload occurs that the metrics are available.
- err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
- data, err := restClient.Get().AbsPath("/metrics").DoRaw(context.TODO())
+ ctx := context.Background()
+ err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
+ data, err := restClient.Get().AbsPath("/metrics").DoRaw(ctx)
if err != nil {
return true, err
}
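The two hunks above also handle the other half of the migration: wait.PollImmediate took no context at all, so these callers now create one. Both functions are bounded by a 60-second timeout rather than caller cancellation, so context.Background() paired with PollUntilContextTimeout reproduces the old behavior. A sketch of the pattern, with a hypothetical scrapeMetrics standing in for the /metrics fetch:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// scrapeMetrics is a hypothetical stand-in for fetching the apiserver's
// /metrics output; it reports whether the reload metric is present yet.
func scrapeMetrics(ctx context.Context) (bool, error) {
	return true, nil
}

func main() {
	// No caller context is available, so bound the poll purely by timeout.
	ctx := context.Background()
	err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
		reloaded, err := scrapeMetrics(ctx)
		if err != nil {
			return true, err // a non-nil error stops the poll immediately
		}
		return reloaded, nil // false: metric not present yet, poll again
	})
	fmt.Println(err)
}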
2 changes: 1 addition & 1 deletion pkg/server/router.go
@@ -502,7 +502,7 @@ func verifyNode(ctx context.Context, nodeClient coreclient.NodeController, node

func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) {
runtime := config.ControlConfig.Runtime
- wait.PollImmediateUntilWithContext(ctx, time.Second*5, func(ctx context.Context) (bool, error) {
+ _ = wait.PollUntilContextCancel(ctx, time.Second*5, true, func(ctx context.Context) (bool, error) {
if runtime.Core != nil {
secretClient := runtime.Core.Core().V1().Secret()
// This is consistent with events attached to the node generated by the kubelet
2 changes: 1 addition & 1 deletion pkg/spegel/bootstrap.go
@@ -103,7 +103,7 @@ func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper {
func (s *serverBootstrapper) Run(_ context.Context, id string) error {
s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) {
nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
- wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) {
+ _ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil
2 changes: 1 addition & 1 deletion pkg/spegel/spegel.go
@@ -236,7 +236,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {

// Wait up to 5 seconds for the p2p network to find peers. This will return
// immediately if the node is bootstrapping from itself.
- wait.PollImmediateWithContext(ctx, time.Second, resolveTimeout, func(_ context.Context) (bool, error) {
+ _ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) {
return router.Ready()
})

4 changes: 2 additions & 2 deletions pkg/util/api.go
@@ -80,7 +80,7 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout t
return err
}

- err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
+ err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
healthStatus := 0
result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus)
if rerr := result.Error(); rerr != nil {
@@ -128,7 +128,7 @@ func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.D
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
}

- err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
+ err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
status, rerr := reviewFunc(ctx)
if rerr != nil {
lastErr = rerr
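Both hunks above use the same bounded readiness pattern: poll once per second up to the caller's timeout, stash the most recent failure, and surface it if the deadline expires. A condensed sketch, with checkReady as a hypothetical stand-in for the /readyz request:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// checkReady is a hypothetical stand-in for the GET /readyz request.
func checkReady(ctx context.Context) error {
	return nil // pretend the apiserver answered 200
}

func main() {
	ctx := context.Background()
	var lastErr error
	err := wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
		if rerr := checkReady(ctx); rerr != nil {
			lastErr = rerr
			return false, nil // not ready yet, keep polling
		}
		return true, nil
	})
	// The wait package returns a generic deadline error on timeout;
	// reporting lastErr instead tells the caller what was actually failing.
	if err != nil && lastErr != nil {
		err = lastErr
	}
	fmt.Println(err)
}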