diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index b2871d40b64a..b767e4bfd4cb 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -12,7 +12,10 @@ go_library( embed = [":kvserverpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb", visibility = ["//visibility:public"], - deps = ["//pkg/roachpb"], + deps = [ + "//pkg/roachpb", + "//pkg/util/hlc", + ], ) proto_library( diff --git a/pkg/kv/kvserver/kvserverpb/lease_status.go b/pkg/kv/kvserver/kvserverpb/lease_status.go index 1bbac36973ec..5734d5c4e386 100644 --- a/pkg/kv/kvserver/kvserverpb/lease_status.go +++ b/pkg/kv/kvserver/kvserverpb/lease_status.go @@ -10,7 +10,10 @@ package kvserverpb -import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) // IsValid returns whether the lease was valid at the time that the // lease status was computed. @@ -22,3 +25,15 @@ func (st LeaseStatus) IsValid() bool { func (st LeaseStatus) OwnedBy(storeID roachpb.StoreID) bool { return st.Lease.OwnedBy(storeID) } + +// Expiration returns the expiration of the lease. +func (st LeaseStatus) Expiration() hlc.Timestamp { + switch st.Lease.Type() { + case roachpb.LeaseExpiration: + return st.Lease.GetExpiration() + case roachpb.LeaseEpoch: + return st.Liveness.Expiration.ToTimestamp() + default: + panic("unexpected") + } +} diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index a0c2f4a46035..09d5d86bb504 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -71,7 +71,7 @@ func (r *Replica) evalAndPropose( ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, - lease *roachpb.Lease, + st kvserverpb.LeaseStatus, lul hlc.Timestamp, ) (chan proposalResult, func(), int64, *roachpb.Error) { idKey := makeIDKey() @@ -81,13 +81,13 @@ func (r *Replica) evalAndPropose( // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. if isConcurrencyRetryError(pErr) { - pErr = maybeAttachLease(pErr, lease) + pErr = maybeAttachLease(pErr, &st.Lease) return nil, nil, 0, pErr } // Attach the endCmds to the proposal and assume responsibility for // releasing the concurrency guard if the proposal makes it to Raft. - proposal.ec = endCmds{repl: r, g: g} + proposal.ec = endCmds{repl: r, g: g, st: st} // Pull out proposal channel to return. proposal.doneCh may be set to // nil if it is signaled in this function. @@ -143,7 +143,7 @@ func (r *Replica) evalAndPropose( } // Attach information about the proposer to the command. - proposal.command.ProposerLeaseSequence = lease.Sequence + proposal.command.ProposerLeaseSequence = st.Lease.Sequence // Once a command is written to the raft log, it must be loaded into memory // and replayed on all replicas. If a command is too big, stop it here. If diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 533c35377136..5f614a4db004 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -92,7 +92,7 @@ func (r *Replica) executeReadOnlyBatch( } // Otherwise, update the timestamp cache and release the concurrency guard.
- ec, g := endCmds{repl: r, g: g}, nil + ec, g := endCmds{repl: r, g: g, st: st}, nil ec.done(ctx, ba, br, pErr) // Semi-synchronously process any intents that need resolving here in diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 515909ae4b74..2df84c4e9a19 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -702,6 +702,7 @@ func (r *Replica) collectSpans( type endCmds struct { repl *Replica g *concurrency.Guard + st kvserverpb.LeaseStatus } // move moves the endCmds into the return value, clearing and making a call to @@ -726,10 +727,13 @@ func (ec *endCmds) done( } defer ec.move() // clear - // Update the timestamp cache if the request is not being re-evaluated. Each - // request is considered in turn; only those marked as affecting the cache - // are processed. - ec.repl.updateTimestampCache(ctx, ba, br, pErr) + // Update the timestamp cache. Each request within the batch is considered + // in turn; only those marked as affecting the cache are processed. However, + // only do so if the request is consistent and was operating on the + // leaseholder under a valid range lease. + if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID { + ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr) + } // Release the latches acquired by the request and exit lock wait-queues. // Must be done AFTER the timestamp cache is updated. ec.g is only set when diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 1b058c1a47e2..d3c202d1aa3e 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -612,11 +612,12 @@ func TestReplicaContains(t *testing.T) { } func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { + ctx := context.Background() ba := roachpb.BatchRequest{} ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) - exLease, _ := r.GetLease() - ch, _, _, pErr := r.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{}) + st := r.CurrentLeaseStatus(ctx) + ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1393,6 +1394,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() stopper := stop.NewStopper() defer stopper.Stop(context.Background()) @@ -1412,11 +1414,11 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { StoreID: 2, }, } - exLease, _ := tc.repl.GetLease() + st := tc.repl.CurrentLeaseStatus(ctx) ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{}) + ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. 
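[Editor's note] On the field threading above: endCmds now carries the kvserverpb.LeaseStatus under which the request executed, and endCmds.done only bumps the timestamp cache for consistent requests that ran under a valid lease. Below is a minimal, self-contained Go sketch of that gate; the type and constant names are simplified stand-ins for roachpb.ReadConsistencyType and kvserverpb.LeaseState, not the actual CockroachDB API.

package main

import "fmt"

// ReadConsistency and LeaseState are hypothetical stand-ins for the
// corresponding CockroachDB enums (illustration only).
type ReadConsistency int
type LeaseState int

const (
	CONSISTENT ReadConsistency = iota
	INCONSISTENT
)

const (
	LeaseStateValid LeaseState = iota
	LeaseStateExpired
)

// shouldUpdateTSCache mirrors the condition added to endCmds.done: only
// consistent requests that executed on the leaseholder under a valid
// lease may update the timestamp cache.
func shouldUpdateTSCache(rc ReadConsistency, ls LeaseState) bool {
	return rc == CONSISTENT && ls == LeaseStateValid
}

func main() {
	fmt.Println(shouldUpdateTSCache(CONSISTENT, LeaseStateValid))   // true: update cache
	fmt.Println(shouldUpdateTSCache(INCONSISTENT, LeaseStateValid)) // false: skip
	fmt.Println(shouldUpdateTSCache(CONSISTENT, LeaseStateExpired)) // false: skip
}

Note that inconsistent reads were already excluded before this change (see the early return deleted from updateTimestampCache further down); the lease-validity check is the new part, and it is what makes the lease-expiration assertion added in replica_tscache.go safe to enforce unconditionally.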
@@ -7865,7 +7867,6 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { repl := tc.repl tc.repl.mu.Lock() - lease := *repl.mu.state.Lease abandoned := make(map[int64]struct{}) // protected by repl.mu tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { if _, ok := abandoned[int64(p.command.MaxLeaseIndex)]; ok { @@ -7886,7 +7887,8 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{}) + st := repl.CurrentLeaseStatus(ctx) + ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if err != nil { t.Fatal(err) } @@ -7942,7 +7944,6 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { } return false, nil } - lease := *tc.repl.mu.state.Lease tc.repl.mu.Unlock() const num = 10 @@ -7956,7 +7957,8 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{}) + st := tc.repl.CurrentLeaseStatus(ctx) + ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if err != nil { t.Fatal(err) } @@ -9287,7 +9289,8 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { } exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{}) + st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID} + ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr != nil { t.Fatal(pErr) } @@ -9306,6 +9309,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { func TestProposeWithAsyncConsensus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() tc := testContext{} tsc := TestStoreConfig(nil) @@ -9320,7 +9324,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { } stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) + defer stopper.Stop(ctx) tc.StartWithStoreConfig(t, stopper, tsc) repl := tc.repl @@ -9332,8 +9336,8 @@ func TestProposeWithAsyncConsensus(t *testing.T) { ba.AsyncConsensus = true atomic.StoreInt32(&filterActive, 1) - exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{}) + st := tc.repl.CurrentLeaseStatus(ctx) + ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr != nil { t.Fatal(pErr) } @@ -9396,8 +9400,8 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba.Timestamp = tc.Clock().Now() atomic.StoreInt32(&filterActive, 1) - exLease, _ := repl.GetLease() - _, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &exLease, hlc.Timestamp{}) + st := repl.CurrentLeaseStatus(ctx) + _, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr != nil { t.Fatal(pErr) } @@ -9415,7 +9419,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &exLease, hlc.Timestamp{}) + ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, hlc.Timestamp{}) if pErr != nil { t.Fatal(pErr) } @@ -12507,7 +12511,7 @@ func 
TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) { } tc.StartWithStoreConfig(t, stopper, cfg) key := roachpb.Key("a") - lease, _ := tc.repl.GetLease() + st := tc.repl.CurrentLeaseStatus(ctx) txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock()) txnID = txn.ID ba := roachpb.BatchRequest{ @@ -12529,7 +12533,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) { tc.repl.RaftLock() sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan()) tracedCtx := tracing.ContextWithSpan(ctx, sp) - ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{}) + ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr != nil { t.Fatal(pErr) } @@ -12599,7 +12603,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) { cfg.AmbientCtx.Tracer = tracer tc.StartWithStoreConfig(t, stopper, cfg) key := roachpb.Key("a") - lease, _ := tc.repl.GetLease() + st := tc.repl.CurrentLeaseStatus(ctx) txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock()) ba := roachpb.BatchRequest{ Header: roachpb.Header{ @@ -12621,7 +12625,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) { // Go out of our way to enable recording so that expensive logging is enabled // for this context. sp.SetVerbose(true) - ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{}) + ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{}) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 2b3827ef4790..56b2dcdc876d 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -12,14 +12,13 @@ package kvserver import ( "context" - "fmt" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -38,19 +37,54 @@ func setTimestampCacheLowWaterMark( } } +// addToTSCacheChecked adds the specified timestamp to the timestamp cache +// covering the range of keys from start to end. Before doing so, the function +// performs a few assertions to check for proper use of the timestamp cache. +func (r *Replica) addToTSCacheChecked( + ctx context.Context, + st *kvserverpb.LeaseStatus, + ba *roachpb.BatchRequest, + br *roachpb.BatchResponse, + pErr *roachpb.Error, + start, end roachpb.Key, + ts hlc.Timestamp, + txnID uuid.UUID, +) { + // All updates to the timestamp cache must be performed below the expiration + // time of the leaseholder. This ensures correctness if the lease expires + // and is acquired by a new replica that begins serving writes immediately + // to the same keys at the next lease's start time. + if exp := st.Expiration(); exp.LessEq(ts) { + log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+ + "cache after evaluating %v (resp=%v; err=%v) with lease expiration %v. 
The timestamp "+ + "cache update could be lost of a non-cooperative lease change.", ts, ba, br, pErr, exp) + } + // All updates the to timestamp cache with non-synthetic timestamps must be + // performed at or below the current time. This is no longer strictly + // required for correctness as lease transfers now read the timestamp cache + // directly instead of using the local HLC clock as a proxy for its high + // water-mark, but it serves as a good proxy for proper handling of HLC + // clock updates and, by extension, observed timestamps. + if !ts.Synthetic && st.Now.ToTimestamp().Less(ts) { + log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+ + "cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+ + "Non-synthetic timestamps should always lag the local hlc clock.", ts, ba, br, pErr, st.Now) + } + r.store.tsCache.Add(start, end, ts, txnID) +} + // updateTimestampCache updates the timestamp cache in order to set a low water // mark for the timestamp at which mutations to keys overlapping the provided // request can write, such that they don't re-write history. func (r *Replica) updateTimestampCache( - ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error, + ctx context.Context, + st *kvserverpb.LeaseStatus, + ba *roachpb.BatchRequest, + br *roachpb.BatchResponse, + pErr *roachpb.Error, ) { - if ba.ReadConsistency != roachpb.CONSISTENT { - // Inconsistent reads are excluded from the timestamp cache. - return - } - addToTSCache := r.store.tsCache.Add - if util.RaceEnabled { - addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr) + addToTSCache := func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) { + r.addToTSCacheChecked(ctx, st, ba, br, pErr, start, end, ts, txnID) } // Update the timestamp cache using the timestamp at which the batch // was executed. Note this may have moved forward from ba.Timestamp, @@ -213,25 +247,6 @@ func (r *Replica) updateTimestampCache( } } -// checkedTSCacheUpdate wraps tscache.Cache and asserts that any update to the -// cache is at or below the specified time. -func checkedTSCacheUpdate( - now hlc.Timestamp, - tc tscache.Cache, - ba *roachpb.BatchRequest, - br *roachpb.BatchResponse, - pErr *roachpb.Error, -) func(roachpb.Key, roachpb.Key, hlc.Timestamp, uuid.UUID) { - return func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) { - if now.Less(ts) && !ts.Synthetic { - panic(fmt.Sprintf("Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+ - "cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+ - "The timestamp cache update could be lost on a lease transfer.", ts, ba, br, pErr, now)) - } - tc.Add(start, end, ts, txnID) - } -} - // txnsPushedDueToClosedTimestamp is a telemetry counter for the number of // batch requests which have been pushed due to the closed timestamp. var batchesPushedDueToClosedTimestamp telemetry.Counter diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 9609a2a74fb4..b02db0ae3af0 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -114,13 +114,13 @@ func (r *Replica) executeWriteBatch( // Checking the context just before proposing can help avoid ambiguous errors. 
if err := ctx.Err(); err != nil { log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary()) return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing")) } // If the command is proposed to Raft, ownership of and responsibility for // the concurrency guard will be assumed by Raft, so provide the guard to // evalAndPropose. - ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease, localUncertaintyLimit) + ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit) if pErr != nil { if maxLeaseIndex != 0 { log.Fatalf(
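[Editor's note] To summarize the invariant this patch asserts: every timestamp added to the timestamp cache must sit below the current lease's expiration, where the expiration comes from the lease itself for expiration-based leases and from the node-liveness record for epoch-based leases (the new LeaseStatus.Expiration method). Below is a minimal, self-contained Go sketch of that check; all types are simplified stand-ins for hlc.Timestamp and kvserverpb.LeaseStatus, not the actual CockroachDB API, and the sketch returns an error where the real code calls log.Fatalf.

package main

import "fmt"

// Timestamp is a hypothetical stand-in for hlc.Timestamp.
type Timestamp struct{ Wall int64 }

func (t Timestamp) LessEq(o Timestamp) bool { return t.Wall <= o.Wall }

type LeaseType int

const (
	LeaseExpiration LeaseType = iota // expiration-based lease
	LeaseEpoch                       // epoch-based lease, backed by node liveness
)

// LeaseStatus loosely mirrors the shape of kvserverpb.LeaseStatus.
type LeaseStatus struct {
	Type           LeaseType
	LeaseExpiry    Timestamp // expiration carried by the lease itself
	LivenessExpiry Timestamp // expiration of the liveness record (epoch leases)
}

// Expiration dispatches on the lease type, like the new
// LeaseStatus.Expiration method in lease_status.go.
func (st LeaseStatus) Expiration() Timestamp {
	switch st.Type {
	case LeaseExpiration:
		return st.LeaseExpiry
	case LeaseEpoch:
		return st.LivenessExpiry
	default:
		panic("unexpected")
	}
}

// addToTSCacheChecked mirrors the first assertion in replica_tscache.go:
// reject any cache update at or above the lease expiration, since such an
// update could be lost on a non-cooperative lease change.
func addToTSCacheChecked(st LeaseStatus, ts Timestamp, add func(Timestamp)) error {
	if exp := st.Expiration(); exp.LessEq(ts) {
		return fmt.Errorf("unsafe update: ts %v not below lease expiration %v", ts, exp)
	}
	add(ts)
	return nil
}

func main() {
	st := LeaseStatus{Type: LeaseEpoch, LivenessExpiry: Timestamp{Wall: 100}}
	add := func(ts Timestamp) { fmt.Println("cached", ts) }
	fmt.Println(addToTSCacheChecked(st, Timestamp{Wall: 50}, add))  // cached {50}, then <nil>
	fmt.Println(addToTSCacheChecked(st, Timestamp{Wall: 150}, add)) // error: unsafe update
}

The real assertion fatals rather than returning an error because a violation would mean a timestamp cache entry could silently vanish across a non-cooperative lease change, undermining the cache's guarantee that later writes cannot rewrite history already observed by earlier reads.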