Skip to content

Commit

Permalink
Remove job status track information from redis after stop the job in …
Browse files Browse the repository at this point in the history
…the queue

  After stop in the queue:
  Remove key in {harbor_job_service_namespace}:job_track:inprogress
  Remove {harbor_job_service_namespace}:job_stats:<job_id>
  fixes #19211

Signed-off-by: stonezdj <[email protected]>
  • Loading branch information
stonezdj committed Sep 5, 2023
1 parent 206c5b8 commit d91368d
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/pkg/jobmonitor/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gomodule/redigo/redis"

"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/lib/log"
libRedis "github.com/goharbor/harbor/src/lib/redis"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
if err != nil {
return []string{}, err
}
go func() {
// the amount of jobIDs maybe large, so use goroutine to remove the job status tracking info
r.removeJobStatusInRedis(ctx, jobIDs)
}()

Check warning on line 100 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L97-L100

Added lines #L97 - L100 were not covered by tests
if ret < 1 {
// no job in queue removed
return []string{}, fmt.Errorf("no job in the queue removed")
Expand All @@ -102,6 +107,27 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
return jobIDs, nil
}

// removeJobStatusInRedis remove job status track information from redis, to avoid performance impact when the jobIDs is too large, use batch to remove
func (r *redisClientImpl) removeJobStatusInRedis(ctx context.Context, jobIDs []string) {
conn := r.redisPool.Get()
defer conn.Close()
for _, id := range jobIDs {
namespace := fmt.Sprintf("{%s}", r.namespace)
redisKeyStatus := rds.KeyJobStats(namespace, id)
log.Debugf("delete job status info for job id:%v, key:%v", id, redisKeyStatus)
_, err := conn.Do("DEL", redisKeyStatus)
if err != nil {
log.Warningf("failed to delete the job status info for job %v, %v, continue", id, err)
}

Check warning on line 121 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L120-L121

Added lines #L120 - L121 were not covered by tests
redisKeyInProgress := rds.KeyJobTrackInProgress(namespace)
log.Debugf("delete inprogress info for key:%v, job id:%v", id, redisKeyInProgress)
_, err = conn.Do("HDEL", redisKeyInProgress, id)
if err != nil {
log.Warningf("failed to delete the job info in %v for job %v, %v, continue", rds.KeyJobTrackInProgress(namespace), id, err)
}

Check warning on line 127 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L126-L127

Added lines #L126 - L127 were not covered by tests
}
}

func (r *redisClientImpl) AllJobTypes(ctx context.Context) ([]string, error) {
conn := r.redisPool.Get()
defer conn.Close()
Expand Down
86 changes: 86 additions & 0 deletions src/pkg/jobmonitor/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobmonitor

import (
"context"
"fmt"
"os"
"testing"

"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/suite"

"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/config"
)

type RedisClientTestSuite struct {
suite.Suite
redisClient redisClientImpl
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
}
suite.Equal(0, len(remains))
}

func TestRedisClientTestSuite(t *testing.T) {
suite.Run(t, &RedisClientTestSuite{})
}

0 comments on commit d91368d

Please sign in to comment.