diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index b5dbf085c8af..d30967866698 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -15,11 +15,14 @@ import ( gosql "database/sql" "fmt" "math/rand" + "strconv" "sync/atomic" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -172,6 +175,79 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { } } +// TestClosedTimestampCanServeWithConflictingIntent validates that a read served +// from a follower replica will wait on conflicting intents and ensure that they +// are cleaned up if necessary to allow the read to proceed. +func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + defer tc.Stopper().Stop(ctx) + ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) + + // Write N different intents for the same transaction, where N is the number + // of replicas in the testing range. Each intent will be read and eventually + // resolved by a read on a different replica. + txnKey := desc.StartKey.AsRawKey() + txnKey = txnKey[:len(txnKey):len(txnKey)] // avoid aliasing + txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0) + var keys []roachpb.Key + for i := range repls { + key := append(txnKey, []byte(strconv.Itoa(i))...) + keys = append(keys, key) + put := putArgs(key, []byte("val")) + resp, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, put) + if err != nil { + t.Fatal(err) + } + txn.Update(resp.Header().Txn) + } + + // Read a different intent on each replica. All should begin waiting on the + // intents by pushing the transaction that wrote them. None should complete. + ts := txn.WriteTimestamp + respCh := make(chan struct{}, len(keys)) + for i, key := range keys { + go func(repl *kvserver.Replica, key roachpb.Key) { + var baRead roachpb.BatchRequest + r := &roachpb.ScanRequest{} + r.Key = key + r.EndKey = key.Next() + baRead.Add(r) + baRead.Timestamp = ts + baRead.RangeID = desc.RangeID + + testutils.SucceedsSoon(t, func() error { + // Expect 0 rows, because the intents will be aborted. + _, err := expectRows(0)(repl.Send(ctx, baRead)) + return err + }) + respCh <- struct{}{} + }(repls[i], key) + } + + select { + case <-respCh: + t.Fatal("request unexpectedly succeeded, should block") + case <-time.After(20 * time.Millisecond): + } + + // Abort the transaction. All pushes should succeed and all intents should + // be resolved, allowing all reads (on the leaseholder and on followers) to + // proceed and finish. 
+ endTxn := &roachpb.EndTxnRequest{ + RequestHeader: roachpb.RequestHeader{Key: txn.Key}, + Commit: false, + } + if _, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, endTxn); err != nil { + t.Fatal(err) + } + for range keys { + <-respCh + } +} + // TestClosedTimestampCanServeAfterSplitsAndMerges validates the invariant that // if a timestamp is safe for reading on both the left side and right side of a // a merge then it will be safe after the merge and that if a timestamp is safe diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 1fc035413fb6..efbb42f907d1 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -196,6 +196,8 @@ type ContentionHandler interface { // error in the lock's wait-queue (but does not wait) and releases the // guard's latches. It returns an updated guard reflecting this change. // After the method returns, the original guard should no longer be used. + // If an error is returned then the provided guard will be released and no + // guard will be returned. // // Example usage: Txn A scans the lock table and does not see an intent on // key K from txn B because the intent is not being tracked in the lock @@ -204,7 +206,7 @@ type ContentionHandler interface { // method before txn A retries its scan. During the retry, txn A scans the // lock table and observes the lock on key K, so it enters the lock's // wait-queue and waits for it to be resolved. - HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) *Guard + HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) (*Guard, *Error) // HandleTransactionPushError consumes a TransactionPushError thrown by a // PushTxnRequest by informing the concurrency manager about a transaction @@ -474,7 +476,11 @@ type lockTable interface { // // A latch consistent with the access desired by the guard must be held on // the span containing the discovered lock's key. - AddDiscoveredLock(*roachpb.Intent, lockTableGuard) error + // + // The method returns a boolean indicating whether the discovered lock was + // added to the lockTable (true) or whether it was ignored because the + // lockTable is currently disabled (false). + AddDiscoveredLock(*roachpb.Intent, lockTableGuard) (bool, error) // AcquireLock informs the lockTable that a new lock was acquired or an // existing lock was updated. @@ -610,6 +616,22 @@ type lockTableWaiter interface { // wait-queues and it is safe to re-acquire latches and scan the lockTable // again. WaitOn(context.Context, Request, lockTableGuard) *Error + + // WaitOnLock waits on the transaction responsible for the specified lock + // and then ensures that the lock is cleared out of the request's way. + // + // The method should be called after dropping any latches that a request has + // acquired. It returns when the lock has been resolved. + // + // NOTE: this method is used when the lockTable is disabled (e.g. on a + // follower replica) and a lock is discovered that must be waited on (e.g. + // during a follower read). If/when lockTables are maintained on follower + // replicas by propagating lockTable state transitions through the Raft log + // in the ReplicatedEvalResult instead of through the (leaseholder-only) + // LocalResult, we should be able to remove the lockTable "disabled" state + // and, in turn, remove this method. This will likely fall out of pulling + // all replicated locks into the lockTable. 
+ WaitOnLock(context.Context, Request, *roachpb.Intent) *Error } // txnWaitQueue holds a collection of wait-queues for transaction records. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index c98e92fe279b..a3f61c923b58 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -85,7 +86,6 @@ func NewManager(cfg Config) Manager { maxLocks: cfg.MaxLockTableSize, }, ltw: &lockTableWaiterImpl{ - nodeID: cfg.NodeDesc.NodeID, st: cfg.Settings, stopper: cfg.Stopper, ir: cfg.IntentResolver, @@ -244,19 +244,25 @@ func (m *managerImpl) FinishReq(g *Guard) { // HandleWriterIntentError implements the ContentionHandler interface. func (m *managerImpl) HandleWriterIntentError( ctx context.Context, g *Guard, t *roachpb.WriteIntentError, -) *Guard { +) (*Guard, *Error) { if g.ltg == nil { log.Fatalf(ctx, "cannot handle WriteIntentError %v for request without "+ "lockTableGuard; were lock spans declared for this request?", t) } // Add a discovered lock to lock-table for each intent and enter each lock's - // wait-queue. + // wait-queue. If the lock-table is disabled and one or more of the intents + // are ignored then we immediately wait on all intents. + wait := false for i := range t.Intents { intent := &t.Intents[i] - if err := m.lt.AddDiscoveredLock(intent, g.ltg); err != nil { + added, err := m.lt.AddDiscoveredLock(intent, g.ltg) + if err != nil { log.Fatal(ctx, errors.HandleAsAssertionFailure(err)) } + if !added { + wait = true + } } // Release the Guard's latches but continue to remain in lock wait-queues by @@ -264,7 +270,21 @@ func (m *managerImpl) HandleWriterIntentError( // then re-sequence the Request by calling SequenceReq with the un-latched // Guard. This is analogous to iterating through the loop in SequenceReq. m.lm.Release(g.moveLatchGuard()) - return g + + // If the lockTable was disabled then we need to immediately wait on the + // intents to ensure that they are resolved and moved out of the request's + // way. + if wait { + for i := range t.Intents { + intent := &t.Intents[i] + if err := m.ltw.WaitOnLock(ctx, g.Req, intent); err != nil { + m.FinishReq(g) + return nil, err + } + } + } + + return g, nil } // HandleTransactionPushError implements the ContentionHandler interface. @@ -378,6 +398,38 @@ func (m *managerImpl) TxnWaitQueue() *txnwait.Queue { return m.twq.(*txnwait.Queue) } +func (r *Request) txnMeta() *enginepb.TxnMeta { + if r.Txn == nil { + return nil + } + return &r.Txn.TxnMeta +} + +// readConflictTimestamp returns the maximum timestamp at which the request +// conflicts with locks acquired by other transaction. The request must wait +// for all locks acquired by other transactions at or below this timestamp +// to be released. All locks acquired by other transactions above this +// timestamp are ignored. 
+func (r *Request) readConflictTimestamp() hlc.Timestamp { + ts := r.Timestamp + if r.Txn != nil { + ts = r.Txn.ReadTimestamp + ts.Forward(r.Txn.MaxTimestamp) + } + return ts +} + +// writeConflictTimestamp returns the minimum timestamp at which the request +// acquires locks when performing mutations. All writes performed by the +// requests must take place at or above this timestamp. +func (r *Request) writeConflictTimestamp() hlc.Timestamp { + ts := r.Timestamp + if r.Txn != nil { + ts = r.Txn.WriteTimestamp + } + return ts +} + func (r *Request) isSingle(m roachpb.Method) bool { if len(r.Requests) != 1 { return false diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 8cfc109902d0..12fb8aadc7b5 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -51,7 +51,7 @@ import ( // // The input files use the following DSL: // -// new-txn name= ts=[,] epoch= +// new-txn name= ts=[,] epoch= [maxts=[,]] // new-request name= txn=|none ts=[,] [priority] [consistency] // [=...] // sequence req= @@ -61,8 +61,8 @@ import ( // handle-txn-push-error req= txn= key= TODO(nvanbenschoten): implement this // // on-lock-acquired txn= key= -// on-lock-updated txn= key= status=[committed|aborted|pending] [ts[,]] -// on-txn-updated txn= status=[committed|aborted|pending] [ts[,]] +// on-lock-updated txn= key= status=[committed|aborted|pending] [ts=[,]] +// on-txn-updated txn= status=[committed|aborted|pending] [ts=[,]] // // on-lease-updated leaseholder= // on-split @@ -94,6 +94,11 @@ func TestConcurrencyManagerBasic(t *testing.T) { var epoch int d.ScanArgs(t, "epoch", &epoch) + maxTS := ts + if d.HasArg("maxts") { + maxTS = scanTimestampWithName(t, d, "maxts") + } + txn, ok := c.txnsByName[txnName] var id uuid.UUID if ok { @@ -110,6 +115,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { Priority: 1, // not min or max }, ReadTimestamp: ts, + MaxTimestamp: maxTS, } txn.UpdateObservedTimestamp(c.nodeDesc.NodeID, ts) c.registerTxn(txnName, txn) @@ -215,7 +221,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { case "handle-write-intent-error": var reqName string d.ScanArgs(t, "req", &reqName) - guard, ok := c.guardsByReqName[reqName] + prev, ok := c.guardsByReqName[reqName] if !ok { d.Fatalf(t, "unknown request: %s", reqName) } @@ -231,12 +237,22 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.ScanArgs(t, "key", &key) opName := fmt.Sprintf("handle write intent error %s", reqName) - mon.runSync(opName, func(ctx context.Context) { - err := &roachpb.WriteIntentError{Intents: []roachpb.Intent{ + mon.runAsync(opName, func(ctx context.Context) { + wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{ roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)), }} - log.Eventf(ctx, "handling %v", err) - guard = m.HandleWriterIntentError(ctx, guard, err) + guard, err := m.HandleWriterIntentError(ctx, prev, wiErr) + if err != nil { + log.Eventf(ctx, "handled %v, returned error: %v", wiErr, err) + c.mu.Lock() + delete(c.guardsByReqName, reqName) + c.mu.Unlock() + } else { + log.Eventf(ctx, "handled %v, released latches", wiErr) + c.mu.Lock() + c.guardsByReqName[reqName] = guard + c.mu.Unlock() + } }) return c.waitAndCollect(t, mon) @@ -445,7 +461,6 @@ func (c *cluster) makeConfig() concurrency.Config { func (c *cluster) PushTransaction( ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, ) (roachpb.Transaction, 
*roachpb.Error) { - log.Eventf(ctx, "pushing txn %s", pushee.ID.Short()) pusheeRecord, err := c.getTxnRecord(pushee.ID) if err != nil { return roachpb.Transaction{}, roachpb.NewError(err) diff --git a/pkg/kv/kvserver/concurrency/datadriven_util_test.go b/pkg/kv/kvserver/concurrency/datadriven_util_test.go index d8ea7a2d72b5..dba8c870197e 100644 --- a/pkg/kv/kvserver/concurrency/datadriven_util_test.go +++ b/pkg/kv/kvserver/concurrency/datadriven_util_test.go @@ -29,9 +29,13 @@ func nextUUID(counter *uint32) uuid.UUID { } func scanTimestamp(t *testing.T, d *datadriven.TestData) hlc.Timestamp { + return scanTimestampWithName(t, d, "ts") +} + +func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hlc.Timestamp { var ts hlc.Timestamp var tsS string - d.ScanArgs(t, "ts", &tsS) + d.ScanArgs(t, name, &tsS) parts := strings.Split(tsS, ",") // Find the wall time part. diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index 5742b344cf2e..35ef83b7378d 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -1663,17 +1663,12 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa if guard == nil { g = newLockTableGuardImpl() g.seqNum = atomic.AddUint64(&t.seqNum, 1) + g.txn = req.txnMeta() g.spans = req.LockSpans - g.readTS = req.Timestamp - g.writeTS = req.Timestamp + g.readTS = req.readConflictTimestamp() + g.writeTS = req.writeConflictTimestamp() g.sa = spanset.NumSpanAccess - 1 g.index = -1 - if req.Txn != nil { - g.txn = &req.Txn.TxnMeta - g.readTS = req.Txn.ReadTimestamp - g.readTS.Forward(req.Txn.MaxTimestamp) - g.writeTS = req.Txn.WriteTimestamp - } } else { g = guard.(*lockTableGuardImpl) g.key = nil @@ -1734,22 +1729,20 @@ func (t *lockTableImpl) Dequeue(guard lockTableGuard) { } // AddDiscoveredLock implements the lockTable interface. -func (t *lockTableImpl) AddDiscoveredLock(intent *roachpb.Intent, guard lockTableGuard) error { +func (t *lockTableImpl) AddDiscoveredLock( + intent *roachpb.Intent, guard lockTableGuard, +) (added bool, _ error) { t.enabledMu.RLock() defer t.enabledMu.RUnlock() if !t.enabled { // If not enabled, don't track any locks. - return nil + return false, nil } g := guard.(*lockTableGuardImpl) key := intent.Key - ss := spanset.SpanGlobal - if keys.IsLocal(key) { - ss = spanset.SpanLocal - } - sa, err := findAccessInSpans(key, ss, g.spans) + sa, ss, err := findAccessInSpans(key, g.spans) if err != nil { - return err + return false, err } var l *lockState tree := &t.locks[ss] @@ -1768,7 +1761,7 @@ func (t *lockTableImpl) AddDiscoveredLock(intent *roachpb.Intent, guard lockTabl } else { l = iter.Cur() } - return l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, sa) + return true, l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, sa) } // AcquireLock implements the lockTable interface. @@ -1861,11 +1854,15 @@ func (t *lockTableImpl) tryClearLocks(force bool) { } } -// Given the key with scope ss must be in spans, returns the strongest access -// specified in the spans. +// Given the key must be in spans, returns the strongest access +// specified in the spans, along with the scope of the key. 
func findAccessInSpans( - key roachpb.Key, ss spanset.SpanScope, spans *spanset.SpanSet, -) (spanset.SpanAccess, error) { + key roachpb.Key, spans *spanset.SpanSet, +) (spanset.SpanAccess, spanset.SpanScope, error) { + ss := spanset.SpanGlobal + if keys.IsLocal(key) { + ss = spanset.SpanLocal + } for sa := spanset.NumSpanAccess - 1; sa >= 0; sa-- { s := spans.GetSpans(sa, ss) // First span that starts after key @@ -1874,10 +1871,10 @@ func findAccessInSpans( }) if i > 0 && ((len(s[i-1].EndKey) > 0 && key.Compare(s[i-1].EndKey) < 0) || key.Equal(s[i-1].Key)) { - return sa, nil + return sa, ss, nil } } - return spanset.NumSpanAccess, errors.Errorf("caller violated contract") + return 0, 0, errors.Errorf("caller violated contract") } // Tries to GC locks that were previously known to have become empty. diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index cff11d6b72b5..9c16286b4717 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -333,7 +333,7 @@ func TestLockTableBasic(t *testing.T) { d.Fatalf(t, "unknown txn %s", txnName) } intent := roachpb.MakeIntent(txnMeta, roachpb.Key(key)) - if err := lt.AddDiscoveredLock(&intent, g); err != nil { + if _, err := lt.AddDiscoveredLock(&intent, g); err != nil { return err.Error() } return lt.(*lockTableImpl).String() diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index a738a0cce1a8..c9100c21584f 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -15,12 +15,14 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -78,7 +80,6 @@ var LockTableDeadlockDetectionPushDelay = settings.RegisterDurationSetting( // lockTableWaiterImpl is an implementation of lockTableWaiter. type lockTableWaiterImpl struct { - nodeID roachpb.NodeID st *cluster.Settings stopper *stop.Stopper ir IntentResolver @@ -298,6 +299,26 @@ func (w *lockTableWaiterImpl) WaitOn( } } +// WaitOnLock implements the lockTableWaiter interface. +func (w *lockTableWaiterImpl) WaitOnLock( + ctx context.Context, req Request, intent *roachpb.Intent, +) *Error { + sa, _, err := findAccessInSpans(intent.Key, req.LockSpans) + if err != nil { + return roachpb.NewError(err) + } + return w.pushLockTxn(ctx, req, waitingState{ + stateKind: waitFor, + txn: &intent.Txn, + ts: intent.Txn.WriteTimestamp, + dur: lock.Replicated, + key: intent.Key, + held: true, + access: spanset.SpanReadWrite, + guardAccess: sa, + }) +} + // pushLockTxn pushes the holder of the provided lock. // // The method blocks until the lock holder transaction experiences a state @@ -319,15 +340,17 @@ func (w *lockTableWaiterImpl) pushLockTxn( // under the lock. For write-write conflicts, try to abort the lock holder // entirely so the write request can revoke and replace the lock with its // own lock. 
+ h := w.pushHeader(req) var pushType roachpb.PushTxnType switch ws.guardAccess { case spanset.SpanReadOnly: pushType = roachpb.PUSH_TIMESTAMP + log.VEventf(ctx, 3, "pushing timestamp of txn %s above %s", ws.txn.ID.Short(), h.Timestamp) case spanset.SpanReadWrite: pushType = roachpb.PUSH_ABORT + log.VEventf(ctx, 3, "pushing txn %s to abort", ws.txn.ID.Short()) } - h := w.pushHeader(req) pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { return err @@ -397,9 +420,10 @@ func (w *lockTableWaiterImpl) pushRequestTxn( // because it wants to block until either a) the pushee or the pusher is // aborted due to a deadlock or b) the request exits the lock wait-queue and // the caller of this function cancels the push. + h := w.pushHeader(req) pushType := roachpb.PUSH_ABORT + log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.ID.Short()) - h := w.pushHeader(req) _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) // Even if the push succeeded and aborted the other transaction to break a // deadlock, there's nothing for the pusher to clean up. The conflicting @@ -413,26 +437,15 @@ func (w *lockTableWaiterImpl) pushRequestTxn( func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header { h := roachpb.Header{ - Timestamp: req.Timestamp, + Timestamp: req.readConflictTimestamp(), UserPriority: req.Priority, } if req.Txn != nil { - // We are going to hand the header (and thus the transaction proto) - // to the RPC framework, after which it must not be changed (since - // that could race). Since the subsequent execution of the original - // request might mutate the transaction, make a copy here. - // - // See #9130. + // We are going to hand the header (and thus the transaction proto) to + // the RPC framework, after which it must not be changed (since that + // could race). Since the subsequent execution of the original request + // might mutate the transaction, make a copy here. See #9130. h.Txn = req.Txn.Clone() - - // We must push at least to h.Timestamp, but in fact we want to - // go all the way up to a timestamp which was taken off the HLC - // after our operation started. This allows us to not have to - // restart for uncertainty as we come back and read. 
- obsTS, ok := h.Txn.GetObservedTimestamp(w.nodeID) - if ok { - h.Timestamp.Forward(obsTS) - } } return h } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 92f45cc2ba60..b1d593f190f1 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -66,7 +66,6 @@ func setupLockTableWaiterTest() (*lockTableWaiterImpl, *mockIntentResolver, *moc LockTableLivenessPushDelay.Override(&st.SV, 0) LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0) w := &lockTableWaiterImpl{ - nodeID: 2, st: st, stopper: stop.NewStopper(), ir: ir, @@ -87,10 +86,10 @@ func TestLockTableWaiterWithTxn(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - observedTS := hlc.Timestamp{WallTime: 15} + maxTS := hlc.Timestamp{WallTime: 15} makeReq := func() Request { txn := makeTxnProto("request") - txn.UpdateObservedTimestamp(2, observedTS) + txn.MaxTimestamp = maxTS return Request{ Txn: &txn, Timestamp: txn.ReadTimestamp, @@ -99,15 +98,15 @@ func TestLockTableWaiterWithTxn(t *testing.T) { t.Run("state", func(t *testing.T) { t.Run("waitFor", func(t *testing.T) { - testWaitPush(t, waitFor, makeReq, observedTS) + testWaitPush(t, waitFor, makeReq, maxTS) }) t.Run("waitForDistinguished", func(t *testing.T) { - testWaitPush(t, waitForDistinguished, makeReq, observedTS) + testWaitPush(t, waitForDistinguished, makeReq, maxTS) }) t.Run("waitElsewhere", func(t *testing.T) { - testWaitPush(t, waitElsewhere, makeReq, observedTS) + testWaitPush(t, waitElsewhere, makeReq, maxTS) }) t.Run("waitSelf", func(t *testing.T) { diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic index 95e0ea3dc5a8..177cf23ab607 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic @@ -105,7 +105,7 @@ sequence req=req3 [1] sequence req3: acquiring latches [1] sequence req3: scanning lock table for conflicting locks [1] sequence req3: waiting in lock wait-queues -[1] sequence req3: pushing txn 00000002 +[1] sequence req3: pushing timestamp of txn 00000002 above 0.000000014,1 [1] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed @@ -174,7 +174,7 @@ sequence req=req5 [1] sequence req5: acquiring latches [1] sequence req5: scanning lock table for conflicting locks [1] sequence req5: waiting in lock wait-queues -[1] sequence req5: pushing txn 00000002 +[1] sequence req5: pushing timestamp of txn 00000002 above 0.000000014,1 [1] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction new-request name=req6 txn=none ts=16,1 @@ -219,7 +219,7 @@ finish req=req6 [-] finish req6: finishing request [3] sequence req7: scanning lock table for conflicting locks [3] sequence req7: waiting in lock wait-queues -[3] sequence req7: pushing txn 00000002 +[3] sequence req7: pushing txn 00000002 to abort [3] sequence req7: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks index 87c05266ceae..fdeac15a25cf 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks +++ 
b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks @@ -110,7 +110,7 @@ sequence req=req1r [4] sequence req1r: acquiring latches [4] sequence req1r: scanning lock table for conflicting locks [4] sequence req1r: waiting in lock wait-queues -[4] sequence req1r: pushing txn 00000002 +[4] sequence req1r: pushing timestamp of txn 00000002 above 0.000000010,1 [4] sequence req1r: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req2r @@ -119,7 +119,7 @@ sequence req=req2r [5] sequence req2r: acquiring latches [5] sequence req2r: scanning lock table for conflicting locks [5] sequence req2r: waiting in lock wait-queues -[5] sequence req2r: pushing txn 00000003 +[5] sequence req2r: pushing timestamp of txn 00000003 above 0.000000010,1 [5] sequence req2r: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req3r @@ -130,7 +130,7 @@ sequence req=req3r [6] sequence req3r: acquiring latches [6] sequence req3r: scanning lock table for conflicting locks [6] sequence req3r: waiting in lock wait-queues -[6] sequence req3r: pushing txn 00000001 +[6] sequence req3r: pushing timestamp of txn 00000001 above 0.000000010,1 [6] sequence req3r: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3r: dependency cycle detected 00000003->00000001->00000002->00000003 @@ -311,7 +311,7 @@ sequence req=req4w [4] sequence req4w: acquiring latches [4] sequence req4w: scanning lock table for conflicting locks [4] sequence req4w: waiting in lock wait-queues -[4] sequence req4w: pushing txn 00000001 +[4] sequence req4w: pushing txn 00000001 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req1w2 @@ -320,7 +320,7 @@ sequence req=req1w2 [5] sequence req1w2: acquiring latches [5] sequence req1w2: scanning lock table for conflicting locks [5] sequence req1w2: waiting in lock wait-queues -[5] sequence req1w2: pushing txn 00000002 +[5] sequence req1w2: pushing txn 00000002 to abort [5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req2w2 @@ -329,7 +329,7 @@ sequence req=req2w2 [6] sequence req2w2: acquiring latches [6] sequence req2w2: scanning lock table for conflicting locks [6] sequence req2w2: waiting in lock wait-queues -[6] sequence req2w2: pushing txn 00000003 +[6] sequence req2w2: pushing txn 00000003 to abort [6] sequence req2w2: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req3w2 @@ -340,7 +340,7 @@ sequence req=req3w2 [7] sequence req3w2: acquiring latches [7] sequence req3w2: scanning lock table for conflicting locks [7] sequence req3w2: waiting in lock wait-queues -[7] sequence req3w2: pushing txn 00000001 +[7] sequence req3w2: pushing txn 00000001 to abort [7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [7] sequence req3w2: dependency cycle detected 00000003->00000001->00000002->00000003 @@ -376,7 +376,7 @@ on-txn-updated txn=txn1 status=aborted [5] sequence req1w2: detected pusher aborted [5] sequence req1w2: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): [7] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status -[7] sequence req3w2: pushing txn 00000004 +[7] sequence req3w2: pushing txn 00000004 to detect request deadlock [7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction # Txn4 can proceed. 
@@ -520,14 +520,14 @@ sequence req=req4w [4] sequence req4w: acquiring latches [4] sequence req4w: scanning lock table for conflicting locks [4] sequence req4w: waiting in lock wait-queues -[4] sequence req4w: pushing txn 00000002 +[4] sequence req4w: pushing txn 00000002 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed ---- [-] update txn: committing txn2 [4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status -[4] sequence req4w: pushing txn 00000003 +[4] sequence req4w: pushing txn 00000003 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction debug-lock-table @@ -562,7 +562,7 @@ sequence req=req1w2 [5] sequence req1w2: acquiring latches [5] sequence req1w2: scanning lock table for conflicting locks [5] sequence req1w2: waiting in lock wait-queues -[5] sequence req1w2: pushing txn 00000004 +[5] sequence req1w2: pushing txn 00000004 to detect request deadlock [5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req3w2 @@ -573,7 +573,7 @@ sequence req=req3w2 [6] sequence req3w2: acquiring latches [6] sequence req3w2: scanning lock table for conflicting locks [6] sequence req3w2: waiting in lock wait-queues -[6] sequence req3w2: pushing txn 00000001 +[6] sequence req3w2: pushing txn 00000001 to abort [6] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3w2: dependency cycle detected 00000003->00000001->00000004->00000003 @@ -740,14 +740,14 @@ sequence req=req4w [4] sequence req4w: acquiring latches [4] sequence req4w: scanning lock table for conflicting locks [4] sequence req4w: waiting in lock wait-queues -[4] sequence req4w: pushing txn 00000002 +[4] sequence req4w: pushing txn 00000002 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed ---- [-] update txn: committing txn2 [4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status -[4] sequence req4w: pushing txn 00000003 +[4] sequence req4w: pushing txn 00000003 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction debug-lock-table @@ -782,7 +782,7 @@ sequence req=req1w2 [5] sequence req1w2: acquiring latches [5] sequence req1w2: scanning lock table for conflicting locks [5] sequence req1w2: waiting in lock wait-queues -[5] sequence req1w2: pushing txn 00000004 +[5] sequence req1w2: pushing txn 00000004 to detect request deadlock [5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction sequence req=req3w2 @@ -793,7 +793,7 @@ sequence req=req3w2 [6] sequence req3w2: acquiring latches [6] sequence req3w2: scanning lock table for conflicting locks [6] sequence req3w2: waiting in lock wait-queues -[6] sequence req3w2: pushing txn 00000001 +[6] sequence req3w2: pushing txn 00000001 to abort [6] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3w2: dependency cycle detected 00000003->00000001->00000004->00000003 @@ -972,7 +972,7 @@ sequence req=req5w [4] sequence req5w: acquiring latches [4] sequence req5w: scanning lock table for conflicting locks [4] sequence req5w: waiting in lock wait-queues -[4] sequence req5w: pushing txn 00000002 +[4] sequence req5w: pushing txn 00000002 to abort [4] sequence req5w: blocked on select in concurrency_test.(*cluster).PushTransaction sequence 
req=req4w @@ -981,24 +981,24 @@ sequence req=req4w [5] sequence req4w: acquiring latches [5] sequence req4w: scanning lock table for conflicting locks [5] sequence req4w: waiting in lock wait-queues -[5] sequence req4w: pushing txn 00000001 +[5] sequence req4w: pushing txn 00000001 to abort [5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn1 status=committed ---- [-] update txn: committing txn1 [5] sequence req4w: resolving intent "a" for txn 00000001 with COMMITTED status -[5] sequence req4w: pushing txn 00000002 +[5] sequence req4w: pushing txn 00000002 to abort [5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed ---- [-] update txn: committing txn2 [4] sequence req5w: resolving intent "b" for txn 00000002 with COMMITTED status -[4] sequence req5w: pushing txn 00000003 +[4] sequence req5w: pushing txn 00000003 to abort [4] sequence req5w: blocked on select in concurrency_test.(*cluster).PushTransaction [5] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status -[5] sequence req4w: pushing txn 00000005 +[5] sequence req4w: pushing txn 00000005 to detect request deadlock [5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction debug-lock-table @@ -1034,7 +1034,7 @@ sequence req=req3w2 [6] sequence req3w2: acquiring latches [6] sequence req3w2: scanning lock table for conflicting locks [6] sequence req3w2: waiting in lock wait-queues -[6] sequence req3w2: pushing txn 00000004 +[6] sequence req3w2: pushing txn 00000004 to detect request deadlock [6] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3w2: dependency cycle detected 00000003->00000004->00000005->00000003 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock index 63a6db003564..e0fb5f43f13c 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock @@ -1,6 +1,6 @@ # ------------------------------------------------------------- # Read-only request runs into replicated intent. It informs the -# lock table and waits for the intent to be resolved. +# lock-table and waits for the intent to be resolved. 
# ------------------------------------------------------------- new-txn name=txn1 ts=10,1 epoch=0 @@ -22,7 +22,7 @@ sequence req=req1 handle-write-intent-error req=req1 txn=txn1 key=k ---- -[-] handle write intent error req1: handling conflicting intents on "k" +[2] handle write intent error req1: handled conflicting intents on "k", released latches debug-lock-table ---- @@ -33,24 +33,185 @@ local: num=0 sequence req=req1 ---- -[2] sequence req1: re-sequencing request -[2] sequence req1: acquiring latches -[2] sequence req1: scanning lock table for conflicting locks -[2] sequence req1: waiting in lock wait-queues -[2] sequence req1: pushing txn 00000001 -[2] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing timestamp of txn 00000001 above 0.000000012,1 +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn1 status=aborted ---- [-] update txn: aborting txn1 -[2] sequence req1: resolving intent "k" for txn 00000001 with ABORTED status -[2] sequence req1: acquiring latches -[2] sequence req1: scanning lock table for conflicting locks -[2] sequence req1: sequencing complete, returned guard +[3] sequence req1: resolving intent "k" for txn 00000001 with ABORTED status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard finish req=req1 ---- [-] finish req1: finishing request -reset +reset namespace +---- + +# ------------------------------------------------------------- +# Read-only request runs into replicated intent while the +# lock-table is disabled. The lock-table cannot store the lock, +# so the request is forced to push (PUSH_TIMESTAMP) immediately. +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=12,1 epoch=0 +---- + +new-request name=req1 txn=txn2 ts=12,1 + get key=k +---- + +on-lease-updated leaseholder=false +---- +[-] transfer lease: released + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 txn=txn1 key=k +---- +[2] handle write intent error req1: pushing timestamp of txn 00000001 above 0.000000012,1 +[2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=aborted +---- +[-] update txn: aborting txn1 +[2] handle write intent error req1: resolving intent "k" for txn 00000001 with ABORTED status +[2] handle write intent error req1: handled conflicting intents on "k", released latches + +debug-lock-table +---- +global: num=0 +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- + +# ------------------------------------------------------------- +# Read-write request runs into replicated intent while the +# lock-table is disabled. 
The lock-table cannot store the lock, +# so the request is forced to push (PUSH_ABORT) immediately. +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=12,1 epoch=0 +---- + +new-request name=req1 txn=txn2 ts=12,1 + put key=k value=v +---- + +on-lease-updated leaseholder=false +---- +[-] transfer lease: released + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 txn=txn1 key=k +---- +[2] handle write intent error req1: pushing txn 00000001 to abort +[2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=aborted +---- +[-] update txn: aborting txn1 +[2] handle write intent error req1: resolving intent "k" for txn 00000001 with ABORTED status +[2] handle write intent error req1: handled conflicting intents on "k", released latches + +debug-lock-table +---- +global: num=0 +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- + +# ------------------------------------------------------------- +# Read-write request runs into replicated intent while the +# lock-table is disabled. The lock-table cannot store the lock, +# so the request is forced to push (PUSH_ABORT) immediately. +# The request's own transaction is aborted while pushing. +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=12,1 epoch=0 +---- + +new-request name=req1 txn=txn2 ts=12,1 + get key=k +---- + +on-lease-updated leaseholder=false +---- +[-] transfer lease: released + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 txn=txn1 key=k +---- +[2] handle write intent error req1: pushing timestamp of txn 00000001 above 0.000000012,1 +[2] handle write intent error req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn2 status=aborted +---- +[-] update txn: aborting txn2 +[2] handle write intent error req1: detected pusher aborted +[2] handle write intent error req1: handled conflicting intents on "k", returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): + +debug-lock-table +---- +global: num=0 +local: num=0 + +reset namespace ---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener index 02e13a0f108b..410289b1d21f 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener @@ -10,12 +10,14 @@ debug-disable-txn-pushes # OnRangeLeaseUpdated - losing this lease disables the # lock-table and acquiring the lease enables the lock-table. 
# -# Setup: txn1 acquires lock +# Setup: txn1 acquires locks on k and k2 # # Test: txn2 enters lock's wait-queue # replica loses lease # txn2 proceeds -# txn2 discovers txn1's lock (ignored) +# txn2 discovers txn1's lock on k while writing (not ignored, waits) +# txn2 re-sequences +# txn2 discovers txn1's lock on k2 while reading (not ignored, waits) # txn2 re-sequences # txn1 lock is released (ignored) # txn2 proceeds and acquires lock (ignored) @@ -39,11 +41,13 @@ new-txn name=txn3 ts=10,1 epoch=0 ---- new-request name=req1 txn=txn1 ts=10,1 - put key=k value=v + put key=k value=v + put key=k2 value=v ---- new-request name=req2 txn=txn2 ts=10,1 - put key=k value=v + put key=k value=v + get key=k2 ---- new-request name=req3 txn=txn3 ts=10,1 @@ -61,15 +65,21 @@ on-lock-acquired txn=txn1 key=k ---- [-] acquire lock: txn1 @ k +on-lock-acquired txn=txn1 key=k2 +---- +[-] acquire lock: txn1 @ k2 + finish req=req1 ---- [-] finish req1: finishing request debug-lock-table ---- -global: num=1 +global: num=2 lock: "k" holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] + lock: "k2" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] local: num=0 # -------------------------------- @@ -86,12 +96,14 @@ sequence req=req2 debug-lock-table ---- -global: num=1 +global: num=2 lock: "k" holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] queued writers: active: true req: 2, txn: 00000002-0000-0000-0000-000000000000 distinguished req: 2 + lock: "k2" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] local: num=0 # Replica loses lease. @@ -109,7 +121,14 @@ local: num=0 handle-write-intent-error req=req2 txn=txn1 key=k ---- -[-] handle write intent error req2: handling conflicting intents on "k" +[3] handle write intent error req2: pushing txn 00000001 to abort +[3] handle write intent error req2: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=committed +---- +[-] update txn: committing txn1 +[3] handle write intent error req2: resolving intent "k" for txn 00000001 with COMMITTED status +[3] handle write intent error req2: handled conflicting intents on "k", released latches debug-lock-table ---- @@ -118,20 +137,29 @@ local: num=0 sequence req=req2 ---- -[3] sequence req2: re-sequencing request -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: sequencing complete, returned guard +[4] sequence req2: re-sequencing request +[4] sequence req2: acquiring latches +[4] sequence req2: scanning lock table for conflicting locks +[4] sequence req2: sequencing complete, returned guard -on-lock-updated txn=txn1 key=k status=committed +handle-write-intent-error req=req2 txn=txn1 key=k2 ---- -[-] update lock: committing txn1 @ k +[5] handle write intent error req2: pushing timestamp of txn 00000001 above 0.000000010,1 +[5] handle write intent error req2: resolving intent "k2" for txn 00000001 with COMMITTED status +[5] handle write intent error req2: handled conflicting intents on "k2", released latches debug-lock-table ---- global: num=0 local: num=0 +sequence req=req2 +---- +[6] sequence req2: re-sequencing request +[6] sequence req2: acquiring latches +[6] sequence req2: scanning lock table for conflicting locks +[6] sequence req2: sequencing complete, returned guard + on-lock-acquired txn=txn2 
key=k ---- [-] acquire lock: txn2 @ k @@ -157,14 +185,14 @@ local: num=0 sequence req=req3 ---- -[4] sequence req3: sequencing request -[4] sequence req3: acquiring latches -[4] sequence req3: scanning lock table for conflicting locks -[4] sequence req3: sequencing complete, returned guard +[7] sequence req3: sequencing request +[7] sequence req3: acquiring latches +[7] sequence req3: scanning lock table for conflicting locks +[7] sequence req3: sequencing complete, returned guard handle-write-intent-error req=req3 txn=txn2 key=k ---- -[-] handle write intent error req3: handling conflicting intents on "k" +[8] handle write intent error req3: handled conflicting intents on "k", released latches debug-lock-table ---- @@ -177,11 +205,11 @@ local: num=0 sequence req=req3 ---- -[5] sequence req3: re-sequencing request -[5] sequence req3: acquiring latches -[5] sequence req3: scanning lock table for conflicting locks -[5] sequence req3: waiting in lock wait-queues -[5] sequence req3: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn +[9] sequence req3: re-sequencing request +[9] sequence req3: acquiring latches +[9] sequence req3: scanning lock table for conflicting locks +[9] sequence req3: waiting in lock wait-queues +[9] sequence req3: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn debug-lock-table ---- @@ -196,9 +224,9 @@ local: num=0 on-lock-updated txn=txn2 key=k status=committed ---- [-] update lock: committing txn2 @ k -[5] sequence req3: acquiring latches -[5] sequence req3: scanning lock table for conflicting locks -[5] sequence req3: sequencing complete, returned guard +[9] sequence req3: acquiring latches +[9] sequence req3: scanning lock table for conflicting locks +[9] sequence req3: sequencing complete, returned guard debug-lock-table ---- @@ -316,7 +344,7 @@ local: num=0 handle-write-intent-error req=req2 txn=txn1 key=k ---- -[-] handle write intent error req2: handling conflicting intents on "k" +[3] handle write intent error req2: handled conflicting intents on "k", released latches debug-lock-table ---- @@ -329,18 +357,18 @@ local: num=0 sequence req=req2 ---- -[3] sequence req2: re-sequencing request -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: waiting in lock wait-queues -[3] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn +[4] sequence req2: re-sequencing request +[4] sequence req2: acquiring latches +[4] sequence req2: scanning lock table for conflicting locks +[4] sequence req2: waiting in lock wait-queues +[4] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn on-lock-updated txn=txn1 key=k status=committed ---- [-] update lock: committing txn1 @ k -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: sequencing complete, returned guard +[4] sequence req2: acquiring latches +[4] sequence req2: scanning lock table for conflicting locks +[4] sequence req2: sequencing complete, returned guard debug-lock-table ---- @@ -373,12 +401,14 @@ subtest end # OnRangeMerge - a Range merge clears the lock-table and # disables it. 
# -# Setup: txn1 acquires lock +# Setup: txn1 acquires lock on k and k2 # # Test: txn2 enters lock's wait-queue # range is merged # txn2 proceeds -# txn2 discovers txn1's lock (ignored) +# txn2 discovers txn1's lock on k while writing (not ignored, waits) +# txn2 re-sequences +# txn2 discovers txn1's lock on k2 while reading (not ignored, waits) # txn2 re-sequences # txn1 lock is released (ignored) # txn2 proceeds and acquires lock (ignored) @@ -393,11 +423,13 @@ new-txn name=txn2 ts=10,1 epoch=0 ---- new-request name=req1 txn=txn1 ts=10,1 - put key=k value=v + put key=k value=v + put key=k2 value=v ---- new-request name=req2 txn=txn2 ts=10,1 - put key=k value=v + put key=k value=v + get key=k2 ---- sequence req=req1 @@ -411,15 +443,21 @@ on-lock-acquired txn=txn1 key=k ---- [-] acquire lock: txn1 @ k +on-lock-acquired txn=txn1 key=k2 +---- +[-] acquire lock: txn1 @ k2 + finish req=req1 ---- [-] finish req1: finishing request debug-lock-table ---- -global: num=1 +global: num=2 lock: "k" holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] + lock: "k2" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] local: num=0 # -------------------------------- @@ -436,12 +474,14 @@ sequence req=req2 debug-lock-table ---- -global: num=1 +global: num=2 lock: "k" holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] queued writers: active: true req: 7, txn: 00000002-0000-0000-0000-000000000000 distinguished req: 7 + lock: "k2" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: unrepl epoch: 0, seqs: [0] local: num=0 on-merge @@ -458,7 +498,14 @@ local: num=0 handle-write-intent-error req=req2 txn=txn1 key=k ---- -[-] handle write intent error req2: handling conflicting intents on "k" +[3] handle write intent error req2: pushing txn 00000001 to abort +[3] handle write intent error req2: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=committed +---- +[-] update txn: committing txn1 +[3] handle write intent error req2: resolving intent "k" for txn 00000001 with COMMITTED status +[3] handle write intent error req2: handled conflicting intents on "k", released latches debug-lock-table ---- @@ -467,20 +514,29 @@ local: num=0 sequence req=req2 ---- -[3] sequence req2: re-sequencing request -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: sequencing complete, returned guard +[4] sequence req2: re-sequencing request +[4] sequence req2: acquiring latches +[4] sequence req2: scanning lock table for conflicting locks +[4] sequence req2: sequencing complete, returned guard -on-lock-updated txn=txn1 key=k status=committed +handle-write-intent-error req=req2 txn=txn1 key=k2 ---- -[-] update lock: committing txn1 @ k +[5] handle write intent error req2: pushing timestamp of txn 00000001 above 0.000000010,1 +[5] handle write intent error req2: resolving intent "k2" for txn 00000001 with COMMITTED status +[5] handle write intent error req2: handled conflicting intents on "k2", released latches debug-lock-table ---- global: num=0 local: num=0 +sequence req=req2 +---- +[6] sequence req2: re-sequencing request +[6] sequence req2: acquiring latches +[6] sequence req2: scanning lock table for conflicting locks +[6] sequence req2: sequencing complete, returned guard + on-lock-acquired txn=txn2 key=k ---- [-] acquire lock: txn2 @ k 
@@ -588,7 +644,7 @@ local: num=0 handle-write-intent-error req=req2 txn=txn1 key=k ---- -[-] handle write intent error req2: handling conflicting intents on "k" +[3] handle write intent error req2: handled conflicting intents on "k", released latches debug-lock-table ---- @@ -601,18 +657,18 @@ local: num=0 sequence req=req2 ---- -[3] sequence req2: re-sequencing request -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: waiting in lock wait-queues -[3] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn +[4] sequence req2: re-sequencing request +[4] sequence req2: acquiring latches +[4] sequence req2: scanning lock table for conflicting locks +[4] sequence req2: waiting in lock wait-queues +[4] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn on-lock-updated txn=txn1 key=k status=committed ---- [-] update lock: committing txn1 @ k -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: sequencing complete, returned guard +[4] sequence req2: acquiring latches +[4] sequence req2: scanning lock table for conflicting locks +[4] sequence req2: sequencing complete, returned guard debug-lock-table ---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty new file mode 100644 index 000000000000..035cf1e73aee --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty @@ -0,0 +1,118 @@ +# ------------------------------------------------------------- +# A transactional (txn2) read-only request runs into replicated +# intent below its read timestamp. It informs the lock table and +# pushes the intent's transaction (txn1) above its uncertainty +# window. The push succeeds and the request is able to proceed. 
+# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=12,1 epoch=0 maxts=15,1 +---- + +new-request name=req1 txn=txn2 ts=12,1 + get key=k +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 txn=txn1 key=k +---- +[2] handle write intent error req1: handled conflicting intents on "k", released latches + +debug-lock-table +---- +global: num=1 + lock: "k" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [0] +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing timestamp of txn 00000001 above 0.000000015,1 +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=pending ts=15,2 +---- +[-] update txn: increasing timestamp of txn1 +[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- + +# ------------------------------------------------------------- +# A transactional (txn2) read-only request runs into replicated +# intent above its read timestamp but in its uncertainty window. +# It informs the lock table and pushes the intent's transaction +# (txn1) above its uncertainty window. The push succeeds and the +# request is able to proceed. 
+# ------------------------------------------------------------- + +new-txn name=txn1 ts=14,1 epoch=0 +---- + +new-txn name=txn2 ts=12,1 epoch=0 maxts=15,1 +---- + +new-request name=req1 txn=txn2 ts=12,1 + get key=k +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +handle-write-intent-error req=req1 txn=txn1 key=k +---- +[2] handle write intent error req1: handled conflicting intents on "k", released latches + +debug-lock-table +---- +global: num=1 + lock: "k" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000014,1, info: repl epoch: 0, seqs: [0] +local: num=0 + +sequence req=req1 +---- +[3] sequence req1: re-sequencing request +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: waiting in lock wait-queues +[3] sequence req1: pushing timestamp of txn 00000001 above 0.000000015,1 +[3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn1 status=pending ts=15,2 +---- +[-] update txn: increasing timestamp of txn1 +[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status +[3] sequence req1: acquiring latches +[3] sequence req1: scanning lock table for conflicting locks +[3] sequence req1: sequencing complete, returned guard + +finish req=req1 +---- +[-] finish req1: finishing request + +reset namespace +---- diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index dd97e92e665f..b038f15ecef8 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -242,7 +242,7 @@ type replicaInQueue interface { IsDestroyed() (DestroyReason, error) Desc() *roachpb.RangeDescriptor maybeInitializeRaftGroup(context.Context) - redirectOnOrAcquireLease(context.Context) (storagepb.LeaseStatus, hlc.Timestamp, *roachpb.Error) + redirectOnOrAcquireLease(context.Context) (storagepb.LeaseStatus, *roachpb.Error) IsLeaseValid(roachpb.Lease, hlc.Timestamp) bool GetLease() (roachpb.Lease, roachpb.Lease) } @@ -932,7 +932,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er // order to be processed, check whether this replica has range lease // and renew or acquire if necessary. if bq.needsLease { - if _, _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil { + if _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil { switch v := pErr.GetDetail().(type) { case *roachpb.NotLeaseHolderError, *roachpb.RangeNotFoundError: log.VEventf(ctx, 3, "%s; skipping", v) diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go index 3440bc120e0f..13e123aec081 100644 --- a/pkg/kv/kvserver/queue_concurrency_test.go +++ b/pkg/kv/kvserver/queue_concurrency_test.go @@ -169,9 +169,9 @@ func (fr *fakeReplica) Desc() *roachpb.RangeDescriptor { func (fr *fakeReplica) maybeInitializeRaftGroup(context.Context) {} func (fr *fakeReplica) redirectOnOrAcquireLease( context.Context, -) (storagepb.LeaseStatus, hlc.Timestamp, *roachpb.Error) { +) (storagepb.LeaseStatus, *roachpb.Error) { // baseQueue only checks that the returned error is nil. 
- return storagepb.LeaseStatus{}, hlc.Timestamp{}, nil + return storagepb.LeaseStatus{}, nil } func (fr *fakeReplica) IsLeaseValid(roachpb.Lease, hlc.Timestamp) bool { return true } func (fr *fakeReplica) GetLease() (roachpb.Lease, roachpb.Lease) { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 52a1a6247a72..285feb5017c8 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -714,7 +714,7 @@ func (r *Replica) GetGCThreshold() hlc.Timestamp { // is enabled and the TTL has passed. If this is an admin command or this range // contains data outside of the user keyspace, we return the true GC threshold. func (r *Replica) getImpliedGCThresholdRLocked( - now hlc.Timestamp, st *storagepb.LeaseStatus, isAdmin bool, + st *storagepb.LeaseStatus, isAdmin bool, ) hlc.Timestamp { threshold := *r.mu.state.GCThreshold @@ -733,11 +733,11 @@ func (r *Replica) getImpliedGCThresholdRLocked( // user experience win; it's always safe to allow reads to continue so long // as they are after the GC threshold. c := r.mu.cachedProtectedTS - if st == nil || c.readAt.Less(st.Lease.Start) { + if st.State != storagepb.LeaseState_VALID || c.readAt.Less(st.Lease.Start) { return threshold } - impliedThreshold := gc.CalculateThreshold(now, *r.mu.zone.GC) + impliedThreshold := gc.CalculateThreshold(st.Timestamp, *r.mu.zone.GC) threshold.Forward(impliedThreshold) // If we have a protected timestamp record which precedes the implied @@ -1002,7 +1002,7 @@ func (r *Replica) assertStateLocked(ctx context.Context, reader storage.Reader) // they know that they will end up checking for a pending merge at some later // time. func (r *Replica) checkExecutionCanProceed( - ba *roachpb.BatchRequest, g *concurrency.Guard, now hlc.Timestamp, st *storagepb.LeaseStatus, + ba *roachpb.BatchRequest, g *concurrency.Guard, st *storagepb.LeaseStatus, ) error { rSpan, err := keys.Range(ba.Requests) if err != nil { @@ -1014,7 +1014,7 @@ func (r *Replica) checkExecutionCanProceed( return err } else if err := r.checkSpanInRangeRLocked(rSpan); err != nil { return err - } else if err := r.checkTSAboveGCThresholdRLocked(ba.Timestamp, now, st, ba.IsAdmin()); err != nil { + } else if err := r.checkTSAboveGCThresholdRLocked(ba.Timestamp, st, ba.IsAdmin()); err != nil { return err } else if g.HoldingLatches() && st != nil { // Only check for a pending merge if latches are held and the Range @@ -1030,13 +1030,15 @@ func (r *Replica) checkExecutionCanProceed( func (r *Replica) checkExecutionCanProceedForRangeFeed( rSpan roachpb.RSpan, ts hlc.Timestamp, ) error { + now := r.Clock().Now() r.mu.RLock() defer r.mu.RUnlock() + status := r.leaseStatus(*r.mu.state.Lease, now, r.mu.minLeaseProposedTS) if _, err := r.isDestroyedRLocked(); err != nil { return err } else if err := r.checkSpanInRangeRLocked(rSpan); err != nil { return err - } else if err := r.checkTSAboveGCThresholdRLocked(ts, r.Clock().Now(), nil, false /* isAdmin */); err != nil { + } else if err := r.checkTSAboveGCThresholdRLocked(ts, &status, false /* isAdmin */); err != nil { return err } else if r.requiresExpiringLeaseRLocked() { // Ensure that the range does not require an expiration-based lease. If it @@ -1062,9 +1064,9 @@ func (r *Replica) checkSpanInRangeRLocked(rspan roachpb.RSpan) error { // checkTSAboveGCThresholdRLocked returns an error if a request (identified // by its MVCC timestamp) can be run on the replica. 
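// [Editor's note] Illustrative sketch only; not part of this diff. The hunks
// above derive the implied GC threshold from the lease status rather than
// from a separately passed clock reading. On paths that hold no lease
// (INCONSISTENT reads, some tests), callers now pass a LeaseStatus carrying
// only a clock reading; because its State is not VALID,
// getImpliedGCThresholdRLocked falls back to the replica's persisted GC
// threshold. A minimal sketch, assuming the kvserver package context; the
// function name is hypothetical.
func exampleLeaselessGCThresholdCheck(r *Replica, ba *roachpb.BatchRequest) error {
	// Mirror the INCONSISTENT-read and store_test.go call sites in this diff:
	// a status that carries only a timestamp.
	st := storagepb.LeaseStatus{Timestamp: r.Clock().Now()}
	return r.checkExecutionCanProceed(ba, nil /* g */, &st)
}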
 func (r *Replica) checkTSAboveGCThresholdRLocked(
-	ts, now hlc.Timestamp, st *storagepb.LeaseStatus, isAdmin bool,
+	ts hlc.Timestamp, st *storagepb.LeaseStatus, isAdmin bool,
 ) error {
-	threshold := r.getImpliedGCThresholdRLocked(now, st, isAdmin)
+	threshold := r.getImpliedGCThresholdRLocked(st, isAdmin)
 	if threshold.Less(ts) {
 		return nil
 	}
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index b457a6dab009..ddba3d5b4d83 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -525,7 +525,7 @@ func (r *Replica) executeAdminCommandWithDescriptor(
 		// Without the lease, a replica's local descriptor can be arbitrarily
 		// stale, which will result in a ConditionFailedError. To avoid this, we
 		// make sure that we still have the lease before each attempt.
-		if _, _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
+		if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
 			return pErr
 		}
 
diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go
index 1f3d9024e1c6..9430482dba2b 100644
--- a/pkg/kv/kvserver/replica_follower_read.go
+++ b/pkg/kv/kvserver/replica_follower_read.go
@@ -31,26 +31,14 @@ var FollowerReadsEnabled = settings.RegisterPublicBoolSetting(
 	true,
 )
 
-// canServeFollowerRead tests, when a range lease could not be
-// acquired, whether the read only batch can be served as a follower
-// read despite the error.
+// canServeFollowerRead tests, when a range lease could not be acquired, whether
+// the batch can be served as a follower read despite the error. Only
+// non-locking, read-only requests can be served as follower reads. The batch
+// must be composed exclusively of this kind of request to be accepted as a
+// follower read.
 func (r *Replica) canServeFollowerRead(
 	ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error,
 ) *roachpb.Error {
-	// There's no known reason that a non-VOTER_FULL replica couldn't serve follower
-	// reads (or RangeFeed), but as of the time of writing, these are expected
-	// to be short-lived, so it's not worth working out the edge-cases. Revisit if
-	// we add long-lived learners or feel that incoming/outgoing voters also need
-	// to be able to serve follower reads.
-	repDesc, err := r.GetReplicaDescriptor()
-	if err != nil {
-		return roachpb.NewError(err)
-	}
-	if typ := repDesc.GetType(); typ != roachpb.VOTER_FULL {
-		log.Eventf(ctx, "%s replicas cannot serve follower reads", typ)
-		return pErr
-	}
-
 	canServeFollowerRead := false
 	if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
 		lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
 		(ba.Txn == nil || !ba.Txn.IsLocking()) && // followerreadsccl.txnCanPerformFollowerRead
 		FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) {
+		// There's no known reason that a non-VOTER_FULL replica couldn't serve follower
+		// reads (or RangeFeed), but as of the time of writing, these are expected
+		// to be short-lived, so it's not worth working out the edge-cases. Revisit if
+		// we add long-lived learners or feel that incoming/outgoing voters also need
+		// to be able to serve follower reads.
+ repDesc, err := r.GetReplicaDescriptor() + if err != nil { + return roachpb.NewError(err) + } + if typ := repDesc.GetType(); typ != roachpb.VOTER_FULL { + log.Eventf(ctx, "%s replicas cannot serve follower reads", typ) + return pErr + } + ts := ba.Timestamp if ba.Txn != nil { ts.Forward(ba.Txn.MaxTimestamp) @@ -91,6 +93,7 @@ func (r *Replica) canServeFollowerRead( // TODO(tschottdorf): once a read for a timestamp T has been served, the replica may // serve reads for that and smaller timestamps forever. log.Event(ctx, "serving via follower read") + r.store.metrics.FollowerReadsCount.Inc(1) return nil } diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index a662d2324812..48ae1ab0b244 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -216,7 +216,7 @@ func (r *Replica) getLeaseForGossip(ctx context.Context) (bool, *roachpb.Error) ctx, "storage.Replica: acquiring lease to gossip", func(ctx context.Context) { // Check for or obtain the lease, if none active. - _, _, pErr = r.redirectOnOrAcquireLease(ctx) + _, pErr = r.redirectOnOrAcquireLease(ctx) hasLease = pErr == nil if pErr != nil { switch e := pErr.GetDetail().(type) { diff --git a/pkg/kv/kvserver/replica_protected_timestamp.go b/pkg/kv/kvserver/replica_protected_timestamp.go index 694734e20ee1..fb8a80b9ed3d 100644 --- a/pkg/kv/kvserver/replica_protected_timestamp.go +++ b/pkg/kv/kvserver/replica_protected_timestamp.go @@ -144,7 +144,7 @@ func (r *Replica) protectedTimestampRecordCurrentlyApplies( // record or we're not and if we don't then we'll push the cache and re-assert // that we're still the leaseholder. If somebody else becomes the leaseholder // then they will have to go through the same process. - ls, _, pErr := r.redirectOnOrAcquireLease(ctx) + ls, pErr := r.redirectOnOrAcquireLease(ctx) if pErr != nil { return false, false, pErr.GoError() } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 3cb3e2091d74..ecbbc79c2a0b 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -807,14 +807,14 @@ func newNotLeaseHolderError( // leaseGoodToGo is a fast-path for lease checks which verifies that an // existing lease is valid and owned by the current store. This method should // not be called directly. Use redirectOnOrAcquireLease instead. -func (r *Replica) leaseGoodToGo(ctx context.Context) (storagepb.LeaseStatus, hlc.Timestamp, bool) { +func (r *Replica) leaseGoodToGo(ctx context.Context) (storagepb.LeaseStatus, bool) { timestamp := r.store.Clock().Now() r.mu.RLock() defer r.mu.RUnlock() if r.requiresExpiringLeaseRLocked() { // Slow-path for expiration-based leases. - return storagepb.LeaseStatus{}, hlc.Timestamp{}, false + return storagepb.LeaseStatus{}, false } status := r.leaseStatus(*r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS) @@ -823,30 +823,33 @@ func (r *Replica) leaseGoodToGo(ctx context.Context) (storagepb.LeaseStatus, hlc if repDesc, err := r.getReplicaDescriptorRLocked(); err == nil { if _, ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); !ok { // ...and there is no transfer pending. - return status, timestamp, true + return status, true } } } - return storagepb.LeaseStatus{}, hlc.Timestamp{}, false + return storagepb.LeaseStatus{}, false } // redirectOnOrAcquireLease checks whether this replica has the lease at the -// current timestamp. If it does, returns success and the current view of time. 
-// If another replica currently holds the lease, redirects by returning +// current timestamp. If it does, returns the lease and its status. If +// another replica currently holds the lease, redirects by returning // NotLeaseHolderError. If the lease is expired, a renewal is synchronously -// requested. Leases are eagerly renewed when a request with a timestamp within -// rangeLeaseRenewalDuration of the lease expiration is served. +// requested. Leases are eagerly renewed when a request with a timestamp +// within rangeLeaseRenewalDuration of the lease expiration is served. // // TODO(spencer): for write commands, don't wait while requesting // the range lease. If the lease acquisition fails, the write cmd // will fail as well. If it succeeds, as is likely, then the write // will not incur latency waiting for the command to complete. // Reads, however, must wait. +// +// TODO(rangeLeaseRenewalDuration): what is rangeLeaseRenewalDuration +// referring to? It appears to have rotted. func (r *Replica) redirectOnOrAcquireLease( ctx context.Context, -) (storagepb.LeaseStatus, hlc.Timestamp, *roachpb.Error) { - if status, timestamp, ok := r.leaseGoodToGo(ctx); ok { - return status, timestamp, nil +) (storagepb.LeaseStatus, *roachpb.Error) { + if status, ok := r.leaseGoodToGo(ctx); ok { + return status, nil } // Loop until the lease is held or the replica ascertains the actual @@ -974,11 +977,11 @@ func (r *Replica) redirectOnOrAcquireLease( return nil, nil }() if pErr != nil { - return storagepb.LeaseStatus{}, hlc.Timestamp{}, pErr + return storagepb.LeaseStatus{}, pErr } if llHandle == nil { // We own a valid lease. - return status, timestamp, nil + return status, nil } // Wait for the range lease to finish, or the context to expire. @@ -1046,7 +1049,7 @@ func (r *Replica) redirectOnOrAcquireLease( } }() if pErr != nil { - return storagepb.LeaseStatus{}, hlc.Timestamp{}, pErr + return storagepb.LeaseStatus{}, pErr } } } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 0e60da0aec1b..4540dff73b0b 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -657,7 +657,7 @@ func (r *Replica) ensureClosedTimestampStarted(ctx context.Context) *roachpb.Err // Make sure there's a leaseholder. If there's no leaseholder, there's no // closed timestamp updates. var leaseholderNodeID roachpb.NodeID - _, _, err := r.redirectOnOrAcquireLease(ctx) + _, err := r.redirectOnOrAcquireLease(ctx) if err == nil { // We have the lease. Request is essentially a wrapper for calling EmitMLAI // on a remote node, so cut out the middleman. diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 27e1b92dd769..a995db5eaa4f 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/kr/pretty" ) @@ -32,30 +31,14 @@ import ( // iterator to evaluate the batch and then updates the timestamp cache to // reflect the key spans that it read. 
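// [Editor's note] Illustrative sketch only; not part of this diff. With the
// signature change below, executeReadOnlyBatch no longer acquires the lease
// itself: the lease status is determined by executeBatchWithConcurrencyRetries
// and handed in, and the executor merely verifies that execution can proceed
// under it. Any function matching the updated batchExecutionFn shape follows
// the same pattern. A minimal conforming stub, assuming the kvserver package
// context; the function name is hypothetical.
func exampleExecuteBatch(
	r *Replica, ctx context.Context, ba *roachpb.BatchRequest,
	st storagepb.LeaseStatus, g *concurrency.Guard,
) (*roachpb.BatchResponse, *concurrency.Guard, *roachpb.Error) {
	if err := r.checkExecutionCanProceed(ba, g, &st); err != nil {
		return nil, g, roachpb.NewError(err)
	}
	// Evaluation of the batch would happen here.
	return &roachpb.BatchResponse{}, g, nil
}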
func (r *Replica) executeReadOnlyBatch( - ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, + ctx context.Context, ba *roachpb.BatchRequest, st storagepb.LeaseStatus, g *concurrency.Guard, ) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) { - // If the read is not inconsistent, the read requires the range lease or - // permission to serve via follower reads. - var status storagepb.LeaseStatus - var now hlc.Timestamp - if ba.ReadConsistency.RequiresReadLease() { - if status, now, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { - if nErr := r.canServeFollowerRead(ctx, ba, pErr); nErr != nil { - return nil, g, nErr - } - r.store.metrics.FollowerReadsCount.Inc(1) - } - } else { - now = r.Clock().Now() // get a clock reading for checkExecutionCanProceed - } - r.limitTxnMaxTimestamp(ctx, ba, status) - log.Event(ctx, "waiting for read lock") r.readOnlyCmdMu.RLock() defer r.readOnlyCmdMu.RUnlock() // Verify that the batch can be executed. - if err := r.checkExecutionCanProceed(ba, g, now, &status); err != nil { + if err := r.checkExecutionCanProceed(ba, g, &st); err != nil { return nil, g, roachpb.NewError(err) } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 4f16426ddf47..d59a6cf92a2e 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -118,9 +118,9 @@ func (r *Replica) sendWithRangeID( } // batchExecutionFn is a method on Replica that is able to execute a -// BatchRequest. It is called with the batch, along with the span bounds that -// the batch will operate over and a guard for the latches protecting the span -// bounds. +// BatchRequest. It is called with the batch, along with the status of +// the lease that the batch is operating under and a guard for the +// latches protecting the request. // // The function will return either a batch response or an error. The function // also has the option to pass ownership of the concurrency guard back to the @@ -138,7 +138,7 @@ func (r *Replica) sendWithRangeID( // the replicated state machine. In all of these cases, responsibility // for releasing the concurrency guard is handed to Raft. type batchExecutionFn func( - *Replica, context.Context, *roachpb.BatchRequest, *concurrency.Guard, + *Replica, context.Context, *roachpb.BatchRequest, storagepb.LeaseStatus, *concurrency.Guard, ) (*roachpb.BatchResponse, *concurrency.Guard, *roachpb.Error) var _ batchExecutionFn = (*Replica).executeWriteBatch @@ -184,6 +184,27 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send")) } + // Determine the lease under which to evaluate the request. + var status storagepb.LeaseStatus + if !ba.ReadConsistency.RequiresReadLease() { + // Get a clock reading for checkExecutionCanProceed. + status.Timestamp = r.Clock().Now() + } else if ba.IsSingleSkipLeaseCheckRequest() { + // For lease commands, use the provided previous lease for verification. + status.Lease = ba.GetPrevLeaseForLeaseRequest() + status.Timestamp = r.Clock().Now() + } else { + // If the request is a write or a consistent read, it requires the + // range lease or permission to serve via follower reads. + if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { + if nErr := r.canServeFollowerRead(ctx, ba, pErr); nErr != nil { + return nil, nErr + } + } + } + // Limit the transaction's maximum timestamp using observed timestamps. 
+ r.limitTxnMaxTimestamp(ctx, ba, status) + // Acquire latches to prevent overlapping requests from executing until // this request completes. After latching, wait on any conflicting locks // to ensure that the request has full isolation during evaluation. This @@ -212,7 +233,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( } } - br, g, pErr = fn(r, ctx, ba, g) + br, g, pErr = fn(r, ctx, ba, status, g) switch t := pErr.GetDetail().(type) { case nil: // Success. @@ -269,7 +290,7 @@ func (r *Replica) handleWriteIntentError( return g, pErr } // g's latches will be dropped, but it retains its spot in lock wait-queues. - return r.concMgr.HandleWriterIntentError(ctx, g, t), nil + return r.concMgr.HandleWriterIntentError(ctx, g, t) } func (r *Replica) handleTransactionPushError( @@ -366,7 +387,7 @@ func (r *Replica) executeAdminBatch( } // Admin commands always require the range lease. - status, now, pErr := r.redirectOnOrAcquireLease(ctx) + status, pErr := r.redirectOnOrAcquireLease(ctx) if pErr != nil { return nil, pErr } @@ -376,7 +397,7 @@ func (r *Replica) executeAdminBatch( // NB: we pass nil for the spanlatch guard because we haven't acquired // latches yet. This is ok because each individual request that the admin // request sends will acquire latches. - if err := r.checkExecutionCanProceed(ba, nil /* g */, now, &status); err != nil { + if err := r.checkExecutionCanProceed(ba, nil /* g */, &status); err != nil { return nil, roachpb.NewError(err) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 0554e3418f97..70f0897b3ff2 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -923,7 +923,7 @@ func TestReplicaRangeBoundsChecking(t *testing.T) { key := roachpb.RKey("a") firstRepl := tc.store.LookupReplica(key) newRepl := splitTestRange(tc.store, key, key, t) - if _, _, pErr := newRepl.redirectOnOrAcquireLease(context.Background()); pErr != nil { + if _, pErr := newRepl.redirectOnOrAcquireLease(context.Background()); pErr != nil { t.Fatal(pErr) } @@ -1012,7 +1012,7 @@ func TestReplicaLease(t *testing.T) { } { - _, _, pErr := tc.repl.redirectOnOrAcquireLease(context.Background()) + _, pErr := tc.repl.redirectOnOrAcquireLease(context.Background()) if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); !ok || lErr == nil { t.Fatalf("wanted NotLeaseHolderError, got %s", pErr) } @@ -1028,7 +1028,7 @@ func TestReplicaLease(t *testing.T) { filterErr.Store(roachpb.NewError(&roachpb.LeaseRejectedError{Message: "replica not found"})) { - _, _, err := tc.repl.redirectOnOrAcquireLease(context.Background()) + _, err := tc.repl.redirectOnOrAcquireLease(context.Background()) if _, ok := err.GetDetail().(*roachpb.NotLeaseHolderError); !ok { t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, err) } @@ -1410,7 +1410,7 @@ func TestReplicaDrainLease(t *testing.T) { // Acquire initial lease. ctx := context.Background() - status, _, pErr := tc.repl.redirectOnOrAcquireLease(ctx) + status, pErr := tc.repl.redirectOnOrAcquireLease(ctx) if pErr != nil { t.Fatal(pErr) } @@ -1425,7 +1425,7 @@ func TestReplicaDrainLease(t *testing.T) { } tc.store.SetDraining(false) // Newly undrained, leases work again. - if _, _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil { + if _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil { t.Fatal(pErr) } } @@ -1558,7 +1558,7 @@ func TestReplicaNoGossipFromNonLeader(t *testing.T) { // Make sure the information for db1 is not gossiped. 
Since obtaining // a lease updates the gossiped information, we do that. - if _, _, pErr := tc.repl.redirectOnOrAcquireLease(context.Background()); pErr != nil { + if _, pErr := tc.repl.redirectOnOrAcquireLease(context.Background()); pErr != nil { t.Fatal(pErr) } // Fetch the raw gossip info. GetSystemConfig is based on callbacks at diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index e316e6aacf98..730a008adc2a 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -63,34 +63,22 @@ import ( // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). func (r *Replica) executeWriteBatch( - ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, + ctx context.Context, ba *roachpb.BatchRequest, st storagepb.LeaseStatus, g *concurrency.Guard, ) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) { startTime := timeutil.Now() - // Determine the lease under which to evaluate the write. - var now hlc.Timestamp - var lease roachpb.Lease - var status storagepb.LeaseStatus - // For lease commands, use the provided previous lease for verification. - if ba.IsSingleSkipLeaseCheckRequest() { - lease = ba.GetPrevLeaseForLeaseRequest() - now = r.Clock().Now() - } else { - // Other write commands require that this replica has the range - // lease. - if status, now, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { - return nil, g, pErr - } - lease = status.Lease - } - r.limitTxnMaxTimestamp(ctx, ba, status) + // TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we + // don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the + // replica is destroyed concurrently with a write? We won't be able to + // successfully propose as the lease will presumably have changed, but what + // if we hit an error during evaluation (e.g. a ConditionFailedError)? // Verify that the batch can be executed. // NB: we only need to check that the request is in the Range's key bounds // at proposal time, not at application time, because the spanlatch manager // will synchronize all requests (notably EndTxn with SplitTrigger) that may // cause this condition to change. - if err := r.checkExecutionCanProceed(ba, g, now, &status); err != nil { + if err := r.checkExecutionCanProceed(ba, g, &st); err != nil { return nil, g, roachpb.NewError(err) } @@ -126,10 +114,23 @@ func (r *Replica) executeWriteBatch( return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing")) } + // Check that the lease is still valid before proposing to avoid discovering + // this after replication and potentially missing out on the chance to retry + // if the request is using AsyncConsensus. This is best-effort, but can help + // in cases where the request waited arbitrarily long for locks acquired by + // other transactions to be released while sequencing in the concurrency + // manager. + if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence { + curLeaseCpy := curLease // avoid letting curLease escape + err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc()) + log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary()) + return nil, g, roachpb.NewError(err) + } + // If the command is proposed to Raft, ownership of and responsibility for // the concurrency guard will be assumed by Raft, so provide the guard to // evalAndPropose. 
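// [Editor's note] Illustrative sketch only; not part of this diff. The
// best-effort check added above compares lease sequence numbers: sequences
// only move forward, so a strictly greater sequence on the replica's current
// lease means the lease under which this request was sequenced is no longer
// active, and the proposal would only discover that after replication.
// Condensed, with a hypothetical helper name.
func leaseChangedSince(cur, evalUnder roachpb.Lease) bool {
	// True when another lease (possibly held by a different replica) has
	// superseded the one the request was evaluated under.
	return cur.Sequence > evalUnder.Sequence
}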
- ch, abandon, maxLeaseIndex, g, pErr := r.evalAndPropose(ctx, ba, g, &lease) + ch, abandon, maxLeaseIndex, g, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease) if pErr != nil { if maxLeaseIndex != 0 { log.Fatalf( @@ -144,7 +145,7 @@ func (r *Replica) executeWriteBatch( // cannot communicate under the lease's epoch. Instead the code calls EmitMLAI explicitly // as a side effect of stepping up as leaseholder. if maxLeaseIndex != 0 { - untrack(ctx, ctpb.Epoch(lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) + untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) } // If the command was accepted by raft, wait for the range to apply it. diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index c0823c83d0b4..1cec49de8530 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -311,7 +311,7 @@ func (rq *replicateQueue) processOneChange( if _, err := repl.IsDestroyed(); err != nil { return false, err } - if _, _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil { + if _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil { return false, pErr.GoError() } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 148311b96144..a8e41f1f4a6f 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1587,7 +1587,7 @@ func (s *Store) startLeaseRenewer(ctx context.Context) { s.renewableLeases.Range(func(k int64, v unsafe.Pointer) bool { repl := (*Replica)(v) annotatedCtx := repl.AnnotateCtx(ctx) - if _, _, pErr := repl.redirectOnOrAcquireLease(annotatedCtx); pErr != nil { + if _, pErr := repl.redirectOnOrAcquireLease(annotatedCtx); pErr != nil { if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); !ok { log.Warningf(annotatedCtx, "failed to proactively renew lease: %s", pErr) } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 36fc6ee3ca8b..a8a5e81b4ce1 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -704,8 +704,8 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { t.Fatal("replica was not marked as destroyed") } - now := repl1.Clock().Now() - if err = repl1.checkExecutionCanProceed(&roachpb.BatchRequest{}, nil /* g */, now, nil /* st */); err != expErr { + st := &storagepb.LeaseStatus{Timestamp: repl1.Clock().Now()} + if err = repl1.checkExecutionCanProceed(&roachpb.BatchRequest{}, nil /* g */, st); err != expErr { t.Fatalf("expected error %s, but got %v", expErr, err) } } @@ -1499,7 +1499,7 @@ func TestStoreResolveWriteIntent(t *testing.T) { defer leaktest.AfterTest(t)() manual := hlc.NewManualClock(123) - cfg := TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond)) + cfg := TestStoreConfig(hlc.NewClock(manual.UnixNano, 1000*time.Nanosecond)) cfg.TestingKnobs.EvalKnobs.TestingEvalFilter = func(filterArgs storagebase.FilterArgs) *roachpb.Error { pr, ok := filterArgs.Req.(*roachpb.PushTxnRequest)