diff --git a/pkg/base/config.go b/pkg/base/config.go index 86e70d7e9a44..4d4ec6b56dd6 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -63,6 +63,11 @@ const ( // See https://github.com/cockroachdb/cockroach/issues/20310. DefaultMetricsSampleInterval = 10 * time.Second + // defaultRaftHeartbeatIntervalTicks is the default value for + // RaftHeartbeatIntervalTicks, which determines the number of ticks between + // each heartbeat. + defaultRaftHeartbeatIntervalTicks = 5 + // defaultRPCHeartbeatInterval is the default value of RPCHeartbeatInterval // used by the rpc context. defaultRPCHeartbeatInterval = 3 * time.Second @@ -300,6 +305,9 @@ type RaftConfig struct { // unless overridden. RaftElectionTimeoutTicks int + // RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats. + RaftHeartbeatIntervalTicks int + // RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader // lease active duration should be of the raft election timeout. RangeLeaseRaftElectionTimeoutMultiplier float64 @@ -364,6 +372,9 @@ func (cfg *RaftConfig) SetDefaults() { if cfg.RaftElectionTimeoutTicks == 0 { cfg.RaftElectionTimeoutTicks = defaultRaftElectionTimeoutTicks } + if cfg.RaftHeartbeatIntervalTicks == 0 { + cfg.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks + } if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 { cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier } diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index 9221fc241db8..a317a47803bd 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -33,7 +33,7 @@ type splitAndScatterer interface { // splitAndScatterSpan issues a split request at a given key and then scatters // the range around the cluster. It returns the node ID of the leaseholder of // the span after the scatter. 
- splitAndScatterKey(ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.NodeID, error) + splitAndScatterKey(ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool) (roachpb.NodeID, error) } // dbSplitAndScatter is the production implementation of this processor's @@ -46,7 +46,7 @@ type dbSplitAndScatterer struct{} // to which the span was scattered. If the destination node could not be // determined, node ID of 0 is returned. func (s dbSplitAndScatterer) splitAndScatterKey( - ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, + ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool, ) (roachpb.NodeID, error) { expirationTime := db.Clock().Now().Add(time.Hour.Nanoseconds(), 0) newSpanKey, err := rewriteBackupSpanKey(kr, key) @@ -54,26 +54,32 @@ func (s dbSplitAndScatterer) splitAndScatterKey( return 0, err } - // TODO(dan): Really, this should be splitting the Key of - // the _next_ entry. + // TODO(pbardea): Really, this should be splitting the Key of the _next_ + // entry. log.VEventf(ctx, 1, "presplitting new key %+v", newSpanKey) if err := db.AdminSplit(ctx, newSpanKey, expirationTime); err != nil { return 0, errors.Wrapf(err, "splitting key %s", newSpanKey) } log.VEventf(ctx, 1, "scattering new key %+v", newSpanKey) - var ba roachpb.BatchRequest - ba.Header.ReturnRangeInfo = true - ba.Add(&roachpb.AdminScatterRequest{ + req := &roachpb.AdminScatterRequest{ RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{ Key: newSpanKey, EndKey: newSpanKey.Next(), }), - }) + // This is a bit of a hack, but it seems to be an effective one (see #36665 + // for graphs). As of the commit that added this, scatter is not very good + // at actually balancing leases. 
This is likely for two reasons: 1) there's + // almost certainly some regression in scatter's behavior, it used to work + // much better and 2) scatter has to operate by balancing leases for all + // ranges in a cluster, but in RESTORE, we really just want it to be + // balancing the span being restored into. + RandomizeLeases: randomizeLeases, + } - br, pErr := db.NonTransactionalSender().Send(ctx, ba) + res, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), req) if pErr != nil { - // TODO(dan): Unfortunately, Scatter is still too unreliable to + // TODO(pbardea): Unfortunately, Scatter is still too unreliable to // fail the RESTORE when Scatter fails. I'm uncomfortable that // this could break entirely and not start failing the tests, // but on the bright side, it doesn't affect correctness, only @@ -83,14 +89,23 @@ func (s dbSplitAndScatterer) splitAndScatterKey( return 0, nil } - return s.findDestination(ctx, br), nil + return s.findDestination(res.(*roachpb.AdminScatterResponse)), nil } // findDestination returns the node ID of the node of the destination of the // AdminScatter request. If the destination cannot be found, 0 is returned. -func (s dbSplitAndScatterer) findDestination( - _ context.Context, _ *roachpb.BatchResponse, -) roachpb.NodeID { +func (s dbSplitAndScatterer) findDestination(res *roachpb.AdminScatterResponse) roachpb.NodeID { + // A request from a 20.1 node will not have a RangeInfos with a lease. + // For this mixed-version state, we'll report the destination as node 0 + // and suffer a bit of inefficiency. + if len(res.RangeInfos) > 0 { + // If the lease is not populated, we return the 0 value anyway. We receive 1 + // RangeInfo per range that was scattered. Since we send a scatter request + // to each range that we make, we are only interested in the first range, + // which contains the key at which we're splitting and scattering. 
+ return res.RangeInfos[0].Lease.Replica.NodeID + } + return roachpb.NodeID(0) } @@ -215,7 +230,7 @@ func runSplitAndScatter( g.GoCtx(func(ctx context.Context) error { defer close(importSpanChunksCh) for _, importSpanChunk := range spec.Chunks { - _, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key) + _, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key, true /* randomizeLeases */) if err != nil { return err } @@ -229,9 +244,8 @@ func runSplitAndScatter( return nil }) - // TODO(dan): This tries to cover for a bad scatter by having 2 * the number - // of nodes in the cluster. Is it necessary? - // TODO(pbardea): Run some experiments to tune this knob. + // TODO(pbardea): This tries to cover for a bad scatter by having 2 * the + // number of nodes in the cluster. Is it necessary? splitScatterWorkers := 2 for worker := 0; worker < splitScatterWorkers; worker++ { g.GoCtx(func(ctx context.Context) error { @@ -239,7 +253,7 @@ func runSplitAndScatter( log.Infof(ctx, "processing a chunk") for _, importSpan := range importSpanChunk { log.Infof(ctx, "processing a span") - destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key) + destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key, false /* randomizeLeases */) if err != nil { return err } diff --git a/pkg/ccl/backupccl/split_and_scatter_processor_test.go b/pkg/ccl/backupccl/split_and_scatter_processor_test.go index 4c158afdc4bb..faefdb75840e 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor_test.go @@ -40,7 +40,7 @@ type mockScatterer struct { // This mock implementation of the split and scatterer simulates a scattering of // ranges. 
func (s *mockScatterer) splitAndScatterKey( - _ context.Context, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, + _ context.Context, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, _ bool, ) (roachpb.NodeID, error) { s.Lock() defer s.Unlock() diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index c578fd28b89a..ea0074aa7505 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -141,18 +141,18 @@ Output build version information. RunE: func(cmd *cobra.Command, args []string) error { info := build.GetInfo() tw := tabwriter.NewWriter(os.Stdout, 2, 1, 2, ' ', 0) - fmt.Fprintf(tw, "Build Tag: %s\n", info.Tag) - fmt.Fprintf(tw, "Build Time: %s\n", info.Time) - fmt.Fprintf(tw, "Distribution: %s\n", info.Distribution) - fmt.Fprintf(tw, "Platform: %s", info.Platform) + fmt.Fprintf(tw, "Build Tag: %s\n", info.Tag) + fmt.Fprintf(tw, "Build Time: %s\n", info.Time) + fmt.Fprintf(tw, "Distribution: %s\n", info.Distribution) + fmt.Fprintf(tw, "Platform: %s", info.Platform) if info.CgoTargetTriple != "" { fmt.Fprintf(tw, " (%s)", info.CgoTargetTriple) } fmt.Fprintln(tw) - fmt.Fprintf(tw, "Go Version: %s\n", info.GoVersion) - fmt.Fprintf(tw, "C Compiler: %s\n", info.CgoCompiler) - fmt.Fprintf(tw, "Build SHA-1: %s\n", info.Revision) - fmt.Fprintf(tw, "Build Type: %s\n", info.Type) + fmt.Fprintf(tw, "Go Version: %s\n", info.GoVersion) + fmt.Fprintf(tw, "C Compiler: %s\n", info.CgoCompiler) + fmt.Fprintf(tw, "Build Commit ID: %s\n", info.Revision) + fmt.Fprintf(tw, "Build Type: %s\n", info.Type) return tw.Flush() }, } diff --git a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go index 2028f06f76b4..b96db6cc14c9 100644 --- a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go +++ b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -27,11 +28,22 @@ func init() { } func declareKeysComputeChecksum( - _ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet, + desc *roachpb.RangeDescriptor, + _ roachpb.Header, + _ roachpb.Request, + latchSpans, _ *spanset.SpanSet, ) { - // Intentionally declare no keys, as ComputeChecksum does not need to be - // serialized with any other commands. It simply needs to be committed into - // the Raft log. + // The correctness of range merges depends on the lease applied index of a + // range not being bumped while the RHS is subsumed. ComputeChecksum bumps a + // range's LAI and thus needs to be serialized with Subsume requests, in order + // prevent a rare closed timestamp violation due to writes on the post-merged + // range that violate a closed timestamp spuriously reported by the pre-merged + // range. This can, in turn, lead to a serializability violation. See comment + // at the end of Subsume() in cmd_subsume.go for details. Thus, it must + // declare access over at least one key. We choose to declare read-only access + // over the range descriptor key. + rdKey := keys.RangeDescriptorKey(desc.StartKey) + latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: rdKey}) } // Version numbers for Replica checksum computation. 
Requests silently no-op diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index 9435a9656f87..b0f6d0246f8e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -125,13 +126,50 @@ func Subsume( return result.Result{}, errors.New("non-deletion intent on local range descriptor") } + // We prevent followers of the RHS from being able to serve follower reads on + // timestamps that fall in the timestamp window representing the range's + // subsumed state (i.e. between the subsumption time (FreezeStart) and the + // timestamp at which the merge transaction commits or aborts), by requiring + // follower replicas to catch up to an MLAI that succeeds the range's current + // LeaseAppliedIndex (note that we're tracking lai + 1 below instead of lai). + // In case the merge successfully commits, this MLAI will never be caught up + // to since the RHS will be destroyed. In case the merge aborts, this ensures + // that the followers can only activate the newer closed timestamps once they + // catch up to the LAI associated with the merge abort. We need to do this + // because the closed timestamps that are broadcast by RHS in this subsumed + // state are not going to be reflected in the timestamp cache of the LHS range + // after the merge, which can cause a serializability violation. + // + // Note that we are essentially lying to the closed timestamp tracker here in + // order to achieve the effect of unactionable closed timestamp updates until + // the merge concludes. 
Tracking lai + 1 here ensures that the follower + // replicas need to catch up to at least that index before they are able to + // activate _any of the closed timestamps from this point onwards_. In other + // words, we will never publish a closed timestamp update for this range below + // this lai, regardless of whether a different proposal untracks a lower lai + // at any point in the future. + // + // NB: The above statement relies on the invariant that the LAI that follows a + // Subsume request will be applied only after the merge aborts. More + // specifically, this means that no intervening request can bump the LAI of + // the range while it is subsumed. This invariant is upheld because the only Raft + // proposals allowed after a range has been subsumed are lease requests, which + // do not bump the LAI. In case there is a lease transfer on this range while it + // is subsumed, we ensure that the initial MLAI update broadcast by the new + // leaseholder respects the invariant in question, in much the same way we do + // here. Take a look at `EmitMLAI()` in replica_closedts.go for more details. + _, untrack := cArgs.EvalCtx.GetTracker().Track(ctx) + lease, _ := cArgs.EvalCtx.GetLease() + lai := cArgs.EvalCtx.GetLeaseAppliedIndex() + untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1)) + // NOTE: the deletion intent on the range's meta2 descriptor is just as // important to correctness as the deletion intent on the local descriptor, // but the check is too expensive as it would involve a network roundtrip on // most nodes. 
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() - reply.LeaseAppliedIndex = cArgs.EvalCtx.GetLeaseAppliedIndex() + reply.LeaseAppliedIndex = lai reply.FreezeStart = cArgs.EvalCtx.Clock().Now() return result.Result{ diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume_test.go b/pkg/kv/kvserver/batcheval/cmd_subsume_test.go new file mode 100644 index 000000000000..1ce40c608796 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_subsume_test.go @@ -0,0 +1,76 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// TestRequestsSerializeWithSubsume ensures that no request can be evaluated +// concurrently with a Subsume request. For more details, refer to the big +// comment block at the end of Subsume() in cmd_subsume.go. +// +// NB: This test is broader than it really needs to be. A more precise statement +// of the condition necessary to uphold the invariant mentioned in Subsume() is: +// No request that bumps the lease applied index of a range can be evaluated +// concurrently with a Subsume request. 
+func TestRequestsSerializeWithSubsume(t *testing.T) { + defer leaktest.AfterTest(t)() + var subsumeLatchSpans, subsumeLockSpans spanset.SpanSet + startKey := []byte(`a`) + endKey := []byte(`b`) + desc := &roachpb.RangeDescriptor{ + RangeID: 0, + StartKey: startKey, + EndKey: endKey, + } + testTxn := &roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + ID: uuid.FastMakeV4(), + Key: startKey, + WriteTimestamp: hlc.Timestamp{WallTime: 1}, + }, + Name: "test txn", + } + header := roachpb.Header{Txn: testTxn} + subsumeRequest := &roachpb.SubsumeRequest{RightDesc: *desc} + declareKeysSubsume(desc, header, subsumeRequest, &subsumeLatchSpans, &subsumeLockSpans) + for method, command := range cmds { + t.Run(method.String(), func(t *testing.T) { + otherRequest := roachpb.CreateRequest(method) + if queryTxnReq, ok := otherRequest.(*roachpb.QueryTxnRequest); ok { + // QueryTxnRequest declares read-only access over the txn record of the txn + // it is supposed to query and not the txn that sent it. We fill this Txn + // field in here to prevent it from being nil and leading to the txn key + // falling outside our test range's keyspace. 
+ queryTxnReq.Txn = testTxn.TxnMeta + } + // Fresh span sets per command, so spans from earlier commands cannot leak in. + var otherLatchSpans, otherLockSpans spanset.SpanSet + otherRequest.SetHeader(roachpb.RequestHeader{ + Key: startKey, + EndKey: endKey, + Sequence: 0, + }) + command.DeclareKeys(desc, header, otherRequest, &otherLatchSpans, &otherLockSpans) + if !subsumeLatchSpans.Intersects(&otherLatchSpans) { + t.Errorf("%s does not serialize with Subsume", method) + } + }) + } +} diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 5d2bbbbbb230..17417dcd96d9 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -65,6 +66,7 @@ type EvalContext interface { GetFirstIndex() (uint64, error) GetTerm(uint64) (uint64, error) GetLeaseAppliedIndex() uint64 + GetTracker() closedts.TrackerI Desc() *roachpb.RangeDescriptor ContainsKey(key roachpb.Key) bool @@ -178,6 +180,9 @@ func (m *mockEvalCtxImpl) GetTerm(uint64) (uint64, error) { func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 { panic("unimplemented") } +func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI { + panic("unimplemented") +} func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor { return m.MockEvalCtx.Desc } diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index ad704a7ce425..0fda10e841b4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1330,6 +1330,19 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { } } +func 
checkConsistencyArgs(desc *roachpb.RangeDescriptor) *roachpb.CheckConsistencyRequest { + return &roachpb.CheckConsistencyRequest{ + RequestHeader: 
roachpb.RequestHeader{ + Key: desc.StartKey.AsRawKey(), + EndKey: desc.EndKey.AsRawKey(), + }, + WithDiff: false, + Mode: 1, + Checkpoint: false, + Terminate: nil, + } +} + // TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range // in a merge loses its lease while a merge is in progress, the new leaseholder // does not incorrectly serve traffic before the merge completes. @@ -1527,6 +1540,96 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { } } +// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the following: +// 1. While a range is subsumed, ComputeChecksum requests wait until the merge +// is complete before proceeding. +// 2. Once a merge is aborted, pending (and future) requests will be allowed to +// be proposed. An assertion at the end of Replica.propose() ensures that the +// lease applied index of a range cannot be bumped while it is subsumed. A large +// comment block at the end of Subsume() in cmd_subsume.go explains the hazard +// in detail. This test is meant as a sanity check for this assertion. +func TestStoreRangeMergeCheckConsistencyAfterSubsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + storeCfg := kvserver.TestStoreConfig(nil) + storeCfg.TestingKnobs.DisableReplicateQueue = true + storeCfg.TestingKnobs.DisableMergeQueue = true + + // Install a hook to control when the merge transaction aborts. 
+ mergeEndTxnReceived := make(chan *roachpb.Transaction, 10) // headroom in case the merge transaction retries + abortMergeTxn := make(chan struct{}) + storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + for _, r := range ba.Requests { + if et := r.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { + mergeEndTxnReceived <- ba.Txn + <-abortMergeTxn + return &roachpb.Error{ + Message: "abort the merge for test", + } + } + } + return nil + } + + mtc := &multiTestContext{ + storeConfig: &storeCfg, + startWithSingleRange: true, + } + + mtc.Start(t, 2) + defer mtc.Stop() + + // Create the ranges to be merged. Put both ranges on both stores, but give + // the second store the lease on the RHS. + lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.stores[0]) + if err != nil { + t.Fatal(err) + } + mtc.replicateRange(lhsDesc.RangeID, 1) + mtc.replicateRange(rhsDesc.RangeID, 1) + mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1) + + // Launch the merge. + mergeErr := make(chan *roachpb.Error) + go func() { + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), args) + mergeErr <- pErr + }() + + // Wait for the merge transaction to send its EndTxn request. It won't + // be able to complete just yet, thanks to the hook we installed above. + <-mergeEndTxnReceived + + checkConsistencyResp := make(chan interface{}) + go func() { + args := checkConsistencyArgs(rhsDesc) + _, pErr := kv.SendWrapped(ctx, mtc.stores[1].TestSender(), args) + checkConsistencyResp <- pErr + }() + + select { + case <-checkConsistencyResp: + t.Fatalf("expected the consistency check to wait until the merge was complete") + case <-time.After(1 * time.Second): + } + + // Let the merge abort, and then ensure that the consistency check + // successfully goes through. 
+ close(abortMergeTxn) + + pErr := <-mergeErr + require.IsType(t, &roachpb.Error{}, pErr) + require.Regexp(t, "abort the merge for test", pErr.Message) + + testutils.SucceedsSoon(t, func() error { + pErr := <-checkConsistencyResp + require.Nil(t, pErr) + return nil + }) +} + // TestStoreRangeMergeConcurrentRequests tests merging ranges that are serving // other traffic concurrently. func TestStoreRangeMergeConcurrentRequests(t *testing.T) { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index c57c491b2f41..efb8adbf2c88 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -1120,15 +1120,16 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { // below to trap any lease request and infer that it refers to the range we're // interested in. sc.TestingKnobs.DisableSplitQueue = true - sc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp) { + sc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { val := leaseAcquisitionTrap.Load() if val == nil { - return + return nil } trapCallback := val.(func(ts hlc.Timestamp)) if trapCallback != nil { trapCallback(ts) } + return nil } mtc := &multiTestContext{storeConfig: &sc} defer mtc.Stop() diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 0deeac831910..fbdc5d888327 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -15,7 +15,9 @@ import ( gosql "database/sql" "fmt" "math/rand" + "reflect" "strconv" + "sync" "sync/atomic" "testing" "time" @@ -25,12 +27,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" 
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -38,11 +42,21 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +var aggressiveResolvedTimestampClusterArgs = base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: aggressiveResolvedTimestampPushKnobs(), + }, + }, +} + func TestClosedTimestampCanServe(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -53,7 +67,8 @@ func TestClosedTimestampCanServe(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -113,7 +128,8 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, 
aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -185,7 +201,8 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, _, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) @@ -266,7 +283,8 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) // Disable the automatic merging. if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { t.Fatal(err) @@ -346,7 +364,8 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) { ctx := context.Background() // Set up the target duration to be very long and rely on lease transfers to // drive MaxClosed. 
- tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, time.Hour) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, time.Hour, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -383,7 +402,8 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -416,14 +436,15 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) } - // Verify that we can serve a follower read at a timestamp. Wait if necessary. + // Verify that we can serve a follower read at a timestamp. 
Wait if necessary ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} baRead := makeReadBatchRequestForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { @@ -445,9 +466,461 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2) } +// TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, +// replicas of the subsumed range (RHS) cannot serve follower reads for +// timestamps after the subsumption time. +func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + // Skipping under short because this test pauses for a few seconds in order to + // trigger a node liveness expiration. + skip.UnderShort(t) + + type postSubsumptionCallback func( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + g *errgroup.Group, + rightDesc roachpb.RangeDescriptor, + rightLeaseholder roachpb.ReplicationTarget, + freezeStartTimestamp hlc.Timestamp, + leaseAcquisitionTrap *atomic.Value, + ) (roachpb.ReplicationTarget, hlc.Timestamp, error) + + type testCase struct { + name string + callback postSubsumptionCallback + } + + tests := []testCase{ + { + name: "without lease transfer", + callback: nil, + }, + { + name: "with intervening lease transfer", + // TODO(aayush): Maybe allowlist `TransferLease` requests while a range is + // subsumed and use that here, instead of forcing a lease transfer by + // pausing heartbeats. + callback: forceLeaseTransferOnSubsumedRange, + }, + } + + runTest := func(t *testing.T, callback postSubsumptionCallback) { + ctx := context.Background() + st := mergeFilterState{ + blockMergeTrigger: make(chan hlc.Timestamp), + finishMergeTxn: make(chan struct{}), + } + var leaseAcquisitionTrap atomic.Value + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + // We set the raft election timeout to a small duration. 
This should + // result in the node liveness duration being ~2.4 seconds + RaftHeartbeatIntervalTicks: 3, + RaftElectionTimeoutTicks: 4, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // This test suspends the merge txn right before it can apply the + // commit trigger and can lead to the merge txn taking longer than + // the defaults specified in aggressiveResolvedTimestampPushKnobs(). + // We use really high values here in order to avoid the merge txn + // being pushed due to resolved timestamps. + RangeFeedPushTxnsInterval: 5 * time.Second, + RangeFeedPushTxnsAge: 60 * time.Second, + TestingRequestFilter: st.suspendMergeTrigger, + LeaseRequestEvent: func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { + val := leaseAcquisitionTrap.Load() + if val == nil { + return nil + } + leaseAcquisitionCallback := val.(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error) + if err := leaseAcquisitionCallback(storeID, rangeID); err != nil { + return err + } + return nil + }, + DisableMergeQueue: true, + }, + }, + }, + } + // If the initial phase of the merge txn takes longer than the closed + // timestamp target duration, its initial CPuts can have their write + // timestamps bumped due to an intervening closed timestamp update. This + // causes the entire merge txn to retry. So we use a long closed timestamp + // duration at the beginning of the test until we have the merge txn + // suspended at its commit trigger, and then change it back down to + // `testingTargetDuration`. 
+ tc, db, leftDesc, rightDesc := initClusterWithSplitRanges(ctx, t, 5*time.Second, + testingCloseFraction, clusterArgs) + defer tc.Stopper().Stop(ctx) + + leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) + rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) + if leftLeaseholder.StoreID == rightLeaseholder.StoreID { + // In this test, we may pause the heartbeats of the store that holds the + // lease for the right hand side range, in order to force a lease + // transfer. If the LHS and RHS share a leaseholder, this may cause a + // lease transfer for the left hand range as well. This can cause a merge + // txn retry and we'd like to avoid that, so we ensure that LHS and RHS + // have different leaseholders before beginning the test. + target := pickRandomTarget(tc, leftLeaseholder, leftDesc) + if err := tc.TransferRangeLease(leftDesc, target); err != nil { + t.Fatal(err) + } + leftLeaseholder = target + } + + g, ctx := errgroup.WithContext(ctx) + // Merge the ranges back together. The LHS leaseholder should block right + // before the merge trigger request is sent. + g.Go(func() error { + return mergeTxn(tc, leftDesc) + }) + defer func() { + // Unblock the LHS leaseholder so it can finally commit the merge. + close(st.finishMergeTxn) + if err := g.Wait(); err != nil { + t.Error(err) + } + }() + + // We now have the RHS in its subsumed state. + freezeStartTimestamp := <-st.blockMergeTrigger + // Reduce the closed timestamp target duration in order to make the rest of + // the test faster. + if _, err := db.Exec(fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';`, + testingTargetDuration)); err != nil { + t.Fatal(err) + } + // inactiveClosedTSBoundary indicates the low water mark for closed + // timestamp updates beyond which we expect none of the followers to be able + // to serve follower reads until the merge is complete.
+ inactiveClosedTSBoundary := freezeStartTimestamp + if callback != nil { + newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder, + freezeStartTimestamp, &leaseAcquisitionTrap) + if err != nil { + t.Fatal(err) + } + rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts + } + // Poll the store for closed timestamp updates for timestamps greater than + // our `inactiveClosedTSBoundary`. + closedTimestampCh := make(chan ctpb.Entry, 1) + g.Go(func() (e error) { + pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh) + return + }) + // We expect that none of the closed timestamp updates greater than + // `inactiveClosedTSBoundary` will be actionable by the RHS follower + // replicas. + log.Infof(ctx, "waiting for next closed timestamp update for the RHS") + select { + case <-closedTimestampCh: + case <-time.After(30 * time.Second): + t.Fatal("failed to receive next closed timestamp update") + } + baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next()) + rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) + log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") + verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runTest(t, test.callback) + }) + } + +} + +// forceLeaseTransferOnSubsumedRange triggers a lease transfer on `rightDesc` by +// pausing the liveness heartbeats of the store that holds the lease for it. 
+func forceLeaseTransferOnSubsumedRange( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + g *errgroup.Group, + rightDesc roachpb.RangeDescriptor, + rightLeaseholder roachpb.ReplicationTarget, + freezeStartTimestamp hlc.Timestamp, + leaseAcquisitionTrap *atomic.Value, +) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) { + oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder) + // Co-operative lease transfers will block while a range is subsumed, so we + // pause the node liveness heartbeats until a lease transfer occurs. + oldLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() + require.True(t, oldLease.Replica.StoreID == oldLeaseholderStore.StoreID()) + // Instantiate the lease acquisition callback right before we pause the node + // liveness heartbeats. We do this here because leases may be requested at + // any time for any reason, even before we pause the heartbeats. + leaseAcquisitionCh := make(chan roachpb.StoreID) + newRightLeaseholder := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)[0] + var once sync.Once + leaseAcquisitionTrap.Store(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { + if rangeID == rightDesc.RangeID { + if expectedStoreID := newRightLeaseholder.StoreID(); expectedStoreID != storeID { + return roachpb.NewError(&roachpb.NotLeaseHolderError{ + CustomMsg: fmt.Sprintf("only store %d must acquire the RHS's lease", expectedStoreID), + }) + } + once.Do(func() { + log.Infof(ctx, "received lease request from store %d for RHS range %d", + storeID, rangeID) + leaseAcquisitionCh <- storeID + }) + } + return nil + }) + restartHeartbeats := oldLeaseholderStore.NodeLiveness().DisableAllHeartbeatsForTest() + defer restartHeartbeats() + log.Infof(ctx, "paused RHS rightLeaseholder's liveness heartbeats") + time.Sleep(oldLeaseholderStore.NodeLiveness().GetLivenessThreshold()) + + // Send a read request from one of the 
followers of RHS so that it notices + // that the current rightLeaseholder has stopped heartbeating. This will prompt + // it to acquire the range lease for itself. + g.Go(func() error { + leaseAcquisitionRequest := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp) + log.Infof(ctx, + "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", + newRightLeaseholder.StoreID()) + _, pErr := newRightLeaseholder.Send(ctx, leaseAcquisitionRequest) + // After the merge commits, the RHS will cease to exist. Thus, we expect + // all pending queries on RHS to return RangeNotFoundErrors. + if !assert.ObjectsAreEqual(reflect.TypeOf(pErr.GetDetail()), reflect.TypeOf(&roachpb.RangeNotFoundError{})) { + return errors.AssertionFailedf("expected a RangeNotFoundError; got %v", pErr.GetDetail()) + } + return nil + }) + select { + case storeID := <-leaseAcquisitionCh: + if storeID != newRightLeaseholder.StoreID() { + err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead", + newRightLeaseholder.StoreID(), storeID) + return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + } + case <-time.After(30 * time.Second): + err = errors.New("failed to receive lease acquisition request") + return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + } + rightLeaseholder = roachpb.ReplicationTarget{ + NodeID: newRightLeaseholder.NodeID(), + StoreID: newRightLeaseholder.StoreID(), + } + oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder) + err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() + if newLease.Sequence == oldLease.Sequence { + return errors.New("RHS lease not updated") + } + leaseStart = newLease.Start + return nil + }) + if err != nil { + return + } + if !freezeStartTimestamp.LessEq(leaseStart) { + err = errors.New("freeze timestamp greater than the start time of the 
new lease") + return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + } + + return rightLeaseholder, leaseStart, nil +} + +type mergeFilterState struct { + retries int32 + + blockMergeTrigger chan hlc.Timestamp + finishMergeTxn chan struct{} +} + +// suspendMergeTrigger blocks a merge transaction right before its commit +// trigger is evaluated. This is intended to get the RHS range suspended in its +// subsumed state. The `finishMergeTxn` channel must be closed by the caller in +// order to unblock the merge txn. +func (state *mergeFilterState) suspendMergeTrigger( + ctx context.Context, ba roachpb.BatchRequest, +) *roachpb.Error { + for _, req := range ba.Requests { + if et := req.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { + if state.retries >= 1 { + // TODO(aayush): make these tests resilient to merge txn retries. + return roachpb.NewError(errors.AssertionFailedf("merge txn retried")) + } + freezeStart := et.InternalCommitTrigger.MergeTrigger.FreezeStart + log.Infof(ctx, "suspending the merge txn with FreezeStart: %s", + freezeStart) + state.retries++ + // We block the LHS leaseholder from applying the merge trigger. Note + // that RHS followers will have already caught up to the leaseholder + // well before this point. + state.blockMergeTrigger <- freezeStart + // Let the merge try to commit. 
+ <-state.finishMergeTxn + } + } + return nil +} + +func mergeTxn(tc serverutils.TestClusterInterface, leftDesc roachpb.RangeDescriptor) error { + ctx := context.Background() + ctx, record, cancel := tracing.ContextWithRecordingSpan(ctx, "merge txn") + defer cancel() + if _, err := tc.Server(0).MergeRanges(leftDesc.StartKey.AsRawKey()); err != nil { + log.Infof(ctx, "trace: %s", record().String()) + return err + } + return nil +} + +func initClusterWithSplitRanges( + ctx context.Context, + t *testing.T, + targetDuration time.Duration, + closeFraction float64, + clusterArgs base.TestClusterArgs, +) ( + serverutils.TestClusterInterface, + *gosql.DB, + roachpb.RangeDescriptor, + roachpb.RangeDescriptor, +) { + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, targetDuration, + closeFraction, clusterArgs) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(3, $1)`, "foo"); err != nil { + t.Fatal(err) + } + // Start by ensuring that the values can be read from all replicas at ts. + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeReadBatchRequestForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) + }) + // Manually split the table to have easier access to descriptors. 
+ tableID, err := getTableID(db0, "cttest", "kv") + if err != nil { + t.Fatalf("failed to lookup ids: %+v", err) + } + + idxPrefix := keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1) + k, err := sqlbase.EncodeTableKey(idxPrefix, tree.NewDInt(2), encoding.Ascending) + if err != nil { + t.Fatalf("failed to encode split key: %+v", err) + } + tcImpl := tc.(*testcluster.TestCluster) + leftDesc, rightDesc := tcImpl.SplitRangeOrFatal(t, k) + if err := tcImpl.WaitForFullReplication(); err != nil { + t.Fatal(err) + } + return tc, db0, leftDesc, rightDesc +} + +func getCurrentMaxClosed( + t *testing.T, + tc serverutils.TestClusterInterface, + target roachpb.ReplicationTarget, + desc roachpb.RangeDescriptor, +) ctpb.Entry { + deadline := timeutil.Now().Add(2 * testingTargetDuration) + store := getTargetStoreOrFatal(t, tc, target) + var maxClosed ctpb.Entry + attempts := 0 + for attempts == 0 || timeutil.Now().Before(deadline) { + attempts++ + store.ClosedTimestamp().Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) { + if _, ok := entry.MLAI[desc.RangeID]; ok { + maxClosed = entry + return true + } + return false + }) + if _, ok := maxClosed.MLAI[desc.RangeID]; !ok { + // We ran out of closed timestamps to visit without finding one that + // corresponds to desc. It is likely that no closed timestamps have + // been broadcast for desc yet, try again.
+ continue + } + return maxClosed + } + return ctpb.Entry{} +} + +func pollForGreaterClosedTimestamp( + t *testing.T, + tc serverutils.TestClusterInterface, + target roachpb.ReplicationTarget, + desc roachpb.RangeDescriptor, + lowerBound hlc.Timestamp, + returnCh chan<- ctpb.Entry, +) { + for { + if t.Failed() { + return + } + maxClosed := getCurrentMaxClosed(t, tc, target, desc) + if _, ok := maxClosed.MLAI[desc.RangeID]; ok && lowerBound.LessEq(maxClosed.ClosedTimestamp) { + returnCh <- maxClosed + return + } + } +} + +func getFollowerReplicas( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + rangeDesc roachpb.RangeDescriptor, + leaseholder roachpb.ReplicationTarget, +) []*kvserver.Replica { + repls := replsForRange(ctx, t, tc, rangeDesc) + followers := make([]*kvserver.Replica, 0, len(repls)-1) + for _, repl := range repls { + if repl.StoreID() == leaseholder.StoreID && repl.NodeID() == leaseholder.NodeID { + continue + } + followers = append(followers, repl) + } + return followers +} + +func getTargetStoreOrFatal( + t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, +) (store *kvserver.Store) { + for i := 0; i < tc.NumServers(); i++ { + if server := tc.Server(i); server.NodeID() == target.NodeID && + server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) { + store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) + if err != nil { + t.Fatal(err) + } + return store + } + } + t.Fatalf("Could not find store for replication target %+v\n", target) + return nil +} + func verifyNotLeaseHolderErrors( t *testing.T, ba roachpb.BatchRequest, repls []*kvserver.Replica, expectedNLEs int, ) { + notLeaseholderErrs, err := countNotLeaseHolderErrors(ba, repls) + if err != nil { + t.Fatal(err) + } + if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e { + t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a) + } +} + +func countNotLeaseHolderErrors(ba roachpb.BatchRequest, 
repls []*kvserver.Replica) (int64, error) { g, ctx := errgroup.WithContext(context.Background()) var notLeaseholderErrs int64 for i := range repls { @@ -464,18 +937,16 @@ func verifyNotLeaseHolderErrors( }) } if err := g.Wait(); err != nil { - t.Fatal(err) - } - if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e { - t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a) + return 0, err } + return notLeaseholderErrs, nil } // Every 0.1s=100ms, try close out a timestamp ~300ms in the past. // We don't want to be more aggressive than that since it's also // a limit on how long transactions can run. const testingTargetDuration = 300 * time.Millisecond -const closeFraction = 0.333 +const testingCloseFraction = 0.333 const numNodes = 3 func replsForRange( @@ -541,28 +1012,27 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs { } } -// This function creates a test cluster that is prepared to exercise follower -// reads. The returned test cluster has follower reads enabled using the above -// targetDuration and closeFraction. In addition to the newly minted test -// cluster, this function returns a db handle to node 0, a range descriptor for -// the range used by the table `cttest.kv` and the replica objects corresponding -// to the replicas for the range. It is the caller's responsibility to Stop the -// Stopper on the returned test cluster when done. -func setupTestClusterForClosedTimestampTesting( - ctx context.Context, t *testing.T, targetDuration time.Duration, +// setupClusterForClosedTimestampTesting creates a test cluster that is prepared +// to exercise follower reads. The returned test cluster has follower reads +// enabled using the given targetDuration and testingCloseFraction. In addition +// to the newly minted test cluster, this function returns a db handle to node +// 0, a range descriptor for the range used by the table `cttest.kv` and the +// replica objects corresponding to the replicas for the range. 
It is the +// caller's responsibility to Stop the Stopper on the returned test cluster when +// done. +func setupClusterForClosedTimestampTesting( + ctx context.Context, + t *testing.T, + targetDuration time.Duration, + closeFraction float64, + clusterArgs base.TestClusterArgs, ) ( tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor, repls []*kvserver.Replica, ) { - tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Store: aggressiveResolvedTimestampPushKnobs(), - }, - }, - }) + tc = serverutils.StartTestCluster(t, numNodes, clusterArgs) db0 = tc.ServerConn(0) if _, err := db0.Exec(fmt.Sprintf(` diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 727330dbcbc7..8374d718196f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -889,6 +889,16 @@ func (r *Replica) getMergeCompleteChRLocked() chan struct{} { return r.mu.mergeComplete } +func (r *Replica) mergeInProgress() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.mergeInProgressRLocked() +} + +func (r *Replica) mergeInProgressRLocked() bool { + return r.mu.mergeComplete != nil +} + // setLastReplicaDescriptors sets the the most recently seen replica // descriptors to those contained in the *RaftMessageRequest, acquiring r.mu // to do so. 
@@ -1534,10 +1544,6 @@ func (r *Replica) maybeTransferRaftLeadershipLocked(ctx context.Context) { } } -func (r *Replica) mergeInProgressRLocked() bool { - return r.mu.mergeComplete != nil -} - func (r *Replica) getReplicaDescriptorByIDRLocked( replicaID roachpb.ReplicaID, fallback roachpb.ReplicaDescriptor, ) (roachpb.ReplicaDescriptor, error) { diff --git a/pkg/kv/kvserver/replica_closedts.go b/pkg/kv/kvserver/replica_closedts.go index f34a8c63bb35..06a460996c03 100644 --- a/pkg/kv/kvserver/replica_closedts.go +++ b/pkg/kv/kvserver/replica_closedts.go @@ -27,6 +27,7 @@ func (r *Replica) EmitMLAI() { } epoch := r.mu.state.Lease.Epoch isLeaseholder := r.mu.state.Lease.Replica.ReplicaID == r.mu.replicaID + isMergeInProgress := r.mergeInProgressRLocked() r.mu.RUnlock() // If we're the leaseholder of an epoch-based lease, notify the minPropTracker @@ -34,6 +35,25 @@ func (r *Replica) EmitMLAI() { if isLeaseholder && epoch > 0 { ctx := r.AnnotateCtx(context.Background()) _, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) - untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai)) + if isMergeInProgress { + // A critical requirement for the correctness of range merges is that we + // don't allow follower reads on closed timestamps that are greater than + // the subsumption time of the RHS range. Thus, while a range is subsumed, + // we ensure that any intervening closed timestamp updates (until the + // merge either commits or aborts) can only be activated *after* the merge + // has completed (successfully or otherwise), by requiring that follower + // replicas must catch up to an MLAI that succeeds the range's current + // lease applied index. See comment block at the end of Subsume() in + // cmd_subsume.go for more details. 
+ // + // Omitting the closed timestamp update here would be legal, but + // undesirable because if the range were to go on to quiesce, the follower + // replicas would not be able to implicitly tick their closed timestamps + // without `Request`ing it from the new leaseholder. Emitting it here + // avoids that little bit of latency. + untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai+1)) + } else { + untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai)) + } } } diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index cc91feeb7845..c7fb650a4b9c 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -109,6 +110,12 @@ func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() uint64 { return rec.i.GetLeaseAppliedIndex() } +// GetTracker returns the min prop tracker that keeps tabs over ongoing command +// evaluations for the closed timestamp subsystem. +func (rec *SpanSetReplicaEvalContext) GetTracker() closedts.TrackerI { + return rec.i.GetTracker() +} + // IsFirstRange returns true iff the replica belongs to the first range. 
func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool { return rec.i.IsFirstRange() diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index b5c5378c9197..f97e4a418062 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -742,7 +742,7 @@ func TestLearnerAndJointConfigFollowerRead(t *testing.T) { defer tc.Stopper().Stop(ctx) db := sqlutils.MakeSQLRunner(tc.ServerConn(0)) db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = $1`, testingTargetDuration) - db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, closeFraction) + db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, testingCloseFraction) db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true`) scratchStartKey := tc.ScratchRange(t) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 6ea02d61a901..925ea053e06e 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" @@ -351,6 +352,12 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 { return r.mu.state.LeaseAppliedIndex } +// GetTracker returns the min prop tracker that keeps tabs over ongoing command +// evaluations for the closed timestamp subsystem. +func (r *Replica) GetTracker() closedts.TrackerI { + return r.store.cfg.ClosedTimestamp.Tracker +} + // Snapshot implements the raft.Storage interface. Snapshot requires that // r.mu is held. Note that the returned snapshot is a placeholder and // does not contain any of the replica data. 
The snapshot is actually generated diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 00bedf805f7c..22c77f76732f 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -603,7 +603,9 @@ func (r *Replica) requestLeaseLocked( ctx context.Context, status kvserverpb.LeaseStatus, ) *leaseRequestHandle { if r.store.TestingKnobs().LeaseRequestEvent != nil { - r.store.TestingKnobs().LeaseRequestEvent(status.Timestamp) + if err := r.store.TestingKnobs().LeaseRequestEvent(status.Timestamp, r.StoreID(), r.GetRangeID()); err != nil { + return r.mu.pendingLeaseRequest.newResolvedHandle(err) + } } // Propose a Raft command to get a lease for this replica. repDesc, err := r.getReplicaDescriptorRLocked() diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 31a5de533ceb..1853e4d6fcab 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -725,7 +725,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc, db, _, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db, _, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) @@ -837,7 +837,7 @@ func TestReplicaRangefeedNudgeSlowClosedTimestamp(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc, db, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) diff --git a/pkg/kv/kvserver/replica_read.go 
b/pkg/kv/kvserver/replica_read.go index cc138cedb114..0527f2cc5136 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -127,7 +127,6 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( log.VEventf(ctx, 2, "server-side retry of batch") } br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, true /* readOnly */) - // If we can retry, set a higher batch timestamp and continue. // Allow one retry only. if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index a7f41d468910..7c1cb59e771c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -720,15 +720,16 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) { tsc := TestStoreConfig(clock) var leaseAcquisitionTrap atomic.Value tsc.TestingKnobs.DisableAutomaticLeaseRenewal = true - tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp) { + tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { val := leaseAcquisitionTrap.Load() if val == nil { - return + return nil } trapCallback := val.(func(ts hlc.Timestamp)) if trapCallback != nil { trapCallback(ts) } + return nil } transferSem := make(chan struct{}) tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 2ad2e64abc27..750250c3b735 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -142,11 +142,23 @@ func (r *Replica) executeWriteBatch( } g = nil // ownership passed to Raft, prevent misuse - // A max lease index of zero is returned when no proposal was made or a lease was proposed. - // In both cases, we don't need to communicate a MLAI. Furthermore, for lease proposals we - // cannot communicate under the lease's epoch. 
Instead the code calls EmitMLAI explicitly - // as a side effect of stepping up as leaseholder. + // A max lease index of zero is returned when no proposal was made or a lease + // was proposed. In both cases, we + // don't need to communicate a MLAI. Furthermore, for lease proposals we + // cannot communicate under the lease's epoch. Instead the code calls EmitMLAI + // explicitly as a side effect of stepping up as leaseholder. if maxLeaseIndex != 0 { + if r.mergeInProgress() { + // The correctness of range merges relies on the invariant that the + // LeaseAppliedIndex of the range is not bumped while a range is in its + // subsumed state. If this invariant is ever violated, the follower + // replicas of the subsumed range (RHS) are free to activate any future + // closed timestamp updates even before the merge completes. This would be + // a serializability violation. + // + // See comment block in Subsume() in cmd_subsume.go for details. + log.Fatalf(ctx, "lease applied index bumped while the range was subsumed") + } untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) } diff --git a/pkg/kv/kvserver/spanset/spanset.go b/pkg/kv/kvserver/spanset/spanset.go index faffc8f877b3..4ba50823bcc5 100644 --- a/pkg/kv/kvserver/spanset/spanset.go +++ b/pkg/kv/kvserver/spanset/spanset.go @@ -196,6 +196,25 @@ func (s *SpanSet) MaxProtectedTimestamp() hlc.Timestamp { return maxTS } +// Intersects returns true iff the span set denoted by `other` has any +// overlapping spans with `s`, and that those spans overlap in access type. Note +// that timestamps associated with the spans in the spanset are not considered, +// only the span boundaries are checked.
+func (s *SpanSet) Intersects(other *SpanSet) bool { + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for ss := SpanScope(0); ss < NumSpanScope; ss++ { + otherSpans := other.GetSpans(sa, ss) + for _, span := range otherSpans { + // If access is allowed, we must have an overlap. + if err := s.CheckAllowed(sa, span.Span); err == nil { + return true + } + } + } + } + return false +} + // AssertAllowed calls CheckAllowed and fatals if the access is not allowed. // Timestamps associated with the spans in the spanset are not considered, // only the span boundaries are checked. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index f117c4ae4922..acf10f089800 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -76,8 +76,7 @@ import ( const ( // rangeIDAllocCount is the number of Range IDs to allocate per allocation. - rangeIDAllocCount = 10 - defaultRaftHeartbeatIntervalTicks = 5 + rangeIDAllocCount = 10 // defaultRaftEntryCacheSize is the default size in bytes for a // store's Raft log entry cache. @@ -179,7 +178,6 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, Clock: clock, CoalescedHeartbeatsInterval: 50 * time.Millisecond, - RaftHeartbeatIntervalTicks: 1, ScanInterval: 10 * time.Minute, HistogramWindowInterval: metric.TestSampleInterval, EnableEpochRangeLeases: true, @@ -189,6 +187,7 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { // Use shorter Raft tick settings in order to minimize start up and failover // time in tests. + sc.RaftHeartbeatIntervalTicks = 1 sc.RaftElectionTimeoutTicks = 3 sc.RaftTickInterval = 100 * time.Millisecond sc.SetDefaults() @@ -662,9 +661,6 @@ type StoreConfig struct { // the quiesce cadence. CoalescedHeartbeatsInterval time.Duration - // RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats. 
- RaftHeartbeatIntervalTicks int - // ScanInterval is the default value for the scan interval ScanInterval time.Duration @@ -751,9 +747,6 @@ func (sc *StoreConfig) SetDefaults() { if sc.CoalescedHeartbeatsInterval == 0 { sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 2 } - if sc.RaftHeartbeatIntervalTicks == 0 { - sc.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks - } if sc.RaftEntryCacheSize == 0 { sc.RaftEntryCacheSize = defaultRaftEntryCacheSize } @@ -2226,6 +2219,16 @@ func (s *Store) Stopper() *stop.Stopper { return s.stopper } // TestingKnobs accessor. func (s *Store) TestingKnobs() *StoreTestingKnobs { return &s.cfg.TestingKnobs } +// ClosedTimestamp accessor. +func (s *Store) ClosedTimestamp() *container.Container { + return s.cfg.ClosedTimestamp +} + +// NodeLiveness accessor. +func (s *Store) NodeLiveness() *NodeLiveness { + return s.cfg.NodeLiveness +} + // IsDraining accessor. func (s *Store) IsDraining() bool { return s.draining.Load().(bool) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 706e73a617e0..d735ca362abb 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -93,7 +93,7 @@ type StoreTestingKnobs struct { // LeaseRequestEvent, if set, is called when replica.requestLeaseLocked() is // called to acquire a new lease. This can be used to assert that a request // triggers a lease acquisition. - LeaseRequestEvent func(ts hlc.Timestamp) + LeaseRequestEvent func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error // LeaseTransferBlockedOnExtensionEvent, if set, is called when // replica.TransferLease() encounters an in-progress lease extension. // nextLeader is the replica that we're trying to transfer the lease to. 
diff --git a/pkg/roachpb/batch_generated.go b/pkg/roachpb/batch_generated.go index 6016beccf06d..db4de754ddb7 100644 --- a/pkg/roachpb/batch_generated.go +++ b/pkg/roachpb/batch_generated.go @@ -1252,3 +1252,99 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { } return br } + +// CreateRequest creates an empty Request for each of the Method types. +func CreateRequest(method Method) Request { + switch method { + case Get: + return &GetRequest{} + case Put: + return &PutRequest{} + case ConditionalPut: + return &ConditionalPutRequest{} + case Increment: + return &IncrementRequest{} + case Delete: + return &DeleteRequest{} + case DeleteRange: + return &DeleteRangeRequest{} + case ClearRange: + return &ClearRangeRequest{} + case RevertRange: + return &RevertRangeRequest{} + case Scan: + return &ScanRequest{} + case EndTxn: + return &EndTxnRequest{} + case AdminSplit: + return &AdminSplitRequest{} + case AdminUnsplit: + return &AdminUnsplitRequest{} + case AdminMerge: + return &AdminMergeRequest{} + case AdminTransferLease: + return &AdminTransferLeaseRequest{} + case AdminChangeReplicas: + return &AdminChangeReplicasRequest{} + case AdminRelocateRange: + return &AdminRelocateRangeRequest{} + case HeartbeatTxn: + return &HeartbeatTxnRequest{} + case GC: + return &GCRequest{} + case PushTxn: + return &PushTxnRequest{} + case RecoverTxn: + return &RecoverTxnRequest{} + case ResolveIntent: + return &ResolveIntentRequest{} + case ResolveIntentRange: + return &ResolveIntentRangeRequest{} + case Merge: + return &MergeRequest{} + case TruncateLog: + return &TruncateLogRequest{} + case RequestLease: + return &RequestLeaseRequest{} + case ReverseScan: + return &ReverseScanRequest{} + case ComputeChecksum: + return &ComputeChecksumRequest{} + case CheckConsistency: + return &CheckConsistencyRequest{} + case InitPut: + return &InitPutRequest{} + case TransferLease: + return &TransferLeaseRequest{} + case LeaseInfo: + return &LeaseInfoRequest{} + case WriteBatch: + return 
&WriteBatchRequest{} + case Export: + return &ExportRequest{} + case Import: + return &ImportRequest{} + case QueryTxn: + return &QueryTxnRequest{} + case QueryIntent: + return &QueryIntentRequest{} + case AdminScatter: + return &AdminScatterRequest{} + case AddSSTable: + return &AddSSTableRequest{} + case RecomputeStats: + return &RecomputeStatsRequest{} + case Refresh: + return &RefreshRequest{} + case RefreshRange: + return &RefreshRangeRequest{} + case Subsume: + return &SubsumeRequest{} + case RangeStats: + return &RangeStatsRequest{} + case AdminVerifyProtectedTimestamp: + return &AdminVerifyProtectedTimestampRequest{} + default: + panic(fmt.Sprintf("unsupported method: %+v", method)) + } +} diff --git a/pkg/roachpb/gen_batch.go b/pkg/roachpb/gen_batch.go index 3191144929cc..649eea337069 100644 --- a/pkg/roachpb/gen_batch.go +++ b/pkg/roachpb/gen_batch.go @@ -319,6 +319,22 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { } return br } +`) + + fmt.Fprint(f, ` +// CreateRequest creates an empty Request for each of the Method types. +func CreateRequest(method Method) Request { + switch method {`) + for _, v := range reqVariants { + fmt.Fprintf(f, ` + case %s: + return &%s{}`, v.msgType[:len(v.msgType)-7], v.msgType) + } + fmt.Fprintf(f, "%s", ` + default: + panic(fmt.Sprintf("unsupported method: %+v", method)) + } +} `) if err := f.Close(); err != nil {