From e917c8c80f922c99215af51fc743866875dc4dd9 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Fri, 30 Jul 2021 08:48:44 -0700 Subject: [PATCH 1/6] [Heartbeat]: limit parallelization of tasks by jobtype --- heartbeat/beater/heartbeat.go | 3 +- heartbeat/config/config.go | 5 ++ heartbeat/monitors/task.go | 2 +- heartbeat/scheduler/scheduler.go | 75 ++++++++++++++++++--------- heartbeat/scheduler/scheduler_test.go | 28 +++++----- 5 files changed, 72 insertions(+), 41 deletions(-) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 85bfb79fe20..48a89a92394 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -65,8 +65,9 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { if err != nil { return nil, err } + jobConfig := parsedConfig.Jobs - scheduler := scheduler.NewWithLocation(limit, hbregistry.SchedulerRegistry, location) + scheduler := scheduler.NewWithLocation(limit, hbregistry.SchedulerRegistry, location, jobConfig) bt := &Heartbeat{ done: make(chan struct{}), diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index 052d472ea39..8156495a36e 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -33,6 +33,11 @@ type Config struct { Scheduler Scheduler `config:"scheduler"` Autodiscover *autodiscover.Config `config:"autodiscover"` SyntheticSuites []*common.Config `config:"synthetic_suites"` + Jobs map[string]JobLimit `config:"jobs"` +} + +type JobLimit struct { + Limit int64 `config:"limit" validate:"min=0"` } // Scheduler defines the syntax of a heartbeat.yml scheduler block. diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index eb53ddeb195..d9794da16d1 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -84,7 +84,7 @@ func (t *configuredJob) Start() { } tf := t.makeSchedulerTaskFunc() - t.cancelFn, err = t.monitor.scheduler.Add(t.config.Schedule, t.monitor.stdFields.ID, tf) + t.cancelFn, err = t.monitor.scheduler.Add(t.config.Schedule, t.monitor.stdFields.ID, tf, t.config.Type) if err != nil { logp.Err("could not start monitor: %v", err) } diff --git a/heartbeat/scheduler/scheduler.go b/heartbeat/scheduler/scheduler.go index 87db7075c44..ddcf19b0d80 100644 --- a/heartbeat/scheduler/scheduler.go +++ b/heartbeat/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( "golang.org/x/sync/semaphore" + "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/heartbeat/scheduler/timerqueue" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" @@ -46,14 +47,15 @@ var ErrInvalidTransition = fmt.Errorf("invalid state transition") // Scheduler represents our async timer based scheduler. 
type Scheduler struct { - limit int64 - limitSem *semaphore.Weighted - state atomic.Int - location *time.Location - timerQueue *timerqueue.TimerQueue - ctx context.Context - cancelCtx context.CancelFunc - stats schedulerStats + limit int64 + limitSem *semaphore.Weighted + state atomic.Int + location *time.Location + timerQueue *timerqueue.TimerQueue + ctx context.Context + cancelCtx context.CancelFunc + stats schedulerStats + jobLimitSem map[string]*semaphore.Weighted } type schedulerStats struct { @@ -77,13 +79,33 @@ type Schedule interface { RunOnInit() bool } +func getJobLimitSem(jobLimitByType map[string]config.JobLimit) map[string]*semaphore.Weighted { + jc := map[string]config.JobLimit{ + "http": {Limit: math.MaxInt64}, + "icmp": {Limit: math.MaxInt64}, + "tcp": {Limit: math.MaxInt64}, + "browser": {Limit: math.MaxInt64}, + } + for jobType, jobLimit := range jobLimitByType { + if jobLimit.Limit > 0 { + jc[jobType] = jobLimit + } + } + + jobLimitSem := map[string]*semaphore.Weighted{} + for jobType, jobLimit := range jc { + jobLimitSem[jobType] = semaphore.NewWeighted(jobLimit.Limit) + } + return jobLimitSem +} + // New creates a new Scheduler func New(limit int64, registry *monitoring.Registry) *Scheduler { - return NewWithLocation(limit, registry, time.Local) + return NewWithLocation(limit, registry, time.Local, nil) } // NewWithLocation creates a new Scheduler using the given runAt zone. -func NewWithLocation(limit int64, registry *monitoring.Registry, location *time.Location) *Scheduler { +func NewWithLocation(limit int64, registry *monitoring.Registry, location *time.Location, jobLimitByType map[string]config.JobLimit) *Scheduler { ctx, cancelCtx := context.WithCancel(context.Background()) if limit < 1 { @@ -96,14 +118,14 @@ func NewWithLocation(limit int64, registry *monitoring.Registry, location *time. waitingTasksGauge := monitoring.NewUint(registry, "tasks.waiting") sched := &Scheduler{ - limit: limit, - location: location, - state: atomic.MakeInt(statePreRunning), - ctx: ctx, - cancelCtx: cancelCtx, - limitSem: semaphore.NewWeighted(limit), - - timerQueue: timerqueue.NewTimerQueue(ctx), + limit: limit, + location: location, + state: atomic.MakeInt(statePreRunning), + ctx: ctx, + cancelCtx: cancelCtx, + limitSem: semaphore.NewWeighted(limit), + jobLimitSem: getJobLimitSem(jobLimitByType), + timerQueue: timerqueue.NewTimerQueue(ctx), stats: schedulerStats{ activeJobs: activeJobsGauge, @@ -174,7 +196,7 @@ var ErrAlreadyStopped = errors.New("attempted to add job to already stopped sche // Add adds the given TaskFunc to the current scheduler. Will return an error if the scheduler // is done. -func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc) (removeFn context.CancelFunc, err error) { +func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) { if s.state.Load() == stateStopped { return nil, ErrAlreadyStopped } @@ -195,7 +217,7 @@ func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc) (removeF default: } s.stats.activeJobs.Inc() - lastRanAt = s.runRecursiveJob(jobCtx, entrypoint) + lastRanAt = s.runRecursiveJob(jobCtx, entrypoint, jobType) s.stats.activeJobs.Dec() s.runOnce(sched.Next(lastRanAt), taskFn) debugf("Job '%v' returned at %v", id, time.Now()) @@ -233,10 +255,10 @@ func (s *Scheduler) runOnce(runAt time.Time, taskFn timerqueue.TimerTaskFn) { // runRecursiveJob runs the entry point for a job, blocking until all subtasks are completed. 
// Subtasks are run in separate goroutines. // returns the time execution began on its first task -func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc) (startedAt time.Time) { +func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc, jobType string) (startedAt time.Time) { wg := &sync.WaitGroup{} wg.Add(1) - startedAt = s.runRecursiveTask(jobCtx, task, wg) + startedAt = s.runRecursiveTask(jobCtx, task, wg, jobType) wg.Wait() return startedAt } @@ -245,8 +267,13 @@ func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc) (star // Since task funcs can emit continuations recursively we need a function to execute // recursively. // The wait group passed into this function expects to already have its count incremented by one. -func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg *sync.WaitGroup) (startedAt time.Time) { +func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg *sync.WaitGroup, jobType string) (startedAt time.Time) { defer wg.Done() + jobSem := s.jobLimitSem[jobType] + jobSemErr := jobSem.Acquire(jobCtx, 1) + if jobSemErr == nil { + defer jobSem.Release(1) + } // The accounting for waiting/active tasks is done using atomics. // Absolute accuracy is not critical here so the gap between modifying waitingTasks and activeJobs is acceptable. @@ -280,7 +307,7 @@ func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg * for _, cont := range continuations { // Run continuations in parallel, note that these each will acquire their own slots // We can discard the started at times for continuations as those are irrelevant - go s.runRecursiveTask(jobCtx, cont, wg) + go s.runRecursiveTask(jobCtx, cont, wg, jobType) } } diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index 6567dab2beb..e74b658a86c 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -50,7 +50,7 @@ func TestNew(t *testing.T) { } func TestNewWithLocation(t *testing.T) { - scheduler := NewWithLocation(123, monitoring.NewRegistry(), tarawaTime()) + scheduler := NewWithLocation(123, monitoring.NewRegistry(), tarawaTime(), nil) assert.Equal(t, int64(123), scheduler.limit) assert.Equal(t, tarawaTime(), scheduler.location) } @@ -85,7 +85,7 @@ func testTaskTimes(limit uint32, fn TaskFunc) TaskFunc { func TestScheduler_Start(t *testing.T) { // We use tarawa runAt because it could expose some weird runAt math if by accident some code // relied on the local TZ. 
- s := NewWithLocation(10, monitoring.NewRegistry(), tarawaTime()) + s := NewWithLocation(10, monitoring.NewRegistry(), tarawaTime(), nil) defer s.Stop() executed := make(chan string) @@ -98,7 +98,7 @@ func TestScheduler_Start(t *testing.T) { return nil } return []TaskFunc{cont} - })) + }), "http") removedEvents := uint32(1) // This function will be removed after being invoked once @@ -113,7 +113,7 @@ func TestScheduler_Start(t *testing.T) { } // Attempt to execute this twice to see if remove() had any effect removeMtx.Lock() - remove, err := s.Add(testSchedule{}, "removed", testTaskTimes(removedEvents+1, testFn)) + remove, err := s.Add(testSchedule{}, "removed", testTaskTimes(removedEvents+1, testFn), "http") require.NoError(t, err) require.NotNil(t, remove) removeMtx.Unlock() @@ -128,7 +128,7 @@ func TestScheduler_Start(t *testing.T) { return nil } return []TaskFunc{cont} - })) + }), "http") received := make([]string, 0) // We test for a good number of events in this loop because we want to ensure that the remove() took effect @@ -160,7 +160,7 @@ func TestScheduler_Start(t *testing.T) { } func TestScheduler_Stop(t *testing.T) { - s := NewWithLocation(10, monitoring.NewRegistry(), tarawaTime()) + s := NewWithLocation(10, monitoring.NewRegistry(), tarawaTime(), nil) executed := make(chan struct{}) @@ -170,7 +170,7 @@ func TestScheduler_Stop(t *testing.T) { _, err := s.Add(testSchedule{}, "testPostStop", testTaskTimes(1, func(_ context.Context) []TaskFunc { executed <- struct{}{} return nil - })) + }), "http") assert.Equal(t, ErrAlreadyStopped, err) } @@ -208,7 +208,7 @@ func TestScheduler_runRecursiveTask(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { limit := int64(100) - s := NewWithLocation(limit, monitoring.NewRegistry(), tarawaTime()) + s := NewWithLocation(limit, monitoring.NewRegistry(), tarawaTime(), nil) if testCase.overLimit { s.limitSem.Acquire(context.Background(), limit) @@ -224,7 +224,7 @@ func TestScheduler_runRecursiveTask(t *testing.T) { } beforeStart := time.Now() - startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg) + startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg, "http") // This will panic in the case where we don't check s.limitSem.Acquire // for an error value and released an unacquired resource in scheduler.go. 
@@ -241,7 +241,7 @@ func TestScheduler_runRecursiveTask(t *testing.T) { } func BenchmarkScheduler(b *testing.B) { - s := NewWithLocation(0, monitoring.NewRegistry(), tarawaTime()) + s := NewWithLocation(0, monitoring.NewRegistry(), tarawaTime(), nil) sched := testSchedule{0} @@ -250,7 +250,7 @@ func BenchmarkScheduler(b *testing.B) { _, err := s.Add(sched, "testPostStop", func(_ context.Context) []TaskFunc { executed <- struct{}{} return nil - }) + }, "http") assert.NoError(b, err) } @@ -260,9 +260,7 @@ func BenchmarkScheduler(b *testing.B) { count := 0 for count < b.N { - select { - case <-executed: - count++ - } + <-executed + count++ } } From 5278194174e53d11b517a45e578cebd0b17aa991 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Fri, 30 Jul 2021 15:59:01 -0700 Subject: [PATCH 2/6] handle release and add tests --- heartbeat/scheduler/scheduler.go | 19 ++++--- heartbeat/scheduler/scheduler_test.go | 81 +++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 13 deletions(-) diff --git a/heartbeat/scheduler/scheduler.go b/heartbeat/scheduler/scheduler.go index ddcf19b0d80..1f18ae4a5f2 100644 --- a/heartbeat/scheduler/scheduler.go +++ b/heartbeat/scheduler/scheduler.go @@ -257,8 +257,10 @@ func (s *Scheduler) runOnce(runAt time.Time, taskFn timerqueue.TimerTaskFn) { // returns the time execution began on its first task func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc, jobType string) (startedAt time.Time) { wg := &sync.WaitGroup{} + jobSem := s.jobLimitSem[jobType] + jobSem.Acquire(jobCtx, 1) wg.Add(1) - startedAt = s.runRecursiveTask(jobCtx, task, wg, jobType) + startedAt = s.runRecursiveTask(jobCtx, task, wg, jobSem) wg.Wait() return startedAt } @@ -267,13 +269,8 @@ func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc, jobTy // Since task funcs can emit continuations recursively we need a function to execute // recursively. // The wait group passed into this function expects to already have its count incremented by one. -func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg *sync.WaitGroup, jobType string) (startedAt time.Time) { +func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg *sync.WaitGroup, jobSem *semaphore.Weighted) (startedAt time.Time) { defer wg.Done() - jobSem := s.jobLimitSem[jobType] - jobSemErr := jobSem.Acquire(jobCtx, 1) - if jobSemErr == nil { - defer jobSem.Release(1) - } // The accounting for waiting/active tasks is done using atomics. // Absolute accuracy is not critical here so the gap between modifying waitingTasks and activeJobs is acceptable. 
@@ -306,8 +303,12 @@ func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg * wg.Add(len(continuations)) for _, cont := range continuations { // Run continuations in parallel, note that these each will acquire their own slots - // We can discard the started at times for continuations as those are irrelevant - go s.runRecursiveTask(jobCtx, cont, wg, jobType) + // We can discard the started at times for continuations as those are + // irrelevant + go s.runRecursiveTask(jobCtx, cont, wg, jobSem) + } + if len(continuations) == 0 { + jobSem.Release(1) } } diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index e74b658a86c..2b73b647d20 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -20,16 +20,18 @@ package scheduler import ( "context" "fmt" + "math" "sync" "sync/atomic" "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - + "github.com/elastic/beats/v7/heartbeat/config" batomic "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" ) // The runAt in the island of tarawa 🏝. Good test TZ because it's pretty rare for a local box @@ -210,6 +212,9 @@ func TestScheduler_runRecursiveTask(t *testing.T) { limit := int64(100) s := NewWithLocation(limit, monitoring.NewRegistry(), tarawaTime(), nil) + jobSem := semaphore.NewWeighted(math.MaxInt64) + jobSem.Acquire(context.Background(), 1) + if testCase.overLimit { s.limitSem.Acquire(context.Background(), limit) } @@ -224,7 +229,7 @@ func TestScheduler_runRecursiveTask(t *testing.T) { } beforeStart := time.Now() - startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg, "http") + startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg, jobSem) // This will panic in the case where we don't check s.limitSem.Acquire // for an error value and released an unacquired resource in scheduler.go. 
@@ -240,6 +245,74 @@ func TestScheduler_runRecursiveTask(t *testing.T) { } } +func makeTasks(num int, callback func()) TaskFunc { + return func(ctx context.Context) []TaskFunc { + callback() + if num < 1 { + return nil + } + return []TaskFunc{makeTasks(num-1, callback)} + } +} + +func TestScheduler_runRecursiveJob(t *testing.T) { + tests := []struct { + name string + numJobs int + limit int64 + expect func(events []int) + }{ + { + name: "runs more than 1 with limit of 1", + numJobs: 2, + limit: 1, + expect: func(events []int) { + mid := len(events) / 2 + firstHalf := events[0:mid] + lastHalf := events[mid:] + for _, ele := range firstHalf { + assert.Equal(t, firstHalf[0], ele) + } + for _, ele := range lastHalf { + assert.Equal(t, lastHalf[0], ele) + } + }, + }, + { + name: "runs 50 interleaved without limit", + numJobs: 50, + limit: math.MaxInt64, + expect: func(events []int) { + require.GreaterOrEqual(t, len(events), 200) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jobType := "http" + jobConfigByType := map[string]config.JobLimit{ + jobType: {Limit: tt.limit}, + } + s := NewWithLocation(math.MaxInt64, monitoring.NewRegistry(), tarawaTime(), jobConfigByType) + var taskArr []int + wg := sync.WaitGroup{} + wg.Add(tt.numJobs) + for i := 0; i < tt.numJobs; i++ { + num := i + tf := makeTasks(4, func() { + taskArr = append(taskArr, num) + }) + go func(tff TaskFunc) { + s.runRecursiveJob(context.Background(), tff, jobType) + wg.Done() + }(tf) + } + wg.Wait() + tt.expect(taskArr) + }) + } +} + func BenchmarkScheduler(b *testing.B) { s := NewWithLocation(0, monitoring.NewRegistry(), tarawaTime(), nil) From 4092f62608498bb7c6ab6dbf1d42d5c73c054c10 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Fri, 30 Jul 2021 16:56:01 -0700 Subject: [PATCH 3/6] fix linting --- heartbeat/scheduler/scheduler_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index 2b73b647d20..a63ad2c6483 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -26,12 +26,13 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/heartbeat/config" - batomic "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/semaphore" + + "github.com/elastic/beats/v7/heartbeat/config" + batomic "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/monitoring" ) // The runAt in the island of tarawa 🏝. 
Good test TZ because it's pretty rare for a local box From 9d39e9cecbff9e974d137d057e5801ef4a7d2924 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Mon, 2 Aug 2021 16:14:07 -0700 Subject: [PATCH 4/6] add guard aganist nil in tests --- heartbeat/scheduler/scheduler.go | 6 ++++-- heartbeat/scheduler/scheduler_test.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/heartbeat/scheduler/scheduler.go b/heartbeat/scheduler/scheduler.go index 1f18ae4a5f2..51f87134882 100644 --- a/heartbeat/scheduler/scheduler.go +++ b/heartbeat/scheduler/scheduler.go @@ -258,7 +258,9 @@ func (s *Scheduler) runOnce(runAt time.Time, taskFn timerqueue.TimerTaskFn) { func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc, jobType string) (startedAt time.Time) { wg := &sync.WaitGroup{} jobSem := s.jobLimitSem[jobType] - jobSem.Acquire(jobCtx, 1) + if jobSem != nil { + jobSem.Acquire(jobCtx, 1) + } wg.Add(1) startedAt = s.runRecursiveTask(jobCtx, task, wg, jobSem) wg.Wait() @@ -307,7 +309,7 @@ func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg * // irrelevant go s.runRecursiveTask(jobCtx, cont, wg, jobSem) } - if len(continuations) == 0 { + if jobSem != nil && len(continuations) == 0 { jobSem.Release(1) } } diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index a63ad2c6483..e1b52a35e37 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -284,7 +284,7 @@ func TestScheduler_runRecursiveJob(t *testing.T) { numJobs: 50, limit: math.MaxInt64, expect: func(events []int) { - require.GreaterOrEqual(t, len(events), 200) + require.GreaterOrEqual(t, len(events), 50) }, }, } From 50a312a1b42a29ee59a63cc24a6466dae66c9e1a Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Mon, 9 Aug 2021 10:28:46 -0700 Subject: [PATCH 5/6] address review comments --- heartbeat/scheduler/scheduler.go | 14 ++------------ heartbeat/scheduler/scheduler_test.go | 6 +----- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/heartbeat/scheduler/scheduler.go b/heartbeat/scheduler/scheduler.go index 51f87134882..55731dd7d11 100644 --- a/heartbeat/scheduler/scheduler.go +++ b/heartbeat/scheduler/scheduler.go @@ -80,22 +80,12 @@ type Schedule interface { } func getJobLimitSem(jobLimitByType map[string]config.JobLimit) map[string]*semaphore.Weighted { - jc := map[string]config.JobLimit{ - "http": {Limit: math.MaxInt64}, - "icmp": {Limit: math.MaxInt64}, - "tcp": {Limit: math.MaxInt64}, - "browser": {Limit: math.MaxInt64}, - } + jobLimitSem := map[string]*semaphore.Weighted{} for jobType, jobLimit := range jobLimitByType { if jobLimit.Limit > 0 { - jc[jobType] = jobLimit + jobLimitSem[jobType] = semaphore.NewWeighted(jobLimit.Limit) } } - - jobLimitSem := map[string]*semaphore.Weighted{} - for jobType, jobLimit := range jc { - jobLimitSem[jobType] = semaphore.NewWeighted(jobLimit.Limit) - } return jobLimitSem } diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index e1b52a35e37..67e76a40331 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/semaphore" "github.com/elastic/beats/v7/heartbeat/config" batomic "github.com/elastic/beats/v7/libbeat/common/atomic" @@ -213,9 +212,6 @@ func TestScheduler_runRecursiveTask(t *testing.T) { limit := int64(100) s := NewWithLocation(limit, 
monitoring.NewRegistry(), tarawaTime(), nil) - jobSem := semaphore.NewWeighted(math.MaxInt64) - jobSem.Acquire(context.Background(), 1) - if testCase.overLimit { s.limitSem.Acquire(context.Background(), limit) } @@ -230,7 +226,7 @@ func TestScheduler_runRecursiveTask(t *testing.T) { } beforeStart := time.Now() - startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg, jobSem) + startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg, nil) // This will panic in the case where we don't check s.limitSem.Acquire // for an error value and released an unacquired resource in scheduler.go. From 10b830647537d05d87850dc7fae5453ea61733c2 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Tue, 10 Aug 2021 07:46:17 -0700 Subject: [PATCH 6/6] add test when limit is not specified --- heartbeat/scheduler/scheduler_test.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index 67e76a40331..0b699c2a778 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -283,12 +283,23 @@ func TestScheduler_runRecursiveJob(t *testing.T) { require.GreaterOrEqual(t, len(events), 50) }, }, + { + name: "runs 100 with limit not configured", + numJobs: 100, + limit: 0, + expect: func(events []int) { + require.GreaterOrEqual(t, len(events), 100) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + var jobConfigByType = map[string]config.JobLimit{} jobType := "http" - jobConfigByType := map[string]config.JobLimit{ - jobType: {Limit: tt.limit}, + if tt.limit > 0 { + jobConfigByType = map[string]config.JobLimit{ + jobType: {Limit: tt.limit}, + } } s := NewWithLocation(math.MaxInt64, monitoring.NewRegistry(), tarawaTime(), jobConfigByType) var taskArr []int
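
Note on the technique introduced by this series (the patches above are left as-is): each configured job type gets its own golang.org/x/sync/semaphore.Weighted, built by getJobLimitSem from the new Jobs field on the heartbeat Config (tagged config:"jobs", with a per-type limit tagged config:"limit"). runRecursiveJob acquires one slot for the job before its first task runs and releases it once a task returns no further continuations; job types with no configured limit get no semaphore and stay unthrottled (the nil guard from PATCH 4/6). Presumably the operator would set this in heartbeat.yml under a key path like jobs.browser.limit (inferred from the struct tags; the exact YAML is not shown in this series). Below is a minimal, self-contained Go sketch of that pattern; the "browser" job type and the limit of 2 are illustrative values, not taken from the patches.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Illustrative limits: only "browser" is throttled, mirroring how
	// getJobLimitSem only builds a semaphore for job types with Limit > 0.
	limits := map[string]int64{"browser": 2}

	jobLimitSem := map[string]*semaphore.Weighted{}
	for jobType, limit := range limits {
		if limit > 0 {
			jobLimitSem[jobType] = semaphore.NewWeighted(limit)
		}
	}

	runJob := func(jobType string, id int) {
		// Same guard as the patched runRecursiveJob: a nil semaphore means
		// the job type has no configured limit and runs unthrottled.
		if sem := jobLimitSem[jobType]; sem != nil {
			if err := sem.Acquire(context.Background(), 1); err != nil {
				return // only possible if the context is cancelled
			}
			defer sem.Release(1)
		}
		fmt.Printf("%-7s job %d started at %s\n", jobType, id, time.Now().Format("15:04:05.000"))
		time.Sleep(200 * time.Millisecond) // stand-in for the real task work
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(2)
		go func(n int) { defer wg.Done(); runJob("browser", n) }(i) // at most 2 of these overlap
		go func(n int) { defer wg.Done(); runJob("http", n) }(i)    // never blocked
	}
	wg.Wait()
}

Running this sketch, the four "browser" jobs start in two waves of two while the "http" jobs all start immediately, which is the job-level behaviour the new TestScheduler_runRecursiveJob cases assert. In the actual patches the slot is held for the lifetime of the whole job (released only when a task yields no continuations), so a long browser journey occupies its per-type slot end to end rather than per sub-task.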