Skip to content

Commit

Permalink
Merge pull request #65500 from tbg/backport20.2-64060
Browse files Browse the repository at this point in the history
release-20.2: kvserver: fix delaying of splits with uninitialized followers
  • Loading branch information
tbg authored Jun 1, 2021
2 parents 577e08e + e9738cc commit 31cc103
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 168 deletions.
197 changes: 197 additions & 0 deletions pkg/kv/kvserver/client_split_burst_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"bytes"
"context"
"math/rand"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// splitBurstTest bundles a running test cluster with the bookkeeping needed
// to issue a burst of splits below a common base key and to observe their
// effect on a deliberately slowed-down follower.
type splitBurstTest struct {
	// The embedded cluster; its methods (SplitRange, ServerConn, Stopper, ...)
	// are promoted onto splitBurstTest.
	*testcluster.TestCluster

	// baseKey is the prefix shared by all split keys; each split key is
	// baseKey followed by a single location byte.
	baseKey roachpb.Key

	// initialRaftSnaps is the number of applied Raft snapshots recorded when
	// the test was set up. NumRaftSnaps subtracts it so that callers only see
	// snapshots that occurred afterwards.
	initialRaftSnaps int

	// numSplitsSeenOnSlowFollower counts the relevant split triggers that
	// passed through the apply filter installed by setupSplitBurstTest. It is
	// shared with that filter and must only be accessed atomically.
	numSplitsSeenOnSlowFollower *int32 // atomic
}

// SplitWithDelay splits at baseKey+location and fails the test immediately
// if the split returns an error. It is the fatal-on-error counterpart of
// SplitWithDelayE.
func (sbt *splitBurstTest) SplitWithDelay(t *testing.T, location byte) {
	t.Helper()
	err := sbt.SplitWithDelayE(location)
	require.NoError(t, err)
}

// SplitWithDelayE splits the cluster at the key formed by appending the
// single byte `location` to baseKey, and returns the error (if any) reported
// by SplitRange. The key is assembled in a fresh buffer so that baseKey's
// backing array is never written to.
func (sbt *splitBurstTest) SplitWithDelayE(location byte) error {
	splitKey := make([]byte, 0, len(sbt.baseKey)+1)
	splitKey = append(splitKey, sbt.baseKey...)
	splitKey = append(splitKey, location)
	_, _, err := sbt.SplitRange(splitKey)
	return err
}

