diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index a655653d250..c92f9f85409 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -4517,6 +4517,82 @@ paths: $ref: '#/responses/403' '500': $ref: '#/responses/500' + /jobservice/pools: + get: + operationId: getWorkerPools + summary: Get worker pools + description: Get worker pools + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + responses: + '200': + description: Get worker pools successfully. + schema: + type: array + items: + $ref: '#/definitions/WorkerPool' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '500': + $ref: '#/responses/500' + /jobservice/pools/{pool_id}/workers: + get: + operationId: getWorkers + summary: Get workers + description: Get workers in current pool + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + - name: pool_id + in: path + required: true + type: string + description: The name of the pool. 'all' stands for all pools + responses: + '200': + description: Get workers successfully. + schema: + type: array + items: + $ref: '#/definitions/Worker' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' + /jobservice/jobs/{job_id}: + put: + operationId: stopRunningJob + summary: Stop running job + description: Stop running job + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + - name: job_id + in: path + required: true + type: string + description: The id of the job. + responses: + '200': + description: Stop worker successfully. + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' /ping: get: operationId: getPing @@ -9253,4 +9329,58 @@ definitions: type: array items: $ref: '#/definitions/ScanDataExportExecution' - description: The list of scan data export executions \ No newline at end of file + description: The list of scan data export executions + WorkerPool: + type: object + description: the worker pool of job service + properties: + pid: + type: integer + description: the process id of jobservice + worker_pool_id: + type: string + description: the id of the worker pool + start_at: + type: string + format: date-time + description: The start time of the work pool + heartbeat_at: + type: string + format: date-time + description: The heartbeat time of the work pool + concurrency: + type: integer + description: The concurrency of the work pool + host: + type: string + description: The host of the work pool + Worker: + type: object + description: worker in the pool + properties: + id: + type: string + description: the id of the worker + pool_id: + type: string + description: the id of the worker pool + job_name: + type: string + description: the name of the running job in the worker + job_id: + type: string + description: the id of the running job in the worker + start_at: + type: string + format: date-time + description: The start time of the worker + args: + type: string + description: The args of the worker + check_in: + type: string + description: the checkin of the running job in the worker + checkin_at: + type: string + format: date-time + description: The checkin time of the running job in the worker \ No newline at end of file diff --git a/src/common/job/client.go b/src/common/job/client.go index 6b8719f7153..d7f30d3470f 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -34,6 +34,8 @@ type Client interface { PostAction(uuid, action string) error GetExecutions(uuid string) ([]job.Stats, error) // TODO Redirect joblog when we see there's memory issue. + // GetJobServiceConfig retrieves the job config + GetJobServiceConfig() (*job.Config, error) } // StatusBehindError represents the error got when trying to stop a success/failed job @@ -212,6 +214,35 @@ func (d *DefaultClient) PostAction(uuid, action string) error { return nil } +// GetJobServiceConfig retrieves the job service configuration +func (d *DefaultClient) GetJobServiceConfig() (*job.Config, error) { + url := d.endpoint + "/api/v1/config" + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := d.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, &commonhttp.Error{ + Code: resp.StatusCode, + Message: string(data), + } + } + var config job.Config + err = json.Unmarshal(data, &config) + if err != nil { + return nil, err + } + return &config, nil +} func isStatusBehindError(err error) (string, bool) { if err == nil { return "", false diff --git a/src/common/rbac/const.go b/src/common/rbac/const.go index e73fc8fdce1..f52a40c9666 100755 --- a/src/common/rbac/const.go +++ b/src/common/rbac/const.go @@ -77,4 +77,5 @@ const ( ResourceSystemVolumes = Resource("system-volumes") ResourcePurgeAuditLog = Resource("purge-audit") ResourceExportCVE = Resource("export-cve") + ResourceJobServiceMonitor = Resource("jobservice-monitor") ) diff --git a/src/common/rbac/system/policies.go b/src/common/rbac/system/policies.go index 611ccf5e216..0f8e89f029b 100644 --- a/src/common/rbac/system/policies.go +++ b/src/common/rbac/system/policies.go @@ -66,5 +66,9 @@ var ( {Resource: rbac.ResourceLdapUser, Action: rbac.ActionList}, {Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead}, {Resource: rbac.ResourceConfiguration, Action: rbac.ActionUpdate}, + + {Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionRead}, + {Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionList}, + {Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionStop}, } ) diff --git a/src/controller/jobmonitor/monitor.go b/src/controller/jobmonitor/monitor.go new file mode 100644 index 00000000000..c3ff7d580fb --- /dev/null +++ b/src/controller/jobmonitor/monitor.go @@ -0,0 +1,155 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/goharbor/harbor/src/lib/orm" + + "github.com/goharbor/harbor/src/lib/log" + + "github.com/gocraft/work" + + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/q" + libRedis "github.com/goharbor/harbor/src/lib/redis" + jm "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" +) + +// All the jobs in the pool, or all pools +const All = "all" + +// Ctl the controller instance of the worker pool controller +var Ctl = NewMonitorController() + +// MonitorController defines the worker pool operations +type MonitorController interface { + // ListPools lists the worker pools + ListPools(ctx context.Context) ([]*jm.WorkerPool, error) + // ListWorkers lists the workers in the pool + ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) + // StopRunningJob stop the running job + StopRunningJob(ctx context.Context, jobID string) error +} + +type monitorController struct { + poolManager jm.PoolManager + workerManager jm.WorkerManager + taskManager task.Manager + sch scheduler.Scheduler + monitorClient func() (jm.JobServiceMonitorClient, error) +} + +// NewMonitorController ... +func NewMonitorController() MonitorController { + return &monitorController{ + poolManager: jm.NewPoolManager(), + workerManager: jm.NewWorkerManager(), + taskManager: task.NewManager(), + monitorClient: jobServiceMonitorClient, + } +} + +func (w *monitorController) StopRunningJob(ctx context.Context, jobID string) error { + if strings.EqualFold(jobID, All) { + allRunningJobs, err := w.allRunningJobs(ctx) + if err != nil { + log.Errorf("failed to get all running jobs: %v", err) + return err + } + for _, jobID := range allRunningJobs { + if err := w.stopJob(ctx, jobID); err != nil { + log.Errorf("failed to stop running job %s: %v", jobID, err) + return err + } + } + return nil + } + return w.stopJob(ctx, jobID) +} + +func (w *monitorController) stopJob(ctx context.Context, jobID string) error { + tasks, err := w.taskManager.List(ctx, &q.Query{Keywords: q.KeyWords{"job_id": jobID}}) + if err != nil { + return err + } + if len(tasks) == 0 { + return errors.BadRequestError(nil).WithMessage("job %s not found", jobID) + } + if len(tasks) != 1 { + return fmt.Errorf("there are more than one task with the same job ID") + } + // use local transaction to avoid rollback batch success tasks to previous state when one fail + if ctx == nil { + log.Debug("context is nil, skip stop operation") + return nil + } + return orm.WithTransaction(func(ctx context.Context) error { + return w.taskManager.Stop(ctx, tasks[0].ID) + })(orm.SetTransactionOpNameToContext(ctx, "tx-stop-job")) +} + +func (w *monitorController) allRunningJobs(ctx context.Context) ([]string, error) { + jobIDs := make([]string, 0) + wks, err := w.ListWorkers(ctx, All) + if err != nil { + log.Errorf("failed to list workers: %v", err) + return nil, err + } + for _, wk := range wks { + jobIDs = append(jobIDs, wk.JobID) + } + return jobIDs, nil +} + +func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) { + cfg, err := job.GlobalClient.GetJobServiceConfig() + if err != nil { + return nil, err + } + config := cfg.RedisPoolConfig + pool, err := libRedis.GetRedisPool("JobService", config.RedisURL, &libRedis.PoolParam{ + PoolMaxIdle: 0, + PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second, + }) + if err != nil { + log.Errorf("failed to get redis pool: %v", err) + return nil, err + } + return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil +} + +func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) { + mClient, err := w.monitorClient() + if err != nil { + return nil, err + } + return w.workerManager.List(ctx, mClient, poolID) +} + +func (w *monitorController) ListPools(ctx context.Context) ([]*jm.WorkerPool, error) { + mClient, err := w.monitorClient() + if err != nil { + return nil, err + } + return w.poolManager.List(ctx, mClient) +} diff --git a/src/controller/jobmonitor/monitor_test.go b/src/controller/jobmonitor/monitor_test.go new file mode 100644 index 00000000000..5ffb44e04d2 --- /dev/null +++ b/src/controller/jobmonitor/monitor_test.go @@ -0,0 +1,96 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "testing" + "time" + + "github.com/gocraft/work" + "github.com/stretchr/testify/suite" + + "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/pkg/task" + "github.com/goharbor/harbor/src/testing/mock" + monitorMock "github.com/goharbor/harbor/src/testing/pkg/jobmonitor" + taskMock "github.com/goharbor/harbor/src/testing/pkg/task" +) + +type JobServiceMonitorTestSuite struct { + suite.Suite + jmClient jobmonitor.JobServiceMonitorClient + poolManager jobmonitor.PoolManager + workerManager jobmonitor.WorkerManager + monitController MonitorController + taskManager task.Manager +} + +func (s *JobServiceMonitorTestSuite) SetupSuite() { + s.jmClient = &monitorMock.JobServiceMonitorClient{} + s.poolManager = &monitorMock.PoolManager{} + s.workerManager = jobmonitor.NewWorkerManager() + s.taskManager = &taskMock.Manager{} + s.monitController = &monitorController{ + poolManager: s.poolManager, + workerManager: s.workerManager, + taskManager: s.taskManager, + monitorClient: func() (jobmonitor.JobServiceMonitorClient, error) { + return s.jmClient, nil + }, + } +} + +func (s *JobServiceMonitorTestSuite) TearDownSuite() { +} + +func (s *JobServiceMonitorTestSuite) TestListPool() { + mock.OnAnything(s.poolManager, "List").Return([]*jobmonitor.WorkerPool{ + { + ID: "1", PID: 1, StartAt: time.Now().Unix(), Concurrency: 10, + }, + }, nil) + pools, err := s.poolManager.List(nil, s.jmClient) + s.Assert().Nil(err) + s.Assert().Equal(1, len(pools)) +} + +func (s *JobServiceMonitorTestSuite) TestListWorker() { + mock.OnAnything(s.jmClient, "WorkerObservations").Return([]*work.WorkerObservation{ + {WorkerID: "abc", IsBusy: true, JobName: "test", JobID: "1", ArgsJSON: "{\"sample\":\"sample args\"}"}, + }, nil) + mock.OnAnything(s.jmClient, "WorkerPoolHeartbeats").Return([]*work.WorkerPoolHeartbeat{ + {WorkerPoolID: "1", Pid: 1, StartedAt: time.Now().Unix(), Concurrency: 10, WorkerIDs: []string{"abc"}}, + }, nil) + workers, err := s.monitController.ListWorkers(nil, "1") + s.Assert().Nil(err) + s.Assert().Equal(1, len(workers)) +} + +func (s *JobServiceMonitorTestSuite) TestStopRunningJob() { + mock.OnAnything(s.jmClient, "WorkerObservations").Return([]*work.WorkerObservation{ + {WorkerID: "abc", IsBusy: true, JobName: "test", JobID: "1", ArgsJSON: "{\"sample\":\"sample args\"}"}, + }, nil) + mock.OnAnything(s.jmClient, "WorkerPoolHeartbeats").Return([]*work.WorkerPoolHeartbeat{ + {WorkerPoolID: "1", Pid: 1, StartedAt: time.Now().Unix(), Concurrency: 10, WorkerIDs: []string{"abc"}}, + }, nil) + mock.OnAnything(s.taskManager, "List").Return([]*task.Task{{ID: 1, VendorType: "GARBAGE_COLLECTION"}}, nil) + mock.OnAnything(s.taskManager, "Stop").Return(nil) + err := s.monitController.StopRunningJob(nil, "1") + s.Assert().Nil(err) +} + +func TestJobServiceMonitorTestSuite(t *testing.T) { + suite.Run(t, &JobServiceMonitorTestSuite{}) +} diff --git a/src/jobservice/api/handler.go b/src/jobservice/api/handler.go index d9007be3166..3222b3083de 100644 --- a/src/jobservice/api/handler.go +++ b/src/jobservice/api/handler.go @@ -27,6 +27,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/core" "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/job" @@ -56,11 +57,14 @@ type Handler interface { // HandleJobLogReq is used to handle the request of getting job logs HandleJobLogReq(w http.ResponseWriter, req *http.Request) - // HandleJobLogReq is used to handle the request of getting periodic executions + // HandlePeriodicExecutions is used to handle the request of getting periodic executions HandlePeriodicExecutions(w http.ResponseWriter, req *http.Request) // HandleGetJobsReq is used to handle the request of getting jobs HandleGetJobsReq(w http.ResponseWriter, req *http.Request) + + // HandleGetConfigReq is used to handle the request of getting configure + HandleGetConfigReq(w http.ResponseWriter, req *http.Request) } // DefaultHandler is the default request handler which implements the Handler interface. @@ -294,6 +298,18 @@ func (dh *DefaultHandler) log(req *http.Request, code int, text string) { logger.Debugf("Serve http request '%s %s': %d %s", req.Method, req.URL.String(), code, text) } +// HandleGetConfigReq return the config of the job service +func (dh *DefaultHandler) HandleGetConfigReq(w http.ResponseWriter, req *http.Request) { + if config.DefaultConfig == nil || config.DefaultConfig.PoolConfig == nil || config.DefaultConfig.PoolConfig.RedisPoolCfg == nil { + logger.Errorf("Failed to get config, config is nil") + dh.handleError(w, req, http.StatusInternalServerError, errs.HandleJSONDataError(fmt.Errorf("no configuration"))) + return + } + dh.handleJSONData(w, req, http.StatusOK, &job.Config{ + RedisPoolConfig: config.DefaultConfig.PoolConfig.RedisPoolCfg, + }) +} + func extractQuery(req *http.Request) *query.Parameter { q := &query.Parameter{ PageNumber: 1, diff --git a/src/jobservice/api/router.go b/src/jobservice/api/router.go index a34e2bb488b..0e3c7988c53 100644 --- a/src/jobservice/api/router.go +++ b/src/jobservice/api/router.go @@ -100,5 +100,6 @@ func (br *BaseRouter) registerRoutes() { subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost) subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet) subRouter.HandleFunc("/stats", br.handler.HandleCheckStatusReq).Methods(http.MethodGet) + subRouter.HandleFunc("/config", br.handler.HandleGetConfigReq).Methods(http.MethodGet) subRouter.HandleFunc("/jobs/{job_id}/executions", br.handler.HandlePeriodicExecutions).Methods(http.MethodGet) } diff --git a/src/jobservice/job/models.go b/src/jobservice/job/models.go index 1a521de437e..2cd2eb0bc4c 100644 --- a/src/jobservice/job/models.go +++ b/src/jobservice/job/models.go @@ -17,6 +17,8 @@ package job import ( "encoding/json" + "github.com/goharbor/harbor/src/jobservice/config" + "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/lib/errors" ) @@ -153,3 +155,8 @@ func (st *Stats) Validate() error { return nil } + +// Config job service config +type Config struct { + RedisPoolConfig *config.RedisPoolConfig `json:"redis_pool_config"` +} diff --git a/src/pkg/jobmonitor/model.go b/src/pkg/jobmonitor/model.go new file mode 100644 index 00000000000..ba3acdef0cc --- /dev/null +++ b/src/pkg/jobmonitor/model.go @@ -0,0 +1,46 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +// WorkerPool job service worker pool +type WorkerPool struct { + ID string `json:"pool_id"` + PID int `json:"pid"` + StartAt int64 `json:"start_at"` + HeartbeatAt int64 `json:"heartbeat_at"` + Concurrency int `json:"concurrency"` + Host string `json:"host"` +} + +// Worker job service worker +type Worker struct { + ID string `json:"id"` + PoolID string `json:"pool_id"` + IsBusy bool `json:"is_busy"` + JobName string `json:"job_name"` + JobID string `json:"job_id"` + StartedAt int64 `json:"start_at"` + Args string `json:"args"` + CheckIn string `json:"check_in"` + CheckInAt int64 `json:"check_in_at"` +} + +// Queue the job queue +type Queue struct { + JobType string + Count int64 + Latency int64 + Paused bool +} diff --git a/src/pkg/jobmonitor/pool.go b/src/pkg/jobmonitor/pool.go new file mode 100644 index 00000000000..335cc3e446d --- /dev/null +++ b/src/pkg/jobmonitor/pool.go @@ -0,0 +1,62 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + + "github.com/gocraft/work" +) + +// PoolManager the interface to retrieve job service monitor metrics +type PoolManager interface { + // List retrieves pools information + List(ctx context.Context, monitorClient JobServiceMonitorClient) ([]*WorkerPool, error) +} + +// JobServiceMonitorClient the interface to retrieve job service monitor metrics +type JobServiceMonitorClient interface { + // WorkerPoolHeartbeats retrieves worker pool heartbeats + WorkerPoolHeartbeats() ([]*work.WorkerPoolHeartbeat, error) + // WorkerObservations retrieves worker observations + WorkerObservations() ([]*work.WorkerObservation, error) +} + +type poolManager struct{} + +// NewPoolManager create a PoolManager with namespace and redis Pool +func NewPoolManager() PoolManager { + return &poolManager{} +} + +func (p poolManager) List(ctx context.Context, monitorClient JobServiceMonitorClient) ([]*WorkerPool, error) { + workerPool := make([]*WorkerPool, 0) + wh, err := monitorClient.WorkerPoolHeartbeats() + if err != nil { + return workerPool, err + } + for _, w := range wh { + wp := &WorkerPool{ + ID: w.WorkerPoolID, + PID: w.Pid, + StartAt: w.StartedAt, + Concurrency: int(w.Concurrency), + Host: w.Host, + HeartbeatAt: w.HeartbeatAt, + } + workerPool = append(workerPool, wp) + } + return workerPool, nil +} diff --git a/src/pkg/jobmonitor/worker.go b/src/pkg/jobmonitor/worker.go new file mode 100644 index 00000000000..debd4fab8fa --- /dev/null +++ b/src/pkg/jobmonitor/worker.go @@ -0,0 +1,87 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + "strings" + + "github.com/gocraft/work" + + "github.com/goharbor/harbor/src/pkg/task" +) + +const all = "all" + +// WorkerManager ... +type WorkerManager interface { + // List lists all workers in the specified pool + List(ctx context.Context, monitClient JobServiceMonitorClient, poolID string) ([]*Worker, error) +} + +type workerManagerImpl struct { + taskMgr task.Manager +} + +// NewWorkerManager ... +func NewWorkerManager() WorkerManager { + return &workerManagerImpl{taskMgr: task.NewManager()} +} + +func (w *workerManagerImpl) List(ctx context.Context, monitClient JobServiceMonitorClient, poolID string) ([]*Worker, error) { + wphs, err := monitClient.WorkerPoolHeartbeats() + if err != nil { + return nil, err + } + workerPoolMap := make(map[string]string) + for _, wph := range wphs { + for _, id := range wph.WorkerIDs { + workerPoolMap[id] = wph.WorkerPoolID + } + } + + workers, err := monitClient.WorkerObservations() + if err != nil { + return nil, err + } + if strings.EqualFold(poolID, all) { + return convertToWorker(workers, workerPoolMap), nil + } + // filter workers by pool id + filteredWorkers := make([]*work.WorkerObservation, 0) + for _, w := range workers { + if workerPoolMap[w.WorkerID] == poolID { + filteredWorkers = append(filteredWorkers, w) + } + } + return convertToWorker(filteredWorkers, workerPoolMap), nil +} + +func convertToWorker(workers []*work.WorkerObservation, workerPoolMap map[string]string) []*Worker { + wks := make([]*Worker, 0) + for _, w := range workers { + wks = append(wks, &Worker{ + ID: w.WorkerID, + PoolID: workerPoolMap[w.WorkerID], + IsBusy: w.IsBusy, + JobName: w.JobName, + JobID: w.JobID, + StartedAt: w.StartedAt, + CheckIn: w.Checkin, + CheckInAt: w.CheckinAt, + }) + } + return wks +} diff --git a/src/pkg/task/mock_jobservice_client_test.go b/src/pkg/task/mock_jobservice_client_test.go index b1f8a6ce333..27b0b9059bb 100644 --- a/src/pkg/task/mock_jobservice_client_test.go +++ b/src/pkg/task/mock_jobservice_client_test.go @@ -60,6 +60,29 @@ func (_m *mockJobserviceClient) GetJobLog(uuid string) ([]byte, error) { return r0, r1 } +// GetJobServiceConfig provides a mock function with given fields: +func (_m *mockJobserviceClient) GetJobServiceConfig() (*job.Config, error) { + ret := _m.Called() + + var r0 *job.Config + if rf, ok := ret.Get(0).(func() *job.Config); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.Config) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // PostAction provides a mock function with given fields: uuid, action func (_m *mockJobserviceClient) PostAction(uuid string, action string) error { ret := _m.Called(uuid, action) diff --git a/src/server/v2.0/handler/handler.go b/src/server/v2.0/handler/handler.go index 4fe0a7fc6ce..81a2395841d 100644 --- a/src/server/v2.0/handler/handler.go +++ b/src/server/v2.0/handler/handler.go @@ -67,6 +67,7 @@ func New() http.Handler { ProjectMetadataAPI: newProjectMetadaAPI(), PurgeAPI: newPurgeAPI(), ScanDataExportAPI: newScanDataExportAPI(), + JobserviceAPI: newJobServiceAPI(), }) if err != nil { log.Fatal(err) diff --git a/src/server/v2.0/handler/jobservice.go b/src/server/v2.0/handler/jobservice.go new file mode 100644 index 00000000000..70072efeb48 --- /dev/null +++ b/src/server/v2.0/handler/jobservice.go @@ -0,0 +1,108 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "context" + "time" + + "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" + + "github.com/goharbor/harbor/src/common/rbac" + "github.com/goharbor/harbor/src/controller/jobmonitor" + jm "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/server/v2.0/models" + "github.com/goharbor/harbor/src/server/v2.0/restapi/operations/jobservice" +) + +type jobServiceAPI struct { + BaseAPI + jobCtr jobmonitor.MonitorController +} + +func newJobServiceAPI() *jobServiceAPI { + return &jobServiceAPI{jobCtr: jobmonitor.Ctl} +} + +func (j *jobServiceAPI) GetWorkerPools(ctx context.Context, params jobservice.GetWorkerPoolsParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + workPools, err := j.jobCtr.ListPools(ctx) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewGetWorkerPoolsOK().WithPayload(toWorkerPoolResponse(workPools)) +} + +func (j *jobServiceAPI) GetWorkers(ctx context.Context, params jobservice.GetWorkersParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + workers, err := j.jobCtr.ListWorkers(ctx, params.PoolID) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewGetWorkersOK().WithPayload(toWorkerResponse(workers)) +} + +func (j *jobServiceAPI) StopRunningJob(ctx context.Context, params jobservice.StopRunningJobParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionStop, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + err := j.jobCtr.StopRunningJob(ctx, params.JobID) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewStopRunningJobOK() +} + +func toWorkerResponse(wks []*jm.Worker) []*models.Worker { + workers := make([]*models.Worker, 0) + for _, w := range wks { + p := &models.Worker{ + ID: w.ID, + JobName: w.JobName, + JobID: w.JobID, + PoolID: w.PoolID, + Args: w.Args, + StartAt: covertTime(w.StartedAt), + CheckinAt: covertTime(w.CheckInAt), + } + workers = append(workers, p) + } + return workers +} + +func toWorkerPoolResponse(wps []*jm.WorkerPool) []*models.WorkerPool { + pools := make([]*models.WorkerPool, 0) + for _, wp := range wps { + p := &models.WorkerPool{ + Pid: int64(wp.PID), + HeartbeatAt: covertTime(wp.HeartbeatAt), + Concurrency: int64(wp.Concurrency), + WorkerPoolID: wp.ID, + StartAt: covertTime(wp.StartAt), + } + pools = append(pools, p) + } + return pools +} + +func covertTime(t int64) strfmt.DateTime { + uxt := time.Unix(int64(t), 0) + return strfmt.DateTime(uxt) +} diff --git a/src/testing/job/mock_client.go b/src/testing/job/mock_client.go index efda03fc703..69e001b0a34 100644 --- a/src/testing/job/mock_client.go +++ b/src/testing/job/mock_client.go @@ -14,6 +14,11 @@ type MockJobClient struct { JobUUID []string } +// GetJobServiceConfig ... +func (mjc *MockJobClient) GetJobServiceConfig() (*job.Config, error) { + panic("implement me") +} + // GetJobLog ... func (mjc *MockJobClient) GetJobLog(uuid string) ([]byte, error) { if uuid == "500" { diff --git a/src/testing/pkg/jobmonitor/job_service_monitor_client.go b/src/testing/pkg/jobmonitor/job_service_monitor_client.go new file mode 100644 index 00000000000..e2fcb126e11 --- /dev/null +++ b/src/testing/pkg/jobmonitor/job_service_monitor_client.go @@ -0,0 +1,74 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + work "github.com/gocraft/work" + mock "github.com/stretchr/testify/mock" +) + +// JobServiceMonitorClient is an autogenerated mock type for the JobServiceMonitorClient type +type JobServiceMonitorClient struct { + mock.Mock +} + +// WorkerObservations provides a mock function with given fields: +func (_m *JobServiceMonitorClient) WorkerObservations() ([]*work.WorkerObservation, error) { + ret := _m.Called() + + var r0 []*work.WorkerObservation + if rf, ok := ret.Get(0).(func() []*work.WorkerObservation); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*work.WorkerObservation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WorkerPoolHeartbeats provides a mock function with given fields: +func (_m *JobServiceMonitorClient) WorkerPoolHeartbeats() ([]*work.WorkerPoolHeartbeat, error) { + ret := _m.Called() + + var r0 []*work.WorkerPoolHeartbeat + if rf, ok := ret.Get(0).(func() []*work.WorkerPoolHeartbeat); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*work.WorkerPoolHeartbeat) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewJobServiceMonitorClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewJobServiceMonitorClient creates a new instance of JobServiceMonitorClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewJobServiceMonitorClient(t mockConstructorTestingTNewJobServiceMonitorClient) *JobServiceMonitorClient { + mock := &JobServiceMonitorClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/jobmonitor/pool_manager.go b/src/testing/pkg/jobmonitor/pool_manager.go new file mode 100644 index 00000000000..e4380c22cc2 --- /dev/null +++ b/src/testing/pkg/jobmonitor/pool_manager.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + context "context" + + jobmonitor "github.com/goharbor/harbor/src/pkg/jobmonitor" + mock "github.com/stretchr/testify/mock" +) + +// PoolManager is an autogenerated mock type for the PoolManager type +type PoolManager struct { + mock.Mock +} + +// List provides a mock function with given fields: ctx, monitorClient +func (_m *PoolManager) List(ctx context.Context, monitorClient jobmonitor.JobServiceMonitorClient) ([]*jobmonitor.WorkerPool, error) { + ret := _m.Called(ctx, monitorClient) + + var r0 []*jobmonitor.WorkerPool + if rf, ok := ret.Get(0).(func(context.Context, jobmonitor.JobServiceMonitorClient) []*jobmonitor.WorkerPool); ok { + r0 = rf(ctx, monitorClient) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*jobmonitor.WorkerPool) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jobmonitor.JobServiceMonitorClient) error); ok { + r1 = rf(ctx, monitorClient) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewPoolManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewPoolManager creates a new instance of PoolManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewPoolManager(t mockConstructorTestingTNewPoolManager) *PoolManager { + mock := &PoolManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/jobmonitor/worker_manager.go b/src/testing/pkg/jobmonitor/worker_manager.go new file mode 100644 index 00000000000..1a66b9eb413 --- /dev/null +++ b/src/testing/pkg/jobmonitor/worker_manager.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + context "context" + + jobmonitor "github.com/goharbor/harbor/src/pkg/jobmonitor" + mock "github.com/stretchr/testify/mock" +) + +// WorkerManager is an autogenerated mock type for the WorkerManager type +type WorkerManager struct { + mock.Mock +} + +// List provides a mock function with given fields: ctx, monitClient, poolID +func (_m *WorkerManager) List(ctx context.Context, monitClient jobmonitor.JobServiceMonitorClient, poolID string) ([]*jobmonitor.Worker, error) { + ret := _m.Called(ctx, monitClient, poolID) + + var r0 []*jobmonitor.Worker + if rf, ok := ret.Get(0).(func(context.Context, jobmonitor.JobServiceMonitorClient, string) []*jobmonitor.Worker); ok { + r0 = rf(ctx, monitClient, poolID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*jobmonitor.Worker) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jobmonitor.JobServiceMonitorClient, string) error); ok { + r1 = rf(ctx, monitClient, poolID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewWorkerManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewWorkerManager creates a new instance of WorkerManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewWorkerManager(t mockConstructorTestingTNewWorkerManager) *WorkerManager { + mock := &WorkerManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/pkg.go b/src/testing/pkg/pkg.go index 448847d6a6d..145ced63a39 100644 --- a/src/testing/pkg/pkg.go +++ b/src/testing/pkg/pkg.go @@ -68,3 +68,6 @@ package pkg //go:generate mockery --case snake --dir ../../pkg/registry --name Client --output ./registry --outpkg registry --filename fake_registry_client.go //go:generate mockery --case snake --dir ../../pkg/member --name Manager --output ./member --outpkg member --filename fake_member_manager.go //go:generate mockery --case snake --dir ../../pkg/usergroup --name Manager --output ./usergroup --outpkg usergroup --filename fake_usergroup_manager.go +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name PoolManager --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name JobServiceMonitorClient --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name WorkerManager --output ./jobmonitor --outpkg jobmonitor