Skip to content

Commit

Permalink
kvserver: de-flake TestMergeQueueSeesLearnerOrJointConfig
Browse files Browse the repository at this point in the history
Fixes #105381. It was possible for us to receive client-side store IDs
concurrently with the server-side stopper being quiesced, at which point
we read the variable storing the client-side store IDs without a mutex
(which triggered the data race detector). The data race was benign, but
instead of adding a synchronization primitive, we recognize that there
was no reason for the store ID handling to happen on separate threads.

Release note: None
  • Loading branch information
irfansharif committed Jun 27, 2023
1 parent 13addf5 commit b514d3d
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,25 +446,25 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
taskCtx = t.AnnotateCtx(taskCtx)
defer cancel()

var storeIDs []roachpb.StoreID
defer func() {
ctx := t.AnnotateCtx(context.Background())
t.kvflowControl.mu.Lock()
t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs)
t.kvflowControl.mu.Unlock()
t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...)
if fn := t.knobs.OnServerStreamDisconnected; fn != nil {
fn()
}
}()

if err := t.stopper.RunAsyncTaskEx(
taskCtx,
stop.TaskOpts{
TaskName: "storage.RaftTransport: processing batch",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
errCh <- func() error {
var storeIDs []roachpb.StoreID
defer func() {
ctx := t.AnnotateCtx(context.Background())
t.kvflowControl.mu.Lock()
t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs)
t.kvflowControl.mu.Unlock()
t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...)
if fn := t.knobs.OnServerStreamDisconnected; fn != nil {
fn()
}
}()

stream := &lockedRaftMessageResponseStream{wrapped: stream}
for {
batch, err := stream.Recv()
Expand Down

0 comments on commit b514d3d

Please sign in to comment.