Skip to content

Commit

Permalink
kvnemesis: add Raft double-apply assertions
Browse files Browse the repository at this point in the history
Very basic proof-of-concept. kvnemesis passes, at least.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Dec 15, 2023
1 parent 709be6d commit cc4ca2e
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 9 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ go_test(
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvnemesis/kvnemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -127,6 +129,20 @@ func (cfg kvnemesisTestCfg) testClusterArgs(tr *SeqTracker) base.TestClusterArgs
}
}

if cfg.assertRaftApply {
asserter := apply.NewAsserter()
storeKnobs.TestingApplyCalledTwiceFilter = func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
if !args.Ephemeral {
asserter.Apply(args.RangeID, args.ReplicaID, args.CmdID, args.Entry)
}
return 0, nil
}
storeKnobs.BeforeSnapshotSSTIngestion = func(r *kvserver.Replica, snap kvserver.IncomingSnapshot, _ []string) error {
asserter.ApplySnapshot(snap.Desc.RangeID, r.ReplicaID(), snap.RaftAppliedIndex)
return nil
}
}

return base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand Down Expand Up @@ -224,6 +240,7 @@ type kvnemesisTestCfg struct {
// considered truly random, but is random enough for the desired purpose.
invalidLeaseAppliedIndexProb float64 // [0,1)
injectReproposalErrorProb float64 // [0,1)
assertRaftApply bool
}

func TestKVNemesisSingleNode(t *testing.T) {
Expand All @@ -237,6 +254,7 @@ func TestKVNemesisSingleNode(t *testing.T) {
seedOverride: 0,
invalidLeaseAppliedIndexProb: 0.2,
injectReproposalErrorProb: 0.2,
assertRaftApply: true,
})
}

Expand All @@ -251,6 +269,7 @@ func TestKVNemesisSingleNode_ReproposalChaos(t *testing.T) {
seedOverride: 0,
invalidLeaseAppliedIndexProb: 0.9,
injectReproposalErrorProb: 0.5,
assertRaftApply: true,
})
}

Expand All @@ -265,6 +284,7 @@ func TestKVNemesisMultiNode(t *testing.T) {
seedOverride: 0,
invalidLeaseAppliedIndexProb: 0.2,
injectReproposalErrorProb: 0.2,
assertRaftApply: true,
})
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/apply/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "apply",
srcs = [
"asserter.go",
"cmd.go",
"doc.go",
"task.go",
Expand All @@ -11,6 +12,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_raft_v3//raftpb",
],
Expand Down
145 changes: 145 additions & 0 deletions pkg/kv/kvserver/apply/asserter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// TODO: put in a test package
package apply

import (
fmt "fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"go.etcd.io/raft/v3/raftpb"
)

type appliedCmd struct {
cmdID kvserverbase.CmdIDKey
rangeID roachpb.RangeID
logIndex kvpb.RaftIndex
replicas map[roachpb.ReplicaID]bool
}

// Asserter is a test utility that tracks application of Raft commands, and
// asserts that a command is only applied once per replica, and never applied at
// different log indexes (i.e. a double-apply or replay).
//
// It accounts for snapshots, but not splits/merges which are considered
// separate Raft groups, as the risk of proposals leaking across Raft groups
// appears negligible.
type Asserter struct {
mu syncutil.Mutex
// appliedIndex tracks the applied index of each replica.
appliedIndex map[roachpb.RangeID]map[roachpb.ReplicaID]kvpb.RaftIndex
// appliedCmds tracks applied commands by range and command ID.
appliedCmds map[roachpb.RangeID]map[kvserverbase.CmdIDKey]*appliedCmd
}

// NewAsserter creates a new asserter.
func NewAsserter() *Asserter {
return &Asserter{
appliedIndex: map[roachpb.RangeID]map[roachpb.ReplicaID]kvpb.RaftIndex{},
appliedCmds: map[roachpb.RangeID]map[kvserverbase.CmdIDKey]*appliedCmd{},
}
}

// ensureRangeLocked ensures that data structures exist for the given range.
// Asserter.mu must be held.
func (a *Asserter) ensureRangeLocked(rangeID roachpb.RangeID) {
if _, ok := a.appliedCmds[rangeID]; !ok {
a.appliedCmds[rangeID] = map[kvserverbase.CmdIDKey]*appliedCmd{}
}
if _, ok := a.appliedIndex[rangeID]; !ok {
a.appliedIndex[rangeID] = map[roachpb.ReplicaID]kvpb.RaftIndex{}
}
}

