diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 27da1fe1d3a1..50ed7f86bdfa 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -155,8 +155,9 @@ func TestReplicaRangefeed(t *testing.T) { Span: rangefeedSpan, WithDiff: true, } - pErr := store.RangeFeed(&req, stream) - streamErrC <- pErr + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) }(i) } @@ -501,8 +502,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { }, Span: rangefeedSpan, } - pErr := store.RangeFeed(&req, stream) - streamErrC <- pErr + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) }() // Wait for the first checkpoint event. @@ -535,6 +537,8 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { }, Span: rangefeedSpan, } + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() streamErrC <- store.RangeFeed(&req, stream) }() @@ -577,9 +581,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { }, Span: rangefeedLeftSpan, } - - pErr := store.RangeFeed(&req, streamLeft) - streamLeftErrC <- pErr + timer := time.AfterFunc(10*time.Second, streamLeft.Cancel) + defer timer.Stop() + streamLeftErrC <- store.RangeFeed(&req, streamLeft) }() // Establish a rangefeed on the right replica. @@ -593,9 +597,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { }, Span: rangefeedRightSpan, } - - pErr := store.RangeFeed(&req, streamRight) - streamRightErrC <- pErr + timer := time.AfterFunc(10*time.Second, streamRight.Cancel) + defer timer.Stop() + streamRightErrC <- store.RangeFeed(&req, streamRight) }() // Wait for the first checkpoint event on each stream. @@ -647,12 +651,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { }, Span: rangefeedSpan, } - timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - - pErr := partitionStore.RangeFeed(&req, stream) - streamErrC <- pErr + streamErrC <- partitionStore.RangeFeed(&req, stream) }() // Wait for the first checkpoint event. @@ -757,8 +758,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { Span: rangefeedSpan, } kvserver.RangefeedEnabled.Override(ctx, &store.ClusterSettings().SV, true) - pErr := store.RangeFeed(&req, stream) - streamErrC <- pErr + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + streamErrC <- store.RangeFeed(&req, stream) }() // Wait for the first checkpoint event.