concurrency,kvserver: limited scans optimistically check for locks
This optimistic checking happens after the evaluation. The evaluation
will already discover any conflicting intents, so the subsequent
checking of the lock table is primarily to find conflicting
unreplicated locks.

This structure should be easy to extend to optimistic latching.

Fixes cockroachdb#49973
Informs cockroachdb#9521

Release note: None
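
To make the intended flow concrete, the following is a rough sketch of how a caller could drive the new API: sequence with Request.Optimistic set, evaluate, check the spans that were actually read, and fall back to a pessimistic retry on conflict. It is illustrative only and not part of this commit; runOptimistically, evaluate, and spansRead are hypothetical names, and evaluate stands in for request evaluation that also reports the lock spans it actually read.

// Sketch only; assumed to live alongside the concurrency package so the
// Manager, Request, Response, Guard, and Error types are in scope.
func runOptimistically(
	ctx context.Context, m Manager, req Request,
) (Response, *Error) {
	req.Optimistic = true
	g, resp, pErr := m.SequenceReq(ctx, nil /* prev */, req)
	if g == nil {
		// The request was answered or rejected during sequencing.
		return resp, pErr
	}
	result, spansRead, pErr := evaluate(ctx, req) // hypothetical evaluation helper
	if pErr == nil && !g.CheckOptimisticNoConflicts(spansRead) {
		// Evaluation missed a conflicting (typically unreplicated) lock: retry
		// pessimistically, reusing the latches the guard still holds.
		req.Optimistic = false
		g.Req.Optimistic = false // the guard's copy of the request must also be pessimistic
		if g, resp, pErr = m.SequenceOptimisticRetryingAsPessimistic(ctx, g, req); g == nil {
			return resp, pErr
		}
		result, _, pErr = evaluate(ctx, req)
	}
	m.FinishReq(g)
	return result, pErr
}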
sumeerbhola committed Jan 8, 2021
1 parent d8b5cb0 commit 104b96d
Showing 9 changed files with 800 additions and 277 deletions.
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -178,6 +178,18 @@ type RequestSequencer interface {
// does so, it will not return a request guard.
SequenceReq(context.Context, *Guard, Request) (*Guard, Response, *Error)

// SequenceOptimisticRetryingAsPessimistic is akin to SequenceReq, but used
// for the first pessimistic retry of a previously optimistic request
// evaluation. Request.Optimistic must be false here. The first pessimistic
// retry has not called any methods in ContentionHandler, so the state of
// latches and locks is whatever it was when the optimistic request
// previously went through SequenceReq. In the current implementation,
// latches are held, but only a snapshot of the lock table has been taken
// (a fresh snapshot will need to be taken on retry). In the future, latches
// will not be held either.
SequenceOptimisticRetryingAsPessimistic(
context.Context, *Guard, Request) (*Guard, Response, *Error)

// FinishReq marks the request as complete, releasing any protection
// the request had against conflicting requests and allowing conflicting
// requests that are blocked on this one to proceed. The guard should not
@@ -351,6 +363,11 @@ type Request struct {
// (Txn == nil), all reads and writes are considered to take place at
// Timestamp.
LockSpans *spanset.SpanSet

// Optimistic is set to true when the request wants optimistic evaluation,
// i.e., locks will be checked after evaluation using
// Guard.CheckOptimisticNoConflicts.
Optimistic bool
}

// Guard is returned from Manager.SequenceReq. The guard is passed back in to
@@ -361,6 +378,10 @@ type Guard struct {
ltg lockTableGuard
}

// CheckOptimisticNoConflicts checks the lock spans actually read by the
// request against the lock table snapshot taken during optimistic
// sequencing. It returns true if there were no conflicting locks.
func (g *Guard) CheckOptimisticNoConflicts(lockSpansRead *spanset.SpanSet) (ok bool) {
return g.ltg.CheckOptimisticNoConflicts(lockSpansRead)
}

// Response is a slice of responses to requests in a batch. This type is used
// when the concurrency manager is able to respond to a request directly during
// sequencing.
@@ -460,6 +481,13 @@ type lockTable interface {
// function.
ScanAndEnqueue(Request, lockTableGuard) lockTableGuard

// ScanOptimistic takes a snapshot of the lock table for later checking for
// conflicts, and returns a guard. It is for optimistic evaluation of
// requests that will typically scan a small subset of the spans mentioned
// in the Request. After Request evaluation, CheckOptimisticNoConflicts
// must be called on the guard.
ScanOptimistic(Request) lockTableGuard

// Dequeue removes the request from its lock wait-queues. It should be
// called when the request is finished, whether it evaluated or not. The
// guard should not be used after being dequeued.
@@ -599,6 +627,12 @@ type lockTableGuard interface {
// This must be called after the waiting state has transitioned to
// doneWaiting.
ResolveBeforeScanning() []roachpb.LockUpdate

// CheckOptimisticNoConflicts uses the SpanSet representing the spans that
// were actually read to check for conflicting locks after an optimistic
// evaluation. It returns true if there were no conflicts. See
// lockTable.ScanOptimistic for context.
CheckOptimisticNoConflicts(*spanset.SpanSet) (ok bool)
}

// lockTableWaiter is concerned with waiting in lock wait-queues for locks held
64 changes: 52 additions & 12 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -114,14 +114,18 @@ func (m *managerImpl) SequenceReq(
var g *Guard
if prev == nil {
g = newGuard(req)
log.Event(ctx, "sequencing request")
if req.Optimistic {
log.Event(ctx, "optimistically sequencing request")
} else {
log.Event(ctx, "sequencing request")
}
} else {
g = prev
g.AssertNoLatches()
log.Event(ctx, "re-sequencing request")
}

resp, err := m.sequenceReqWithGuard(ctx, g, req)
resp, err := m.sequenceReqWithGuard(ctx, g, req, false /* holdsLatches */)
if resp != nil || err != nil {
// Ensure that we release the guard if we return a response or an error.
m.FinishReq(g)
@@ -130,8 +134,30 @@
return g, nil, nil
}

func (m *managerImpl) sequenceReqWithGuard(
// SequenceOptimisticRetryingAsPessimistic implements the RequestSequencer
// interface.
func (m *managerImpl) SequenceOptimisticRetryingAsPessimistic(
ctx context.Context, g *Guard, req Request,
) (*Guard, Response, *Error) {
if g == nil {
panic("retry should have non-nil guard")
}
if g.Req.Optimistic {
panic("pessimistic retry has optimistic bit set to true")
}
g.AssertLatches()
log.Event(ctx, "re-sequencing request after optimistic sequencing failed")
resp, err := m.sequenceReqWithGuard(ctx, g, req, true /* holdsLatches */)
if resp != nil || err != nil {
// Ensure that we release the guard if we return a response or an error.
m.FinishReq(g)
return nil, resp, err
}
return g, nil, nil
}

func (m *managerImpl) sequenceReqWithGuard(
ctx context.Context, g *Guard, req Request, holdsLatches bool,
) (Response, *Error) {
// Some requests don't need to acquire latches at all.
if !shouldAcquireLatches(req) {
@@ -148,22 +174,36 @@ func (m *managerImpl) sequenceReqWithGuard(
}

for {
// Acquire latches for the request. This synchronizes the request
// with all conflicting in-flight requests.
log.Event(ctx, "acquiring latches")
g.lg, err = m.lm.Acquire(ctx, req)
if err != nil {
return nil, err
if !holdsLatches {
// TODO(sumeer): optimistic requests could register their need for
// latches, but not actually wait until acquisition.
// https://github.com/cockroachdb/cockroach/issues/9521

// Acquire latches for the request. This synchronizes the request
// with all conflicting in-flight requests.
log.Event(ctx, "acquiring latches")
g.lg, err = m.lm.Acquire(ctx, req)
if err != nil {
return nil, err
}
}

// Some requests don't need to wait on locks.
if req.LockSpans.Empty() {
return nil, nil
}

// Scan for conflicting locks.
log.Event(ctx, "scanning lock table for conflicting locks")
g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)
if req.Optimistic {
if g.ltg != nil {
panic("Optimistic locking should not have a non-nil lockTableGuard")
}
log.Event(ctx, "scanning lock table for conflicting locks")
g.ltg = m.lt.ScanOptimistic(g.Req)
} else {
// Scan for conflicting locks.
log.Event(ctx, "scanning lock table for conflicting locks")
g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)
}

// Wait on conflicting locks, if necessary.
if g.ltg.ShouldWait() {
100 changes: 89 additions & 11 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -420,6 +420,30 @@ func (g *lockTableGuardImpl) CurState() waitingState {
return g.mu.state
}

// CheckOptimisticNoConflicts implements the lockTableGuard interface.
func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) {
// Temporarily replace the SpanSet in the guard.
originalSpanSet := g.spans
g.spans = spanSet
defer func() {
g.spans = originalSpanSet
}()
span := stepToNextSpan(g)
for span != nil {
startKey := span.Key
tree := g.tableSnapshot[g.ss]
iter := tree.MakeIter()
ltRange := &lockState{key: startKey, endKey: span.EndKey}
for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) {
l := iter.Cur()
if !l.isNonConflictingLock(g, g.sa) {
return false
}
}
span = stepToNextSpan(g)
}
return true
}

func (g *lockTableGuardImpl) notify() {
select {
case g.mu.signal <- struct{}{}:
@@ -1357,6 +1381,45 @@ func (l *lockState) tryActiveWait(
return true, false
}

func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, sa spanset.SpanAccess) bool {
l.mu.Lock()
defer l.mu.Unlock()

// It is possible that this lock is empty and has not yet been deleted.
if l.isEmptyLock() {
return true
}
// Lock is not empty.
lockHolderTxn, lockHolderTS := l.getLockHolder()
if lockHolderTxn == nil {
// Reservation holders are non-conflicting.
//
// When optimistic evaluation holds latches, there cannot be a conflicting
// reservation holder that is also holding latches (a reservation holder
// without latches can happen due to AddDiscoveredLock). So when this
// optimistic evaluation succeeds and releases latches, the reservation
// holder will need to acquire latches and scan the lock table again. When
// optimistic evaluation does not hold latches, it will check for
// conflicting latches before declaring success, and a reservation holder
// that holds latches will be discovered, causing the optimistic evaluation
// to retry as pessimistic.
return true
}
if g.isSameTxn(lockHolderTxn) {
// Already locked by this txn.
return true
}
// NB: We do not look at the finalizedTxnCache in this optimistic evaluation
// path. A conflict with a finalized txn will be noticed when retrying
// pessimistically.

if sa == spanset.SpanReadOnly && g.readTS.Less(lockHolderTS) {
return true
}
// Conflicts.
return false
}
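
// In summary, the checks above treat a lock as non-conflicting for an
// optimistic evaluation when the lock is empty, when it only has a
// reservation holder, when it is held by the request's own transaction, or
// when a read-only access is below the lock holder's timestamp; anything
// else conflicts and forces the request to retry pessimistically.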

// Acquires this lock. Returns the list of guards that are done actively
// waiting at this key -- these will be requests from the same transaction
// that is acquiring the lock.
@@ -1957,6 +2020,12 @@ func (t *treeMu) nextLockSeqNum() uint64 {
return t.lockIDSeqNum
}

// ScanOptimistic implements the lockTable interface.
func (t *lockTableImpl) ScanOptimistic(req Request) lockTableGuard {
g := t.newGuardForReq(req)
t.doSnapshotForGuard(g)
return g
}

// ScanAndEnqueue implements the lockTable interface.
func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTableGuard {
// NOTE: there is no need to synchronize with enabledMu here. ScanAndEnqueue
@@ -1967,15 +2036,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTableGuard {

var g *lockTableGuardImpl
if guard == nil {
g = newLockTableGuardImpl()
g.seqNum = atomic.AddUint64(&t.seqNum, 1)
g.lt = t
g.txn = req.txnMeta()
g.spans = req.LockSpans
g.readTS = req.readConflictTimestamp()
g.writeTS = req.writeConflictTimestamp()
g.sa = spanset.NumSpanAccess - 1
g.index = -1
g = t.newGuardForReq(req)
} else {
g = guard.(*lockTableGuardImpl)
g.key = nil
@@ -1988,6 +2049,25 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTableGuard {
g.mu.Unlock()
g.toResolve = g.toResolve[:0]
}
t.doSnapshotForGuard(g)
g.findNextLockAfter(true /* notify */)
return g
}

// newGuardForReq initializes a lockTableGuardImpl for the given request.
func (t *lockTableImpl) newGuardForReq(req Request) *lockTableGuardImpl {
g := newLockTableGuardImpl()
g.seqNum = atomic.AddUint64(&t.seqNum, 1)
g.lt = t
g.txn = req.txnMeta()
g.spans = req.LockSpans
g.readTS = req.readConflictTimestamp()
g.writeTS = req.writeConflictTimestamp()
g.sa = spanset.NumSpanAccess - 1
g.index = -1
return g
}

// doSnapshotForGuard takes a snapshot of the lock table trees that overlap
// the guard's declared spans, so the guard can later be checked against a
// consistent view of the lock table.
func (t *lockTableImpl) doSnapshotForGuard(g *lockTableGuardImpl) {
for ss := spanset.SpanScope(0); ss < spanset.NumSpanScope; ss++ {
for sa := spanset.SpanAccess(0); sa < spanset.NumSpanAccess; sa++ {
if len(g.spans.GetSpans(sa, ss)) > 0 {
@@ -2003,8 +2083,6 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTableGuard {
}
}
}
g.findNextLockAfter(true /* notify */)
return g
}

// Dequeue implements the lockTable interface.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
@@ -63,6 +63,8 @@ type mockLockTableGuard struct {
toResolve []roachpb.LockUpdate
}

var _ lockTableGuard = &mockLockTableGuard{}

// mockLockTableGuard implements the lockTableGuard interface.
func (g *mockLockTableGuard) ShouldWait() bool { return true }
func (g *mockLockTableGuard) NewStateChan() chan struct{} { return g.signal }
@@ -76,6 +78,9 @@ func (g *mockLockTableGuard) CurState() waitingState {
func (g *mockLockTableGuard) ResolveBeforeScanning() []roachpb.LockUpdate {
return g.toResolve
}
func (g *mockLockTableGuard) CheckOptimisticNoConflicts(*spanset.SpanSet) (ok bool) {
return true
}
func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} }

// mockLockTable overrides TransactionIsFinalized, which is the only LockTable