Skip to content

Commit

Permalink
Add jobservice monitoring api list pool, worker and stop running task (
Browse files Browse the repository at this point in the history
…#17658)

Add REST API to list job pool, worker, stop running task

  Add jobservice handler to retrieve configuration
  Add RBAC for jobservice monitoring dashboard
  Add REST API to list pool, worker and stop running task

Signed-off-by: stonezdj <[email protected]>

Signed-off-by: stonezdj <[email protected]>
  • Loading branch information
stonezdj authored Nov 3, 2022
1 parent e81067b commit 39ca918
Show file tree
Hide file tree
Showing 20 changed files with 958 additions and 2 deletions.
132 changes: 131 additions & 1 deletion api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4517,6 +4517,82 @@ paths:
$ref: '#/responses/403'
'500':
$ref: '#/responses/500'
/jobservice/pools:
get:
operationId: getWorkerPools
summary: Get worker pools
description: Get worker pools
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
responses:
'200':
description: Get worker pools successfully.
schema:
type: array
items:
$ref: '#/definitions/WorkerPool'
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'500':
$ref: '#/responses/500'
/jobservice/pools/{pool_id}/workers:
get:
operationId: getWorkers
summary: Get workers
description: Get workers in current pool
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
- name: pool_id
in: path
required: true
type: string
description: The name of the pool. 'all' stands for all pools
responses:
'200':
description: Get workers successfully.
schema:
type: array
items:
$ref: '#/definitions/Worker'
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/jobs/{job_id}:
put:
operationId: stopRunningJob
summary: Stop running job
description: Stop running job
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
- name: job_id
in: path
required: true
type: string
description: The id of the job.
responses:
'200':
description: Stop worker successfully.
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/ping:
get:
operationId: getPing
Expand Down Expand Up @@ -9253,4 +9329,58 @@ definitions:
type: array
items:
$ref: '#/definitions/ScanDataExportExecution'
description: The list of scan data export executions
description: The list of scan data export executions
WorkerPool:
type: object
description: the worker pool of job service
properties:
pid:
type: integer
description: the process id of jobservice
worker_pool_id:
type: string
description: the id of the worker pool
start_at:
type: string
format: date-time
description: The start time of the work pool
heartbeat_at:
type: string
format: date-time
description: The heartbeat time of the work pool
concurrency:
type: integer
description: The concurrency of the work pool
host:
type: string
description: The host of the work pool
Worker:
type: object
description: worker in the pool
properties:
id:
type: string
description: the id of the worker
pool_id:
type: string
description: the id of the worker pool
job_name:
type: string
description: the name of the running job in the worker
job_id:
type: string
description: the id of the running job in the worker
start_at:
type: string
format: date-time
description: The start time of the worker
args:
type: string
description: The args of the worker
check_in:
type: string
description: the checkin of the running job in the worker
checkin_at:
type: string
format: date-time
description: The checkin time of the running job in the worker
31 changes: 31 additions & 0 deletions src/common/job/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Client interface {
PostAction(uuid, action string) error
GetExecutions(uuid string) ([]job.Stats, error)
// TODO Redirect joblog when we see there's memory issue.
// GetJobServiceConfig retrieves the job config
GetJobServiceConfig() (*job.Config, error)
}

// StatusBehindError represents the error got when trying to stop a success/failed job
Expand Down Expand Up @@ -212,6 +214,35 @@ func (d *DefaultClient) PostAction(uuid, action string) error {
return nil
}

// GetJobServiceConfig retrieves the job service configuration
func (d *DefaultClient) GetJobServiceConfig() (*job.Config, error) {
url := d.endpoint + "/api/v1/config"
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := d.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, &commonhttp.Error{
Code: resp.StatusCode,
Message: string(data),
}
}
var config job.Config
err = json.Unmarshal(data, &config)
if err != nil {
return nil, err
}
return &config, nil
}
func isStatusBehindError(err error) (string, bool) {
if err == nil {
return "", false
Expand Down
1 change: 1 addition & 0 deletions src/common/rbac/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,5 @@ const (
ResourceSystemVolumes = Resource("system-volumes")
ResourcePurgeAuditLog = Resource("purge-audit")
ResourceExportCVE = Resource("export-cve")
ResourceJobServiceMonitor = Resource("jobservice-monitor")
)
4 changes: 4 additions & 0 deletions src/common/rbac/system/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,9 @@ var (
{Resource: rbac.ResourceLdapUser, Action: rbac.ActionList},
{Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead},
{Resource: rbac.ResourceConfiguration, Action: rbac.ActionUpdate},

{Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionRead},
{Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionList},
{Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionStop},
}
)
155 changes: 155 additions & 0 deletions src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobmonitor

import (
"context"
"fmt"
"strings"
"time"

"github.com/goharbor/harbor/src/lib/orm"

"github.com/goharbor/harbor/src/lib/log"

"github.com/gocraft/work"

"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/q"
libRedis "github.com/goharbor/harbor/src/lib/redis"
jm "github.com/goharbor/harbor/src/pkg/jobmonitor"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
)

// All the jobs in the pool, or all pools
const All = "all"

// Ctl the controller instance of the worker pool controller
var Ctl = NewMonitorController()

// MonitorController defines the worker pool operations
type MonitorController interface {
// ListPools lists the worker pools
ListPools(ctx context.Context) ([]*jm.WorkerPool, error)
// ListWorkers lists the workers in the pool
ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error)
// StopRunningJob stop the running job
StopRunningJob(ctx context.Context, jobID string) error
}

