From af5e7dabcf2b147934eba0ce2d2b03753acee245 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Sun, 19 Jul 2020 23:29:28 -0400 Subject: [PATCH 1/5] kvserver: move RaftHeartbeatIntervalTicks into RaftConfig Currently, RaftHeartbeatIntervalTicks is not configurable from a TestCluster and we disallow RaftElectionTimeoutTicks to be less than or equal to it. Thus, tests that use TestCluster and, for instance, sleep for a node's liveness to expire take longer than they really need to. Release note: None --- pkg/base/config.go | 11 +++++++++++ pkg/kv/kvserver/store.go | 11 ++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/base/config.go b/pkg/base/config.go index 86e70d7e9a44..4d4ec6b56dd6 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -63,6 +63,11 @@ const ( // See https://github.com/cockroachdb/cockroach/issues/20310. DefaultMetricsSampleInterval = 10 * time.Second + // defaultRaftHeartbeatIntervalTicks is the default value for + // RaftHeartbeatIntervalTicks, which determines the number of ticks between + // each heartbeat. + defaultRaftHeartbeatIntervalTicks = 5 + // defaultRPCHeartbeatInterval is the default value of RPCHeartbeatInterval // used by the rpc context. defaultRPCHeartbeatInterval = 3 * time.Second @@ -300,6 +305,9 @@ type RaftConfig struct { // unless overridden. RaftElectionTimeoutTicks int + // RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats. + RaftHeartbeatIntervalTicks int + // RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader // lease active duration should be of the raft election timeout. 
RangeLeaseRaftElectionTimeoutMultiplier float64 @@ -364,6 +372,9 @@ func (cfg *RaftConfig) SetDefaults() { if cfg.RaftElectionTimeoutTicks == 0 { cfg.RaftElectionTimeoutTicks = defaultRaftElectionTimeoutTicks } + if cfg.RaftHeartbeatIntervalTicks == 0 { + cfg.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks + } if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 { cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index f117c4ae4922..beb3658fc3ee 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -76,8 +76,7 @@ import ( const ( // rangeIDAllocCount is the number of Range IDs to allocate per allocation. - rangeIDAllocCount = 10 - defaultRaftHeartbeatIntervalTicks = 5 + rangeIDAllocCount = 10 // defaultRaftEntryCacheSize is the default size in bytes for a // store's Raft log entry cache. @@ -179,7 +178,6 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, Clock: clock, CoalescedHeartbeatsInterval: 50 * time.Millisecond, - RaftHeartbeatIntervalTicks: 1, ScanInterval: 10 * time.Minute, HistogramWindowInterval: metric.TestSampleInterval, EnableEpochRangeLeases: true, @@ -189,6 +187,7 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { // Use shorter Raft tick settings in order to minimize start up and failover // time in tests. + sc.RaftHeartbeatIntervalTicks = 1 sc.RaftElectionTimeoutTicks = 3 sc.RaftTickInterval = 100 * time.Millisecond sc.SetDefaults() @@ -662,9 +661,6 @@ type StoreConfig struct { // the quiesce cadence. CoalescedHeartbeatsInterval time.Duration - // RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats. 
- RaftHeartbeatIntervalTicks int - // ScanInterval is the default value for the scan interval ScanInterval time.Duration @@ -751,9 +747,6 @@ func (sc *StoreConfig) SetDefaults() { if sc.CoalescedHeartbeatsInterval == 0 { sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 2 } - if sc.RaftHeartbeatIntervalTicks == 0 { - sc.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks - } if sc.RaftEntryCacheSize == 0 { sc.RaftEntryCacheSize = defaultRaftEntryCacheSize } From 2b5b45c8e5937abb0ae7194e96dcc403c31ed78c Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Mon, 3 Aug 2020 00:33:37 -0400 Subject: [PATCH 2/5] backupccl: direct spans after split and scatter The AdminScatter request sent in the SplitAndScatterProcessor returns the lease information of the range after the scatter request has completed. The SplitAndScatterProcessor now looks at this field to properly direct the spans to the appropriate RestoreData processor. Release note: None. --- .../backupccl/split_and_scatter_processor.go | 52 ++++++++++++------- .../split_and_scatter_processor_test.go | 2 +- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index 9221fc241db8..a317a47803bd 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -33,7 +33,7 @@ type splitAndScatterer interface { // splitAndScatterSpan issues a split request at a given key and then scatters // the range around the cluster. It returns the node ID of the leaseholder of // the span after the scatter. 
- splitAndScatterKey(ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.NodeID, error) + splitAndScatterKey(ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool) (roachpb.NodeID, error) } // dbSplitAndScatter is the production implementation of this processor's @@ -46,7 +46,7 @@ type dbSplitAndScatterer struct{} // to which the span was scattered. If the destination node could not be // determined, node ID of 0 is returned. func (s dbSplitAndScatterer) splitAndScatterKey( - ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, + ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool, ) (roachpb.NodeID, error) { expirationTime := db.Clock().Now().Add(time.Hour.Nanoseconds(), 0) newSpanKey, err := rewriteBackupSpanKey(kr, key) @@ -54,26 +54,32 @@ func (s dbSplitAndScatterer) splitAndScatterKey( return 0, err } - // TODO(dan): Really, this should be splitting the Key of - // the _next_ entry. + // TODO(pbardea): Really, this should be splitting the Key of the _next_ + // entry. log.VEventf(ctx, 1, "presplitting new key %+v", newSpanKey) if err := db.AdminSplit(ctx, newSpanKey, expirationTime); err != nil { return 0, errors.Wrapf(err, "splitting key %s", newSpanKey) } log.VEventf(ctx, 1, "scattering new key %+v", newSpanKey) - var ba roachpb.BatchRequest - ba.Header.ReturnRangeInfo = true - ba.Add(&roachpb.AdminScatterRequest{ + req := &roachpb.AdminScatterRequest{ RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{ Key: newSpanKey, EndKey: newSpanKey.Next(), }), - }) + // This is a bit of a hack, but it seems to be an effective one (see #36665 + // for graphs). As of the commit that added this, scatter is not very good + // at actually balancing leases. 
This is likely for two reasons: 1) there's + // almost certainly some regression in scatter's behavior, it used to work + // much better and 2) scatter has to operate by balancing leases for all + // ranges in a cluster, but in RESTORE, we really just want it to be + // balancing the span being restored into. + RandomizeLeases: randomizeLeases, + } - br, pErr := db.NonTransactionalSender().Send(ctx, ba) + res, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), req) if pErr != nil { - // TODO(dan): Unfortunately, Scatter is still too unreliable to + // TODO(pbardea): Unfortunately, Scatter is still too unreliable to // fail the RESTORE when Scatter fails. I'm uncomfortable that // this could break entirely and not start failing the tests, // but on the bright side, it doesn't affect correctness, only @@ -83,14 +89,23 @@ func (s dbSplitAndScatterer) splitAndScatterKey( return 0, nil } - return s.findDestination(ctx, br), nil + return s.findDestination(res.(*roachpb.AdminScatterResponse)), nil } // findDestination returns the node ID of the node of the destination of the // AdminScatter request. If the destination cannot be found, 0 is returned. -func (s dbSplitAndScatterer) findDestination( - _ context.Context, _ *roachpb.BatchResponse, -) roachpb.NodeID { +func (s dbSplitAndScatterer) findDestination(res *roachpb.AdminScatterResponse) roachpb.NodeID { + // A request from a 20.1 node will not have a RangeInfos with a lease. + // For this mixed-version state, we'll report the destination as node 0 + // and suffer a bit of inefficiency. + if len(res.RangeInfos) > 0 { + // If the lease is not populated, we return the 0 value anyway. We receive 1 + // RangeInfo per range that was scattered. Since we send a scatter request + // to each range that we make, we are only interested in the first range, + // which contains the key at which we're splitting and scattering. 
+ return res.RangeInfos[0].Lease.Replica.NodeID + } + return roachpb.NodeID(0) } @@ -215,7 +230,7 @@ func runSplitAndScatter( g.GoCtx(func(ctx context.Context) error { defer close(importSpanChunksCh) for _, importSpanChunk := range spec.Chunks { - _, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key) + _, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key, true /* randomizeLeases */) if err != nil { return err } @@ -229,9 +244,8 @@ func runSplitAndScatter( return nil }) - // TODO(dan): This tries to cover for a bad scatter by having 2 * the number - // of nodes in the cluster. Is it necessary? - // TODO(pbardea): Run some experiments to tune this knob. + // TODO(pbardea): This tries to cover for a bad scatter by having 2 * the + // number of nodes in the cluster. Is it necessary? splitScatterWorkers := 2 for worker := 0; worker < splitScatterWorkers; worker++ { g.GoCtx(func(ctx context.Context) error { @@ -239,7 +253,7 @@ func runSplitAndScatter( log.Infof(ctx, "processing a chunk") for _, importSpan := range importSpanChunk { log.Infof(ctx, "processing a span") - destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key) + destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key, false /* randomizeLeases */) if err != nil { return err } diff --git a/pkg/ccl/backupccl/split_and_scatter_processor_test.go b/pkg/ccl/backupccl/split_and_scatter_processor_test.go index 4c158afdc4bb..faefdb75840e 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor_test.go @@ -40,7 +40,7 @@ type mockScatterer struct { // This mock implementation of the split and scatterer simulates a scattering of // ranges. 
func (s *mockScatterer) splitAndScatterKey( - _ context.Context, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, + _ context.Context, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, _ bool, ) (roachpb.NodeID, error) { s.Lock() defer s.Unlock() From 0664a5d1d9080f4d3e4ca77b4d219fba7bf50019 Mon Sep 17 00:00:00 2001 From: Artem Ervits Date: Mon, 3 Aug 2020 13:20:59 -0400 Subject: [PATCH 3/5] cli: change label on printed build revision Fixes #52249 Release note (cli change): update label used for commit ID in printed version info --- pkg/cli/cli.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index c578fd28b89a..ea0074aa7505 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -141,18 +141,18 @@ Output build version information. RunE: func(cmd *cobra.Command, args []string) error { info := build.GetInfo() tw := tabwriter.NewWriter(os.Stdout, 2, 1, 2, ' ', 0) - fmt.Fprintf(tw, "Build Tag: %s\n", info.Tag) - fmt.Fprintf(tw, "Build Time: %s\n", info.Time) - fmt.Fprintf(tw, "Distribution: %s\n", info.Distribution) - fmt.Fprintf(tw, "Platform: %s", info.Platform) + fmt.Fprintf(tw, "Build Tag: %s\n", info.Tag) + fmt.Fprintf(tw, "Build Time: %s\n", info.Time) + fmt.Fprintf(tw, "Distribution: %s\n", info.Distribution) + fmt.Fprintf(tw, "Platform: %s", info.Platform) if info.CgoTargetTriple != "" { fmt.Fprintf(tw, " (%s)", info.CgoTargetTriple) } fmt.Fprintln(tw) - fmt.Fprintf(tw, "Go Version: %s\n", info.GoVersion) - fmt.Fprintf(tw, "C Compiler: %s\n", info.CgoCompiler) - fmt.Fprintf(tw, "Build SHA-1: %s\n", info.Revision) - fmt.Fprintf(tw, "Build Type: %s\n", info.Type) + fmt.Fprintf(tw, "Go Version: %s\n", info.GoVersion) + fmt.Fprintf(tw, "C Compiler: %s\n", info.CgoCompiler) + fmt.Fprintf(tw, "Build Commit ID: %s\n", info.Revision) + fmt.Fprintf(tw, "Build Type: %s\n", info.Type) return tw.Flush() }, } From 4ed88b93c7ce25f98832eb9abfa1f7801e99af50 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: 
Sun, 19 Jul 2020 23:40:30 -0400 Subject: [PATCH 4/5] kvserver: prevent follower reads while a range is subsumed. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this commit, during a merge, the RHS leaseholder’s store could continue broadcasting (actionable) closed timestamp updates even after it had been subsumed. This allowed the followers to be able to serve follower reads past the subsumption time of RHS. Additionally, after the merge, if the LHS had a lower closed timestamp than the RHS, it could allow writes to the keyspan owned by RHS at timestamps lower than the RHS’s max closed timestamp. This commit fixes this bug by requiring that the followers catch up to a LeaseAppliedIndex that belongs to the entry succeeding the Subsume request. It also adds necessary assertions around the invariant that while a range is subsumed, nothing (except the merge txn being rolled back) can bump its lease applied index. Fixes #44878 Release note (bug fix): Fixed a rare bug that could cause actionable closed timestamps to effectively regress over a given keyspan. This could in turn lead to a serializability violation when using follower reads. This was due to ill-defined interactions between range merges and the closed timestamp subsystem. 
--- .../batcheval/cmd_compute_checksum.go | 20 +- pkg/kv/kvserver/batcheval/cmd_subsume.go | 40 +- pkg/kv/kvserver/batcheval/cmd_subsume_test.go | 76 +++ pkg/kv/kvserver/batcheval/eval_context.go | 5 + pkg/kv/kvserver/client_replica_test.go | 5 +- pkg/kv/kvserver/closed_timestamp_test.go | 528 +++++++++++++++++- pkg/kv/kvserver/replica.go | 14 +- pkg/kv/kvserver/replica_closedts.go | 22 +- pkg/kv/kvserver/replica_eval_context_span.go | 7 + pkg/kv/kvserver/replica_learner_test.go | 2 +- pkg/kv/kvserver/replica_raftstorage.go | 7 + pkg/kv/kvserver/replica_range_lease.go | 4 +- pkg/kv/kvserver/replica_rangefeed_test.go | 4 +- pkg/kv/kvserver/replica_read.go | 1 - pkg/kv/kvserver/replica_test.go | 5 +- pkg/kv/kvserver/replica_write.go | 20 +- pkg/kv/kvserver/spanset/spanset.go | 19 + pkg/kv/kvserver/store.go | 10 + pkg/kv/kvserver/testing_knobs.go | 2 +- pkg/roachpb/batch_generated.go | 96 ++++ pkg/roachpb/gen_batch.go | 16 + 21 files changed, 850 insertions(+), 53 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_subsume_test.go diff --git a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go index 2028f06f76b4..b96db6cc14c9 100644 --- a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go +++ b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -27,11 +28,22 @@ func init() { } func declareKeysComputeChecksum( - _ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet, + desc *roachpb.RangeDescriptor, + _ roachpb.Header, + _ roachpb.Request, + latchSpans, _ *spanset.SpanSet, ) { - // Intentionally declare no keys, as ComputeChecksum does not need to be - // serialized with any other commands. 
It simply needs to be committed into - // the Raft log. + // The correctness of range merges depends on the lease applied index of a + // range not being bumped while the RHS is subsumed. ComputeChecksum bumps a + // range's LAI and thus needs to be serialized with Subsume requests, in order + // to prevent a rare closed timestamp violation due to writes on the post-merged + // range that violate a closed timestamp spuriously reported by the pre-merged + // range. This can, in turn, lead to a serializability violation. See comment + // at the end of Subsume() in cmd_subsume.go for details. Thus, it must + // declare access over at least one key. We choose to declare read-only access + // over the range descriptor key. + rdKey := keys.RangeDescriptorKey(desc.StartKey) + latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: rdKey}) } // Version numbers for Replica checksum computation. Requests silently no-op diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index 9435a9656f87..b0f6d0246f8e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -125,13 +126,50 @@ func Subsume( return result.Result{}, errors.New("non-deletion intent on local range descriptor") } + // We prevent followers of the RHS from being able to serve follower reads on + // timestamps that fall in the timestamp window representing the range's + // subsumed state (i.e. 
between the subsumption time (FreezeStart) and the + // timestamp at which the merge transaction commits or aborts), by requiring + // follower replicas to catch up to an MLAI that succeeds the range's current + // LeaseAppliedIndex (note that we're tracking lai + 1 below instead of lai). + // In case the merge successfully commits, this MLAI will never be caught up + // to since the RHS will be destroyed. In case the merge aborts, this ensures + // that the followers can only activate the newer closed timestamps once they + // catch up to the LAI associated with the merge abort. We need to do this + // because the closed timestamps that are broadcast by RHS in this subsumed + // state are not going to be reflected in the timestamp cache of the LHS range + // after the merge, which can cause a serializability violation. + // + // Note that we are essentially lying to the closed timestamp tracker here in + // order to achieve the effect of unactionable closed timestamp updates until + // the merge concludes. Tracking lai + 1 here ensures that the follower + // replicas need to catch up to at least that index before they are able to + // activate _any of the closed timestamps from this point onwards_. In other + // words, we will never publish a closed timestamp update for this range below + // this lai, regardless of whether a different proposal untracks a lower lai + // at any point in the future. + // + // NB: The above statement relies on the invariant that the LAI that follows a + // Subsume request will be applied only after the merge aborts. More + // specifically, this means that no intervening request can bump the LAI of + // range while it is subsumed. This invariant is upheld because the only Raft + // proposals allowed after a range has been subsumed are lease requests, which + // do not bump the LAI. 
In case there is lease transfer on this range while it + // is subsumed, we ensure that the initial MLAI update broadcast by the new + // leaseholder respects the invariant in question, in much the same way we do + // here. Take a look at `EmitMLAI()` in replica_closedts.go for more details. + _, untrack := cArgs.EvalCtx.GetTracker().Track(ctx) + lease, _ := cArgs.EvalCtx.GetLease() + lai := cArgs.EvalCtx.GetLeaseAppliedIndex() + untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1)) + // NOTE: the deletion intent on the range's meta2 descriptor is just as // important to correctness as the deletion intent on the local descriptor, // but the check is too expensive as it would involve a network roundtrip on // most nodes. reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() - reply.LeaseAppliedIndex = cArgs.EvalCtx.GetLeaseAppliedIndex() + reply.LeaseAppliedIndex = lai reply.FreezeStart = cArgs.EvalCtx.Clock().Now() return result.Result{ diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume_test.go b/pkg/kv/kvserver/batcheval/cmd_subsume_test.go new file mode 100644 index 000000000000..1ce40c608796 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_subsume_test.go @@ -0,0 +1,76 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package batcheval + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// TestRequestsSerializeWithSubsume ensures that no request can be evaluated +// concurrently with a Subsume request. For more details, refer to the big +// comment block at the end of Subsume() in cmd_subsume.go. +// +// NB: This test is broader than it really needs to be. A more precise statement +// of the condition necessary to uphold the invariant mentioned in Subsume() is: +// No request that bumps the lease applied index of a range can be evaluated +// concurrently with a Subsume request. +func TestRequestsSerializeWithSubsume(t *testing.T) { + defer leaktest.AfterTest(t)() + var subsumeLatchSpans, subsumeLockSpans, otherLatchSpans, otherLockSpans spanset.SpanSet + startKey := []byte(`a`) + endKey := []byte(`b`) + desc := &roachpb.RangeDescriptor{ + RangeID: 0, + StartKey: startKey, + EndKey: endKey, + } + testTxn := &roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + ID: uuid.FastMakeV4(), + Key: startKey, + WriteTimestamp: hlc.Timestamp{WallTime: 1}, + }, + Name: "test txn", + } + header := roachpb.Header{Txn: testTxn} + subsumeRequest := &roachpb.SubsumeRequest{RightDesc: *desc} + declareKeysSubsume(desc, header, subsumeRequest, &subsumeLatchSpans, &subsumeLockSpans) + for method, command := range cmds { + t.Run(method.String(), func(t *testing.T) { + otherRequest := roachpb.CreateRequest(method) + if queryTxnReq, ok := otherRequest.(*roachpb.QueryTxnRequest); ok { + // QueryTxnRequest declares read-only access over the txn record of the txn + // it is supposed to query and not the txn that sent it. 
We fill this Txn + // field in here to prevent it from being nil and leading to the txn key + // falling outside our test range's keyspace. + queryTxnReq.Txn = testTxn.TxnMeta + } + + otherRequest.SetHeader(roachpb.RequestHeader{ + Key: startKey, + EndKey: endKey, + Sequence: 0, + }) + + command.DeclareKeys(desc, header, otherRequest, &otherLatchSpans, &otherLockSpans) + if !subsumeLatchSpans.Intersects(&otherLatchSpans) { + t.Errorf("%s does not serialize with Subsume", method) + } + }) + } +} diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 5d2bbbbbb230..17417dcd96d9 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -65,6 +66,7 @@ type EvalContext interface { GetFirstIndex() (uint64, error) GetTerm(uint64) (uint64, error) GetLeaseAppliedIndex() uint64 + GetTracker() closedts.TrackerI Desc() *roachpb.RangeDescriptor ContainsKey(key roachpb.Key) bool @@ -178,6 +180,9 @@ func (m *mockEvalCtxImpl) GetTerm(uint64) (uint64, error) { func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 { panic("unimplemented") } +func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI { + panic("unimplemented") +} func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor { return m.MockEvalCtx.Desc } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index c57c491b2f41..efb8adbf2c88 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -1120,15 +1120,16 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { // below to trap any 
lease request and infer that it refers to the range we're // interested in. sc.TestingKnobs.DisableSplitQueue = true - sc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp) { + sc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { val := leaseAcquisitionTrap.Load() if val == nil { - return + return nil } trapCallback := val.(func(ts hlc.Timestamp)) if trapCallback != nil { trapCallback(ts) } + return nil } mtc := &multiTestContext{storeConfig: &sc} defer mtc.Stop() diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 0deeac831910..fbdc5d888327 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -15,7 +15,9 @@ import ( gosql "database/sql" "fmt" "math/rand" + "reflect" "strconv" + "sync" "sync/atomic" "testing" "time" @@ -25,12 +27,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -38,11 +42,21 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +var aggressiveResolvedTimestampClusterArgs = base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: aggressiveResolvedTimestampPushKnobs(), + }, + }, +} + func TestClosedTimestampCanServe(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -53,7 +67,8 @@ func TestClosedTimestampCanServe(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -113,7 +128,8 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -185,7 +201,8 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, _, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) @@ -266,7 +283,8 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, 
repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) // Disable the automatic merging. if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { t.Fatal(err) @@ -346,7 +364,8 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) { ctx := context.Background() // Set up the target duration to be very long and rely on lease transfers to // drive MaxClosed. - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, time.Hour) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, time.Hour, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -383,7 +402,8 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -416,14 +436,15 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) } - // Verify that 
we can serve a follower read at a timestamp. Wait if necessary. + // Verify that we can serve a follower read at a timestamp. Wait if necessary ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} baRead := makeReadBatchRequestForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { @@ -445,9 +466,461 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2) } +// TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, +// replicas of the subsumed range (RHS) cannot serve follower reads for +// timestamps after the subsumption time. +func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + // Skipping under short because this test pauses for a few seconds in order to + // trigger a node liveness expiration. + skip.UnderShort(t) + + type postSubsumptionCallback func( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + g *errgroup.Group, + rightDesc roachpb.RangeDescriptor, + rightLeaseholder roachpb.ReplicationTarget, + freezeStartTimestamp hlc.Timestamp, + leaseAcquisitionTrap *atomic.Value, + ) (roachpb.ReplicationTarget, hlc.Timestamp, error) + + type testCase struct { + name string + callback postSubsumptionCallback + } + + tests := []testCase{ + { + name: "without lease transfer", + callback: nil, + }, + { + name: "with intervening lease transfer", + // TODO(aayush): Maybe allowlist `TransferLease` requests while a range is + // subsumed and use that here, instead of forcing a lease transfer by + // pausing heartbeats. 
+ callback: forceLeaseTransferOnSubsumedRange, + }, + } + + runTest := func(t *testing.T, callback postSubsumptionCallback) { + ctx := context.Background() + st := mergeFilterState{ + blockMergeTrigger: make(chan hlc.Timestamp), + finishMergeTxn: make(chan struct{}), + } + var leaseAcquisitionTrap atomic.Value + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + // We set the raft election timeout to a small duration. This should + // result in the node liveness duration being ~2.4 seconds + RaftHeartbeatIntervalTicks: 3, + RaftElectionTimeoutTicks: 4, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // This test suspends the merge txn right before it can apply the + // commit trigger and can lead to the merge txn taking longer than + // the defaults specified in aggressiveResolvedTimestampPushKnobs(). + // We use really high values here in order to avoid the merge txn + // being pushed due to resolved timestamps. + RangeFeedPushTxnsInterval: 5 * time.Second, + RangeFeedPushTxnsAge: 60 * time.Second, + TestingRequestFilter: st.suspendMergeTrigger, + LeaseRequestEvent: func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { + val := leaseAcquisitionTrap.Load() + if val == nil { + return nil + } + leaseAcquisitionCallback := val.(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error) + if err := leaseAcquisitionCallback(storeID, rangeID); err != nil { + return err + } + return nil + }, + DisableMergeQueue: true, + }, + }, + }, + } + // If the initial phase of the merge txn takes longer than the closed + // timestamp target duration, its initial CPuts can have their write + // timestamps bumped due to an intervening closed timestamp update. This + // causes the entire merge txn to retry. 
So we use a long closed timestamp + // duration at the beginning of the test until we have the merge txn + // suspended at its commit trigger, and then change it back down to + // `testingTargetDuration`. + tc, db, leftDesc, rightDesc := initClusterWithSplitRanges(ctx, t, 5*time.Second, + testingCloseFraction, clusterArgs) + defer tc.Stopper().Stop(ctx) + + leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) + rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) + if leftLeaseholder.StoreID == rightLeaseholder.StoreID { + // In this test, we may pause the heartbeats of the store that holds the + // lease for the right hand side range, in order to force a lease + // transfer. If the LHS and RHS share a leaseholder, this may cause a + // lease transfer for the left hand range as well. This can cause a merge + // txn retry and we'd like to avoid that, so we ensure that LHS and RHS + // have different leaseholders before beginning the test. + target := pickRandomTarget(tc, leftLeaseholder, leftDesc) + if err := tc.TransferRangeLease(leftDesc, target); err != nil { + t.Fatal(err) + } + leftLeaseholder = target + } + + g, ctx := errgroup.WithContext(ctx) + // Merge the ranges back together. The LHS rightLeaseholder should block right + // before the merge trigger request is sent. + g.Go(func() error { + return mergeTxn(tc, leftDesc) + }) + defer func() { + // Unblock the rightLeaseholder so it can finally commit the merge. + close(st.finishMergeTxn) + if err := g.Wait(); err != nil { + t.Error(err) + } + }() + + // We now have the RHS in its subsumed state. + freezeStartTimestamp := <-st.blockMergeTrigger + // Reduce the closed timestamp target duration in order to make the rest of + // the test faster. 
+ if _, err := db.Exec(fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';`, + testingTargetDuration)); err != nil { + t.Fatal(err) + } + // inactiveClosedTSBoundary indicates the low water mark for closed + // timestamp updates beyond which we expect none of the followers to be able + // to serve follower reads until the merge is complete. + inactiveClosedTSBoundary := freezeStartTimestamp + if callback != nil { + newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder, + freezeStartTimestamp, &leaseAcquisitionTrap) + if err != nil { + t.Fatal(err) + } + rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts + } + // Poll the store for closed timestamp updates for timestamps greater than + // our `inactiveClosedTSBoundary`. + closedTimestampCh := make(chan ctpb.Entry, 1) + g.Go(func() (e error) { + pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh) + return + }) + // We expect that none of the closed timestamp updates greater than + // `inactiveClosedTSBoundary` will be actionable by the RHS follower + // replicas. 
+ log.Infof(ctx, "waiting for next closed timestamp update for the RHS") + select { + case <-closedTimestampCh: + case <-time.After(30 * time.Second): + t.Fatal("failed to receive next closed timestamp update") + } + baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next()) + rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) + log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") + verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runTest(t, test.callback) + }) + } + +} + +// forceLeaseTransferOnSubsumedRange triggers a lease transfer on `rightDesc` by +// pausing the liveness heartbeats of the store that holds the lease for it. +func forceLeaseTransferOnSubsumedRange( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + g *errgroup.Group, + rightDesc roachpb.RangeDescriptor, + rightLeaseholder roachpb.ReplicationTarget, + freezeStartTimestamp hlc.Timestamp, + leaseAcquisitionTrap *atomic.Value, +) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) { + oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder) + // Co-operative lease transfers will block while a range is subsumed, so we + // pause the node liveness heartbeats until a lease transfer occurs. + oldLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() + require.True(t, oldLease.Replica.StoreID == oldLeaseholderStore.StoreID()) + // Instantiate the lease acquisition callback right before we pause the node + // liveness heartbeats. We do this here because leases may be requested at + // any time for any reason, even before we pause the heartbeats. 
+ leaseAcquisitionCh := make(chan roachpb.StoreID) + newRightLeaseholder := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)[0] + var once sync.Once + leaseAcquisitionTrap.Store(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { + if rangeID == rightDesc.RangeID { + if expectedStoreID := newRightLeaseholder.StoreID(); expectedStoreID != storeID { + return roachpb.NewError(&roachpb.NotLeaseHolderError{ + CustomMsg: fmt.Sprintf("only store %d must acquire the RHS's lease", expectedStoreID), + }) + } + once.Do(func() { + log.Infof(ctx, "received lease request from store %d for RHS range %d", + storeID, rangeID) + leaseAcquisitionCh <- storeID + }) + } + return nil + }) + restartHeartbeats := oldLeaseholderStore.NodeLiveness().DisableAllHeartbeatsForTest() + defer restartHeartbeats() + log.Infof(ctx, "paused RHS rightLeaseholder's liveness heartbeats") + time.Sleep(oldLeaseholderStore.NodeLiveness().GetLivenessThreshold()) + + // Send a read request from one of the followers of RHS so that it notices + // that the current rightLeaseholder has stopped heartbeating. This will prompt + // it to acquire the range lease for itself. + g.Go(func() error { + leaseAcquisitionRequest := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp) + log.Infof(ctx, + "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", + newRightLeaseholder.StoreID()) + _, pErr := newRightLeaseholder.Send(ctx, leaseAcquisitionRequest) + // After the merge commits, the RHS will cease to exist. Thus, we expect + // all pending queries on RHS to return RangeNotFoundErrors. 
+ if !assert.ObjectsAreEqual(reflect.TypeOf(pErr.GetDetail()), reflect.TypeOf(&roachpb.RangeNotFoundError{})) { + return errors.AssertionFailedf("expected a RangeNotFoundError; got %v", pErr.GetDetail()) + } + return nil + }) + select { + case storeID := <-leaseAcquisitionCh: + if storeID != newRightLeaseholder.StoreID() { + err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead", + newRightLeaseholder.StoreID(), storeID) + return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + } + case <-time.After(30 * time.Second): + err = errors.New("failed to receive lease acquisition request") + return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + } + rightLeaseholder = roachpb.ReplicationTarget{ + NodeID: newRightLeaseholder.NodeID(), + StoreID: newRightLeaseholder.StoreID(), + } + oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder) + err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() + if newLease.Sequence == oldLease.Sequence { + return errors.New("RHS lease not updated") + } + leaseStart = newLease.Start + return nil + }) + if err != nil { + return + } + if !freezeStartTimestamp.LessEq(leaseStart) { + err = errors.New("freeze timestamp greater than the start time of the new lease") + return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + } + + return rightLeaseholder, leaseStart, nil +} + +type mergeFilterState struct { + retries int32 + + blockMergeTrigger chan hlc.Timestamp + finishMergeTxn chan struct{} +} + +// suspendMergeTrigger blocks a merge transaction right before its commit +// trigger is evaluated. This is intended to get the RHS range suspended in its +// subsumed state. The `finishMergeTxn` channel must be closed by the caller in +// order to unblock the merge txn. 
+func (state *mergeFilterState) suspendMergeTrigger( + ctx context.Context, ba roachpb.BatchRequest, +) *roachpb.Error { + for _, req := range ba.Requests { + if et := req.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { + if state.retries >= 1 { + // TODO(aayush): make these tests resilient to merge txn retries. + return roachpb.NewError(errors.AssertionFailedf("merge txn retried")) + } + freezeStart := et.InternalCommitTrigger.MergeTrigger.FreezeStart + log.Infof(ctx, "suspending the merge txn with FreezeStart: %s", + freezeStart) + state.retries++ + // We block the LHS leaseholder from applying the merge trigger. Note + // that RHS followers will have already caught up to the leaseholder + // well before this point. + state.blockMergeTrigger <- freezeStart + // Let the merge try to commit. + <-state.finishMergeTxn + } + } + return nil +} + +func mergeTxn(tc serverutils.TestClusterInterface, leftDesc roachpb.RangeDescriptor) error { + ctx := context.Background() + ctx, record, cancel := tracing.ContextWithRecordingSpan(ctx, "merge txn") + defer cancel() + if _, err := tc.Server(0).MergeRanges(leftDesc.StartKey.AsRawKey()); err != nil { + log.Infof(ctx, "trace: %s", record().String()) + return err + } + return nil +} + +func initClusterWithSplitRanges( + ctx context.Context, + t *testing.T, + targetDuration time.Duration, + closeFraction float64, + clusterArgs base.TestClusterArgs, +) ( + serverutils.TestClusterInterface, + *gosql.DB, + roachpb.RangeDescriptor, + roachpb.RangeDescriptor, +) { + tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, targetDuration, + closeFraction, clusterArgs) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(3, $1)`, "foo"); err != nil { + t.Fatal(err) + } + // Start by ensuring that the values can be read from all replicas at ts. 
+ ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeReadBatchRequestForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) + }) + // Manually split the table to have easier access to descriptors. + tableID, err := getTableID(db0, "cttest", "kv") + if err != nil { + t.Fatalf("failed to lookup ids: %+v", err) + } + + idxPrefix := keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1) + k, err := sqlbase.EncodeTableKey(idxPrefix, tree.NewDInt(2), encoding.Ascending) + if err != nil { + t.Fatalf("failed to encode split key: %+v", err) + } + tcImpl := tc.(*testcluster.TestCluster) + leftDesc, rightDesc := tcImpl.SplitRangeOrFatal(t, k) + if err := tcImpl.WaitForFullReplication(); err != nil { + t.Fatal(err) + } + return tc, db0, leftDesc, rightDesc +} + +func getCurrentMaxClosed( + t *testing.T, + tc serverutils.TestClusterInterface, + target roachpb.ReplicationTarget, + desc roachpb.RangeDescriptor, +) ctpb.Entry { + deadline := timeutil.Now().Add(2 * testingTargetDuration) + store := getTargetStoreOrFatal(t, tc, target) + var maxClosed ctpb.Entry + attempts := 0 + for attempts == 0 || timeutil.Now().Before(deadline) { + attempts++ + store.ClosedTimestamp().Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) { + if _, ok := entry.MLAI[desc.RangeID]; ok { + maxClosed = entry + return true + } + return false + }) + if _, ok := maxClosed.MLAI[desc.RangeID]; !ok { + // We ran out of closed timestamps to visit without finding one that + // corresponds to rightDesc. It is likely that no closed timestamps have + // been broadcast for desc yet, try again. 
+ continue + } + return maxClosed + } + return ctpb.Entry{} +} + +func pollForGreaterClosedTimestamp( + t *testing.T, + tc serverutils.TestClusterInterface, + target roachpb.ReplicationTarget, + desc roachpb.RangeDescriptor, + lowerBound hlc.Timestamp, + returnCh chan<- ctpb.Entry, +) { + for { + if t.Failed() { + return + } + maxClosed := getCurrentMaxClosed(t, tc, target, desc) + if _, ok := maxClosed.MLAI[desc.RangeID]; ok && lowerBound.LessEq(maxClosed.ClosedTimestamp) { + returnCh <- maxClosed + return + } + } +} + +func getFollowerReplicas( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + rangeDesc roachpb.RangeDescriptor, + leaseholder roachpb.ReplicationTarget, +) []*kvserver.Replica { + repls := replsForRange(ctx, t, tc, rangeDesc) + followers := make([]*kvserver.Replica, 0, len(repls)-1) + for _, repl := range repls { + if repl.StoreID() == leaseholder.StoreID && repl.NodeID() == leaseholder.NodeID { + continue + } + followers = append(followers, repl) + } + return followers +} + +func getTargetStoreOrFatal( + t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, +) (store *kvserver.Store) { + for i := 0; i < tc.NumServers(); i++ { + if server := tc.Server(i); server.NodeID() == target.NodeID && + server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) { + store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) + if err != nil { + t.Fatal(err) + } + return store + } + } + t.Fatalf("Could not find store for replication target %+v\n", target) + return nil +} + func verifyNotLeaseHolderErrors( t *testing.T, ba roachpb.BatchRequest, repls []*kvserver.Replica, expectedNLEs int, ) { + notLeaseholderErrs, err := countNotLeaseHolderErrors(ba, repls) + if err != nil { + t.Fatal(err) + } + if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e { + t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a) + } +} + +func countNotLeaseHolderErrors(ba roachpb.BatchRequest, 
repls []*kvserver.Replica) (int64, error) { g, ctx := errgroup.WithContext(context.Background()) var notLeaseholderErrs int64 for i := range repls { @@ -464,18 +937,16 @@ func verifyNotLeaseHolderErrors( }) } if err := g.Wait(); err != nil { - t.Fatal(err) - } - if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e { - t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a) + return 0, err } + return notLeaseholderErrs, nil } // Every 0.1s=100ms, try close out a timestamp ~300ms in the past. // We don't want to be more aggressive than that since it's also // a limit on how long transactions can run. const testingTargetDuration = 300 * time.Millisecond -const closeFraction = 0.333 +const testingCloseFraction = 0.333 const numNodes = 3 func replsForRange( @@ -541,28 +1012,27 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs { } } -// This function creates a test cluster that is prepared to exercise follower -// reads. The returned test cluster has follower reads enabled using the above -// targetDuration and closeFraction. In addition to the newly minted test -// cluster, this function returns a db handle to node 0, a range descriptor for -// the range used by the table `cttest.kv` and the replica objects corresponding -// to the replicas for the range. It is the caller's responsibility to Stop the -// Stopper on the returned test cluster when done. -func setupTestClusterForClosedTimestampTesting( - ctx context.Context, t *testing.T, targetDuration time.Duration, +// setupClusterForClosedTimestampTesting creates a test cluster that is prepared +// to exercise follower reads. The returned test cluster has follower reads +// enabled using the given targetDuration and testingCloseFraction. In addition +// to the newly minted test cluster, this function returns a db handle to node +// 0, a range descriptor for the range used by the table `cttest.kv` and the +// replica objects corresponding to the replicas for the range. 
It is the +// caller's responsibility to Stop the Stopper on the returned test cluster when +// done. +func setupClusterForClosedTimestampTesting( + ctx context.Context, + t *testing.T, + targetDuration time.Duration, + closeFraction float64, + clusterArgs base.TestClusterArgs, ) ( tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor, repls []*kvserver.Replica, ) { - tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Store: aggressiveResolvedTimestampPushKnobs(), - }, - }, - }) + tc = serverutils.StartTestCluster(t, numNodes, clusterArgs) db0 = tc.ServerConn(0) if _, err := db0.Exec(fmt.Sprintf(` diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 727330dbcbc7..8374d718196f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -889,6 +889,16 @@ func (r *Replica) getMergeCompleteChRLocked() chan struct{} { return r.mu.mergeComplete } +func (r *Replica) mergeInProgress() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.mergeInProgressRLocked() +} + +func (r *Replica) mergeInProgressRLocked() bool { + return r.mu.mergeComplete != nil +} + // setLastReplicaDescriptors sets the the most recently seen replica // descriptors to those contained in the *RaftMessageRequest, acquiring r.mu // to do so. 
@@ -1534,10 +1544,6 @@ func (r *Replica) maybeTransferRaftLeadershipLocked(ctx context.Context) { } } -func (r *Replica) mergeInProgressRLocked() bool { - return r.mu.mergeComplete != nil -} - func (r *Replica) getReplicaDescriptorByIDRLocked( replicaID roachpb.ReplicaID, fallback roachpb.ReplicaDescriptor, ) (roachpb.ReplicaDescriptor, error) { diff --git a/pkg/kv/kvserver/replica_closedts.go b/pkg/kv/kvserver/replica_closedts.go index f34a8c63bb35..06a460996c03 100644 --- a/pkg/kv/kvserver/replica_closedts.go +++ b/pkg/kv/kvserver/replica_closedts.go @@ -27,6 +27,7 @@ func (r *Replica) EmitMLAI() { } epoch := r.mu.state.Lease.Epoch isLeaseholder := r.mu.state.Lease.Replica.ReplicaID == r.mu.replicaID + isMergeInProgress := r.mergeInProgressRLocked() r.mu.RUnlock() // If we're the leaseholder of an epoch-based lease, notify the minPropTracker @@ -34,6 +35,25 @@ func (r *Replica) EmitMLAI() { if isLeaseholder && epoch > 0 { ctx := r.AnnotateCtx(context.Background()) _, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) - untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai)) + if isMergeInProgress { + // A critical requirement for the correctness of range merges is that we + // don't allow follower reads on closed timestamps that are greater than + // the subsumption time of the RHS range. Thus, while a range is subsumed, + // we ensure that any intervening closed timestamp updates (until the + // merge either commits or aborts) can only be activated *after* the merge + // has completed (successfully or otherwise), by requiring that follower + // replicas must catch up to an MLAI that succeeds the range's current + // lease applied index. See comment block at the end of Subsume() in + // cmd_subsume.go for more details. 
+ // + // Omitting the closed timestamp update here would be legal, but + // undesirable because if the range were to go on to quiesce, the follower + // replicas would not be able to implicitly tick their closed timestamps + // without `Request`ing it from the new leaseholder. Emitting it here + // avoids that little bit of latency. + untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai+1)) + } else { + untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai)) + } } } diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index cc91feeb7845..c7fb650a4b9c 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -109,6 +110,12 @@ func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() uint64 { return rec.i.GetLeaseAppliedIndex() } +// GetTracker returns the min prop tracker that keeps tabs over ongoing command +// evaluations for the closed timestamp subsystem. +func (rec *SpanSetReplicaEvalContext) GetTracker() closedts.TrackerI { + return rec.i.GetTracker() +} + // IsFirstRange returns true iff the replica belongs to the first range. 
func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool { return rec.i.IsFirstRange() diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index b5c5378c9197..f97e4a418062 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -742,7 +742,7 @@ func TestLearnerAndJointConfigFollowerRead(t *testing.T) { defer tc.Stopper().Stop(ctx) db := sqlutils.MakeSQLRunner(tc.ServerConn(0)) db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = $1`, testingTargetDuration) - db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, closeFraction) + db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, testingCloseFraction) db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true`) scratchStartKey := tc.ScratchRange(t) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 6ea02d61a901..925ea053e06e 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" @@ -351,6 +352,12 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 { return r.mu.state.LeaseAppliedIndex } +// GetTracker returns the min prop tracker that keeps tabs over ongoing command +// evaluations for the closed timestamp subsystem. +func (r *Replica) GetTracker() closedts.TrackerI { + return r.store.cfg.ClosedTimestamp.Tracker +} + // Snapshot implements the raft.Storage interface. Snapshot requires that // r.mu is held. Note that the returned snapshot is a placeholder and // does not contain any of the replica data. 
The snapshot is actually generated diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 00bedf805f7c..22c77f76732f 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -603,7 +603,9 @@ func (r *Replica) requestLeaseLocked( ctx context.Context, status kvserverpb.LeaseStatus, ) *leaseRequestHandle { if r.store.TestingKnobs().LeaseRequestEvent != nil { - r.store.TestingKnobs().LeaseRequestEvent(status.Timestamp) + if err := r.store.TestingKnobs().LeaseRequestEvent(status.Timestamp, r.StoreID(), r.GetRangeID()); err != nil { + return r.mu.pendingLeaseRequest.newResolvedHandle(err) + } } // Propose a Raft command to get a lease for this replica. repDesc, err := r.getReplicaDescriptorRLocked() diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 31a5de533ceb..1853e4d6fcab 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -725,7 +725,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc, db, _, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db, _, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) @@ -837,7 +837,7 @@ func TestReplicaRangefeedNudgeSlowClosedTimestamp(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc, db, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) diff --git a/pkg/kv/kvserver/replica_read.go 
b/pkg/kv/kvserver/replica_read.go index cc138cedb114..0527f2cc5136 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -127,7 +127,6 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( log.VEventf(ctx, 2, "server-side retry of batch") } br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, true /* readOnly */) - // If we can retry, set a higher batch timestamp and continue. // Allow one retry only. if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index df772d7b74b5..93876db8754c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -720,15 +720,16 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) { tsc := TestStoreConfig(clock) var leaseAcquisitionTrap atomic.Value tsc.TestingKnobs.DisableAutomaticLeaseRenewal = true - tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp) { + tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { val := leaseAcquisitionTrap.Load() if val == nil { - return + return nil } trapCallback := val.(func(ts hlc.Timestamp)) if trapCallback != nil { trapCallback(ts) } + return nil } transferSem := make(chan struct{}) tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 2ad2e64abc27..750250c3b735 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -142,11 +142,23 @@ func (r *Replica) executeWriteBatch( } g = nil // ownership passed to Raft, prevent misuse - // A max lease index of zero is returned when no proposal was made or a lease was proposed. - // In both cases, we don't need to communicate a MLAI. Furthermore, for lease proposals we - // cannot communicate under the lease's epoch. 
Instead the code calls EmitMLAI explicitly - // as a side effect of stepping up as leaseholder. + // A max lease index of zero is returned when no proposal was made or a lease + // was proposed. In case no proposal was made or a lease was proposed, we + // don't need to communicate a MLAI. Furthermore, for lease proposals we + // cannot communicate under the lease's epoch. Instead the code calls EmitMLAI + // explicitly as a side effect of stepping up as leaseholder. if maxLeaseIndex != 0 { + if r.mergeInProgress() { + // The correctness of range merges relies on the invariant that the + // LeaseAppliedIndex of the range is not bumped while a range is in its + // subsumed state. If this invariant is ever violated, the follower + // replicas of the subsumed range (RHS) are free to activate any future + // closed timestamp updates even before the merge completes. This would be + // a serializability violation. + // + // See comment block in Subsume() in cmd_subsume.go for details. + log.Fatalf(ctx, "lease applied index bumped while the range was subsumed") + } untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) } diff --git a/pkg/kv/kvserver/spanset/spanset.go b/pkg/kv/kvserver/spanset/spanset.go index faffc8f877b3..4ba50823bcc5 100644 --- a/pkg/kv/kvserver/spanset/spanset.go +++ b/pkg/kv/kvserver/spanset/spanset.go @@ -196,6 +196,25 @@ func (s *SpanSet) MaxProtectedTimestamp() hlc.Timestamp { return maxTS } +// Intersects returns true iff the span set denoted by `other` has any +// overlapping spans with `s`, and that those spans overlap in access type. Note +// that timestamps associated with the spans in the spanset are not considered, +// only the span boundaries are checked. 
+func (s *SpanSet) Intersects(other *SpanSet) bool { + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for ss := SpanScope(0); ss < NumSpanScope; ss++ { + otherSpans := other.GetSpans(sa, ss) + for _, span := range otherSpans { + // If access is allowed, we must have an overlap. + if err := s.CheckAllowed(sa, span.Span); err == nil { + return true + } + } + } + } + return false +} + // AssertAllowed calls CheckAllowed and fatals if the access is not allowed. // Timestamps associated with the spans in the spanset are not considered, // only the span boundaries are checked. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index beb3658fc3ee..acf10f089800 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2219,6 +2219,16 @@ func (s *Store) Stopper() *stop.Stopper { return s.stopper } // TestingKnobs accessor. func (s *Store) TestingKnobs() *StoreTestingKnobs { return &s.cfg.TestingKnobs } +// ClosedTimestamp accessor. +func (s *Store) ClosedTimestamp() *container.Container { + return s.cfg.ClosedTimestamp +} + +// NodeLiveness accessor. +func (s *Store) NodeLiveness() *NodeLiveness { + return s.cfg.NodeLiveness +} + // IsDraining accessor. func (s *Store) IsDraining() bool { return s.draining.Load().(bool) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 706e73a617e0..d735ca362abb 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -93,7 +93,7 @@ type StoreTestingKnobs struct { // LeaseRequestEvent, if set, is called when replica.requestLeaseLocked() is // called to acquire a new lease. This can be used to assert that a request // triggers a lease acquisition. - LeaseRequestEvent func(ts hlc.Timestamp) + LeaseRequestEvent func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error // LeaseTransferBlockedOnExtensionEvent, if set, is called when // replica.TransferLease() encounters an in-progress lease extension. 
// nextLeader is the replica that we're trying to transfer the lease to. diff --git a/pkg/roachpb/batch_generated.go b/pkg/roachpb/batch_generated.go index 6016beccf06d..db4de754ddb7 100644 --- a/pkg/roachpb/batch_generated.go +++ b/pkg/roachpb/batch_generated.go @@ -1252,3 +1252,99 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { } return br } + +// CreateRequest creates an empty Request for each of the Method types. +func CreateRequest(method Method) Request { + switch method { + case Get: + return &GetRequest{} + case Put: + return &PutRequest{} + case ConditionalPut: + return &ConditionalPutRequest{} + case Increment: + return &IncrementRequest{} + case Delete: + return &DeleteRequest{} + case DeleteRange: + return &DeleteRangeRequest{} + case ClearRange: + return &ClearRangeRequest{} + case RevertRange: + return &RevertRangeRequest{} + case Scan: + return &ScanRequest{} + case EndTxn: + return &EndTxnRequest{} + case AdminSplit: + return &AdminSplitRequest{} + case AdminUnsplit: + return &AdminUnsplitRequest{} + case AdminMerge: + return &AdminMergeRequest{} + case AdminTransferLease: + return &AdminTransferLeaseRequest{} + case AdminChangeReplicas: + return &AdminChangeReplicasRequest{} + case AdminRelocateRange: + return &AdminRelocateRangeRequest{} + case HeartbeatTxn: + return &HeartbeatTxnRequest{} + case GC: + return &GCRequest{} + case PushTxn: + return &PushTxnRequest{} + case RecoverTxn: + return &RecoverTxnRequest{} + case ResolveIntent: + return &ResolveIntentRequest{} + case ResolveIntentRange: + return &ResolveIntentRangeRequest{} + case Merge: + return &MergeRequest{} + case TruncateLog: + return &TruncateLogRequest{} + case RequestLease: + return &RequestLeaseRequest{} + case ReverseScan: + return &ReverseScanRequest{} + case ComputeChecksum: + return &ComputeChecksumRequest{} + case CheckConsistency: + return &CheckConsistencyRequest{} + case InitPut: + return &InitPutRequest{} + case TransferLease: + return &TransferLeaseRequest{} + 
case LeaseInfo: + return &LeaseInfoRequest{} + case WriteBatch: + return &WriteBatchRequest{} + case Export: + return &ExportRequest{} + case Import: + return &ImportRequest{} + case QueryTxn: + return &QueryTxnRequest{} + case QueryIntent: + return &QueryIntentRequest{} + case AdminScatter: + return &AdminScatterRequest{} + case AddSSTable: + return &AddSSTableRequest{} + case RecomputeStats: + return &RecomputeStatsRequest{} + case Refresh: + return &RefreshRequest{} + case RefreshRange: + return &RefreshRangeRequest{} + case Subsume: + return &SubsumeRequest{} + case RangeStats: + return &RangeStatsRequest{} + case AdminVerifyProtectedTimestamp: + return &AdminVerifyProtectedTimestampRequest{} + default: + panic(fmt.Sprintf("unsupported method: %+v", method)) + } +} diff --git a/pkg/roachpb/gen_batch.go b/pkg/roachpb/gen_batch.go index 3191144929cc..649eea337069 100644 --- a/pkg/roachpb/gen_batch.go +++ b/pkg/roachpb/gen_batch.go @@ -319,6 +319,22 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { } return br } +`) + + fmt.Fprint(f, ` +// CreateRequest creates an empty Request for each of the Method types. +func CreateRequest(method Method) Request { + switch method {`) + for _, v := range reqVariants { + fmt.Fprintf(f, ` + case %s: + return &%s{}`, v.msgType[:len(v.msgType)-7], v.msgType) + } + fmt.Fprintf(f, "%s", ` + default: + panic(fmt.Sprintf("unsupported method: %+v", method)) + } +} `) if err := f.Close(); err != nil { From 14584271d59e9a6a22c1659c9a37ef888f112146 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Tue, 30 Jun 2020 19:50:16 -0400 Subject: [PATCH 5/5] kvserver: test interaction between ComputeChecksum and range merges This commit adds a "sanity check" test around the assertion at the end of Replica.propose() that ensures that the lease applied index of a subsumed range is never bumped. 
Release note: None --- pkg/kv/kvserver/client_merge_test.go | 103 +++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index ad704a7ce425..0fda10e841b4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1330,6 +1330,19 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { } } +func checkConsistencyArgs(desc *roachpb.RangeDescriptor) *roachpb.CheckConsistencyRequest { + return &roachpb.CheckConsistencyRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: desc.StartKey.AsRawKey(), + EndKey: desc.EndKey.AsRawKey(), + }, + WithDiff: false, + Mode: 1, + Checkpoint: false, + Terminate: nil, + } +} + // TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range // in a merge loses its lease while a merge is in progress, the new leaseholder // does not incorrectly serve traffic before the merge completes. @@ -1527,6 +1540,96 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { } } +// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the following: +// 1. While a range is subsumed, ComputeChecksum requests wait until the merge +// is complete before proceeding. +// 2. Once a merge is aborted, pending (and future) requests will be allowed to +// be proposed. An assertion at the end of Replica.propose() ensures that the +// lease applied index of a range cannot be bumped while it is subsumed. A large +// comment block at the end of Subsume() in cmd_subsume.go explains the hazard +// in detail. This test is meant as a sanity check for this assertion.
+func TestStoreRangeMergeCheckConsistencyAfterSubsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + storeCfg := kvserver.TestStoreConfig(nil) + storeCfg.TestingKnobs.DisableReplicateQueue = true + storeCfg.TestingKnobs.DisableMergeQueue = true + + // Install a hook to control when the merge transaction aborts. + mergeEndTxnReceived := make(chan *roachpb.Transaction, 10) // headroom in case the merge transaction retries + abortMergeTxn := make(chan struct{}) + storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + for _, r := range ba.Requests { + if et := r.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { + mergeEndTxnReceived <- ba.Txn + <-abortMergeTxn + return &roachpb.Error{ + Message: "abort the merge for test", + } + } + } + return nil + } + + mtc := &multiTestContext{ + storeConfig: &storeCfg, + startWithSingleRange: true, + } + + mtc.Start(t, 2) + defer mtc.Stop() + + // Create the ranges to be merged. Put both ranges on both stores, but give + // the second store the lease on the RHS. + lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.stores[0]) + if err != nil { + t.Fatal(err) + } + mtc.replicateRange(lhsDesc.RangeID, 1) + mtc.replicateRange(rhsDesc.RangeID, 1) + mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1) + + // Launch the merge. + mergeErr := make(chan *roachpb.Error) + go func() { + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), args) + mergeErr <- pErr + }() + + // Wait for the merge transaction to send its EndTxn request. It won't + // be able to complete just yet, thanks to the hook we installed above. 
+ <-mergeEndTxnReceived + + checkConsistencyResp := make(chan interface{}) + go func() { + args := checkConsistencyArgs(rhsDesc) + _, pErr := kv.SendWrapped(ctx, mtc.stores[1].TestSender(), args) + checkConsistencyResp <- pErr + }() + + select { + case <-checkConsistencyResp: + t.Fatalf("expected the consistency check to wait until the merge was complete") + case <-time.After(1 * time.Second): + } + + // Let the merge abort, and then ensure that the consistency check + // successfully goes through. + close(abortMergeTxn) + + pErr := <-mergeErr + require.IsType(t, &roachpb.Error{}, pErr) + require.Regexp(t, "abort the merge for test", pErr.Message) + + testutils.SucceedsSoon(t, func() error { + pErr := <-checkConsistencyResp + require.Nil(t, pErr) + return nil + }) +} + // TestStoreRangeMergeConcurrentRequests tests merging ranges that are serving // other traffic concurrently. func TestStoreRangeMergeConcurrentRequests(t *testing.T) {