Skip to content

Commit

Permalink
feat(pkger): add apply functionality for task resource
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteenb2 committed Dec 23, 2019
1 parent 8e87498 commit fe25e05
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 6 deletions.
4 changes: 3 additions & 1 deletion cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,15 +838,17 @@ func (m *Launcher) run(ctx context.Context) (err error) {
b := m.apibackend
authedOrgSVC := authorizer.NewOrgService(b.OrganizationService)
authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
pkgerLogger := m.log.With(zap.String("service", "pkger"))
pkgSVC = pkger.NewService(
pkger.WithLogger(m.log.With(zap.String("service", "pkger"))),
pkger.WithLogger(pkgerLogger),
pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)),
pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)),
pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)),
pkger.WithLabelSVC(authorizer.NewLabelService(b.LabelService)),
pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)),
pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)),
pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)),
pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)),
pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)),
pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)),
)
Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ func (tl *TestLauncher) PkgerService(tb testing.TB) pkger.SVC {
return &http.PkgerService{Client: tl.HTTPClient(tb)}
}

func (tl *TestLauncher) TaskServiceKV() platform.TaskService {
return tl.kvService
}

func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService {
tb.Helper()
return http.NewTelegrafService(tl.HTTPClient(tb))
Expand Down
20 changes: 15 additions & 5 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestLauncher_Pkger(t *testing.T) {
}),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
Expand Down Expand Up @@ -88,6 +89,12 @@ func TestLauncher_Pkger(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, rules)

tasks, _, err := l.TaskServiceKV().FindTasks(ctx, influxdb.TaskFilter{
OrganizationID: &l.Org.ID,
})
require.NoError(t, err)
assert.Empty(t, tasks)

teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{
OrgID: &l.Org.ID,
})
Expand Down Expand Up @@ -258,6 +265,12 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName)
assert.Equal(t, "http", rule.EndpointType)

require.Len(t, sum1.Tasks, 1)
task := sum1.Tasks[0]
assert.NotZero(t, task.ID)
assert.Equal(t, "task_1", task.Name)
assert.Equal(t, "desc_1", task.Description)

teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
assert.NotZero(t, teles[0].TelegrafConfig.ID)
Expand Down Expand Up @@ -506,6 +519,7 @@ spec:
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
Expand Down Expand Up @@ -714,11 +728,7 @@ spec:
cron: 15 * * * *
query: >
from(bucket: "rucket_1")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_idle")
|> aggregateWindow(every: 1m, fn: mean)
|> yield(name: "mean")
|> yield()
associations:
- kind: Label
name: label_1
Expand Down
26 changes: 26 additions & 0 deletions pkger/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ type SummaryLabelMapping struct {

// SummaryTask provides a summary of a task.
type SummaryTask struct {
ID SafeID `json:"id"`
Name string `json:"name"`
Cron string `json:"cron"`
Description string `json:"description"`
Expand Down Expand Up @@ -1808,6 +1809,8 @@ const (
)

type task struct {
id influxdb.ID
orgID influxdb.ID
name string
cron string
description string
Expand All @@ -1819,6 +1822,10 @@ type task struct {
labels sortedLabels
}

func (t *task) ID() influxdb.ID {
return t.id
}

func (t *task) Name() string {
return t.name
}
Expand All @@ -1834,8 +1841,27 @@ func (t *task) Status() influxdb.Status {
return influxdb.Status(t.status)
}

func (t *task) flux() string {
taskOpts := []string{fmt.Sprintf("name: %q", t.name)}
if t.cron != "" {
taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", t.cron))
}
if t.every > 0 {
taskOpts = append(taskOpts, fmt.Sprintf("every: %s", t.every))
}
if t.offset > 0 {
taskOpts = append(taskOpts, fmt.Sprintf("offset: %s", t.offset))
}
// this is required by the API, super nasty. Will be super challenging for
// anyone outside org to figure out how to do this within an hour of looking
// at the API :sadpanda:. Would be ideal to let the API translate the arguments
// into this required form instead of forcing that complexity on the caller.
return fmt.Sprintf("option task = { %s }\n%s", strings.Join(taskOpts, ", "), t.query)
}

func (t *task) summarize() SummaryTask {
return SummaryTask{
ID: SafeID(t.ID()),
Name: t.Name(),
Cron: t.cron,
Description: t.description,
Expand Down
60 changes: 60 additions & 0 deletions pkger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type serviceOpt struct {
endpointSVC influxdb.NotificationEndpointService
ruleSVC influxdb.NotificationRuleStore
secretSVC influxdb.SecretService
taskSVC influxdb.TaskService
teleSVC influxdb.TelegrafConfigStore
varSVC influxdb.VariableService

Expand Down Expand Up @@ -99,6 +100,13 @@ func WithSecretSVC(secretSVC influxdb.SecretService) ServiceSetterFn {
}
}

// WithTelegrafSVC sets the telegraf service.
func WithTaskSVC(taskSVC influxdb.TaskService) ServiceSetterFn {
return func(opt *serviceOpt) {
opt.taskSVC = taskSVC
}
}

// WithTelegrafSVC sets the telegraf service.
func WithTelegrafSVC(telegrafSVC influxdb.TelegrafConfigStore) ServiceSetterFn {
return func(opt *serviceOpt) {
Expand All @@ -125,6 +133,7 @@ type Service struct {
endpointSVC influxdb.NotificationEndpointService
ruleSVC influxdb.NotificationRuleStore
secretSVC influxdb.SecretService
taskSVC influxdb.TaskService
teleSVC influxdb.TelegrafConfigStore
varSVC influxdb.VariableService

Expand Down Expand Up @@ -152,6 +161,7 @@ func NewService(opts ...ServiceSetterFn) *Service {
endpointSVC: opt.endpointSVC,
ruleSVC: opt.ruleSVC,
secretSVC: opt.secretSVC,
taskSVC: opt.taskSVC,
teleSVC: opt.teleSVC,
varSVC: opt.varSVC,
applyReqLimit: opt.applyReqLimit,
Expand Down Expand Up @@ -1076,6 +1086,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
s.applyChecks(pkg.checks()),
s.applyDashboards(pkg.dashboards()),
s.applyNotificationEndpoints(pkg.notificationEndpoints()),
s.applyTasks(pkg.tasks()),
s.applyTelegrafs(pkg.telegrafs()),
},
}
Expand Down Expand Up @@ -1681,6 +1692,55 @@ func (s *Service) rollbackNotificationRules(rules []*notificationRule) error {
return nil
}

func (s *Service) applyTasks(tasks []*task) applier {
const resource = "tasks"

mutex := new(doMutex)
rollbackTasks := make([]task, 0, len(tasks))

createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var t task
mutex.Do(func() {
tasks[i].orgID = orgID
t = *tasks[i]
})

newTask, err := s.taskSVC.CreateTask(ctx, influxdb.TaskCreate{
Type: influxdb.TaskSystemType,
Flux: t.flux(),
OwnerID: userID,
Description: t.description,
Status: string(t.Status()),
OrganizationID: t.orgID,
})
if err != nil {
return &applyErrBody{name: t.Name(), msg: err.Error()}
}

mutex.Do(func() {
tasks[i].id = newTask.ID
rollbackTasks = append(rollbackTasks, *tasks[i])
})

return nil
}

return applier{
creater: creater{
entries: len(tasks),
fn: createFn,
},
rollbacker: rollbacker{
resource: resource,
fn: func() error {
return s.deleteByIDs("task", len(rollbackTasks), s.taskSVC.DeleteTask, func(i int) influxdb.ID {
return rollbackTasks[i].ID()
})
},
},
}
}

func (s *Service) applyTelegrafs(teles []*telegraf) applier {
const resource = "telegrafs"

Expand Down

0 comments on commit fe25e05

Please sign in to comment.