From c9431bceb83eb4f98728461c0f872f3469e8b161 Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Mon, 23 Dec 2019 11:51:00 -0800 Subject: [PATCH] feat(pkger): add apply functionality for task resource --- CHANGELOG.md | 1 + cmd/influx/pkg.go | 24 +++++- cmd/influxd/launcher/launcher.go | 4 +- cmd/influxd/launcher/launcher_helpers.go | 4 + cmd/influxd/launcher/pkger_test.go | 27 +++++-- http/swagger.yml | 21 ++++++ mock/task_service.go | 82 ++++++++++++++++++--- pkger/models.go | 44 +++++++++++ pkger/parser_test.go | 2 +- pkger/service.go | 61 +++++++++++++++ pkger/service_test.go | 94 ++++++++++++++++++++++++ pkger/testdata/tasks.json | 4 +- pkger/testdata/tasks.yml | 4 +- 13 files changed, 348 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4642f505c2..451449288b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ 1. [16312](https://github.com/influxdata/influxdb/pull/16312): Add support for notification rule pkger export functionality 1. [16320](https://github.com/influxdata/influxdb/pull/16320): Add support for tasks to pkger parser 1. [16322](https://github.com/influxdata/influxdb/pull/16322): Add support for tasks to pkger dry run functionality +1. [16323](https://github.com/influxdata/influxdb/pull/16323): Add support for tasks to pkger apply functionality ### Bug Fixes diff --git a/cmd/influx/pkg.go b/cmd/influx/pkg.go index 874aac463cb..d2d667f4450 100644 --- a/cmd/influx/pkg.go +++ b/cmd/influx/pkg.go @@ -702,13 +702,18 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) { } if tasks := diff.Tasks; len(tasks) > 0 { - headers := []string{"New", "Name", "Description"} + headers := []string{"New", "Name", "Description", "Cycle"} tablePrintFn("TASKS", headers, len(tasks), func(i int) []string { t := tasks[i] + timing := fmt.Sprintf("every: %s offset: %s", t.Every, t.Offset) + if t.Cron != "" { + timing = t.Cron + } return []string{ boolDiff(true), t.Name, green(t.Description), + green(timing), } }) } @@ -822,6 +827,23 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) { }) } + if tasks := sum.Tasks; len(tasks) > 0 { + headers := []string{"ID", "Name", "Description", "Cycle"} + tablePrintFn("TASKS", headers, len(tasks), func(i int) []string { + t := tasks[i] + timing := fmt.Sprintf("every: %s offset: %s", t.Every, t.Offset) + if t.Cron != "" { + timing = t.Cron + } + return []string{ + t.ID.String(), + t.Name, + t.Description, + timing, + } + }) + } + if teles := sum.TelegrafConfigs; len(teles) > 0 { headers := []string{"ID", "Name", "Description"} tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 716fbefd444..3ead3021480 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -838,8 +838,9 @@ func (m *Launcher) run(ctx context.Context) (err error) { b := m.apibackend authedOrgSVC := authorizer.NewOrgService(b.OrganizationService) authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService) + pkgerLogger := m.log.With(zap.String("service", "pkger")) pkgSVC = pkger.NewService( - pkger.WithLogger(m.log.With(zap.String("service", "pkger"))), + pkger.WithLogger(pkgerLogger), pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)), pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)), pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)), @@ -847,6 +848,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)), pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)), pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)), + pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)), pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)), pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)), ) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index fafd77c717b..3c64a95b519 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -362,6 +362,10 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC { return &http.PkgerService{Client: tl.HTTPClient(tb)} } +func (tl *TestLauncher) TaskServiceKV() platform.TaskService { + return tl.kvService +} + func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService { tb.Helper() return http.NewTelegrafService(tl.HTTPClient(tb)) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 880a7f1f466..0e00580a459 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -49,6 +49,7 @@ func TestLauncher_Pkger(t *testing.T) { }), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithNotificationRuleSVC(l.NotificationRuleService()), + pkger.WithTaskSVC(l.TaskServiceKV()), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -88,6 +89,12 @@ func TestLauncher_Pkger(t *testing.T) { require.NoError(t, err) assert.Empty(t, rules) + tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{ + OrganizationID: &l.Org.ID, + }) + require.NoError(t, err) + assert.Empty(t, tasks) + teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{ OrgID: &l.Org.ID, }) @@ -258,6 +265,12 @@ func TestLauncher_Pkger(t *testing.T) { assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName) assert.Equal(t, "http", rule.EndpointType) + require.Len(t, sum1.Tasks, 1) + task := sum1.Tasks[0] + assert.NotZero(t, task.ID) + assert.Equal(t, "task_1", task.Name) + assert.Equal(t, "desc_1", task.Description) + teles := sum1.TelegrafConfigs require.Len(t, teles, 1) assert.NotZero(t, teles[0].TelegrafConfig.ID) @@ -292,9 +305,14 @@ func TestLauncher_Pkger(t *testing.T) { mappings := sum1.LabelMappings require.Len(t, mappings, 9) hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType)) + hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType)) + hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType)) hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType)) - hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType)) + hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType)) + hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType)) + hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType)) hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType)) + hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType)) t.Run("pkg with same bkt-var-label does nto create new resources for them", func(t *testing.T) { // validate the new package doesn't create new resources for bkts/labels/vars @@ -506,6 +524,7 @@ spec: pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)), + pkger.WithTaskSVC(l.TaskServiceKV()), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -714,11 +733,7 @@ spec: cron: 15 * * * * query: > from(bucket: "rucket_1") - |> range(start: v.timeRangeStart, stop: v.timeRangeStop) - |> filter(fn: (r) => r._measurement == "cpu") - |> filter(fn: (r) => r._field == "usage_idle") - |> aggregateWindow(every: 1m, fn: mean) - |> yield(name: "mean") + |> yield() associations: - kind: Label name: label_1 diff --git a/http/swagger.yml b/http/swagger.yml index 1693524d8ad..97238aaa927 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -7321,6 +7321,27 @@ components: type: array items: $ref: "#/components/schemas/PkgSummaryLabel" + tasks: + type: array + items: + type: object + properties: + id: + type: string + name: + type: string + cron: + type: string + description: + type: string + every: + type: string + offset: + type: string + query: + type: string + status: + type: string telegrafConfigs: type: array items: diff --git a/mock/task_service.go b/mock/task_service.go index 42f14404321..7711ac2b01b 100644 --- a/mock/task_service.go +++ b/mock/task_service.go @@ -12,60 +12,120 @@ var _ influxdb.TaskService = (*TaskService)(nil) var _ backend.TaskControlService = (*TaskControlService)(nil) type TaskService struct { - FindTaskByIDFn func(context.Context, influxdb.ID) (*influxdb.Task, error) - FindTasksFn func(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error) - CreateTaskFn func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) - UpdateTaskFn func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) - DeleteTaskFn func(context.Context, influxdb.ID) error - FindLogsFn func(context.Context, influxdb.LogFilter) ([]*influxdb.Log, int, error) - FindRunsFn func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error) - FindRunByIDFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error) - CancelRunFn func(context.Context, influxdb.ID, influxdb.ID) error - RetryRunFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error) - ForceRunFn func(context.Context, influxdb.ID, int64) (*influxdb.Run, error) + FindTaskByIDFn func(context.Context, influxdb.ID) (*influxdb.Task, error) + FindTaskByIDCalls SafeCount + FindTasksFn func(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error) + FindTasksCalls SafeCount + CreateTaskFn func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) + CreateTaskCalls SafeCount + UpdateTaskFn func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) + UpdateTaskCalls SafeCount + DeleteTaskFn func(context.Context, influxdb.ID) error + DeleteTaskCalls SafeCount + FindLogsFn func(context.Context, influxdb.LogFilter) ([]*influxdb.Log, int, error) + FindLogsCalls SafeCount + FindRunsFn func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error) + FindRunsCalls SafeCount + FindRunByIDFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error) + FindRunByIDCalls SafeCount + CancelRunFn func(context.Context, influxdb.ID, influxdb.ID) error + CancelRunCalls SafeCount + RetryRunFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error) + RetryRunCalls SafeCount + ForceRunFn func(context.Context, influxdb.ID, int64) (*influxdb.Run, error) + ForceRunCalls SafeCount +} + +func NewTaskService() *TaskService { + return &TaskService{ + FindTaskByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { + return nil, nil + }, + FindTasksFn: func(ctx context.Context, f influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + return nil, 0, nil + }, + CreateTaskFn: func(ctx context.Context, taskCreate influxdb.TaskCreate) (*influxdb.Task, error) { + return nil, nil + }, + UpdateTaskFn: func(ctx context.Context, id influxdb.ID, update influxdb.TaskUpdate) (*influxdb.Task, error) { + return nil, nil + }, + DeleteTaskFn: func(ctx context.Context, id influxdb.ID) error { + return nil + }, + FindLogsFn: func(ctx context.Context, f influxdb.LogFilter) ([]*influxdb.Log, int, error) { + return nil, 0, nil + }, + FindRunsFn: func(ctx context.Context, f influxdb.RunFilter) ([]*influxdb.Run, int, error) { + return nil, 0, nil + }, + FindRunByIDFn: func(ctx context.Context, id influxdb.ID, id2 influxdb.ID) (*influxdb.Run, error) { + return nil, nil + }, + CancelRunFn: func(ctx context.Context, id influxdb.ID, id2 influxdb.ID) error { + return nil + }, + RetryRunFn: func(ctx context.Context, id influxdb.ID, id2 influxdb.ID) (*influxdb.Run, error) { + return nil, nil + }, + ForceRunFn: func(ctx context.Context, id influxdb.ID, i int64) (*influxdb.Run, error) { + return nil, nil + }, + } } func (s *TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { + defer s.FindTaskByIDCalls.IncrFn()() return s.FindTaskByIDFn(ctx, id) } func (s *TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + defer s.FindTasksCalls.IncrFn()() return s.FindTasksFn(ctx, filter) } func (s *TaskService) CreateTask(ctx context.Context, t influxdb.TaskCreate) (*influxdb.Task, error) { + defer s.CreateTaskCalls.IncrFn()() return s.CreateTaskFn(ctx, t) } func (s *TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { + defer s.UpdateTaskCalls.IncrFn()() return s.UpdateTaskFn(ctx, id, upd) } func (s *TaskService) DeleteTask(ctx context.Context, id influxdb.ID) error { + defer s.DeleteTaskCalls.IncrFn()() return s.DeleteTaskFn(ctx, id) } func (s *TaskService) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) { + defer s.FindLogsCalls.IncrFn()() return s.FindLogsFn(ctx, filter) } func (s *TaskService) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) { + defer s.FindRunsCalls.IncrFn()() return s.FindRunsFn(ctx, filter) } func (s *TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { + defer s.FindRunByIDCalls.IncrFn()() return s.FindRunByIDFn(ctx, taskID, runID) } func (s *TaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error { + defer s.CancelRunCalls.IncrFn()() return s.CancelRunFn(ctx, taskID, runID) } func (s *TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { + defer s.RetryRunCalls.IncrFn()() return s.RetryRunFn(ctx, taskID, runID) } func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) { + defer s.ForceRunCalls.IncrFn()() return s.ForceRunFn(ctx, taskID, scheduledFor) } diff --git a/pkger/models.go b/pkger/models.go index 26224b6d87f..6e2353cc8be 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -754,6 +754,7 @@ type SummaryLabelMapping struct { // SummaryTask provides a summary of a task. type SummaryTask struct { + ID SafeID `json:"id"` Name string `json:"name"` Cron string `json:"cron"` Description string `json:"description"` @@ -1808,6 +1809,8 @@ const ( ) type task struct { + id influxdb.ID + orgID influxdb.ID name string cron string description string @@ -1819,6 +1822,18 @@ type task struct { labels sortedLabels } +func (t *task) Exists() bool { + return false +} + +func (t *task) ID() influxdb.ID { + return t.id +} + +func (t *task) Labels() []*label { + return t.labels +} + func (t *task) Name() string { return t.name } @@ -1834,8 +1849,27 @@ func (t *task) Status() influxdb.Status { return influxdb.Status(t.status) } +func (t *task) flux() string { + taskOpts := []string{fmt.Sprintf("name: %q", t.name)} + if t.cron != "" { + taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", t.cron)) + } + if t.every > 0 { + taskOpts = append(taskOpts, fmt.Sprintf("every: %s", t.every)) + } + if t.offset > 0 { + taskOpts = append(taskOpts, fmt.Sprintf("offset: %s", t.offset)) + } + // this is required by the API, super nasty. Will be super challenging for + // anyone outside org to figure out how to do this within an hour of looking + // at the API :sadpanda:. Would be ideal to let the API translate the arguments + // into this required form instead of forcing that complexity on the caller. + return fmt.Sprintf("option task = { %s }\n%s", strings.Join(taskOpts, ", "), t.query) +} + func (t *task) summarize() SummaryTask { return SummaryTask{ + ID: SafeID(t.ID()), Name: t.Name(), Cron: t.cron, Description: t.description, @@ -1879,6 +1913,16 @@ func (t *task) valid() []validationErr { return vErrs } +type mapperTasks []*task + +func (m mapperTasks) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperTasks) Len() int { + return len(m) +} + const ( fieldTelegrafConfig = "config" ) diff --git a/pkger/parser_test.go b/pkger/parser_test.go index b1801dbd6da..d09eeef39b6 100644 --- a/pkger/parser_test.go +++ b/pkger/parser_test.go @@ -3850,7 +3850,7 @@ spec: assert.Equal(t, "desc_"+strconv.Itoa(i), actual.Description) assert.Equal(t, status, actual.Status) - expectedQuery := "from(bucket: \"rucket_1\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> filter(fn: (r) => r._field == \"usage_idle\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> yield(name: \"mean\")" + expectedQuery := "from(bucket: \"rucket_1\")\n |> range(start: -5d, stop: -1h)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> filter(fn: (r) => r._field == \"usage_idle\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> yield(name: \"mean\")" assert.Equal(t, expectedQuery, actual.Query) require.Len(t, actual.LabelAssociations, 1) diff --git a/pkger/service.go b/pkger/service.go index 7e0576aa467..c82c0a479bd 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -34,6 +34,7 @@ type serviceOpt struct { endpointSVC influxdb.NotificationEndpointService ruleSVC influxdb.NotificationRuleStore secretSVC influxdb.SecretService + taskSVC influxdb.TaskService teleSVC influxdb.TelegrafConfigStore varSVC influxdb.VariableService @@ -99,6 +100,13 @@ func WithSecretSVC(secretSVC influxdb.SecretService) ServiceSetterFn { } } +// WithTelegrafSVC sets the telegraf service. +func WithTaskSVC(taskSVC influxdb.TaskService) ServiceSetterFn { + return func(opt *serviceOpt) { + opt.taskSVC = taskSVC + } +} + // WithTelegrafSVC sets the telegraf service. func WithTelegrafSVC(telegrafSVC influxdb.TelegrafConfigStore) ServiceSetterFn { return func(opt *serviceOpt) { @@ -125,6 +133,7 @@ type Service struct { endpointSVC influxdb.NotificationEndpointService ruleSVC influxdb.NotificationRuleStore secretSVC influxdb.SecretService + taskSVC influxdb.TaskService teleSVC influxdb.TelegrafConfigStore varSVC influxdb.VariableService @@ -152,6 +161,7 @@ func NewService(opts ...ServiceSetterFn) *Service { endpointSVC: opt.endpointSVC, ruleSVC: opt.ruleSVC, secretSVC: opt.secretSVC, + taskSVC: opt.taskSVC, teleSVC: opt.teleSVC, varSVC: opt.varSVC, applyReqLimit: opt.applyReqLimit, @@ -953,6 +963,7 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe mapperDashboards(pkg.mDashboards), mapperNotificationEndpoints(pkg.notificationEndpoints()), mapperNotificationRules(pkg.mNotificationRules), + mapperTasks(pkg.mTasks), mapperTelegrafs(pkg.mTelegrafs), mapperVariables(pkg.variables()), } @@ -1076,6 +1087,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg s.applyChecks(pkg.checks()), s.applyDashboards(pkg.dashboards()), s.applyNotificationEndpoints(pkg.notificationEndpoints()), + s.applyTasks(pkg.tasks()), s.applyTelegrafs(pkg.telegrafs()), }, } @@ -1681,6 +1693,55 @@ func (s *Service) rollbackNotificationRules(rules []*notificationRule) error { return nil } +func (s *Service) applyTasks(tasks []*task) applier { + const resource = "tasks" + + mutex := new(doMutex) + rollbackTasks := make([]task, 0, len(tasks)) + + createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { + var t task + mutex.Do(func() { + tasks[i].orgID = orgID + t = *tasks[i] + }) + + newTask, err := s.taskSVC.CreateTask(ctx, influxdb.TaskCreate{ + Type: influxdb.TaskSystemType, + Flux: t.flux(), + OwnerID: userID, + Description: t.description, + Status: string(t.Status()), + OrganizationID: t.orgID, + }) + if err != nil { + return &applyErrBody{name: t.Name(), msg: err.Error()} + } + + mutex.Do(func() { + tasks[i].id = newTask.ID + rollbackTasks = append(rollbackTasks, *tasks[i]) + }) + + return nil + } + + return applier{ + creater: creater{ + entries: len(tasks), + fn: createFn, + }, + rollbacker: rollbacker{ + resource: resource, + fn: func() error { + return s.deleteByIDs("task", len(rollbackTasks), s.taskSVC.DeleteTask, func(i int) influxdb.ID { + return rollbackTasks[i].ID() + }) + }, + }, + } +} + func (s *Service) applyTelegrafs(teles []*telegraf) applier { const resource = "telegrafs" diff --git a/pkger/service_test.go b/pkger/service_test.go index 7331c640a07..f6d538fa813 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/rand" + "regexp" "strconv" "testing" "time" @@ -29,6 +30,7 @@ func TestService(t *testing.T) { labelSVC: mock.NewLabelService(), endpointSVC: mock.NewNotificationEndpointService(), ruleSVC: mock.NewNotificationRuleStore(), + taskSVC: mock.NewTaskService(), teleSVC: mock.NewTelegrafConfigStore(), varSVC: mock.NewVariableService(), } @@ -44,6 +46,7 @@ func TestService(t *testing.T) { WithNotificationEndpointSVC(opt.endpointSVC), WithNotificationRuleSVC(opt.ruleSVC), WithSecretSVC(opt.secretSVC), + WithTaskSVC(opt.taskSVC), WithTelegrafSVC(opt.teleSVC), WithVariableSVC(opt.varSVC), ) @@ -934,6 +937,35 @@ func TestService(t *testing.T) { ) }) + t.Run("maps tasks with labels", func(t *testing.T) { + testLabelMappingFn( + t, + "testdata/tasks.yml", + 2, + func() []ServiceSetterFn { + fakeTaskSVC := mock.NewTaskService() + fakeTaskSVC.CreateTaskFn = func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { + reg := regexp.MustCompile(`name: "(.+)",`) + names := reg.FindStringSubmatch(tc.Flux) + if len(names) < 2 { + return nil, errors.New("bad flux query provided: " + tc.Flux) + } + return &influxdb.Task{ + ID: influxdb.ID(rand.Int()), + Type: tc.Type, + OrganizationID: tc.OrganizationID, + OwnerID: tc.OwnerID, + Name: names[1], + Description: tc.Description, + Status: tc.Status, + Flux: tc.Flux, + }, nil + } + return []ServiceSetterFn{WithTaskSVC(fakeTaskSVC)} + }, + ) + }) + t.Run("maps telegrafs with labels", func(t *testing.T) { testLabelMappingFn( t, @@ -1127,6 +1159,68 @@ func TestService(t *testing.T) { }) }) + t.Run("tasks", func(t *testing.T) { + t.Run("successfuly creates", func(t *testing.T) { + testfileRunner(t, "testdata/tasks.yml", func(t *testing.T, pkg *Pkg) { + orgID := influxdb.ID(9000) + + fakeTaskSVC := mock.NewTaskService() + fakeTaskSVC.CreateTaskFn = func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { + reg := regexp.MustCompile(`name: "(.+)",`) + names := reg.FindStringSubmatch(tc.Flux) + if len(names) < 2 { + return nil, errors.New("bad flux query provided: " + tc.Flux) + } + return &influxdb.Task{ + ID: influxdb.ID(fakeTaskSVC.CreateTaskCalls.Count() + 1), + Type: tc.Type, + OrganizationID: tc.OrganizationID, + OwnerID: tc.OwnerID, + Name: names[1], + Description: tc.Description, + Status: tc.Status, + Flux: tc.Flux, + }, nil + } + + svc := newTestService(WithTaskSVC(fakeTaskSVC)) + + sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + require.NoError(t, err) + + require.Len(t, sum.Tasks, 2) + for i, actual := range sum.Tasks { + assert.NotZero(t, actual.ID) + assert.Equal(t, "task_"+strconv.Itoa(i), actual.Name) + assert.Equal(t, "desc_"+strconv.Itoa(i), actual.Description) + } + }) + }) + + t.Run("rolls back all created tasks on an error", func(t *testing.T) { + testfileRunner(t, "testdata/tasks.yml", func(t *testing.T, pkg *Pkg) { + fakeTaskSVC := mock.NewTaskService() + fakeTaskSVC.CreateTaskFn = func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { + if fakeTaskSVC.CreateTaskCalls.Count() == 1 { + return nil, errors.New("expected error") + } + return &influxdb.Task{ + ID: influxdb.ID(fakeTaskSVC.CreateTaskCalls.Count() + 1), + }, nil + } + + svc := newTestService(WithTaskSVC(fakeTaskSVC)) + + orgID := influxdb.ID(9000) + + _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + require.Error(t, err) + + assert.Equal(t, 1, fakeTaskSVC.DeleteTaskCalls.Count()) + }) + }) + }) + t.Run("telegrafs", func(t *testing.T) { t.Run("successfuly creates", func(t *testing.T) { testfileRunner(t, "testdata/telegraf.yml", func(t *testing.T, pkg *Pkg) { diff --git a/pkger/testdata/tasks.json b/pkger/testdata/tasks.json index f81386822f4..fdfef9ef260 100644 --- a/pkger/testdata/tasks.json +++ b/pkger/testdata/tasks.json @@ -18,7 +18,7 @@ "description": "desc_0", "every": "10m", "offset": "15s", - "query": "from(bucket: \"rucket_1\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> filter(fn: (r) => r._field == \"usage_idle\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> yield(name: \"mean\")", + "query": "from(bucket: \"rucket_1\")\n |> range(start: -5d, stop: -1h)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> filter(fn: (r) => r._field == \"usage_idle\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> yield(name: \"mean\")", "status": "inactive", "associations": [ { @@ -32,7 +32,7 @@ "name": "task_1", "description": "desc_1", "cron": "15 * * * *", - "query": "from(bucket: \"rucket_1\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> filter(fn: (r) => r._field == \"usage_idle\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> yield(name: \"mean\")", + "query": "from(bucket: \"rucket_1\")\n |> range(start: -5d, stop: -1h)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> filter(fn: (r) => r._field == \"usage_idle\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> yield(name: \"mean\")", "associations": [ { "kind": "Label", diff --git a/pkger/testdata/tasks.yml b/pkger/testdata/tasks.yml index 2310582b3db..a981d45b46e 100644 --- a/pkger/testdata/tasks.yml +++ b/pkger/testdata/tasks.yml @@ -15,7 +15,7 @@ spec: offset: 15s query: > from(bucket: "rucket_1") - |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> range(start: -5d, stop: -1h) |> filter(fn: (r) => r._measurement == "cpu") |> filter(fn: (r) => r._field == "usage_idle") |> aggregateWindow(every: 1m, fn: mean) @@ -30,7 +30,7 @@ spec: cron: 15 * * * * query: > from(bucket: "rucket_1") - |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> range(start: -5d, stop: -1h) |> filter(fn: (r) => r._measurement == "cpu") |> filter(fn: (r) => r._field == "usage_idle") |> aggregateWindow(every: 1m, fn: mean)