Skip to content

Commit

Permalink
Integrate the new changes to tasks (#13473)
Browse files Browse the repository at this point in the history
* Integrat the new changes to tasks
  • Loading branch information
lyondhill authored Apr 19, 2019
1 parent 662cf57 commit 726fbef
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 132 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
## v2.0.0-alpha.9 [unreleased]


**NOTE: This will remove all tasks from your InfluxDB v2.0 instance.**

### Features
1. [13423](https://github.com/influxdata/influxdb/pull/13423): Set autorefresh of dashboard to pause if absolute time range is selected
1. [13473](https://github.com/influxdata/influxdb/pull/13473): Switch task back end to a more modular and flexible system
1. [13493](https://github.com/influxdata/influxdb/pull/13493): Add org profile tab with ability to edit organization name
1. [13510](https://github.com/influxdata/influxdb/pull/13510): Add org name to dahboard page title

Expand Down
40 changes: 13 additions & 27 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/influxdata/influxdb/storage/readservice"
"github.com/influxdata/influxdb/task"
taskbackend "github.com/influxdata/influxdb/task/backend"
taskbolt "github.com/influxdata/influxdb/task/backend/bolt"
"github.com/influxdata/influxdb/task/backend/coordinator"
taskexecutor "github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/telemetry"
Expand Down Expand Up @@ -211,8 +210,8 @@ type Launcher struct {

natsServer *nats.Server

scheduler *taskbackend.TickScheduler
taskStore taskbackend.Store
scheduler *taskbackend.TickScheduler
taskControlService taskbackend.TaskControlService

jaegerTracerCloser io.Closer
logger *zap.Logger
Expand Down Expand Up @@ -504,35 +503,22 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var storageQueryService = readservice.NewProxyQueryService(m.queryController)
var taskSvc platform.TaskService
{
var (
store taskbackend.Store
err error
)
store, err = taskbolt.New(m.boltClient.DB(), "tasks", taskbolt.NoCatchUp)
if err != nil {
m.logger.Error("failed opening task bolt", zap.Error(err))
return err
}

if m.storeType == "memory" {
store = taskbackend.NewInMemStore()
}
// create the task stack:
// validation(coordinator(analyticalstore(kv.Service)))

executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, nil)
// define the executor and build analytical storage middleware
combinedTaskService := taskbackend.NewAnalyticalStorage(m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController})
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, combinedTaskService)

lw := taskbackend.NewPointLogWriter(pointsWriter)
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskControlService := taskbackend.TaskControlAdaptor(store, lw, lr)
m.scheduler = taskbackend.NewScheduler(taskControlService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
// create the scheduler
m.scheduler = taskbackend.NewScheduler(combinedTaskService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)

taskSvc = task.PlatformAdapter(store, lr, m.scheduler, authSvc, userResourceSvc, orgSvc)
taskexecutor.AddTaskService(executor, taskSvc)
taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, taskSvc)
taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, combinedTaskService)
taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc)
m.taskStore = store
m.taskControlService = combinedTaskService
}

// NATS streaming server
Expand Down Expand Up @@ -691,8 +677,8 @@ func (m *Launcher) TaskService() platform.TaskService {
}

// TaskStore returns the internal store service.
func (m *Launcher) TaskStore() taskbackend.Store {
return m.taskStore
func (m *Launcher) TaskControlService() taskbackend.TaskControlService {
return m.taskControlService
}

// TaskScheduler returns the internal scheduler service.
Expand Down
9 changes: 2 additions & 7 deletions cmd/influxd/launcher/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,17 @@ stuff f=-123.456,b=true,s="hello"
}
from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, bOut.Name, be.Org.Name),
}

created, err := be.TaskService().CreateTask(ctx, create)
if err != nil {
t.Fatal(err)
}

// Find the next due run of the task we just created, so that we can accurately tick the scheduler to it.
m, err := be.TaskStore().FindTaskMetaByID(ctx, created.ID)
ndr, err := be.TaskControlService().NextDueRun(ctx, created.ID)
if err != nil {
t.Fatal(err)
}
ndr, err := m.NextDueRun()
if err != nil {
t.Fatal(err)
}

be.TaskScheduler().(*backend.TickScheduler).Tick(ndr + 1)

// Poll for the task to have started and finished.
Expand Down Expand Up @@ -210,7 +206,6 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
}

t.Run("showrun", func(t *testing.T) {
t.Skip("FindRunByID isn't returning the log")
// do a show run!
showRun, err := be.TaskService().FindRunByID(ctx, created.ID, targetRun.ID)
if err != nil {
Expand Down
Loading

0 comments on commit 726fbef

Please sign in to comment.