diff --git a/CHANGELOG.md b/CHANGELOG.md index fa174c8db2d..7c6ee5528bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,11 @@ ## v2.0.0-alpha.9 [unreleased] + +**NOTE: This will remove all tasks from your InfluxDB v2.0 instance.** + ### Features 1. [13423](https://github.com/influxdata/influxdb/pull/13423): Set autorefresh of dashboard to pause if absolute time range is selected +1. [13473](https://github.com/influxdata/influxdb/pull/13473): Switch task back end to a more modular and flexible system 1. [13493](https://github.com/influxdata/influxdb/pull/13493): Add org profile tab with ability to edit organization name 1. [13510](https://github.com/influxdata/influxdb/pull/13510): Add org name to dahboard page title diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 40b80cf5d13..45f48be08fa 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -37,7 +37,6 @@ import ( "github.com/influxdata/influxdb/storage/readservice" "github.com/influxdata/influxdb/task" taskbackend "github.com/influxdata/influxdb/task/backend" - taskbolt "github.com/influxdata/influxdb/task/backend/bolt" "github.com/influxdata/influxdb/task/backend/coordinator" taskexecutor "github.com/influxdata/influxdb/task/backend/executor" "github.com/influxdata/influxdb/telemetry" @@ -211,8 +210,8 @@ type Launcher struct { natsServer *nats.Server - scheduler *taskbackend.TickScheduler - taskStore taskbackend.Store + scheduler *taskbackend.TickScheduler + taskControlService taskbackend.TaskControlService jaegerTracerCloser io.Closer logger *zap.Logger @@ -504,35 +503,22 @@ func (m *Launcher) run(ctx context.Context) (err error) { var storageQueryService = readservice.NewProxyQueryService(m.queryController) var taskSvc platform.TaskService { - var ( - store taskbackend.Store - err error - ) - store, err = taskbolt.New(m.boltClient.DB(), "tasks", taskbolt.NoCatchUp) - if err != nil { - m.logger.Error("failed opening task bolt", zap.Error(err)) - return err - } - if m.storeType == "memory" { - store = taskbackend.NewInMemStore() - } + // create the task stack: + // validation(coordinator(analyticalstore(kv.Service))) - executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, nil) + // define the executor and build analytical storage middleware + combinedTaskService := taskbackend.NewAnalyticalStorage(m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController}) + executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, combinedTaskService) - lw := taskbackend.NewPointLogWriter(pointsWriter) - queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController} - lr := taskbackend.NewQueryLogReader(queryService) - taskControlService := taskbackend.TaskControlAdaptor(store, lw, lr) - m.scheduler = taskbackend.NewScheduler(taskControlService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger)) + // create the scheduler + m.scheduler = taskbackend.NewScheduler(combinedTaskService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger)) m.scheduler.Start(ctx) m.reg.MustRegister(m.scheduler.PrometheusCollectors()...) - taskSvc = task.PlatformAdapter(store, lr, m.scheduler, authSvc, userResourceSvc, orgSvc) - taskexecutor.AddTaskService(executor, taskSvc) - taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, taskSvc) + taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, combinedTaskService) taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc) - m.taskStore = store + m.taskControlService = combinedTaskService } // NATS streaming server @@ -691,8 +677,8 @@ func (m *Launcher) TaskService() platform.TaskService { } // TaskStore returns the internal store service. -func (m *Launcher) TaskStore() taskbackend.Store { - return m.taskStore +func (m *Launcher) TaskControlService() taskbackend.TaskControlService { + return m.taskControlService } // TaskScheduler returns the internal scheduler service. diff --git a/cmd/influxd/launcher/tasks_test.go b/cmd/influxd/launcher/tasks_test.go index 6e35e09b1f8..27a83f56a0d 100644 --- a/cmd/influxd/launcher/tasks_test.go +++ b/cmd/influxd/launcher/tasks_test.go @@ -96,21 +96,17 @@ stuff f=-123.456,b=true,s="hello" } from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, bOut.Name, be.Org.Name), } + created, err := be.TaskService().CreateTask(ctx, create) if err != nil { t.Fatal(err) } // Find the next due run of the task we just created, so that we can accurately tick the scheduler to it. - m, err := be.TaskStore().FindTaskMetaByID(ctx, created.ID) + ndr, err := be.TaskControlService().NextDueRun(ctx, created.ID) if err != nil { t.Fatal(err) } - ndr, err := m.NextDueRun() - if err != nil { - t.Fatal(err) - } - be.TaskScheduler().(*backend.TickScheduler).Tick(ndr + 1) // Poll for the task to have started and finished. @@ -210,7 +206,6 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b } t.Run("showrun", func(t *testing.T) { - t.Skip("FindRunByID isn't returning the log") // do a show run! showRun, err := be.TaskService().FindRunByID(ctx, created.ID, targetRun.ID) if err != nil { diff --git a/kv/task.go b/kv/task.go index b4cca02b0b9..4e9e4b6c93c 100644 --- a/kv/task.go +++ b/kv/task.go @@ -3,7 +3,6 @@ package kv import ( "context" "encoding/json" - "errors" "fmt" "strings" "time" @@ -172,10 +171,24 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf return nil, err } if !latestCompleted.IsZero() { - t.LatestCompleted = latestCompleted.Format(time.RFC3339) - } else { + if t.LatestCompleted != "" { + tlc, err := time.Parse(time.RFC3339, t.LatestCompleted) + if err == nil && latestCompleted.After(tlc) { + t.LatestCompleted = latestCompleted.Format(time.RFC3339) + + } + } else { + t.LatestCompleted = latestCompleted.Format(time.RFC3339) + } + } + + if t.LatestCompleted == "" { t.LatestCompleted = t.CreatedAt } + latestCompleted, err = time.Parse(time.RFC3339, t.LatestCompleted) + if err != nil { + return nil, err + } return t, nil } @@ -200,9 +213,7 @@ func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([] } func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { - if filter.User == nil && filter.OrganizationID == nil && filter.Organization == "" { - return nil, 0, errors.New("find tasks requires filtering by org or user") - } + var org *influxdb.Organization var err error if filter.OrganizationID != nil { @@ -227,87 +238,119 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt filter.Limit = influxdb.TaskDefaultPageSize } - var ts []*influxdb.Task // filter by user id. if filter.User != nil { - maps, err := s.findUserResourceMappings( - ctx, - tx, - influxdb.UserResourceMappingFilter{ - ResourceType: influxdb.TasksResourceType, - UserID: *filter.User, - UserType: influxdb.Owner, - }, - ) + return s.findTasksByUser(ctx, tx, filter) + } else if org != nil { + return s.findTaskByOrg(ctx, tx, filter) + } + + return s.findAllTasks(ctx, tx, filter) +} + +// findTasksByUser is a subset of the find tasks function. Used for cleanliness +func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + if filter.User == nil { + return nil, 0, ErrTaskNotFound + } + + var org *influxdb.Organization + var err error + if filter.OrganizationID != nil { + org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } + } else if filter.Organization != "" { + org, err = s.findOrganizationByName(ctx, tx, filter.Organization) + if err != nil { + return nil, 0, err + } + } - for _, m := range maps { - task, err := s.findTaskByID(ctx, tx, m.ResourceID) - if err != nil { - return nil, 0, err - } + var ts []*influxdb.Task - if org != nil && task.OrganizationID != org.ID { - continue - } + maps, err := s.findUserResourceMappings( + ctx, + tx, + influxdb.UserResourceMappingFilter{ + ResourceType: influxdb.TasksResourceType, + UserID: *filter.User, + UserType: influxdb.Owner, + }, + ) + if err != nil { + return nil, 0, err + } - ts = append(ts, task) + for _, m := range maps { + task, err := s.findTaskByID(ctx, tx, m.ResourceID) + if err != nil { + return nil, 0, err + } - if len(ts) >= filter.Limit { - break - } + if org != nil && task.OrganizationID != org.ID { + continue } - } else { - indexBucket, err := tx.Bucket(taskIndexBucket) - if err != nil { - return nil, 0, ErrUnexpectedTaskBucketErr(err) + + ts = append(ts, task) + + if len(ts) >= filter.Limit { + break } + } + return ts, len(ts), nil +} - c, err := indexBucket.Cursor() +// findTaskByOrg is a subset of the find tasks function. Used for cleanliness +func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + var org *influxdb.Organization + var err error + if filter.OrganizationID != nil { + org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { - return nil, 0, ErrUnexpectedTaskBucketErr(err) + return nil, 0, err } - // we can filter by orgID - if filter.After != nil { - key, err := taskOrgKey(org.ID, *filter.After) - if err != nil { - return nil, 0, err - } - // ignore the key:val returned in this seek because we are starting "after" - // this key - c.Seek(key) - } else { - // if we dont have an after we just move the cursor to the first instance of the - // orgID - key, err := org.ID.Encode() - if err != nil { - return nil, 0, ErrInvalidTaskID - } - k, v := c.Seek(key) - if k != nil { - id, err := influxdb.IDFromString(string(v)) - if err != nil { - return nil, 0, ErrInvalidTaskID - } - - t, err := s.findTaskByID(ctx, tx, *id) - if err != nil { - return nil, 0, err - } - - // insert the new task into the list - ts = append(ts, t) - } + } else if filter.Organization != "" { + org, err = s.findOrganizationByName(ctx, tx, filter.Organization) + if err != nil { + return nil, 0, err } + } - for { - k, v := c.Next() - if k == nil { - break - } + if org == nil { + return nil, 0, ErrTaskNotFound + } + var ts []*influxdb.Task + + indexBucket, err := tx.Bucket(taskIndexBucket) + if err != nil { + return nil, 0, ErrUnexpectedTaskBucketErr(err) + } + + c, err := indexBucket.Cursor() + if err != nil { + return nil, 0, ErrUnexpectedTaskBucketErr(err) + } + // we can filter by orgID + if filter.After != nil { + key, err := taskOrgKey(org.ID, *filter.After) + if err != nil { + return nil, 0, err + } + // ignore the key:val returned in this seek because we are starting "after" + // this key + c.Seek(key) + } else { + // if we dont have an after we just move the cursor to the first instance of the + // orgID + key, err := org.ID.Encode() + if err != nil { + return nil, 0, ErrInvalidTaskID + } + k, v := c.Seek(key) + if k != nil { id, err := influxdb.IDFromString(string(v)) if err != nil { return nil, 0, ErrInvalidTaskID @@ -318,22 +361,118 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt return nil, 0, err } - // If the new task doesn't belong to the org we have looped outside the org filter - if org != nil && t.OrganizationID != org.ID { - break - } - // insert the new task into the list ts = append(ts, t) + } + } - // Check if we are over running the limit - if len(ts) >= filter.Limit { - break - } + for { + k, v := c.Next() + if k == nil { + break + } + + id, err := influxdb.IDFromString(string(v)) + if err != nil { + return nil, 0, ErrInvalidTaskID + } + + t, err := s.findTaskByID(ctx, tx, *id) + if err != nil { + return nil, 0, err + } + + // If the new task doesn't belong to the org we have looped outside the org filter + if org != nil && t.OrganizationID != org.ID { + break + } + + // insert the new task into the list + ts = append(ts, t) + + // Check if we are over running the limit + if len(ts) >= filter.Limit { + break } } + return ts, len(ts), err +} - return ts, len(ts), nil +// findAllTasks is a subset of the find tasks function. Used for cleanliness. +// This function should only be executed internally because it doesn't force organization or user filtering. +// Enforcing filters should be done in a validation layer. +func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + var ts []*influxdb.Task + + taskBucket, err := tx.Bucket(taskBucket) + if err != nil { + return nil, 0, ErrUnexpectedTaskBucketErr(err) + } + + c, err := taskBucket.Cursor() + if err != nil { + return nil, 0, ErrUnexpectedTaskBucketErr(err) + } + // we can filter by orgID + if filter.After != nil { + + key, err := taskKey(*filter.After) + if err != nil { + return nil, 0, err + } + // ignore the key:val returned in this seek because we are starting "after" + // this key + c.Seek(key) + } else { + k, v := c.First() + if k == nil { + return ts, len(ts), nil + } + + t := &influxdb.Task{} + if err := json.Unmarshal(v, t); err != nil { + return nil, 0, ErrInternalTaskServiceError(err) + } + latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID) + if err != nil { + return nil, 0, err + } + if !latestCompleted.IsZero() { + t.LatestCompleted = latestCompleted.Format(time.RFC3339) + } else { + t.LatestCompleted = t.CreatedAt + } + // insert the new task into the list + ts = append(ts, t) + } + + for { + k, v := c.Next() + if k == nil { + break + } + t := &influxdb.Task{} + if err := json.Unmarshal(v, t); err != nil { + return nil, 0, ErrInternalTaskServiceError(err) + } + latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID) + if err != nil { + return nil, 0, err + } + if !latestCompleted.IsZero() { + t.LatestCompleted = latestCompleted.Format(time.RFC3339) + } else { + t.LatestCompleted = t.CreatedAt + } + // insert the new task into the list + ts = append(ts, t) + + // Check if we are over running the limit + if len(ts) >= filter.Limit { + break + } + } + return ts, len(ts), err } // CreateTask creates a new task. @@ -363,7 +502,15 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) auth, err := s.findAuthorizationByToken(ctx, tx, tc.Token) if err != nil { - return nil, err + if err.Error() != " authorization not found" { + return nil, err + } + // if i cant find an authoriaztion based on the token we will use the users authID + auth, err = s.findAuthorizationByID(ctx, tx, userAuth.Identifier()) + if err != nil { + // if we still fail to fine a real auth we cannot continue + return nil, err + } } var org *influxdb.Organization @@ -401,9 +548,11 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) Flux: tc.Flux, Every: opt.Every.String(), Cron: opt.Cron, - Offset: opt.Offset.String(), CreatedAt: time.Now().UTC().Format(time.RFC3339), } + if opt.Offset != nil { + task.Offset = opt.Offset.String() + } taskBucket, err := tx.Bucket(taskBucket) if err != nil { @@ -510,6 +659,10 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf task.Status = *upd.Status } + if upd.LatestCompleted != nil { + task.LatestCompleted = *upd.LatestCompleted + } + task.UpdatedAt = time.Now().UTC().Format(time.RFC3339) // save the updated task bucket, err := tx.Bucket(taskBucket) @@ -640,8 +793,8 @@ func (s *Service) findLogs(ctx context.Context, tx Tx, filter influxdb.LogFilter return nil, 0, err } rtn := make([]*influxdb.Log, len(r.Log)) - for i, log := range r.Log { - rtn[i] = &log + for i := 0; i < len(r.Log); i++ { + rtn[i] = &r.Log[i] } return rtn, len(rtn), nil } @@ -1034,6 +1187,14 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, return backend.RunCreation{}, ErrTaskTimeParse(err) } + // we could have a latest completed newer then the created at time. + if task.LatestCompleted != "" { + lc, err := time.Parse(time.RFC3339, task.LatestCompleted) + if err == nil && lc.After(latestCompleted) { + latestCompleted = lc + } + } + lRun, err := s.findLatestCompleted(ctx, tx, taskID) if err != nil { return backend.RunCreation{}, err @@ -1048,6 +1209,27 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, latestCompleted = runTime } } + // Align create to the hour/minute + // If we decide we no longer want to do this we can just remove the code block below + { + if strings.HasPrefix(task.EffectiveCron(), "@every ") { + everyString := strings.TrimPrefix(task.EffectiveCron(), "@every ") + every := options.Duration{} + err := every.Parse(everyString) + if err != nil { + // We cannot align a invalid time + goto NoChange + } + t := time.Unix(latestCompleted.Unix(), 0) + everyDur, err := every.DurationFrom(t) + if err != nil { + goto NoChange + } + t = t.Truncate(everyDur) + latestCompleted = t.Truncate(time.Second) + } + NoChange: + } // create a run if possible sch, err := cron.Parse(task.EffectiveCron()) @@ -1306,18 +1488,11 @@ func (s *Service) nextDueRun(ctx context.Context, tx Tx, taskID influxdb.ID) (in return 0, err } - latestCompleted, err := s.findLatestCompletedTime(ctx, tx, taskID) + latestCompleted, err := time.Parse(time.RFC3339, task.LatestCompleted) if err != nil { return 0, err } - if latestCompleted.IsZero() { - latestCompleted, err = time.Parse(time.RFC3339, task.CreatedAt) - if err != nil { - return 0, ErrTaskTimeParse(err) - } - } - // create a run if possible sch, err := cron.Parse(task.EffectiveCron()) if err != nil { diff --git a/task.go b/task.go index 05ff7d57783..32e5f36ffd0 100644 --- a/task.go +++ b/task.go @@ -155,6 +155,10 @@ func (t TaskCreate) Validate() error { type TaskUpdate struct { Flux *string `json:"flux,omitempty"` Status *string `json:"status,omitempty"` + + // LatestCompleted us to set latest completed on startup to skip task catchup + LatestCompleted *string `json:"-"` + // Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status. Options options.Options // when we unmarshal this gets unmarshalled from flat key-values diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index aa951362bd4..a94d57b8387 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -94,7 +94,7 @@ func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter influxdb.LogFi return nil, 0, err } for i := 0; i < len(run.Log); i++ { - logs = append(logs, &run.Log[0]) + logs = append(logs, &run.Log[i]) } return logs, len(logs), nil } @@ -107,7 +107,7 @@ func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter influxdb.LogFi for _, run := range runs { for i := 0; i < len(run.Log); i++ { - logs = append(logs, &run.Log[0]) + logs = append(logs, &run.Log[i]) } } diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index 18c55fb7c4a..cde35e348fd 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -3,6 +3,7 @@ package coordinator import ( "context" "fmt" + "time" platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/task/backend" @@ -58,12 +59,17 @@ func New(logger *zap.Logger, scheduler backend.Scheduler, ts platform.TaskServic func (c *Coordinator) claimExistingTasks() { tasks, _, err := c.TaskService.FindTasks(context.Background(), platform.TaskFilter{}) if err != nil { - c.logger.Error("failed to list tasks", zap.Error(err)) return } - + newLatestCompleted := time.Now().UTC().Format(time.RFC3339) for len(tasks) > 0 { for _, task := range tasks { + + task, err := c.TaskService.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{LatestCompleted: &newLatestCompleted}) + if err != nil { + c.logger.Error("failed to set latestCompleted", zap.Error(err)) + } + if task.Status != string(backend.TaskActive) { // Don't claim inactive tasks at startup. continue diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index 26de5e0e010..9d47d8ecb92 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -67,6 +67,10 @@ func inmemTaskService() platform.TaskService { if upd.Status != nil { t.Status = *upd.Status } + if upd.LatestCompleted != nil { + t.LatestCompleted = *upd.LatestCompleted + } + return t, nil }, FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) { @@ -76,7 +80,8 @@ func inmemTaskService() platform.TaskService { if !ok { return nil, backend.ErrTaskNotFound } - return t, nil + newt := *t + return &newt, nil }, FindTasksFn: func(ctx context.Context, tf platform.TaskFilter) ([]*platform.Task, int, error) { mu.Lock() @@ -254,6 +259,14 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) { } createdIDs[i] = task.ID } + origActive, err := ts.FindTaskByID(context.Background(), createdIDs[0]) + if err != nil { + t.Fatal(err) + } + origInactive, err := ts.FindTaskByID(context.Background(), createdIDs[inactiveTaskIndex]) + if err != nil { + t.Fatal(err) + } coordinator.New(zaptest.NewLogger(t), sched, ts) @@ -275,6 +288,24 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) { t.Fatalf("did not find created task with ID %s", id) } } + + active, err := ts.FindTaskByID(context.Background(), createdIDs[0]) + if err != nil { + t.Fatal(err) + } + inactive, err := ts.FindTaskByID(context.Background(), createdIDs[inactiveTaskIndex]) + if err != nil { + t.Fatal(err) + } + + if origActive.LatestCompleted == active.LatestCompleted { + t.Fatalf("active tasks not update with latest completed time") + } + + if origInactive.LatestCompleted == inactive.LatestCompleted { + t.Fatalf("inactive tasks not update with latest completed time") + } + } func TestCoordinator_ForceRun(t *testing.T) { diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 9a9471d03af..7c08b590df6 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -1253,7 +1253,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { } // Create several run logs in both rc0 and rc1 - // + // We can then finalize rc1 and ensure that both the transactional (currently running logs) can be found with analytical (completed) logs. sys.TaskControlService.AddRunLog(sys.Ctx, task.ID, rc0.Created.RunID, time.Now(), "0-0") sys.TaskControlService.AddRunLog(sys.Ctx, task.ID, rc0.Created.RunID, time.Now(), "0-1") sys.TaskControlService.AddRunLog(sys.Ctx, task.ID, rc0.Created.RunID, time.Now(), "0-2") @@ -1275,6 +1275,16 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { } t.Fatalf("failed to get all logs: expected: 7 got: %d", len(logs)) } + smash := func(logs []*influxdb.Log) string { + smashed := "" + for _, log := range logs { + smashed = smashed + log.Message + } + return smashed + } + if smash(logs) != "0-00-10-21-01-11-21-3" { + t.Fatalf("log contents not acceptable, expected: %q, got: %q", "0-00-10-21-01-11-21-3", smash(logs)) + } logs, _, err = sys.TaskService.FindLogs(sys.Ctx, influxdb.LogFilter{Task: task.ID, Run: &rc1.Created.RunID}) if err != nil { @@ -1284,6 +1294,10 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { t.Fatalf("failed to get all logs: expected: 4 got: %d", len(logs)) } + if smash(logs) != "1-01-11-21-3" { + t.Fatalf("log contents not acceptable, expected: %q, got: %q", "1-01-11-21-3", smash(logs)) + } + logs, _, err = sys.TaskService.FindLogs(sys.Ctx, influxdb.LogFilter{Task: task.ID, Run: &rc0.Created.RunID}) if err != nil { t.Fatal(err) @@ -1292,6 +1306,10 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { t.Fatalf("failed to get all logs: expected: 3 got: %d", len(logs)) } + if smash(logs) != "0-00-10-2" { + t.Fatalf("log contents not acceptable, expected: %q, got: %q", "0-00-10-2", smash(logs)) + } + } func creds(t *testing.T, s *System) TestCreds {