From 3ba6dcb6b23953e859d8d87d3014ad968d82e137 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 1 Mar 2022 16:05:38 -0500 Subject: [PATCH] kvserver: fix race in durability callback queueing in raftLogTruncator The existing code admitted the following interleaving between thread-1, running the async raft log truncation, and thread-2, which is running a new durabilityAdvancedCallback. thread-1: executes queued := t.mu.queuedDurabilityCB and sees queued is false; thread-2: sees t.mu.runningTruncation is true and sets t.mu.queuedDurabilityCB = true; thread-1: sets t.mu.runningTruncation = false and returns. Now the queued callback will never run. This can happen in tests that wait for truncation before doing the next truncation step, because they will stop waiting once the truncation is observed on a Replica, which happens before any of the steps listed above for thread-1. Fixes #77046 Release justification: Bug fix Release note: None --- pkg/kv/kvserver/raft_log_truncator.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/raft_log_truncator.go b/pkg/kv/kvserver/raft_log_truncator.go index ad0aef0c485d..47c34d7e612d 100644 --- a/pkg/kv/kvserver/raft_log_truncator.go +++ b/pkg/kv/kvserver/raft_log_truncator.go @@ -380,19 +380,10 @@ func (r rangesByRangeID) Swap(i, j int) { // deadlock (see storage.Engine.RegisterFlushCompletedCallback).
func (t *raftLogTruncator) durabilityAdvancedCallback() { runTruncation := false - doneRunning := func() {} t.mu.Lock() if !t.mu.runningTruncation && len(t.mu.addRanges) > 0 { runTruncation = true t.mu.runningTruncation = true - doneRunning = func() { - t.mu.Lock() - defer t.mu.Unlock() - if !t.mu.runningTruncation { - panic("expected runningTruncation") - } - t.mu.runningTruncation = false - } } if !runTruncation && len(t.mu.addRanges) > 0 { t.mu.queuedDurabilityCB = true @@ -403,20 +394,31 @@ func (t *raftLogTruncator) durabilityAdvancedCallback() { } if err := t.stopper.RunAsyncTask(t.ambientCtx, "raft-log-truncation", func(ctx context.Context) { - defer doneRunning() for { t.durabilityAdvanced(ctx) + shouldReturn := false t.mu.Lock() queued := t.mu.queuedDurabilityCB t.mu.queuedDurabilityCB = false - t.mu.Unlock() if !queued { + t.mu.runningTruncation = false + shouldReturn = true + } + t.mu.Unlock() + if shouldReturn { return } } }); err != nil { // Task did not run because stopper is stopped. - doneRunning() + func() { + t.mu.Lock() + defer t.mu.Unlock() + if !t.mu.runningTruncation { + panic("expected runningTruncation") + } + t.mu.runningTruncation = false + }() } }