Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polling helpers for stack plan/configuration state #953

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 81 additions & 10 deletions stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ type StackVCSRepo struct {

// Stack represents a stack.
type Stack struct {
ID string `jsonapi:"primary,stacks"`
Name string `jsonapi:"attr,name"`
Description string `jsonapi:"attr,description"`
DeploymentNames []string `jsonapi:"attr,deployment-names"`
VCSRepo *StackVCSRepo `jsonapi:"attr,vcs-repo"`
ErrorsCount int `jsonapi:"attr,errors-count"`
WarningsCount int `jsonapi:"attr,warnings-count"`
SpeculativePlansEnabled bool `jsonapi:"attr,speculative-enabled"`
CreatedAt time.Time `jsonapi:"attr,created-at,iso8601"`
UpdatedAt time.Time `jsonapi:"attr,updated-at,iso8601"`
ID string `jsonapi:"primary,stacks"`
Name string `jsonapi:"attr,name"`
Description string `jsonapi:"attr,description"`
DeploymentNames []string `jsonapi:"attr,deployment-names"`
VCSRepo *StackVCSRepo `jsonapi:"attr,vcs-repo"`
ErrorsCount int `jsonapi:"attr,errors-count"`
WarningsCount int `jsonapi:"attr,warnings-count"`
SpeculativeEnabled bool `jsonapi:"attr,speculative-enabled"`
CreatedAt time.Time `jsonapi:"attr,created-at,iso8601"`
UpdatedAt time.Time `jsonapi:"attr,updated-at,iso8601"`

// Relationships
Project *Project `jsonapi:"relation,project"`
Expand Down Expand Up @@ -168,6 +168,22 @@ type StackUpdateOptions struct {
VCSRepo *StackVCSRepo `jsonapi:"attr,vcs-repo,omitempty"`
}

// WaitForStatusResult is the data structure that is sent over the channel
// returned by various status polling functions. For each result, either the
// Error or the Status will be set, but not both. If the Quit field is set,
// the channel will be closed. If the Quit field is set and the Error is
// nil, the Status field will be set to a specified quit status.
type WaitForStatusResult struct {
ID string
Status string
ReadAttempts int
Error error
Quit bool
}

const minimumPollingIntervalMs = 3000
const maximumPollingIntervalMs = 5000

// UpdateConfiguration updates the configuration of a stack, triggering stack operations
func (s *stacks) UpdateConfiguration(ctx context.Context, stackID string) (*Stack, error) {
req, err := s.client.NewRequest("POST", fmt.Sprintf("stacks/%s/actions/update-configuration", url.PathEscape(stackID)), nil)
Expand Down Expand Up @@ -289,3 +305,58 @@ func (s StackVCSRepo) valid() error {

return nil
}

// awaitPoll is a helper function that uses a callback to read a status, then
// waits for a terminal status or an error. The callback should return the
// current status, or an error. For each time the status changes, the channel
// emits a new result. The id parameter should be the ID of the resource being
// polled, which is used in the result to help identify the resource being polled.
func awaitPoll(ctx context.Context, id string, reader func(ctx context.Context) (string, error), quitStatus []string) <-chan WaitForStatusResult {
resultCh := make(chan WaitForStatusResult)

mapStatus := make(map[string]struct{}, len(quitStatus))
for _, status := range quitStatus {
mapStatus[status] = struct{}{}
}

go func() {
defer close(resultCh)

reads := 0
lastStatus := ""
for {
select {
case <-ctx.Done():
resultCh <- WaitForStatusResult{ID: id, Error: fmt.Errorf("context canceled: %w", ctx.Err())}
return
case <-time.After(backoff(minimumPollingIntervalMs, maximumPollingIntervalMs, reads)):
status, err := reader(ctx)
if err != nil {
resultCh <- WaitForStatusResult{ID: id, Error: err, Quit: true}
return
}

_, terminal := mapStatus[status]

if status != lastStatus {
resultCh <- WaitForStatusResult{
ID: id,
Status: status,
ReadAttempts: reads + 1,
Quit: terminal,
}
}

lastStatus = status

if terminal {
return
}

reads += 1
}
}
}()

return resultCh
}
57 changes: 57 additions & 0 deletions stack_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,33 @@ type StackConfigurations interface {

// JSONSchemas returns a byte slice of the JSON schema for the stack configuration.
JSONSchemas(ctx context.Context, stackConfigurationID string) ([]byte, error)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add an AwaitQueued? This would be helpful since only then the eventstream url is set :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added AwaitStatus that accepts a specific status, and renamed "AwaitPrepared" to "AwaitCompleted"

// AwaitCompleted generates a channel that will receive the status of the
// stack configuration as it progresses, until that status is "converged",
// "converging", "errored", "canceled".
AwaitCompleted(ctx context.Context, stackConfigurationID string) <-chan WaitForStatusResult

// AwaitPrepared generates a channel that will receive the status of the
// stack configuration as it progresses, until that status is "<status>",
// "errored", "canceled".
AwaitStatus(ctx context.Context, stackConfigurationID string, status StackConfigurationStatus) <-chan WaitForStatusResult
}

type StackConfigurationStatus string

const (
StackConfigurationStatusPending StackConfigurationStatus = "pending"
StackConfigurationStatusQueued StackConfigurationStatus = "queued"
StackConfigurationStatusPreparing StackConfigurationStatus = "preparing"
StackConfigurationStatusEnqueueing StackConfigurationStatus = "enqueueing"
StackConfigurationStatusConverged StackConfigurationStatus = "converged"
StackConfigurationStatusConverging StackConfigurationStatus = "converging"
StackConfigurationStatusErrored StackConfigurationStatus = "errored"
StackConfigurationStatusCanceled StackConfigurationStatus = "canceled"
)

func (s StackConfigurationStatus) String() string {
return string(s)
}

type stackConfigurations struct {
Expand Down Expand Up @@ -64,3 +91,33 @@ func (s stackConfigurations) JSONSchemas(ctx context.Context, stackConfiguration

return raw.Bytes(), nil
}

// AwaitCompleted generates a channel that will receive the status of the stack configuration as it progresses.
// The channel will be closed when the stack configuration reaches a status indicating that or an error occurs. The
// read will be retried dependending on the configuration of the client. When the channel is closed,
// the last value will either be a completed status or an error.
func (s stackConfigurations) AwaitCompleted(ctx context.Context, stackConfigurationID string) <-chan WaitForStatusResult {
return awaitPoll(ctx, stackConfigurationID, func(ctx context.Context) (string, error) {
stackConfiguration, err := s.Read(ctx, stackConfigurationID)
if err != nil {
return "", err
}

return stackConfiguration.Status, nil
}, []string{StackConfigurationStatusConverged.String(), StackConfigurationStatusConverging.String(), StackConfigurationStatusErrored.String(), StackConfigurationStatusCanceled.String()})
}

// AwaitStatus generates a channel that will receive the status of the stack configuration as it progresses.
// The channel will be closed when the stack configuration reaches a status indicating that or an error occurs. The
// read will be retried dependending on the configuration of the client. When the channel is closed,
// the last value will either be the specified status, "errored" status, or "canceled" status, or an error.
func (s stackConfigurations) AwaitStatus(ctx context.Context, stackConfigurationID string, status StackConfigurationStatus) <-chan WaitForStatusResult {
return awaitPoll(ctx, stackConfigurationID, func(ctx context.Context) (string, error) {
stackConfiguration, err := s.Read(ctx, stackConfigurationID)
if err != nil {
return "", err
}

return stackConfiguration.Status, nil
}, []string{status.String(), StackConfigurationStatusErrored.String(), StackConfigurationStatusCanceled.String()})
}
84 changes: 83 additions & 1 deletion stack_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type StackPlans interface {

// PlanDescription returns the plan description for a stack plan.
PlanDescription(ctx context.Context, stackPlanID string) (*JSONChangeDesc, error)

DanielMSchmidt marked this conversation as resolved.
Show resolved Hide resolved
// AwaitTerminal generates a channel that will receive the status of the stack plan as it progresses.
// See WaitForStatusResult for more information.
AwaitTerminal(ctx context.Context, stackPlanID string) <-chan WaitForStatusResult

// AwaitRunning generates a channel that will receive the status of the stack plan as it progresses.
// See WaitForStatusResult for more information.
AwaitRunning(ctx context.Context, stackPlanID string) <-chan WaitForStatusResult
}

type StackPlansStatusFilter string
Expand All @@ -46,8 +54,29 @@ const (
StackPlansStatusFilterCanceled StackPlansStatusFilter = "canceled"
)

type StackPlanStatus string

const (
StackPlanStatusCreated StackPlanStatus = "created"
StackPlanStatusRunning StackPlanStatus = "running"
StackPlanStatusRunningQueued StackPlanStatus = "running_queued"
StackPlanStatusRunningPlanning StackPlanStatus = "running_planning"
StackPlanStatusRunningApplying StackPlanStatus = "running_applying"
StackPlanStatusFinished StackPlanStatus = "finished"
StackPlanStatusFinishedNoChanges StackPlanStatus = "finished_no_changes"
StackPlanStatusFinishedPlanned StackPlanStatus = "finished_planned"
StackPlanStatusFinishedApplied StackPlanStatus = "finished_applied"
StackPlanStatusDiscarded StackPlanStatus = "discarded"
StackPlanStatusErrored StackPlanStatus = "errored"
StackPlanStatusCanceled StackPlanStatus = "canceled"
)

type StackPlansIncludeOpt string

func (s StackPlanStatus) String() string {
return string(s)
}

const (
StackPlansIncludeOperations StackPlansIncludeOpt = "stack_plan_operations"
)
Expand Down Expand Up @@ -98,7 +127,7 @@ type StackPlan struct {
ID string `jsonapi:"primary,stack-plans"`
PlanMode string `jsonapi:"attr,plan-mode"`
PlanNumber string `jsonapi:"attr,plan-number"`
Status string `jsonapi:"attr,status"`
Status StackPlanStatus `jsonapi:"attr,status"`
StatusTimestamps *StackPlanStatusTimestamps `jsonapi:"attr,status-timestamps"`
IsPlanned bool `jsonapi:"attr,is-planned"`
Changes *PlanChanges `jsonapi:"attr,changes"`
Expand Down Expand Up @@ -261,3 +290,56 @@ func (s stackPlans) PlanDescription(ctx context.Context, stackPlanID string) (*J

return jd, nil
}

// AwaitTerminal generates a channel that will receive the status of the stack plan as it progresses.
// The channel will be closed when the stack plan reaches a final status or an error occurs. The
// read will be retried dependending on the configuration of the client. When the channel is closed,
// the last value will either be a terminal status (finished, finished_no_changes, finished_applied,
// finished_planned, discarded, canceled, errorer), or an error. The status check will continue even
// if the stack plan is waiting for approval. Check the status within the the channel to determine
// if the stack plan needs approval.
func (s stackPlans) AwaitTerminal(ctx context.Context, stackPlanID string) <-chan WaitForStatusResult {
return awaitPoll(ctx, stackPlanID, func(ctx context.Context) (string, error) {
stackPlan, err := s.Read(ctx, stackPlanID)
if err != nil {
return "", err
}

return stackPlan.Status.String(), nil
}, []string{
StackPlanStatusFinished.String(),
StackPlanStatusFinishedNoChanges.String(),
StackPlanStatusFinishedApplied.String(),
StackPlanStatusFinishedPlanned.String(),
StackPlanStatusDiscarded.String(),
StackPlanStatusErrored.String(),
StackPlanStatusCanceled.String(),
})
}

// AwaitRunning generates a channel that will receive the status of the stack plan as it progresses.
// The channel will be closed when the stack plan reaches a running status (running, running_queued,
// running_planning, running_applying), a terminal status (finished, finished_no_changes, finished_applied,
// finished_planned, discarded, canceled, errorer), or an error occurs. The read will be retried
// dependending on the configuration of the client.
func (s stackPlans) AwaitRunning(ctx context.Context, stackPlanID string) <-chan WaitForStatusResult {
return awaitPoll(ctx, stackPlanID, func(ctx context.Context) (string, error) {
stackPlan, err := s.Read(ctx, stackPlanID)
if err != nil {
return "", err
}

return stackPlan.Status.String(), nil
}, []string{
StackPlanStatusRunning.String(),
StackPlanStatusRunningPlanning.String(),
StackPlanStatusRunningApplying.String(),
StackPlanStatusFinished.String(),
StackPlanStatusFinishedNoChanges.String(),
StackPlanStatusFinishedApplied.String(),
StackPlanStatusFinishedPlanned.String(),
StackPlanStatusDiscarded.String(),
StackPlanStatusErrored.String(),
StackPlanStatusCanceled.String(),
})
}
Loading