Skip to content

Commit

Permalink
feat(pkger): add apply functionality for task resource
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteenb2 committed Dec 23, 2019
1 parent 8e87498 commit c9431bc
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
1. [16312](https://github.com/influxdata/influxdb/pull/16312): Add support for notification rule pkger export functionality
1. [16320](https://github.com/influxdata/influxdb/pull/16320): Add support for tasks to pkger parser
1. [16322](https://github.com/influxdata/influxdb/pull/16322): Add support for tasks to pkger dry run functionality
1. [16323](https://github.com/influxdata/influxdb/pull/16323): Add support for tasks to pkger apply functionality

### Bug Fixes

Expand Down
24 changes: 23 additions & 1 deletion cmd/influx/pkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,13 +702,18 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) {
}

if tasks := diff.Tasks; len(tasks) > 0 {
headers := []string{"New", "Name", "Description"}
headers := []string{"New", "Name", "Description", "Cycle"}
tablePrintFn("TASKS", headers, len(tasks), func(i int) []string {
t := tasks[i]
timing := fmt.Sprintf("every: %s offset: %s", t.Every, t.Offset)
if t.Cron != "" {
timing = t.Cron
}
return []string{
boolDiff(true),
t.Name,
green(t.Description),
green(timing),
}
})
}
Expand Down Expand Up @@ -822,6 +827,23 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) {
})
}

if tasks := sum.Tasks; len(tasks) > 0 {
headers := []string{"ID", "Name", "Description", "Cycle"}
tablePrintFn("TASKS", headers, len(tasks), func(i int) []string {
t := tasks[i]
timing := fmt.Sprintf("every: %s offset: %s", t.Every, t.Offset)
if t.Cron != "" {
timing = t.Cron
}
return []string{
t.ID.String(),
t.Name,
t.Description,
timing,
}
})
}

if teles := sum.TelegrafConfigs; len(teles) > 0 {
headers := []string{"ID", "Name", "Description"}
tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string {
Expand Down
4 changes: 3 additions & 1 deletion cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,15 +838,17 @@ func (m *Launcher) run(ctx context.Context) (err error) {
b := m.apibackend
authedOrgSVC := authorizer.NewOrgService(b.OrganizationService)
authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
pkgerLogger := m.log.With(zap.String("service", "pkger"))
pkgSVC = pkger.NewService(
pkger.WithLogger(m.log.With(zap.String("service", "pkger"))),
pkger.WithLogger(pkgerLogger),
pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)),
pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)),
pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)),
pkger.WithLabelSVC(authorizer.NewLabelService(b.LabelService)),
pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)),
pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)),
pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)),
pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)),
pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)),
pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)),
)
Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC {
return &http.PkgerService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) TaskServiceKV() platform.TaskService {
return tl.kvService
}

func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService {
tb.Helper()
return http.NewTelegrafService(tl.HTTPClient(tb))
Expand Down
27 changes: 21 additions & 6 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestLauncher_Pkger(t *testing.T) {
}),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
Expand Down Expand Up @@ -88,6 +89,12 @@ func TestLauncher_Pkger(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, rules)

tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{
OrganizationID: &l.Org.ID,
})
require.NoError(t, err)
assert.Empty(t, tasks)

teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{
OrgID: &l.Org.ID,
})
Expand Down Expand Up @@ -258,6 +265,12 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName)
assert.Equal(t, "http", rule.EndpointType)

require.Len(t, sum1.Tasks, 1)
task := sum1.Tasks[0]
assert.NotZero(t, task.ID)
assert.Equal(t, "task_1", task.Name)
assert.Equal(t, "desc_1", task.Description)

teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
assert.NotZero(t, teles[0].TelegrafConfig.ID)
Expand Down Expand Up @@ -292,9 +305,14 @@ func TestLauncher_Pkger(t *testing.T) {
mappings := sum1.LabelMappings
require.Len(t, mappings, 9)
hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType))
hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType))
hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType))
hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType))
hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType))
hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))

