From 732953db60bbc2fa2f266c1a9508dbb521410435 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 10 Jul 2023 11:51:46 -0700 Subject: [PATCH] transport integration tests: make TestMoreStreamsThanOurLimits less flaky (#2410) * Make this test less flaky * Actually use sawFirstErr --- p2p/test/transport/transport_test.go | 33 +++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index 513d2d9b2b..e960806444 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -368,10 +368,15 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { })) var handledStreams atomic.Int32 + var sawFirstErr atomic.Bool + + semaphore := make(chan struct{}, streamCount) + // Start with a single stream at a time. If that works, we'll increase the number of concurrent streams. + semaphore <- struct{}{} + listener.SetStreamHandler("echo", func(s network.Stream) { io.Copy(s, s) s.Close() - handledStreams.Add(1) }) wg := sync.WaitGroup{} @@ -380,14 +385,31 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { var completedStreams atomic.Int32 for i := 0; i < streamCount; i++ { go func() { + <-semaphore + var didErr bool defer wg.Done() defer completedStreams.Add(1) + defer func() { + select { + case semaphore <- struct{}{}: + default: + } + if !didErr && !sawFirstErr.Load() { + // No error! We can add one more stream to our concurrency limit. + select { + case semaphore <- struct{}{}: + default: + } + } + }() var s network.Stream var err error // maxRetries is an arbitrary retry amount if there's any error. maxRetries := streamCount * 4 shouldRetry := func(err error) bool { + didErr = true + sawFirstErr.Store(true) maxRetries-- if maxRetries == 0 || len(errCh) > 0 { select { @@ -426,14 +448,13 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { if !bytes.Equal(b, []byte("hello")) { return errors.New("received data does not match sent data") } + handledStreams.Add(1) return nil }(s) - if err != nil { - if shouldRetry(err) { - time.Sleep(50 * time.Millisecond) - continue - } + if err != nil && shouldRetry(err) { + time.Sleep(50 * time.Millisecond) + continue } return }