diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go index bb59285f73c0..af08b3bab9c2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go @@ -108,7 +108,7 @@ func TransferLease( // incoming leaseholder. This is used to instruct the new leaseholder on how // to update its timestamp cache to ensure that no future writes are allowed // to invalidate prior reads. - priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary() + priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary(ctx) // For now, forward this summary to the proposed lease's start time. This // may appear to undermine the benefit of the read summary, but it doesn't // entirely. Until we ship higher-resolution read summaries, the read diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index 18cb3c5a5dd5..7f7673b3d082 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -175,7 +175,7 @@ func Subsume( // leaseholder. This is used to instruct the LHS on how to update its // timestamp cache to ensure that no future writes are allowed to invalidate // prior reads performed to this point on the RHS range. - priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary() + priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary(ctx) // For now, forward this summary to the freeze time. This may appear to // undermine the benefit of the read summary, but it doesn't entirely. Until // we ship higher-resolution read summaries, the read summary doesn't diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 771a25cf9572..94ab13ba7a25 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -110,7 +110,7 @@ type EvalContext interface { // have performed some action (either calling RevokeLease or WatchForMerge) // to freeze further progression of the closed timestamp before calling this // method. - GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) + GetCurrentReadSummary(ctx context.Context) (rspb.ReadSummary, hlc.Timestamp) GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, @@ -239,7 +239,9 @@ func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) { func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo { return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease} } -func (m *mockEvalCtxImpl) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) { +func (m *mockEvalCtxImpl) GetCurrentReadSummary( + ctx context.Context, +) (rspb.ReadSummary, hlc.Timestamp) { return m.CurrentReadSummary, hlc.Timestamp{} } func (m *mockEvalCtxImpl) GetExternalStorage( diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 93e7e97e8b9c..83dc341c0a5a 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -212,7 +212,9 @@ func (rec SpanSetReplicaEvalContext) GetRangeInfo(ctx context.Context) roachpb.R } // GetCurrentReadSummary is part of the EvalContext interface. -func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) { +func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary( + ctx context.Context, +) (rspb.ReadSummary, hlc.Timestamp) { // To capture a read summary over the range, all keys must be latched for // writing to prevent any concurrent reads or writes. desc := rec.i.Desc() @@ -224,7 +226,7 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() (rspb.ReadSummary, Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), }) - return rec.i.GetCurrentReadSummary() + return rec.i.GetCurrentReadSummary(ctx) } // GetLimiters returns the per-store limiters. diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 965c97ff7245..cf836f7bc1ec 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -217,8 +217,6 @@ func (r *Replica) maxClosedRLocked( var update replicaUpdate // In some tests the lease can be empty, or the ClosedTimestampReceiver might // not be set. - // TODO(andrei): Remove the ClosedTimestampReceiver == nil protection once the - // multiTestContext goes away. if !replicationBehind && !lease.Empty() && r.store.cfg.ClosedTimestampReceiver != nil { otherSideTransportClosed, otherSideTransportLAI := r.store.cfg.ClosedTimestampReceiver.GetClosedTimestamp(ctx, r.RangeID, lease.Replica.NodeID) @@ -246,13 +244,31 @@ func (r *Replica) maxClosedRLocked( // timestamps is synchronized with lease transfers and subsumption requests. // Callers who need that property should be prepared to get an empty result // back, meaning that the closed timestamp cannot be known. -func (r *Replica) ClosedTimestampV2() hlc.Timestamp { +// +// TODO(andrei): Remove this in favor of maxClosed() once the old closed +// timestamp mechanism is deleted. At that point, the two should be equivalent. +func (r *Replica) ClosedTimestampV2(ctx context.Context) hlc.Timestamp { r.mu.RLock() defer r.mu.RUnlock() - return r.closedTimestampV2RLocked() -} -func (r *Replica) closedTimestampV2RLocked() hlc.Timestamp { - // TODO(andrei,nvanbenschoten): include sideTransportClosedTimestamp. - return r.mu.state.RaftClosedTimestamp + appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex) + + closed := r.mu.state.RaftClosedTimestamp + sideTransportClosedMaybe, minLAI := r.getSideTransportClosedTimestampRLocked() + replicationBehind := appliedLAI < minLAI + if !replicationBehind { + closed.Forward(sideTransportClosedMaybe) + } + + // Tests might not be configured with a receiver. + if receiver := r.store.cfg.ClosedTimestampReceiver; receiver != nil { + otherSideTransportClosed, otherSideTransportLAI := + r.store.cfg.ClosedTimestampReceiver.GetClosedTimestamp(ctx, r.RangeID, r.mu.state.Lease.Replica.NodeID) + replicationBehind = appliedLAI < otherSideTransportLAI + if !replicationBehind { + closed.Forward(otherSideTransportClosed) + } + } + + return closed } diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index aaf37efc9fd8..cb73034fe814 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -548,12 +548,12 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key { // GetCurrentReadSummary returns a new ReadSummary reflecting all reads served // by the range to this point. -func (r *Replica) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) { +func (r *Replica) GetCurrentReadSummary(ctx context.Context) (rspb.ReadSummary, hlc.Timestamp) { sum := collectReadSummaryFromTimestampCache(r.store.tsCache, r.Desc()) // Forward the read summary by the range's closed timestamp, because any // replica could have served reads below this time. We also return the // closed timestamp separately, in case callers want it split out. - closedTS := r.ClosedTimestampV2() + closedTS := r.ClosedTimestampV2(ctx) sum.Merge(rspb.FromTimestamp(closedTS)) return sum, closedTS }