// Apply records a command being applied, and asserts validity.
//
// It assumes that entry application is durable, i.e. that nodes won't lose
// applied state. If needed, we can loosen this requirement by signalling
// truncation of applied state (e.g. on node restart).
func (a *Asserter) Apply(
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
cmdID kvserverbase.CmdIDKey,
entry raftpb.Entry,
) {
if len(cmdID) == 0 {
return
}

a.mu.Lock()
defer a.mu.Unlock()

a.ensureRangeLocked(rangeID)

// Assert and record the applied index.
if appliedIndex := a.appliedIndex[rangeID][replicaID]; kvpb.RaftIndex(entry.Index) <= appliedIndex {
panic(fmt.Sprintf("applied index regression for r%d/%d: %d -> %d", rangeID, replicaID, appliedIndex, entry.Index))
}
a.appliedIndex[rangeID][replicaID] = kvpb.RaftIndex(entry.Index)

if ac, ok := a.appliedCmds[rangeID][cmdID]; !ok {
// New command was applied, record it.
a.appliedCmds[rangeID][cmdID] = &appliedCmd{
cmdID: cmdID,
rangeID: rangeID,
logIndex: kvpb.RaftIndex(entry.Index),
replicas: map[roachpb.ReplicaID]bool{replicaID: true},
}
} else if ac.logIndex == kvpb.RaftIndex(entry.Index) {
// Command applying at the expected index, record the replica.
ac.replicas[replicaID] = true
} else {
// Applied command at unexpected log index, bail out.
var replicas []roachpb.ReplicaID
for id := range ac.replicas {
replicas = append(replicas, id)
}
msg := fmt.Sprintf("command %s re-applied at index %d on r%d/%d\n", cmdID, entry.Index, rangeID, replicaID)
msg += fmt.Sprintf("previously applied at index %d on replicas %s\n", ac.logIndex, replicas)
msg += fmt.Sprintf("entry: %+v", entry) // TODO: this formatting is not helpful
panic(msg)
}
}