t.Run("pkg with same bkt-var-label does nto create new resources for them", func(t *testing.T) {
// validate the new package doesn't create new resources for bkts/labels/vars
Expand Down Expand Up @@ -506,6 +524,7 @@ spec:
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
Expand Down Expand Up @@ -714,11 +733,7 @@ spec:
cron: 15 * * * *
query: >
from(bucket: "rucket_1")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_idle")
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")
|> yield()
associations:
- kind: Label
name: label_1
Expand Down
21 changes: 21 additions & 0 deletions http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7321,6 +7321,27 @@ components:
type: array
items:
$ref: "#/components/schemas/PkgSummaryLabel"
tasks:
type: array
items:
type: object
properties:
id:
type: string
name:
type: string
cron:
type: string
description:
type: string
every:
type: string
offset:
type: string
query:
type: string
status:
type: string
telegrafConfigs:
type: array
items:
Expand Down
82 changes: 71 additions & 11 deletions mock/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,120 @@ var _ influxdb.TaskService = (*TaskService)(nil)
var _ backend.TaskControlService = (*TaskControlService)(nil)

type TaskService struct {
FindTaskByIDFn func(context.Context, influxdb.ID) (*influxdb.Task, error)
FindTasksFn func(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error)
CreateTaskFn func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error)
UpdateTaskFn func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error)
DeleteTaskFn func(context.Context, influxdb.ID) error
FindLogsFn func(context.Context, influxdb.LogFilter) ([]*influxdb.Log, int, error)
FindRunsFn func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error)
FindRunByIDFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error)
CancelRunFn func(context.Context, influxdb.ID, influxdb.ID) error
RetryRunFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error)
ForceRunFn func(context.Context, influxdb.ID, int64) (*influxdb.Run, error)
FindTaskByIDFn func(context.Context, influxdb.ID) (*influxdb.Task, error)
FindTaskByIDCalls SafeCount
FindTasksFn func(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error)
FindTasksCalls SafeCount
CreateTaskFn func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error)
CreateTaskCalls SafeCount
UpdateTaskFn func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error)
UpdateTaskCalls SafeCount
DeleteTaskFn func(context.Context, influxdb.ID) error
DeleteTaskCalls SafeCount
FindLogsFn func(context.Context, influxdb.LogFilter) ([]*influxdb.Log, int, error)
FindLogsCalls SafeCount
FindRunsFn func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error)
FindRunsCalls SafeCount
FindRunByIDFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error)
FindRunByIDCalls SafeCount
CancelRunFn func(context.Context, influxdb.ID, influxdb.ID) error
CancelRunCalls SafeCount
RetryRunFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error)
RetryRunCalls SafeCount
ForceRunFn func(context.Context, influxdb.ID, int64) (*influxdb.Run, error)
ForceRunCalls SafeCount
}

func NewTaskService() *TaskService {
return &TaskService{
FindTaskByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
return nil, nil
},
FindTasksFn: func(ctx context.Context, f influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
return nil, 0, nil
},
CreateTaskFn: func(ctx context.Context, taskCreate influxdb.TaskCreate) (*influxdb.Task, error) {
return nil, nil
},
UpdateTaskFn: func(ctx context.Context, id influxdb.ID, update influxdb.TaskUpdate) (*influxdb.Task, error) {
return nil, nil
},
DeleteTaskFn: func(ctx context.Context, id influxdb.ID) error {
return nil
},
FindLogsFn: func(ctx context.Context, f influxdb.LogFilter) ([]*influxdb.Log, int, error) {
return nil, 0, nil
},
FindRunsFn: func(ctx context.Context, f influxdb.RunFilter) ([]*influxdb.Run, int, error) {
return nil, 0, nil
},
FindRunByIDFn: func(ctx context.Context, id influxdb.ID, id2 influxdb.ID) (*influxdb.Run, error) {
return nil, nil
},
CancelRunFn: func(ctx context.Context, id influxdb.ID, id2 influxdb.ID) error {
return nil
},
RetryRunFn: func(ctx context.Context, id influxdb.ID, id2 influxdb.ID) (*influxdb.Run, error) {
return nil, nil
},
ForceRunFn: func(ctx context.Context, id influxdb.ID, i int64) (*influxdb.Run, error) {
return nil, nil
},
}
}

