Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick] Use batch to list the job id in the job queue to avoid crash redis #19455

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions src/pkg/jobmonitor/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
// JobServicePool job service pool name
const JobServicePool = "JobService"

// batchSize the batch size to list the job in queue
const batchSize = 1000

// RedisClient defines the job service operations related to redis
type RedisClient interface {
// AllJobTypes returns all the job types registered in the job service
Expand Down Expand Up @@ -74,21 +77,36 @@
var jobInfo struct {
ID string `json:"id"`
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
size, err := redis.Int64(conn.Do("LLEN", redisKeyJobQueue))
if err != nil {
log.Infof("fail to get the size of the queue")

Check warning on line 82 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L82

Added line #L82 was not covered by tests
return []string{}, err
}
if len(jobs) == 0 {
log.Infof("no pending job for job type %v", jobType)
if size == 0 {
return []string{}, nil
}
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue

// use batch to list the job in queue, because the too many object load from a list might cause the redis crash
for startIndex := int64(0); startIndex < int64(size); startIndex += batchSize {
endIndex := startIndex + batchSize
if endIndex > int64(size) {
endIndex = int64(size)
}
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, startIndex, endIndex))
if err != nil {
return []string{}, err
}

Check warning on line 98 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L97-L98

Added lines #L97 - L98 were not covered by tests
for _, j := range jobs {
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
log.Errorf("failed to parse the job info %v, %v", j, err)
continue

Check warning on line 102 in src/pkg/jobmonitor/redis.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/jobmonitor/redis.go#L101-L102

Added lines #L101 - L102 were not covered by tests
}
if len(jobInfo.ID) > 0 {
jobIDs = append(jobIDs, jobInfo.ID)
}
}
jobIDs = append(jobIDs, jobInfo.ID)
}

log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
if err != nil {
Expand Down
79 changes: 63 additions & 16 deletions src/pkg/jobmonitor/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jobmonitor

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
Expand All @@ -34,51 +35,97 @@ type RedisClientTestSuite struct {
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
func (s *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
s.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
s.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: s.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
s.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
s.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
func (s *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
func (s *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
s.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")

s.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
s.FailNow("can not get data from redis", err)
}
s.Equal(0, len(remains))
}

func (s *RedisClientTestSuite) TestStopPendingJobs() {
redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", "{harbor_job_service_namespace}", "REPLICATION")
// create key and value
type jobInfo struct {
ID string `json:"id"`
Params string `json:"params"`
}
conn := s.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
job := jobInfo{
ID: utils.GenerateRandomStringWithLen(10),
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}
// job without id
for i := 0; i < 10; i++ {
job := jobInfo{
Params: utils.GenerateRandomStringWithLen(10),
}
val, err := json.Marshal(&job)
if err != nil {
s.Errorf(err, "failed to marshal job info")
}
_, err = conn.Do("LPUSH", redisKeyJobQueue, val)
if err != nil {
s.FailNow("can not insert data to redis", err)
}
}

jobIDs, err := s.redisClient.StopPendingJobs(context.Background(), "REPLICATION")
if err != nil {
s.FailNow("failed to stop pending jobs", err)
}
suite.Equal(0, len(remains))
s.Assert().Equal(100, len(jobIDs))
}

func TestRedisClientTestSuite(t *testing.T) {
Expand Down
Loading