Skip to content

Commit

Permalink
fix: wrap schedule/exec/task creation as orm tx (#18458)
Browse files Browse the repository at this point in the history
Wrap the schedule, execution, and task creation in an ORM transaction when the
scheduler creates the task, because submitting the job may fail depending on the jobservice.

Fixes: #18452

Signed-off-by: chlins <[email protected]>
  • Loading branch information
chlins authored Apr 4, 2023
1 parent 02c51c6 commit a7cef5e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 44 deletions.
30 changes: 15 additions & 15 deletions src/controller/systemartifact/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro
return err
}

logger.Info("Created job for scan data export successfully")
log.Info("Created job for scan data export successfully")
return nil
}
go func(ctx context.Context) {
Expand All @@ -77,7 +76,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro
}
err = c.createCleanupTask(ctx, jobParams, execID)
if err != nil {
logger.Errorf("Encountered error in scan data artifact cleanup : %v", err)
log.Errorf("Encountered error in scan data artifact cleanup : %v", err)
return
}
}(c.makeCtx())
Expand All @@ -97,7 +96,7 @@ func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parame
_, err := c.taskMgr.Create(ctx, execID, j)

if err != nil {
logger.Errorf("Unable to create a scan data export job in clean-up mode : %v", err)
log.Errorf("Unable to create a scan data export job in clean-up mode : %v", err)
c.markError(ctx, execID, err)
return err
}
Expand All @@ -107,44 +106,45 @@ func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parame
func (c *controller) markError(ctx context.Context, executionID int64, err error) {
// try to stop the execution first in case that some tasks are already created
if err := c.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil {
logger.Errorf("failed to stop the execution %d: %v", executionID, err)
log.Errorf("failed to stop the execution %d: %v", executionID, err)
}
if err := c.execMgr.MarkError(ctx, executionID, err.Error()); err != nil {
logger.Errorf("failed to mark error for the execution %d: %v", executionID, err)
log.Errorf("failed to mark error for the execution %d: %v", executionID, err)
}
}

// ScheduleCleanupTask schedules a system artifact cleanup task
func ScheduleCleanupTask(ctx context.Context) {
scheduleSystemArtifactCleanJob(ctx)
func ScheduleCleanupTask(ctx context.Context) error {
return scheduleSystemArtifactCleanJob(ctx)
}

func scheduleSystemArtifactCleanJob(ctx context.Context) {
func scheduleSystemArtifactCleanJob(ctx context.Context) error {
schedule, err := getSystemArtifactCleanupSchedule(ctx)
if err != nil {
return
return err
}
if schedule != nil {
logger.Debugf(" Export data cleanup job already scheduled with ID : %v.", schedule.ID)
return
log.Debugf("Export data cleanup job already scheduled with ID : %v.", schedule.ID)
return nil
}
scheduleID, err := sched.Schedule(ctx, job.SystemArtifactCleanupVendorType, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil)
if err != nil {
log.Errorf("Encountered error when scheduling scan data export cleanup job : %v", err)
return
return err
}
log.Infof("Scheduled scan data export cleanup job with ID : %v", scheduleID)
return nil
}

func getSystemArtifactCleanupSchedule(ctx context.Context) (*scheduler.Schedule, error) {
query := q.New(map[string]interface{}{"vendor_type": job.SystemArtifactCleanupVendorType})
schedules, err := sched.ListSchedules(ctx, query)
if err != nil {
logger.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err)
log.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err)
return nil, err
}
if len(schedules) > 0 {
logger.Infof("Found export data cleanup job with schedule id : %v", schedules[0].ID)
log.Debugf("Found export data cleanup job with schedule id : %v", schedules[0].ID)
return schedules[0], nil
}
return nil, nil
Expand Down
22 changes: 19 additions & 3 deletions src/core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,27 @@ func main() {
log.Errorf("failed to check the jobservice health status: timeout, error: %v", err)
return
}

// schedule the system jobs with retry as the operation depends on the jobservice,
// retry to handle the failure case caused by jobservice.
ctx := orm.Context()
options = []retry.Option{
retry.InitialInterval(time.Millisecond * 500),
retry.MaxInterval(time.Second * 10),
retry.Timeout(time.Minute * 5),
retry.Callback(func(err error, sleep time.Duration) {
log.Debugf("failed to schedule system job, retry after %s : %v", sleep, err)
}),
}
// schedule system artifact cleanup job
systemartifact.ScheduleCleanupTask(ctx)
if err := retry.Retry(func() error {
return systemartifact.ScheduleCleanupTask(ctx)
}, options...); err != nil {
log.Errorf("failed to schedule system artifact cleanup job, error: %v", err)
}
// schedule system execution sweep job
if err := task.ScheduleSweepJob(ctx); err != nil {
if err := retry.Retry(func() error {
return task.ScheduleSweepJob(ctx)
}, options...); err != nil {
log.Errorf("failed to schedule system execution sweep job, error: %v", err)
}
}()
Expand Down
50 changes: 31 additions & 19 deletions src/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task"
)
Expand Down Expand Up @@ -133,29 +134,40 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in
}
sched.ExtraAttrs = string(extrasData)

