From a7cef5e24f4d4e94cffdea186c6e83be7766a377 Mon Sep 17 00:00:00 2001 From: Chlins Zhang Date: Tue, 4 Apr 2023 08:42:18 +0800 Subject: [PATCH] fix: wrap schedule/exec/task creation as orm tx (#18458) Wrap orm tx when the scheduler try to create the task because submit job maybe failure depends on the jobservice. Fixes: #18452 Signed-off-by: chlins --- src/controller/systemartifact/execution.go | 30 ++++++------- src/core/main.go | 22 ++++++++-- src/pkg/scheduler/scheduler.go | 50 ++++++++++++++-------- src/pkg/scheduler/scheduler_test.go | 18 +++++--- 4 files changed, 76 insertions(+), 44 deletions(-) diff --git a/src/controller/systemartifact/execution.go b/src/controller/systemartifact/execution.go index 5191a39310e..b7e4dcd6c03 100644 --- a/src/controller/systemartifact/execution.go +++ b/src/controller/systemartifact/execution.go @@ -5,7 +5,6 @@ import ( "time" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" @@ -63,7 +62,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro return err } - logger.Info("Created job for scan data export successfully") + log.Info("Created job for scan data export successfully") return nil } go func(ctx context.Context) { @@ -77,7 +76,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro } err = c.createCleanupTask(ctx, jobParams, execID) if err != nil { - logger.Errorf("Encountered error in scan data artifact cleanup : %v", err) + log.Errorf("Encountered error in scan data artifact cleanup : %v", err) return } }(c.makeCtx()) @@ -97,7 +96,7 @@ func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parame _, err := c.taskMgr.Create(ctx, execID, j) if err != nil { - logger.Errorf("Unable to create a scan data export job in clean-up mode : %v", err) + log.Errorf("Unable to create a scan data export job in clean-up mode : %v", err) c.markError(ctx, execID, err) return err } @@ -107,44 +106,45 @@ func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parame func (c *controller) markError(ctx context.Context, executionID int64, err error) { // try to stop the execution first in case that some tasks are already created if err := c.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil { - logger.Errorf("failed to stop the execution %d: %v", executionID, err) + log.Errorf("failed to stop the execution %d: %v", executionID, err) } if err := c.execMgr.MarkError(ctx, executionID, err.Error()); err != nil { - logger.Errorf("failed to mark error for the execution %d: %v", executionID, err) + log.Errorf("failed to mark error for the execution %d: %v", executionID, err) } } // ScheduleCleanupTask schedules a system artifact cleanup task -func ScheduleCleanupTask(ctx context.Context) { - scheduleSystemArtifactCleanJob(ctx) +func ScheduleCleanupTask(ctx context.Context) error { + return scheduleSystemArtifactCleanJob(ctx) } -func scheduleSystemArtifactCleanJob(ctx context.Context) { +func scheduleSystemArtifactCleanJob(ctx context.Context) error { schedule, err := getSystemArtifactCleanupSchedule(ctx) if err != nil { - return + return err } if schedule != nil { - logger.Debugf(" Export data cleanup job already scheduled with ID : %v.", schedule.ID) - return + log.Debugf("Export data cleanup job already scheduled with ID : %v.", schedule.ID) + return nil } scheduleID, err := sched.Schedule(ctx, job.SystemArtifactCleanupVendorType, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil) if err != nil { log.Errorf("Encountered error when scheduling scan data export cleanup job : %v", err) - return + return err } log.Infof("Scheduled scan data export cleanup job with ID : %v", scheduleID) + return nil } func getSystemArtifactCleanupSchedule(ctx context.Context) (*scheduler.Schedule, error) { query := q.New(map[string]interface{}{"vendor_type": job.SystemArtifactCleanupVendorType}) schedules, err := sched.ListSchedules(ctx, query) if err != nil { - logger.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err) + log.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err) return nil, err } if len(schedules) > 0 { - logger.Infof("Found export data cleanup job with schedule id : %v", schedules[0].ID) + log.Debugf("Found export data cleanup job with schedule id : %v", schedules[0].ID) return schedules[0], nil } return nil, nil diff --git a/src/core/main.go b/src/core/main.go index b7695d3ae9a..10ef6848375 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -259,11 +259,27 @@ func main() { log.Errorf("failed to check the jobservice health status: timeout, error: %v", err) return } - + // schedule the system jobs with retry as the operation depends on the jobservice, + // retry to handle the failure case caused by jobservice. + ctx := orm.Context() + options = []retry.Option{ + retry.InitialInterval(time.Millisecond * 500), + retry.MaxInterval(time.Second * 10), + retry.Timeout(time.Minute * 5), + retry.Callback(func(err error, sleep time.Duration) { + log.Debugf("failed to schedule system job, retry after %s : %v", sleep, err) + }), + } // schedule system artifact cleanup job - systemartifact.ScheduleCleanupTask(ctx) + if err := retry.Retry(func() error { + return systemartifact.ScheduleCleanupTask(ctx) + }, options...); err != nil { + log.Errorf("failed to schedule system artifact cleanup job, error: %v", err) + } // schedule system execution sweep job - if err := task.ScheduleSweepJob(ctx); err != nil { + if err := retry.Retry(func() error { + return task.ScheduleSweepJob(ctx) + }, options...); err != nil { log.Errorf("failed to schedule system execution sweep job, error: %v", err) } }() diff --git a/src/pkg/scheduler/scheduler.go b/src/pkg/scheduler/scheduler.go index 1ac5dda3e87..27227bb47b5 100644 --- a/src/pkg/scheduler/scheduler.go +++ b/src/pkg/scheduler/scheduler.go @@ -24,6 +24,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task" ) @@ -133,29 +134,40 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in } sched.ExtraAttrs = string(extrasData) - // create schedule record - // when checkin hook comes, the database record must exist, - // so the database record must be created first before submitting job - id, err := s.dao.Create(ctx, sched) - if err != nil { - return 0, err + var scheduleID, taskID int64 + // ensureTask makes sure the task has been created at the end + ensureTask := func(ctx context.Context) error { + // create schedule record + // when checkin hook comes, the database record must exist, + // so the database record must be created first before submitting job + scheduleID, err = s.dao.Create(ctx, sched) + if err != nil { + return err + } + // create execution by schedule id + execID, err := s.execMgr.Create(ctx, JobNameScheduler, scheduleID, task.ExecutionTriggerManual, params) + if err != nil { + return err + } + // create task by execution id, maybe failed if error to submit job to jobservice, + // so wrap these 3 actions as a transaction. + taskID, err = s.taskMgr.Create(ctx, execID, &task.Job{ + Name: JobNameScheduler, + Metadata: &job.Metadata{ + JobKind: job.KindPeriodic, + Cron: cron, + }, + }) + if err != nil { + return err + } + return nil } - execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual, params) - if err != nil { + if err = orm.WithTransaction(ensureTask)(orm.SetTransactionOpNameToContext(ctx, "tx-ensure-schedule-task")); err != nil { return 0, err } - taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{ - Name: JobNameScheduler, - Metadata: &job.Metadata{ - JobKind: job.KindPeriodic, - Cron: cron, - }, - }) - if err != nil { - return 0, err - } // make sure the created task is stopped if got any error in the following steps defer func() { if err == nil { @@ -179,7 +191,7 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in return 0, err } - return id, nil + return scheduleID, nil } func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error { diff --git a/src/pkg/scheduler/scheduler_test.go b/src/pkg/scheduler/scheduler_test.go index b530876381a..3eed26f9141 100644 --- a/src/pkg/scheduler/scheduler_test.go +++ b/src/pkg/scheduler/scheduler_test.go @@ -22,13 +22,16 @@ import ( "github.com/stretchr/testify/suite" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/pkg/task" + ormtesting "github.com/goharbor/harbor/src/testing/lib/orm" "github.com/goharbor/harbor/src/testing/mock" tasktesting "github.com/goharbor/harbor/src/testing/pkg/task" ) type schedulerTestSuite struct { suite.Suite + ctx context.Context scheduler *scheduler dao *mockDAO execMgr *tasktesting.ExecutionManager @@ -40,6 +43,7 @@ func (s *schedulerTestSuite) SetupTest() { err := RegisterCallbackFunc("callback", func(context.Context, string) error { return nil }) s.Require().Nil(err) + s.ctx = orm.NewContext(nil, &ormtesting.FakeOrmer{}) s.dao = &mockDAO{} s.execMgr = &tasktesting.ExecutionManager{} s.taskMgr = &tasktesting.Manager{} @@ -54,15 +58,15 @@ func (s *schedulerTestSuite) SetupTest() { func (s *schedulerTestSuite) TestSchedule() { // empty vendor type extras := make(map[string]interface{}) - id, err := s.scheduler.Schedule(nil, "", 0, "", "0 * * * * *", "callback", nil, extras) + id, err := s.scheduler.Schedule(s.ctx, "", 0, "", "0 * * * * *", "callback", nil, extras) s.NotNil(err) // invalid cron - id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "", "callback", nil, extras) + id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "", "callback", nil, extras) s.NotNil(err) // callback function not exist - id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "not-exist", nil, extras) + id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "not-exist", nil, extras) s.NotNil(err) // failed to submit to jobservice @@ -75,7 +79,7 @@ func (s *schedulerTestSuite) TestSchedule() { Status: job.ErrorStatus.String(), }, nil) s.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) - _, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "callback", "param", extras) + _, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "callback", "param", extras) s.Require().NotNil(err) s.dao.AssertExpectations(s.T()) s.execMgr.AssertExpectations(s.T()) @@ -93,7 +97,7 @@ func (s *schedulerTestSuite) TestSchedule() { ExecutionID: 1, Status: job.SuccessStatus.String(), }, nil) - id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "callback", "param", extras) + id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "callback", "param", extras) s.Require().Nil(err) s.Equal(int64(1), id) s.dao.AssertExpectations(s.T()) @@ -161,7 +165,7 @@ func (s *schedulerTestSuite) TestGetSchedule() { CRON: "0 * * * * *", }, nil) s.execMgr.On("List", mock.Anything, mock.Anything).Return(nil, nil) - schd, err := s.scheduler.GetSchedule(nil, 1) + schd, err := s.scheduler.GetSchedule(s.ctx, 1) s.Require().Nil(err) s.Equal("0 * * * * *", schd.CRON) s.Equal(job.ErrorStatus.String(), schd.Status) @@ -184,7 +188,7 @@ func (s *schedulerTestSuite) TestGetSchedule() { Status: job.SuccessStatus.String(), }, }, nil) - schd, err = s.scheduler.GetSchedule(nil, 1) + schd, err = s.scheduler.GetSchedule(s.ctx, 1) s.Require().Nil(err) s.Equal("0 * * * * *", schd.CRON) s.Equal(job.SuccessStatus.String(), schd.Status)