Skip to content

Commit

Permalink
wal: work around false data race in failoverWriter
Browse files Browse the repository at this point in the history
failoverWriter has competing consumers who do compare-and-swap (CAS) to
pop from recordQueue. Since the CAS releases the popped slots in buffer
to the producer, the consumers need to read the contents of these slots
before doing the CAS. After the CAS, they only use the read entries
that they actually popped. But the fact that they read some entries,
even though they are not going to use them, trips up the data race
detector (since the read can be happening while the producer is
writing).

We work around this by introducing a consumer mutex to make consumption
single threaded. This is acceptable since what we really care about
is many concurrent producers, and consumers not interfering with
producers.

Informs #3230
Informs CRDB-35401
  • Loading branch information
sumeerbhola committed Mar 6, 2024
1 parent 08e0a86 commit ce293cc
Showing 1 changed file with 74 additions and 41 deletions.
115 changes: 74 additions & 41 deletions wal/failover_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ type recordQueue struct {
// queue is [tail, head). tail is the oldest entry and head is the index for
// the next entry.
//
// Consumers: atomically read and write tail in pop (using
// compare-and-swap). This is not the usual kind of queue consumer since
// they already know the index that they are popping exists, hence don't
// need to look at head.
// Consumers: atomically read and write tail in pop. This is not the usual
// kind of queue consumer since they already know the index that they are
// popping exists, hence don't need to look at head.
//
// Producer: atomically reads tail in push. Writes to head.
//
Expand All @@ -63,12 +62,65 @@ type recordQueue struct {
// in push and the load of tail in pop accomplishes this release-acquire
// pair.
//
// We initially implemented competition between multiple consumers solely
// via atomic read-write of the tail using compare-and-swap (CAS). Since the
// atomic read-write to tail in pop releases those buffer entries for reuse
// to the producer, the consumer needs to grab the contents of
// recordQueueEntry that it needs to do callbacks etc. (specifically the
// contents corresponding to poppedEntry), *before* it succeeds with the
// atomic read-write. This introduced a false data race in the Golang data
// race detector
// https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
// Consider the case where the queue is [10,20), and consumer C1 is trying
// to pop [10,12) and consumer C2 is trying to pop [10,14). The following
// interleaving can happen:
//
// [C1] reads head=10, tail=20
// [C2] reads head=10, tail=20
// [C1] reads buffer contents [10,12) and makes local copy
// [C1] CAS to make the queue [12,20)
// [C2] reads buffer contents [10,14) and makes local copy, concurrently
// with producer writing to 10, 11. *
// [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
// in CAS. C2 only uses the contents it read into the local copy for
// [12,14).
//
// * is a false data race since C2 is later deciding what contents it should
// use from among the contents it read, based on what indices it
// successfully popped. Unfortunately, we don't have a way to annotate the
// code to tell the data race detector to ignore this false positive. So we
// need to strengthen the synchronization to prevent such false positives.
// We observe that usually a consumer will be popping a batch of entries
// (based on a single successful fsync), and the number of consumers will be
// small (usually 1). In comparison, producers can be highly concurrent (due
// to workload concurrency). We don't want consumers to compete for a mutex
// with producers, but we can afford to have multiple consumers compete for
// a mutex. So we fix this false data race by using consumerMu to force
// single-threaded popping.
//
// An alternative would be to pass the information contained in poppedEntry
// to the LogWriter, so that it can pass it back when popping (so we don't
// have to retrieve it from the recordQueue.buffer). We would still need
// recordQueue.buffer, since writer switching needs those entries to be
// replayed. We don't consider this solution for the same reason we replaced
// record.pendingSyncsWithSyncQueue with
// record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
// -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
// say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
// switched to LogWriter2, to which we replayed the same records and filled
// up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
// the commit pipeline can send another 4096 entries, while LogWriter2 is
// still blocked on trying to write and sync the previous 4096 entries. This
// will overflow the queue in LogWriter2.
//
// All updates to headTail hold mu at least for reading. So when mu is held
// for writing, there is a guarantee that headTail is not being updated.
//
// head is most-significant 32 bits and tail is least-significant 32 bits.
headTail atomic.Uint64

consumerMu sync.Mutex

// Access to buffer requires at least RLock.
buffer []recordQueueEntry

Expand Down Expand Up @@ -153,7 +205,7 @@ func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
if n == 0 {
return 0, 0
}
return n, q.pop(h-1, err, false)
return n, q.pop(h-1, err)
}

// Pops all entries up to and including index. The remaining queue is
Expand All @@ -162,19 +214,21 @@ func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
// NB: we could slightly simplify to only have the latest writer be able to
// pop. This would avoid the CAS below, but it seems better to reduce the
// amount of queued work regardless of who has successfully written it.
func (q *recordQueue) pop(index uint32, err error, runCb bool) (numSyncsPopped int) {
func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
var buf [512]poppedEntry
// Tail can increase, and numEntriesToPop decrease, due to competition with
// other consumers. Head can increase due to the concurrent producer.
headTailEntriesToPop := func() (ht uint64, h uint32, t uint32, numEntriesToPop int) {
ht = q.headTail.Load()
h, t = unpackHeadTail(ht)
tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
ht := q.headTail.Load()
_, t = unpackHeadTail(ht)
tail := int(t)
numEntriesToPop = int(index) - tail + 1
return ht, h, t, numEntriesToPop
return t, numEntriesToPop
}
ht, head, tail, numEntriesToPop := headTailEntriesToPop()
q.consumerMu.Lock()
// numEntriesToPop is a function of index and tail. The value of tail cannot
// change since consumerMu is held.
tail, numEntriesToPop := tailEntriesToPop()
if numEntriesToPop <= 0 {
q.consumerMu.Unlock()
return 0
}
var b []poppedEntry
Expand All @@ -187,36 +241,19 @@ func (q *recordQueue) pop(index uint32, err error, runCb bool) (numSyncsPopped i
q.mu.RLock()
n := len(q.buffer)
for i := 0; i < numEntriesToPop; i++ {
// Grab all the possible entries before doing CAS, since successful CAS
// will also release those buffer slots to the producer.
// Grab the popped entries before incrementing tail, since that will
// release those buffer slots to the producer.
idx := (i + int(tail)) % n
b[i] = poppedEntry{
opts: q.buffer[idx].opts,
unref: q.buffer[idx].unref,
}
}
// CAS, with retry loop, since this pop can race with other consumers.
for {
newHT := makeHeadTail(head, index+1)
if q.headTail.CompareAndSwap(ht, newHT) {
break
}
ht, head, _, numEntriesToPop = headTailEntriesToPop()
if numEntriesToPop <= 0 {
break
}
}
// Since tail cannot change, we don't need to do a compare-and-swap.
q.headTail.Add(uint64(numEntriesToPop))
q.mu.RUnlock()

// The current value of numEntriesToPop is the number of entries that were
// popped.
if numEntriesToPop <= 0 {
return 0
}
bufLen := len(b)
// [0, bufLen-numEntriesToPop) were not popped, since this pop raced with
// other consumers that popped those entries.
for i := bufLen - numEntriesToPop; i < bufLen; i++ {
q.consumerMu.Unlock()
for i := 0; i < numEntriesToPop; i++ {
// Now that we've synced the entry, we can unref it to signal that we
// will not read the written byte slice again.
if b[i].unref != nil {
Expand Down Expand Up @@ -267,10 +304,6 @@ func unpackHeadTail(ht uint64) (head, tail uint32) {
return head, tail
}

func makeHeadTail(head, tail uint32) uint64 {
return (uint64(head) << headTailBits) | uint64(tail)
}

// Maximum number of physical log files when writing a virtual WAL. Arbitrarily
// chosen value. Setting this to 2 will not simplify the code. We make this a
// constant since we want a fixed size array for writer.writers.
Expand Down Expand Up @@ -646,7 +679,7 @@ func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err
return
}
// NB: harmless after Close returns since numSyncsPopped will be 0.
numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err, true)
numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
if ww.opts.queueSemChan != nil {
for i := 0; i < numSyncsPopped; i++ {
<-ww.opts.queueSemChan
Expand Down

0 comments on commit ce293cc

Please sign in to comment.