Skip to content

Commit

Permalink
Merge #105642
Browse files Browse the repository at this point in the history
105642: kvserver: de-flake TestMergeQueueSeesLearnerOrJointConfig r=irfansharif a=irfansharif

Fixes #105381. It was possible for us to receive client-side store IDs concurrently with the server-side stopper being quiesced, at which point we read the variable storing the client-side store IDs without a mutex (which triggered the data race detector). The data race was benign, but instead of adding a synchronization primitive, we recognize that there was no reason for the store ID handling to happen on separate threads.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
craig[bot] and irfansharif committed Jun 29, 2023
2 parents d0b9088 + b514d3d commit c1d73cd
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,25 +448,25 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
taskCtx = t.AnnotateCtx(taskCtx)
defer cancel()

var storeIDs []roachpb.StoreID
defer func() {
ctx := t.AnnotateCtx(context.Background())
t.kvflowControl.mu.Lock()
t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs)
t.kvflowControl.mu.Unlock()
t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...)
if fn := t.knobs.OnServerStreamDisconnected; fn != nil {
fn()
}
}()

if err := t.stopper.RunAsyncTaskEx(
taskCtx,
stop.TaskOpts{
TaskName: "storage.RaftTransport: processing batch",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
errCh <- func() error {
var storeIDs []roachpb.StoreID
defer func() {
ctx := t.AnnotateCtx(context.Background())
t.kvflowControl.mu.Lock()
t.kvflowControl.mu.connectionTracker.markStoresDisconnected(storeIDs)
t.kvflowControl.mu.Unlock()
t.kvflowControl.disconnectListener.OnRaftTransportDisconnected(ctx, storeIDs...)
if fn := t.knobs.OnServerStreamDisconnected; fn != nil {
fn()
}
}()

stream := &lockedRaftMessageResponseStream{wrapped: stream}
for {
batch, err := stream.Recv()
Expand Down

0 comments on commit c1d73cd

Please sign in to comment.