// create schedule record
// when checkin hook comes, the database record must exist,
// so the database record must be created first before submitting job
id, err := s.dao.Create(ctx, sched)
if err != nil {
return 0, err
var scheduleID, taskID int64
// ensureTask makes sure the task has been created at the end
ensureTask := func(ctx context.Context) error {
// create schedule record
// when checkin hook comes, the database record must exist,
// so the database record must be created first before submitting job
scheduleID, err = s.dao.Create(ctx, sched)
if err != nil {
return err
}
// create execution by schedule id
execID, err := s.execMgr.Create(ctx, JobNameScheduler, scheduleID, task.ExecutionTriggerManual, params)
if err != nil {
return err
}
// create the task by execution id; this may fail if submitting the job to jobservice fails,
// so these 3 actions are wrapped in a single transaction.
taskID, err = s.taskMgr.Create(ctx, execID, &task.Job{
Name: JobNameScheduler,
Metadata: &job.Metadata{
JobKind: job.KindPeriodic,
Cron: cron,
},
})
if err != nil {
return err
}
return nil
}

execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual, params)
if err != nil {
if err = orm.WithTransaction(ensureTask)(orm.SetTransactionOpNameToContext(ctx, "tx-ensure-schedule-task")); err != nil {
return 0, err
}

taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{
Name: JobNameScheduler,
Metadata: &job.Metadata{
JobKind: job.KindPeriodic,
Cron: cron,
},
})
if err != nil {
return 0, err
}
// make sure the created task is stopped if got any error in the following steps
defer func() {
if err == nil {
Expand All @@ -179,7 +191,7 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in
return 0, err
}

return id, nil
return scheduleID, nil
}

func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error {
Expand Down
18 changes: 11 additions & 7 deletions src/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"github.com/stretchr/testify/suite"

"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/task"
ormtesting "github.com/goharbor/harbor/src/testing/lib/orm"
"github.com/goharbor/harbor/src/testing/mock"
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
)

type schedulerTestSuite struct {
suite.Suite
ctx context.Context
scheduler *scheduler
dao *mockDAO
execMgr *tasktesting.ExecutionManager
Expand All @@ -40,6 +43,7 @@ func (s *schedulerTestSuite) SetupTest() {
err := RegisterCallbackFunc("callback", func(context.Context, string) error { return nil })
s.Require().Nil(err)

s.ctx = orm.NewContext(nil, &ormtesting.FakeOrmer{})
s.dao = &mockDAO{}
s.execMgr = &tasktesting.ExecutionManager{}
s.taskMgr = &tasktesting.Manager{}
Expand All @@ -54,15 +58,15 @@ func (s *schedulerTestSuite) SetupTest() {
func (s *schedulerTestSuite) TestSchedule() {
// empty vendor type
extras := make(map[string]interface{})
id, err := s.scheduler.Schedule(nil, "", 0, "", "0 * * * * *", "callback", nil, extras)
id, err := s.scheduler.Schedule(s.ctx, "", 0, "", "0 * * * * *", "callback", nil, extras)
s.NotNil(err)

// invalid cron
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "", "callback", nil, extras)
id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "", "callback", nil, extras)
s.NotNil(err)

// callback function not exist
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "not-exist", nil, extras)
id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "not-exist", nil, extras)
s.NotNil(err)

// failed to submit to jobservice
Expand All @@ -75,7 +79,7 @@ func (s *schedulerTestSuite) TestSchedule() {
Status: job.ErrorStatus.String(),
}, nil)
s.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
_, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
_, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
s.Require().NotNil(err)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
Expand All @@ -93,7 +97,7 @@ func (s *schedulerTestSuite) TestSchedule() {
ExecutionID: 1,
Status: job.SuccessStatus.String(),
}, nil)
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
s.Require().Nil(err)
s.Equal(int64(1), id)
s.dao.AssertExpectations(s.T())
Expand Down Expand Up @@ -161,7 +165,7 @@ func (s *schedulerTestSuite) TestGetSchedule() {
CRON: "0 * * * * *",
}, nil)
s.execMgr.On("List", mock.Anything, mock.Anything).Return(nil, nil)
schd, err := s.scheduler.GetSchedule(nil, 1)
schd, err := s.scheduler.GetSchedule(s.ctx, 1)
s.Require().Nil(err)
s.Equal("0 * * * * *", schd.CRON)
s.Equal(job.ErrorStatus.String(), schd.Status)
Expand All @@ -184,7 +188,7 @@ func (s *schedulerTestSuite) TestGetSchedule() {
Status: job.SuccessStatus.String(),
},
}, nil)
schd, err = s.scheduler.GetSchedule(nil, 1)
schd, err = s.scheduler.GetSchedule(s.ctx, 1)
s.Require().Nil(err)
s.Equal("0 * * * * *", schd.CRON)
s.Equal(job.SuccessStatus.String(), schd.Status)
Expand Down

0 comments on commit a7cef5e

Please sign in to comment.