From de2b2be5b0f6ac3ad6e8da6519c8c6641f5beab2 Mon Sep 17 00:00:00 2001 From: Kostiantyn Masliuk <1pkg@protonmail.com> Date: Wed, 17 Feb 2021 22:54:20 +0100 Subject: [PATCH] time thr first acq starts the loop guarantee --- executors.go | 7 +++++++ throttlers.go | 12 +++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/executors.go b/executors.go index 98fc3ac..1a0f86b 100644 --- a/executors.go +++ b/executors.go @@ -114,6 +114,13 @@ func once(run Runnable) Runnable { } } +func async(run Runnable) Runnable { + return func(ctx context.Context) (err error) { + gorun(ctx, run) + return nil + } +} + func all(runs ...Runnable) Runnable { return func(ctx context.Context) error { var once sync.Once diff --git a/throttlers.go b/throttlers.go index d4f1c89..fa59f24 100644 --- a/throttlers.go +++ b/throttlers.go @@ -461,17 +461,19 @@ func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Du } thr := ttimed{tafter: tafter} thr.loop = once( - loop(window, func(ctx context.Context) error { - atomicBSub(&thr.current, delta) - return ctx.Err() - }), + async( + loop(window, func(ctx context.Context) error { + atomicBSub(&thr.current, delta) + return ctx.Err() + }), + ), ) return thr } func (thr ttimed) Acquire(ctx context.Context) error { // start loop on first acquire - gorun(ctx, thr.loop) + _ = thr.loop(ctx) err := thr.tafter.Acquire(ctx) if current := atomicGet(&thr.current); current > thr.threshold { atomicSet(&thr.current, thr.threshold)