diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index c8dd4dfa10ee..e4c31946b1fe 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -215,6 +215,7 @@ go_test( "below_raft_protos_test.go", "client_atomic_membership_change_test.go", "client_lease_test.go", + "client_manual_proposal_test.go", "client_merge_test.go", "client_metrics_test.go", "client_migration_test.go", diff --git a/pkg/kv/kvserver/client_manual_proposal_test.go b/pkg/kv/kvserver/client_manual_proposal_test.go new file mode 100644 index 000000000000..adad985bad0b --- /dev/null +++ b/pkg/kv/kvserver/client_manual_proposal_test.go @@ -0,0 +1,136 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestCreateManualProposal tracks progress on #75729. Ultimately we would like +// to be able to programmatically build up a raft log from a sequence of +// BatchRequests and apply it to an initial state, all without instantiating +// a Replica. +func TestCreateManualProposal(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + b := storage.NewOpLoggerBatch(eng.NewBatch()) + defer b.Close() + + const ( + rangeID = roachpb.RangeID(1) + replicaID = roachpb.ReplicaID(1) + ) + replicas := roachpb.MakeReplicaSet(nil) + replicas.AddReplica(roachpb.ReplicaDescriptor{ + NodeID: 1, + StoreID: 1, + ReplicaID: replicaID, + Type: roachpb.ReplicaTypeVoterFull(), + }) + desc := roachpb.NewRangeDescriptor(rangeID, roachpb.RKeyMin, roachpb.RKeyMax, replicas) + require.NoError(t, stateloader.WriteInitialRangeState( + ctx, eng, *desc, replicaID, clusterversion.TestingBinaryVersion, + )) + + put := roachpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")) + + cmd, ok := batcheval.LookupCommand(put.Method()) + require.True(t, ok) + + evalCtx := batcheval.MockEvalCtx{} + + // NB: this should really operate on a BatchRequest. We need to librarize + // evaluateBatch: + // https://github.com/cockroachdb/cockroach/blob/9c09473ec9da9d458869abb3fe08a9db251c9291/pkg/kv/kvserver/replica_evaluate.go#L141-L153 + // The good news is, this is already in good shape! Just needs to be moved + // to a leaf package, like `batcheval`. + // To use the "true" logic we want this for sure, and probably even the + // caller of evaluateBatch and a few more levels. Pulling it out until + // there's nothing left basically. + + resp := &roachpb.PutResponse{} + args := batcheval.CommandArgs{ + EvalCtx: evalCtx.EvalContext(), + Header: roachpb.Header{}, + Args: put, + Stats: &enginepb.MVCCStats{}, + Uncertainty: uncertainty.Interval{}, + } + res, err := cmd.EvalRW(ctx, b, args, resp) + require.NoError(t, err) + // TODO: there's more stuff in evaluateProposal that would need to be lifted + // here: + // https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L842-L869 + res.WriteBatch = &kvserverpb.WriteBatch{Data: b.Repr()} + res.LogicalOpLog = &kvserverpb.LogicalOpLog{Ops: b.LogicalOps()} + + // End of evaluation. Start of "proposing". + + // TODO: the "requires consensus" logic is not reusable, make it so: + // https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L827-L840 + + sl := stateloader.Make(rangeID) + st, err := sl.LoadLease(ctx, eng) + require.NoError(t, err) + raftCmd := kvserverpb.RaftCommand{ + // Propose under latest lease, this isn't necessarily what you want (in a + // test) but it reflects the steady state when proposing under the leader. + // To also support proposing leases itself, we need this whole chunk of code + // to be reusable: + // https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raft.go#L192-L219 + // Really we probably just want to librarize the relevant parts of + // evalAndPropose and requestToProposal. + ProposerLeaseSequence: st.Sequence, + // TODO: this would need to be assigned usefully as well if this log has + // any chance of applying. So when we build a log, we need a state keeper + // that uses sane defaults but can be overridden if test so desires. The + // prod code assigns this when flushing from the proposal buffer. + MaxLeaseIndex: 0, + // Ditto. + ClosedTimestamp: nil, + + TraceData: nil, // can be injected at will + + // Rest was determined by evaluation. + ReplicatedEvalResult: res.Replicated, + WriteBatch: res.WriteBatch, + LogicalOpLog: res.LogicalOpLog, + } + + // TODO need library code to make an entry. Also something to write it + // to disk (these have to be separate things since we hand the entry to + // raft and then raft hands it back to us via the raft.Storage). + // In effect this means librarizing `(*Replica).propose`. + idKey := kvserver.MakeIDKey() + payload, err := kvserver.RaftCmdToPayload(ctx, idKey, &raftCmd, replicaID) + require.NoError(t, err) + // This is something suitable for raft.Propose{,ConfChange}... + _ = payload + // ... which will result in a log entry being written out in (*Replica).append: + // https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raftstorage.go#L642-L649 + // which should definitely also lean on a library-style method. +} diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 886a230addf2..136a2867b4d2 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" @@ -545,3 +546,20 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { } } } + +// RaftCmdToPayload wraps raftCmdToPayload for testing. +func RaftCmdToPayload( + ctx context.Context, + cmdIDKey kvserverbase.CmdIDKey, + raftCmd *kvserverpb.RaftCommand, + replID roachpb.ReplicaID, +) ([]byte, error) { + return raftCmdToPayload(ctx, raftCmd, cmdIDKey, replID, &roachpb.BatchRequest{}, func() {}) +} + +// MakeIDKey exports makeIDKey for testing. +// +// TODO(tbg): this should be in the `raftlog` package. +func MakeIDKey() kvserverbase.CmdIDKey { + return makeIDKey() +} diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index a3b824a26604..ef1fcc0e0bdc 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -65,6 +65,8 @@ var ( func makeIDKey() kvserverbase.CmdIDKey { idKeyBuf := make([]byte, 0, kvserverbase.RaftCommandIDLen) + // TODO(tbg): this goes through a global mutex, which can't be helpful. Pass + // in a rand instead. idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63())) return kvserverbase.CmdIDKey(idKeyBuf) } @@ -290,46 +292,26 @@ func (r *Replica) evalAndPropose( return proposalCh, abandon, idKey, nil } -// propose encodes a command, starts tracking it, and proposes it to Raft. -// -// The method hands ownership of the command over to the Raft machinery. After -// the method returns, all access to the command must be performed while holding -// Replica.mu and Replica.raftMu. -// -// propose takes ownership of the supplied token; the caller should tok.Move() -// it into this method. It will be used to untrack the request once it comes out -// of the proposal buffer. -func (r *Replica) propose( - ctx context.Context, p *ProposalData, tok TrackedRequestToken, -) (pErr *roachpb.Error) { - defer tok.DoneIfNotMoved(ctx) - - // If an error occurs reset the command's MaxLeaseIndex to its initial value. - // Failure to propose will propagate to the client. An invariant of this - // package is that proposals which are finished carry a raft command with a - // MaxLeaseIndex equal to the proposal command's max lease index. - defer func(prev uint64) { - if pErr != nil { - p.command.MaxLeaseIndex = prev - } - }(p.command.MaxLeaseIndex) - - // Make sure the maximum lease index is unset. This field will be set in - // propBuf.Insert and its encoded bytes will be appended to the encoding - // buffer as a MaxLeaseFooter. - p.command.MaxLeaseIndex = 0 - +// TODO(tbg): extract logging singleton. +func raftCmdToPayload( + ctx context.Context, + command *kvserverpb.RaftCommand, + idKey kvserverbase.CmdIDKey, + replID roachpb.ReplicaID, + ba *roachpb.BatchRequest, // TODO(tbg): only for logging, remove + onAddSSTable func(), // TODO(tbg): only for metrics , remove +) ([]byte, error) { // Determine the encoding style for the Raft command. prefix := true version := kvserverbase.RaftVersionStandard - if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil { + if crt := command.ReplicatedEvalResult.ChangeReplicas; crt != nil { // EndTxnRequest with a ChangeReplicasTrigger is special because Raft // needs to understand it; it cannot simply be an opaque command. To // permit this, the command is proposed by the proposal buffer using // ProposeConfChange. For that reason, we also don't need a Raft command // prefix because the command ID is stored in a field in // raft.ConfChange. - log.Infof(p.ctx, "proposing %s", crt) + log.Infof(ctx, "proposing %s", crt) prefix = false // Ensure that we aren't trying to remove ourselves from the range without @@ -345,26 +327,25 @@ func (r *Replica) propose( // since there's no stopping the actual removal/demotion once it's there. // IsVoterNewConfig checks that the leaseholder is a voter in the // proposed configuration. - replID := r.ReplicaID() - rDesc, ok := p.command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID) + rDesc, ok := command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID) for !ok || !rDesc.IsVoterNewConfig() { if rDesc.ReplicaID == replID { err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt), errMarkInvalidReplicationChange) - log.Errorf(p.ctx, "%v", err) - return roachpb.NewError(err) + log.Errorf(ctx, "%v", err) + return nil, err } } - } else if p.command.ReplicatedEvalResult.AddSSTable != nil { - log.VEvent(p.ctx, 4, "sideloadable proposal detected") + } else if command.ReplicatedEvalResult.AddSSTable != nil { + log.VEvent(ctx, 4, "sideloadable proposal detected") version = kvserverbase.RaftVersionSideloaded - r.store.metrics.AddSSTableProposals.Inc(1) + onAddSSTable() - if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { - return roachpb.NewErrorf("cannot sideload empty SSTable") + if command.ReplicatedEvalResult.AddSSTable.Data == nil { + return nil, errors.New("cannot sideload empty SSTable") } } else if log.V(4) { - log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary()) + log.Infof(ctx, "proposing command %x: %s", idKey, ba.Summary()) } // Create encoding buffer. @@ -372,33 +353,32 @@ func (r *Replica) propose( if prefix { preLen = kvserverbase.RaftCommandPrefixLen } - cmdLen := p.command.Size() + cmdLen := command.Size() // Allocate the data slice with enough capacity to eventually hold the two // "footers" that are filled later. needed := preLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize() data := make([]byte, preLen, needed) // Encode prefix with command ID, if necessary. if prefix { - kvserverbase.EncodeRaftCommandPrefix(data, version, p.idKey) + kvserverbase.EncodeRaftCommandPrefix(data, version, idKey) } // Encode body of command. data = data[:preLen+cmdLen] - if _, err := protoutil.MarshalTo(p.command, data[preLen:]); err != nil { - return roachpb.NewError(err) + if _, err := protoutil.MarshalTo(command, data[preLen:]); err != nil { + return nil, err } - p.encodedCommand = data // Too verbose even for verbose logging, so manually enable if you want to // debug proposal sizes. if false { - log.Infof(p.ctx, `%s: proposal: %d + log.Infof(ctx, `%s: proposal: %d RaftCommand.ReplicatedEvalResult: %d RaftCommand.ReplicatedEvalResult.Delta: %d RaftCommand.WriteBatch: %d -`, p.Request.Summary(), cmdLen, - p.command.ReplicatedEvalResult.Size(), - p.command.ReplicatedEvalResult.Delta.Size(), - p.command.WriteBatch.Size(), +`, ba.Summary(), cmdLen, + command.ReplicatedEvalResult.Size(), + command.ReplicatedEvalResult.Delta.Size(), + command.WriteBatch.Size(), ) } @@ -408,17 +388,58 @@ func (r *Replica) propose( // TODO(tschottdorf): can we mark them so lightstep can group them? const largeProposalEventThresholdBytes = 2 << 19 // 512kb if cmdLen > largeProposalEventThresholdBytes { - log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(cmdLen))) + log.Eventf(ctx, "proposal is large: %s", humanizeutil.IBytes(int64(cmdLen))) } + return data, nil +} + +// propose encodes a command, starts tracking it, and proposes it to Raft. +// +// The method hands ownership of the command over to the Raft machinery. After +// the method returns, all access to the command must be performed while holding +// Replica.mu and Replica.raftMu. +// +// propose takes ownership of the supplied token; the caller should tok.Move() +// it into this method. It will be used to untrack the request once it comes out +// of the proposal buffer. +func (r *Replica) propose( + ctx context.Context, p *ProposalData, tok TrackedRequestToken, +) (pErr *roachpb.Error) { + defer tok.DoneIfNotMoved(ctx) + replID := r.ReplicaID() + onAddSSTable := func() { + r.store.metrics.AddSSTableProposals.Inc(1) + } + + // If an error occurs reset the command's MaxLeaseIndex to its initial value. + // Failure to propose will propagate to the client. An invariant of this + // package is that proposals which are finished carry a raft command with a + // MaxLeaseIndex equal to the proposal command's max lease index. + defer func(prev uint64) { + if pErr != nil { + p.command.MaxLeaseIndex = prev + } + }(p.command.MaxLeaseIndex) + + // Make sure the maximum lease index is unset. This field will be set in + // propBuf.Insert and its encoded bytes will be appended to the encoding + // buffer as a MaxLeaseFooter. + p.command.MaxLeaseIndex = 0 + + payload, err := raftCmdToPayload(p.ctx, p.command, p.idKey, replID, p.Request, onAddSSTable) + if err != nil { + return roachpb.NewError(err) + } + p.encodedCommand = payload + // Insert into the proposal buffer, which passes the command to Raft to be // proposed. The proposal buffer assigns the command a maximum lease index // when it sequences it. // // NB: we must not hold r.mu while using the proposal buffer, see comment // on the field. - err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx)) - if err != nil { + if err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx)); err != nil { return roachpb.NewError(err) } return nil