From a3d35c80ce172c5c8de53a13096ffbf84b9ad16d Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 28 Feb 2021 22:01:42 +0100 Subject: [PATCH 1/4] util/log: ensure that all channel logs are displayed with `-show-logs` When `-show-logs` is specified, the `log.Scope` becomes a no-op and the default configuration in the `log` package is used. This is the only time ever when the default configuration is used. Prior to this patch, only the logging for the DEV channel would make its way to the standard error (and the test output) in that case. This was unfortunate, since the intent (as spelled out in a comment already) was to display everything. This patch fixes that. Release justification: non-production code changes Release note: None --- pkg/util/log/flags.go | 6 ++---- pkg/util/log/flags_test.go | 2 +- pkg/util/log/logconfig/config.go | 8 ++++---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 26d3e0d15856..9392366fc6f6 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -53,10 +53,8 @@ func init() { // Default stderrThreshold to log everything to the process' // external stderr (OrigStderr). defaultConfig.Sinks.Stderr.Filter = severity.INFO - // We only register it for the DEV channels. No other - // channels get a configuration, whereby every channel - // ends up sharing the DEV logger (debugLog). - defaultConfig.Sinks.Stderr.Channels.Channels = []logpb.Channel{channel.DEV} + // Ensure all channels go to stderr. + defaultConfig.Sinks.Stderr.Channels.Channels = logconfig.SelectAllChannels() // We also don't capture internal writes to fd2 by default: // let the writes go to the external stderr. defaultConfig.CaptureFd2.Enable = false diff --git a/pkg/util/log/flags_test.go b/pkg/util/log/flags_test.go index f0ce02a437ef..54a3b7e618c3 100644 --- a/pkg/util/log/flags_test.go +++ b/pkg/util/log/flags_test.go @@ -25,7 +25,7 @@ func TestAppliedStandaloneConfig(t *testing.T) { const expected = `sinks: stderr: - channels: [DEV] + channels: all filter: INFO format: crdb-v2-tty redact: false diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index 2305f5545a1e..6fef5711191e 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -564,7 +564,7 @@ func parseChannelList(s string) ([]logpb.Channel, error) { // Special case: "ALL" selects all channels. if s == "ALL" { - return selectAllChannels(), nil + return SelectAllChannels(), nil } // If channels starts with "all except", we invert the selection. @@ -597,7 +597,7 @@ func selectChannels(invert bool, parts []string) ([]logpb.Channel, error) { if len(parts) != 1 { return nil, errors.New("cannot use ALL if there are other channel names present in the list") } - return selectAllChannels(), nil + return SelectAllChannels(), nil } // Verify the channel name is known. @@ -636,9 +636,9 @@ func selectChannels(invert bool, parts []string) ([]logpb.Channel, error) { return selected, nil } -// selectAllChannels returns a copy of channelValues, +// SelectAllChannels returns a copy of channelValues, // for use in the ALL configuration. -func selectAllChannels() []logpb.Channel { +func SelectAllChannels() []logpb.Channel { // Copy the default in case the code that uses a Config overwrites // its channel list in-place. 
chans := make([]logpb.Channel, 0, len(channelValues)) From a45152aceb3d1ed5f048c19c3b2d01d3dfc2082a Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 26 Feb 2021 02:14:44 -0500 Subject: [PATCH 2/4] kv/kvserver: stop allowing historical reads up to freeze time on subsumed ranges In b192bba, we began allowing historical reads up to the freeze start time on subsumed ranges. This turned out not to be quite the right idea, because we can now ship the timestamp cache to the LHS and be more optimal about the resulting timestamp cache on the joint range. However, since we started allowing reads up to the freeze time, we were effectively closing this time for all future writes on the joint range, so we couldn't take advantage of the new ability to ship the timestamp cache around. But the change was very well intentioned and revealed that we should have no problem allowing reads below the closed timestamp on subsumed ranges. We will add support for this in the near future, likely once the new closed timestamp system is phased in. --- pkg/kv/kvserver/batcheval/cmd_subsume.go | 17 +- pkg/kv/kvserver/batcheval/result/result.go | 21 +-- pkg/kv/kvserver/client_merge_test.go | 183 +++++++++++---------- pkg/kv/kvserver/replica.go | 71 ++------ pkg/kv/kvserver/replica_proposal.go | 12 +- pkg/kv/kvserver/replica_read.go | 10 +- 6 files changed, 133 insertions(+), 181 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index 61610efa99ce..83686e351593 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -110,6 +110,11 @@ func Subsume( return result.Result{}, errors.AssertionFailedf("non-deletion intent on local range descriptor") } + // NOTE: the deletion intent on the range's meta2 descriptor is just as + // important to correctness as the deletion intent on the local descriptor, + // but the check is too expensive as it would involve a network roundtrip on + // most nodes. + // We prevent followers of the RHS from being able to serve follower reads on // timestamps that fall in the timestamp window representing the range's // subsumed state (i.e. between the subsumption time (FreezeStart) and the @@ -142,16 +147,16 @@ func Subsume( // is subsumed, we ensure that the initial MLAI update broadcast by the new // leaseholder respects the invariant in question, in much the same way we do // here. Take a look at `EmitMLAI()` in replica_closedts.go for more details. + // + // TODO(nvanbenschoten): remove this in v21.2 when the rest of the v1 closed + // timestamp system disappears. _, untrack := cArgs.EvalCtx.GetTracker().Track(ctx) lease, _ := cArgs.EvalCtx.GetLease() lai := cArgs.EvalCtx.GetLeaseAppliedIndex() untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1)) - // NOTE: the deletion intent on the range's meta2 descriptor is just as - // important to correctness as the deletion intent on the local descriptor, - // but the check is too expensive as it would involve a network roundtrip on - // most nodes. - + // Now that the range is frozen, collect some information to ship to the LHS + // leaseholder through the merge trigger. 
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() reply.LeaseAppliedIndex = lai reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp() @@ -183,6 +188,6 @@ func Subsume( reply.ReadSummary = &priorReadSum return result.Result{ - Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()}, + Local: result.LocalResult{MaybeWatchForMerge: true}, }, nil } diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 7d444371cf86..448bee0f2630 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/kr/pretty" @@ -64,10 +63,8 @@ type LocalResult struct { MaybeAddToSplitQueue bool // Call MaybeGossipNodeLiveness with the specified Span, if set. MaybeGossipNodeLiveness *roachpb.Span - // FreezeStart indicates the high water mark timestamp beyond which the range - // is guaranteed to not have served any requests. This value is only set when - // a range merge is in progress. If set, call maybeWatchForMerge. - FreezeStart hlc.Timestamp + // Call maybeWatchForMerge. + MaybeWatchForMerge bool // Metrics contains counters which are to be passed to the // metrics subsystem. @@ -87,7 +84,7 @@ func (lResult *LocalResult) IsZero() bool { !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && lResult.MaybeGossipNodeLiveness == nil && - lResult.FreezeStart.IsEmpty() && + !lResult.MaybeWatchForMerge && lResult.Metrics == nil } @@ -100,13 +97,13 @@ func (lResult *LocalResult) String() string { "#updated txns: %d #end txns: %d, "+ "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ - "MaybeGossipNodeLiveness:%s FreezeStart:%s", + "MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, - lResult.MaybeGossipNodeLiveness, lResult.FreezeStart) + lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge) } // DetachEncounteredIntents returns (and removes) those encountered @@ -343,17 +340,11 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Local.MaybeGossipNodeLiveness = nil - if p.Local.FreezeStart.IsEmpty() { - p.Local.FreezeStart = q.Local.FreezeStart - } else if !q.Local.FreezeStart.IsEmpty() { - return errors.AssertionFailedf("conflicting FreezeStart") - } - q.Local.FreezeStart = hlc.Timestamp{} - coalesceBool(&p.Local.GossipFirstRange, &q.Local.GossipFirstRange) coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig) coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure) coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue) + coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge) if p.Local.Metrics == nil { p.Local.Metrics = q.Local.Metrics diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index b4390c522831..acb8d232ef23 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ 
b/pkg/kv/kvserver/client_merge_test.go @@ -4350,92 +4350,103 @@ func sendWithTxn( return pErr.GoError() } -// TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range -// can only serve read-only traffic for timestamps that precede the subsumption -// time, but don't contain the subsumption time in their uncertainty interval. -func TestHistoricalReadsAfterSubsume(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - - maxOffset := 100 * time.Millisecond - preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp { - return hlc.Timestamp{ - WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1, - Logical: ts.Logical, - } - } - - type testCase struct { - name string - queryTsFunc func(freezeStart hlc.Timestamp) hlc.Timestamp - queryArgsFunc func(key roachpb.Key) roachpb.Request - shouldBlock bool - } - - tests := []testCase{ - // Ensure that a read query for a timestamp older than freezeStart-MaxOffset - // is let through. - { - name: "historical read", - queryTsFunc: preUncertaintyTs, - queryArgsFunc: func(key roachpb.Key) roachpb.Request { - return getArgs(key) - }, - shouldBlock: false, - }, - // Write queries for the same historical timestamp should block (and then - // eventually fail because the range no longer exists). - { - name: "historical write", - queryTsFunc: preUncertaintyTs, - queryArgsFunc: func(key roachpb.Key) roachpb.Request { - return putArgs(key, []byte(`test value`)) - }, - shouldBlock: true, - }, - // Read queries that contain the subsumption time in its uncertainty interval - // should block and eventually fail. - { - name: "historical read with uncertainty", - queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp { - return freezeStart.Prev() - }, - queryArgsFunc: func(key roachpb.Key) roachpb.Request { - return getArgs(key) - }, - shouldBlock: true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc := - setupClusterWithSubsumedRange(ctx, t, 1 /* numNodes */, maxOffset) - defer tc.Stopper().Stop(ctx) - errCh := make(chan error) - go func() { - errCh <- sendWithTxn(store, rhsDesc, test.queryTsFunc(freezeStart), maxOffset, - test.queryArgsFunc(rhsDesc.StartKey.AsRawKey())) - }() - if test.shouldBlock { - waitForBlocked() - cleanupFunc() - // RHS should cease to exist once the merge completes but we cannot - // guarantee that the merge wasn't internally retried before it was able - // to successfully commit. If it did, requests blocked on the previous - // merge attempt might go through successfully. Thus, we cannot make any - // assertions about the result of these blocked requests. - <-errCh - } else { - require.NoError(t, <-errCh) - // We cleanup *after* the non-blocking read request succeeds to prevent - // it from racing with the merge commit trigger. - cleanupFunc() - } - }) - } -} +// TODO(nvanbenschoten): fix this test. In b192bba, we allowed historical reads +// up to the freeze start time on subsumed ranges. This turned out not to be +// quite the right idea, because we can now ship the timestamp cache to the LHS +// and be more optimal about the resulting timestamp cache on the joint range. +// However, since we started allowing reads up to the freeze time, we were +// effectively closing this time for all future writes on the joint range, so we +// couldn't take advantage of the new ability to ship the timestamp cache +// around. 
But the change was very well intentioned and revealed that we should +// have no problem allowing reads below the closed timestamp on subsumed ranges. +// Add support for this and update this test. +// +// // TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range +// // can only serve read-only traffic for timestamps that precede the subsumption +// // time, but don't contain the subsumption time in their uncertainty interval. +// func TestHistoricalReadsAfterSubsume(t *testing.T) { +// defer leaktest.AfterTest(t)() +// defer log.Scope(t).Close(t) +// ctx := context.Background() + +// maxOffset := 100 * time.Millisecond +// preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp { +// return hlc.Timestamp{ +// WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1, +// Logical: ts.Logical, +// } +// } + +// type testCase struct { +// name string +// queryTsFunc func(freezeStart hlc.Timestamp) hlc.Timestamp +// queryArgsFunc func(key roachpb.Key) roachpb.Request +// shouldBlock bool +// } + +// tests := []testCase{ +// // Ensure that a read query for a timestamp older than freezeStart-MaxOffset +// // is let through. +// { +// name: "historical read", +// queryTsFunc: preUncertaintyTs, +// queryArgsFunc: func(key roachpb.Key) roachpb.Request { +// return getArgs(key) +// }, +// shouldBlock: false, +// }, +// // Write queries for the same historical timestamp should block (and then +// // eventually fail because the range no longer exists). +// { +// name: "historical write", +// queryTsFunc: preUncertaintyTs, +// queryArgsFunc: func(key roachpb.Key) roachpb.Request { +// return putArgs(key, []byte(`test value`)) +// }, +// shouldBlock: true, +// }, +// // Read queries that contain the subsumption time in its uncertainty interval +// // should block and eventually fail. +// { +// name: "historical read with uncertainty", +// queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp { +// return freezeStart.Prev() +// }, +// queryArgsFunc: func(key roachpb.Key) roachpb.Request { +// return getArgs(key) +// }, +// shouldBlock: true, +// }, +// } + +// for _, test := range tests { +// t.Run(test.name, func(t *testing.T) { +// tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc := +// setupClusterWithSubsumedRange(ctx, t, 1 /* numNodes */, maxOffset) +// defer tc.Stopper().Stop(ctx) +// errCh := make(chan error) +// go func() { +// errCh <- sendWithTxn(store, rhsDesc, test.queryTsFunc(freezeStart), maxOffset, +// test.queryArgsFunc(rhsDesc.StartKey.AsRawKey())) +// }() +// if test.shouldBlock { +// waitForBlocked() +// cleanupFunc() +// // RHS should cease to exist once the merge completes but we cannot +// // guarantee that the merge wasn't internally retried before it was able +// // to successfully commit. If it did, requests blocked on the previous +// // merge attempt might go through successfully. Thus, we cannot make any +// // assertions about the result of these blocked requests. +// <-errCh +// } else { +// require.NoError(t, <-errCh) +// // We cleanup *after* the non-blocking read request succeeds to prevent +// // it from racing with the merge commit trigger. 
+// cleanupFunc() +// } +// }) +// } +// } // TestStoreBlockTransferLeaseRequestAfterSubsumption tests that a // TransferLeaseRequest checks & waits for an ongoing merge before it can be diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index cc6e73d1fa98..a4cad6e40f32 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -964,10 +964,6 @@ func (r *Replica) mergeInProgressRLocked() bool { return r.mu.mergeComplete != nil } -func (r *Replica) getFreezeStartRLocked() hlc.Timestamp { - return r.mu.freezeStart -} - // setLastReplicaDescriptors sets the most recently seen replica // descriptors to those contained in the *RaftMessageRequest, acquiring r.mu // to do so. @@ -1269,6 +1265,8 @@ func (r *Replica) checkExecutionCanProceed( // a merge's critical phase (i.e. while the RHS of the merge is // subsumed). if err := r.shouldWaitForPendingMergeRLocked(ctx, ba); err != nil { + // TODO(nvanbenschoten): we should still be able to serve reads + // below the closed timestamp in this case. return kvserverpb.LeaseStatus{}, err } } @@ -1344,51 +1342,12 @@ func (r *Replica) shouldWaitForPendingMergeRLocked( return nil } - // TODO(nvanbenschoten): this isn't quite right. We shouldn't allow non-txn - // requests through here for the same reason why lease transfers can only - // allow concurrent reads > max_offset below the lease transfer time. - if ba.IsReadOnly() { - freezeStart := r.getFreezeStartRLocked() - ts := ba.Timestamp - if ba.Txn != nil { - ts.Forward(ba.Txn.GlobalUncertaintyLimit) - } - if ts.Less(freezeStart) { - // When the max timestamp of a read request is less than the subsumption - // time recorded by this Range (freezeStart), we're guaranteed that none - // of the writes accepted by the leaseholder for the keyspan (which could - // be a part of the subsuming range if the merge succeeded, or part of - // this range if it didn't) for timestamps after the subsumption timestamp - // could have causally preceded the current request. Letting such requests - // go through does not violate any of the invariants guaranteed by - // Subsume(). - // - // NB: It would be incorrect to serve this read request if freezeStart - // were in its uncertainty window. For the sake of contradiction, consider - // the following scenario, if such a request were allowed to proceed: - // 1. This range gets subsumed, `maybeWatchForMerge` is called and the - // `mergeCompleteCh` channel is set up. - // 2. A read request *that succeeds the subsumption in real time* comes in - // for a timestamp that contains `freezeStart` in its uncertainty interval - // before the `mergeCompleteCh` channel is removed. Let's say the read - // timestamp of this request is X (with X <= freezeStart), and let's - // denote its uncertainty interval by [X, Y). - // 3. By the time this request reaches `shouldWaitForPendingMergeRLocked`, the - // merge has committed so all subsequent requests are directed to the - // leaseholder of the (subsuming) left-hand range but this pre-merge range - // hasn't been destroyed yet. - // 4. If the (post-merge) left-hand side leaseholder had accepted any new - // writes with timestamps in the window [freezeStart, Y), we would - // potentially have a stale read, as any of the writes in this window could - // have causally preceded the aforementioned read. - return nil - } - } - // This request cannot proceed until the merge completes, signaled by the - // closing of the channel. + // The replica is being merged into its left-hand neighbor. 
This request + // cannot proceed until the merge completes, signaled by the closing of the + // channel. // // It is very important that this check occur after we have acquired latches - // from the spanlatch manager. Only after we release these latches are we + // from the spanlatch manager. Only after we acquire these latches are we // guaranteed that we're not racing with a Subsume command. (Subsume // commands declare a conflict with all other commands.) It is also // important that this check occur after we have verified that this replica @@ -1486,16 +1445,15 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool { } // maybeWatchForMerge checks whether a merge of this replica into its left -// neighbor is in its critical phase and, if so, arranges to block all requests, -// except for read-only requests that are older than `freezeStart`, until the -// merge completes. -func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timestamp) error { +// neighbor is in its critical phase and, if so, arranges to block all requests +// until the merge completes. +func (r *Replica) maybeWatchForMerge(ctx context.Context) error { r.mu.Lock() defer r.mu.Unlock() - return r.maybeWatchForMergeLocked(ctx, freezeStart) + return r.maybeWatchForMergeLocked(ctx) } -func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.Timestamp) error { +func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error { desc := r.descRLocked() descKey := keys.RangeDescriptorKey(desc.StartKey) _, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(), @@ -1525,12 +1483,6 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc. // Nothing more to do. return nil } - // Note that if the merge txn retries for any reason (for example, if the - // left-hand side range undergoes a lease transfer before the merge - // completes), the right-hand side range will get re-subsumed. This will - // lead to `freezeStart` being overwritten with the new subsumption time. - // This is fine. - r.mu.freezeStart = freezeStart r.mu.mergeComplete = mergeCompleteCh // The RHS of a merge is not permitted to quiesce while a mergeComplete // channel is installed. (If the RHS is quiescent when the merge commits, any @@ -1647,7 +1599,6 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc. // Unblock pending requests. If the merge committed, the requests will // notice that the replica has been destroyed and return an appropriate // error. If the merge aborted, the requests will be handled normally. - r.mu.freezeStart = hlc.Timestamp{} r.mu.mergeComplete = nil close(mergeCompleteCh) r.mu.Unlock() diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 2d3da2289028..f072339edc39 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -396,13 +396,7 @@ func (r *Replica) leasePostApplyLocked( // progress, as only the old leaseholder would have been explicitly notified // of the merge. If there is a merge in progress, maybeWatchForMerge will // arrange to block all traffic to this replica unless the merge aborts. - // NB: If the subsumed range changes leaseholders after subsumption, - // `freezeStart` will be zero and we will effectively be blocking all read - // requests. 
- // TODO(aayush): In the future, if we permit co-operative lease transfers - // when a range is subsumed, it should be relatively straightforward to - // allow historical reads on the subsumed RHS after such lease transfers. - if err := r.maybeWatchForMergeLocked(ctx, hlc.Timestamp{} /* freezeStart */); err != nil { + if err := r.maybeWatchForMergeLocked(ctx); err != nil { // We were unable to determine whether a merge was in progress. We cannot // safely proceed. log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s", @@ -629,8 +623,8 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re if lResult.EndTxns != nil { log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns) } - if !lResult.FreezeStart.IsEmpty() { - log.Fatalf(ctx, "LocalEvalResult.FreezeStart should have been handled and reset: %s", lResult.FreezeStart) + if lResult.MaybeWatchForMerge { + log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false") } if lResult.AcquiredLocks != nil { diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 5f614a4db004..190154442eae 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -180,14 +180,14 @@ func (r *Replica) handleReadOnlyLocalEvalResult( lResult.AcquiredLocks = nil } - if !lResult.FreezeStart.IsEmpty() { - // A merge is (likely) about to be carried out, and this replica needs to - // block all non-read traffic until the merge either commits or aborts. See + if lResult.MaybeWatchForMerge { + // A merge is (likely) about to be carried out, and this replica needs + // to block all traffic until the merge either commits or aborts. See // docs/tech-notes/range-merges.md. - if err := r.maybeWatchForMerge(ctx, lResult.FreezeStart); err != nil { + if err := r.maybeWatchForMerge(ctx); err != nil { return roachpb.NewError(err) } - lResult.FreezeStart = hlc.Timestamp{} + lResult.MaybeWatchForMerge = false } if !lResult.IsZero() { From a3cb57c120487217000b9a809fcc16e65d295942 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 26 Feb 2021 12:17:02 -0500 Subject: [PATCH 3/4] kv/kvserver: merge GetFrozenClosedTimestamp and GetCurrentReadSummary These are doing roughly the same thing, so merge the methods. --- .../kvserver/batcheval/cmd_lease_transfer.go | 2 +- pkg/kv/kvserver/batcheval/cmd_subsume.go | 22 ++++++++++-------- pkg/kv/kvserver/batcheval/eval_context.go | 17 ++++++++------ pkg/kv/kvserver/replica_eval_context_span.go | 18 +-------------- pkg/kv/kvserver/replica_follower_read.go | 23 +++++++++++-------- pkg/kv/kvserver/replica_tscache.go | 10 ++++---- 6 files changed, 43 insertions(+), 49 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go index badf400f007d..d744aa79d61d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go @@ -81,7 +81,7 @@ func TransferLease( // incoming leaseholder. This is used to instruct the new leaseholder on how // to update its timestamp cache to ensure that no future writes are allowed // to invalidate prior reads. - priorReadSum := cArgs.EvalCtx.GetCurrentReadSummary() + priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary() // For now, forward this summary to the proposed lease's start time. This // may appear to undermine the benefit of the read summary, but it doesn't // entirely. 
Until we ship higher-resolution read summaries, the read diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index 83686e351593..c278afe57eb2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -160,20 +160,12 @@ func Subsume( reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() reply.LeaseAppliedIndex = lai reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp() - // FrozenClosedTimestamp might return an empty timestamp if the Raft-based - // closed timestamp transport hasn't been enabled yet. That's OK because, if - // the new transport is not enabled, then ranges with leading closed - // timestamps can't exist yet, and so the closed timestamp must be below the - // FreezeStart. The FreezeStart is used by Store.MergeRange to bump the RHS' - // ts cache if LHS/RHS leases are not collocated. The case when the leases are - // collocated also works out because then the closed timestamp (according to - // the old mechanism) is the same for both ranges being merged. - reply.ClosedTimestamp = cArgs.EvalCtx.GetFrozenClosedTimestamp() + // Collect a read summary from the RHS leaseholder to ship to the LHS // leaseholder. This is used to instruct the LHS on how to update its // timestamp cache to ensure that no future writes are allowed to invalidate // prior reads performed to this point on the RHS range. - priorReadSum := cArgs.EvalCtx.GetCurrentReadSummary() + priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary() // For now, forward this summary to the freeze time. This may appear to // undermine the benefit of the read summary, but it doesn't entirely. Until // we ship higher-resolution read summaries, the read summary doesn't @@ -186,6 +178,16 @@ func Subsume( // think about. priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp())) reply.ReadSummary = &priorReadSum + // NOTE FOR v21.1: GetCurrentReadSummary might return an empty timestamp if + // the Raft-based closed timestamp transport hasn't been enabled yet. That's + // OK because, if the new transport is not enabled, then ranges with leading + // closed timestamps can't exist yet, and so the closed timestamp must be + // below the FreezeStart. The FreezeStart is used by Store.MergeRange to + // bump the RHS' ts cache if LHS/RHS leases are not collocated. The case + // when the leases are collocated also works out because then the closed + // timestamp (according to the old mechanism) is the same for both ranges + // being merged. + reply.ClosedTimestamp = closedTS return result.Result{ Local: result.LocalResult{MaybeWatchForMerge: true}, diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 747e805b885d..80b10b541c37 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -97,14 +97,20 @@ type EvalContext interface { GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error) GetLease() (roachpb.Lease, roachpb.Lease) GetRangeInfo(context.Context) roachpb.RangeInfo - GetFrozenClosedTimestamp() hlc.Timestamp // GetCurrentReadSummary returns a new ReadSummary reflecting all reads // served by the range to this point. The method requires a write latch // across all keys in the range (see declareAllKeys), because it will only // return a meaningful summary if the caller has serialized with all other // requests on the range. 
- GetCurrentReadSummary() rspb.ReadSummary + // + // The method also returns the current closed timestamp on the range. This + // closed timestamp is already incorporated into the read summary, but some + // callers also need is separated out. It is expected that a caller will + // have performed some action (either calling RevokeLease or WatchForMerge) + // to freeze further progression of the closed timestamp before calling this + // method. + GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, @@ -194,9 +200,6 @@ func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 { func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI { panic("unimplemented") } -func (m *mockEvalCtxImpl) GetFrozenClosedTimestamp() hlc.Timestamp { - panic("unimplemented") -} func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor { return m.MockEvalCtx.Desc } @@ -226,8 +229,8 @@ func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) { func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo { return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease} } -func (m *mockEvalCtxImpl) GetCurrentReadSummary() rspb.ReadSummary { - return m.CurrentReadSummary +func (m *mockEvalCtxImpl) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) { + return m.CurrentReadSummary, hlc.Timestamp{} } func (m *mockEvalCtxImpl) GetExternalStorage( ctx context.Context, dest roachpb.ExternalStorage, diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 5088a69a0357..7ea060d51333 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -118,22 +118,6 @@ func (rec *SpanSetReplicaEvalContext) GetTracker() closedts.TrackerI { return rec.i.GetTracker() } -// GetFrozenClosedTimestamp is part of the EvalContext interface. -func (rec *SpanSetReplicaEvalContext) GetFrozenClosedTimestamp() hlc.Timestamp { - // To capture a closed timestamp, all keys must be latched to prevent any - // concurrent writes (which could advance the closed timestamp). - desc := rec.i.Desc() - rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{ - Key: keys.MakeRangeKeyPrefix(desc.StartKey), - EndKey: keys.MakeRangeKeyPrefix(desc.EndKey), - }) - rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{ - Key: desc.StartKey.AsRawKey(), - EndKey: desc.EndKey.AsRawKey(), - }) - return rec.i.GetFrozenClosedTimestamp() -} - // IsFirstRange returns true iff the replica belongs to the first range. func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool { return rec.i.IsFirstRange() @@ -228,7 +212,7 @@ func (rec SpanSetReplicaEvalContext) GetRangeInfo(ctx context.Context) roachpb.R } // GetCurrentReadSummary is part of the EvalContext interface. -func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() rspb.ReadSummary { +func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) { // To capture a read summary over the range, all keys must be latched for // writing to prevent any concurrent reads or writes. 
desc := rec.i.Desc() diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 4bbee7dfdb28..9bef96e47c58 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -158,17 +158,20 @@ func (r *Replica) maxClosedRLocked(ctx context.Context) (_ hlc.Timestamp, ok boo return maxClosed, true } -// GetFrozenClosedTimestamp returns the closed timestamp. Unlike -// MaxClosedTimestamp, it only looks at the "new" closed timestamp mechanism, -// ignoring the old one. It returns an empty result if the new mechanism is not -// enabled yet. The new mechanism has better properties than the old one - -// namely the closing of timestamps is synchronized with subsumption requests -// (through latches). Callers who need that property should be prepared to get -// an empty result back, meaning that the closed timestamp cannot be known. -func (r *Replica) GetFrozenClosedTimestamp() hlc.Timestamp { +// ClosedTimestampV2 returns the closed timestamp. Unlike MaxClosedTimestamp, it +// only looks at the "new" closed timestamp mechanism, ignoring the old one. It +// returns an empty result if the new mechanism is not enabled yet. The new +// mechanism has better properties than the old one - namely the closing of +// timestamps is synchronized with lease transfers and subsumption requests. +// Callers who need that property should be prepared to get an empty result +// back, meaning that the closed timestamp cannot be known. +func (r *Replica) ClosedTimestampV2() hlc.Timestamp { r.mu.RLock() defer r.mu.RUnlock() - // TODO(andrei): Make sure that this synchronizes with the closed timestamps - // side-transport once the side-transport is written. + return r.closedTimestampV2RLocked() +} + +func (r *Replica) closedTimestampV2RLocked() hlc.Timestamp { + // TODO(andrei,nvanbenschoten): include sideTransportClosedTimestamp. return r.mu.state.ClosedTimestamp } diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 614cf938b8cf..aaf37efc9fd8 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -548,12 +548,14 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key { // GetCurrentReadSummary returns a new ReadSummary reflecting all reads served // by the range to this point. -func (r *Replica) GetCurrentReadSummary() rspb.ReadSummary { +func (r *Replica) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) { sum := collectReadSummaryFromTimestampCache(r.store.tsCache, r.Desc()) // Forward the read summary by the range's closed timestamp, because any - // replica could have served reads below this time. - sum.Merge(rspb.FromTimestamp(r.GetFrozenClosedTimestamp())) - return sum + // replica could have served reads below this time. We also return the + // closed timestamp separately, in case callers want it split out. + closedTS := r.ClosedTimestampV2() + sum.Merge(rspb.FromTimestamp(closedTS)) + return sum, closedTS } // collectReadSummaryFromTimestampCache constucts a read summary for the range From 9c3ed731fd9821ca850c2e12b4dff4b36bdbda12 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 26 Feb 2021 23:48:55 -0500 Subject: [PATCH 4/4] kv: sync lease transfers and range merges with closedts side-transport Needed for #61137. 
This commit updates the manner in which lease transfers (through
`LeaseTransferRequest`) and range merges (through `SubsumeRequest`) handle the
"transfer of power" from their outgoing leaseholder to their incoming
leaseholder. Specifically, it updates the handling of these requests to
rationalize the interaction between their evaluation and the closing of
timestamps through the closed timestamp side-transport. It is now clearer when
and how these requests instruct the outgoing leaseholder to relinquish its
ability to advance the closed timestamp, and, as a result, it is now possible
for the requests to query and operate on the maximum closed timestamp
published by the outgoing leaseholder.

For lease transfers, this commit begins by addressing an existing TODO to push
the revocation of the outgoing lease out of `AdminTransferLease` and into the
evaluation of `LeaseTransferRequest`, through a new `RevokeLease` method on
the `EvalContext`. Once a lease is revoked, the side-transport will no longer
be able to advance the closed timestamp under it. This was made possible by
#59086 and was suggested by @tbg during the code review. We generally like to
keep replica state changes out of "admin" requests themselves, which are
intended to coordinate changes through individual non-admin requests. Admin
requests generally don't even need to evaluate on a leaseholder (though they
try to), so having them perform any state changes is fragile.

For range merges, this commit removes the `MaybeWatchForMerge` flag from the
`LocalResult` returned by `SubsumeRequest` and replaces it with a
`WatchForMerge` method on the `EvalContext`. This allows the `SubsumeRequest`
to launch the range merge watcher goroutine during its evaluation, which the
side-transport checks for to determine whether a leaseholder can advance its
closed timestamp. In doing so, the `SubsumeRequest` is able to pause the
closed timestamp when it needs to and to observe and return the maximum closed
timestamp _after_ it has stopped advancing. This is a stark improvement over
the approach taken with the original closed timestamp system, which required a
herculean effort in #50265 to make correct.

With these refactors complete, the closed timestamp side-transport should have
a much easier and safer time checking whether a given leaseholder is able to
advance its closed timestamp.

Release justification: Necessary for the safety of new functionality.
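For reference, the evaluation-time pattern that both request types now follow
is sketched below. This is an illustration only, not part of the change
itself: the wrapper functions and the package name are hypothetical, while the
`EvalContext` methods and their signatures match the ones added in the diffs
that follow.

```go
package sketch // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// revokeThenSummarize mirrors what TransferLease evaluation now does: revoke
// the outgoing lease first, so the closed timestamp side-transport can no
// longer advance the closed timestamp under it, then read the (now frozen)
// read summary and closed timestamp to hand to the incoming leaseholder.
func revokeThenSummarize(
	ctx context.Context, evalCtx batcheval.EvalContext, prevLeaseSeq roachpb.LeaseSequence,
) {
	evalCtx.RevokeLease(ctx, prevLeaseSeq)
	priorReadSum, closedTS := evalCtx.GetCurrentReadSummary()
	_, _ = priorReadSum, closedTS // shipped alongside the lease transfer proposal
}

// watchThenSummarize mirrors what Subsume evaluation now does: install the
// merge watcher first, which blocks traffic and closed timestamp progression
// until the merge commits or aborts, then read the frozen summary.
func watchThenSummarize(ctx context.Context, evalCtx batcheval.EvalContext) error {
	if err := evalCtx.WatchForMerge(ctx); err != nil {
		return err
	}
	priorReadSum, closedTS := evalCtx.GetCurrentReadSummary()
	_, _ = priorReadSum, closedTS // returned to the LHS via the SubsumeResponse
	return nil
}
```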
--- pkg/kv/kvnemesis/validator.go | 4 +- pkg/kv/kvserver/batcheval/cmd_lease_test.go | 27 ++++++++---- .../kvserver/batcheval/cmd_lease_transfer.go | 33 +++++++++++--- pkg/kv/kvserver/batcheval/cmd_subsume.go | 14 ++++-- pkg/kv/kvserver/batcheval/eval_context.go | 16 +++++++ pkg/kv/kvserver/batcheval/result/BUILD.bazel | 1 - pkg/kv/kvserver/batcheval/result/result.go | 8 +--- pkg/kv/kvserver/replica.go | 44 ++++++++++++++----- pkg/kv/kvserver/replica_eval_context_span.go | 11 +++++ pkg/kv/kvserver/replica_learner_test.go | 8 ++-- pkg/kv/kvserver/replica_proposal.go | 5 +-- pkg/kv/kvserver/replica_range_lease.go | 33 +++++--------- pkg/kv/kvserver/replica_read.go | 10 ----- pkg/kv/kvserver/replica_test.go | 2 +- 14 files changed, 138 insertions(+), 78 deletions(-) diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 108d1f90ec2a..5ad450fd5e7c 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -339,12 +339,10 @@ func (v *validator) processOp(txnID *string, op Operation) { v.failIfError(op, t.Result) } case *TransferLeaseOperation: - if resultIsErrorStr(t.Result, `cannot transfer lease to replica of type (VOTER_INCOMING|VOTER_OUTGOING|VOTER_DEMOTING|LEARNER|NON_VOTER)`) { + if resultIsErrorStr(t.Result, `replica cannot hold lease`) { // Only VOTER_FULL replicas can currently hold a range lease. // Attempts to transfer to lease to any other replica type are // rejected. - } else if resultIsErrorStr(t.Result, `replica cannot hold lease`) { - // Same case as above, but with a slightly different message. } else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) { // A lease transfer that races with a replica removal may find that // the store it was targeting is no longer part of the range. diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_test.go b/pkg/kv/kvserver/batcheval/cmd_lease_test.go index 1e4bfdc7eaaf..d5cd5efa023e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_test.go @@ -190,6 +190,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) { manual := hlc.NewManualClock(123) clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + prevLease := roachpb.Lease{ + Replica: replicas[0], + Sequence: 1, + } nextLease := roachpb.Lease{ Replica: replicas[1], Start: clock.NowAsClockTimestamp(), @@ -209,16 +213,19 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) { } currentReadSummary := rspb.FromTimestamp(maxPriorReadTS) + evalCtx := &MockEvalCtx{ + ClusterSettings: cluster.MakeTestingClusterSettings(), + StoreID: 1, + Desc: &desc, + Clock: clock, + Lease: prevLease, + CurrentReadSummary: currentReadSummary, + } cArgs := CommandArgs{ - EvalCtx: (&MockEvalCtx{ - ClusterSettings: cluster.MakeTestingClusterSettings(), - StoreID: 1, - Desc: &desc, - Clock: clock, - CurrentReadSummary: currentReadSummary, - }).EvalContext(), + EvalCtx: evalCtx.EvalContext(), Args: &roachpb.TransferLeaseRequest{ - Lease: nextLease, + Lease: nextLease, + PrevLease: prevLease, }, } @@ -233,6 +240,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) { require.NotNil(t, propLease) require.True(t, nextLease.Start.Less(propLease.Start)) require.True(t, beforeEval.Less(propLease.Start)) + require.Equal(t, prevLease.Sequence+1, propLease.Sequence) + + // The previous lease should have been revoked. + require.Equal(t, prevLease.Sequence, evalCtx.RevokedLeaseSeq) // The prior read summary should reflect the maximum read times // served under the current leaseholder. 
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go index d744aa79d61d..bb59285f73c0 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go @@ -77,6 +77,33 @@ func TransferLease( newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp()) args.Lease = roachpb.Lease{} // prevent accidental use below + // If this check is removed at some point, the filtering of learners on the + // sending side would have to be removed as well. + if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil { + return newFailedLeaseTrigger(true /* isTransfer */), err + } + + // Stop using the current lease. All future calls to leaseStatus on this + // node with the current lease will now return a PROSCRIBED status. This + // includes calls to leaseStatus from the closed timestamp side-transport, + // meaning that the following call to GetCurrentReadSummary is guaranteed to + // observe the highest closed timestamp published under this lease. + // + // We perform this action during evaluation to ensure that the lease + // revocation takes place regardless of whether the corresponding Raft + // proposal succeeds, fails, or is ambiguous - in which case there's no + // guarantee that the transfer will not still apply. This means that if the + // proposal fails, we'll have relinquished the current lease but not managed + // to give the lease to someone else, so we'll have to re-acquire the lease + // again through a RequestLease request to recover. This situation is tested + // in TestBehaviorDuringLeaseTransfer/transferSucceeds=false. + // + // NOTE: RevokeLease will be a no-op if the lease has already changed. In + // such cases, we could detect that here and fail fast, but it's safe and + // easier to just let the TransferLease be proposed under the wrong lease + // and be rejected with the correct error below Raft. + cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence) + // Collect a read summary from the outgoing leaseholder to ship to the // incoming leaseholder. This is used to instruct the new leaseholder on how // to update its timestamp cache to ensure that no future writes are allowed @@ -94,12 +121,6 @@ func TransferLease( // think about. priorReadSum.Merge(rspb.FromTimestamp(newLease.Start.ToTimestamp())) - // If this check is removed at some point, the filtering of learners on the - // sending side would have to be removed as well. - if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil { - return newFailedLeaseTrigger(true /* isTransfer */), err - } - log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease) return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats, newLease, prevLease, &priorReadSum, false /* isExtension */, true /* isTransfer */) diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index c278afe57eb2..18cb3c5a5dd5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -115,6 +115,16 @@ func Subsume( // but the check is too expensive as it would involve a network roundtrip on // most nodes. + // Freeze the range. Do so by blocking all requests while a newly launched + // async goroutine watches (pushes with low priority) the merge transaction. 
+ // This will also block the closed timestamp side-transport from closing new + // timestamps, meaning that the following call to GetCurrentReadSummary is + // guaranteed to observe the highest closed timestamp ever published by this + // range (if the merge eventually completes). + if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil { + return result.Result{}, errors.Wrap(err, "watching for merge during subsume") + } + // We prevent followers of the RHS from being able to serve follower reads on // timestamps that fall in the timestamp window representing the range's // subsumed state (i.e. between the subsumption time (FreezeStart) and the @@ -189,7 +199,5 @@ func Subsume( // being merged. reply.ClosedTimestamp = closedTS - return result.Result{ - Local: result.LocalResult{MaybeWatchForMerge: true}, - }, nil + return result.Result{}, nil } diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 80b10b541c37..771a25cf9572 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -115,6 +115,15 @@ type EvalContext interface { GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error) + + // RevokeLease stops the replica from using its current lease, if that lease + // matches the provided lease sequence. All future calls to leaseStatus on + // this node with the current lease will now return a PROSCRIBED status. + RevokeLease(context.Context, roachpb.LeaseSequence) + + // WatchForMerge arranges to block all requests until the in-progress merge + // completes. Returns an error if no in-progress merge is detected. + WatchForMerge(ctx context.Context) error } // MockEvalCtx is a dummy implementation of EvalContext for testing purposes. @@ -132,6 +141,7 @@ type MockEvalCtx struct { CanCreateTxn func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) Lease roachpb.Lease CurrentReadSummary rspb.ReadSummary + RevokedLeaseSeq roachpb.LeaseSequence } // EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future @@ -242,3 +252,9 @@ func (m *mockEvalCtxImpl) GetExternalStorageFromURI( ) (cloud.ExternalStorage, error) { panic("unimplemented") } +func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequence) { + m.RevokedLeaseSeq = seq +} +func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error { + panic("unimplemented") +} diff --git a/pkg/kv/kvserver/batcheval/result/BUILD.bazel b/pkg/kv/kvserver/batcheval/result/BUILD.bazel index 83fd2b90b2eb..57423b77f80b 100644 --- a/pkg/kv/kvserver/batcheval/result/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/result/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", - "//pkg/util/hlc", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", "@com_github_kr_pretty//:pretty", diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 448bee0f2630..3ac37d887c40 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -63,8 +63,6 @@ type LocalResult struct { MaybeAddToSplitQueue bool // Call MaybeGossipNodeLiveness with the specified Span, if set. MaybeGossipNodeLiveness *roachpb.Span - // Call maybeWatchForMerge. 
- MaybeWatchForMerge bool // Metrics contains counters which are to be passed to the // metrics subsystem. @@ -84,7 +82,6 @@ func (lResult *LocalResult) IsZero() bool { !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && lResult.MaybeGossipNodeLiveness == nil && - !lResult.MaybeWatchForMerge && lResult.Metrics == nil } @@ -97,13 +94,13 @@ func (lResult *LocalResult) String() string { "#updated txns: %d #end txns: %d, "+ "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ - "MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t", + "MaybeGossipNodeLiveness:%s ", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, - lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge) + lResult.MaybeGossipNodeLiveness) } // DetachEncounteredIntents returns (and removes) those encountered @@ -344,7 +341,6 @@ func (p *Result) MergeAndDestroy(q Result) error { coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig) coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure) coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue) - coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge) if p.Local.Metrics == nil { p.Local.Metrics = q.Local.Metrics diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index a4cad6e40f32..13368fe4e44d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1444,31 +1444,53 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool { r.mu.replicaID > rightDesc.ReplicaID } -// maybeWatchForMerge checks whether a merge of this replica into its left +// WatchForMerge is like maybeWatchForMergeLocked, except it expects a merge to +// be in progress and returns an error if one is not. +// +// See docs/tech-notes/range-merges.md. +func (r *Replica) WatchForMerge(ctx context.Context) error { + ok, err := r.maybeWatchForMerge(ctx) + if err != nil { + return err + } else if !ok { + return errors.AssertionFailedf("range merge unexpectedly not in-progress") + } + return nil +} + +// maybeWatchForMergeLocked checks whether a merge of this replica into its left // neighbor is in its critical phase and, if so, arranges to block all requests -// until the merge completes. -func (r *Replica) maybeWatchForMerge(ctx context.Context) error { +// until the merge completes. Returns a boolean indicating whether a merge was +// found to be in progress. +// +// See docs/tech-notes/range-merges.md. +func (r *Replica) maybeWatchForMerge(ctx context.Context) (bool, error) { r.mu.Lock() defer r.mu.Unlock() return r.maybeWatchForMergeLocked(ctx) } -func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error { +func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { + // Checking for a deletion intent on the local range descriptor, which + // indicates that a merge is in progress and this range is currently in its + // critical phase of being subsumed by its left-hand side neighbor. Read + // inconsistently at the maximum timestamp to ensure that we see an intent + // if one exists, regardless of what timestamp it is written at. 
desc := r.descRLocked() descKey := keys.RangeDescriptorKey(desc.StartKey) - _, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(), + _, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp, storage.MVCCGetOptions{Inconsistent: true}) if err != nil { - return err + return false, err } else if intent == nil { - return nil + return false, nil } val, _, err := storage.MVCCGetAsTxn( ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn) if err != nil { - return err + return false, err } else if val != nil { - return nil + return false, nil } // At this point, we know we have a deletion intent on our range descriptor. @@ -1481,7 +1503,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error { // Another request already noticed the merge, installed a mergeComplete // channel, and launched a goroutine to watch for the merge's completion. // Nothing more to do. - return nil + return true, nil } r.mu.mergeComplete = mergeCompleteCh // The RHS of a merge is not permitted to quiesce while a mergeComplete @@ -1612,7 +1634,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error { // requests will get dropped and retried on another node. Suppress the error. err = nil } - return err + return true, err } // maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 7ea060d51333..93e7e97e8b9c 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -246,3 +246,14 @@ func (rec *SpanSetReplicaEvalContext) GetExternalStorageFromURI( ) (cloud.ExternalStorage, error) { return rec.i.GetExternalStorageFromURI(ctx, uri, user) } + +// RevokeLease stops the replica from using its current lease. +func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) { + rec.i.RevokeLease(ctx, seq) +} + +// WatchForMerge arranges to block all requests until the in-progress merge +// completes. +func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error { + return rec.i.WatchForMerge(ctx) +} diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 42b63760bb52..a5d8ff40f7db 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -735,8 +735,9 @@ func TestLearnerNoAcceptLease(t *testing.T) { desc := tc.LookupRangeOrFatal(t, scratchStartKey) err := tc.TransferRangeLease(desc, tc.Target(1)) - if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) { - t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err) + exp := `replica cannot hold lease` + if !testutils.IsError(err, exp) { + t.Fatalf(`expected %q error got: %+v`, exp, err) } } @@ -760,7 +761,7 @@ func TestJointConfigLease(t *testing.T) { require.True(t, desc.Replicas().InAtomicReplicationChange(), desc) err := tc.TransferRangeLease(desc, tc.Target(1)) - exp := `cannot transfer lease to replica of type VOTER_INCOMING` + exp := `replica cannot hold lease` require.True(t, testutils.IsError(err, exp), err) // NB: we don't have to transition out of the previous joint config first @@ -768,7 +769,6 @@ func TestJointConfigLease(t *testing.T) { // it's asked to do. 
desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1)) err = tc.TransferRangeLease(desc, tc.Target(1)) - exp = `cannot transfer lease to replica of type VOTER_DEMOTING_LEARNER` require.True(t, testutils.IsError(err, exp), err) } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index f072339edc39..5b25354c6104 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -396,7 +396,7 @@ func (r *Replica) leasePostApplyLocked( // progress, as only the old leaseholder would have been explicitly notified // of the merge. If there is a merge in progress, maybeWatchForMerge will // arrange to block all traffic to this replica unless the merge aborts. - if err := r.maybeWatchForMergeLocked(ctx); err != nil { + if _, err := r.maybeWatchForMergeLocked(ctx); err != nil { // We were unable to determine whether a merge was in progress. We cannot // safely proceed. log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s", @@ -623,9 +623,6 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re if lResult.EndTxns != nil { log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns) } - if lResult.MaybeWatchForMerge { - log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false") - } if lResult.AcquiredLocks != nil { for i := range lResult.AcquiredLocks { diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 97cfffd64cbe..8557f23eed74 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -785,18 +785,6 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID return nil, nil, errors.Errorf("unable to find store %d in range %+v", target, desc) } - // For now, don't allow replicas of type LEARNER to be leaseholders, see - // comments in RequestLease and TransferLease for why. - // - // TODO(dan): We shouldn't need this, the checks in RequestLease and - // TransferLease are the canonical ones and should be sufficient. Sadly, the - // `r.mu.minLeaseProposedTS = status.Timestamp` line below will likely play - // badly with that. This would be an issue even without learners, but - // omitting this check would make it worse. Fixme. - if t := nextLeaseHolder.GetType(); t != roachpb.VOTER_FULL { - return nil, nil, errors.Errorf(`cannot transfer lease to replica of type %s`, t) - } - if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok && nextLease.Replica != nextLeaseHolder { repDesc, err := r.getReplicaDescriptorRLocked() @@ -813,15 +801,7 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID return nil, nil, newNotLeaseHolderError(nextLease, r.store.StoreID(), desc, "another transfer to a different store is in progress") } - // Stop using the current lease. All future calls to leaseStatus on this - // node with the current lease will now return a PROSCRIBED status. - // - // TODO(nvanbenschoten): since we aren't pulling the transfer time here - // anymore, we could also move this below latching as well, similar to - // how a Subsume request sets the FreezeStart time and pauses closed - // timestamps during evaluation and communicates this back up using the - // LocalResult. 
- r.mu.minLeaseProposedTS = now + transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest( ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true, /* transfer */ ) @@ -877,6 +857,17 @@ func (r *Replica) getLeaseRLocked() (roachpb.Lease, roachpb.Lease) { return *r.mu.state.Lease, roachpb.Lease{} } +// RevokeLease stops the replica from using its current lease, if that lease +// matches the provided lease sequence. All future calls to leaseStatus on this +// node with the current lease will now return a PROSCRIBED status. +func (r *Replica) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) { + r.mu.Lock() + defer r.mu.Unlock() + if r.mu.state.Lease.Sequence == seq { + r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp() + } +} + // newNotLeaseHolderError returns a NotLeaseHolderError initialized with the // replica for the holder (if any) of the given lease. // diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 190154442eae..5b77e3caad10 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -180,16 +180,6 @@ func (r *Replica) handleReadOnlyLocalEvalResult( lResult.AcquiredLocks = nil } - if lResult.MaybeWatchForMerge { - // A merge is (likely) about to be carried out, and this replica needs - // to block all traffic until the merge either commits or aborts. See - // docs/tech-notes/range-merges.md. - if err := r.maybeWatchForMerge(ctx); err != nil { - return roachpb.NewError(err) - } - lResult.MaybeWatchForMerge = false - } - if !lResult.IsZero() { log.Fatalf(ctx, "unhandled field in LocalEvalResult: %s", pretty.Diff(lResult, result.LocalResult{})) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 8a4cb17102cc..6d7fc678e039 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -741,7 +741,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) { return nil } transferSem := make(chan struct{}) - tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = + tsc.TestingKnobs.EvalKnobs.TestingPostEvalFilter = func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { if _, ok := filterArgs.Req.(*roachpb.TransferLeaseRequest); ok { // Notify the test that the transfer has been trapped.