func (s *TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
defer s.FindTaskByIDCalls.IncrFn()()
return s.FindTaskByIDFn(ctx, id)
}

func (s *TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
defer s.FindTasksCalls.IncrFn()()
return s.FindTasksFn(ctx, filter)
}

func (s *TaskService) CreateTask(ctx context.Context, t influxdb.TaskCreate) (*influxdb.Task, error) {
defer s.CreateTaskCalls.IncrFn()()
return s.CreateTaskFn(ctx, t)
}

func (s *TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
defer s.UpdateTaskCalls.IncrFn()()
return s.UpdateTaskFn(ctx, id, upd)
}

func (s *TaskService) DeleteTask(ctx context.Context, id influxdb.ID) error {
defer s.DeleteTaskCalls.IncrFn()()
return s.DeleteTaskFn(ctx, id)
}

func (s *TaskService) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
defer s.FindLogsCalls.IncrFn()()
return s.FindLogsFn(ctx, filter)
}

func (s *TaskService) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
defer s.FindRunsCalls.IncrFn()()
return s.FindRunsFn(ctx, filter)
}

func (s *TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
defer s.FindRunByIDCalls.IncrFn()()
return s.FindRunByIDFn(ctx, taskID, runID)
}

func (s *TaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error {
defer s.CancelRunCalls.IncrFn()()
return s.CancelRunFn(ctx, taskID, runID)
}

func (s *TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
defer s.RetryRunCalls.IncrFn()()
return s.RetryRunFn(ctx, taskID, runID)
}

func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) {
defer s.ForceRunCalls.IncrFn()()
return s.ForceRunFn(ctx, taskID, scheduledFor)
}

Expand Down
44 changes: 44 additions & 0 deletions pkger/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ type SummaryLabelMapping struct {

// SummaryTask provides a summary of a task.
type SummaryTask struct {
ID SafeID `json:"id"`
Name string `json:"name"`
Cron string `json:"cron"`
Description string `json:"description"`
Expand Down Expand Up @@ -1808,6 +1809,8 @@ const (
)

type task struct {
id influxdb.ID
orgID influxdb.ID
name string
cron string
description string
Expand All @@ -1819,6 +1822,18 @@ type task struct {
labels sortedLabels
}

func (t *task) Exists() bool {
return false
}

func (t *task) ID() influxdb.ID {
return t.id
}

func (t *task) Labels() []*label {
return t.labels
}

func (t *task) Name() string {
return t.name
}
Expand All @@ -1834,8 +1849,27 @@ func (t *task) Status() influxdb.Status {
return influxdb.Status(t.status)
}

func (t *task) flux() string {
taskOpts := []string{fmt.Sprintf("name: %q", t.name)}
if t.cron != "" {
taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", t.cron))
}
if t.every > 0 {
taskOpts = append(taskOpts, fmt.Sprintf("every: %s", t.every))
}
if t.offset > 0 {
taskOpts = append(taskOpts, fmt.Sprintf("offset: %s", t.offset))
}
// this is required by the API, super nasty. Will be super challenging for
// anyone outside org to figure out how to do this within an hour of looking
// at the API :sadpanda:. Would be ideal to let the API translate the arguments
// into this required form instead of forcing that complexity on the caller.
return fmt.Sprintf("option task = { %s }\n%s", strings.Join(taskOpts, ", "), t.query)
}

func (t *task) summarize() SummaryTask {
return SummaryTask{
ID: SafeID(t.ID()),
Name: t.Name(),
Cron: t.cron,
Description: t.description,
Expand Down Expand Up @@ -1879,6 +1913,16 @@ func (t *task) valid() []validationErr {
return vErrs
}

type mapperTasks []*task

func (m mapperTasks) Association(i int) labelAssociater {
return m[i]
}

func (m mapperTasks) Len() int {
return len(m)
}

const (
fieldTelegrafConfig = "config"
)
Expand Down
Loading

0 comments on commit c9431bc

Please sign in to comment.