Skip to content

Commit

Permalink
kvserver: deflake TestFollowerReadsWithStaleDescriptor
Browse files Browse the repository at this point in the history
A preceeding change (cockroachdb#62696) introduced a flakey update to this test.
Prior to that change, this test was using 2 voting replicas but that
change tried to make it use 1 voter and 1 non-voter instead (as a litmus
test for the new syntax added in cockroachdb#62696).

The test rebalances a replica away from a node and ensures that a
historical read sent immediately afterwards gets re-routed to the
leaseholder replica, since the receiving store had its replica
destroyed. However, when we're using a non-voter in this test, that
non-voter may not have learned about this replication change by the time
it receives this historical query and that fails the assertion.

This commit re-organizes the test and fixes the flake.

Release note: None
  • Loading branch information
aayushshah15 committed Mar 31, 2021
1 parent fe14895 commit 5afab80
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
47 changes: 36 additions & 11 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,29 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
recCh := make(chan tracing.Recording, 1)

var n2Addr, n3Addr syncutil.AtomicString
waitForSnapApplication := make(chan struct{})
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{UseDatabase: "t"},
// n4 pretends to have low latency to n2 and n3, so that it tries to use
// them for follower reads.
// Also, we're going to collect a trace of the test's final query.
ServerArgsPerNode: map[int]base.TestServerArgs{
2: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// We'll be adding a non-voter to n3 below. Non-voters upreplicate
// via snapshots sent by the raft snapshot queue. We'll use
// `waitForSnapApplication` to signal that.
AfterApplySnapshot: func(inSnap *kvserver.IncomingSnapshot) {
if inSnap.SnapType == kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE {
close(waitForSnapApplication)
}
},
},
},
},
// n4 pretends to have low latency to n2 and n3, so that it tries to use
// them for follower reads.
// Also, we're going to collect a trace of the test's final query.
3: {
UseDatabase: "t",
Knobs: base.TestingKnobs{
Expand Down Expand Up @@ -540,8 +555,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n1 := sqlutils.MakeSQLRunner(tc.Conns[0])
n1.Exec(t, `CREATE DATABASE t`)
n1.Exec(t, `CREATE TABLE test (k INT PRIMARY KEY)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1], 1)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[2], 1)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1,2], 1)`)
// Speed up closing of timestamps, as we'll in order to be able to use
// follower_read_timestamp().
// Every 0.2s we'll close the timestamp from 0.4s ago. We'll attempt follower reads
Expand Down Expand Up @@ -571,12 +585,23 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2, Type: roachpb.ReplicaTypeNonVoter()},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
}, entry.Desc().Replicas().Descriptors())

// Relocate the follower. n2 will no longer have a replica.
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1,3], 1)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[], 1)`)
// Remove the follower and add a new non-voter to n3. n2 will no longer have a
// replica.
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1], 1)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[3], 1)`)

// Wait until the new non-voter receives its snapshot. This needs to be done
// in case of non-voters since they up-replicate via a snapshot sent by the
// raft snapshot queue and not through an out-of-band snapshot like LEARNERs
// do.
select {
case <-waitForSnapApplication:
case <-time.After(testutils.DefaultSucceedsSoonDuration):
t.Fatal("snapshot for non-voter took too long")
}

// Execute the query again and assert the cache is updated. This query will
// not be executed as a follower read since it attempts to use n2 which
Expand All @@ -585,15 +610,15 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n4.Exec(t, historicalQuery)
// As a sanity check, verify that this was not a follower read.
rec := <-recCh
require.False(t, kv.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)
require.False(t, kv.OnlyFollowerReads(rec), "query was served through follower reads: %s", rec)
// Check that the cache was properly updated.
entry = n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
{NodeID: 3, StoreID: 3, ReplicaID: 3, Type: roachpb.ReplicaTypeNonVoter()},
}, entry.Desc().Replicas().Descriptors())

// Make a note of the follower reads metric on n3. We'll check that it was
Expand Down
16 changes: 11 additions & 5 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,9 @@ type IncomingSnapshot struct {
// point).
// See the comment on VersionUnreplicatedRaftTruncatedState for details.
UsesUnreplicatedTruncatedState bool
snapType SnapshotRequest_Type
// The type of the incoming snapshot. Used for metrics collection on the
// receiver.
SnapType SnapshotRequest_Type
}

func (s *IncomingSnapshot) String() string {
Expand All @@ -523,7 +525,7 @@ func (s *IncomingSnapshot) String() string {

// SafeFormat implements the redact.SafeFormatter interface.
func (s *IncomingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.State.RaftAppliedIndex)
w.Printf("%s snapshot %s at applied index %d", s.SnapType, s.SnapUUID.Short(), s.State.RaftAppliedIndex)
}

// snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
Expand Down Expand Up @@ -789,6 +791,10 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "unexpected replica type %s while applying snapshot", typ)
}
}

if fn := r.store.TestingKnobs().AfterApplySnapshot; fn != nil {
fn(&inSnap)
}
}
}()

Expand Down Expand Up @@ -819,7 +825,7 @@ func (r *Replica) applySnapshot(
// Time to ingest SSTs.
ingestion time.Time
}
log.Infof(ctx, "applying snapshot of type %s [id=%s index=%d]", inSnap.snapType,
log.Infof(ctx, "applying snapshot of type %s [id=%s index=%d]", inSnap.SnapType,
inSnap.SnapUUID.Short(), snap.Metadata.Index)
defer func(start time.Time) {
now := timeutil.Now()
Expand All @@ -841,7 +847,7 @@ func (r *Replica) applySnapshot(
stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000,
)
log.Infof(
ctx, "applied snapshot of type %s [%s%s%sid=%s index=%d]", inSnap.snapType, totalLog,
ctx, "applied snapshot of type %s [%s%s%sid=%s index=%d]", inSnap.SnapType, totalLog,
subsumedReplicasLog, ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index,
)
}(timeutil.Now())
Expand Down Expand Up @@ -948,7 +954,7 @@ func (r *Replica) applySnapshot(

// Ingest all SSTs atomically.
if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
if err := fn(inSnap, inSnap.SnapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
SSTStorageScratch: kvSS.scratch,
LogEntries: logEntries,
State: &header.State,
snapType: header.Type,
SnapType: header.Type,
}

expLen := inSnap.State.RaftAppliedIndex - inSnap.State.TruncatedState.Index
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ type StoreTestingKnobs struct {
// acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
// error is returned from the hook, it's sent as an ERROR SnapshotResponse.
ReceiveSnapshot func(*SnapshotRequest_Header) error
// AfterApplySnapshot is run after a snapshot is successfully applied by its
// recipient.
AfterApplySnapshot func(*IncomingSnapshot)
// ReplicaAddSkipRollback causes replica addition to skip the learner rollback
// that happens when promotion to a voter fails.
ReplicaAddSkipLearnerRollback func() bool
Expand Down

0 comments on commit 5afab80

Please sign in to comment.