From e20d2a222f850686f231ce36ad01c7ec3282f7e5 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 18 May 2020 14:05:50 -0400 Subject: [PATCH] kv/concurrency: avoid redundant txn pushes and batch intent resolution Fixes #48790. Informs #36876. Closes #31664. This commit adds a per-Range LRU cache of transactions that are known to be aborted or committed. We use this cache in the lockTableWaiter for two purposes: 1. when we see a lock held by a known-finalized txn, we neither wait out the kv.lock_table.coordinator_liveness_push_delay (10 ms) nor push the transactions record (RPC to leaseholder of pushee's txn record range). 2. we use the existence of a transaction in the cache as an indication that it may have abandoned multiple intents, perhaps due to a failure of the transaction coordinator node, so we begin deferring intent resolution to enable batching. Together, these two changes make us much more effective as cleaning up after failed transactions that have abandoned a large number of intents. The following example demonstrates this: ```sql --- BEFORE CREATE TABLE keys (k BIGINT NOT NULL PRIMARY KEY); BEGIN; INSERT INTO keys SELECT generate_series(1, 10000); ROLLBACK; SELECT * FROM keys; k ----- (0 rows) Time: 2m50.801304266s CREATE TABLE keys2 (k BIGINT NOT NULL PRIMARY KEY); BEGIN; INSERT INTO keys2 SELECT generate_series(1, 10000); ROLLBACK; INSERT INTO keys2 SELECT generate_series(1, 10000); INSERT 10000 Time: 3m26.874571045s --- AFTER CREATE TABLE keys (k BIGINT NOT NULL PRIMARY KEY); BEGIN; INSERT INTO keys SELECT generate_series(1, 10000); ROLLBACK; SELECT * FROM keys; k ----- (0 rows) Time: 5.138220753s CREATE TABLE keys2 (k BIGINT NOT NULL PRIMARY KEY); BEGIN; INSERT INTO keys2 SELECT generate_series(1, 10000); ROLLBACK; INSERT INTO keys2 SELECT generate_series(1, 10000); INSERT 10000 Time: 48.763541138s ``` Notice that we are still not as fast at cleaning up intents on the insertion path as we are at doing so on the retrieval path. This is because we only batch the resolution of intents observed by a single request at a time. For the scanning case, a single ScanRequest notices all 10,000 intents and cleans them all up together. For the insertion case, each of the 10,000 PutRequests notice a single intent, and each intent is cleaned up individually. So this case is only benefited by the first part of this change (no liveness delay or txn record push) and not the second part of this change (intent resolution batching). For this reason, we still haven't solved all of #36876. To completely address that, we'll need to defer propagation of WriteIntentError during batch evaluation, like we do for WriteTooOldErrors. Or we can wait out the future LockTable changes - once we remove all cases where an intent is not "discovered", the changes here will effectively address #36876. This was a partial regression in v20.1, so we'll want to backport this to that release branch. This change is on the larger side, but I feel ok about it because the mechanics aren't too tricky. I'll wait a week before backporting just to see if anything falls out. Release note (bug fix): Abandoned intents due to failed transaction coordinators are now cleaned up much faster. This resolves a regression in v20.1.0 compared to prior releases. --- .../concurrency/concurrency_control.go | 5 + .../concurrency/concurrency_manager.go | 8 +- .../concurrency/concurrency_manager_test.go | 49 +++- .../kvserver/concurrency/lock_table_waiter.go | 170 +++++++++++- .../concurrency/lock_table_waiter_test.go | 141 +++++++++- .../acquire_wrong_txn_race | 3 +- .../clear_abandoned_intents | 259 ++++++++++++++++++ .../concurrency_manager/discovered_lock | 12 +- .../concurrency_manager/range_state_listener | 21 +- .../testdata/concurrency_manager/uncertainty | 6 +- 10 files changed, 635 insertions(+), 39 deletions(-) create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index efbb42f907d1..1f12071c8a74 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -632,6 +632,11 @@ type lockTableWaiter interface { // and, in turn, remove this method. This will likely fall out of pulling // all replicated locks into the lockTable. WaitOnLock(context.Context, Request, *roachpb.Intent) *Error + + // ClearCaches wipes all caches maintained by the lockTableWaiter. This is + // primarily used to recover memory when a replica loses a lease. However, + // it is also used in tests to reset the state of the lockTableWaiter. + ClearCaches() } // txnWaitQueue holds a collection of wait-queues for transaction records. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index a3f61c923b58..7745da75d7f1 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -72,7 +72,8 @@ func (c *Config) initDefaults() { // NewManager creates a new concurrency Manager structure. func NewManager(cfg Config) Manager { cfg.initDefaults() - return &managerImpl{ + m := new(managerImpl) + *m = managerImpl{ // TODO(nvanbenschoten): move pkg/storage/spanlatch to a new // pkg/storage/concurrency/latch package. Make it implement the // latchManager interface directly, if possible. @@ -89,6 +90,7 @@ func NewManager(cfg Config) Manager { st: cfg.Settings, stopper: cfg.Stopper, ir: cfg.IntentResolver, + lm: m, disableTxnPushing: cfg.DisableTxnPushing, }, // TODO(nvanbenschoten): move pkg/storage/txnwait to a new @@ -102,6 +104,7 @@ func NewManager(cfg Config) Manager { Knobs: cfg.TxnWaitKnobs, }), } + return m } // SequenceReq implements the RequestSequencer interface. @@ -342,6 +345,9 @@ func (m *managerImpl) OnRangeLeaseUpdated(isLeaseholder bool) { const disable = true m.lt.Clear(disable) m.twq.Clear(disable) + // Also clear caches, since they won't be needed any time soon and + // consume memory. + m.ltw.ClearCaches() } } diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index a8fbe09f26ab..18fae0a421f2 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -228,21 +228,35 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.Fatalf(t, "unknown request: %s", reqName) } - var txnName string - d.ScanArgs(t, "txn", &txnName) - txn, ok := c.txnsByName[txnName] - if !ok { - d.Fatalf(t, "unknown txn %s", txnName) - } + // Each roachpb.Intent is provided on an indented line. + var intents []roachpb.Intent + singleReqLines := strings.Split(d.Input, "\n") + for _, line := range singleReqLines { + var err error + d.Cmd, d.CmdArgs, err = datadriven.ParseLine(line) + if err != nil { + d.Fatalf(t, "error parsing single intent: %v", err) + } + if d.Cmd != "intent" { + d.Fatalf(t, "expected \"intent\", found %s", d.Cmd) + } - var key string - d.ScanArgs(t, "key", &key) + var txnName string + d.ScanArgs(t, "txn", &txnName) + txn, ok := c.txnsByName[txnName] + if !ok { + d.Fatalf(t, "unknown txn %s", txnName) + } + + var key string + d.ScanArgs(t, "key", &key) + + intents = append(intents, roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key))) + } opName := fmt.Sprintf("handle write intent error %s", reqName) mon.runAsync(opName, func(ctx context.Context) { - wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{ - roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)), - }} + wiErr := &roachpb.WriteIntentError{Intents: intents} guard, err := m.HandleWriterIntentError(ctx, prev, wiErr) if err != nil { log.Eventf(ctx, "handled %v, returned error: %v", wiErr, err) @@ -579,6 +593,19 @@ func (c *cluster) ResolveIntent( return nil } +// ResolveIntents implements the concurrency.IntentResolver interface. +func (c *cluster) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, opts intentresolver.ResolveOptions, +) *roachpb.Error { + log.Eventf(ctx, "resolving a batch of %d intent(s)", len(intents)) + for _, intent := range intents { + if err := c.ResolveIntent(ctx, intent, opts); err != nil { + return err + } + } + return nil +} + func (c *cluster) newTxnID() uuid.UUID { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index c9100c21584f..b94f285a5006 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -24,7 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // LockTableLivenessPushDelay sets the delay before pushing in order to detect @@ -43,7 +45,7 @@ var LockTableLivenessPushDelay = settings.RegisterDurationSetting( // the cost on each of a transaction's abandoned locks and instead only pay // it once per abandoned transaction per range or per node. This could come // in a few different forms, including: - // - a per-wide cache of recently detected abandoned transaction IDs + // - a per-store cache of recently detected abandoned transaction IDs // - a per-range reverse index from transaction ID to locked keys // // TODO(nvanbenschoten): increasing this default value. @@ -83,6 +85,18 @@ type lockTableWaiterImpl struct { st *cluster.Settings stopper *stop.Stopper ir IntentResolver + lm LockManager + + // finalizedTxnCache is a small LRU cache that tracks transactions that + // were pushed and found to be finalized (COMMITTED or ABORTED). It is + // used as an optimization to avoid repeatedly pushing the transaction + // record when cleaning up the intents of an abandoned transaction. + // + // NOTE: it probably makes sense to maintain a single finalizedTxnCache + // across all Ranges on a Store instead of an individual cache per + // Range. For now, we don't do this because we don't share any state + // between separate concurrency.Manager instances. + finalizedTxnCache txnCache // When set, WriteIntentError are propagated instead of pushing // conflicting transactions. @@ -97,18 +111,22 @@ type IntentResolver interface { // provided pushee transaction immediately, if possible. Otherwise, it will // block until the pushee transaction is finalized or eventually can be // pushed successfully. + // TODO(nvanbenschoten): return a *roachpb.Transaction here. PushTransaction( context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType, ) (roachpb.Transaction, *Error) - // ResolveIntent resolves the provided intent according to the options. + // ResolveIntent synchronously resolves the provided intent. ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error + + // ResolveIntents synchronously resolves the provided batch of intents. + ResolveIntents(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error } // WaitOn implements the lockTableWaiter interface. func (w *lockTableWaiterImpl) WaitOn( ctx context.Context, req Request, guard lockTableGuard, -) *Error { +) (err *Error) { newStateC := guard.NewStateChan() ctxDoneC := ctx.Done() shouldQuiesceC := w.stopper.ShouldQuiesce() @@ -116,6 +134,16 @@ func (w *lockTableWaiterImpl) WaitOn( var timer *timeutil.Timer var timerC <-chan time.Time var timerWaitingState waitingState + // Used to defer the resolution of duplicate intents. Intended to allow + // batching of intent resolution while cleaning up after abandoned txns. A + // request may begin deferring intent resolution and then be forced to wait + // again on other locks. This is ok, as the request that deferred intent + // resolution will often be the new reservation holder for those intents' + // keys. Even when this is not the case (e.g. the request is read-only so it + // can't hold reservations), any other requests that slip ahead will simply + // re-discover the intent(s) during evaluation and resolve them themselves. + var deferredResolution []roachpb.LockUpdate + defer w.resolveDeferredIntents(ctx, &err, &deferredResolution) for { select { case <-newStateC: @@ -138,10 +166,7 @@ func (w *lockTableWaiterImpl) WaitOn( // The purpose of the waitForDistinguished state is to avoid waiting // out the longer deadlock detection delay before recognizing and // recovering from the failure of a transaction coordinator for - // *each* of that transaction's previously written intents. If we - // had a cache of aborted transaction IDs that allowed us to notice - // and quickly resolve abandoned intents then we might be able to - // get rid of this state. + // *each* of that transaction's previously written intents. livenessPush := state.stateKind == waitForDistinguished deadlockPush := true @@ -167,6 +192,54 @@ func (w *lockTableWaiterImpl) WaitOn( continue } + // If we know that a lock holder is already finalized (COMMITTED + // or ABORTED), there's no reason to push it again. Instead, we + // can skip directly to intent resolution. + // + // As an optimization, we defer the intent resolution until the + // we're done waiting on all conflicting locks in this function. + // This allows us to accumulate a group of intents to resolve + // and send them together as a batch. + // + // Remember that if the lock is held, there will be at least one + // waiter with livenessPush = true (the distinguished waiter), + // so at least one request will enter this branch and perform + // the cleanup on behalf of all other waiters. + if livenessPush { + if pusheeTxn, ok := w.finalizedTxnCache.get(state.txn.ID); ok { + resolve := roachpb.MakeLockUpdate(pusheeTxn, roachpb.Span{Key: state.key}) + deferredResolution = append(deferredResolution, resolve) + + // Inform the LockManager that the lock has been updated with a + // finalized status so that it gets removed from the lockTable + // and we are allowed to proceed. + // + // For unreplicated locks, this is all that is needed - the + // lockTable is the source of truth so, once removed, the + // unreplicated lock is gone. It is perfectly valid for us to + // instruct the lock to be released because we know that the + // lock's owner is finalized. + // + // For replicated locks, this is a bit of a lie. The lock hasn't + // actually been updated yet, but we will be conducting intent + // resolution in the future (before we observe the corresponding + // MVCC state). This is safe because we already handle cases + // where locks exist only in the MVCC keyspace and not in the + // lockTable. + // + // In the future, we'd like to make this more explicit. + // Specifically, we'd like to augment the lockTable with an + // understanding of finalized but not yet resolved locks. These + // locks will allow conflicting transactions to proceed with + // evaluation without the need to first remove all traces of + // them via a round of replication. This is discussed in more + // detail in #41720. Specifically, see mention of "contention + // footprint" and COMMITTED_BUT_NOT_REMOVABLE. + w.lm.OnLockUpdated(ctx, &deferredResolution[len(deferredResolution)-1]) + continue + } + } + // The request should push to detect abandoned locks due to // failed transaction coordinators, detect deadlocks between // transactions, or both, but only after delay. This delay @@ -266,7 +339,6 @@ func (w *lockTableWaiterImpl) WaitOn( // behind a lock. In this case, the request has a dependency on the // conflicting request but not necessarily the entire conflicting // transaction. - var err *Error if timerWaitingState.held { err = w.pushLockTxn(ctx, req, timerWaitingState) } else { @@ -319,6 +391,11 @@ func (w *lockTableWaiterImpl) WaitOnLock( }) } +// ClearCaches implements the lockTableWaiter interface. +func (w *lockTableWaiterImpl) ClearCaches() { + w.finalizedTxnCache.clear() +} + // pushLockTxn pushes the holder of the provided lock. // // The method blocks until the lock holder transaction experiences a state @@ -356,6 +433,13 @@ func (w *lockTableWaiterImpl) pushLockTxn( return err } + // If the transaction is finalized, add it to the finalizedTxnCache. This + // avoids needing to push it again if we find another one of its locks and + // allows for batching of intent resolution. + if pusheeTxn.Status.IsFinalized() { + w.finalizedTxnCache.add(&pusheeTxn) + } + // If the push succeeded then the lock holder transaction must have // experienced a state transition such that it no longer conflicts with // the pusher's request. This state transition could have been any of the @@ -450,6 +534,20 @@ func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header { return h } +// resolveDeferredIntents resolves the batch of intents if the provided error is +// nil. The batch of intents may be resolved more efficiently than if they were +// resolved individually. +func (w *lockTableWaiterImpl) resolveDeferredIntents( + ctx context.Context, err **Error, deferredResolution *[]roachpb.LockUpdate, +) { + if (*err != nil) || (len(*deferredResolution) == 0) { + return + } + // See pushLockTxn for an explanation of these options. + opts := intentresolver.ResolveOptions{Poison: true} + *err = w.ir.ResolveIntents(ctx, *deferredResolution, opts) +} + // watchForNotifications selects on the provided channel and watches for any // updates. If the channel is ever notified, it calls the provided context // cancelation function and exits. @@ -469,6 +567,62 @@ func (w *lockTableWaiterImpl) watchForNotifications( } } +// txnCache is a small LRU cache that holds Transaction objects. +// +// The zero value of this struct is ready for use. +type txnCache struct { + mu syncutil.Mutex + txns [8]*roachpb.Transaction // [MRU, ..., LRU] +} + +func (c *txnCache) get(id uuid.UUID) (*roachpb.Transaction, bool) { + c.mu.Lock() + defer c.mu.Unlock() + if idx := c.getIdxLocked(id); idx >= 0 { + txn := c.txns[idx] + c.moveFrontLocked(txn, idx) + return txn, true + } + return nil, false +} + +func (c *txnCache) add(txn *roachpb.Transaction) { + c.mu.Lock() + defer c.mu.Unlock() + if idx := c.getIdxLocked(txn.ID); idx >= 0 { + c.moveFrontLocked(txn, idx) + } else { + c.insertFrontLocked(txn) + } +} + +func (c *txnCache) clear() { + c.mu.Lock() + defer c.mu.Unlock() + for i := range c.txns { + c.txns[i] = nil + } +} + +func (c *txnCache) getIdxLocked(id uuid.UUID) int { + for i, txn := range c.txns { + if txn != nil && txn.ID == id { + return i + } + } + return -1 +} + +func (c *txnCache) moveFrontLocked(txn *roachpb.Transaction, cur int) { + copy(c.txns[1:cur+1], c.txns[:cur]) + c.txns[0] = txn +} + +func (c *txnCache) insertFrontLocked(txn *roachpb.Transaction) { + copy(c.txns[1:], c.txns[:]) + c.txns[0] = txn +} + func hasMinPriority(txn *enginepb.TxnMeta) bool { return txn != nil && txn.Priority == enginepb.MinTxnPriority } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index b1d593f190f1..5540347bae3a 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -12,6 +12,8 @@ package concurrency import ( "context" + "fmt" + "math/rand" "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" @@ -23,14 +25,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) type mockIntentResolver struct { - pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) - resolveIntent func(context.Context, roachpb.LockUpdate) *Error + pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) + resolveIntent func(context.Context, roachpb.LockUpdate) *Error + resolveIntents func(context.Context, []roachpb.LockUpdate) *Error } +// mockIntentResolver implements the IntentResolver interface. func (m *mockIntentResolver) PushTransaction( ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, ) (roachpb.Transaction, *Error) { @@ -43,12 +48,19 @@ func (m *mockIntentResolver) ResolveIntent( return m.resolveIntent(ctx, intent) } +func (m *mockIntentResolver) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, opts intentresolver.ResolveOptions, +) *Error { + return m.resolveIntents(ctx, intents) +} + type mockLockTableGuard struct { state waitingState signal chan struct{} stateObserved chan struct{} } +// mockLockTableGuard implements the lockTableGuard interface. func (g *mockLockTableGuard) ShouldWait() bool { return true } func (g *mockLockTableGuard) NewStateChan() chan struct{} { return g.signal } func (g *mockLockTableGuard) CurState() waitingState { @@ -60,18 +72,30 @@ func (g *mockLockTableGuard) CurState() waitingState { } func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} } +// mockLockTableGuard implements the LockManager interface. +func (g *mockLockTableGuard) OnLockAcquired(_ context.Context, _ *roachpb.LockUpdate) { + panic("unimplemented") +} +func (g *mockLockTableGuard) OnLockUpdated(_ context.Context, up *roachpb.LockUpdate) { + if g.state.held && g.state.txn.ID == up.Txn.ID && g.state.key.Equal(up.Key) { + g.state = waitingState{stateKind: doneWaiting} + g.notify() + } +} + func setupLockTableWaiterTest() (*lockTableWaiterImpl, *mockIntentResolver, *mockLockTableGuard) { ir := &mockIntentResolver{} st := cluster.MakeTestingClusterSettings() LockTableLivenessPushDelay.Override(&st.SV, 0) LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0) + guard := &mockLockTableGuard{ + signal: make(chan struct{}, 1), + } w := &lockTableWaiterImpl{ st: st, stopper: stop.NewStopper(), ir: ir, - } - guard := &mockLockTableGuard{ - signal: make(chan struct{}, 1), + lm: guard, } return w, ir, guard } @@ -394,3 +418,110 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { } }) } + +// TestLockTableWaiterDeferredIntentResolverError tests that the lockTableWaiter +// propagates errors from its intent resolver when it resolves intent batches. +func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + w, ir, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + txn := makeTxnProto("request") + req := Request{ + Txn: &txn, + Timestamp: txn.ReadTimestamp, + } + keyA := roachpb.Key("keyA") + pusheeTxn := makeTxnProto("pushee") + + // Add the conflicting txn to the finalizedTxnCache so that the request + // avoids the transaction record push and defers the intent resolution. + pusheeTxn.Status = roachpb.ABORTED + w.finalizedTxnCache.add(&pusheeTxn) + + g.state = waitingState{ + stateKind: waitForDistinguished, + txn: &pusheeTxn.TxnMeta, + key: keyA, + held: true, + guardAccess: spanset.SpanReadWrite, + } + g.notify() + + // Errors are propagated when observed while resolving batches of intents. + err1 := roachpb.NewErrorf("error1") + ir.resolveIntents = func(_ context.Context, intents []roachpb.LockUpdate) *Error { + require.Len(t, intents, 1) + require.Equal(t, keyA, intents[0].Key) + require.Equal(t, pusheeTxn.ID, intents[0].Txn.ID) + require.Equal(t, roachpb.ABORTED, intents[0].Status) + return err1 + } + err := w.WaitOn(ctx, req, g) + require.Equal(t, err1, err) +} + +func TestTxnCache(t *testing.T) { + var c txnCache + const overflow = 4 + var txns [len(c.txns) + overflow]roachpb.Transaction + for i := range txns { + txns[i] = makeTxnProto(fmt.Sprintf("txn %d", i)) + } + + // Add each txn to the cache. Observe LRU eviction policy. + for i := range txns { + txn := &txns[i] + c.add(txn) + for j, txnInCache := range c.txns { + if j <= i { + require.Equal(t, &txns[i-j], txnInCache) + } else { + require.Nil(t, txnInCache) + } + } + } + + // Access each txn in the cache in reverse order. + // Should reverse the order of the cache because of LRU policy. + for i := len(txns) - 1; i >= 0; i-- { + txn := &txns[i] + txnInCache, ok := c.get(txn.ID) + if i < overflow { + // Expect overflow. + require.Nil(t, txnInCache) + require.False(t, ok) + } else { + // Should be in cache. + require.Equal(t, txn, txnInCache) + require.True(t, ok) + } + } + + // Cache should be in order again. + for i, txnInCache := range c.txns { + require.Equal(t, &txns[i+overflow], txnInCache) + } +} + +func BenchmarkTxnCache(b *testing.B) { + rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) + var c txnCache + var txns [len(c.txns) + 4]roachpb.Transaction + for i := range txns { + txns[i] = makeTxnProto(fmt.Sprintf("txn %d", i)) + } + txnOps := make([]*roachpb.Transaction, b.N) + for i := range txnOps { + txnOps[i] = &txns[rng.Intn(len(txns))] + } + b.ResetTimer() + for i, txnOp := range txnOps { + if i%2 == 0 { + c.add(txnOp) + } else { + _, _ = c.get(txnOp.ID) + } + } +} diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race index 4ed4312dca3f..d13239f2799e 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race @@ -120,7 +120,8 @@ sequence req=req3 [4] sequence req3: acquiring latches [4] sequence req3: blocked on select in spanlatch.(*Manager).waitForSignal -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] sequence reqRes1: sequencing complete, returned guard [5] handle write intent error req2: handled conflicting intents on "k", released latches diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents new file mode 100644 index 000000000000..661e132727c4 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents @@ -0,0 +1,259 @@ +# ------------------------------------------------------------- +# A scan finds 10 abandoned intents from same txn +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-request name=req1 txn=txn1 ts=10,1 + scan key=a endkey=z +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=a + intent txn=txn2 key=b + intent txn=txn2 key=c + intent txn=txn2 key=d + intent txn=txn2 key=e + intent txn=txn2 key=f + intent txn=txn2 key=g + intent txn=txn2 key=h + intent txn=txn2 key=i + intent txn=txn2 key=j +---- +[2] handle write intent error req1: handled conflicting intents on "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", released latches + +debug-lock-table +---- +global: num=10 + lock: "a" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "c" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "d" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "e" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "f" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "g" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "h" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "i" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "j" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing timestamp of txn 00000002 above 0.000000010,1 +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +debug-lock-table +---- +global: num=10 + lock: "a" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + waiting readers: + req: 1, txn: 00000001-0000-0000-0000-000000000000 + distinguished req: 1 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "c" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "d" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "e" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "f" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "g" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "h" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "i" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "j" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] +local: num=0 + +# txn1 is the distinguished waiter on key "a". It will push txn2, notice that it +# is aborted, and then resolve key "a". Once txn2 is in the finalizedTxnCache, +# txn1 will create a batch to resolve all other keys together. +on-txn-updated txn=txn2 status=aborted +---- +[-] update txn: aborting txn2 +[3] sequence req1: resolving intent "a" for txn 00000002 with ABORTED status +[3] sequence req1: resolving a batch of 9 intent(s) +[3] sequence req1: resolving intent "b" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "c" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "d" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "e" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "f" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "g" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "h" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "i" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "j" for txn 00000002 with ABORTED status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +debug-lock-table +---- +global: num=0 +local: num=0 + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- + +# ------------------------------------------------------------- +# A series of 3 puts find 1 abandoned intent each from same txn +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-request name=req1 txn=txn1 ts=10,1 + put key=a value=v1 + put key=b value=v2 + put key=c value=v3 +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=a +---- +[2] handle write intent error req1: handled conflicting intents on "a", released latches + +debug-lock-table +---- +global: num=1 + lock: "a" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + queued writers: + active: false req: 2, txn: 00000001-0000-0000-0000-000000000000 +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing txn 00000002 to abort +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn2 status=committed +---- +[-] update txn: committing txn2 +[3] sequence req1: resolving intent "a" for txn 00000002 with COMMITTED status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=b +---- +[4] handle write intent error req1: handled conflicting intents on "b", released latches + +debug-lock-table +---- +global: num=2 + lock: "a" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + queued writers: + active: false req: 2, txn: 00000001-0000-0000-0000-000000000000 +local: num=0 + +sequence req=req1 +---- +[5] sequence req1: re-sequencing request +[5] sequence req1: acquiring latches +[5] sequence req1: scanning lock table for conflicting locks +[5] sequence req1: waiting in lock wait-queues +[5] sequence req1: resolving a batch of 1 intent(s) +[5] sequence req1: resolving intent "b" for txn 00000002 with COMMITTED status +[5] sequence req1: acquiring latches +[5] sequence req1: scanning lock table for conflicting locks +[5] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=c +---- +[6] handle write intent error req1: handled conflicting intents on "c", released latches + +debug-lock-table +---- +global: num=3 + lock: "a" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "b" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "c" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + queued writers: + active: false req: 2, txn: 00000001-0000-0000-0000-000000000000 +local: num=0 + +sequence req=req1 +---- +[7] sequence req1: re-sequencing request +[7] sequence req1: acquiring latches +[7] sequence req1: scanning lock table for conflicting locks +[7] sequence req1: waiting in lock wait-queues +[7] sequence req1: resolving a batch of 1 intent(s) +[7] sequence req1: resolving intent "c" for txn 00000002 with COMMITTED status +[7] sequence req1: acquiring latches +[7] sequence req1: scanning lock table for conflicting locks +[7] sequence req1: sequencing complete, returned guard + +debug-lock-table +---- +global: num=3 + lock: "a" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "b" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "c" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 +local: num=0 + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock index e0fb5f43f13c..394a21603b75 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock @@ -20,7 +20,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: handled conflicting intents on "k", released latches @@ -82,7 +83,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: pushing timestamp of txn 00000001 above 0.000000012,1 [2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -139,7 +141,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: pushing txn 00000001 to abort [2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -197,7 +200,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: pushing timestamp of txn 00000001 above 0.000000012,1 [2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener index 79c3ba43b5ad..5ca205f792cb 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener @@ -119,7 +119,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: pushing txn 00000001 to abort [3] handle write intent error req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -142,7 +143,8 @@ sequence req=req2 [4] sequence req2: scanning lock table for conflicting locks [4] sequence req2: sequencing complete, returned guard -handle-write-intent-error req=req2 txn=txn1 key=k2 +handle-write-intent-error req=req2 + intent txn=txn1 key=k2 ---- [5] handle write intent error req2: pushing timestamp of txn 00000001 above 0.000000010,1 [5] handle write intent error req2: resolving intent "k2" for txn 00000001 with COMMITTED status @@ -190,7 +192,8 @@ sequence req=req3 [7] sequence req3: scanning lock table for conflicting locks [7] sequence req3: sequencing complete, returned guard -handle-write-intent-error req=req3 txn=txn2 key=k +handle-write-intent-error req=req3 + intent txn=txn2 key=k ---- [8] handle write intent error req3: handled conflicting intents on "k", released latches @@ -357,7 +360,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: handled conflicting intents on "k", released latches @@ -526,7 +530,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: pushing txn 00000001 to abort [3] handle write intent error req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -549,7 +554,8 @@ sequence req=req2 [4] sequence req2: scanning lock table for conflicting locks [4] sequence req2: sequencing complete, returned guard -handle-write-intent-error req=req2 txn=txn1 key=k2 +handle-write-intent-error req=req2 + intent txn=txn1 key=k2 ---- [5] handle write intent error req2: pushing timestamp of txn 00000001 above 0.000000010,1 [5] handle write intent error req2: resolving intent "k2" for txn 00000001 with COMMITTED status @@ -672,7 +678,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: handled conflicting intents on "k", released latches diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty index 035cf1e73aee..7ba304a44578 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty @@ -22,7 +22,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: handled conflicting intents on "k", released latches @@ -82,7 +83,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: handled conflicting intents on "k", released latches