kvserver: prepare unit tests for raft bump
Release note: None
tbg committed Nov 2, 2022
1 parent 06f2469 commit 175a9f8
Showing 23 changed files with 188 additions and 78 deletions.
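Across the touched tests, the commit makes two mechanical changes: the StoreTestingKnobs field TestingApplyFilter is renamed to TestingApplyCalledTwiceFilter (the new name signals that the filter may run more than once for the same command), and a number of tests opt into DisableCanAckBeforeApplication so that their writes are fully applied, not merely acknowledged, before the test inspects state. A rough sketch of the resulting pattern, using only the knob and filter names visible in the diffs below; the test itself is illustrative and not part of the commit:

```go
package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestExampleWithRaftBumpKnobs(t *testing.T) {
	ctx := context.Background()
	var args base.TestServerArgs
	args.Knobs.Store = &kvserver.StoreTestingKnobs{
		// Apply commands before acknowledging them, so a test that immediately
		// reads engine contents or MVCC stats sees its own writes.
		DisableCanAckBeforeApplication: true,
		// Renamed from TestingApplyFilter; per the new name, the filter may be
		// invoked more than once for the same command.
		TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
			// No-op here; real tests intercept specific commands, e.g. by CmdID.
			return 0, nil
		},
	}
	serv, _, _ := serverutils.StartServer(t, args)
	defer serv.Stopper().Stop(ctx)
}
```

The individual hunks below replace the no-op filter body with per-test logic that traps specific commands.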
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/integration_test.go
@@ -92,7 +92,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
commitCmdID.Store(args.CmdID)
return nil
},
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
TestingApplyCalledTwiceFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
// We'll trap the processing of the commit command and return an error
// for it.
v := commitCmdID.Load()

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_migration_test.go
@@ -254,7 +254,7 @@ func TestMigrateWaitsForApplication(t *testing.T) {
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
TestingApplyCalledTwiceFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
if args.StoreID == roachpb.StoreID(n3) && args.State != nil && args.State.Version != nil {
<-blockApplicationCh
}

5 changes: 4 additions & 1 deletion pkg/kv/kvserver/client_mvcc_gc_test.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -32,7 +33,9 @@ func TestMVCCGCCorrectStats(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
var args base.TestServerArgs
args.Knobs.Store = &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true}
serv, _, _ := serverutils.StartServer(t, args)
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)


19 changes: 9 additions & 10 deletions pkg/kv/kvserver/client_raft_test.go
@@ -4413,7 +4413,7 @@ func TestFailedConfChange(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: testingApplyFilter,
TestingApplyCalledTwiceFilter: testingApplyFilter,
},
},
},
@@ -4957,18 +4957,17 @@ func TestAckWriteBeforeApplication(t *testing.T) {
repls int
expAckBeforeAppl bool
}{
// In a single-replica Range, each handleRaftReady iteration will append
// new entries to the Raft log and immediately apply them. This prevents
// "early acknowledgement" from being possible or useful. See the comment
// on apply.Task.AckCommittedEntriesBeforeApplication.
{1, false},

// In a three-replica Range, each handleRaftReady iteration will append
// a set of entries to the Raft log and then apply the previous set of
// entries. This makes "early acknowledgement" a major optimization, as
// it pulls the entire latency required to append the next set of entries
// to the Raft log out of the client-perceived latency of the previous
// set of entries.
{3, true},
// In the past, single-replica groups behaved differently but as of #89632
// they too rely on early-acks as a major performance improvement.
{1, true},
} {
t.Run(fmt.Sprintf("numRepls=%d", testcase.repls), func(t *testing.T) {
var filterActive int32
@@ -4990,8 +4989,8 @@ func TestAckWriteBeforeApplication(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: applyFilterFn(blockPreApplication),
TestingPostApplyFilter: applyFilterFn(blockPostApplication),
TestingApplyCalledTwiceFilter: applyFilterFn(blockPreApplication),
TestingPostApplyFilter: applyFilterFn(blockPostApplication),
},
},
},
@@ -5020,7 +5019,7 @@ func TestAckWriteBeforeApplication(t *testing.T) {
expResult := func() {
t.Helper()
if pErr := <-ch; pErr != nil {
t.Fatalf("unexpected proposal result error: %v", pErr)
t.Errorf("unexpected proposal result error: %v", pErr)
}
}
dontExpResult := func() {
@@ -5029,7 +5028,7 @@ func TestAckWriteBeforeApplication(t *testing.T) {
case <-time.After(10 * time.Millisecond):
// Expected.
case pErr := <-ch:
t.Fatalf("unexpected proposal acknowledged before TestingApplyFilter: %v", pErr)
t.Errorf("unexpected proposal acknowledged before TestingApplyCalledTwiceFilter: %v", pErr)
}
}


4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -706,10 +706,10 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest {
// n1. However, we don't control raft leadership placement and without this knob,
// n1 may refuse to acquire the lease, which we don't want.
AllowLeaseRequestProposalsWhenNotLeader: true,
// The TestingApplyFilter prevents n2 from requesting a lease (or from the lease
// The TestingApplyCalledTwiceFilter prevents n2 from requesting a lease (or from the lease
// being transferred to n2). The test seems to pass pretty reliably without this
// but it can't hurt.
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
TestingApplyCalledTwiceFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
if !args.IsLeaseRequest {
return 0, nil
}

9 changes: 8 additions & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -164,6 +164,7 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
Server: &server.TestingKnobs{
WallClock: manual,
},
Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true},
},
})
s := serv.(*server.TestServer)
@@ -2769,7 +2770,13 @@ func TestClearRange(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
// This makes sure that our writes are visible when we go
// straight to the engine to check them.
DisableCanAckBeforeApplication: true,
}},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_burst_test.go
@@ -74,7 +74,7 @@ func setupSplitBurstTest(t *testing.T, delay time.Duration) *splitBurstTest {
numSplitsSeenOnSlowFollower := new(int32) // atomic
var quiesceCh <-chan struct{}
knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
TestingApplyCalledTwiceFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
if args.Split == nil || delay == 0 {
return 0, nil
}

28 changes: 19 additions & 9 deletions pkg/kv/kvserver/client_split_test.go
@@ -599,8 +599,9 @@ func TestStoreRangeSplitIdempotency(t *testing.T) {
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableCanAckBeforeApplication: true,
},
},
})
@@ -760,8 +761,9 @@ func TestStoreRangeSplitMergeStats(t *testing.T) {
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableCanAckBeforeApplication: true,
},
},
})
@@ -843,7 +845,8 @@ func TestStoreRangeSplitMergeStats(t *testing.T) {

// Merge the ranges back together, and assert that the merged stats
// agree with the pre-split stats.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminMergeArgs(repl.Desc().StartKey.AsRawKey()))
mergeKey := repl.Desc().StartKey.AsRawKey()
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminMergeArgs(mergeKey))
require.NoError(t, pErr.GoError())

repl = store.LookupReplica(roachpb.RKey(keyPrefix))
@@ -1250,10 +1253,11 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) {
DefaultZoneConfigOverride: &zoneConfig,
},
Store: &kvserver.StoreTestingKnobs{
DisableGCQueue: true,
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: testingRequestFilter,
DisableCanAckBeforeApplication: true,
DisableGCQueue: true,
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: testingRequestFilter,
},
},
})
@@ -2558,6 +2562,9 @@ func TestUnsplittableRange(t *testing.T) {
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
SplitQueuePurgatoryChan: splitQueuePurgatoryChan,
// Without this, the test is flaky: the range does not end up in
// purgatory "synchronously".
DisableCanAckBeforeApplication: true,
},
Server: &server.TestingKnobs{
WallClock: manualClock,
@@ -2769,6 +2776,9 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
DisableCanAckBeforeApplication: true,
},
},
},
})

7 changes: 6 additions & 1 deletion pkg/kv/kvserver/client_status_test.go
@@ -15,6 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -30,7 +31,11 @@ func TestComputeStatsForKeySpan(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/consistency_queue_test.go
@@ -158,7 +158,7 @@ func TestCheckConsistencyReplay(t *testing.T) {
}
// Arrange to count the number of times each checksum command applies to each
// store.
testKnobs.TestingApplyFilter = func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
testKnobs.TestingPostApplyFilter = func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
state.Lock()
defer state.Unlock()
if ccr := args.ComputeChecksum; ccr != nil {

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
@@ -177,8 +177,8 @@ func checkRaftLog(
nodeToMonitor: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: raftFilter,
DisableGCQueue: true,
TestingPostApplyFilter: raftFilter,
DisableGCQueue: true,
},
},
StoreSpecs: []base.StoreSpec{{InMemory: true}},

1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_application_result.go
@@ -123,6 +123,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
Req: cmd.proposal.Request,
ForcedError: cmd.forcedErr,
})
if cmd.proposalRetry == 0 {
cmd.proposalRetry = proposalReevaluationReason(newPropRetry)
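The hunk above threads the command's forced error into the testing-filter arguments via the new ForcedError field. A hypothetical filter that consults the field might look like the following; only the field, type, and knob names come from the diff (ForcedError is assumed to be a nil-able error pointer), and the handling logic is illustrative:

```go
knobs := &kvserver.StoreTestingKnobs{
	TestingApplyCalledTwiceFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
		if args.ForcedError != nil {
			// The command was rejected below Raft; don't treat this invocation
			// as a successful application of the request.
			return 0, nil
		}
		// Otherwise inspect the command as before, e.g. via args.CmdID or args.Req.
		return 0, nil
	},
}
```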

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
@@ -152,7 +152,7 @@ func (r *Replica) shouldApplyCommand(
ctx, cmd.idKey, &cmd.raftCmd, cmd.IsLocal(), replicaState,
)
// Consider testing-only filters.
if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr != nil || filter != nil {
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; cmd.forcedErr != nil || filter != nil {
args := kvserverbase.ApplyFilterArgs{
CmdID: cmd.idKey,
ReplicatedEvalResult: *cmd.replicatedResult(),

45 changes: 31 additions & 14 deletions pkg/kv/kvserver/replica_application_state_machine_test.go
@@ -255,24 +255,40 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) {
// non-deterministic flush to cause the test to fail.
tc.store.engine.RegisterFlushCompletedCallback(func() {})
r := tc.repl
r.mu.Lock()
raftAppliedIndex := r.mu.state.RaftAppliedIndex
truncatedIndex := r.mu.state.TruncatedState.Index
raftLogSize := r.mu.raftLogSize
// Overwrite to be trusted, since we want to check if transitions to false
// or not.
r.mu.raftLogSizeTrusted = true
r.mu.Unlock()
expectedFirstIndex := truncatedIndex + 1
if !accurate {
expectedFirstIndex = truncatedIndex

{
k := tc.repl.Desc().EndKey.AsRawKey().Prevish(10)
pArgs := putArgs(k, []byte("foo"))
_, pErr := tc.SendWrapped(&pArgs)
require.NoError(t, pErr.GoError())
gArgs := getArgs(k)
_, pErr = tc.SendWrapped(&gArgs)
require.NoError(t, pErr.GoError())
}

// Enqueue the truncation.
func() {
// Lock the replica.
raftLogSize, truncatedIndex := func() (_rls int64, truncIdx uint64) {
// Lock the replica. We do this early to avoid interference from any other
// moving parts on the Replica, whatever they may be. For example, we don't
// want a skewed lease applied index because commands are applying concurrently
// while we are busy picking values. Though note that we flush out commands above
// because even if we serialize correctly, there might be an unapplied command
// that already consumed our chosen lease index, we just wouldn't know.
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
raftAppliedIndex := r.mu.state.RaftAppliedIndex
truncatedIndex := r.mu.state.TruncatedState.Index
raftLogSize := r.mu.raftLogSize
// Overwrite to be trusted, since we want to check if transitions to false
// or not.
r.mu.raftLogSizeTrusted = true
r.mu.Unlock()
expectedFirstIndex := truncatedIndex + 1
if !accurate {
expectedFirstIndex = truncatedIndex
}

// Enqueue the truncation.
sm := r.getStateMachine()

// Create a new application batch.
@@ -329,6 +345,7 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) {
require.Equal(t, expectedFirstIndex, trunc.expectedFirstIndex)
require.EqualValues(t, -1, trunc.logDeltaBytes)
require.True(t, trunc.isDeltaTrusted)
return raftLogSize, truncatedIndex
}()
require.NoError(t, tc.store.Engine().Flush())
// Asynchronous call to advance durability.

21 changes: 17 additions & 4 deletions pkg/kv/kvserver/replica_closedts_internal_test.go
@@ -656,14 +656,25 @@ func TestQueryResolvedTimestamp(t *testing.T) {
tc.manualClock = timeutil.NewManualTime(timeutil.Unix(0, 1)) // required by StartWithStoreConfig
cfg := TestStoreConfig(hlc.NewClock(tc.manualClock, 100*time.Nanosecond) /* maxOffset */)
cfg.TestingKnobs.DontCloseTimestamps = true
// Make sure commands are visible by the time they are applied. Otherwise
// this test can be flaky because we might have a lease applied index
// assigned to a command that is committed but not applied yet. When we
// then "commit" a command out of band, and the stored command gets
// applied, their indexes will clash and cause a fatal error.
cfg.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, cfg)

// Write an intent.
txn := roachpb.MakeTransaction("test", intentKey, 0, intentTS, 0, 0)
pArgs := putArgs(intentKey, []byte("val"))
assignSeqNumsForReqs(&txn, &pArgs)
_, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{Txn: &txn}, &pArgs)
require.Nil(t, pErr)
{
pArgs := putArgs(intentKey, []byte("val"))
assignSeqNumsForReqs(&txn, &pArgs)
_, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{Txn: &txn}, &pArgs)
require.Nil(t, pErr)
}

// NB: the put is now visible, in particular it has applied, thanks
// to the testing knobs in this test.

// Inject a closed timestamp.
tc.repl.mu.Lock()
@@ -958,6 +969,7 @@ func TestServerSideBoundedStalenessNegotiation(t *testing.T) {
tc.manualClock = timeutil.NewManualTime(timeutil.Unix(0, 1)) // required by StartWithStoreConfig
cfg := TestStoreConfig(hlc.NewClock(tc.manualClock, 100*time.Nanosecond) /* maxOffset */)
cfg.TestingKnobs.DontCloseTimestamps = true
cfg.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, cfg)

// Write an intent.
@@ -1136,6 +1148,7 @@ func TestServerSideBoundedStalenessNegotiationWithResumeSpan(t *testing.T) {
tc.manualClock = timeutil.NewManualTime(timeutil.Unix(0, 1)) // required by StartWithStoreConfig
cfg := TestStoreConfig(hlc.NewClock(tc.manualClock, 100*time.Nanosecond) /* maxOffset */)
cfg.TestingKnobs.DontCloseTimestamps = true
cfg.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, cfg)

// Set up the test.

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_closedts_test.go
@@ -152,7 +152,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
}
return 0, nil
}
return &kvserver.StoreTestingKnobs{TestingApplyFilter: testingApplyFilter}, applyC
return &kvserver.StoreTestingKnobs{TestingApplyCalledTwiceFilter: testingApplyFilter}, applyC
},
setup: func(a setupArgs) (chan struct{}, chan error, error) {
// Initiate a Raft proposal and pause it during application.

1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_follower_read_test.go
@@ -110,6 +110,7 @@ func TestCheckExecutionCanProceedAllowsFollowerReadWithInvalidLease(t *testing.T
manual := timeutil.NewManualTime(timeutil.Unix(5, 0))
clock := hlc.NewClock(manual, 1 /* maxOffset */)
tsc := TestStoreConfig(clock)
tsc.TestingKnobs.DisableCanAckBeforeApplication = true
// Permit only one lease attempt. The test is flaky if we allow the lease to
// be renewed by background processes.
var leaseOnce sync.Once

(Diff truncated: the remaining 6 changed files are not shown.)