50265: kvserver: prevent follower reads while a range is subsumed r=nvanbenschoten,andreimatei a=aayushshah15

Before this commit, during a merge, the RHS leaseholder’s store
could continue broadcasting (actionable) closed timestamp updates
even after it had been subsumed. This allowed the followers to serve
follower reads past the subsumption time of the RHS.
Additionally, after the merge, if the LHS had a lower closed timestamp
than the RHS, it could allow writes to the keyspan owned by the RHS
at timestamps lower than the RHS’s max closed timestamp.

This commit fixes this bug by requiring that the followers catch up
to a LeaseAppliedIndex that belongs to the entry succeeding the
Subsume request.

Fixes #44878

Release note (bug fix): Fixed a rare bug that could cause actionable
closed timestamps to effectively regress over a given keyspan. This
could in turn lead to a serializability violation when using follower
reads. This was due to ill-defined interactions between range merges
and the closed timestamp subsystem.
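
The core of the fix, excerpted and condensed from the cmd_subsume.go change further down in this commit (evaluation context and surrounding code omitted; this is a sketch of the key lines, not the full evaluation path):

// Register a proposal with the closed timestamp tracker at lai + 1, so that
// closed timestamp updates published while the range is subsumed are not
// actionable until followers apply an entry past the current LeaseAppliedIndex.
// If the merge commits, the RHS is destroyed and that index is never reached;
// if it aborts, followers must first catch up to the abort entry.
_, untrack := cArgs.EvalCtx.GetTracker().Track(ctx)
lease, _ := cArgs.EvalCtx.GetLease()
lai := cArgs.EvalCtx.GetLeaseAppliedIndex()
untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1))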



52241: backupccl: direct spans after split and scatter r=pbardea a=pbardea

The AdminScatter request sent in the SplitAndScatterProcessor returns
the lease information of the range after the scatter request has
completed. The SplitAndScatterProcessor now looks at this field to
properly direct the spans to the appropriate RestoreData processor.

Release note: None.
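
A condensed sketch of the routing decision (it mirrors findDestination in the split_and_scatter_processor.go diff below; error handling and the surrounding processor plumbing are omitted):

// destinationNode picks the node that should receive the restored span's
// entries: the leaseholder reported by the AdminScatter response. A response
// from a 20.1 node carries no RangeInfos with a lease, so we fall back to
// node 0 and accept some routing inefficiency in that mixed-version state.
func destinationNode(res *roachpb.AdminScatterResponse) roachpb.NodeID {
	if len(res.RangeInfos) > 0 {
		return res.RangeInfos[0].Lease.Replica.NodeID
	}
	return roachpb.NodeID(0)
}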

52263: cli: change label on printed build revision r=dt a=dbist

Fixes #52249 

Release note (cli change): The commit ID label in the printed version info was changed from "Build SHA-1" to "Build Commit ID".

Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Artem Ervits <[email protected]>
4 people committed Aug 4, 2020
4 parents 1cbffd2 + 1458427 + 2b5b45c + 0664a5d commit 6305432
Showing 26 changed files with 1,008 additions and 90 deletions.
11 changes: 11 additions & 0 deletions pkg/base/config.go
@@ -63,6 +63,11 @@ const (
// See https://github.com/cockroachdb/cockroach/issues/20310.
DefaultMetricsSampleInterval = 10 * time.Second

// defaultRaftHeartbeatIntervalTicks is the default value for
// RaftHeartbeatIntervalTicks, which determines the number of ticks between
// each heartbeat.
defaultRaftHeartbeatIntervalTicks = 5

// defaultRPCHeartbeatInterval is the default value of RPCHeartbeatInterval
// used by the rpc context.
defaultRPCHeartbeatInterval = 3 * time.Second
@@ -300,6 +305,9 @@ type RaftConfig struct {
// unless overridden.
RaftElectionTimeoutTicks int

// RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats.
RaftHeartbeatIntervalTicks int

// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
// lease active duration should be of the raft election timeout.
RangeLeaseRaftElectionTimeoutMultiplier float64
@@ -364,6 +372,9 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RaftElectionTimeoutTicks == 0 {
cfg.RaftElectionTimeoutTicks = defaultRaftElectionTimeoutTicks
}
if cfg.RaftHeartbeatIntervalTicks == 0 {
cfg.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks
}
if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 {
cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier
}
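
A minimal usage sketch of the new field, based only on the SetDefaults behavior shown above (a zero value picks up the default of 5 ticks; an explicit value is preserved):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/base"
)

func main() {
	// Zero value: SetDefaults fills in defaultRaftHeartbeatIntervalTicks (5).
	var cfg base.RaftConfig
	cfg.SetDefaults()
	fmt.Println(cfg.RaftHeartbeatIntervalTicks) // 5

	// Explicit value: SetDefaults leaves it untouched.
	custom := base.RaftConfig{RaftHeartbeatIntervalTicks: 10}
	custom.SetDefaults()
	fmt.Println(custom.RaftHeartbeatIntervalTicks) // 10
}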
52 changes: 33 additions & 19 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -33,7 +33,7 @@ type splitAndScatterer interface {
// splitAndScatterSpan issues a split request at a given key and then scatters
// the range around the cluster. It returns the node ID of the leaseholder of
// the span after the scatter.
splitAndScatterKey(ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.NodeID, error)
splitAndScatterKey(ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool) (roachpb.NodeID, error)
}

// dbSplitAndScatter is the production implementation of this processor's
@@ -46,34 +46,40 @@ type dbSplitAndScatterer struct{}
// to which the span was scattered. If the destination node could not be
// determined, node ID of 0 is returned.
func (s dbSplitAndScatterer) splitAndScatterKey(
ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key,
ctx context.Context, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool,
) (roachpb.NodeID, error) {
expirationTime := db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
newSpanKey, err := rewriteBackupSpanKey(kr, key)
if err != nil {
return 0, err
}

// TODO(dan): Really, this should be splitting the Key of
// the _next_ entry.
// TODO(pbardea): Really, this should be splitting the Key of the _next_
// entry.
log.VEventf(ctx, 1, "presplitting new key %+v", newSpanKey)
if err := db.AdminSplit(ctx, newSpanKey, expirationTime); err != nil {
return 0, errors.Wrapf(err, "splitting key %s", newSpanKey)
}

log.VEventf(ctx, 1, "scattering new key %+v", newSpanKey)
var ba roachpb.BatchRequest
ba.Header.ReturnRangeInfo = true
ba.Add(&roachpb.AdminScatterRequest{
req := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{
Key: newSpanKey,
EndKey: newSpanKey.Next(),
}),
})
// This is a bit of a hack, but it seems to be an effective one (see #36665
// for graphs). As of the commit that added this, scatter is not very good
// at actually balancing leases. This is likely for two reasons: 1) there's
// almost certainly some regression in scatter's behavior, it used to work
// much better and 2) scatter has to operate by balancing leases for all
// ranges in a cluster, but in RESTORE, we really just want it to be
// balancing the span being restored into.
RandomizeLeases: randomizeLeases,
}

br, pErr := db.NonTransactionalSender().Send(ctx, ba)
res, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), req)
if pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// TODO(pbardea): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
// but on the bright side, it doesn't affect correctness, only
@@ -83,14 +89,23 @@ func (s dbSplitAndScatterer) splitAndScatterKey(
return 0, nil
}

return s.findDestination(ctx, br), nil
return s.findDestination(res.(*roachpb.AdminScatterResponse)), nil
}

// findDestination returns the node ID of the node of the destination of the
// AdminScatter request. If the destination cannot be found, 0 is returned.
func (s dbSplitAndScatterer) findDestination(
_ context.Context, _ *roachpb.BatchResponse,
) roachpb.NodeID {
func (s dbSplitAndScatterer) findDestination(res *roachpb.AdminScatterResponse) roachpb.NodeID {
// A request from a 20.1 node will not have a RangeInfos with a lease.
// For this mixed-version state, we'll report the destination as node 0
// and suffer a bit of inefficiency.
if len(res.RangeInfos) > 0 {
// If the lease is not populated, we return the 0 value anyway. We receive 1
// RangeInfo per range that was scattered. Since we send a scatter request
// to each range that we make, we are only interested in the first range,
// which contains the key at which we're splitting and scattering.
return res.RangeInfos[0].Lease.Replica.NodeID
}

return roachpb.NodeID(0)
}

@@ -215,7 +230,7 @@ func runSplitAndScatter(
g.GoCtx(func(ctx context.Context) error {
defer close(importSpanChunksCh)
for _, importSpanChunk := range spec.Chunks {
_, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key)
_, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpanChunk.Entries[0].Span.Key, true /* randomizeLeases */)
if err != nil {
return err
}
@@ -229,17 +244,16 @@
return nil
})

// TODO(dan): This tries to cover for a bad scatter by having 2 * the number
// of nodes in the cluster. Is it necessary?
// TODO(pbardea): Run some experiments to tune this knob.
// TODO(pbardea): This tries to cover for a bad scatter by having 2 * the
// number of nodes in the cluster. Is it necessary?
splitScatterWorkers := 2
for worker := 0; worker < splitScatterWorkers; worker++ {
g.GoCtx(func(ctx context.Context) error {
for importSpanChunk := range importSpanChunksCh {
log.Infof(ctx, "processing a chunk")
for _, importSpan := range importSpanChunk {
log.Infof(ctx, "processing a span")
destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key)
destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key, false /* randomizeLeases */)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor_test.go
@@ -40,7 +40,7 @@ type mockScatterer struct {
// This mock implementation of the split and scatterer simulates a scattering of
// ranges.
func (s *mockScatterer) splitAndScatterKey(
_ context.Context, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key,
_ context.Context, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, _ bool,
) (roachpb.NodeID, error) {
s.Lock()
defer s.Unlock()
16 changes: 8 additions & 8 deletions pkg/cli/cli.go
@@ -141,18 +141,18 @@ Output build version information.
RunE: func(cmd *cobra.Command, args []string) error {
info := build.GetInfo()
tw := tabwriter.NewWriter(os.Stdout, 2, 1, 2, ' ', 0)
fmt.Fprintf(tw, "Build Tag: %s\n", info.Tag)
fmt.Fprintf(tw, "Build Time: %s\n", info.Time)
fmt.Fprintf(tw, "Distribution: %s\n", info.Distribution)
fmt.Fprintf(tw, "Platform: %s", info.Platform)
fmt.Fprintf(tw, "Build Tag: %s\n", info.Tag)
fmt.Fprintf(tw, "Build Time: %s\n", info.Time)
fmt.Fprintf(tw, "Distribution: %s\n", info.Distribution)
fmt.Fprintf(tw, "Platform: %s", info.Platform)
if info.CgoTargetTriple != "" {
fmt.Fprintf(tw, " (%s)", info.CgoTargetTriple)
}
fmt.Fprintln(tw)
fmt.Fprintf(tw, "Go Version: %s\n", info.GoVersion)
fmt.Fprintf(tw, "C Compiler: %s\n", info.CgoCompiler)
fmt.Fprintf(tw, "Build SHA-1: %s\n", info.Revision)
fmt.Fprintf(tw, "Build Type: %s\n", info.Type)
fmt.Fprintf(tw, "Go Version: %s\n", info.GoVersion)
fmt.Fprintf(tw, "C Compiler: %s\n", info.CgoCompiler)
fmt.Fprintf(tw, "Build Commit ID: %s\n", info.Revision)
fmt.Fprintf(tw, "Build Type: %s\n", info.Type)
return tw.Flush()
},
}
20 changes: 16 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_compute_checksum.go
@@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
@@ -27,11 +28,22 @@ func init() {
}

func declareKeysComputeChecksum(
_ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet,
desc *roachpb.RangeDescriptor,
_ roachpb.Header,
_ roachpb.Request,
latchSpans, _ *spanset.SpanSet,
) {
// Intentionally declare no keys, as ComputeChecksum does not need to be
// serialized with any other commands. It simply needs to be committed into
// the Raft log.
// The correctness of range merges depends on the lease applied index of a
// range not being bumped while the RHS is subsumed. ComputeChecksum bumps a
// range's LAI and thus needs to be serialized with Subsume requests, in order to
// prevent a rare closed timestamp violation due to writes on the post-merged
// range that violate a closed timestamp spuriously reported by the pre-merged
// range. This can, in turn, lead to a serializability violation. See comment
// at the end of Subsume() in cmd_subsume.go for details. Thus, it must
// declare access over at least one key. We choose to declare read-only access
// over the range descriptor key.
rdKey := keys.RangeDescriptorKey(desc.StartKey)
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: rdKey})
}

// Version numbers for Replica checksum computation. Requests silently no-op
40 changes: 39 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -125,13 +126,50 @@ func Subsume(
return result.Result{}, errors.New("non-deletion intent on local range descriptor")
}

// We prevent followers of the RHS from being able to serve follower reads on
// timestamps that fall in the timestamp window representing the range's
// subsumed state (i.e. between the subsumption time (FreezeStart) and the
// timestamp at which the merge transaction commits or aborts), by requiring
// follower replicas to catch up to an MLAI that succeeds the range's current
// LeaseAppliedIndex (note that we're tracking lai + 1 below instead of lai).
// In case the merge successfully commits, this MLAI will never be caught up
// to since the RHS will be destroyed. In case the merge aborts, this ensures
// that the followers can only activate the newer closed timestamps once they
// catch up to the LAI associated with the merge abort. We need to do this
// because the closed timestamps that are broadcast by RHS in this subsumed
// state are not going to be reflected in the timestamp cache of the LHS range
// after the merge, which can cause a serializability violation.
//
// Note that we are essentially lying to the closed timestamp tracker here in
// order to achieve the effect of unactionable closed timestamp updates until
// the merge concludes. Tracking lai + 1 here ensures that the follower
// replicas need to catch up to at least that index before they are able to
// activate _any of the closed timestamps from this point onwards_. In other
// words, we will never publish a closed timestamp update for this range below
// this lai, regardless of whether a different proposal untracks a lower lai
// at any point in the future.
//
// NB: The above statement relies on the invariant that the LAI that follows a
// Subsume request will be applied only after the merge aborts. More
// specifically, this means that no intervening request can bump the LAI of
// range while it is subsumed. This invariant is upheld because the only Raft
// proposals allowed after a range has been subsumed are lease requests, which
// do not bump the LAI. In case there is lease transfer on this range while it
// is subsumed, we ensure that the initial MLAI update broadcast by the new
// leaseholder respects the invariant in question, in much the same way we do
// here. Take a look at `EmitMLAI()` in replica_closedts.go for more details.
_, untrack := cArgs.EvalCtx.GetTracker().Track(ctx)
lease, _ := cArgs.EvalCtx.GetLease()
lai := cArgs.EvalCtx.GetLeaseAppliedIndex()
untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1))

// NOTE: the deletion intent on the range's meta2 descriptor is just as
// important to correctness as the deletion intent on the local descriptor,
// but the check is too expensive as it would involve a network roundtrip on
// most nodes.

reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = cArgs.EvalCtx.GetLeaseAppliedIndex()
reply.LeaseAppliedIndex = lai
reply.FreezeStart = cArgs.EvalCtx.Clock().Now()

return result.Result{
76 changes: 76 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_subsume_test.go
@@ -0,0 +1,76 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// TestRequestsSerializeWithSubsume ensures that no request can be evaluated
// concurrently with a Subsume request. For more details, refer to the big
// comment block at the end of Subsume() in cmd_subsume.go.
//
// NB: This test is broader than it really needs to be. A more precise statement
// of the condition necessary to uphold the invariant mentioned in Subsume() is:
// No request that bumps the lease applied index of a range can be evaluated
// concurrently with a Subsume request.
func TestRequestsSerializeWithSubsume(t *testing.T) {
defer leaktest.AfterTest(t)()
var subsumeLatchSpans, subsumeLockSpans, otherLatchSpans, otherLockSpans spanset.SpanSet
startKey := []byte(`a`)
endKey := []byte(`b`)
desc := &roachpb.RangeDescriptor{
RangeID: 0,
StartKey: startKey,
EndKey: endKey,
}
testTxn := &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: uuid.FastMakeV4(),
Key: startKey,
WriteTimestamp: hlc.Timestamp{WallTime: 1},
},
Name: "test txn",
}
header := roachpb.Header{Txn: testTxn}
subsumeRequest := &roachpb.SubsumeRequest{RightDesc: *desc}
declareKeysSubsume(desc, header, subsumeRequest, &subsumeLatchSpans, &subsumeLockSpans)
for method, command := range cmds {
t.Run(method.String(), func(t *testing.T) {
otherRequest := roachpb.CreateRequest(method)
if queryTxnReq, ok := otherRequest.(*roachpb.QueryTxnRequest); ok {
// QueryTxnRequest declares read-only access over the txn record of the txn
// it is supposed to query and not the txn that sent it. We fill this Txn
// field in here to prevent it from being nil and leading to the txn key
// falling outside our test range's keyspace.
queryTxnReq.Txn = testTxn.TxnMeta
}

otherRequest.SetHeader(roachpb.RequestHeader{
Key: startKey,
EndKey: endKey,
Sequence: 0,
})

command.DeclareKeys(desc, header, otherRequest, &otherLatchSpans, &otherLockSpans)
if !subsumeLatchSpans.Intersects(&otherLatchSpans) {
t.Errorf("%s does not serialize with Subsume", method)
}
})
}
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -65,6 +66,7 @@ type EvalContext interface {
GetFirstIndex() (uint64, error)
GetTerm(uint64) (uint64, error)
GetLeaseAppliedIndex() uint64
GetTracker() closedts.TrackerI

Desc() *roachpb.RangeDescriptor
ContainsKey(key roachpb.Key) bool
@@ -178,6 +180,9 @@ func (m *mockEvalCtxImpl) GetTerm(uint64) (uint64, error) {
func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
return m.MockEvalCtx.Desc
}