// NumRaftSnaps returns the number of Raft snapshots applied across all
// servers of the cluster since the test was set up. It sums the
// 'range.snapshots.normal-applied' metric reported by every server's
// crdb_internal.node_metrics table and subtracts sbt.initialRaftSnaps.
//
// While initialRaftSnaps is still zero (as it is when setupSplitBurstTest
// takes the baseline), the absolute count is returned.
//
// The test is failed immediately if a query fails or if a server does not
// report exactly one row for the metric.
func (sbt *splitBurstTest) NumRaftSnaps(t *testing.T) int {
	// Attribute failures to the calling test line instead of this helper.
	t.Helper()
	var totalSnaps int
	for i := range sbt.Servers {
		var n int // num rows (sanity check against test rotting)
		var c int // num Raft snapshots
		require.NoError(t, sbt.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name = 'range.snapshots.normal-applied'
`).Scan(&n, &c))
		// Exactly one row is expected per server; anything else means the
		// metric name no longer matches and the assertion would be vacuous.
		require.EqualValues(t, 1, n)
		totalSnaps += c
	}
	return totalSnaps - sbt.initialRaftSnaps
}

// setupSplitBurstTest starts a three-node, manually replicated test cluster
// in which the server at index 1 applies relevant splits slowly, creates a
// scratch range replicated to all three nodes, and returns a splitBurstTest
// wrapping the cluster.
//
// A split is "relevant" if the start key of its left-hand side has the
// keys.TableDataMax prefix. For each such split, the apply filter on the slow
// node blocks for `delay` (or until the cluster's stopper begins quiescing)
// and then increments numSplitsSeenOnSlowFollower. A zero delay disables the
// filter entirely, including the counting.
//
// On success the caller owns the cluster and must stop it. If setup fails
// the test after the cluster has been started, the cluster is stopped here.
func setupSplitBurstTest(t *testing.T, delay time.Duration) *splitBurstTest {
	numSplitsSeenOnSlowFollower := new(int32) // atomic
	// NOTE(review): quiesceCh is assigned after the cluster has started while
	// the filter below reads it from whatever goroutine applies commands.
	// Presumably no relevant split can reach the filter before the scratch
	// range exists, which is only created after the assignment - confirm.
	// Until it is assigned the channel is nil, so that select case never fires.
	var quiesceCh <-chan struct{}
	knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
		TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
			if args.Split == nil || delay == 0 {
				return 0, nil
			}
			if !bytes.HasPrefix(args.Split.LeftDesc.StartKey, keys.TableDataMax) {
				// Unrelated split.
				return 0, nil
			}
			// Stall the application of the split, but don't hold up shutdown.
			select {
			case <-time.After(delay):
			case <-quiesceCh:
			}
			atomic.AddInt32(numSplitsSeenOnSlowFollower, 1)
			return 0, nil
		},
	}}

	// n1 and n3 are fast, n2 is slow (to apply the splits). We need
	// three nodes here: with fewer, n2 would be required for quorum, so
	// delaying its apply loop would also delay how quickly commands can
	// reach quorum and would backpressure the splits by accident.
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ServerArgsPerNode: map[int]base.TestServerArgs{
			1: {Knobs: knobs},
		},
		ReplicationMode: base.ReplicationManual,
	})
	quiesceCh = tc.Stopper().ShouldQuiesce()

	// Any of the remaining setup steps may fail the test. t.Fatal exits the
	// goroutine via runtime.Goexit, which still runs deferred calls, so stop
	// the cluster from a defer unless ownership was handed to the caller.
	setupDone := false
	defer func() {
		if !setupDone {
			tc.Stopper().Stop(context.Background())
		}
	}()

	k := tc.ScratchRange(t)
	if _, err := tc.AddReplicas(k, tc.Target(1), tc.Target(2)); err != nil {
		t.Fatal(err)
	}

	sbc := &splitBurstTest{
		TestCluster:                 tc,
		baseKey:                     k,
		numSplitsSeenOnSlowFollower: numSplitsSeenOnSlowFollower,
	}
	// Record the baseline so that snapshots sent during up-replication are
	// not counted against the test. initialRaftSnaps is still zero here, so
	// NumRaftSnaps returns the absolute count.
	sbc.initialRaftSnaps = sbc.NumRaftSnaps(t)
	setupDone = true
	return sbc
}

// TestSplitBurstWithSlowFollower issues bursts of splits against a cluster in
// which one follower applies split triggers slowly, and verifies that the
// splits are (or are not) delayed as expected and that no Raft snapshots
// result. It covers splitting in increasing key order ("forward"), in
// decreasing key order ("backward"), and concurrently in random order
// ("random").
func TestSplitBurstWithSlowFollower(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	t.Run("forward", func(t *testing.T) {
		// Splitting at an increasing sequence of keys means that every step
		// splits the range created by the previous step. We expect each split
		// to wait until that range's follower has caught up across the
		// preceding split, which was itself delayed. Once all splits are
		// done, at least (numSplits-1) of them should therefore have been
		// applied on the slow follower.
		// This exercises `splitDelayHelper` end-to-end.
		//
		// This test is fairly slow because every split will incur a 1s
		// penalty (dictated by the raft leader's probing interval). We could
		// fix this delay here and in production if we had a way to send a
		// signal from the slow follower to the leader when the split trigger
		// initializes the right-hand side. This is actually an interesting
		// point, because the split trigger *replaces* a snapshot - but
		// doesn't fully act like one: when a raft group applies a snapshot,
		// it generates an MsgAppResp to the leader which will let the leader
		// probe proactively. We could signal the split trigger to the raft
		// group as a snapshot being applied (as opposed to recreating the
		// in-memory instance as we do now), and then this MsgAppResp should
		// be created automatically.
		const numSplits = byte(5)

		burst := setupSplitBurstTest(t, 50*time.Millisecond)
		defer burst.Stopper().Stop(ctx)

		for loc := byte(0); loc < numSplits; loc++ {
			burst.SplitWithDelay(t, loc)
		}

		// All splits except possibly the last one should have been applied on
		// the slow node. Anything less indicates that the splits were not
		// delayed accordingly.
		applied := atomic.LoadInt32(burst.numSplitsSeenOnSlowFollower)
		require.GreaterOrEqual(t, applied, int32(numSplits-1))
		require.Zero(t, burst.NumRaftSnaps(t))
	})

	t.Run("backward", func(t *testing.T) {
		// Splitting at a decreasing sequence of keys repeatedly splits the
		// first range. All of its followers are initialized to begin with,
		// and even though there is a slow follower, `splitDelayHelper` isn't
		// interested in delaying this here (which would imply that it's
		// trying to check that everyone is "caught up").
		// The follower's apply delay is set to 100s so that the loop below
		// can assert that `splitDelayHelper` isn't somehow forcing us to wait
		// for it.
		const (
			numSplits     = byte(50)
			infiniteDelay = 100 * time.Second
		)

		burst := setupSplitBurstTest(t, infiniteDelay)
		defer burst.Stopper().Stop(ctx)

		// splitNum counts splits starting at 1; the split locations run from
		// numSplits down to 1.
		for splitNum := byte(1); splitNum <= numSplits; splitNum++ {
			start := timeutil.Now()
			burst.SplitWithDelay(t, numSplits-splitNum+1)
			elapsed := timeutil.Since(start)
			if elapsed > infiniteDelay {
				t.Fatalf("waited %s for split #%d", elapsed, splitNum)
			}
		}

		// The slow follower is still stuck in its first delay, so it cannot
		// have finished applying any split.
		require.Zero(t, atomic.LoadInt32(burst.numSplitsSeenOnSlowFollower))
		require.Zero(t, burst.NumRaftSnaps(t))
	})

	t.Run("random", func(t *testing.T) {
		// Random split order yields a mixture of forward and backward splits,
		// so nothing can be asserted about how many split triggers are
		// observed. However, there still shouldn't be any snapshots.
		const numSplits = 20

		burst := setupSplitBurstTest(t, 10*time.Millisecond)
		defer burst.Stopper().Stop(ctx)

		order := rand.Perm(numSplits)
		err := ctxgroup.GroupWorkers(ctx, numSplits, func(_ context.Context, i int) error {
			return burst.SplitWithDelayE(byte(order[i]))
		})
		require.NoError(t, err)

		require.Zero(t, burst.NumRaftSnaps(t))
	})
}
65 changes: 0 additions & 65 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -474,70 +473,6 @@ func TestStoreRangeSplitAtRangeBounds(t *testing.T) {
}
}

// TestSplitTriggerRaftSnapshotRace verifies that an uninitialized Replica
// created by a split, which has not yet been initialized via the split
// trigger, is protected by a grace period from requesting an errant Raft
// snapshot. It does so by running a number of concurrent splits and asserting
// that the number of applied Raft snapshots does not change. As a nice side
// effect, this also verifies that log truncations don't cause any Raft
// snapshots in this test.
func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const numNodes = 3
	ctx := context.Background()

	// NB: the merge queue is enabled for additional "chaos". Note that the
	// test uses three nodes and so there is no replica movement, which would
	// otherwise tickle Raft snapshots for unrelated reasons.
	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	// Running 100 splits is overkill in race builds.
	numSplits := 100
	if util.RaceEnabled {
		numSplits = 10
	}
	perm := rand.Perm(numSplits)
	// idx is the position in perm most recently handed out to a worker.
	idx := int32(-1) // accessed atomically

	// numRaftSnaps sums the applied Raft snapshot metric over all nodes.
	// `when` only labels the failure message.
	numRaftSnaps := func(when string) int {
		const expRows = 1
		total := 0
		for nodeIdx := 0; nodeIdx < numNodes; nodeIdx++ {
			var rows int  // num rows (sanity check against test rotting)
			var snaps int // num Raft snapshots
			row := tc.ServerConn(nodeIdx).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name = 'range.snapshots.normal-applied'
`)
			if err := row.Scan(&rows, &snaps); err != nil {
				t.Fatal(err)
			}
			if rows != expRows {
				t.Fatalf("%s: expected %d rows, got %d", when, expRows, rows)
			}
			total += snaps
		}
		return total
	}

	// There are usually no raft snaps before, but there is a race condition
	// where they can occasionally happen during upreplication.
	numSnapsBefore := numRaftSnaps("before")

	// Each worker claims the next unused entry of perm and splits at the
	// corresponding key.
	doSplit := func(ctx context.Context, _ int) error {
		next := atomic.AddInt32(&idx, 1)
		_, _, err := tc.SplitRange([]byte(fmt.Sprintf("key-%d", perm[next])))
		return err
	}

	if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
		t.Fatal(err)
	}

	// Check that no snaps happened during the splits.
	require.Equal(t, numSnapsBefore, numRaftSnaps("after"))
}

// TestStoreRangeSplitIdempotency executes a split of a range and
// verifies that the resulting ranges respond to the right key ranges
// and that their stats have been properly accounted for and requests
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) {
if err := r.mu.internalRaftGroup.Campaign(); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
r.store.enqueueRaftUpdateCheck(r.RangeID)
}
}

Expand Down
Loading

0 comments on commit 31cc103

Please sign in to comment.