Skip to content

Commit

Permalink
kvserver: reset tscache after merge to closed ts
Browse files Browse the repository at this point in the history
This patch deals with what happens to the RHS's timestamp cache on a
merge. Before this patch, we were either not touching the cache at all,
when the leases of the LHS and RHS were collocated at merge time, or we
were bumping the RHS's ts cache up to the freeze point otherwise
(because, in this case, the RHS's ts cache info has been lost).  This
patch goes further: now we'll bump the RHS ts cache up to the RHS closed
timestamp on the argument the the RHS's closed timestamp is lost.

This patch is needed by the effort to move closed timestamp to be
per-range instead of per-store, and also to have future-time closed
timestamps. Today, the new ts cache bump is not necessary for a fairly
subtle reason: if the pre-merge leases are collocated,, then the closed
ts of the RHS is not "lost" because it's the same as the one of the LHS.
If the leases are not collocated, the freeze time of the RHS is
certainly above its closed ts. So, in either case, the current code
doesn't lead to the possibility of accepting write post-merge that
invalidate previous follower reads.

The RHS' closed timestamp is plumbed from the freeze to the merge
through subsume response.

Release note: None
  • Loading branch information
andreimatei committed Feb 20, 2021
1 parent 0668efb commit 8738d72
Show file tree
Hide file tree
Showing 11 changed files with 1,386 additions and 1,181 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ func Subsume(
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()
// FrozenClosedTimestamp might return an empty timestamp if the Raft-based
// closed timestamp transport hasn't been enabled yet. That's OK because, if
// the new transport is not enabled, then ranges with leading closed
// timestamps can't exist yet, and so the closed timestamp must be below the
// FreezeStart. The FreezeStart is used by Store.MergeRange to bump the RHS'
// ts cache if LHS/RHS leases are not collocated. The case when the leases are
// collocated also works out because then the closed timestamp (according to
// the old mechanism) is the same for both ranges being merged.
reply.ClosedTimestamp = cArgs.EvalCtx.FrozenClosedTimestamp(ctx)

return result.Result{
Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type EvalContext interface {
GetTerm(uint64) (uint64, error)
GetLeaseAppliedIndex() uint64
GetTracker() closedts.TrackerI
FrozenClosedTimestamp(ctx context.Context) hlc.Timestamp

Desc() *roachpb.RangeDescriptor
ContainsKey(key roachpb.Key) bool
Expand Down Expand Up @@ -184,6 +185,9 @@ func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 {
func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) FrozenClosedTimestamp(ctx context.Context) hlc.Timestamp {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
return m.MockEvalCtx.Desc
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (r *Replica) handleSplitResult(ctx context.Context, split *kvserverpb.Split

func (r *Replica) handleMergeResult(ctx context.Context, merge *kvserverpb.Merge) {
if err := r.store.MergeRange(
ctx, r, merge.LeftDesc, merge.RightDesc, merge.FreezeStart,
ctx, r, merge.LeftDesc, merge.RightDesc, merge.FreezeStart, merge.RightClosedTimestamp,
); err != nil {
// Our in-memory state has diverged from the on-disk state.
log.Fatalf(ctx, "failed to update store after merging range: %s", err)
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,11 @@ func (r *Replica) AdminMerge(
Commit: true,
InternalCommitTrigger: &roachpb.InternalCommitTrigger{
MergeTrigger: &roachpb.MergeTrigger{
LeftDesc: updatedLeftDesc,
RightDesc: rightDesc,
RightMVCCStats: rhsSnapshotRes.MVCCStats,
FreezeStart: rhsSnapshotRes.FreezeStart,
LeftDesc: updatedLeftDesc,
RightDesc: rightDesc,
RightMVCCStats: rhsSnapshotRes.MVCCStats,
FreezeStart: rhsSnapshotRes.FreezeStart,
RightClosedTimestamp: rhsSnapshotRes.ClosedTimestamp,
},
},
})
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ func (rec *SpanSetReplicaEvalContext) GetTracker() closedts.TrackerI {
return rec.i.GetTracker()
}

// FrozenClosedTimestamp is part of the EvalContext interface.
func (rec *SpanSetReplicaEvalContext) FrozenClosedTimestamp(ctx context.Context) hlc.Timestamp {
// To capture a closed timestamp, all keys must be latched to prevent any
// concurrent writes (which could advance the closed timestamp).
desc := rec.i.Desc()
rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
Key: keys.MakeRangeKeyPrefix(desc.StartKey),
EndKey: keys.MakeRangeKeyPrefix(desc.EndKey),
})
rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
})
return rec.i.FrozenClosedTimestamp(ctx)
}

// IsFirstRange returns true iff the replica belongs to the first range.
func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool {
return rec.i.IsFirstRange()
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,18 @@ func (r *Replica) maxClosedRLocked(ctx context.Context) (_ hlc.Timestamp, ok boo

return maxClosed, true
}

// FrozenClosedTimestamp returns the closed timestamp. Unlike
// MaxClosedTimestamp, it only looks at the "new" closed timestamp mechanism,
// ignoring the old one. It returns an empty result if the new mechanism is not
// enabled yet. The new mechanism has better properties than the old one -
// namely the closing of timestamps is synchronized with subsumption requests
// (through latches). Callers who need that property should be prepared to get
// an empty result back, meaning that the closed timestamp cannot be known.
func (r *Replica) FrozenClosedTimestamp(ctx context.Context) hlc.Timestamp {
r.mu.RLock()
defer r.mu.RUnlock()
// TODO(andrei): Make sure that this synchronizes with the closed timestamps
// side-transport once the side-transport is written.
return r.mu.state.ClosedTimestamp
}
45 changes: 31 additions & 14 deletions pkg/kv/kvserver/store_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (s *Store) MergeRange(
leftRepl *Replica,
newLeftDesc, rightDesc roachpb.RangeDescriptor,
freezeStart hlc.ClockTimestamp,
rightClosedTS hlc.Timestamp,
) error {
if oldLeftDesc := leftRepl.Desc(); !oldLeftDesc.EndKey.Less(newLeftDesc.EndKey) {
return errors.Errorf("the new end key is not greater than the current one: %+v <= %+v",
Expand Down Expand Up @@ -74,20 +75,36 @@ func (s *Store) MergeRange(

leftLease, _ := leftRepl.GetLease()
rightLease, _ := rightRepl.GetLease()
if leftLease.OwnedBy(s.Ident.StoreID) && !rightLease.OwnedBy(s.Ident.StoreID) {
// We hold the lease for the LHS, but do not hold the lease for the RHS.
// That means we don't have up-to-date timestamp cache entries for the
// keyspace previously owned by the RHS. Bump the low water mark for the RHS
// keyspace to freezeStart, the time at which the RHS promised to stop
// serving traffic, as freezeStart is guaranteed to be greater than any
// entry in the RHS's timestamp cache.
//
// Note that we need to update our clock with freezeStart to preserve the
// invariant that our clock is always greater than or equal to any
// timestamps in the timestamp cache. For a full discussion, see the comment
// on TestStoreRangeMergeTimestampCacheCausality.
s.Clock().Update(freezeStart)
setTimestampCacheLowWaterMark(s.tsCache, &rightDesc, freezeStart.ToTimestamp())
if leftLease.OwnedBy(s.Ident.StoreID) {
if !rightLease.OwnedBy(s.Ident.StoreID) {
// We hold the lease for the LHS, but do not hold the lease for the RHS.
// That means we don't have up-to-date timestamp cache entries for the
// keyspace previously owned by the RHS. Bump the low water mark for the RHS
// keyspace to freezeStart, the time at which the RHS promised to stop
// serving traffic, as freezeStart is guaranteed to be greater than any
// entry in the RHS's timestamp cache.
//
// Note that we need to update our clock with freezeStart to preserve the
// invariant that our clock is always greater than or equal to any
// timestamps in the timestamp cache. For a full discussion, see the comment
// on TestStoreRangeMergeTimestampCacheCausality.
s.Clock().Update(freezeStart)
setTimestampCacheLowWaterMark(s.tsCache, &rightDesc, freezeStart.ToTimestamp())
}
// When merging ranges, the closed timestamp of the RHS can regress. It's
// possible that, at subsumption time, the RHS had a high closed timestamp.
// Being ingested by the LHS, the closed timestamp of the RHS is lost, and
// the LHS's closed timestamp takes over the respective keys. In order to
// not violate reads that might have been performed by the RHS according to
// the old closed ts (either by the leaseholder or by followers), we bump
// the timestamp cache.
// In the case when the RHS lease was not collocated with the LHS, this bump
// is frequently (but not necessarily) redundant with the bumping to the
// freeze time done above.
if !rightClosedTS.Synthetic {
s.Clock().Update(rightClosedTS.UnsafeToClockTimestamp())
}
setTimestampCacheLowWaterMark(s.tsCache, &rightDesc, rightClosedTS)
}

// Update the subsuming range's descriptor.
Expand Down
Loading

0 comments on commit 8738d72

Please sign in to comment.