diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index fc9f7272a42b..201dbc58b4bb 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -39,7 +39,7 @@ func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) return v, err } // backoff logic - if backoff >= 3*time.Second { + if backoff >= 15*time.Second { err = errors.Wrapf(errRetryTimeout, "flightcontrol") return v, err } @@ -132,8 +132,16 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { select { case <-c.ready: c.mu.Unlock() - <-c.cleaned - return nil, errRetry + if c.err != nil { // on error retry + <-c.cleaned + return nil, errRetry + } + pw, ok, _ := progress.FromContext(ctx) + if ok { + c.progressState.add(pw) + } + return c.result, nil + case <-c.ctx.done: // could return if no error c.mu.Unlock() <-c.cleaned diff --git a/util/flightcontrol/flightcontrol_test.go b/util/flightcontrol/flightcontrol_test.go index bb02386f8e83..634a1e85b4e1 100644 --- a/util/flightcontrol/flightcontrol_test.go +++ b/util/flightcontrol/flightcontrol_test.go @@ -2,6 +2,7 @@ package flightcontrol import ( "context" + "sync" "sync/atomic" "testing" "time" @@ -203,6 +204,29 @@ func TestCancelBoth(t *testing.T) { assert.Equal(t, counter, int64(4)) } +func TestContention(t *testing.T) { + perthread := 1000 + threads := 100 + + wg := sync.WaitGroup{} + wg.Add(threads) + + g := &Group{} + + for i := 0; i < threads; i++ { + for j := 0; j < perthread; j++ { + _, err := g.Do(context.TODO(), "foo", func(ctx context.Context) (interface{}, error) { + time.Sleep(time.Microsecond) + return nil, nil + }) + require.NoError(t, err) + } + wg.Done() + } + + wg.Wait() +} + func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (interface{}, error) { return func(ctx context.Context) (interface{}, error) { atomic.AddInt64(counter, 1)