Skip to content

Commit

Permalink
Merge #62501
Browse files Browse the repository at this point in the history
62501: kvserver: include sidetransport data in read summary  r=andreimatei a=andreimatei

Read summaries were only using closed timestamp data communicated over
the Raft transport. This was incorrect, as the replica generating the
summary, or other follower replicas, might have served higher reads
based on the side-transport closed timestamp.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Mar 30, 2021
2 parents ddf6538 + 1165696 commit a9cff79
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TransferLease(
// incoming leaseholder. This is used to instruct the new leaseholder on how
// to update its timestamp cache to ensure that no future writes are allowed
// to invalidate prior reads.
priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary()
priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary(ctx)
// For now, forward this summary to the proposed lease's start time. This
// may appear to undermine the benefit of the read summary, but it doesn't
// entirely. Until we ship higher-resolution read summaries, the read
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func Subsume(
// leaseholder. This is used to instruct the LHS on how to update its
// timestamp cache to ensure that no future writes are allowed to invalidate
// prior reads performed to this point on the RHS range.
priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary()
priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary(ctx)
// For now, forward this summary to the freeze time. This may appear to
// undermine the benefit of the read summary, but it doesn't entirely. Until
// we ship higher-resolution read summaries, the read summary doesn't
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type EvalContext interface {
// have performed some action (either calling RevokeLease or WatchForMerge)
// to freeze further progression of the closed timestamp before calling this
// method.
GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp)
GetCurrentReadSummary(ctx context.Context) (rspb.ReadSummary, hlc.Timestamp)

GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
Expand Down Expand Up @@ -239,7 +239,9 @@ func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) {
func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease}
}
func (m *mockEvalCtxImpl) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
func (m *mockEvalCtxImpl) GetCurrentReadSummary(
ctx context.Context,
) (rspb.ReadSummary, hlc.Timestamp) {
return m.CurrentReadSummary, hlc.Timestamp{}
}
func (m *mockEvalCtxImpl) GetExternalStorage(
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func (rec SpanSetReplicaEvalContext) GetRangeInfo(ctx context.Context) roachpb.R
}

// GetCurrentReadSummary is part of the EvalContext interface.
func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(
ctx context.Context,
) (rspb.ReadSummary, hlc.Timestamp) {
// To capture a read summary over the range, all keys must be latched for
// writing to prevent any concurrent reads or writes.
desc := rec.i.Desc()
Expand All @@ -224,7 +226,7 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() (rspb.ReadSummary,
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
})
return rec.i.GetCurrentReadSummary()
return rec.i.GetCurrentReadSummary(ctx)
}

// GetLimiters returns the per-store limiters.
Expand Down
32 changes: 24 additions & 8 deletions pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ func (r *Replica) maxClosedRLocked(
var update replicaUpdate
// In some tests the lease can be empty, or the ClosedTimestampReceiver might
// not be set.
// TODO(andrei): Remove the ClosedTimestampReceiver == nil protection once the
// multiTestContext goes away.
if !replicationBehind && !lease.Empty() && r.store.cfg.ClosedTimestampReceiver != nil {
otherSideTransportClosed, otherSideTransportLAI :=
r.store.cfg.ClosedTimestampReceiver.GetClosedTimestamp(ctx, r.RangeID, lease.Replica.NodeID)
Expand Down Expand Up @@ -246,13 +244,31 @@ func (r *Replica) maxClosedRLocked(
// timestamps is synchronized with lease transfers and subsumption requests.
// Callers who need that property should be prepared to get an empty result
// back, meaning that the closed timestamp cannot be known.
func (r *Replica) ClosedTimestampV2() hlc.Timestamp {
//
// TODO(andrei): Remove this in favor of maxClosed() once the old closed
// timestamp mechanism is deleted. At that point, the two should be equivalent.
func (r *Replica) ClosedTimestampV2(ctx context.Context) hlc.Timestamp {
r.mu.RLock()
defer r.mu.RUnlock()
return r.closedTimestampV2RLocked()
}

func (r *Replica) closedTimestampV2RLocked() hlc.Timestamp {
// TODO(andrei,nvanbenschoten): include sideTransportClosedTimestamp.
return r.mu.state.RaftClosedTimestamp
appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex)

closed := r.mu.state.RaftClosedTimestamp
sideTransportClosedMaybe, minLAI := r.getSideTransportClosedTimestampRLocked()
replicationBehind := appliedLAI < minLAI
if !replicationBehind {
closed.Forward(sideTransportClosedMaybe)
}

// Tests might not be configured with a receiver.
if receiver := r.store.cfg.ClosedTimestampReceiver; receiver != nil {
otherSideTransportClosed, otherSideTransportLAI :=
r.store.cfg.ClosedTimestampReceiver.GetClosedTimestamp(ctx, r.RangeID, r.mu.state.Lease.Replica.NodeID)
replicationBehind = appliedLAI < otherSideTransportLAI
if !replicationBehind {
closed.Forward(otherSideTransportClosed)
}
}

return closed
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,12 +548,12 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key {

// GetCurrentReadSummary returns a new ReadSummary reflecting all reads served
// by the range to this point.
func (r *Replica) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
func (r *Replica) GetCurrentReadSummary(ctx context.Context) (rspb.ReadSummary, hlc.Timestamp) {
sum := collectReadSummaryFromTimestampCache(r.store.tsCache, r.Desc())
// Forward the read summary by the range's closed timestamp, because any
// replica could have served reads below this time. We also return the
// closed timestamp separately, in case callers want it split out.
closedTS := r.ClosedTimestampV2()
closedTS := r.ClosedTimestampV2(ctx)
sum.Merge(rspb.FromTimestamp(closedTS))
return sum, closedTS
}
Expand Down

0 comments on commit a9cff79

Please sign in to comment.