From c1f2f4747b783c6f356ccb266845140ac469c000 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 15 Feb 2022 22:40:11 -0500
Subject: [PATCH] [prototype] kv: demonstrate latch poisoning

---
 .../kvserver/concurrency/concurrency_control.go |  6 ++++++
 .../kvserver/concurrency/concurrency_manager.go |  7 +++++++
 pkg/kv/kvserver/concurrency/latch_manager.go    |  4 ++++
 pkg/kv/kvserver/replica_raft.go                 |  6 ++++++
 pkg/kv/kvserver/spanlatch/manager.go            | 15 ++++++++++++++-
 pkg/kv/kvserver/spanlatch/signal.go             | 12 ++++++++++--
 6 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go
index 883a83e6037c..d730434d52a7 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_control.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -194,6 +194,9 @@ type RequestSequencer interface {
 	// requests that are blocked on this one to proceed. The guard should not
 	// be used after being released.
 	FinishReq(*Guard)
+
+	// Poisons latches, causing any conflicting requests to be rejected.
+	Poison(*Guard)
 }
 
 // ContentionHandler is concerned with handling contention-related errors. This
@@ -469,6 +472,9 @@ type latchManager interface {
 	// Releases latches, relinquish its protection from conflicting requests.
 	Release(latchGuard)
 
+	// Poisons latches, causing any conflicting requests to be rejected.
+	Poison(latchGuard)
+
 	// Metrics returns information about the state of the latchManager.
 	Metrics() LatchMetrics
 }
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go
index 5f63d0a05265..c1a15103bc89 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -408,6 +408,13 @@ func (m *managerImpl) FinishReq(g *Guard) {
 	releaseGuard(g)
 }
 
+// Poison implements the RequestSequencer interface.
+func (m *managerImpl) Poison(g *Guard) {
+	if g.lg != nil {
+		m.lm.Poison(g.lg)
+	}
+}
+
 // HandleWriterIntentError implements the ContentionHandler interface.
 func (m *managerImpl) HandleWriterIntentError(
 	ctx context.Context, g *Guard, seq roachpb.LeaseSequence, t *roachpb.WriteIntentError,
diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go
index b0a4b8eb1073..e495870e785f 100644
--- a/pkg/kv/kvserver/concurrency/latch_manager.go
+++ b/pkg/kv/kvserver/concurrency/latch_manager.go
@@ -62,6 +62,10 @@ func (m *latchManagerImpl) Release(lg latchGuard) {
 	m.m.Release(lg.(*spanlatch.Guard))
 }
 
+func (m *latchManagerImpl) Poison(lg latchGuard) {
+	m.m.Poison(lg.(*spanlatch.Guard))
+}
+
 func (m *latchManagerImpl) Metrics() LatchMetrics {
 	return m.m.Metrics()
 }
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 2bb9d5c2e6f5..3ce7f2046c32 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1204,6 +1204,12 @@ func (r *Replica) refreshProposalsLocked(
 		// power the probe anyway). Over time, we anticipate there being multiple
 		// mechanisms which trip the breaker.
 		r.breaker.TripAsync()
+
+		for _, p := range r.mu.proposals {
+			if p.ec.g != nil {
+				p.ec.repl.concMgr.Poison(p.ec.g)
+			}
+		}
 	}
 
 	if log.V(1) && len(reproposals) > 0 {
diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go
index 92a67b87d8f3..69e737f91073 100644
--- a/pkg/kv/kvserver/spanlatch/manager.go
+++ b/pkg/kv/kvserver/spanlatch/manager.go
@@ -498,6 +498,9 @@ func (m *Manager) iterAndWait(
 	for it.FirstOverlap(wait); it.Valid(); it.NextOverlap(wait) {
 		held := it.Cur()
 		if held.done.signaled() {
+			if held.done.poisoned() {
+				return errors.Errorf("poisoned ... put whatever error we want here")
+			}
 			continue
 		}
 		if ignore(wait.ts, held.ts) {
@@ -518,6 +521,9 @@ func (m *Manager) waitForSignal(
 	for {
 		select {
 		case <-held.done.signalChan():
+			if held.done.poisoned() {
+				return errors.Errorf("poisoned ... put whatever error we want here")
+			}
 			return nil
 		case <-t.C:
 			t.Read = true
@@ -545,7 +551,7 @@ func (m *Manager) waitForSignal(
 // dependent latch acquisition attempts can complete if not blocked on any other
 // owned latches.
 func (m *Manager) Release(lg *Guard) {
-	lg.done.signal()
+	lg.done.signal(false /* poison */)
 	if lg.snap != nil {
 		lg.snap.close()
 	}
@@ -555,6 +561,13 @@ func (m *Manager) Release(lg *Guard) {
 	m.mu.Unlock()
 }
 
+// Poison poisons the latches held by the provided Guard. After being called,
+// dependent latch acquisition attempts will be rejected. This will continue
+// until the Guard is eventually released.
+func (m *Manager) Poison(lg *Guard) {
+	lg.done.signal(true /* poison */)
+}
+
 // removeLocked removes the latches owned by the provided Guard from the
 // Manager. Must be called with mu held.
 func (m *Manager) removeLocked(lg *Guard) {
diff --git a/pkg/kv/kvserver/spanlatch/signal.go b/pkg/kv/kvserver/spanlatch/signal.go
index 71deb5621918..8ed0dde155b6 100644
--- a/pkg/kv/kvserver/spanlatch/signal.go
+++ b/pkg/kv/kvserver/spanlatch/signal.go
@@ -37,12 +37,16 @@ const (
 //
 type signal struct {
 	a int32
+	p int32
 	c unsafe.Pointer // chan struct{}, lazily initialized
 }
 
-func (s *signal) signal() {
+func (s *signal) signal(poison bool) {
+	if poison {
+		atomic.StoreInt32(&s.p, 1)
+	}
 	if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) {
-		panic("signaled twice")
+		return // already signaled
 	}
 	// Close the channel if it was ever initialized.
 	if cPtr := atomic.LoadPointer(&s.c); cPtr != nil {
@@ -57,6 +61,10 @@ func (s *signal) signaled() bool {
 	return atomic.LoadInt32(&s.a) > noSig
 }
 
+func (s *signal) poisoned() bool {
+	return atomic.LoadInt32(&s.p) != 0
+}
+
 func (s *signal) signalChan() <-chan struct{} {
 	// If the signal has already been signaled, return a closed channel.
 	if s.signaled() {