Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): update vendored hcloud-go to 2.8.0 #6797

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 0 additions & 166 deletions cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,172 +120,6 @@ func (c *ActionClient) AllWithOpts(ctx context.Context, opts ActionListOpts) ([]
return c.action.All(ctx, opts)
}

// WatchOverallProgress watches several actions' progress until they complete
// with success or error. This watching happens in a goroutine and updates are
// provided through the two returned channels:
//
// - The first channel receives percentage updates of the progress, based on
// the number of completed versus total watched actions. The return value
// is an int between 0 and 100.
// - The second channel returned receives errors for actions that did not
// complete successfully, as well as any errors that happened while
// querying the API.
//
// By default, the method keeps watching until all actions have finished
// processing. If you want to be able to cancel the method or configure a
// timeout, use the [context.Context]. Once the method has stopped watching,
// both returned channels are closed.
//
// WatchOverallProgress uses the [WithPollBackoffFunc] of the [Client] to wait
// until sending the next request.
func (c *ActionClient) WatchOverallProgress(ctx context.Context, actions []*Action) (<-chan int, <-chan error) {
errCh := make(chan error, len(actions))
progressCh := make(chan int)

go func() {
defer close(errCh)
defer close(progressCh)

completedIDs := make([]int64, 0, len(actions))
watchIDs := make(map[int64]struct{}, len(actions))
for _, action := range actions {
watchIDs[action.ID] = struct{}{}
}

retries := 0
previousProgress := 0

for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case <-time.After(c.action.client.pollBackoffFunc(retries)):
retries++
}

opts := ActionListOpts{}
for watchID := range watchIDs {
opts.ID = append(opts.ID, watchID)
}

as, err := c.AllWithOpts(ctx, opts)
if err != nil {
errCh <- err
return
}
if len(as) == 0 {
// No actions returned for the provided IDs, they do not exist in the API.
// We need to catch and fail early for this, otherwise the loop will continue
// indefinitely.
errCh <- fmt.Errorf("failed to wait for actions: remaining actions (%v) are not returned from API", opts.ID)
return
}

progress := 0
for _, a := range as {
switch a.Status {
case ActionStatusRunning:
progress += a.Progress
case ActionStatusSuccess:
delete(watchIDs, a.ID)
completedIDs = append(completedIDs, a.ID)
case ActionStatusError:
delete(watchIDs, a.ID)
completedIDs = append(completedIDs, a.ID)
errCh <- fmt.Errorf("action %d failed: %w", a.ID, a.Error())
}
}

progress += len(completedIDs) * 100
if progress != 0 && progress != previousProgress {
sendProgress(progressCh, progress/len(actions))
previousProgress = progress
}

if len(watchIDs) == 0 {
return
}
}
}()

return progressCh, errCh
}

// WatchProgress watches one action's progress until it completes with success
// or error. This watching happens in a goroutine and updates are provided
// through the two returned channels:
//
// - The first channel receives percentage updates of the progress, based on
// the progress percentage indicated by the API. The return value is an int
// between 0 and 100.
// - The second channel receives any errors that happened while querying the
// API, as well as the error of the action if it did not complete
// successfully, or nil if it did.
//
// By default, the method keeps watching until the action has finished
// processing. If you want to be able to cancel the method or configure a
// timeout, use the [context.Context]. Once the method has stopped watching,
// both returned channels are closed.
//
// WatchProgress uses the [WithPollBackoffFunc] of the [Client] to wait until
// sending the next request.
func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-chan int, <-chan error) {
errCh := make(chan error, 1)
progressCh := make(chan int)

go func() {
defer close(errCh)
defer close(progressCh)

retries := 0

for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case <-time.After(c.action.client.pollBackoffFunc(retries)):
retries++
}

a, _, err := c.GetByID(ctx, action.ID)
if err != nil {
errCh <- err
return
}
if a == nil {
errCh <- fmt.Errorf("failed to wait for action %d: action not returned from API", action.ID)
return
}

switch a.Status {
case ActionStatusRunning:
sendProgress(progressCh, a.Progress)
case ActionStatusSuccess:
sendProgress(progressCh, 100)
errCh <- nil
return
case ActionStatusError:
errCh <- a.Error()
return
}
}
}()

return progressCh, errCh
}

// sendProgress allows the user to only read from the error channel and ignore any progress updates.
func sendProgress(progressCh chan int, p int) {
select {
case progressCh <- p:
break
default:
break
}
}

// ResourceActionClient is a client for the actions API exposed by the resource.
type ResourceActionClient struct {
resource string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package hcloud

import (
"context"
"fmt"
"maps"
"slices"
"time"
)

type ActionWaiter interface {
WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error
WaitFor(ctx context.Context, actions ...*Action) error
}

var _ ActionWaiter = (*ActionClient)(nil)

// WaitForFunc waits until all actions are completed by polling the API at the interval
// defined by [WithPollBackoffFunc]. An action is considered as complete when its status is
// either [ActionStatusSuccess] or [ActionStatusError].
//
// The handleUpdate callback is called every time an action is updated.
func (c *ActionClient) WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error {
running := make(map[int64]struct{}, len(actions))
for _, action := range actions {
if action.Status == ActionStatusRunning {
running[action.ID] = struct{}{}
} else if handleUpdate != nil {
// We filter out already completed actions from the API polling loop; while
// this isn't a real update, the caller should be notified about the new
// state.
if err := handleUpdate(action); err != nil {
return err
}
}
}

retries := 0
for {
if len(running) == 0 {
break
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.action.client.pollBackoffFunc(retries)):
retries++
}

opts := ActionListOpts{
Sort: []string{"status", "id"},
ID: make([]int64, 0, len(running)),
}
for actionID := range running {
opts.ID = append(opts.ID, actionID)
}
slices.Sort(opts.ID)

updates, err := c.AllWithOpts(ctx, opts)
if err != nil {
return err
}

if len(updates) != len(running) {
// Some actions may not exist in the API, also fail early to prevent an
// infinite loop when updates == 0.

notFound := maps.Clone(running)
for _, update := range updates {
delete(notFound, update.ID)
}
notFoundIDs := make([]int64, 0, len(notFound))
for unknownID := range notFound {
notFoundIDs = append(notFoundIDs, unknownID)
}

return fmt.Errorf("actions not found: %v", notFoundIDs)
}

for _, update := range updates {
if update.Status != ActionStatusRunning {
delete(running, update.ID)
}

if handleUpdate != nil {
if err := handleUpdate(update); err != nil {
return err
}
}
}
}

return nil
}

// WaitFor waits until all actions succeed by polling the API at the interval defined by
// [WithPollBackoffFunc]. An action is considered as succeeded when its status is either
// [ActionStatusSuccess].
//
// If a single action fails, the function will stop waiting and the error set in the
// action will be returned as an [ActionError].
//
// For more flexibility, see the [ActionClient.WaitForFunc] function.
func (c *ActionClient) WaitFor(ctx context.Context, actions ...*Action) error {
return c.WaitForFunc(
ctx,
func(update *Action) error {
if update.Status == ActionStatusError {
return update.Error()
}
return nil
},
actions...,
)
}
Loading
Loading