Skip to content

Commit

Permalink
[prototype] kv: demonstrate latch poisoning
Browse files Browse the repository at this point in the history
  • Loading branch information
nvanbenschoten committed Feb 16, 2022
1 parent 1a8570f commit c1f2f47
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 3 deletions.
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ type RequestSequencer interface {
// requests that are blocked on this one to proceed. The guard should not
// be used after being released.
FinishReq(*Guard)

// Poisons latches, causing any conflicting requests to be rejected.
Poison(*Guard)
}

// ContentionHandler is concerned with handling contention-related errors. This
Expand Down Expand Up @@ -469,6 +472,9 @@ type latchManager interface {
// Releases latches, relinquish its protection from conflicting requests.
Release(latchGuard)

// Poisons latches, causing any conflicting requests to be rejected.
Poison(latchGuard)

// Metrics returns information about the state of the latchManager.
Metrics() LatchMetrics
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,13 @@ func (m *managerImpl) FinishReq(g *Guard) {
releaseGuard(g)
}

// Poison implements the RequestSequencer interface.
func (m *managerImpl) Poison(g *Guard) {
if g.lg != nil {
m.lm.Poison(g.lg)
}
}

// HandleWriterIntentError implements the ContentionHandler interface.
func (m *managerImpl) HandleWriterIntentError(
ctx context.Context, g *Guard, seq roachpb.LeaseSequence, t *roachpb.WriteIntentError,
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/concurrency/latch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (m *latchManagerImpl) Release(lg latchGuard) {
m.m.Release(lg.(*spanlatch.Guard))
}

func (m *latchManagerImpl) Poison(lg latchGuard) {
m.m.Poison(lg.(*spanlatch.Guard))
}

func (m *latchManagerImpl) Metrics() LatchMetrics {
return m.m.Metrics()
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,12 @@ func (r *Replica) refreshProposalsLocked(
// power the probe anyway). Over time, we anticipate there being multiple
// mechanisms which trip the breaker.
r.breaker.TripAsync()

for _, p := range r.mu.proposals {
if p.ec.g != nil {
p.ec.repl.concMgr.Poison(p.ec.g)
}
}
}

if log.V(1) && len(reproposals) > 0 {
Expand Down
15 changes: 14 additions & 1 deletion pkg/kv/kvserver/spanlatch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ func (m *Manager) iterAndWait(
for it.FirstOverlap(wait); it.Valid(); it.NextOverlap(wait) {
held := it.Cur()
if held.done.signaled() {
if held.done.poisoned() {
return errors.Errorf("poisoned ... put whatever error we want here")
}
continue
}
if ignore(wait.ts, held.ts) {
Expand All @@ -518,6 +521,9 @@ func (m *Manager) waitForSignal(
for {
select {
case <-held.done.signalChan():
if held.done.poisoned() {
return errors.Errorf("poisoned ... put whatever error we want here")
}
return nil
case <-t.C:
t.Read = true
Expand Down Expand Up @@ -545,7 +551,7 @@ func (m *Manager) waitForSignal(
// dependent latch acquisition attempts can complete if not blocked on any other
// owned latches.
func (m *Manager) Release(lg *Guard) {
lg.done.signal()
lg.done.signal(false /* poison */)
if lg.snap != nil {
lg.snap.close()
}
Expand All @@ -555,6 +561,13 @@ func (m *Manager) Release(lg *Guard) {
m.mu.Unlock()
}

// Poison poisons the latches held by the provided Guard. After being called,
// dependent latch acquisition attempts will be rejected. This will continue
// until the Guard is eventually released.
func (m *Manager) Poison(lg *Guard) {
lg.done.signal(true /* poison */)
}

// removeLocked removes the latches owned by the provided Guard from the
// Manager. Must be called with mu held.
func (m *Manager) removeLocked(lg *Guard) {
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/spanlatch/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ const (
//
type signal struct {
a int32
p int32
c unsafe.Pointer // chan struct{}, lazily initialized
}

func (s *signal) signal() {
func (s *signal) signal(poison bool) {
if poison {
atomic.StoreInt32(&s.p, 1)
}
if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) {
panic("signaled twice")
return // already signaled
}
// Close the channel if it was ever initialized.
if cPtr := atomic.LoadPointer(&s.c); cPtr != nil {
Expand All @@ -57,6 +61,10 @@ func (s *signal) signaled() bool {
return atomic.LoadInt32(&s.a) > noSig
}

func (s *signal) poisoned() bool {
return atomic.LoadInt32(&s.p) != 0
}

func (s *signal) signalChan() <-chan struct{} {
// If the signal has already been signaled, return a closed channel.
if s.signaled() {
Expand Down

0 comments on commit c1f2f47

Please sign in to comment.