From b514d3d21c723b5e077b6e7ac065e0e19eaae36c Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 27 Jun 2023 14:03:28 -0400 Subject: [PATCH] kvserver: de-flake TestMergeQueueSeesLearnerOrJointConfig Fixes #105381. It was possible for us to receive client-side store IDs concurrently with the server-side stopper being quiesced, at which point we read the variable storing the client-side store IDs without a mutex (which triggered the data race detector). The data race was benign, but instead of adding a synchronization primitive, we recognize that there was no reason for the store ID handling to happen on separate threads. Release note: None --- pkg/kv/kvserver/raft_transport.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 3090305ce134..d9ad130a8a65 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -446,18 +446,6 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer taskCtx = t.AnnotateCtx(taskCtx) defer cancel() - var storeIDs []roachpb.StoreID - defer func() { - ctx := t.AnnotateCtx(context.Background()) - t.kvflowControl.mu.Lock() - t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs) - t.kvflowControl.mu.Unlock() - t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...) - if fn := t.knobs.OnServerStreamDisconnected; fn != nil { - fn() - } - }() - if err := t.stopper.RunAsyncTaskEx( taskCtx, stop.TaskOpts{ @@ -465,6 +453,18 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { errCh <- func() error { + var storeIDs []roachpb.StoreID + defer func() { + ctx := t.AnnotateCtx(context.Background()) + t.kvflowControl.mu.Lock() + t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs) + t.kvflowControl.mu.Unlock() + t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...) + if fn := t.knobs.OnServerStreamDisconnected; fn != nil { + fn() + } + }() + stream := &lockedRaftMessageResponseStream{wrapped: stream} for { batch, err := stream.Recv()