Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkger): handle edge cases for telegraf configs and tasks that manifest from user interaction #17989

Merged
merged 2 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
### Features

1. [17934](https://github.com/influxdata/influxdb/pull/17934): Add ability to delete a stack and all the resources associated with it
1. [17941](https://github.com/influxdata/influxdb/pull/17941): Encorce dns name compliance on all pkger resources' metadata.name field
1. [17941](https://github.com/influxdata/influxdb/pull/17941): Enforce DNS name compliance on all pkger resources' metadata.name field
1. [17989](https://github.com/influxdata/influxdb/pull/17989): Add stateful pkg management with stacks

### Bug Fixes

Expand Down
84 changes: 84 additions & 0 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,78 @@ func TestLauncher_Pkger(t *testing.T) {
})
})
})

t.Run("when a user has deleted a task that was previously created by a stack", func(t *testing.T) {
testUserDeletedTask := func(t *testing.T, actionFn func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary)) {
t.Helper()

obj := newTaskObject("task-1", "", "")
stackID, cleanup, initialSum := initializeStackPkg(t, newPkg(obj))
defer cleanup()

require.Len(t, initialSum.Tasks, 1)
require.NotZero(t, initialSum.Tasks[0].ID)
resourceCheck.mustDeleteTask(t, influxdb.ID(initialSum.Tasks[0].ID))

actionFn(t, stackID, obj, initialSum)
}

t.Run("should create new resource when attempting to update", func(t *testing.T) {
testUserDeletedTask(t, func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary) {
pkg := newPkg(initialObj)
updateSum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg, pkger.ApplyWithStackID(stackID))
require.NoError(t, err)

require.Len(t, updateSum.Tasks, 1)
initial, updated := initialSum.Tasks[0], updateSum.Tasks[0]
assert.NotEqual(t, initial.ID, updated.ID)
initial.ID, updated.ID = 0, 0
assert.Equal(t, initial, updated)
})
})

t.Run("should not error when attempting to remove", func(t *testing.T) {
testUserDeletedTask(t, func(t *testing.T, stackID influxdb.ID, _ pkger.Object, _ pkger.Summary) {
testValidRemoval(t, stackID)
})
})
})

t.Run("when a user has deleted a telegraf config that was previously created by a stack", func(t *testing.T) {
testUserDeletedTelegraf := func(t *testing.T, actionFn func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary)) {
t.Helper()

obj := newTelegrafObject("tele-1", "", "")
stackID, cleanup, initialSum := initializeStackPkg(t, newPkg(obj))
defer cleanup()

require.Len(t, initialSum.TelegrafConfigs, 1)
require.NotZero(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID)
resourceCheck.mustDeleteTelegrafConfig(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID)

actionFn(t, stackID, obj, initialSum)
}

t.Run("should create new resource when attempting to update", func(t *testing.T) {
testUserDeletedTelegraf(t, func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary) {
pkg := newPkg(initialObj)
updateSum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg, pkger.ApplyWithStackID(stackID))
require.NoError(t, err)

require.Len(t, updateSum.TelegrafConfigs, 1)
initial, updated := initialSum.TelegrafConfigs[0].TelegrafConfig, updateSum.TelegrafConfigs[0].TelegrafConfig
assert.NotEqual(t, initial.ID, updated.ID)
initial.ID, updated.ID = 0, 0
assert.Equal(t, initial, updated)
})
})

t.Run("should not error when attempting to remove", func(t *testing.T) {
testUserDeletedTelegraf(t, func(t *testing.T, stackID influxdb.ID, _ pkger.Object, _ pkger.Summary) {
testValidRemoval(t, stackID)
})
})
})
})
})

Expand Down Expand Up @@ -3016,6 +3088,12 @@ func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) http
return task
}

func (r resourceChecker) mustDeleteTask(t *testing.T, id influxdb.ID) {
t.Helper()

require.NoError(t, r.tl.TaskService(t).DeleteTask(ctx, id))
}

func (r resourceChecker) getTelegrafConfig(t *testing.T, getOpt getResourceOptFn) (influxdb.TelegrafConfig, error) {
t.Helper()

Expand Down Expand Up @@ -3056,6 +3134,12 @@ func (r resourceChecker) mustGetTelegrafConfig(t *testing.T, getOpt getResourceO
return tele
}

func (r resourceChecker) mustDeleteTelegrafConfig(t *testing.T, id influxdb.ID) {
t.Helper()

require.NoError(t, r.tl.TelegrafService(t).DeleteTelegrafConfig(ctx, id))
}

func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (influxdb.Variable, error) {
t.Helper()

Expand Down
26 changes: 20 additions & 6 deletions pkger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2183,13 +2183,16 @@ func (s *Service) applyTasks(ctx context.Context, tasks []*stateTask) applier {
}

func (s *Service) applyTask(ctx context.Context, userID influxdb.ID, t *stateTask) (influxdb.Task, error) {
switch t.stateStatus {
case StateStatusRemove:
switch {
case IsRemoval(t.stateStatus):
if err := s.taskSVC.DeleteTask(ctx, t.ID()); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
return influxdb.Task{}, nil
}
return influxdb.Task{}, ierrors.Wrap(err, "failed to delete task")
}
return *t.existing, nil
case StateStatusExists:
case IsExisting(t.stateStatus) && t.existing != nil:
newFlux := t.parserTask.flux()
newStatus := string(t.parserTask.Status())
opt := options.Options{
Expand Down Expand Up @@ -2231,6 +2234,10 @@ func (s *Service) applyTask(ctx context.Context, userID influxdb.ID, t *stateTas

func (s *Service) rollbackTasks(ctx context.Context, tasks []*stateTask) error {
rollbackFn := func(t *stateTask) error {
if !IsNew(t.stateStatus) && t.existing == nil {
return nil
}

var err error
switch t.stateStatus {
case StateStatusRemove:
Expand Down Expand Up @@ -2333,13 +2340,16 @@ func (s *Service) applyTelegrafs(ctx context.Context, userID influxdb.ID, teles
}

func (s *Service) applyTelegrafConfig(ctx context.Context, userID influxdb.ID, t *stateTelegraf) (influxdb.TelegrafConfig, error) {
switch t.stateStatus {
case StateStatusRemove:
switch {
case IsRemoval(t.stateStatus):
if err := s.teleSVC.DeleteTelegrafConfig(ctx, t.ID()); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
return influxdb.TelegrafConfig{}, nil
}
return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to delete config")
}
return *t.existing, nil
case StateStatusExists:
case IsExisting(t.stateStatus) && t.existing != nil:
cfg := t.summarize().TelegrafConfig
updatedConfig, err := s.teleSVC.UpdateTelegrafConfig(ctx, t.ID(), &cfg, userID)
if err != nil {
Expand All @@ -2358,6 +2368,10 @@ func (s *Service) applyTelegrafConfig(ctx context.Context, userID influxdb.ID, t

func (s *Service) rollbackTelegrafConfigs(ctx context.Context, userID influxdb.ID, cfgs []*stateTelegraf) error {
rollbackFn := func(t *stateTelegraf) error {
if !IsNew(t.stateStatus) && t.existing == nil {
return nil
}

var err error
switch t.stateStatus {
case StateStatusRemove:
Expand Down