Skip to content

Commit

Permalink
Use batch to list the job id in the job queue to avoid crash redis
Browse files Browse the repository at this point in the history
  fixes: goharbor#19436

Signed-off-by: stonezdj <[email protected]>
  • Loading branch information
stonezdj committed Oct 16, 2023
1 parent 27e70cd commit f5d8662
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 24 deletions.
32 changes: 24 additions & 8 deletions src/pkg/jobmonitor/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
// JobServicePool job service pool name
const JobServicePool = "JobService"

// batchSize the batch size to list the job in queue
const batchSize = 1000

// RedisClient defines the job service operations related to redis
type RedisClient interface {
// AllJobTypes returns all the job types registered in the job service
Expand Down Expand Up @@ -74,21 +77,34 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
var jobInfo struct {
ID string `json:"id"`
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
size, err := redis.Int64(conn.Do("LLEN", redisKeyJobQueue))
if err != nil {
log.Infof("fail to get the size of the queue")
return []string{}, err
}
if len(jobs) == 0 {
log.Infof("no pending job for job type %v", jobType)
if size == 0 {
return []string{}, nil
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue

// use batch to list the job in queue, because the too many object load from a list might cause the redis crash
for startIndex := int64(0); startIndex < int64(size); startIndex += batchSize {
endIndex := startIndex + batchSize
if endIndex > int64(size) {
endIndex = int64(size)
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, startIndex, endIndex))
if err != nil {
return []string{}, err
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue
}
jobIDs = append(jobIDs, jobInfo.ID)
}
jobIDs = append(jobIDs, jobInfo.ID)
}

log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
if err != nil {
Expand Down
63 changes: 47 additions & 16 deletions src/pkg/jobmonitor/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jobmonitor

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
Expand All @@ -34,51 +35,81 @@ type RedisClientTestSuite struct {
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
func (s *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
s.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
s.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: s.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
s.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
s.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
func (s *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
func (s *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
s.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")
s.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
suite.Equal(0, len(remains))
s.Equal(0, len(remains))
}

func (s *RedisClientTestSuite) TestStopPendingJobs() {
redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", "{harbor_job_service_namespace}", "REPLICATION")
// create key and value
type jobInfo struct {
ID string `json:"id"`
Params string `json:"params"`
}
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
job := jobInfo{
ID: utils.GenerateRandomStringWithLen(10),
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}
jobIDs, err := s.redisClient.StopPendingJobs(context.Background(), "REPLICATION")
if err != nil {
s.FailNow("failed to stop pending jobs", err)
}
s.Assert().Equal(100, len(jobIDs))
}

func TestRedisClientTestSuite(t *testing.T) {
Expand Down

0 comments on commit f5d8662

Please sign in to comment.