diff --git a/pkg/timer/runtime/runtime.go b/pkg/timer/runtime/runtime.go index 9fb2620e20b2d..fef59be8e5cd2 100644 --- a/pkg/timer/runtime/runtime.go +++ b/pkg/timer/runtime/runtime.go @@ -19,6 +19,7 @@ import ( "encoding/hex" "fmt" "maps" + "slices" "sync" "time" @@ -252,9 +253,7 @@ func (rt *TimerGroupRuntime) fullRefreshTimers() { func (rt *TimerGroupRuntime) tryTriggerTimerEvents() { now := rt.nowFunc() - var retryTimerIDs []string - var retryTimerKeys []string - var busyWorkers map[string]struct{} + var readyTimers []*timerCacheItem rt.cache.iterTryTriggerTimers(func(timer *api.TimerRecord, tryTriggerTime time.Time, nextEventTime *time.Time) bool { if tryTriggerTime.After(now) { return false @@ -264,9 +263,45 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() { return true } + if readyTimers == nil { + readyTimers = make([]*timerCacheItem, 0, 8) + } + + readyTimers = append(readyTimers, &timerCacheItem{ + timer: timer, + nextEventTime: nextEventTime, + }) + return true + }) + + if len(readyTimers) == 0 { + return + } + + // resort timer to make sure the timer has the smallest nextEventTime has a higher priority to trigger + slices.SortFunc(readyTimers, func(a, b *timerCacheItem) int { + if a.nextEventTime == nil || b.nextEventTime == nil { + if a.nextEventTime != nil { + return -1 + } + + if b.nextEventTime != nil { + return 1 + } + + return 0 + } + return a.nextEventTime.Compare(*b.nextEventTime) + }) + + var retryTimerIDs []string + var retryTimerKeys []string + var busyWorkers map[string]struct{} + for i, item := range readyTimers { + timer := item.timer worker, ok := rt.ensureWorker(timer.HookClass) if !ok { - return true + continue } eventID := timer.EventID @@ -284,20 +319,22 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() { select { case <-rt.ctx.Done(): - return false + return case worker.ch <- req: rt.cache.setTimerProcStatus(timer.ID, procTriggering, eventID) default: if busyWorkers == nil { - busyWorkers = make(map[string]struct{}) + busySize := len(readyTimers) - i + retryTimerIDs = make([]string, 0, busySize) + retryTimerKeys = make([]string, 0, busySize) + busyWorkers = make(map[string]struct{}, busySize) } busyWorkers[timer.HookClass] = struct{}{} retryTimerIDs = append(retryTimerIDs, timer.ID) retryTimerKeys = append(retryTimerKeys, fmt.Sprintf("[%s] %s", timer.Namespace, timer.Key)) } - return true - }) + } if len(retryTimerIDs) > 0 { busyWorkerList := make([]string, 0, len(busyWorkers))