diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 7881eaf7e1c8..0f3e71f2c719 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -632,6 +632,11 @@ type lockTableWaiter interface { // and, in turn, remove this method. This will likely fall out of pulling // all replicated locks into the lockTable. WaitOnLock(context.Context, Request, *roachpb.Intent) *Error + + // ClearCaches wipes all caches maintained by the lockTableWaiter. This is + // primarily used to recover memory when a replica loses a lease. However, + // it is also used in tests to reset the state of the lockTableWaiter. + ClearCaches() } // txnWaitQueue holds a collection of wait-queues for transaction records. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index 0193037d7fd7..90d4aff1f143 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -72,7 +72,8 @@ func (c *Config) initDefaults() { // NewManager creates a new concurrency Manager structure. func NewManager(cfg Config) Manager { cfg.initDefaults() - return &managerImpl{ + m := new(managerImpl) + *m = managerImpl{ // TODO(nvanbenschoten): move pkg/storage/spanlatch to a new // pkg/storage/concurrency/latch package. Make it implement the // latchManager interface directly, if possible. @@ -89,6 +90,7 @@ func NewManager(cfg Config) Manager { st: cfg.Settings, stopper: cfg.Stopper, ir: cfg.IntentResolver, + lm: m, disableTxnPushing: cfg.DisableTxnPushing, }, // TODO(nvanbenschoten): move pkg/storage/txnwait to a new @@ -102,6 +104,7 @@ func NewManager(cfg Config) Manager { Knobs: cfg.TxnWaitKnobs, }), } + return m } // SequenceReq implements the RequestSequencer interface. @@ -342,6 +345,9 @@ func (m *managerImpl) OnRangeLeaseUpdated(isLeaseholder bool) { const disable = true m.lt.Clear(disable) m.twq.Clear(disable) + // Also clear caches, since they won't be needed any time soon and + // consume memory. + m.ltw.ClearCaches() } } diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 2c806f979128..63b0325228ff 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -228,21 +228,35 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.Fatalf(t, "unknown request: %s", reqName) } - var txnName string - d.ScanArgs(t, "txn", &txnName) - txn, ok := c.txnsByName[txnName] - if !ok { - d.Fatalf(t, "unknown txn %s", txnName) - } + // Each roachpb.Intent is provided on an indented line. 
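+			// For example (syntax as used in testdata/concurrency_manager/clear_abandoned_intents):
+			//
+			//  handle-write-intent-error req=req1
+			//    intent txn=txn2 key=a
+			//    intent txn=txn2 key=b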
+ var intents []roachpb.Intent + singleReqLines := strings.Split(d.Input, "\n") + for _, line := range singleReqLines { + var err error + d.Cmd, d.CmdArgs, err = datadriven.ParseLine(line) + if err != nil { + d.Fatalf(t, "error parsing single intent: %v", err) + } + if d.Cmd != "intent" { + d.Fatalf(t, "expected \"intent\", found %s", d.Cmd) + } - var key string - d.ScanArgs(t, "key", &key) + var txnName string + d.ScanArgs(t, "txn", &txnName) + txn, ok := c.txnsByName[txnName] + if !ok { + d.Fatalf(t, "unknown txn %s", txnName) + } + + var key string + d.ScanArgs(t, "key", &key) + + intents = append(intents, roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key))) + } opName := fmt.Sprintf("handle write intent error %s", reqName) mon.runAsync(opName, func(ctx context.Context) { - wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{ - roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)), - }} + wiErr := &roachpb.WriteIntentError{Intents: intents} guard, err := m.HandleWriterIntentError(ctx, prev, wiErr) if err != nil { log.Eventf(ctx, "handled %v, returned error: %v", wiErr, err) @@ -578,6 +592,19 @@ func (c *cluster) ResolveIntent( return nil } +// ResolveIntents implements the concurrency.IntentResolver interface. +func (c *cluster) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, opts intentresolver.ResolveOptions, +) *roachpb.Error { + log.Eventf(ctx, "resolving a batch of %d intent(s)", len(intents)) + for _, intent := range intents { + if err := c.ResolveIntent(ctx, intent, opts); err != nil { + return err + } + } + return nil +} + func (c *cluster) newTxnID() uuid.UUID { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index a3a9fe33f3bf..7b30bd3152a5 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -23,7 +23,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -43,7 +45,7 @@ var LockTableLivenessPushDelay = settings.RegisterDurationSetting( // the cost on each of a transaction's abandoned locks and instead only pay // it once per abandoned transaction per range or per node. This could come // in a few different forms, including: - // - a per-wide cache of recently detected abandoned transaction IDs + // - a per-store cache of recently detected abandoned transaction IDs // - a per-range reverse index from transaction ID to locked keys // // TODO(nvanbenschoten): increasing this default value. @@ -83,6 +85,18 @@ type lockTableWaiterImpl struct { st *cluster.Settings stopper *stop.Stopper ir IntentResolver + lm LockManager + + // finalizedTxnCache is a small LRU cache that tracks transactions that + // were pushed and found to be finalized (COMMITTED or ABORTED). It is + // used as an optimization to avoid repeatedly pushing the transaction + // record when cleaning up the intents of an abandoned transaction. + // + // NOTE: it probably makes sense to maintain a single finalizedTxnCache + // across all Ranges on a Store instead of an individual cache per + // Range. For now, we don't do this because we don't share any state + // between separate concurrency.Manager instances. 
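+	//
+	// The cache is internally synchronized (see txnCache), so it is safe for
+	// concurrent use by multiple waiting requests, and it is cleared when the
+	// replica loses its lease (see ClearCaches).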
+ finalizedTxnCache txnCache // When set, WriteIntentError are propagated instead of pushing // conflicting transactions. @@ -97,18 +111,22 @@ type IntentResolver interface { // provided pushee transaction immediately, if possible. Otherwise, it will // block until the pushee transaction is finalized or eventually can be // pushed successfully. + // TODO(nvanbenschoten): return a *roachpb.Transaction here. PushTransaction( context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType, ) (roachpb.Transaction, *Error) - // ResolveIntent resolves the provided intent according to the options. + // ResolveIntent synchronously resolves the provided intent. ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error + + // ResolveIntents synchronously resolves the provided batch of intents. + ResolveIntents(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error } // WaitOn implements the lockTableWaiter interface. func (w *lockTableWaiterImpl) WaitOn( ctx context.Context, req Request, guard lockTableGuard, -) *Error { +) (err *Error) { newStateC := guard.NewStateChan() ctxDoneC := ctx.Done() shouldQuiesceC := w.stopper.ShouldQuiesce() @@ -116,6 +134,16 @@ func (w *lockTableWaiterImpl) WaitOn( var timer *timeutil.Timer var timerC <-chan time.Time var timerWaitingState waitingState + // Used to defer the resolution of duplicate intents. Intended to allow + // batching of intent resolution while cleaning up after abandoned txns. A + // request may begin deferring intent resolution and then be forced to wait + // again on other locks. This is ok, as the request that deferred intent + // resolution will often be the new reservation holder for those intents' + // keys. Even when this is not the case (e.g. the request is read-only so it + // can't hold reservations), any other requests that slip ahead will simply + // re-discover the intent(s) during evaluation and resolve them themselves. + var deferredResolution []roachpb.LockUpdate + defer w.resolveDeferredIntents(ctx, &err, &deferredResolution) for { select { case <-newStateC: @@ -138,10 +166,7 @@ func (w *lockTableWaiterImpl) WaitOn( // The purpose of the waitForDistinguished state is to avoid waiting // out the longer deadlock detection delay before recognizing and // recovering from the failure of a transaction coordinator for - // *each* of that transaction's previously written intents. If we - // had a cache of aborted transaction IDs that allowed us to notice - // and quickly resolve abandoned intents then we might be able to - // get rid of this state. + // *each* of that transaction's previously written intents. livenessPush := state.kind == waitForDistinguished deadlockPush := true @@ -167,6 +192,54 @@ func (w *lockTableWaiterImpl) WaitOn( continue } + // If we know that a lock holder is already finalized (COMMITTED + // or ABORTED), there's no reason to push it again. Instead, we + // can skip directly to intent resolution. + // + // As an optimization, we defer the intent resolution until the + // we're done waiting on all conflicting locks in this function. + // This allows us to accumulate a group of intents to resolve + // and send them together as a batch. + // + // Remember that if the lock is held, there will be at least one + // waiter with livenessPush = true (the distinguished waiter), + // so at least one request will enter this branch and perform + // the cleanup on behalf of all other waiters. 
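+			//
+			// The clear_abandoned_intents testdata file exercises this path: a
+			// single scan that discovers 10 intents from an ABORTED transaction
+			// pushes the transaction record once (for key "a") and then resolves
+			// the remaining 9 intents as a single batch.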
+ if livenessPush { + if pusheeTxn, ok := w.finalizedTxnCache.get(state.txn.ID); ok { + resolve := roachpb.MakeLockUpdate(pusheeTxn, roachpb.Span{Key: state.key}) + deferredResolution = append(deferredResolution, resolve) + + // Inform the LockManager that the lock has been updated with a + // finalized status so that it gets removed from the lockTable + // and we are allowed to proceed. + // + // For unreplicated locks, this is all that is needed - the + // lockTable is the source of truth so, once removed, the + // unreplicated lock is gone. It is perfectly valid for us to + // instruct the lock to be released because we know that the + // lock's owner is finalized. + // + // For replicated locks, this is a bit of a lie. The lock hasn't + // actually been updated yet, but we will be conducting intent + // resolution in the future (before we observe the corresponding + // MVCC state). This is safe because we already handle cases + // where locks exist only in the MVCC keyspace and not in the + // lockTable. + // + // In the future, we'd like to make this more explicit. + // Specifically, we'd like to augment the lockTable with an + // understanding of finalized but not yet resolved locks. These + // locks will allow conflicting transactions to proceed with + // evaluation without the need to first remove all traces of + // them via a round of replication. This is discussed in more + // detail in #41720. Specifically, see mention of "contention + // footprint" and COMMITTED_BUT_NOT_REMOVABLE. + w.lm.OnLockUpdated(ctx, &deferredResolution[len(deferredResolution)-1]) + continue + } + } + // The request should push to detect abandoned locks due to // failed transaction coordinators, detect deadlocks between // transactions, or both, but only after delay. This delay @@ -266,7 +339,6 @@ func (w *lockTableWaiterImpl) WaitOn( // behind a lock. In this case, the request has a dependency on the // conflicting request but not necessarily the entire conflicting // transaction. - var err *Error if timerWaitingState.held { err = w.pushLockTxn(ctx, req, timerWaitingState) } else { @@ -316,6 +388,11 @@ func (w *lockTableWaiterImpl) WaitOnLock( }) } +// ClearCaches implements the lockTableWaiter interface. +func (w *lockTableWaiterImpl) ClearCaches() { + w.finalizedTxnCache.clear() +} + // pushLockTxn pushes the holder of the provided lock. // // The method blocks until the lock holder transaction experiences a state @@ -353,6 +430,13 @@ func (w *lockTableWaiterImpl) pushLockTxn( return err } + // If the transaction is finalized, add it to the finalizedTxnCache. This + // avoids needing to push it again if we find another one of its locks and + // allows for batching of intent resolution. + if pusheeTxn.Status.IsFinalized() { + w.finalizedTxnCache.add(&pusheeTxn) + } + // If the push succeeded then the lock holder transaction must have // experienced a state transition such that it no longer conflicts with // the pusher's request. This state transition could have been any of the @@ -485,6 +569,20 @@ func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header { return h } +// resolveDeferredIntents resolves the batch of intents if the provided error is +// nil. The batch of intents may be resolved more efficiently than if they were +// resolved individually. 
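+//
+// Both arguments are passed by pointer because the function is installed with
+// defer in WaitOn before either the named return error or the slice of
+// deferred roachpb.LockUpdates has been populated.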
+func (w *lockTableWaiterImpl) resolveDeferredIntents( + ctx context.Context, err **Error, deferredResolution *[]roachpb.LockUpdate, +) { + if (*err != nil) || (len(*deferredResolution) == 0) { + return + } + // See pushLockTxn for an explanation of these options. + opts := intentresolver.ResolveOptions{Poison: true} + *err = w.ir.ResolveIntents(ctx, *deferredResolution, opts) +} + // watchForNotifications selects on the provided channel and watches for any // updates. If the channel is ever notified, it calls the provided context // cancelation function and exits. @@ -504,6 +602,62 @@ func (w *lockTableWaiterImpl) watchForNotifications( } } +// txnCache is a small LRU cache that holds Transaction objects. +// +// The zero value of this struct is ready for use. +type txnCache struct { + mu syncutil.Mutex + txns [8]*roachpb.Transaction // [MRU, ..., LRU] +} + +func (c *txnCache) get(id uuid.UUID) (*roachpb.Transaction, bool) { + c.mu.Lock() + defer c.mu.Unlock() + if idx := c.getIdxLocked(id); idx >= 0 { + txn := c.txns[idx] + c.moveFrontLocked(txn, idx) + return txn, true + } + return nil, false +} + +func (c *txnCache) add(txn *roachpb.Transaction) { + c.mu.Lock() + defer c.mu.Unlock() + if idx := c.getIdxLocked(txn.ID); idx >= 0 { + c.moveFrontLocked(txn, idx) + } else { + c.insertFrontLocked(txn) + } +} + +func (c *txnCache) clear() { + c.mu.Lock() + defer c.mu.Unlock() + for i := range c.txns { + c.txns[i] = nil + } +} + +func (c *txnCache) getIdxLocked(id uuid.UUID) int { + for i, txn := range c.txns { + if txn != nil && txn.ID == id { + return i + } + } + return -1 +} + +func (c *txnCache) moveFrontLocked(txn *roachpb.Transaction, cur int) { + copy(c.txns[1:cur+1], c.txns[:cur]) + c.txns[0] = txn +} + +func (c *txnCache) insertFrontLocked(txn *roachpb.Transaction) { + copy(c.txns[1:], c.txns[:]) + c.txns[0] = txn +} + func hasMinPriority(txn *enginepb.TxnMeta) bool { return txn != nil && txn.Priority == enginepb.MinTxnPriority } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index a30195f10d09..66b7e47388a5 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -12,6 +12,8 @@ package concurrency import ( "context" + "fmt" + "math/rand" "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" @@ -23,14 +25,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) type mockIntentResolver struct { - pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) - resolveIntent func(context.Context, roachpb.LockUpdate) *Error + pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) + resolveIntent func(context.Context, roachpb.LockUpdate) *Error + resolveIntents func(context.Context, []roachpb.LockUpdate) *Error } +// mockIntentResolver implements the IntentResolver interface. 
func (m *mockIntentResolver) PushTransaction( ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, ) (roachpb.Transaction, *Error) { @@ -43,12 +48,19 @@ func (m *mockIntentResolver) ResolveIntent( return m.resolveIntent(ctx, intent) } +func (m *mockIntentResolver) ResolveIntents( + ctx context.Context, intents []roachpb.LockUpdate, opts intentresolver.ResolveOptions, +) *Error { + return m.resolveIntents(ctx, intents) +} + type mockLockTableGuard struct { state waitingState signal chan struct{} stateObserved chan struct{} } +// mockLockTableGuard implements the lockTableGuard interface. func (g *mockLockTableGuard) ShouldWait() bool { return true } func (g *mockLockTableGuard) NewStateChan() chan struct{} { return g.signal } func (g *mockLockTableGuard) CurState() waitingState { @@ -60,18 +72,30 @@ func (g *mockLockTableGuard) CurState() waitingState { } func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} } +// mockLockTableGuard implements the LockManager interface. +func (g *mockLockTableGuard) OnLockAcquired(_ context.Context, _ *roachpb.LockAcquisition) { + panic("unimplemented") +} +func (g *mockLockTableGuard) OnLockUpdated(_ context.Context, up *roachpb.LockUpdate) { + if g.state.held && g.state.txn.ID == up.Txn.ID && g.state.key.Equal(up.Key) { + g.state = waitingState{kind: doneWaiting} + g.notify() + } +} + func setupLockTableWaiterTest() (*lockTableWaiterImpl, *mockIntentResolver, *mockLockTableGuard) { ir := &mockIntentResolver{} st := cluster.MakeTestingClusterSettings() LockTableLivenessPushDelay.Override(&st.SV, 0) LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0) + guard := &mockLockTableGuard{ + signal: make(chan struct{}, 1), + } w := &lockTableWaiterImpl{ st: st, stopper: stop.NewStopper(), ir: ir, - } - guard := &mockLockTableGuard{ - signal: make(chan struct{}, 1), + lm: guard, } return w, ir, guard } @@ -390,3 +414,110 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { } }) } + +// TestLockTableWaiterDeferredIntentResolverError tests that the lockTableWaiter +// propagates errors from its intent resolver when it resolves intent batches. +func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + w, ir, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + txn := makeTxnProto("request") + req := Request{ + Txn: &txn, + Timestamp: txn.ReadTimestamp, + } + keyA := roachpb.Key("keyA") + pusheeTxn := makeTxnProto("pushee") + + // Add the conflicting txn to the finalizedTxnCache so that the request + // avoids the transaction record push and defers the intent resolution. + pusheeTxn.Status = roachpb.ABORTED + w.finalizedTxnCache.add(&pusheeTxn) + + g.state = waitingState{ + kind: waitForDistinguished, + txn: &pusheeTxn.TxnMeta, + key: keyA, + held: true, + guardAccess: spanset.SpanReadWrite, + } + g.notify() + + // Errors are propagated when observed while resolving batches of intents. 
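+	// Because the pushee is already in the finalizedTxnCache, pushLockTxn is
+	// skipped entirely and the error must surface through WaitOn's deferred
+	// call to resolveDeferredIntents.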
+ err1 := roachpb.NewErrorf("error1") + ir.resolveIntents = func(_ context.Context, intents []roachpb.LockUpdate) *Error { + require.Len(t, intents, 1) + require.Equal(t, keyA, intents[0].Key) + require.Equal(t, pusheeTxn.ID, intents[0].Txn.ID) + require.Equal(t, roachpb.ABORTED, intents[0].Status) + return err1 + } + err := w.WaitOn(ctx, req, g) + require.Equal(t, err1, err) +} + +func TestTxnCache(t *testing.T) { + var c txnCache + const overflow = 4 + var txns [len(c.txns) + overflow]roachpb.Transaction + for i := range txns { + txns[i] = makeTxnProto(fmt.Sprintf("txn %d", i)) + } + + // Add each txn to the cache. Observe LRU eviction policy. + for i := range txns { + txn := &txns[i] + c.add(txn) + for j, txnInCache := range c.txns { + if j <= i { + require.Equal(t, &txns[i-j], txnInCache) + } else { + require.Nil(t, txnInCache) + } + } + } + + // Access each txn in the cache in reverse order. + // Should reverse the order of the cache because of LRU policy. + for i := len(txns) - 1; i >= 0; i-- { + txn := &txns[i] + txnInCache, ok := c.get(txn.ID) + if i < overflow { + // Expect overflow. + require.Nil(t, txnInCache) + require.False(t, ok) + } else { + // Should be in cache. + require.Equal(t, txn, txnInCache) + require.True(t, ok) + } + } + + // Cache should be in order again. + for i, txnInCache := range c.txns { + require.Equal(t, &txns[i+overflow], txnInCache) + } +} + +func BenchmarkTxnCache(b *testing.B) { + rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) + var c txnCache + var txns [len(c.txns) + 4]roachpb.Transaction + for i := range txns { + txns[i] = makeTxnProto(fmt.Sprintf("txn %d", i)) + } + txnOps := make([]*roachpb.Transaction, b.N) + for i := range txnOps { + txnOps[i] = &txns[rng.Intn(len(txns))] + } + b.ResetTimer() + for i, txnOp := range txnOps { + if i%2 == 0 { + c.add(txnOp) + } else { + _, _ = c.get(txnOp.ID) + } + } +} diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race index 4ed4312dca3f..d13239f2799e 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/acquire_wrong_txn_race @@ -120,7 +120,8 @@ sequence req=req3 [4] sequence req3: acquiring latches [4] sequence req3: blocked on select in spanlatch.(*Manager).waitForSignal -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] sequence reqRes1: sequencing complete, returned guard [5] handle write intent error req2: handled conflicting intents on "k", released latches diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents new file mode 100644 index 000000000000..661e132727c4 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents @@ -0,0 +1,259 @@ +# ------------------------------------------------------------- +# A scan finds 10 abandoned intents from same txn +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-request name=req1 txn=txn1 ts=10,1 + scan key=a endkey=z +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] 
sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=a + intent txn=txn2 key=b + intent txn=txn2 key=c + intent txn=txn2 key=d + intent txn=txn2 key=e + intent txn=txn2 key=f + intent txn=txn2 key=g + intent txn=txn2 key=h + intent txn=txn2 key=i + intent txn=txn2 key=j +---- +[2] handle write intent error req1: handled conflicting intents on "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", released latches + +debug-lock-table +---- +global: num=10 + lock: "a" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "c" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "d" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "e" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "f" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "g" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "h" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "i" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "j" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing timestamp of txn 00000002 above 0.000000010,1 +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +debug-lock-table +---- +global: num=10 + lock: "a" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + waiting readers: + req: 1, txn: 00000001-0000-0000-0000-000000000000 + distinguished req: 1 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "c" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "d" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "e" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "f" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "g" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "h" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "i" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + lock: "j" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] +local: num=0 + +# txn1 is the distinguished waiter on key "a". It will push txn2, notice that it +# is aborted, and then resolve key "a". 
Once txn2 is in the finalizedTxnCache, +# txn1 will create a batch to resolve all other keys together. +on-txn-updated txn=txn2 status=aborted +---- +[-] update txn: aborting txn2 +[3] sequence req1: resolving intent "a" for txn 00000002 with ABORTED status +[3] sequence req1: resolving a batch of 9 intent(s) +[3] sequence req1: resolving intent "b" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "c" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "d" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "e" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "f" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "g" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "h" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "i" for txn 00000002 with ABORTED status +[3] sequence req1: resolving intent "j" for txn 00000002 with ABORTED status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +debug-lock-table +---- +global: num=0 +local: num=0 + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- + +# ------------------------------------------------------------- +# A series of 3 puts find 1 abandoned intent each from same txn +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-request name=req1 txn=txn1 ts=10,1 + put key=a value=v1 + put key=b value=v2 + put key=c value=v3 +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=a +---- +[2] handle write intent error req1: handled conflicting intents on "a", released latches + +debug-lock-table +---- +global: num=1 + lock: "a" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + queued writers: + active: false req: 2, txn: 00000001-0000-0000-0000-000000000000 +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing txn 00000002 to abort +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn2 status=committed +---- +[-] update txn: committing txn2 +[3] sequence req1: resolving intent "a" for txn 00000002 with COMMITTED status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=b +---- +[4] handle write intent error req1: handled conflicting intents on "b", released latches + +debug-lock-table +---- +global: num=2 + lock: "a" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + queued writers: + active: false req: 2, txn: 00000001-0000-0000-0000-000000000000 +local: num=0 + +sequence req=req1 +---- +[5] 
sequence req1: re-sequencing request +[5] sequence req1: acquiring latches +[5] sequence req1: scanning lock table for conflicting locks +[5] sequence req1: waiting in lock wait-queues +[5] sequence req1: resolving a batch of 1 intent(s) +[5] sequence req1: resolving intent "b" for txn 00000002 with COMMITTED status +[5] sequence req1: acquiring latches +[5] sequence req1: scanning lock table for conflicting locks +[5] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 + intent txn=txn2 key=c +---- +[6] handle write intent error req1: handled conflicting intents on "c", released latches + +debug-lock-table +---- +global: num=3 + lock: "a" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "b" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "c" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] + queued writers: + active: false req: 2, txn: 00000001-0000-0000-0000-000000000000 +local: num=0 + +sequence req=req1 +---- +[7] sequence req1: re-sequencing request +[7] sequence req1: acquiring latches +[7] sequence req1: scanning lock table for conflicting locks +[7] sequence req1: waiting in lock wait-queues +[7] sequence req1: resolving a batch of 1 intent(s) +[7] sequence req1: resolving intent "c" for txn 00000002 with COMMITTED status +[7] sequence req1: acquiring latches +[7] sequence req1: scanning lock table for conflicting locks +[7] sequence req1: sequencing complete, returned guard + +debug-lock-table +---- +global: num=3 + lock: "a" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "b" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 + lock: "c" + res: req: 2, txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, seq: 0 +local: num=0 + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock index e0fb5f43f13c..394a21603b75 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock @@ -20,7 +20,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: handled conflicting intents on "k", released latches @@ -82,7 +83,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: pushing timestamp of txn 00000001 above 0.000000012,1 [2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -139,7 +141,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: pushing txn 00000001 to abort [2] handle write intent error 
req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -197,7 +200,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: pushing timestamp of txn 00000001 above 0.000000012,1 [2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener index 79c3ba43b5ad..5ca205f792cb 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener @@ -119,7 +119,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: pushing txn 00000001 to abort [3] handle write intent error req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -142,7 +143,8 @@ sequence req=req2 [4] sequence req2: scanning lock table for conflicting locks [4] sequence req2: sequencing complete, returned guard -handle-write-intent-error req=req2 txn=txn1 key=k2 +handle-write-intent-error req=req2 + intent txn=txn1 key=k2 ---- [5] handle write intent error req2: pushing timestamp of txn 00000001 above 0.000000010,1 [5] handle write intent error req2: resolving intent "k2" for txn 00000001 with COMMITTED status @@ -190,7 +192,8 @@ sequence req=req3 [7] sequence req3: scanning lock table for conflicting locks [7] sequence req3: sequencing complete, returned guard -handle-write-intent-error req=req3 txn=txn2 key=k +handle-write-intent-error req=req3 + intent txn=txn2 key=k ---- [8] handle write intent error req3: handled conflicting intents on "k", released latches @@ -357,7 +360,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: handled conflicting intents on "k", released latches @@ -526,7 +530,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: pushing txn 00000001 to abort [3] handle write intent error req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -549,7 +554,8 @@ sequence req=req2 [4] sequence req2: scanning lock table for conflicting locks [4] sequence req2: sequencing complete, returned guard -handle-write-intent-error req=req2 txn=txn1 key=k2 +handle-write-intent-error req=req2 + intent txn=txn1 key=k2 ---- [5] handle write intent error req2: pushing timestamp of txn 00000001 above 0.000000010,1 [5] handle write intent error req2: resolving intent "k2" for txn 00000001 with COMMITTED status @@ -672,7 +678,8 @@ debug-lock-table global: num=0 local: num=0 -handle-write-intent-error req=req2 txn=txn1 key=k +handle-write-intent-error req=req2 + intent txn=txn1 key=k ---- [3] handle write intent error req2: handled conflicting intents on "k", released latches diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty 
b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty index 035cf1e73aee..7ba304a44578 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty @@ -22,7 +22,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: handled conflicting intents on "k", released latches @@ -82,7 +83,8 @@ sequence req=req1 [1] sequence req1: scanning lock table for conflicting locks [1] sequence req1: sequencing complete, returned guard -handle-write-intent-error req=req1 txn=txn1 key=k +handle-write-intent-error req=req1 + intent txn=txn1 key=k ---- [2] handle write intent error req1: handled conflicting intents on "k", released latches