diff --git a/src/controller/gc/controller.go b/src/controller/gc/controller.go index eb8bd903f4a1..b060e25dc12d 100644 --- a/src/controller/gc/controller.go +++ b/src/controller/gc/controller.go @@ -10,11 +10,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 gc execution records - task.SetExecutionSweeperCount(job.GarbageCollectionVendorType, 50) -} - var ( // Ctl is a global garbage collection controller instance Ctl = NewController() diff --git a/src/controller/p2p/preheat/enforcer.go b/src/controller/p2p/preheat/enforcer.go index e017b648e7ca..072b560638aa 100644 --- a/src/controller/p2p/preheat/enforcer.go +++ b/src/controller/p2p/preheat/enforcer.go @@ -46,11 +46,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 p2p preheat execution records - task.SetExecutionSweeperCount(job.P2PPreheatVendorType, 50) -} - const ( defaultSeverityCode = 99 extraAttrTotal = "totalCount" diff --git a/src/controller/replication/execution.go b/src/controller/replication/execution.go index 547d2057da59..8eb610365fc5 100644 --- a/src/controller/replication/execution.go +++ b/src/controller/replication/execution.go @@ -35,11 +35,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 replication execution records - task.SetExecutionSweeperCount(job.ReplicationVendorType, 50) -} - // Ctl is a global replication controller instance var Ctl = NewController() diff --git a/src/controller/retention/controller.go b/src/controller/retention/controller.go index 967ad15a3259..642d07403630 100644 --- a/src/controller/retention/controller.go +++ b/src/controller/retention/controller.go @@ -31,11 +31,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 retention execution records - task.SetExecutionSweeperCount(job.RetentionVendorType, 50) -} - // go:generate mockery -name Controller -case snake // Controller to handle the requests related with retention diff --git a/src/controller/scan/base_controller.go b/src/controller/scan/base_controller.go index 5eda6b723c6a..8963b682290a 100644 --- a/src/controller/scan/base_controller.go +++ b/src/controller/scan/base_controller.go @@ -69,11 +69,6 @@ const ( robotIDKey = "robot_id" ) -func init() { - // keep only the latest created 5 scan all execution records - task.SetExecutionSweeperCount(VendorTypeScanAll, 5) -} - // uuidGenerator is a func template which is for generating UUID. type uuidGenerator func() (string, error) diff --git a/src/controller/scandataexport/execution.go b/src/controller/scandataexport/execution.go index 95bb71e8eded..44b1d34b75ac 100644 --- a/src/controller/scandataexport/execution.go +++ b/src/controller/scandataexport/execution.go @@ -16,10 +16,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - task.SetExecutionSweeperCount(job.ScanDataExportVendorType, 50) -} - var Ctl = NewController() type Controller interface { diff --git a/src/controller/systemartifact/execution.go b/src/controller/systemartifact/execution.go index a178fda04ba8..5191a39310ec 100644 --- a/src/controller/systemartifact/execution.go +++ b/src/controller/systemartifact/execution.go @@ -24,10 +24,6 @@ var ( sched = scheduler.Sched ) -func init() { - task.SetExecutionSweeperCount(job.SystemArtifactCleanupVendorType, 50) -} - var Ctl = NewController() type Controller interface { diff --git a/src/controller/task/sweep.go b/src/controller/task/sweep.go new file mode 100644 index 000000000000..5ebc848d1d78 --- /dev/null +++ b/src/controller/task/sweep.go @@ -0,0 +1,143 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" +) + +var ( + // SweepCtl is the global sweep controller + SweepCtl = NewSweepController() +) + +type SweepParams struct { + // ExecRetainCounts records the retained execution counts for different vendor type + ExecRetainCounts map[string]int64 +} + +const ( + // SchedulerCallback ... + SchedulerCallback = "EXECUTION_SWEEP_CALLBACK" + // systemVendorID represents the id for system job. + systemVendorID = -1 + + cronTypeCustom = "Custom" + // run for every hour + cronSpec = "0 0 * * * *" +) + +func init() { + err := scheduler.RegisterCallbackFunc(SchedulerCallback, sweepCallback) + if err != nil { + log.Fatalf("failed to register execution sweep job callback, error: %v", err) + } +} + +func sweepCallback(ctx context.Context, p string) error { + params := &SweepParams{ExecRetainCounts: job.GetExecutionSweeperCount()} + return SweepCtl.Start(ctx, params, task.ExecutionTriggerSchedule) +} + +type SweepController interface { + Start(ctx context.Context, params *SweepParams, trigger string) error +} + +type sweepController struct { + execMgr task.ExecutionManager + taskMgr task.Manager +} + +func (sc *sweepController) Start(ctx context.Context, params *SweepParams, trigger string) error { + jobParams := make(map[string]interface{}) + jobParams[task.ExecRetainCounts] = params.ExecRetainCounts + + execID, err := sc.execMgr.Create(ctx, job.ExecSweepVendorType, systemVendorID, trigger, jobParams) + if err != nil { + log.Errorf("failed to create execution for %s, error: %v", job.ExecSweepVendorType, err) + return err + } + + _, err = sc.taskMgr.Create(ctx, execID, &task.Job{ + Name: job.ExecSweepVendorType, + Metadata: &job.Metadata{ + JobKind: job.KindGeneric, + }, + Parameters: jobParams, + }) + if err != nil { + log.Errorf("failed to create task for %s, error: %v", job.ExecSweepVendorType, err) + return err + } + + return nil +} + +func NewSweepController() SweepController { + return &sweepController{ + execMgr: task.ExecMgr, + taskMgr: task.Mgr, + } +} + +// ScheduleSweepJob schedules the system execution sweep job. +func ScheduleSweepJob(ctx context.Context) error { + sched, err := getScheduledSweepJob(ctx) + if err != nil { + return err + } + // unschedule the job if the cron changed + if sched != nil { + if sched.CRON != cronSpec { + log.Debugf("reschedule the system execution job because the cron changed, old: %s, new: %s", sched.CRON, cronSpec) + if err = scheduler.Sched.UnScheduleByID(ctx, sched.ID); err != nil { + return err + } + } else { + log.Debug("skip to schedule the system execution job because the old one existed and cron not changed") + return nil + } + } + + // schedule a job if no schedule found or cron changed + scheduleID, err := scheduler.Sched.Schedule(ctx, job.ExecSweepVendorType, systemVendorID, cronTypeCustom, cronSpec, SchedulerCallback, nil, nil) + if err != nil { + return err + } + + log.Debugf("scheduled the system execution sweep job, id: %d", scheduleID) + return nil +} + +// getScheduledSweepJob gets sweep job which already scheduled. +func getScheduledSweepJob(ctx context.Context) (*scheduler.Schedule, error) { + query := q.New(map[string]interface{}{"vendor_type": job.ExecSweepVendorType}) + schedules, err := scheduler.Sched.ListSchedules(ctx, query) + if err != nil { + return nil, err + } + + if len(schedules) > 0 { + return schedules[0], nil + } + + return nil, nil +} diff --git a/src/core/main.go b/src/core/main.go index 101e2ca056bc..287f2cf806f5 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -35,6 +35,7 @@ import ( "github.com/goharbor/harbor/src/controller/health" "github.com/goharbor/harbor/src/controller/registry" "github.com/goharbor/harbor/src/controller/systemartifact" + "github.com/goharbor/harbor/src/controller/task" "github.com/goharbor/harbor/src/core/api" _ "github.com/goharbor/harbor/src/core/auth/authproxy" _ "github.com/goharbor/harbor/src/core/auth/db" @@ -257,7 +258,13 @@ func main() { log.Errorf("failed to check the jobservice health status: timeout, error: %v", err) return } + + // schedule system artifact cleanup job systemartifact.ScheduleCleanupTask(ctx) + // schedule system execution sweep job + if err := task.ScheduleSweepJob(ctx); err != nil { + log.Errorf("failed to schedule system execution sweep job, error: %v", err) + } }() web.RunWithMiddleWares("", middlewares.MiddleWares()...) } diff --git a/src/jobservice/job/known_jobs.go b/src/jobservice/job/known_jobs.go index a89843d9b699..7862857dbf93 100644 --- a/src/jobservice/job/known_jobs.go +++ b/src/jobservice/job/known_jobs.go @@ -40,4 +40,29 @@ const ( SystemArtifactCleanupVendorType = "SYSTEM_ARTIFACT_CLEANUP" // ScanDataExportVendorType : the name of the scan data export job ScanDataExportVendorType = "SCAN_DATA_EXPORT" + // ExecSweepVendorType: the name of the execution sweep job + ExecSweepVendorType = "EXECUTION_SWEEP" + // ScanAllVendorType: the name of the scan all job + ScanAllVendorType = "SCAN_ALL" ) + +var ( + // executionSweeperCount stores the count for execution retained + executionSweeperCount = map[string]int64{ + ScanAllVendorType: 5, + ExecSweepVendorType: 10, + GarbageCollectionVendorType: 50, + SlackJobVendorType: 50, + WebhookJobVendorType: 50, + ReplicationVendorType: 50, + ScanDataExportVendorType: 50, + SystemArtifactCleanupVendorType: 50, + P2PPreheatVendorType: 50, + RetentionVendorType: 50, + } +) + +// GetExecutionSweeperCount gets the count of execution records retained by the sweeper +func GetExecutionSweeperCount() map[string]int64 { + return executionSweeperCount +} diff --git a/src/jobservice/runner/redis.go b/src/jobservice/runner/redis.go index 1ac7a088de1b..a878ae81afcd 100644 --- a/src/jobservice/runner/redis.go +++ b/src/jobservice/runner/redis.go @@ -161,7 +161,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { defer func() { if r := recover(); r != nil { // Log the stack - buf := make([]byte, 1<<10) + buf := make([]byte, 1<<20) size := runtime.Stack(buf, false) err = errors.Errorf("runtime error: %s; stack: %s", r, buf[0:size]) logger.Errorf("Run job %s:%s error: %s", j.Name, j.ID, err) diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index aadcb8b18d8c..aad3837bb705 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -331,6 +331,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool( "IMAGE_GC": (*legacy.GarbageCollectionScheduler)(nil), "IMAGE_SCAN_ALL": (*legacy.ScanAllScheduler)(nil), job.SystemArtifactCleanupVendorType: (*systemartifact.Cleanup)(nil), + job.ExecSweepVendorType: (*task.SweepJob)(nil), }); err != nil { // exit return nil, err diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index 3d8582698ed0..49fee7f96fd1 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -21,19 +21,16 @@ import ( "time" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" - "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task/dao" ) var ( // ExecMgr is a global execution manager instance - ExecMgr = NewExecutionManager() - executionSweeperCount = map[string]uint8{} - ErrTimeOut = errors.New("stopping the execution timeout") + ExecMgr = NewExecutionManager() + ErrTimeOut = errors.New("stopping the execution timeout") ) // ExecutionManager manages executions. @@ -83,8 +80,6 @@ func NewExecutionManager() ExecutionManager { executionDAO: dao.NewExecutionDAO(), taskMgr: Mgr, taskDAO: dao.NewTaskDAO(), - ormCreator: orm.Crt, - wp: lib.NewWorkerPool(10), } } @@ -92,8 +87,6 @@ type executionManager struct { executionDAO dao.ExecutionDAO taskMgr Manager taskDAO dao.TaskDAO - ormCreator orm.Creator - wp *lib.WorkerPool } func (e *executionManager) Count(ctx context.Context, query *q.Query) (int64, error) { @@ -126,91 +119,9 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor return 0, err } - // sweep the execution records to avoid the execution/task records explosion - go func() { - e.wp.GetWorker() - defer e.wp.ReleaseWorker() - // as we start a new transaction here to do the sweep work, the current execution record - // may be not visible(when the transaction in which the current execution is created - // in isn't committed), this will cause that there are one more execution records than expected - ctx := orm.NewContext(context.Background(), e.ormCreator.Create()) - if err := e.sweep(ctx, vendorType, vendorID); err != nil { - log.Errorf("failed to sweep the executions of %s: %v", vendorType, err) - return - } - }() - return id, nil } -func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error { - size := int64(executionSweeperCount[vendorType]) - if size == 0 { - log.Debugf("the execution sweeper size doesn't set for %s, skip sweep", vendorType) - return nil - } - - // get the #size execution record - query := &q.Query{ - Keywords: map[string]interface{}{ - "VendorType": vendorType, - "VendorID": vendorID, - }, - Sorts: []*q.Sort{ - { - Key: "StartTime", - DESC: true, - }}, - PageSize: 1, - PageNumber: size, - } - executions, err := e.executionDAO.List(ctx, query) - if err != nil { - return err - } - // list is null means that the execution count < size, return directly - if len(executions) == 0 { - return nil - } - - query.Keywords["StartTime"] = &q.Range{ - Max: executions[0].StartTime, - } - totalOfCandidate, err := e.executionDAO.Count(ctx, query) - if err != nil { - return err - } - // n is the page count of all candidates - n := totalOfCandidate / 1000 - if totalOfCandidate%1000 > 0 { - n = n + 1 - } - query.PageSize = 1000 - for i := n; i >= 1; i-- { - query.PageNumber = i - executions, err := e.List(ctx, query) - if err != nil { - return err - } - for _, execution := range executions { - // if the status of the execution isn't final, skip - if !job.Status(execution.Status).Final() { - continue - } - - log.Debugf("delete execution %d by sweeper", execution.ID) - if err = e.Delete(ctx, execution.ID); err != nil { - // the execution may be deleted by the other sweep operation, ignore the not found error - if errors.IsNotFoundErr(err) { - continue - } - log.Errorf("failed to delete the execution %d: %v", execution.ID, err) - } - } - } - return nil -} - func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error { data, err := json.Marshal(extraAttrs) if err != nil { @@ -450,10 +361,3 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao return exec } - -// SetExecutionSweeperCount sets the count of execution records retained by the sweeper -// If no count is set for the specified vendor, the default value will be used -// The sweeper retains the latest created #count execution records for the specified vendor -func SetExecutionSweeperCount(vendorType string, count uint8) { - executionSweeperCount[vendorType] = count -} diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index bd673769d5af..8ceaee9010ca 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task/dao" @@ -47,8 +46,6 @@ func (e *executionManagerTestSuite) SetupTest() { executionDAO: e.execDAO, taskMgr: e.taskMgr, taskDAO: e.taskDAO, - ormCreator: e.ormCreator, - wp: lib.NewWorkerPool(10), } } @@ -61,11 +58,7 @@ func (e *executionManagerTestSuite) TestCount() { } func (e *executionManagerTestSuite) TestCreate() { - SetExecutionSweeperCount("vendor", 50) - e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) - e.ormCreator.On("Create").Return(&orm.FakeOrmer{}) - e.execDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual, map[string]interface{}{"k": "v"}) e.Require().Nil(err) @@ -73,7 +66,6 @@ func (e *executionManagerTestSuite) TestCreate() { // sleep to make sure the function in the goroutine run time.Sleep(1 * time.Second) e.execDAO.AssertExpectations(e.T()) - e.ormCreator.AssertExpectations(e.T()) } func (e *executionManagerTestSuite) TestUpdateExtraAttrs() { diff --git a/src/pkg/task/sweep_job.go b/src/pkg/task/sweep_job.go new file mode 100644 index 000000000000..e0db1f9e06cf --- /dev/null +++ b/src/pkg/task/sweep_job.go @@ -0,0 +1,187 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "encoding/json" + "os" + "strconv" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/lib/errors" +) + +func init() { + // the default batch size is 65535 + sweepBatchSize = 65535 + envBatchSize := os.Getenv("EXECUTION_SWEEP_BATCH_SIZE") + if len(envBatchSize) > 0 { + batchSize, err := strconv.Atoi(envBatchSize) + if err != nil { + logger.Errorf("failed to parse the batch size from env, value: %s, error: %v", envBatchSize, err) + } else { + if batchSize <= 0 || batchSize > 65535 { + logger.Errorf("invalid batch size %d for sweep job, should be positive and not over than 65535", batchSize) + } else { + // override the batch size if provided is valid + sweepBatchSize = batchSize + } + } + } +} + +var ( + // notice that the batch size should not over than 65535 as length limitation of postgres parameters + sweepBatchSize int + errStop = errors.New("stopped") +) + +const ( + // ExecRetainCounts is the params key of execution retain count + ExecRetainCounts = "execution_retain_counts" +) + +// SweepJob used to cleanup the executions and tasks for different vendors. +type SweepJob struct { + execRetainCounts map[string]int64 + logger logger.Interface + mgr SweepManager +} + +// MaxFails of sweep job. Don't need to retry. +func (sj *SweepJob) MaxFails() uint { + return 1 +} + +// MaxCurrency limit 1 concurrency of sweep job. +func (sj *SweepJob) MaxCurrency() uint { + return 1 +} + +// ShouldRetry indicates no need to retry sweep job. +func (sj *SweepJob) ShouldRetry() bool { + return false +} + +// Validate the parameters of preheat job. +func (sj *SweepJob) Validate(params job.Parameters) error { + return nil +} + +// Run the sweep process. +func (sj *SweepJob) Run(ctx job.Context, params job.Parameters) error { + if err := sj.init(ctx, params); err != nil { + return err + } + + sj.logger.Info("start to run sweep job") + + var errs errors.Errors + for vendor, cnt := range sj.execRetainCounts { + if sj.shouldStop(ctx) { + sj.logger.Info("received the stop signal, quit sweep job") + return nil + } + + if err := sj.sweep(ctx, vendor, cnt); err != nil { + if err == errStop { + sj.logger.Info("received the stop signal, quit sweep job") + return nil + } + + sj.logger.Errorf("[%s] failed to run sweep, error: %v", vendor, err) + errs = append(errs, err) + } + } + + sj.logger.Info("end to run sweep job") + + if len(errs) > 0 { + return errs + } + + return nil +} + +func (sj *SweepJob) init(ctx job.Context, params job.Parameters) error { + if sj.mgr == nil { + // use global manager if no sweep manager found + sj.mgr = SweepMgr + } + sj.logger = ctx.GetLogger() + sj.parseParams(params) + return nil +} + +func (sj *SweepJob) parseParams(params job.Parameters) { + sj.execRetainCounts = make(map[string]int64) + execRetainCounts, err := json.Marshal(params[ExecRetainCounts]) + if err != nil { + sj.logger.Errorf("failed to marshal params %+v, error: %v", params[ExecRetainCounts], err) + return + } + + if err = json.Unmarshal(execRetainCounts, &sj.execRetainCounts); err != nil { + sj.logger.Errorf("failed to unmarshal params %s, error: %v", string(execRetainCounts), err) + return + } +} + +// sweep cleanup the executions/tasks by vendor type and retain count. +func (sj *SweepJob) sweep(ctx job.Context, vendorType string, retainCount int64) error { + sj.logger.Infof("[%s] start to sweep, retain latest %d executions", vendorType, retainCount) + + start := time.Now() + candidates, err := sj.mgr.ListCandidates(ctx.SystemContext(), vendorType, retainCount) + if err != nil { + sj.logger.Errorf("[%s] failed to list candidates, error: %v", vendorType, err) + return err + } + + total := len(candidates) + sj.logger.Infof("[%s] listed %d candidate executions for sweep", vendorType, total) + // batch clean the executions and tasks + for i := 0; i < total; i += sweepBatchSize { + // checkpoint + if sj.shouldStop(ctx) { + return errStop + } + // calculate the batch position + j := i + sweepBatchSize + // avoid overflow + if j > total { + j = total + } + + if err = sj.mgr.Clean(ctx.SystemContext(), candidates[i:j]); err != nil { + sj.logger.Errorf("[%s] failed to batch clean candidates, error: %v", vendorType, err) + return err + } + } + + sj.logger.Infof("[%s] end to sweep, %d executions were deleted in total, elapsed time: %v", vendorType, total, time.Since(start)) + + return nil +} + +func (sj *SweepJob) shouldStop(ctx job.Context) bool { + opCmd, exit := ctx.OPCommand() + if exit && opCmd.IsStop() { + return true + } + return false +} diff --git a/src/pkg/task/sweep_job_test.go b/src/pkg/task/sweep_job_test.go new file mode 100644 index 000000000000..bed4d431e533 --- /dev/null +++ b/src/pkg/task/sweep_job_test.go @@ -0,0 +1,111 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "testing" + + "github.com/goharbor/harbor/src/jobservice/job" + mockjobservice "github.com/goharbor/harbor/src/testing/jobservice" + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type sweepJobTestSuite struct { + suite.Suite + jobCtx *mockjobservice.MockJobContext + sweepMgr *mockSweepManager +} + +func (suite *sweepJobTestSuite) SetupSuite() { + suite.jobCtx = &mockjobservice.MockJobContext{} + suite.sweepMgr = &mockSweepManager{} +} + +func TestSweepJob(t *testing.T) { + suite.Run(t, &sweepJobTestSuite{}) +} + +func (suite *sweepJobTestSuite) TestRun() { + params := map[string]interface{}{ + "execution_retain_counts": map[string]int{ + "WEBHOOK": 10, + "REPLICATION": 20, + }, + } + // test stop case + j := &SweepJob{} + suite.jobCtx.On("OPCommand").Return(job.StopCommand, true).Once() + err := j.Run(suite.jobCtx, params) + suite.NoError(err, "stop job should not return error") + + // test sweep error case + j = &SweepJob{} + suite.jobCtx.On("OPCommand").Return(job.NilCommand, true) + err = j.Run(suite.jobCtx, params) + suite.Error(err, "should got error if sweep failed") + + // test normal case + j = &SweepJob{mgr: suite.sweepMgr} + ctx := context.TODO() + suite.jobCtx.On("OPCommand").Return(job.NilCommand, true) + suite.jobCtx.On("SystemContext").Return(ctx, nil) + suite.sweepMgr.On("ListCandidates", ctx, "WEBHOOK", int64(10)).Return([]int64{1}, nil) + suite.sweepMgr.On("ListCandidates", ctx, "REPLICATION", int64(20)).Return([]int64{2}, nil) + suite.sweepMgr.On("Clean", ctx, []int64{1}).Return(nil) + suite.sweepMgr.On("Clean", ctx, []int64{2}).Return(nil) + err = j.Run(suite.jobCtx, params) + suite.NoError(err) +} + +type mockSweepManager struct { + mock.Mock +} + +func (_m *mockSweepManager) Clean(ctx context.Context, execID []int64) error { + ret := _m.Called(ctx, execID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, execID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +func (_m *mockSweepManager) ListCandidates(ctx context.Context, vendorType string, retainCnt int64) ([]int64, error) { + ret := _m.Called(ctx, vendorType, retainCnt) + + var r0 []int64 + if rf, ok := ret.Get(0).(func(context.Context, string, int64) []int64); ok { + r0 = rf(ctx, vendorType, retainCnt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, int64) error); ok { + r1 = rf(ctx, vendorType, retainCnt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/src/pkg/task/sweep_manager.go b/src/pkg/task/sweep_manager.go new file mode 100644 index 000000000000..f697b6b2d937 --- /dev/null +++ b/src/pkg/task/sweep_manager.go @@ -0,0 +1,189 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task/dao" +) + +var ( + // SweepMgr is a global sweep manager instance. + SweepMgr = NewSweepManager() + + timeFormat = "2006-01-02 15:04:05.999999999" + defaultPageSize = 100000 + finalStatusCode = 3 +) + +type SweepManager interface { + // ListCandidates lists the candidate execution ids which met the sweep criteria. + ListCandidates(ctx context.Context, vendorType string, retainCnt int64) (execIDs []int64, err error) + // Clean deletes the tasks belonging to the execution which in final status and deletes executions. + Clean(ctx context.Context, execID []int64) (err error) +} + +// sweepManager implements the interface SweepManager. +type sweepManager struct { + execDAO dao.ExecutionDAO +} + +// listVendorIDs lists distinct vendor ids by vendor type. +func (sm *sweepManager) listVendorIDs(ctx context.Context, vendorType string) ([]int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + + var ids []int64 + if _, err = ormer.Raw(`SELECT DISTINCT vendor_id FROM execution WHERE vendor_type = ?`, vendorType).QueryRows(&ids); err != nil { + return nil, err + } + + return ids, nil +} + +// getCandidateMaxStartTime returns the max start time for candidate executions, obtain the start time of the xth recent one. +func (sm *sweepManager) getCandidateMaxStartTime(ctx context.Context, vendorType string, vendorID, retainCnt int64) (*time.Time, error) { + query := &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": vendorType, + "VendorID": vendorID, + }, + Sorts: []*q.Sort{ + { + Key: "StartTime", + DESC: true, + }}, + PageSize: 1, + PageNumber: retainCnt, + } + executions, err := sm.execDAO.List(ctx, query) + if err != nil { + return nil, err + } + // list is null means that the execution count < retainCnt, return nil time + if len(executions) == 0 { + return nil, nil + } + + return &executions[0].StartTime, nil +} + +func (sm *sweepManager) ListCandidates(ctx context.Context, vendorType string, retainCnt int64) ([]int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + + vendorIDs, err := sm.listVendorIDs(ctx, vendorType) + if err != nil { + return nil, errors.Wrapf(err, "failed to list vendor ids for vendor type %s", vendorType) + } + + // execIDs stores the result + var execIDs []int64 + for _, vendorID := range vendorIDs { + maxStartTime, err := sm.getCandidateMaxStartTime(ctx, vendorType, vendorID, retainCnt) + if err != nil { + return nil, errors.Wrapf(err, "failed to get candidate max start time, vendor type: %s, vendor id: %d", vendorType, vendorID) + } + // continue if no max start time got that means no candidate executions + if maxStartTime == nil { + continue + } + // candidate criteria + // 1. exact vendor type & vendor id + // 2. start_time is before the max start time + // 3. status is the final state + // count the records for pagination + sql := `SELECT COUNT(1) FROM execution WHERE vendor_type = ? AND vendor_id = ? AND start_time < ? AND status IN (?,?,?)` + totalOfCandidate := 0 + params := []interface{}{ + vendorType, + vendorID, + maxStartTime.Format(timeFormat), + // final status should in Error/Success/Stopped + job.ErrorStatus.String(), + job.SuccessStatus.String(), + job.StoppedStatus.String(), + } + if err = ormer.Raw(sql, params...).QueryRow(&totalOfCandidate); err != nil { + return nil, errors.Wrapf(err, "failed to count candidates, vendor type: %s, vendor id: %d", vendorType, vendorID) + } + // n is the page count of all candidates + n := totalOfCandidate / defaultPageSize + if totalOfCandidate%defaultPageSize > 0 { + n = n + 1 + } + + sql = `SELECT id FROM execution WHERE vendor_type = ? AND vendor_id = ? AND start_time < ? AND status IN (?,?,?)` + // default page size is 100000 + q2 := &q.Query{PageSize: int64(defaultPageSize)} + for i := n; i >= 1; i-- { + q2.PageNumber = int64(i) + // should copy params as pagination will append the slice + paginationParams := make([]interface{}, len(params)) + copy(paginationParams, params) + paginationSQL, paginationParams := orm.PaginationOnRawSQL(q2, sql, paginationParams) + ids := make([]int64, 0, defaultPageSize) + if _, err = ormer.Raw(paginationSQL, paginationParams...).QueryRows(&ids); err != nil { + return nil, errors.Wrapf(err, "failed to list candidate execution ids, vendor type: %s, vendor id: %d", vendorType, vendorID) + } + execIDs = append(execIDs, ids...) + } + } + + return execIDs, nil +} + +func (sm *sweepManager) Clean(ctx context.Context, execIDs []int64) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + // construct sql params + params := make([]interface{}, 0, len(execIDs)) + for _, eid := range execIDs { + params = append(params, eid) + } + // delete tasks + sql := fmt.Sprintf("DELETE FROM task WHERE status_code = %d AND execution_id IN (%s)", finalStatusCode, orm.ParamPlaceholderForIn(len(params))) + _, err = ormer.Raw(sql, params...).Exec() + if err != nil { + return errors.Wrap(err, "failed to delete tasks") + } + // delete executions + sql = fmt.Sprintf("DELETE FROM execution WHERE id IN (%s)", orm.ParamPlaceholderForIn(len(params))) + _, err = ormer.Raw(sql, params...).Exec() + if err != nil { + return errors.Wrap(err, "failed to delete executions") + } + + return nil +} + +func NewSweepManager() SweepManager { + return &sweepManager{ + execDAO: dao.NewExecutionDAO(), + } +} diff --git a/src/pkg/task/sweep_manager_test.go b/src/pkg/task/sweep_manager_test.go new file mode 100644 index 000000000000..172404ecdde6 --- /dev/null +++ b/src/pkg/task/sweep_manager_test.go @@ -0,0 +1,56 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "testing" + "time" + + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/pkg/task/dao" + "github.com/goharbor/harbor/src/testing/mock" + "github.com/stretchr/testify/suite" +) + +type sweepManagerTestSuite struct { + suite.Suite + execDao *mockExecutionDAO + mgr *sweepManager +} + +func TestSweepManager(t *testing.T) { + suite.Run(t, &sweepManagerTestSuite{}) +} + +func (suite *sweepManagerTestSuite) SetupSuite() { + suite.execDao = &mockExecutionDAO{} + suite.mgr = &sweepManager{execDAO: suite.execDao} +} + +func (suite *sweepManagerTestSuite) TestGetCandidateMaxStartTime() { + // test error case + suite.execDao.On("List", mock.Anything, mock.Anything).Return(nil, errors.New("failed to list executions")).Once() + startTime, err := suite.mgr.getCandidateMaxStartTime(context.TODO(), "WEBHOOK", 1, 10) + suite.Error(err, "should got error") + suite.Nil(startTime) + // test normal case + now := time.Now() + execs := []*dao.Execution{{ID: 1, StartTime: now}} + suite.execDao.On("List", mock.Anything, mock.Anything).Return(execs, nil) + startTime, err = suite.mgr.getCandidateMaxStartTime(context.TODO(), "WEBHOOK", 1, 10) + suite.NoError(err, "should not got error") + suite.Equal(now.String(), startTime.String()) +} diff --git a/tests/apitests/python/test_job_service_dashboard.py b/tests/apitests/python/test_job_service_dashboard.py index ec18f4002ea7..6fdc8460e237 100644 --- a/tests/apitests/python/test_job_service_dashboard.py +++ b/tests/apitests/python/test_job_service_dashboard.py @@ -33,7 +33,7 @@ def setUp(self): self.registry = Registry() self.scan_all = ScanAll() self.schedule = Schedule() - self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT_LOG", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK"] + self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT_LOG", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK", "EXECUTION_SWEEP"] self.cron_type = "Custom" self.cron = "0 0 0 * * 0"