Skip to content

Commit

Permalink
[v2] Move queue management to dispatcher (#1109)
Browse files Browse the repository at this point in the history
* Move queue management to dispatcher

Move queue management actions to the dispatcher from the fleet-server
in order to help with future work to add a retry mechanism. Add a
PersistedQueue type which wrap the ActionQueue to make persisting the
queue simpler for the consumer.

* Refactor ActionQueue

Refactor ActionQueue to only export methods that are used by consumers.
The priority queue implementation has been changed to an unexported
type. Persistency has been added and the persistedqueue type has been
removed.

* Rename persistedQueue interface to priorityQueue

* Review feedback

* failing to save queue will log message

* Chagne gateway to use copy
  • Loading branch information
michel-laterman authored Sep 15, 2022
1 parent f95c9ed commit 5051218
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 452 deletions.
71 changes: 70 additions & 1 deletion internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"go.elastic.co/apm"

Expand All @@ -21,6 +22,12 @@ import (

type actionHandlers map[string]actions.Handler

type priorityQueue interface {
Add(fleetapi.Action, int64)
DequeueActions() []fleetapi.Action
Save() error
}

// Dispatcher processes actions coming from fleet api.
type Dispatcher interface {
Dispatch(context.Context, acker.Acker, ...fleetapi.Action) error
Expand All @@ -31,10 +38,11 @@ type ActionDispatcher struct {
log *logger.Logger
handlers actionHandlers
def actions.Handler
queue priorityQueue
}

// New creates a new action dispatcher.
func New(log *logger.Logger, def actions.Handler) (*ActionDispatcher, error) {
func New(log *logger.Logger, def actions.Handler, queue priorityQueue) (*ActionDispatcher, error) {
var err error
if log == nil {
log, err = logger.New("action_dispatcher", false)
Expand All @@ -51,6 +59,7 @@ func New(log *logger.Logger, def actions.Handler) (*ActionDispatcher, error) {
log: log,
handlers: make(actionHandlers),
def: def,
queue: queue,
}, nil
}

Expand Down Expand Up @@ -86,6 +95,17 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, act
span.End()
}()

actions = ad.queueScheduledActions(actions)
actions = ad.dispatchCancelActions(ctx, actions, acker)
queued, expired := ad.gatherQueuedActions(time.Now().UTC())
ad.log.Debugf("Gathered %d actions from queue, %d actions expired", len(queued), len(expired))
ad.log.Debugf("Expired actions: %v", expired)
actions = append(actions, queued...)

if err := ad.queue.Save(); err != nil {
ad.log.Errorf("failed to persist action_queue: %v", err)
}

if len(actions) == 0 {
ad.log.Debug("No action to dispatch")
return nil
Expand Down Expand Up @@ -128,3 +148,52 @@ func detectTypes(actions []fleetapi.Action) []string {
}
return str
}

// queueScheduledActions will add any action in actions with a valid start time to the queue and return the rest.
// start time to current time comparisons are purposefully not made in case of cancel actions.
func (ad *ActionDispatcher) queueScheduledActions(input []fleetapi.Action) []fleetapi.Action {
actions := make([]fleetapi.Action, 0, len(input))
for _, action := range input {
start, err := action.StartTime()
if err == nil {
ad.log.Debugf("Adding action id: %s to queue.", action.ID())
ad.queue.Add(action, start.Unix())
continue
}
if !errors.Is(err, fleetapi.ErrNoStartTime) {
ad.log.Warnf("Issue gathering start time from action id %s: %v", action.ID(), err)
}
actions = append(actions, action)
}
return actions
}

// dispatchCancelActions will separate and dispatch any cancel actions from the actions list and return the rest of the list.
// cancel actions are dispatched seperatly as they may remove items from the queue.
func (ad *ActionDispatcher) dispatchCancelActions(ctx context.Context, actions []fleetapi.Action, acker acker.Acker) []fleetapi.Action {
for i := len(actions) - 1; i >= 0; i-- {
action := actions[i]
// If it is a cancel action, remove from list and dispatch
if action.Type() == fleetapi.ActionTypeCancel {
actions = append(actions[:i], actions[i+1:]...)
if err := ad.dispatchAction(ctx, action, acker); err != nil {
ad.log.Errorf("Unable to dispatch cancel action id %s: %v", action.ID(), err)
}
}
}
return actions
}

// gatherQueuedActions will dequeue actions from the action queue and separate those that have already expired.
func (ad *ActionDispatcher) gatherQueuedActions(ts time.Time) (queued, expired []fleetapi.Action) {
actions := ad.queue.DequeueActions()
for _, action := range actions {
exp, _ := action.Expiration()
if ts.After(exp) {
expired = append(expired, action)
continue
}
queued = append(queued, action)
}
return queued, expired
}
144 changes: 141 additions & 3 deletions internal/pkg/agent/application/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,34 @@ func (m *mockAction) Expiration() (time.Time, error) {
return args.Get(0).(time.Time), args.Error(1)
}

type mockQueue struct {
mock.Mock
}

func (m *mockQueue) Add(action fleetapi.Action, n int64) {
m.Called(action, n)
}

func (m *mockQueue) DequeueActions() []fleetapi.Action {
args := m.Called()
return args.Get(0).([]fleetapi.Action)
}

func (m *mockQueue) Save() error {
args := m.Called()
return args.Error(0)
}

func TestActionDispatcher(t *testing.T) {
ack := noop.New()

t.Run("Success to dispatch multiples events", func(t *testing.T) {
ctx := context.Background()
def := &mockHandler{}
d, err := New(nil, def)
queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()
d, err := New(nil, def, queue)
require.NoError(t, err)

success1 := &mockHandler{}
Expand All @@ -76,7 +97,13 @@ func TestActionDispatcher(t *testing.T) {
require.NoError(t, err)

action1 := &mockAction{}
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action1.On("Type").Return("action")
action1.On("ID").Return("id")
action2 := &mockOtherAction{}
action2.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action2.On("Type").Return("action")
action2.On("ID").Return("id")

// TODO better matching for actions
success1.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
Expand All @@ -88,34 +115,145 @@ func TestActionDispatcher(t *testing.T) {
success1.AssertExpectations(t)
success2.AssertExpectations(t)
def.AssertNotCalled(t, "Handle", mock.Anything, mock.Anything, mock.Anything)
queue.AssertExpectations(t)
})

t.Run("Unknown action are caught by the unknown handler", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
ctx := context.Background()
d, err := New(nil, def)
queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()
d, err := New(nil, def, queue)
require.NoError(t, err)

action := &mockUnknownAction{}
action.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action.On("Type").Return("action")
action.On("ID").Return("id")
err = d.Dispatch(ctx, ack, action)

require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Could not register two handlers on the same action", func(t *testing.T) {
success1 := &mockHandler{}
success2 := &mockHandler{}

def := &mockHandler{}
d, err := New(nil, def)
queue := &mockQueue{}
d, err := New(nil, def, queue)
require.NoError(t, err)

err = d.Register(&mockAction{}, success1)
require.NoError(t, err)

err = d.Register(&mockAction{}, success2)
require.Error(t, err)
queue.AssertExpectations(t)
})

t.Run("Dispatched action is queued", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()
queue.On("Add", mock.Anything, mock.Anything).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action1 := &mockAction{}
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action1.On("Type").Return("action")
action1.On("ID").Return("id")
action2 := &mockAction{}
action2.On("StartTime").Return(time.Now().Add(time.Hour), nil)
action2.On("Type").Return("action")
action2.On("ID").Return("id")

err = d.Dispatch(context.Background(), ack, action1, action2)
require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Cancel queued action", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action := &mockAction{}
action.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action.On("Type").Return(fleetapi.ActionTypeCancel)
action.On("ID").Return("id")

err = d.Dispatch(context.Background(), ack, action)
require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Retrieve actions from queue", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()

action1 := &mockAction{}
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action1.On("Expiration").Return(time.Now().Add(time.Hour), fleetapi.ErrNoStartTime)
action1.On("Type").Return(fleetapi.ActionTypeCancel)
action1.On("ID").Return("id")

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{action1}).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action2 := &mockAction{}
action2.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action2.On("Type").Return(fleetapi.ActionTypeCancel)
action2.On("ID").Return("id")

err = d.Dispatch(context.Background(), ack, action2)
require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Retrieve no actions from queue", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil)

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

err = d.Dispatch(context.Background(), ack)
require.NoError(t, err)
def.AssertNotCalled(t, "Handle", mock.Anything, mock.Anything, mock.Anything)
})
}
Loading

0 comments on commit 5051218

Please sign in to comment.