diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 07dd384a65fe..c8dd4dfa10ee 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -133,6 +133,7 @@ go_library( "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", @@ -336,6 +337,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/kv/kvserver/protectedts/ptverifier", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/kv/kvserver/spanset", diff --git a/pkg/kv/kvserver/debug_print.go b/pkg/kv/kvserver/debug_print.go index a2b84648cb37..10b6ac1c303f 100644 --- a/pkg/kv/kvserver/debug_print.go +++ b/pkg/kv/kvserver/debug_print.go @@ -16,8 +16,8 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -206,52 +206,20 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) { } func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) { - var ent raftpb.Entry - if err := maybeUnmarshalInline(kv.Value, &ent); err != nil { + var e raftlog.Entry + if err := e.LoadFromRawValue(kv.Value); err != nil { return "", err } - var cmd kvserverpb.RaftCommand - switch ent.Type { - case raftpb.EntryNormal: - if len(ent.Data) == 0 { - return fmt.Sprintf("%s: EMPTY\n", &ent), nil - } - _, cmdData := kvserverbase.DecodeRaftCommand(ent.Data) - if err := protoutil.Unmarshal(cmdData, &cmd); err != nil { - return "", err - } - case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: - var c raftpb.ConfChangeI - if ent.Type == raftpb.EntryConfChange { - var cc raftpb.ConfChange - if err := protoutil.Unmarshal(ent.Data, &cc); err != nil { - return "", err - } - c = cc - } else { - var cc raftpb.ConfChangeV2 - if err := protoutil.Unmarshal(ent.Data, &cc); err != nil { - return "", err - } - c = cc - } - - var ctx kvserverpb.ConfChangeContext - if err := protoutil.Unmarshal(c.AsV2().Context, &ctx); err != nil { - return "", err - } - if err := protoutil.Unmarshal(ctx.Payload, &cmd); err != nil { - return "", err - } - default: - return "", fmt.Errorf("unknown log entry type: %s", &ent) + if len(e.Ent.Data) == 0 { + return fmt.Sprintf("%s: EMPTY\n", &e.Ent), nil } - ent.Data = nil + e.Ent.Data = nil + cmd := e.Cmd var leaseStr string if l := cmd.DeprecatedProposerLease; l != nil { - leaseStr = l.String() // use full lease, if available + leaseStr = l.String() // use the full lease, if available } else { leaseStr = fmt.Sprintf("lease #%d", cmd.ProposerLeaseSequence) } @@ -262,7 +230,7 @@ func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) { } cmd.WriteBatch = nil - return fmt.Sprintf("%s by %s\n%s\nwrite batch:\n%s", &ent, leaseStr, &cmd, wbStr), nil + return fmt.Sprintf("%s by %s\n%s\nwrite batch:\n%s", &e.Ent, leaseStr, &cmd, wbStr), nil } func tryTxn(kv storage.MVCCKeyValue) (string, error) { diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index a8b337d73463..4ec48ca6bf31 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ 
b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -14,20 +14,18 @@ go_library( deps = [ "//pkg/keys", "//pkg/kv/kvserver", - "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", + "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/stateloader", "//pkg/roachpb:with-mocks", "//pkg/storage", - "//pkg/storage/enginepb", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", - "@io_etcd_go_etcd_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 59ddf6b1e81b..fd6f13d70d17 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -14,18 +14,13 @@ import ( "context" "math" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" - "go.etcd.io/etcd/raft/v3/raftpb" ) // CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery. @@ -80,88 +75,25 @@ func CollectReplicaInfo( return loqrecoverypb.NodeReplicaInfo{Replicas: replicas}, nil } -// GetDescriptorChangesFromRaftLog iterates over raft log between indicies +// GetDescriptorChangesFromRaftLog iterates over raft log between indices // lo (inclusive) and hi (exclusive) and searches for changes to range -// descriptor. Changes are identified by commit trigger content which is -// extracted either from EntryNormal where change updates key range info -// (split/merge) or from EntryConfChange* for changes in replica set. +// descriptors, as identified by presence of a commit trigger. 
func GetDescriptorChangesFromRaftLog( rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader, ) ([]loqrecoverypb.DescriptorChangeInfo, error) { + it := raftlog.NewIterator(rangeID, reader) + defer it.Close() + it.Seek(lo, hi) var changes []loqrecoverypb.DescriptorChangeInfo - - key := keys.RaftLogKey(rangeID, lo) - endKey := keys.RaftLogKey(rangeID, hi) - iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ - UpperBound: endKey, - }) - defer iter.Close() - - var meta enginepb.MVCCMetadata - var ent raftpb.Entry - - decodeRaftChange := func(ccI raftpb.ConfChangeI) ([]byte, error) { - var ccC kvserverpb.ConfChangeContext - if err := protoutil.Unmarshal(ccI.AsV2().Context, &ccC); err != nil { - return nil, errors.Wrap(err, "while unmarshaling CCContext") - } - return ccC.Payload, nil - } - - iter.SeekGE(storage.MakeMVCCMetadataKey(key)) - for ; ; iter.Next() { - ok, err := iter.Valid() + for ; ; it.Next() { + ok, err := it.Valid() if err != nil { return nil, err } if !ok { return changes, nil } - if err := protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { - return nil, errors.Wrap(err, "unable to decode raft log MVCCMetadata") - } - if err := storage.MakeValue(meta).GetProto(&ent); err != nil { - return nil, errors.Wrap(err, "unable to unmarshal raft Entry") - } - if len(ent.Data) == 0 { - continue - } - // Following code extracts our raft command from raft log entry. Depending - // on entry type we either need to extract encoded command from configuration - // change (for replica changes) or from normal command (for splits and - // merges). - var payload []byte - switch ent.Type { - case raftpb.EntryConfChange: - var cc raftpb.ConfChange - if err := protoutil.Unmarshal(ent.Data, &cc); err != nil { - return nil, errors.Wrap(err, "while unmarshaling ConfChange") - } - payload, err = decodeRaftChange(cc) - if err != nil { - return nil, err - } - case raftpb.EntryConfChangeV2: - var cc raftpb.ConfChangeV2 - if err := protoutil.Unmarshal(ent.Data, &cc); err != nil { - return nil, errors.Wrap(err, "while unmarshaling ConfChangeV2") - } - payload, err = decodeRaftChange(cc) - if err != nil { - return nil, err - } - case raftpb.EntryNormal: - _, payload = kvserverbase.DecodeRaftCommand(ent.Data) - default: - continue - } - if len(payload) == 0 { - continue - } - var raftCmd kvserverpb.RaftCommand - if err := protoutil.Unmarshal(payload, &raftCmd); err != nil { - return nil, errors.Wrap(err, "unable to unmarshal raft command") - } + raftCmd := it.UnsafeEntry().Cmd switch { case raftCmd.ReplicatedEvalResult.Split != nil: changes = append(changes, diff --git a/pkg/kv/kvserver/raftlog/BUILD.bazel b/pkg/kv/kvserver/raftlog/BUILD.bazel new file mode 100644 index 000000000000..c9fd10e1b8f9 --- /dev/null +++ b/pkg/kv/kvserver/raftlog/BUILD.bazel @@ -0,0 +1,22 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "raftlog", + srcs = [ + "entry.go", + "iterator.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvserver/kvserverbase", + "//pkg/kv/kvserver/kvserverpb", + "//pkg/roachpb:with-mocks", + "//pkg/storage", + "//pkg/storage/enginepb", + "//pkg/util/protoutil", + "@com_github_cockroachdb_errors//:errors", + "@io_etcd_go_etcd_raft_v3//raftpb", + ], +) diff --git a/pkg/kv/kvserver/raftlog/entry.go b/pkg/kv/kvserver/raftlog/entry.go new file mode 100644 index 000000000000..73ffd97ca065 --- /dev/null +++ 
b/pkg/kv/kvserver/raftlog/entry.go @@ -0,0 +1,107 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package raftlog + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +// Entry contains data related to a raft log entry. This is the raftpb.Entry +// itself but also all encapsulated data relevant for command application. +type Entry struct { + Meta enginepb.MVCCMetadata // only set after LoadFromRawValue + Ent raftpb.Entry // must be pre-populated when calling Load + // These fields will all be populated by Load. + ID kvserverbase.CmdIDKey // always set + CC1 *raftpb.ConfChange // only set for config change + CC2 *raftpb.ConfChangeV2 // only set for config change + CCC *kvserverpb.ConfChangeContext // only set for config change + Cmd kvserverpb.RaftCommand // always set +} + +// LoadFromRawValue populates the Entry from a raw value, i.e. from bytes as +// stored on a storage.Engine. +func (e *Entry) LoadFromRawValue(b []byte) error { + if err := protoutil.Unmarshal(b, &e.Meta); err != nil { + return errors.Wrap(err, "decoding raft log MVCCMetadata") + } + + if err := storage.MakeValue(e.Meta).GetProto(&e.Ent); err != nil { + return errors.Wrap(err, "unmarshalling raft Entry") + } + + return e.load() +} + +// Load populates the Entry from the Ent field. Will reset the Meta field. +func (e *Entry) Load() error { + e.Meta = enginepb.MVCCMetadata{} + return e.load() +} + +func (e *Entry) load() error { + if len(e.Ent.Data) == 0 { + // Raft-proposed empty entry. + return nil + } + + var payload []byte + switch e.Ent.Type { + case raftpb.EntryNormal: + e.ID, payload = kvserverbase.DecodeRaftCommand(e.Ent.Data) + case raftpb.EntryConfChange: + e.CC1 = &raftpb.ConfChange{} + if err := protoutil.Unmarshal(e.Ent.Data, e.CC1); err != nil { + return errors.Wrap(err, "unmarshalling ConfChange") + } + e.CCC = &kvserverpb.ConfChangeContext{} + if err := protoutil.Unmarshal(e.CC1.Context, e.CCC); err != nil { + return errors.Wrap(err, "unmarshalling ConfChangeContext") + } + payload = e.CCC.Payload + e.ID = kvserverbase.CmdIDKey(e.CCC.CommandID) + case raftpb.EntryConfChangeV2: + e.CC2 = &raftpb.ConfChangeV2{} + if err := protoutil.Unmarshal(e.Ent.Data, e.CC2); err != nil { + return errors.Wrap(err, "unmarshalling ConfChangeV2") + } + e.CCC = &kvserverpb.ConfChangeContext{} + if err := protoutil.Unmarshal(e.CC2.Context, e.CCC); err != nil { + return errors.Wrap(err, "unmarshalling ConfChangeContext") + } + payload = e.CCC.Payload + e.ID = kvserverbase.CmdIDKey(e.CCC.CommandID) + default: + return errors.AssertionFailedf("unknown entry type %d", e.Ent.Type) + } + + // TODO(tbg): can len(payload)==0 if we propose an empty command to wake up leader? + // If so, is that a problem here? + return errors.Wrap(protoutil.Unmarshal(payload, &e.Cmd), "unmarshalling RaftCommand") +} + +// CC returns CC1 or CC2 as an interface, if set. Otherwise, returns nil. 
+func (e *Entry) CC() raftpb.ConfChangeI { + if e.CC1 != nil { + return e.CC1 + } + if e.CC2 != nil { + return e.CC2 + } + // NB: nil != interface{}(nil). + return nil +} diff --git a/pkg/kv/kvserver/raftlog/iterator.go b/pkg/kv/kvserver/raftlog/iterator.go new file mode 100644 index 000000000000..4fb334217331 --- /dev/null +++ b/pkg/kv/kvserver/raftlog/iterator.go @@ -0,0 +1,95 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package raftlog + +import ( + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" +) + +// An Iterator inspects the raft log. After creation, Seek should be invoked. +// Then, after each call to Valid() which returns (true, nil), an Entry is +// available via the UnsafeEntry method; the Entry becomes invalid once the +// Iterator is advanced via Next or Seek (however, pointers inside of Entry will +// not be re-used and can thus be held on to). Valid() returns (false, nil) when +// no more entries are in the search window last provided to Seek; the Iterator +// may be Seek'ed again. The Iterator must eventually be Close'd. +type Iterator struct { + eng storage.Reader + prefBuf keys.RangeIDPrefixBuf + + iter storage.MVCCIterator + entry Entry +} + +// NewIterator initializes an Iterator that reads the raft log for the given +// RangeID from the provided Reader. +func NewIterator(rangeID roachpb.RangeID, eng storage.Reader) *Iterator { + return &Iterator{ + eng: eng, + prefBuf: keys.MakeRangeIDPrefixBuf(rangeID), + } +} + +// Close releases the resources associated to this Iterator. +func (it *Iterator) Close() { + if it.iter != nil { + it.iter.Close() + } +} + +// Seek sets up the Iterator to read the raft log from indices lo (inclusive) +// to hi (exclusive). +func (it *Iterator) Seek(lo, hi uint64) { + it.iter = it.eng.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + UpperBound: it.prefBuf.RaftLogKey(hi), // exclusive + }) + + it.iter.SeekGE(storage.MakeMVCCMetadataKey(it.prefBuf.RaftLogKey(lo))) +} + +// Next advances the Iterator. This must follow a call to Valid that returned +// (true, nil). +func (it *Iterator) Next() { + it.iter.Next() +} + +// Valid returns (true, nil) when another entry is available via UnsafeEntry. +// It returns (false, nil) when the search window passed to the previous call +// to Seek is exhausted; note that this does not imply that all possible Entries +// in the iteration window actually existed. When an error is returned, the +// Iterator must no longer be used. +func (it *Iterator) Valid() (bool, error) { + ok, err := it.iter.Valid() + if err != nil { + return false, err + } + if !ok { + return false, nil + } + + if err := it.entry.LoadFromRawValue(it.iter.UnsafeValue()); err != nil { + return false, err + } + + return true, nil +} + +// UnsafeEntry returns the Entry the iterator is currently positioned at. This +// Entry is only valid for use until Seek or Next are invoked. Memory referenced +// from within the Entry can be held on to. +// +// TODO(tbg): this is not the most useful contract - reusing only the Entry +// doesn't buy us as much, reusing `Entry.Data` is the really useful part. 
+func (it *Iterator) UnsafeEntry() *Entry { + return &it.entry +} diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index 07c056babef6..6ed2e4c1d967 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -14,13 +14,11 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/cockroachdb/errors" "go.etcd.io/etcd/raft/v3/raftpb" ) @@ -44,8 +42,7 @@ import ( // batch's view of ReplicaState. Then the batch is committed, the side-effects // are applied and the local result is processed. type replicatedCmd struct { - ent *raftpb.Entry // the raft.Entry being applied - decodedRaftEntry // decoded from ent + decodedRaftEntry // decoded from ent // proposal is populated on the proposing Replica only and comes from the // Replica's proposal map. @@ -82,26 +79,17 @@ type replicatedCmd struct { // decodedRaftEntry represents the deserialized content of a raftpb.Entry. type decodedRaftEntry struct { - idKey kvserverbase.CmdIDKey - raftCmd kvserverpb.RaftCommand - confChange *decodedConfChange // only non-nil for config changes + raftlog.Entry } -// decodedConfChange represents the fields of a config change raft command. -type decodedConfChange struct { - raftpb.ConfChangeI - kvserverpb.ConfChangeContext -} - -// decode decodes the entry e into the replicatedCmd. +// decode the entry e into the replicatedCmd. func (c *replicatedCmd) decode(ctx context.Context, e *raftpb.Entry) error { - c.ent = e return c.decodedRaftEntry.decode(ctx, e) } // Index implements the apply.Command interface. func (c *replicatedCmd) Index() uint64 { - return c.ent.Index + return c.decodedRaftEntry.Ent.Index } // IsTrivial implements the apply.Command interface. @@ -194,66 +182,23 @@ func (c *replicatedCmd) finishTracingSpan() { // decode decodes the entry e into the decodedRaftEntry. func (d *decodedRaftEntry) decode(ctx context.Context, e *raftpb.Entry) error { *d = decodedRaftEntry{} - // etcd raft sometimes inserts nil commands, ours are never nil. - // This case is handled upstream of this call. - if len(e.Data) == 0 { - return nil - } - switch e.Type { - case raftpb.EntryNormal: - return d.decodeNormalEntry(e) - case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: - return d.decodeConfChangeEntry(e) - default: - log.Fatalf(ctx, "unexpected Raft entry: %v", e) - return nil // unreachable - } -} -func (d *decodedRaftEntry) decodeNormalEntry(e *raftpb.Entry) error { - var encodedCommand []byte - d.idKey, encodedCommand = kvserverbase.DecodeRaftCommand(e.Data) - // An empty command is used to unquiesce a range and wake the - // leader. Clear commandID so it's ignored for processing. 
- if len(encodedCommand) == 0 { - d.idKey = "" - } else if err := protoutil.Unmarshal(encodedCommand, &d.raftCmd); err != nil { - return wrapWithNonDeterministicFailure(err, "while unmarshaling entry") + d.Ent = *e + if err := d.Load(); err != nil { + return wrapWithNonDeterministicFailure(err, "while decoding raft entry") } - return nil -} - -func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) error { - d.confChange = &decodedConfChange{} - switch e.Type { - case raftpb.EntryConfChange: - var cc raftpb.ConfChange - if err := protoutil.Unmarshal(e.Data, &cc); err != nil { - return wrapWithNonDeterministicFailure(err, "while unmarshaling ConfChange") - } - d.confChange.ConfChangeI = cc - case raftpb.EntryConfChangeV2: - var cc raftpb.ConfChangeV2 - if err := protoutil.Unmarshal(e.Data, &cc); err != nil { - return wrapWithNonDeterministicFailure(err, "while unmarshaling ConfChangeV2") - } - d.confChange.ConfChangeI = cc - default: - const msg = "unknown entry type" - err := errors.New(msg) - return wrapWithNonDeterministicFailure(err, msg) - } - if err := protoutil.Unmarshal(d.confChange.AsV2().Context, &d.confChange.ConfChangeContext); err != nil { - return wrapWithNonDeterministicFailure(err, "while unmarshaling ConfChangeContext") - } - if err := protoutil.Unmarshal(d.confChange.Payload, &d.raftCmd); err != nil { - return wrapWithNonDeterministicFailure(err, "while unmarshaling RaftCommand") + // etcd raft sometimes inserts nil commands. Additionally, we sometimes + // propose empty commands (where e.Data is just the CmdIDKey); both are + // handled by the caller. In the latter case, we clear the CmdIDKey to + // make it look like the empty raft command, leaving only one path to + // handle upstream. + if d.Cmd.ReplicatedEvalResult.IsZero() { + d.ID = "" } - d.idKey = kvserverbase.CmdIDKey(d.confChange.CommandID) return nil } func (d *decodedRaftEntry) replicatedResult() *kvserverpb.ReplicatedEvalResult { - return &d.raftCmd.ReplicatedEvalResult + return &d.Cmd.ReplicatedEvalResult } diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index 018b4925f42d..9e70c36eef6a 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -85,7 +85,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b var it replicatedCmdBufSlice for it.init(&d.cmdBuf); it.Valid(); it.Next() { cmd := it.cur() - cmd.proposal = d.r.mu.proposals[cmd.idKey] + cmd.proposal = d.r.mu.proposals[cmd.ID] anyLocal = anyLocal || cmd.IsLocal() } if !anyLocal && d.r.mu.proposalQuota == nil { @@ -109,7 +109,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b // criterion. While such proposals can be reproposed, only the first // instance that gets applied matters and so removing the command is // always what we want to happen. - cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex + cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex if shouldRemove { // Delete the proposal from the proposals map. There may be reproposals // of the proposal in the pipeline, but those will all have the same max @@ -122,7 +122,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b // We check the proposal map again first to avoid double free-ing quota // when reproposals from the same proposal end up in the same entry // application batch. 
- delete(d.r.mu.proposals, cmd.idKey) + delete(d.r.mu.proposals, cmd.ID) toRelease = cmd.proposal.quotaAlloc cmd.proposal.quotaAlloc = nil } @@ -146,11 +146,11 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { cmd := it.cur() if cmd.IsLocal() { cmd.ctx, cmd.sp = tracing.ChildSpan(cmd.proposal.ctx, opName) - } else if cmd.raftCmd.TraceData != nil { + } else if cmd.Cmd.TraceData != nil { // The proposal isn't local, and trace data is available. Extract // the remote span and start a server-side span that follows from it. spanMeta, err := d.r.AmbientContext.Tracer.ExtractMetaFrom(tracing.MapCarrier{ - Map: cmd.raftCmd.TraceData, + Map: cmd.Cmd.TraceData, }) if err != nil { log.Errorf(ctx, "unable to extract trace data from raft command: %s", err) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index ee6496d99f41..a23d4e5811a4 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -118,7 +118,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil { var newPropRetry int newPropRetry, pErr = filter(kvserverbase.ApplyFilterArgs{ - CmdID: cmd.idKey, + CmdID: cmd.ID, ReplicatedEvalResult: *cmd.replicatedResult(), StoreID: r.store.StoreID(), RangeID: r.RangeID, @@ -191,7 +191,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( // lease here - if we got this far, we know that everything but the // index is valid at this point in the log. p := cmd.proposal - if p.applied || cmd.raftCmd.MaxLeaseIndex != p.command.MaxLeaseIndex { + if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex { // If the command associated with this rejected raft entry already // applied then we don't want to repropose it. Doing so could lead // to duplicate application of the same proposal. @@ -235,7 +235,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( if pErr != nil { return pErr } - log.VEventf(ctx, 2, "reproposed command %x", cmd.idKey) + log.VEventf(ctx, 2, "reproposed command %x", cmd.ID) return nil } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 16e6e8a104a2..722b7016d97f 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/redact" "github.com/kr/pretty" "go.etcd.io/etcd/raft/v3" - "go.etcd.io/etcd/raft/v3/raftpb" ) // replica_application_*.go files provide concrete implementations of @@ -135,12 +134,12 @@ func (r *Replica) shouldApplyCommand( ctx context.Context, cmd *replicatedCmd, replicaState *kvserverpb.ReplicaState, ) bool { cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr( - ctx, cmd.idKey, &cmd.raftCmd, cmd.IsLocal(), replicaState, + ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), replicaState, ) // Consider testing-only filters. 
if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr != nil || filter != nil { args := kvserverbase.ApplyFilterArgs{ - CmdID: cmd.idKey, + CmdID: cmd.ID, ReplicatedEvalResult: *cmd.replicatedResult(), StoreID: r.store.StoreID(), RangeID: r.RangeID, @@ -467,17 +466,17 @@ func (b *replicaAppBatch) Stage( ctx context.Context, cmdI apply.Command, ) (apply.CheckedCommand, error) { cmd := cmdI.(*replicatedCmd) - if cmd.ent.Index == 0 { + if cmd.Ent.Index == 0 { return nil, makeNonDeterministicFailure("processRaftCommand requires a non-zero index") } - if idx, applied := cmd.ent.Index, b.state.RaftAppliedIndex; idx != applied+1 { + if idx, applied := cmd.Ent.Index, b.state.RaftAppliedIndex; idx != applied+1 { // If we have an out of order index, there's corruption. No sense in // trying to update anything or running the command. Simply return. return nil, makeNonDeterministicFailure("applied index jumped from %d to %d", applied, idx) } if log.V(4) { log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s", - cmd.idKey, cmd.ent.Index, cmd.raftCmd.MaxLeaseIndex, cmd.raftCmd.ClosedTimestamp) + cmd.ID, cmd.Ent.Index, cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp) } // Determine whether the command should be applied to the replicated state @@ -488,10 +487,10 @@ func (b *replicaAppBatch) Stage( log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr) // Apply an empty command. - cmd.raftCmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{} - cmd.raftCmd.WriteBatch = nil - cmd.raftCmd.LogicalOpLog = nil - cmd.raftCmd.ClosedTimestamp = nil + cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{} + cmd.Cmd.WriteBatch = nil + cmd.Cmd.LogicalOpLog = nil + cmd.Cmd.ClosedTimestamp = nil } else { if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil { return nil, err @@ -509,8 +508,8 @@ func (b *replicaAppBatch) Stage( // TODO(tbg): can't this happen in splitPreApply which is called from // b.runPreApplyTriggersAfterStagingWriteBatch and similar for merges? That // way, it would become less of a one-off. - if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil { - if cmd.raftCmd.ReplicatedEvalResult.Split != nil { + if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.Cmd); err != nil { + if cmd.Cmd.ReplicatedEvalResult.Split != nil { err = wrapWithNonDeterministicFailure(err, "unable to acquire split lock") } else { err = wrapWithNonDeterministicFailure(err, "unable to acquire merge lock") @@ -553,7 +552,7 @@ func (b *replicaAppBatch) Stage( // them in the batch) is sufficient. b.stageTrivialReplicatedEvalResult(ctx, cmd) b.entries++ - if len(cmd.ent.Data) == 0 { + if len(cmd.Ent.Data) == 0 { b.emptyEntries++ } @@ -580,7 +579,7 @@ func (b *replicaAppBatch) migrateReplicatedResult(ctx context.Context, cmd *repl // stageWriteBatch applies the command's write batch to the application batch's // RocksDB batch. This batch is committed to RocksDB in replicaAppBatch.commit. 
func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *replicatedCmd) error { - wb := cmd.raftCmd.WriteBatch + wb := cmd.Cmd.WriteBatch if wb == nil { return nil } @@ -613,7 +612,7 @@ func changeRemovesStore( func (b *replicaAppBatch) runPreApplyTriggersBeforeStagingWriteBatch( ctx context.Context, cmd *replicatedCmd, ) error { - if ops := cmd.raftCmd.LogicalOpLog; ops != nil { + if ops := cmd.Cmd.LogicalOpLog; ops != nil { b.r.populatePrevValsInLogicalOpLogRaftMuLocked(ctx, ops, b.batch) } return nil @@ -656,8 +655,8 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( b.r.store.cfg.Settings, b.r.store.engine, b.r.raftMu.sideloaded, - cmd.ent.Term, - cmd.ent.Index, + cmd.Ent.Term, + cmd.Ent.Index, *res.AddSSTable, b.r.store.limiters.BulkIOWriteRate, ) @@ -685,7 +684,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( // // Alternatively if we discover that the RHS has already been removed // from this store, clean up its data. - splitPreApply(ctx, b.r, b.batch, res.Split.SplitTrigger, cmd.raftCmd.ClosedTimestamp) + splitPreApply(ctx, b.r, b.batch, res.Split.SplitTrigger, cmd.Cmd.ClosedTimestamp) // The rangefeed processor will no longer be provided logical ops for // its entire range, so it needs to be shut down and all registrations @@ -833,10 +832,10 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( // the logical operation log to also be nil. We don't want to trigger a // shutdown of the rangefeed in that situation, so we don't pass anything to // the rangefeed. If no rangefeed is running at all, this call will be a noop. - if ops := cmd.raftCmd.LogicalOpLog; cmd.raftCmd.WriteBatch != nil { + if ops := cmd.Cmd.LogicalOpLog; cmd.Cmd.WriteBatch != nil { b.r.handleLogicalOpLogRaftMuLocked(ctx, ops, b.batch) } else if ops != nil { - log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd) + log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.Cmd) } return nil @@ -850,7 +849,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( ctx context.Context, cmd *replicatedCmd, ) { - raftAppliedIndex := cmd.ent.Index + raftAppliedIndex := cmd.Ent.Index if raftAppliedIndex == 0 { log.Fatalf(ctx, "raft entry with index 0") } @@ -864,13 +863,13 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( // Replica.mu.state. The latter is used to initialize b.state, so future // calls to this method will see that the migration has already happened // and will continue to populate the term. - b.state.RaftAppliedIndexTerm = cmd.ent.Term + b.state.RaftAppliedIndexTerm = cmd.Ent.Term } if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 { b.state.LeaseAppliedIndex = leaseAppliedIndex } - if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() { + if cts := cmd.Cmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() { b.state.RaftClosedTimestamp = *cts b.closedTimestampSetter.record(cmd, b.state.Lease) if clockTS, ok := cts.TryToClockTimestamp(); ok { @@ -1028,8 +1027,8 @@ var raftClosedTimestampAssertionsEnabled = envutil.EnvOrDefaultBool("COCKROACH_R // // Note that we check that we're we're writing under b.state.RaftClosedTimestamp // (i.e. below the timestamp closed by previous commands), not below -// cmd.raftCmd.ClosedTimestamp. A command is allowed to write below the closed -// timestamp carried by itself; in other words cmd.raftCmd.ClosedTimestamp is a +// cmd.Cmd.ClosedTimestamp. 
A command is allowed to write below the closed +// timestamp carried by itself; in other words cmd.Cmd.ClosedTimestamp is a // promise about future commands, not the command carrying it. func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) error { if !cmd.IsLocal() || !cmd.proposal.Request.AppliesTimestampCache() { @@ -1038,7 +1037,7 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) if !raftClosedTimestampAssertionsEnabled { return nil } - wts := cmd.raftCmd.ReplicatedEvalResult.WriteTimestamp + wts := cmd.Cmd.ReplicatedEvalResult.WriteTimestamp if !wts.IsEmpty() && wts.LessEq(b.state.RaftClosedTimestamp) { wts := wts // Make a shadow variable that escapes to the heap. var req redact.StringBuilder @@ -1052,8 +1051,8 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) "batch state closed: %s, command closed: %s, request: %s, lease: %s.\n"+ "This assertion will fire again on restart; to ignore run with env var\n"+ "COCKROACH_RAFT_CLOSEDTS_ASSERTIONS_ENABLED=false", - cmd.idKey, wts, - b.state.RaftClosedTimestamp, cmd.raftCmd.ClosedTimestamp, + cmd.ID, wts, + b.state.RaftClosedTimestamp, cmd.Cmd.ClosedTimestamp, req, b.state.Lease), "command writing below closed timestamp") } @@ -1069,7 +1068,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression( return nil } existingClosed := &b.state.RaftClosedTimestamp - newClosed := cmd.raftCmd.ClosedTimestamp + newClosed := cmd.Cmd.ClosedTimestamp if newClosed != nil && !newClosed.IsEmpty() && newClosed.Less(*existingClosed) { var req redact.StringBuilder if cmd.IsLocal() { @@ -1098,7 +1097,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression( "Closed timestamp was set by req: %s under lease: %s; applied at LAI: %d. Batch idx: %d.\n"+ "This assertion will fire again on restart; to ignore run with env var COCKROACH_RAFT_CLOSEDTS_ASSERTIONS_ENABLED=false\n"+ "Raft log tail:\n%s", - cmd.idKey, cmd.ent.Term, cmd.ent.Index, existingClosed, newClosed, b.state.Lease, req, cmd.leaseIndex, + cmd.ID, cmd.Ent.Term, cmd.Ent.Index, existingClosed, newClosed, b.state.Lease, req, cmd.leaseIndex, prevReq, b.closedTimestampSetter.lease, b.closedTimestampSetter.leaseIdx, b.entries, logTail) } @@ -1207,7 +1206,7 @@ func (sm *replicaStateMachine) ApplySideEffects( } rejected := cmd.Rejected() - higherReproposalsExist := cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex + higherReproposalsExist := cmd.Cmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex if !rejected && higherReproposalsExist { log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") } @@ -1228,7 +1227,7 @@ func (sm *replicaStateMachine) ApplySideEffects( // have failed because it carried the same MaxLeaseIndex. if higherReproposalsExist { sm.r.mu.Lock() - delete(sm.r.mu.proposals, cmd.idKey) + delete(sm.r.mu.proposals, cmd.ID) sm.r.mu.Unlock() } cmd.proposal.applied = true @@ -1327,77 +1326,74 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( } func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *replicatedCmd) error { - switch cmd.ent.Type { - case raftpb.EntryNormal: + cc := cmd.CC() + if cc == nil { return nil - case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: - sm.stats.numConfChangeEntries++ - if cmd.Rejected() { - // The command was rejected. There is no need to report a ConfChange - // to raft. 
- return nil - } - return sm.r.withRaftGroup(true, func(rn *raft.RawNode) (bool, error) { - // NB: `etcd/raft` configuration changes diverge from the official Raft way - // in that a configuration change becomes active when the corresponding log - // entry is applied (rather than appended). This ultimately enables the way - // we do things where the state machine's view of the range descriptor always - // dictates the active replication config but it is much trickier to prove - // correct. See: - // - // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 - // - // INVARIANT: a leader will not append a config change to its logs when it - // hasn't applied all previous config changes in its logs. - // - // INVARIANT: a node will not campaign until it has applied any - // configuration changes with indexes less than or equal to its committed - // index. - // - // INVARIANT: appending a config change to the log (at leader or follower) - // implies that any previous config changes are durably known to be - // committed. That is, a commit index is persisted (and synced) that - // encompasses any earlier config changes before a new config change is - // appended[1]. - // - // Together, these invariants ensure that a follower that is behind by - // multiple configuration changes will be using one of the two most recent - // configuration changes "by the time it matters", which is what is - // required for correctness (configuration changes are sequenced so that - // neighboring configurations are mutually compatible, i.e. don't cause - // split brain). To see this, consider a follower that is behind by - // multiple configuration changes. This is fine unless this follower - // becomes the leader (as it would then make quorum determinations based - // on its active config). To become leader, it needs to campaign, and - // thanks to the second invariant, it will only do so once it has applied - // all the configuration changes in its committed log. If it is to win the - // election, it will also have all committed configuration changes in its - // log (though not necessarily knowing that they are all committed). But - // the third invariant implies that when the follower received the most - // recent configuration change into its log, the one preceding it was - // durably marked as committed on the follower. In summary, we now know - // that it will apply all the way up to and including the second most - // recent configuration change, which is compatible with the most recent - // one. - // - // [1]: this rests on shaky and, in particular, untested foundations in - // etcd/raft and our syncing behavior. The argument goes as follows: - // because the leader will have at most one config change in flight at a - // given time, it will definitely wait until the previous config change is - // committed until accepting the next one. `etcd/raft` will always attach - // the optimal commit index to appends to followers, so each config change - // will mark the previous one as committed upon receipt, since we sync on - // append (as we have to) we make that HardState.Commit durable. Finally, - // when a follower is catching up on larger chunks of the historical log, - // it will receive batches of entries together with a committed index - // encompassing the entire batch, again making sure that these batches are - // durably committed upon receipt. 
- rn.ApplyConfChange(cmd.confChange.ConfChangeI) - return true, nil - }) - default: - panic("unexpected") } + sm.stats.numConfChangeEntries++ + if cmd.Rejected() { + // The command was rejected. There is no need to report a ConfChange + // to raft. + return nil + } + return sm.r.withRaftGroup(true, func(rn *raft.RawNode) (bool, error) { + // NB: `etcd/raft` configuration changes diverge from the official Raft way + // in that a configuration change becomes active when the corresponding log + // entry is applied (rather than appended). This ultimately enables the way + // we do things where the state machine's view of the range descriptor always + // dictates the active replication config but it is much trickier to prove + // correct. See: + // + // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 + // + // INVARIANT: a leader will not append a config change to its logs when it + // hasn't applied all previous config changes in its logs. + // + // INVARIANT: a node will not campaign until it has applied any + // configuration changes with indexes less than or equal to its committed + // index. + // + // INVARIANT: appending a config change to the log (at leader or follower) + // implies that any previous config changes are durably known to be + // committed. That is, a commit index is persisted (and synced) that + // encompasses any earlier config changes before a new config change is + // appended[1]. + // + // Together, these invariants ensure that a follower that is behind by + // multiple configuration changes will be using one of the two most recent + // configuration changes "by the time it matters", which is what is + // required for correctness (configuration changes are sequenced so that + // neighboring configurations are mutually compatible, i.e. don't cause + // split brain). To see this, consider a follower that is behind by + // multiple configuration changes. This is fine unless this follower + // becomes the leader (as it would then make quorum determinations based + // on its active config). To become leader, it needs to campaign, and + // thanks to the second invariant, it will only do so once it has applied + // all the configuration changes in its committed log. If it is to win the + // election, it will also have all committed configuration changes in its + // log (though not necessarily knowing that they are all committed). But + // the third invariant implies that when the follower received the most + // recent configuration change into its log, the one preceding it was + // durably marked as committed on the follower. In summary, we now know + // that it will apply all the way up to and including the second most + // recent configuration change, which is compatible with the most recent + // one. + // + // [1]: this rests on shaky and, in particular, untested foundations in + // etcd/raft and our syncing behavior. The argument goes as follows: + // because the leader will have at most one config change in flight at a + // given time, it will definitely wait until the previous config change is + // committed until accepting the next one. `etcd/raft` will always attach + // the optimal commit index to appends to followers, so each config change + // will mark the previous one as committed upon receipt, since we sync on + // append (as we have to) we make that HardState.Commit durable. 
Finally, + // when a follower is catching up on larger chunks of the historical log, + // it will receive batches of entries together with a committed index + // encompassing the entire batch, again making sure that these batches are + // durably committed upon receipt. + rn.ApplyConfChange(cc) + return true, nil + }) } func (sm *replicaStateMachine) moveStats() applyCommittedEntriesStats { diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go index 3bc612bd6825..5ec2fcfe8bc8 100644 --- a/pkg/kv/kvserver/replica_application_state_machine_test.go +++ b/pkg/kv/kvserver/replica_application_state_machine_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -89,23 +90,23 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) { // Stage a command with the ChangeReplicas trigger. cmd := &replicatedCmd{ ctx: ctx, - ent: &raftpb.Entry{ - Index: r.mu.state.RaftAppliedIndex + 1, - Type: raftpb.EntryConfChange, - }, decodedRaftEntry: decodedRaftEntry{ - idKey: makeIDKey(), - raftCmd: kvserverpb.RaftCommand{ - ProposerLeaseSequence: r.mu.state.Lease.Sequence, - MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1, - ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ - State: &kvserverpb.ReplicaState{Desc: &newDesc}, - ChangeReplicas: &kvserverpb.ChangeReplicas{ChangeReplicasTrigger: trigger}, - WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0), + Entry: raftlog.Entry{ + Ent: raftpb.Entry{ + Index: r.mu.state.RaftAppliedIndex + 1, + Type: raftpb.EntryConfChange, }, - }, - confChange: &decodedConfChange{ - ConfChangeI: confChange, + ID: makeIDKey(), + Cmd: kvserverpb.RaftCommand{ + ProposerLeaseSequence: r.mu.state.Lease.Sequence, + MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1, + ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ + State: &kvserverpb.ReplicaState{Desc: &newDesc}, + ChangeReplicas: &kvserverpb.ChangeReplicas{ChangeReplicasTrigger: trigger}, + WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0), + }, + }, + CC1: &confChange, }, }, } @@ -113,8 +114,8 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) { checkedCmd, err := b.Stage(cmd.ctx, cmd) require.NoError(t, err) require.Equal(t, !add, b.changeRemovesReplica) - require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index) - require.Equal(t, b.state.LeaseAppliedIndex, cmd.raftCmd.MaxLeaseIndex) + require.Equal(t, b.state.RaftAppliedIndex, cmd.Ent.Index) + require.Equal(t, b.state.LeaseAppliedIndex, cmd.Cmd.MaxLeaseIndex) // Check the replica's destroy status. reason, _ := r.IsDestroyed()
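
A minimal usage sketch of the new raftlog package (not part of the diff above): it drives Iterator and Entry the same way the rewritten GetDescriptorChangesFromRaftLog does, but counts entries whose command carries a ChangeReplicas trigger. countReplicationChanges and the package name are hypothetical, introduced only for illustration; the types, methods, and field accesses are the ones exercised by this change.

package raftlogexample // hypothetical package, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// countReplicationChanges scans the raft log of rangeID over indices
// [lo, hi) and counts entries whose RaftCommand carries a
// ChangeReplicas trigger.
func countReplicationChanges(
	rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader,
) (int, error) {
	it := raftlog.NewIterator(rangeID, reader)
	defer it.Close()
	it.Seek(lo, hi)
	var n int
	for ; ; it.Next() {
		ok, err := it.Valid()
		if err != nil {
			return 0, err
		}
		if !ok {
			return n, nil
		}
		// UnsafeEntry is only valid until the next call to Next or Seek,
		// so inspect the decoded command before advancing.
		if it.UnsafeEntry().Cmd.ReplicatedEvalResult.ChangeReplicas != nil {
			n++
		}
	}
}

Note that, per the Iterator contract, the Entry returned by UnsafeEntry is reused across iterations; a caller that needs to retain decoded state beyond a single step should copy it out before calling Next or Seek again.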