diff --git a/pkg/timer/runtime/BUILD.bazel b/pkg/timer/runtime/BUILD.bazel index dabc8b21e867c..07b989758e377 100644 --- a/pkg/timer/runtime/BUILD.bazel +++ b/pkg/timer/runtime/BUILD.bazel @@ -34,7 +34,7 @@ go_test( embed = [":runtime"], flaky = True, race = "on", - shard_count = 23, + shard_count = 24, deps = [ "//pkg/testkit/testsetup", "//pkg/timer/api", diff --git a/pkg/timer/runtime/runtime.go b/pkg/timer/runtime/runtime.go index 0164df4ca6359..5896adc1c0f8d 100644 --- a/pkg/timer/runtime/runtime.go +++ b/pkg/timer/runtime/runtime.go @@ -19,6 +19,7 @@ import ( "encoding/hex" "fmt" "maps" + "slices" "sync" "time" @@ -241,9 +242,7 @@ func (rt *TimerGroupRuntime) fullRefreshTimers() { func (rt *TimerGroupRuntime) tryTriggerTimerEvents() { now := rt.nowFunc() - var retryTimerIDs []string - var retryTimerKeys []string - var busyWorkers map[string]struct{} + var readyTimers []*timerCacheItem rt.cache.iterTryTriggerTimers(func(timer *api.TimerRecord, tryTriggerTime time.Time, nextEventTime *time.Time) bool { if tryTriggerTime.After(now) { return false @@ -253,9 +252,45 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() { return true } + if readyTimers == nil { + readyTimers = make([]*timerCacheItem, 0, 8) + } + + readyTimers = append(readyTimers, &timerCacheItem{ + timer: timer, + nextEventTime: nextEventTime, + }) + return true + }) + + if len(readyTimers) == 0 { + return + } + + // resort timer to make sure the timer has the smallest nextEventTime has a higher priority to trigger + slices.SortFunc(readyTimers, func(a, b *timerCacheItem) int { + if a.nextEventTime == nil || b.nextEventTime == nil { + if a.nextEventTime != nil { + return 1 + } + + if b.nextEventTime != nil { + return -1 + } + + return 0 + } + return a.nextEventTime.Compare(*b.nextEventTime) + }) + + var retryTimerIDs []string + var retryTimerKeys []string + var busyWorkers map[string]struct{} + for i, item := range readyTimers { + timer := item.timer worker, ok := rt.ensureWorker(timer.HookClass) if !ok { - return true + continue } eventID := timer.EventID @@ -273,20 +308,22 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() { select { case <-rt.ctx.Done(): - return false + return case worker.ch <- req: rt.cache.setTimerProcStatus(timer.ID, procTriggering, eventID) default: if busyWorkers == nil { - busyWorkers = make(map[string]struct{}) + busySize := len(readyTimers) - i + retryTimerIDs = make([]string, 0, busySize) + retryTimerKeys = make([]string, 0, busySize) + busyWorkers = make(map[string]struct{}, busySize) } busyWorkers[timer.HookClass] = struct{}{} retryTimerIDs = append(retryTimerIDs, timer.ID) retryTimerKeys = append(retryTimerKeys, fmt.Sprintf("[%s] %s", timer.Namespace, timer.Key)) } - return true - }) + } if len(retryTimerIDs) > 0 { busyWorkerList := make([]string, 0, len(busyWorkers)) diff --git a/pkg/timer/runtime/runtime_test.go b/pkg/timer/runtime/runtime_test.go index 590208f7fefe2..b4656ba403404 100644 --- a/pkg/timer/runtime/runtime_test.go +++ b/pkg/timer/runtime/runtime_test.go @@ -264,6 +264,51 @@ func TestTryTriggerTimer(t *testing.T) { consumeAndVerify(t3) } +func TestTryTriggerTimePriority(t *testing.T) { + now := time.Now() + store := api.NewMemoryTimerStore() + defer store.Close() + runtime := NewTimerRuntimeBuilder("g1", store).Build() + runtime.setNowFunc(func() time.Time { + return now + }) + runtime.initCtx() + ch := make(chan *triggerEventRequest, 2) + runtime.workers["hook1"] = &hookWorker{ch: ch} + + t1 := newTestTimer("t1", "1m", now.Add(-time.Hour)) + runtime.cache.updateTimer(t1) + runtime.cache.updateNextTryTriggerTime(t1.ID, now.Add(-3*time.Minute)) + + t2 := newTestTimer("t2", "1m", now.Add(-2*time.Hour)) + runtime.cache.updateTimer(t2) + runtime.cache.updateNextTryTriggerTime(t2.ID, now.Add(-2*time.Minute)) + + t3 := newTestTimer("t3", "1h", now) + t3.EventStatus = api.SchedEventTrigger + t3.EventID = "event2" + t3.EventStart = now.Add(-time.Minute) + t3.Enable = false + runtime.cache.updateTimer(t3) + + t4 := newTestTimer("t4", "1m", now.Add(-10*time.Hour)) + runtime.cache.updateTimer(t4) + runtime.cache.updateNextTryTriggerTime(t4.ID, now.Add(time.Minute)) + + // nextEventTime: t3 (nil) < t4 < t2 < t1 + // nextTryTriggerTime: t1 < t2 < t3 (eventStart) < t4 + // we should test the priority trigger is ordered by `nextEventTime` because to ensure the timer who has a max + // delay time will be triggered first. + // t4 should not be scheduled for the next trigger time is after now. + // so, t3 and t2 will be triggered when the capacity of chan is 2 + runtime.tryTriggerTimerEvents() + require.Equal(t, procTriggering, runtime.cache.items[t2.ID].procStatus) + require.Equal(t, procTriggering, runtime.cache.items[t3.ID].procStatus) + // t1, t4 should keep not triggered + require.Equal(t, procIdle, runtime.cache.items[t1.ID].procStatus) + require.Equal(t, procIdle, runtime.cache.items[t4.ID].procStatus) +} + func TestHandleHookWorkerResponse(t *testing.T) { now := time.Now() store := api.NewMemoryTimerStore()