diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f1135c8fb0..5a4370e9249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ ### Features 1. [17934](https://github.com/influxdata/influxdb/pull/17934): Add ability to delete a stack and all the resources associated with it -1. [17941](https://github.com/influxdata/influxdb/pull/17941): Encorce dns name compliance on all pkger resources' metadata.name field +1. [17941](https://github.com/influxdata/influxdb/pull/17941): Enforce DNS name compliance on all pkger resources' metadata.name field +1. [17989](https://github.com/influxdata/influxdb/pull/17989): Add stateful pkg management with stacks ### Bug Fixes diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index f7132ec1863..6a2f34a0302 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -1325,6 +1325,78 @@ func TestLauncher_Pkger(t *testing.T) { }) }) }) + + t.Run("when a user has deleted a task that was previously created by a stack", func(t *testing.T) { + testUserDeletedTask := func(t *testing.T, actionFn func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary)) { + t.Helper() + + obj := newTaskObject("task-1", "", "") + stackID, cleanup, initialSum := initializeStackPkg(t, newPkg(obj)) + defer cleanup() + + require.Len(t, initialSum.Tasks, 1) + require.NotZero(t, initialSum.Tasks[0].ID) + resourceCheck.mustDeleteTask(t, influxdb.ID(initialSum.Tasks[0].ID)) + + actionFn(t, stackID, obj, initialSum) + } + + t.Run("should create new resource when attempting to update", func(t *testing.T) { + testUserDeletedTask(t, func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary) { + pkg := newPkg(initialObj) + updateSum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg, pkger.ApplyWithStackID(stackID)) + require.NoError(t, err) + + require.Len(t, updateSum.Tasks, 1) + initial, updated := initialSum.Tasks[0], updateSum.Tasks[0] + assert.NotEqual(t, initial.ID, updated.ID) + initial.ID, updated.ID = 0, 0 + assert.Equal(t, initial, updated) + }) + }) + + t.Run("should not error when attempting to remove", func(t *testing.T) { + testUserDeletedTask(t, func(t *testing.T, stackID influxdb.ID, _ pkger.Object, _ pkger.Summary) { + testValidRemoval(t, stackID) + }) + }) + }) + + t.Run("when a user has deleted a telegraf config that was previously created by a stack", func(t *testing.T) { + testUserDeletedTelegraf := func(t *testing.T, actionFn func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary)) { + t.Helper() + + obj := newTelegrafObject("tele-1", "", "") + stackID, cleanup, initialSum := initializeStackPkg(t, newPkg(obj)) + defer cleanup() + + require.Len(t, initialSum.TelegrafConfigs, 1) + require.NotZero(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID) + resourceCheck.mustDeleteTelegrafConfig(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID) + + actionFn(t, stackID, obj, initialSum) + } + + t.Run("should create new resource when attempting to update", func(t *testing.T) { + testUserDeletedTelegraf(t, func(t *testing.T, stackID influxdb.ID, initialObj pkger.Object, initialSum pkger.Summary) { + pkg := newPkg(initialObj) + updateSum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg, pkger.ApplyWithStackID(stackID)) + require.NoError(t, err) + + require.Len(t, updateSum.TelegrafConfigs, 1) + initial, updated := initialSum.TelegrafConfigs[0].TelegrafConfig, updateSum.TelegrafConfigs[0].TelegrafConfig + assert.NotEqual(t, initial.ID, updated.ID) + initial.ID, updated.ID = 0, 0 + assert.Equal(t, initial, updated) + }) + }) + + t.Run("should not error when attempting to remove", func(t *testing.T) { + testUserDeletedTelegraf(t, func(t *testing.T, stackID influxdb.ID, _ pkger.Object, _ pkger.Summary) { + testValidRemoval(t, stackID) + }) + }) + }) }) }) @@ -3016,6 +3088,12 @@ func (r resourceChecker) mustGetTask(t *testing.T, getOpt getResourceOptFn) http return task } +func (r resourceChecker) mustDeleteTask(t *testing.T, id influxdb.ID) { + t.Helper() + + require.NoError(t, r.tl.TaskService(t).DeleteTask(ctx, id)) +} + func (r resourceChecker) getTelegrafConfig(t *testing.T, getOpt getResourceOptFn) (influxdb.TelegrafConfig, error) { t.Helper() @@ -3056,6 +3134,12 @@ func (r resourceChecker) mustGetTelegrafConfig(t *testing.T, getOpt getResourceO return tele } +func (r resourceChecker) mustDeleteTelegrafConfig(t *testing.T, id influxdb.ID) { + t.Helper() + + require.NoError(t, r.tl.TelegrafService(t).DeleteTelegrafConfig(ctx, id)) +} + func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (influxdb.Variable, error) { t.Helper() diff --git a/pkger/service.go b/pkger/service.go index 03b7101ebdc..6594810f189 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -2183,13 +2183,16 @@ func (s *Service) applyTasks(ctx context.Context, tasks []*stateTask) applier { } func (s *Service) applyTask(ctx context.Context, userID influxdb.ID, t *stateTask) (influxdb.Task, error) { - switch t.stateStatus { - case StateStatusRemove: + switch { + case IsRemoval(t.stateStatus): if err := s.taskSVC.DeleteTask(ctx, t.ID()); err != nil { + if influxdb.ErrorCode(err) == influxdb.ENotFound { + return influxdb.Task{}, nil + } return influxdb.Task{}, ierrors.Wrap(err, "failed to delete task") } return *t.existing, nil - case StateStatusExists: + case IsExisting(t.stateStatus) && t.existing != nil: newFlux := t.parserTask.flux() newStatus := string(t.parserTask.Status()) opt := options.Options{ @@ -2231,6 +2234,10 @@ func (s *Service) applyTask(ctx context.Context, userID influxdb.ID, t *stateTas func (s *Service) rollbackTasks(ctx context.Context, tasks []*stateTask) error { rollbackFn := func(t *stateTask) error { + if !IsNew(t.stateStatus) && t.existing == nil { + return nil + } + var err error switch t.stateStatus { case StateStatusRemove: @@ -2333,13 +2340,16 @@ func (s *Service) applyTelegrafs(ctx context.Context, userID influxdb.ID, teles } func (s *Service) applyTelegrafConfig(ctx context.Context, userID influxdb.ID, t *stateTelegraf) (influxdb.TelegrafConfig, error) { - switch t.stateStatus { - case StateStatusRemove: + switch { + case IsRemoval(t.stateStatus): if err := s.teleSVC.DeleteTelegrafConfig(ctx, t.ID()); err != nil { + if influxdb.ErrorCode(err) == influxdb.ENotFound { + return influxdb.TelegrafConfig{}, nil + } return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to delete config") } return *t.existing, nil - case StateStatusExists: + case IsExisting(t.stateStatus) && t.existing != nil: cfg := t.summarize().TelegrafConfig updatedConfig, err := s.teleSVC.UpdateTelegrafConfig(ctx, t.ID(), &cfg, userID) if err != nil { @@ -2358,6 +2368,10 @@ func (s *Service) applyTelegrafConfig(ctx context.Context, userID influxdb.ID, t func (s *Service) rollbackTelegrafConfigs(ctx context.Context, userID influxdb.ID, cfgs []*stateTelegraf) error { rollbackFn := func(t *stateTelegraf) error { + if !IsNew(t.stateStatus) && t.existing == nil { + return nil + } + var err error switch t.stateStatus { case StateStatusRemove: