[WIP] concurrency: move finalizedTxnCache into lock table
(There are bugs here, since existing tests fail. And I have
not added new tests yet -- I would like an opinion on the
high-level approach before fixing those.)

This is a cleanup in preparation for future changes, and also
has some immediate, though probably minor, benefits.

In the future, the lock table will support multiple intents for
the same key if all but one are known to be finalized. So the
finalizedTxnCache belongs in the lock table data-structure.
Additionally, we will support intent resolution without holding
latches, which has implications for data-structure
consistency: request evaluation will not be allowed to add
discovered intents to the lock table since the discovery may be
stale. This PR is not changing this discovery behavior since we
need it for now (due to interleaved intents), but it moves us
along the path towards the lock table data-structure not
relying on external behavior for maintaining its in-memory
"cache" of locks. Specifically, removing intents from the lock
table when the intent is still present in the engine is not
principled. We currently do this in two places:
- for optimizing limited scans: a later PR will fix this properly
  by checking the lock table after request evaluation, as
  outlined in cockroachdb#49973.
- using the finalizedTxnCache in the lockTableWaiterImpl: this
  use is changed in this PR. The code in the lock table also does
  removal of intents before resolution, but there is a TODO to
  fix that in the future. It should be easier to do this with the
  behavior contained in the lock table.

The immediate benefits, which may not have any practical
significance, are:
- We no longer resolve unreplicated locks -- they are simply
  removed.
- A replicated lock is removed from the lock table data-structure
  only when the requester has finished a scan and is in a
  position to do resolution. Previously, a request could remove the
  lock but then block on another lock, without resolving the first
  lock. This caused wasteful evaluation of other requests.
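The replicated/unreplicated distinction above can be sketched as follows. This is a simplified illustration with hypothetical types, not the actual CockroachDB code: an unreplicated lock of a finalized holder is simply dropped from the in-memory table, while a replicated lock (an intent) must still be resolved, so a LockUpdate is queued for batched resolution.

```go
package main

// Hypothetical, simplified stand-ins for the real roachpb/lock-table
// types; this sketches the decision this commit moves into the lock
// table, not the actual implementation.

type durability int

const (
	unreplicated durability = iota
	replicated
)

type lockUpdate struct {
	txnID string
	key   string
}

// onFinalizedHolder returns whether the lock can be dropped from the
// in-memory table immediately, and, for replicated locks, the
// LockUpdate to accumulate for batched intent resolution.
func onFinalizedHolder(dur durability, txnID, key string) (dropNow bool, resolve *lockUpdate) {
	if dur == unreplicated {
		// No intent on disk, so there is nothing to resolve.
		return true, nil
	}
	// The intent is still in the engine; queue it for resolution.
	return false, &lockUpdate{txnID: txnID, key: key}
}
```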

Release note: None
sumeerbhola committed Dec 15, 2020
1 parent 1b3186d commit ff2f337
Showing 6 changed files with 591 additions and 95 deletions.
16 changes: 11 additions & 5 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -567,6 +567,12 @@ type lockTable interface {
// txn.WriteTimestamp.
UpdateLocks(*roachpb.LockUpdate) error

// Informs the lock table that a transaction is finalized. This is used
// by the lock table in a best-effort manner to avoid waiting on locks
// of finalized transactions and to tell the caller, via
// lockTableGuard.ResolveBeforeScanning, to resolve a batch of intents.
TransactionIsFinalized(*roachpb.Transaction)

// String returns a debug string representing the state of the lockTable.
String() string
}
@@ -588,6 +594,11 @@ type lockTableGuard interface {

// CurState returns the latest waiting state.
CurState() waitingState

// ResolveBeforeScanning lists the locks to resolve before scanning again.
// This must be called after the waiting state has transitioned to
// doneWaiting.
ResolveBeforeScanning() []roachpb.LockUpdate
}
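The new `ResolveBeforeScanning` contract can be illustrated with a minimal sketch. The types below are hypothetical stand-ins for `lockTableGuard` and `roachpb.LockUpdate` (the real caller also releases latches before resolving); it only shows the calling convention: once the guard reports `doneWaiting`, the caller must resolve the returned intents before scanning again.

```go
package main

// Hypothetical minimal stand-ins; not the real concurrency manager.

type stateKind int

const (
	waiting stateKind = iota
	doneWaiting
)

type lockUpdate struct{ key string }

type guard struct {
	state     stateKind
	toResolve []lockUpdate
}

func (g *guard) CurState() stateKind                 { return g.state }
func (g *guard) ResolveBeforeScanning() []lockUpdate { return g.toResolve }

// processRequest sketches the contract: on doneWaiting, resolve the
// accumulated intents before rescanning. Returns how many were resolved.
func processRequest(g *guard, resolve func(lockUpdate)) int {
	if g.CurState() != doneWaiting {
		return 0 // still waiting; the real caller blocks on NewStateChan()
	}
	updates := g.ResolveBeforeScanning()
	for _, u := range updates {
		resolve(u)
	}
	return len(updates)
}
```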

// lockTableWaiter is concerned with waiting in lock wait-queues for locks held
@@ -646,11 +657,6 @@ type lockTableWaiter interface {
// and, in turn, remove this method. This will likely fall out of pulling
// all replicated locks into the lockTable.
WaitOnLock(context.Context, Request, *roachpb.Intent) *Error

// ClearCaches wipes all caches maintained by the lockTableWaiter. This is
// primarily used to recover memory when a replica loses a lease. However,
// it is also used in tests to reset the state of the lockTableWaiter.
ClearCaches()
}

// txnWaitQueue holds a collection of wait-queues for transaction records.
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -72,6 +72,9 @@ func (c *Config) initDefaults() {
func NewManager(cfg Config) Manager {
cfg.initDefaults()
m := new(managerImpl)
lt := &lockTableImpl{
maxLocks: cfg.MaxLockTableSize,
}
*m = managerImpl{
// TODO(nvanbenschoten): move pkg/storage/spanlatch to a new
// pkg/storage/concurrency/latch package. Make it implement the
@@ -82,14 +85,13 @@ func NewManager(cfg Config) Manager {
cfg.SlowLatchGauge,
),
},
lt: &lockTableImpl{
maxLocks: cfg.MaxLockTableSize,
},
lt: lt,
ltw: &lockTableWaiterImpl{
st: cfg.Settings,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
lm: m,
lt: lt,
disableTxnPushing: cfg.DisableTxnPushing,
},
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
@@ -344,9 +346,6 @@ func (m *managerImpl) OnRangeLeaseUpdated(seq roachpb.LeaseSequence, isLeasehold
const disable = true
m.lt.Clear(disable)
m.twq.Clear(disable)
// Also clear caches, since they won't be needed any time soon and
// consume memory.
m.ltw.ClearCaches()
}
}

154 changes: 146 additions & 8 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -185,6 +185,17 @@ type lockTableImpl struct {
locks [spanset.NumSpanScope]treeMu

maxLocks int64

// finalizedTxnCache is a small LRU cache that tracks transactions that
// were pushed and found to be finalized (COMMITTED or ABORTED). It is
// used as an optimization to avoid repeatedly pushing the transaction
// record when cleaning up the intents of an abandoned transaction.
//
// NOTE: it probably makes sense to maintain a single finalizedTxnCache
// across all Ranges on a Store instead of an individual cache per
// Range. For now, we don't do this because we don't share any state
// between separate concurrency.Manager instances.
finalizedTxnCache txnCache
}
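A small fixed-size cache like the one the comment describes could look like the sketch below. The real `txnCache` lives elsewhere in this package; the capacity, types, and MRU eviction policy here are illustrative assumptions, not the actual implementation.

```go
package main

// Hypothetical sketch of a small finalizedTxnCache: a fixed-size,
// most-recently-used-first array of finalized transactions.

type txn struct {
	id     string
	status string // "COMMITTED" or "ABORTED"
}

type finalizedTxnCache struct {
	txns [8]*txn // MRU first; nil entries are empty slots
}

// get returns the cached finalized txn, if any, moving it to the front.
func (c *finalizedTxnCache) get(id string) (*txn, bool) {
	for i, t := range c.txns {
		if t != nil && t.id == id {
			c.moveToFront(i)
			return c.txns[0], true
		}
	}
	return nil, false
}

// add inserts or refreshes a finalized txn, evicting the LRU entry.
func (c *finalizedTxnCache) add(t *txn) {
	for i, e := range c.txns {
		if e != nil && e.id == t.id {
			c.txns[i] = t
			c.moveToFront(i)
			return
		}
	}
	// Not present: shift everything down, dropping the last (LRU) entry.
	copy(c.txns[1:], c.txns[:len(c.txns)-1])
	c.txns[0] = t
}

// clear empties the cache, e.g. when the replica loses its lease.
func (c *finalizedTxnCache) clear() {
	c.txns = [8]*txn{}
}

func (c *finalizedTxnCache) moveToFront(i int) {
	t := c.txns[i]
	copy(c.txns[1:i+1], c.txns[:i])
	c.txns[0] = t
}
```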

var _ lockTable = &lockTableImpl{}
@@ -256,6 +267,7 @@ var _ lockTable = &lockTableImpl{}
// lockTableGuard that returns false from StartWaiting()).
type lockTableGuardImpl struct {
seqNum uint64
lt *lockTableImpl

// Information about this request.
txn *enginepb.TxnMeta
@@ -332,6 +344,10 @@ type lockTableGuardImpl struct {
// (proportional to number of waiters).
mustFindNextLockAfter bool
}
// Locks to resolve before scanning again. This field does not need to be
// protected by mu, since it is only read after the caller has already
// synced with mu and observed that it is doneWaiting.
toResolve []roachpb.LockUpdate
}

var _ lockTableGuard = &lockTableGuardImpl{}
@@ -379,6 +395,10 @@ func (g *lockTableGuardImpl) ShouldWait() bool {
return g.mu.startWait
}

func (g *lockTableGuardImpl) ResolveBeforeScanning() []roachpb.LockUpdate {
return g.toResolve
}

func (g *lockTableGuardImpl) NewStateChan() chan struct{} {
g.mu.Lock()
defer g.mu.Unlock()
@@ -431,9 +451,11 @@ func (g *lockTableGuardImpl) isSameTxnAsReservation(ws waitingState) bool {

// Finds the next lock, after the current one, to actively wait at. If it
// finds the next lock the request starts actively waiting there, else it is
// told that it is done waiting.
// told that it is done waiting. lockTableImpl.finalizedTxnCache is used to
// accumulate intents to resolve.
// Acquires g.mu.
func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
g.toResolve = g.toResolve[:0]
spans := g.spans.GetSpans(g.sa, g.ss)
var span *spanset.Span
resumingInSameSpan := false
@@ -475,10 +497,24 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
resumingInSameSpan = false
span = stepToNextSpan(g)
}
if len(g.toResolve) > 0 {
for i := range g.toResolve {
// TODO
if err := g.lt.UpdateLocks(&g.toResolve[i]); err != nil {
panic(err.Error())
}
}
}
g.mu.Lock()
defer g.mu.Unlock()
g.mu.state = waitingState{kind: doneWaiting}
if notify {
if len(g.toResolve) > 0 {
// Force caller to release latches and resolve intents. The first
// state it will see after releasing latches is doneWaiting, which
// will cause it to resolve intents.
g.mu.startWait = true
}
g.notify()
}
}
@@ -1028,8 +1064,57 @@ func (l *lockState) clearLockHolder() {
// it is set to false when the call to tryActiveWait is happening due to an
// event for a different request or transaction (like a lock release) since in
// that case the channel is notified first and the call to tryActiveWait()
// happens later in lockTableGuard.CurState(). The return value is true iff
// it is actively waiting.
// happens later in lockTableGuard.CurState().
//
// It uses the finalizedTxnCache to decide that the caller does not need to
// wait on a lock of a transaction that is already finalized.
//
// - For unreplicated locks, this method will silently remove the lock and
// proceed as normal.
// - For replicated locks the behavior is more complicated since we need to
// resolve the intent. We desire:
// A. batching of intent resolution.
// B. minimize races where intent resolution is being performed by multiple
// requests.
// C. minimize races where intent has not yet been resolved but has been
// removed from the lock table, thereby causing some other request to
// evaluate wastefully and discover the intent.
//
// For A, the caller of tryActiveWait will accumulate the LockUpdates. For B,
// we only generate a LockUpdate here if this request is either a reader, or
// the first writer in the queue, i.e., it is only blocked by the lock
// holder. This prevents races between multiple writers in doing resolution
// but not between multiple readers, or between readers and writers. We could
// be more conservative and only do the intent resolution if the waiter were
// equivalent to a distinguished-waiter, but there is no guarantee that the
// distinguished waiter will do intent resolution in a timely manner (since
// it could block waiting on some other lock). Instead, the caller of
// tryActiveWait makes a best-effort to reduce racing (explained below). For
// C, the caller of tryActiveWait removes the lock from the in-memory
// data-structure only if the request does not need to wait anywhere, which
// means it will immediately proceed to intent resolution. Additionally, if
// the lock has already been removed, it suggests that some other request has
// already claimed intent resolution (or done it), so this request does not
// need to do the resolution.
//
// Ideally, we would strengthen B and C -- a request should make a claim on
// intent resolution for a set of keys, and would either resolve the intents
// or, on error, relinquish that claim so others can do so. A
// replicated lock (intent) would not be removed from the in-memory
// data-structure until it was actually gone.
// TODO(sumeer): do this cleaner solution for batched intent resolution.
//
// In the future we'd like to augment the lockTable with an understanding of
// finalized but not yet resolved locks. These locks will allow conflicting
// transactions to proceed with evaluation without the need to first remove
// all traces of them via a round of replication. This is discussed in more
// detail in #41720. Specifically, see mention of "contention footprint" and
// COMMITTED_BUT_NOT_REMOVABLE.
// Also, resolving these locks/intents would proceed without latching, so we
// would not rely on MVCC scanning to add discovered locks to the lock table,
// since the discovered locks may be stale.
//
// The return value is true iff it is actively waiting.
// Acquires l.mu, g.mu.
func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, notify bool) bool {
l.mu.Lock()
@@ -1047,6 +1132,23 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
return false
}

var replicatedLockFinalizedTxn *roachpb.Transaction
if lockHolderTxn != nil {
finalizedTxn, ok := g.lt.finalizedTxnCache.get(lockHolderTxn.ID)
if ok {
if l.holder.holder[lock.Replicated].txn == nil {
// Only held unreplicated. Release immediately. We don't expect the
// caller to GC this lockState and instead will GC it in
// tryUpdateLock. Note that there may be other waiters, so the caller
// may have to wait behind them.
l.clearLockHolder()
l.lockIsFree()
} else {
replicatedLockFinalizedTxn = finalizedTxn
}
}
}

if sa == spanset.SpanReadOnly {
if lockHolderTxn == nil {
// Reads only care about locker, not a reservation.
@@ -1112,8 +1214,8 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
return false
}

// Need to wait.

// May need to wait.
wait := true
g.mu.Lock()
defer g.mu.Unlock()
if sa == spanset.SpanReadWrite {
@@ -1131,14 +1233,25 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
if qg == nil {
panic("lockTable bug")
}
qg.active = true
active := true
if replicatedLockFinalizedTxn != nil && l.queuedWriters.Front().Value.(*queuedGuard) == qg {
// First waiter, so should not wait.
active = false
wait = false
}
qg.active = active
} else {
// Not in queue so insert as active waiter.
qg := &queuedGuard{
guard: g,
active: true,
}
if l.queuedWriters.Len() == 0 {
if replicatedLockFinalizedTxn != nil {
// First waiter, so should not wait.
qg.active = false
wait = false
}
l.queuedWriters.PushFront(qg)
} else {
var e *list.Element
@@ -1149,6 +1262,11 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
}
}
if e == nil {
if replicatedLockFinalizedTxn != nil {
// First waiter, so should not wait.
qg.active = false
wait = false
}
l.queuedWriters.PushFront(qg)
} else {
l.queuedWriters.InsertAfter(qg, e)
@@ -1157,8 +1275,19 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
g.mu.locks[l] = struct{}{}
}
} else {
l.waitingReaders.PushFront(g)
g.mu.locks[l] = struct{}{}
if replicatedLockFinalizedTxn != nil {
// Don't add to waitingReaders since all readers in waitingReaders are
// active waiters, and this request is not an active waiter here.
wait = false
} else {
l.waitingReaders.PushFront(g)
g.mu.locks[l] = struct{}{}
}
}
if !wait {
g.toResolve = append(
g.toResolve, roachpb.MakeLockUpdate(replicatedLockFinalizedTxn, roachpb.Span{Key: l.key}))
return false
}
// Make it an active waiter.
g.key = l.key
Expand Down Expand Up @@ -1779,6 +1908,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
if guard == nil {
g = newLockTableGuardImpl()
g.seqNum = atomic.AddUint64(&t.seqNum, 1)
g.lt = t
g.txn = req.txnMeta()
g.spans = req.LockSpans
g.readTS = req.readConflictTimestamp()
@@ -2102,6 +2232,11 @@ func stepToNextSpan(g *lockTableGuardImpl) *spanset.Span {
return nil
}

// TransactionIsFinalized implements the lockTable interface.
func (t *lockTableImpl) TransactionIsFinalized(txn *roachpb.Transaction) {
t.finalizedTxnCache.add(txn)
}
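The intended caller of `TransactionIsFinalized` is the waiter side: when a push discovers the lock holder is already COMMITTED or ABORTED, it informs the lock table so later scans can skip, and batch-resolve, that transaction's locks. A hedged sketch with hypothetical stand-in types (not the real `lockTableWaiterImpl`):

```go
package main

// Hypothetical stand-ins for roachpb.Transaction and the lock table.

type txnStatus int

const (
	pending txnStatus = iota
	committed
	aborted
)

type txn struct {
	id     string
	status txnStatus
}

type lockTable struct {
	finalized map[string]*txn // stand-in for finalizedTxnCache
}

// TransactionIsFinalized records a finalized transaction so later lock
// table scans can avoid waiting on its locks.
func (t *lockTable) TransactionIsFinalized(tx *txn) {
	if t.finalized == nil {
		t.finalized = map[string]*txn{}
	}
	t.finalized[tx.id] = tx
}

// afterPush sketches the waiter-side hook: only pushees found to be
// finalized are handed to the lock table. Returns whether it recorded.
func afterPush(lt *lockTable, pushee *txn) bool {
	if pushee.status == committed || pushee.status == aborted {
		lt.TransactionIsFinalized(pushee)
		return true
	}
	return false
}
```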

// Enable implements the lockTable interface.
func (t *lockTableImpl) Enable(seq roachpb.LeaseSequence) {
// Avoid disrupting other requests if the lockTable is already enabled.
@@ -2129,6 +2264,9 @@ func (t *lockTableImpl) Clear(disable bool) {
t.enabled = false
}
t.tryClearLocks(true /* force */)
// Also clear the finalized txn cache, since it won't be needed any time
// soon and consumes memory.
t.finalizedTxnCache.clear()
}

// For tests.
