Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Add upgrade action retry #1219

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,4 @@
- Add liveness endpoint, allow fleet-gateway component to report degraded state, add update time and messages to status output. {issue}390[390] {pull}569[569]
- Redact sensitive information on diagnostics collect command. {issue}[241] {pull}[566]
- Fix incorrectly creating a filebeat redis input when a policy contains a packetbeat redis input. {issue}[427] {pull}[700]
- Allow upgrade actions to be retried on failure with action queue scheduling. {issue}778[778] {pull}1219[1219]
4 changes: 3 additions & 1 deletion internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
c.state.overrideState = nil
return err
}
c.ReExec(cb)
if cb != nil {
c.ReExec(cb)
}
return nil
}

Expand Down
107 changes: 91 additions & 16 deletions internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
type actionHandlers map[string]actions.Handler

type priorityQueue interface {
Add(fleetapi.Action, int64)
DequeueActions() []fleetapi.Action
Add(fleetapi.ScheduledAction, int64)
DequeueActions() []fleetapi.ScheduledAction
CancelType(string) int
Save() error
}

// Dispatcher processes actions coming from fleet api.
type Dispatcher interface {
Dispatch(context.Context, acker.Acker, ...fleetapi.Action) error
Dispatch(context.Context, acker.Acker, ...fleetapi.Action)
Errors() <-chan error
}

// ActionDispatcher processes actions coming from fleet using registered set of handlers.
Expand All @@ -39,6 +41,8 @@ type ActionDispatcher struct {
handlers actionHandlers
def actions.Handler
queue priorityQueue
rt *retryConfig
errCh chan error
}

// New creates a new action dispatcher.
Expand All @@ -60,9 +64,15 @@ func New(log *logger.Logger, def actions.Handler, queue priorityQueue) (*ActionD
handlers: make(actionHandlers),
def: def,
queue: queue,
rt: defaultRetryConfig(),
errCh: make(chan error),
}, nil
}

func (ad *ActionDispatcher) Errors() <-chan error {
return ad.errCh
}

// Register registers a new handler for action.
func (ad *ActionDispatcher) Register(a fleetapi.Action, handler actions.Handler) error {
k := ad.key(a)
Expand All @@ -88,13 +98,18 @@ func (ad *ActionDispatcher) key(a fleetapi.Action) string {
}

// Dispatch dispatches an action using pre-registered set of handlers.
func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, actions ...fleetapi.Action) (err error) {
// Dispatch will handle action queue operations, and retries.
// Any action that implements the ScheduledAction interface may be added/removed from the queue based on StartTime.
// Any action that implements the RetryableAction interface will be rescheduled if the handler returns an error.
func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, actions ...fleetapi.Action) {
var err error
span, ctx := apm.StartSpan(ctx, "dispatch", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()

ad.removeQueuedUpgrades(actions)
actions = ad.queueScheduledActions(actions)
actions = ad.dispatchCancelActions(ctx, actions, acker)
queued, expired := ad.gatherQueuedActions(time.Now().UTC())
Expand All @@ -108,7 +123,7 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, act

if len(actions) == 0 {
ad.log.Debug("No action to dispatch")
return nil
return
}

ad.log.Debugf(
Expand All @@ -118,18 +133,28 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, act
)

for _, action := range actions {
if err := ctx.Err(); err != nil {
return err
if err = ctx.Err(); err != nil {
ad.errCh <- err
return
}

if err := ad.dispatchAction(ctx, action, acker); err != nil {
rAction, ok := action.(fleetapi.RetryableAction)
if ok {
rAction.SetError(err) // set the retryable action error to what the dispatcher returned
ad.scheduleRetry(ctx, rAction, acker)
continue
}
ad.log.Debugf("Failed to dispatch action '%+v', error: %+v", action, err)
return err
ad.errCh <- err
continue
}
ad.log.Debugf("Successfully dispatched action: '%+v'", action)
}

return acker.Commit(ctx)
if err = acker.Commit(ctx); err != nil {
ad.errCh <- err
}
}

func (ad *ActionDispatcher) dispatchAction(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
Expand All @@ -154,15 +179,18 @@ func detectTypes(actions []fleetapi.Action) []string {
func (ad *ActionDispatcher) queueScheduledActions(input []fleetapi.Action) []fleetapi.Action {
actions := make([]fleetapi.Action, 0, len(input))
for _, action := range input {
start, err := action.StartTime()
if err == nil {
ad.log.Debugf("Adding action id: %s to queue.", action.ID())
ad.queue.Add(action, start.Unix())
sAction, ok := action.(fleetapi.ScheduledAction)
if ok {
start, err := sAction.StartTime()
if err != nil {
ad.log.Warnf("Skipping addition to action-queue, issue gathering start time from action id %s: %v", sAction.ID(), err)
actions = append(actions, action)
continue
}
ad.log.Debugf("Adding action id: %s to queue.", sAction.ID())
ad.queue.Add(sAction, start.Unix())
continue
}
if !errors.Is(err, fleetapi.ErrNoStartTime) {
ad.log.Warnf("Issue gathering start time from action id %s: %v", action.ID(), err)
}
actions = append(actions, action)
}
return actions
Expand Down Expand Up @@ -197,3 +225,50 @@ func (ad *ActionDispatcher) gatherQueuedActions(ts time.Time) (queued, expired [
}
return queued, expired
}

// removeQueuedUpgrades will scan the passed actions and if there is an upgrade action it will remove all upgrade actions in the queue but not alter the passed list.
// this is done to try to only have the most recent upgrade action executed. However it does not eliminate duplicates in retrieved directly from the gateway
func (ad *ActionDispatcher) removeQueuedUpgrades(actions []fleetapi.Action) {
for _, action := range actions {
if action.Type() == fleetapi.ActionTypeUpgrade {
if n := ad.queue.CancelType(fleetapi.ActionTypeUpgrade); n > 0 {
ad.log.Debugw("New upgrade action retrieved from gateway, removing queued upgrade actions", "actions_found", n)
}
return
}
}
}

func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.RetryableAction, acker acker.Acker) {
attempt := action.RetryAttempt()
d, err := ad.rt.GetWait(attempt)
if err != nil {
ad.log.Errorf("No more reties for action id %s: %v", action.ID(), err)
action.SetRetryAttempt(-1)
if err := acker.Ack(ctx, action); err != nil {
ad.log.Errorf("Unable to ack action failure (id %s) to fleet-server: %v", action.ID(), err)
return
}
if err := acker.Commit(ctx); err != nil {
ad.log.Errorf("Unable to commit action failure (id %s) to fleet-server: %v", action.ID(), err)
}
return
}
attempt = attempt + 1
startTime := time.Now().UTC().Add(d)
action.SetRetryAttempt(attempt)
action.SetStartTime(startTime)
ad.log.Debugf("Adding action id: %s to queue.", action.ID())
ad.queue.Add(action, startTime.Unix())
err = ad.queue.Save()
if err != nil {
ad.log.Errorf("retry action id %s attempt %d failed to persist action_queue: %v", action.ID(), attempt, err)
}
if err := acker.Ack(ctx, action); err != nil {
ad.log.Errorf("Unable to ack action retry (id %s) to fleet-server: %v", action.ID(), err)
return
}
if err := acker.Commit(ctx); err != nil {
ad.log.Errorf("Unable to commit action retry (id %s) to fleet-server: %v", action.ID(), err)
}
}
Loading