Skip to content

Commit

Permalink
time thr first acq starts the loop guarantee
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Feb 17, 2021
1 parent ce66d76 commit de2b2be
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
7 changes: 7 additions & 0 deletions executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ func once(run Runnable) Runnable {
}
}

func async(run Runnable) Runnable {
return func(ctx context.Context) (err error) {
gorun(ctx, run)
return nil
}
}

func all(runs ...Runnable) Runnable {
return func(ctx context.Context) error {
var once sync.Once
Expand Down
12 changes: 7 additions & 5 deletions throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,17 +461,19 @@ func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Du
}
thr := ttimed{tafter: tafter}
thr.loop = once(
loop(window, func(ctx context.Context) error {
atomicBSub(&thr.current, delta)
return ctx.Err()
}),
async(
loop(window, func(ctx context.Context) error {
atomicBSub(&thr.current, delta)
return ctx.Err()
}),
),
)
return thr
}

func (thr ttimed) Acquire(ctx context.Context) error {
// start loop on first acquire
gorun(ctx, thr.loop)
_ = thr.loop(ctx)
err := thr.tafter.Acquire(ctx)
if current := atomicGet(&thr.current); current > thr.threshold {
atomicSet(&thr.current, thr.threshold)
Expand Down

0 comments on commit de2b2be

Please sign in to comment.