// ApplySnapshot records snapshot application at the given applied index. The
// replica is considered to have applied all commands up to and including the
// applied index.
func (a *Asserter) ApplySnapshot(
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
index kvpb.RaftIndex,
) {
a.mu.Lock()
defer a.mu.Unlock()

a.ensureRangeLocked(rangeID)

// Record the applied index.
a.appliedIndex[rangeID][replicaID] = index

// Record the applied commands for this replica, i.e. all commands up to and
// including the snapshot index. A snapshot may regress the applied index, so
// also clear out any later commands that are now un-applied.
//
// A command must have applied
// on at least one replica before it can be included in a snapshot, so all
// relevant commands are guaranteed to be present in the map.
for _, cmd := range a.appliedCmds[rangeID] {
if cmd.logIndex <= index {
cmd.replicas[replicaID] = true
} else {
delete(cmd.replicas, replicaID)
}
}
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func mergeCheckingTimestampCaches(
// snapshotFilter is used to listen for the completion of a Raft snapshot.
var snapshotFilter func(kvserver.IncomingSnapshot)
beforeSnapshotSSTIngestion := func(
_ *kvserver.Replica,
inSnap kvserver.IncomingSnapshot,
_ []string,
) error {
Expand Down Expand Up @@ -3724,6 +3725,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
var keyStart, keyA, keyB, keyC, keyD, keyEnd roachpb.Key
rangeIds := make(map[string]roachpb.RangeID, 4)
beforeSnapshotSSTIngestion := func(
_ *kvserver.Replica,
inSnap kvserver.IncomingSnapshot,
sstNames []string,
) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4745,6 +4745,7 @@ func TestTenantID(t *testing.T) {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
BeforeSnapshotSSTIngestion: func(
_ *kvserver.Replica,
snapshot kvserver.IncomingSnapshot,
strings []string,
) error {
Expand Down Expand Up @@ -4824,6 +4825,7 @@ func TestUninitializedMetric(t *testing.T) {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
BeforeSnapshotSSTIngestion: func(
_ *kvserver.Replica,
snapshot kvserver.IncomingSnapshot,
_ []string,
) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//raftpb",
"@org_golang_x_time//rate",
],
)
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3/raftpb"
)

// MergeQueueEnabled is a setting that controls whether the merge queue is
Expand Down Expand Up @@ -150,8 +151,11 @@ type ProposalFilterArgs struct {
type ApplyFilterArgs struct {
kvserverpb.ReplicatedEvalResult
CmdID CmdIDKey
Entry raftpb.Entry
RangeID roachpb.RangeID
StoreID roachpb.StoreID
ReplicaID roachpb.ReplicaID
Ephemeral bool
Req *kvpb.BatchRequest // only set on the leaseholder
ForcedError *kvpb.Error
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (b *replicaAppBatch) Stage(

// Then, maybe override the result with testing knobs.
if b.r.store.TestingKnobs() != nil {
fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr)
fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr, false /* ephemeral */)
}

// Now update cmd. We'll either put the lease index in it or zero out
Expand Down Expand Up @@ -755,7 +755,7 @@ func (mb *ephemeralReplicaAppBatch) Stage(
fr := kvserverbase.CheckForcedErr(
ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state,
)
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr)
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr, true /* ephemeral */)
cmd.ForcedErrResult = fr
if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex {
mb.state.LeaseAppliedIndex = cmd.LeaseIndex
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (r *Replica) getStateMachine() *replicaStateMachine {

// TODO(tbg): move this to replica_app_batch.go.
func replicaApplyTestingFilters(
ctx context.Context, r *Replica, cmd *replicatedCmd, fr kvserverbase.ForcedErrResult,
ctx context.Context,
r *Replica,
cmd *replicatedCmd,
fr kvserverbase.ForcedErrResult,
ephemeral bool,
) kvserverbase.ForcedErrResult {
// By default, output is input.
newFR := fr
Expand All @@ -96,9 +100,12 @@ func replicaApplyTestingFilters(
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; fr.ForcedError != nil || filter != nil {
args := kvserverbase.ApplyFilterArgs{
CmdID: cmd.ID,
Entry: cmd.Entry.Entry,
ReplicatedEvalResult: *cmd.ReplicatedResult(),
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
ReplicaID: r.replicaID,
Ephemeral: ephemeral,
ForcedError: fr.ForcedError,
}
if fr.ForcedError == nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ type IncomingSnapshot struct {
// Size of the ssts containing these key-value pairs.
SSTSize int64
SharedSize int64
RaftAppliedIndex kvpb.RaftIndex // logging only
placeholder *ReplicaPlaceholder
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
doExcise bool
Expand All @@ -400,7 +400,7 @@ func (s IncomingSnapshot) String() string {
// SafeFormat implements the redact.SafeFormatter interface.
func (s IncomingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("snapshot %s from %s at applied index %d",
redact.Safe(s.SnapUUID.Short()), s.FromReplica, s.raftAppliedIndex)
redact.Safe(s.SnapUUID.Short()), s.FromReplica, s.RaftAppliedIndex)
}

// snapshot creates an OutgoingSnapshot containing a pebble snapshot for the
Expand Down Expand Up @@ -661,7 +661,7 @@ func (r *Replica) applySnapshot(

// Ingest all SSTs atomically.
if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
if err := fn(inSnap, inSnap.SSTStorageScratch.SSTs()); err != nil {
if err := fn(r, inSnap, inSnap.SSTStorageScratch.SSTs()); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
DataSize: dataSize,
SSTSize: sstSize,
SharedSize: sharedSize,
raftAppliedIndex: header.State.RaftAppliedIndex,
RaftAppliedIndex: header.State.RaftAppliedIndex,
msgAppRespCh: make(chan raftpb.Message, 1),
sharedSSTs: sharedSSTs,
doExcise: doExcise,
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type StoreTestingKnobs struct {
// Users have to expect the filter to be invoked twice for each command, once
// from ephemerealReplicaAppBatch.Stage, and once from replicaAppBatch.Stage;
// this has to do with wanting to early-ack successful proposals. The second
// call is conditional on the first call succeeding.
// call is conditional on the first call succeeding. The field
// ApplyFilterArgs.Ephemeral will be true for the initial call.
//
// Consider using a TestPostApplyFilter instead, and use a
// TestingApplyCalledTwiceFilter only to inject forced errors.
Expand Down Expand Up @@ -337,7 +338,7 @@ type StoreTestingKnobs struct {
BeforeRemovingDemotedLearner func()
// BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when
// applying a snapshot.
BeforeSnapshotSSTIngestion func(IncomingSnapshot, []string) error
BeforeSnapshotSSTIngestion func(*Replica, IncomingSnapshot, []string) error
// OnRelocatedOne intercepts the return values of s.relocateOne after they
// have successfully been put into effect.
OnRelocatedOne func(_ []kvpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget)
Expand Down

0 comments on commit cc4ca2e

Please sign in to comment.