diff --git a/src/pkg/jobmonitor/redis.go b/src/pkg/jobmonitor/redis.go index 801ad5657a1..ddf525d34b8 100644 --- a/src/pkg/jobmonitor/redis.go +++ b/src/pkg/jobmonitor/redis.go @@ -32,6 +32,9 @@ import ( // JobServicePool job service pool name const JobServicePool = "JobService" +// batchSize the batch size to list the job in queue +const batchSize = 1000 + // RedisClient defines the job service operations related to redis type RedisClient interface { // AllJobTypes returns all the job types registered in the job service @@ -74,21 +77,36 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) ( var jobInfo struct { ID string `json:"id"` } - jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1)) + size, err := redis.Int64(conn.Do("LLEN", redisKeyJobQueue)) if err != nil { + log.Infof("fail to get the size of the queue") return []string{}, err } - if len(jobs) == 0 { - log.Infof("no pending job for job type %v", jobType) + if size == 0 { return []string{}, nil } - for _, j := range jobs { - if err := json.Unmarshal([]byte(j), &jobInfo); err != nil { - log.Errorf("failed to parse the job info %v, %v", j, err) - continue + + // use batch to list the job in queue, because the too many object load from a list might cause the redis crash + for startIndex := int64(0); startIndex < int64(size); startIndex += batchSize { + endIndex := startIndex + batchSize + if endIndex > int64(size) { + endIndex = int64(size) + } + jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, startIndex, endIndex)) + if err != nil { + return []string{}, err + } + for _, j := range jobs { + if err := json.Unmarshal([]byte(j), &jobInfo); err != nil { + log.Errorf("failed to parse the job info %v, %v", j, err) + continue + } + if len(jobInfo.ID) > 0 { + jobIDs = append(jobIDs, jobInfo.ID) + } } - jobIDs = append(jobIDs, jobInfo.ID) } + log.Infof("updated %d tasks in pending status to stop", len(jobIDs)) ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue)) if err != nil { diff --git a/src/pkg/jobmonitor/redis_test.go b/src/pkg/jobmonitor/redis_test.go index b902f569ae3..4eae1a49a3b 100644 --- a/src/pkg/jobmonitor/redis_test.go +++ b/src/pkg/jobmonitor/redis_test.go @@ -16,6 +16,7 @@ package jobmonitor import ( "context" + "encoding/json" "fmt" "os" "testing" @@ -34,51 +35,97 @@ type RedisClientTestSuite struct { redisURL string } -func (suite *RedisClientTestSuite) SetupSuite() { +func (s *RedisClientTestSuite) SetupSuite() { redisHost := os.Getenv("REDIS_HOST") if redisHost == "" { - suite.FailNow("REDIS_HOST is not specified") + s.FailNow("REDIS_HOST is not specified") } - suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost) - pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30}) - suite.redisClient = redisClientImpl{ + s.redisURL = fmt.Sprintf("redis://%s:6379", redisHost) + pool, err := redisPool(&config.RedisPoolConfig{RedisURL: s.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30}) + s.redisClient = redisClientImpl{ redisPool: pool, namespace: "{harbor_job_service_namespace}", } if err != nil { - suite.FailNow("failed to create redis client", err) + s.FailNow("failed to create redis client", err) } } -func (suite *RedisClientTestSuite) TearDownSuite() { +func (s *RedisClientTestSuite) TearDownSuite() { } -func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() { +func (s *RedisClientTestSuite) TestUntrackJobStatusInBatch() { // create key and value jobIDs := make([]string, 0) - conn := suite.redisClient.redisPool.Get() + conn := s.redisClient.redisPool.Get() defer conn.Close() for i := 0; i < 100; i++ { k := utils.GenerateRandomStringWithLen(10) jobIDs = append(jobIDs, k) - key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k) + key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), k) v := utils.GenerateRandomStringWithLen(10) _, err := conn.Do("HSET", key, k, v) if err != nil { - suite.FailNow("can not insert data to redis", err) + s.FailNow("can not insert data to redis", err) } } - suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs) - key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*") + + s.redisClient.removeJobStatusInRedis(context.Background(), jobIDs) + key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), "*") result, err := conn.Do("KEYS", key) if err != nil { - suite.FailNow("can not get data from redis", err) + s.FailNow("can not get data from redis", err) } remains, err := redis.Values(result, err) if err != nil { - suite.FailNow("can not get data from redis", err) + s.FailNow("can not get data from redis", err) + } + s.Equal(0, len(remains)) +} + +func (s *RedisClientTestSuite) TestStopPendingJobs() { + redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", "{harbor_job_service_namespace}", "REPLICATION") + // create key and value + type jobInfo struct { + ID string `json:"id"` + Params string `json:"params"` + } + conn := s.redisClient.redisPool.Get() + defer conn.Close() + for i := 0; i < 100; i++ { + job := jobInfo{ + ID: utils.GenerateRandomStringWithLen(10), + Params: utils.GenerateRandomStringWithLen(10), + } + val, err := json.Marshal(&job) + if err != nil { + s.Errorf(err, "failed to marshal job info") + } + _, err = conn.Do("LPUSH", redisKeyJobQueue, val) + if err != nil { + s.FailNow("can not insert data to redis", err) + } + } + // job without id + for i := 0; i < 10; i++ { + job := jobInfo{ + Params: utils.GenerateRandomStringWithLen(10), + } + val, err := json.Marshal(&job) + if err != nil { + s.Errorf(err, "failed to marshal job info") + } + _, err = conn.Do("LPUSH", redisKeyJobQueue, val) + if err != nil { + s.FailNow("can not insert data to redis", err) + } + } + + jobIDs, err := s.redisClient.StopPendingJobs(context.Background(), "REPLICATION") + if err != nil { + s.FailNow("failed to stop pending jobs", err) } - suite.Equal(0, len(remains)) + s.Assert().Equal(100, len(jobIDs)) } func TestRedisClientTestSuite(t *testing.T) {