type monitorController struct {
poolManager jm.PoolManager
workerManager jm.WorkerManager
taskManager task.Manager
sch scheduler.Scheduler
monitorClient func() (jm.JobServiceMonitorClient, error)
}

// NewMonitorController ...
func NewMonitorController() MonitorController {
return &monitorController{
poolManager: jm.NewPoolManager(),
workerManager: jm.NewWorkerManager(),
taskManager: task.NewManager(),
monitorClient: jobServiceMonitorClient,
}
}

func (w *monitorController) StopRunningJob(ctx context.Context, jobID string) error {
if strings.EqualFold(jobID, All) {
allRunningJobs, err := w.allRunningJobs(ctx)
if err != nil {
log.Errorf("failed to get all running jobs: %v", err)
return err
}
for _, jobID := range allRunningJobs {
if err := w.stopJob(ctx, jobID); err != nil {
log.Errorf("failed to stop running job %s: %v", jobID, err)
return err
}
}
return nil
}
return w.stopJob(ctx, jobID)
}

func (w *monitorController) stopJob(ctx context.Context, jobID string) error {
tasks, err := w.taskManager.List(ctx, &q.Query{Keywords: q.KeyWords{"job_id": jobID}})
if err != nil {
return err
}
if len(tasks) == 0 {
return errors.BadRequestError(nil).WithMessage("job %s not found", jobID)
}
if len(tasks) != 1 {
return fmt.Errorf("there are more than one task with the same job ID")
}
// use local transaction to avoid rollback batch success tasks to previous state when one fail
if ctx == nil {
log.Debug("context is nil, skip stop operation")
return nil
}
return orm.WithTransaction(func(ctx context.Context) error {
return w.taskManager.Stop(ctx, tasks[0].ID)
})(orm.SetTransactionOpNameToContext(ctx, "tx-stop-job"))
}

func (w *monitorController) allRunningJobs(ctx context.Context) ([]string, error) {
jobIDs := make([]string, 0)
wks, err := w.ListWorkers(ctx, All)
if err != nil {
log.Errorf("failed to list workers: %v", err)
return nil, err
}
for _, wk := range wks {
jobIDs = append(jobIDs, wk.JobID)
}
return jobIDs, nil
}

func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
cfg, err := job.GlobalClient.GetJobServiceConfig()
if err != nil {
return nil, err
}
config := cfg.RedisPoolConfig
pool, err := libRedis.GetRedisPool("JobService", config.RedisURL, &libRedis.PoolParam{
PoolMaxIdle: 0,
PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second,
})
if err != nil {
log.Errorf("failed to get redis pool: %v", err)
return nil, err
}
return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil
}

func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) {
mClient, err := w.monitorClient()
if err != nil {
return nil, err
}
return w.workerManager.List(ctx, mClient, poolID)
}

func (w *monitorController) ListPools(ctx context.Context) ([]*jm.WorkerPool, error) {
mClient, err := w.monitorClient()
if err != nil {
return nil, err
}
return w.poolManager.List(ctx, mClient)
}
Loading

0 comments on commit 39ca918

Please sign in to comment.