Skip to content

Commit

Permalink
kvserver: start making proposal generation reusable
Browse files Browse the repository at this point in the history
This commit introduces `TestCreateManualProposal` which attempts to
carry out an "offline" (i.e. `Replica`-less) version of evaluation &
entry creation (i.e. the middle and right-hand side [here]). We start
with a `*roachpb.PutRequest` and end up with a payload fit for a
`(raftpb.Entry).Data`.

This is helpful since it provides an easily digestible "life of a
proposal" read-along, but more importantly represents a step towards
establishing clearer interfaces for the various stages of a request -
evaluation, replication, and application - and in particular separation
of these stages from the surrounding `Replica`. The test helps pinpoint
pieces of code that will need to be worked on in order to reach one of
the goals of cockroachdb#75729: being able to create a meaningful raft log
programmatically in a unit test without also instantiating a `Replica`
and, later, to apply raft logs in a similar way.

As a first step, this commit extracts a method `raftCmdToPayload` from
`(*Replica).propose` which translates a `kvserverpb.RaftCommand` (which
is roughly the result of evaluating a `BatchRequest`) into a payload
that can be handed to raft for replication. This extracted method is far
from clean - it takes a few crufty parameters, lives in `kvserver`
(where it is hard to import) and also uses the logging singleton which
is undesirable for a library method. However, it's enough to keep the
ball rolling, and can be improved on later.

[here]: cockroachdb#75729 (comment)

Touches cockroachdb#75729.

Release note: None
  • Loading branch information
tbg committed Feb 6, 2022
1 parent 49a0636 commit 4e7b8a1
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 54 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ go_test(
"below_raft_protos_test.go",
"client_atomic_membership_change_test.go",
"client_lease_test.go",
"client_manual_proposal_test.go",
"client_merge_test.go",
"client_metrics_test.go",
"client_migration_test.go",
Expand Down
136 changes: 136 additions & 0 deletions pkg/kv/kvserver/client_manual_proposal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestCreateManualProposal tracks progress on #75729. Ultimately we would like
// to be able to programmatically build up a raft log from a sequence of
// BatchRequests and apply it to an initial state, all without instantiating
// a Replica.
//
// In its current form the test evaluates a single PutRequest against an
// in-memory engine, assembles a kvserverpb.RaftCommand from the evaluation
// result, and encodes that command into a proposal payload. The TODOs along
// the way mark the pieces of production code that are not yet reusable.
func TestCreateManualProposal(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()

	eng := storage.NewDefaultInMemForTesting()
	defer eng.Close()
	// The op-logger batch lets us read back both the raw write batch and the
	// logical ops once evaluation is done (see b.Repr and b.LogicalOps below).
	b := storage.NewOpLoggerBatch(eng.NewBatch())
	defer b.Close()

	const (
		rangeID   = roachpb.RangeID(1)
		replicaID = roachpb.ReplicaID(1)
	)
	// A single-replica range covering the whole keyspace.
	replicas := roachpb.MakeReplicaSet(nil)
	replicas.AddReplica(roachpb.ReplicaDescriptor{
		NodeID:    1,
		StoreID:   1,
		ReplicaID: replicaID,
		Type:      roachpb.ReplicaTypeVoterFull(),
	})
	desc := roachpb.NewRangeDescriptor(rangeID, roachpb.RKeyMin, roachpb.RKeyMax, replicas)
	require.NoError(t, stateloader.WriteInitialRangeState(
		ctx, eng, *desc, replicaID, clusterversion.TestingBinaryVersion,
	))

	put := roachpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar"))

	cmd, ok := batcheval.LookupCommand(put.Method())
	require.True(t, ok)

	evalCtx := batcheval.MockEvalCtx{}

	// NB: this should really operate on a BatchRequest. We need to librarize
	// evaluateBatch:
	// https://github.com/cockroachdb/cockroach/blob/9c09473ec9da9d458869abb3fe08a9db251c9291/pkg/kv/kvserver/replica_evaluate.go#L141-L153
	// The good news is, this is already in good shape! Just needs to be moved
	// to a leaf package, like `batcheval`.
	// To use the "true" logic we want this for sure, and probably even the
	// caller of evaluateBatch and a few more levels. Pulling it out until
	// there's nothing left basically.

	resp := &roachpb.PutResponse{}
	args := batcheval.CommandArgs{
		EvalCtx:     evalCtx.EvalContext(),
		Header:      roachpb.Header{},
		Args:        put,
		Stats:       &enginepb.MVCCStats{},
		Uncertainty: uncertainty.Interval{},
	}
	res, err := cmd.EvalRW(ctx, b, args, resp)
	require.NoError(t, err)
	// TODO: there's more stuff in evaluateProposal that would need to be lifted
	// here:
	// https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L842-L869
	res.WriteBatch = &kvserverpb.WriteBatch{Data: b.Repr()}
	res.LogicalOpLog = &kvserverpb.LogicalOpLog{Ops: b.LogicalOps()}

	// End of evaluation. Start of "proposing".

	// TODO: the "requires consensus" logic is not reusable, make it so:
	// https://github.com/cockroachdb/cockroach/blob/f048ab082c58ec0357b2ecad763606ef64faa3b7/pkg/kv/kvserver/replica_proposal.go#L827-L840

	sl := stateloader.Make(rangeID)
	st, err := sl.LoadLease(ctx, eng)
	require.NoError(t, err)
	raftCmd := kvserverpb.RaftCommand{
		// Propose under the latest lease. This isn't necessarily what you want
		// (in a test) but it reflects the steady state when proposing under the
		// leader. To also support proposing leases themselves, we need this
		// whole chunk of code to be reusable:
		// https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raft.go#L192-L219
		// Really we probably just want to librarize the relevant parts of
		// evalAndPropose and requestToProposal.
		ProposerLeaseSequence: st.Sequence,
		// TODO: this would need to be assigned usefully as well if this log has
		// any chance of applying. So when we build a log, we need a state keeper
		// that uses sane defaults but can be overridden if the test so desires.
		// The prod code assigns this when flushing from the proposal buffer.
		MaxLeaseIndex: 0,
		// Ditto.
		ClosedTimestamp: nil,

		TraceData: nil, // can be injected at will

		// Rest was determined by evaluation.
		ReplicatedEvalResult: res.Replicated,
		WriteBatch:           res.WriteBatch,
		LogicalOpLog:         res.LogicalOpLog,
	}

	// TODO need library code to make an entry. Also something to write it
	// to disk (these have to be separate things since we hand the entry to
	// raft and then raft hands it back to us via the raft.Storage).
	// In effect this means librarizing `(*Replica).propose`.
	idKey := kvserver.MakeIDKey()
	payload, err := kvserver.RaftCmdToPayload(ctx, idKey, &raftCmd, replicaID)
	require.NoError(t, err)
	// This is something suitable for raft.Propose{,ConfChange}. We can't check
	// much about it yet, but at the very least the encoded command must not be
	// empty, since it carries the write batch produced above...
	require.NotEmpty(t, payload)
	// ... which will result in a log entry being written out in (*Replica).append:
	// https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raftstorage.go#L642-L649
	// which should definitely also lean on a library-style method.
}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
Expand Down Expand Up @@ -545,3 +546,20 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

// RaftCmdToPayload makes raftCmdToPayload callable from tests outside this
// package. Note that the wrapper takes the command ID before the command,
// whereas raftCmdToPayload takes them the other way around. The two
// arguments that raftCmdToPayload only uses for logging and metrics are
// filled in with an empty BatchRequest and a no-op callback, respectively.
func RaftCmdToPayload(
	ctx context.Context,
	cmdIDKey kvserverbase.CmdIDKey,
	raftCmd *kvserverpb.RaftCommand,
	replID roachpb.ReplicaID,
) ([]byte, error) {
	var (
		emptyBatch       = &roachpb.BatchRequest{}
		noopOnAddSSTable = func() {}
	)
	return raftCmdToPayload(ctx, raftCmd, cmdIDKey, replID, emptyBatch, noopOnAddSSTable)
}

// MakeIDKey gives tests outside this package access to makeIDKey and returns
// the command ID key it generates.
//
// TODO(tbg): this should be in the `raftlog` package.
func MakeIDKey() kvserverbase.CmdIDKey {
	idKey := makeIDKey()
	return idKey
}
129 changes: 75 additions & 54 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ var (

// makeIDKey returns a fresh command ID: a random non-negative 63-bit integer,
// encoded with EncodeUint64Ascending into a buffer whose capacity is
// kvserverbase.RaftCommandIDLen.
func makeIDKey() kvserverbase.CmdIDKey {
	buf := make([]byte, 0, kvserverbase.RaftCommandIDLen)
	// TODO(tbg): this goes through a global mutex, which can't be helpful. Pass
	// in a rand instead.
	randomID := uint64(rand.Int63())
	return kvserverbase.CmdIDKey(encoding.EncodeUint64Ascending(buf, randomID))
}
Expand Down Expand Up @@ -290,46 +292,26 @@ func (r *Replica) evalAndPropose(
return proposalCh, abandon, idKey, nil
}

// propose encodes a command, starts tracking it, and proposes it to Raft.
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
//
// propose takes ownership of the supplied token; the caller should tok.Move()
// it into this method. It will be used to untrack the request once it comes out
// of the proposal buffer.
func (r *Replica) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) (pErr *roachpb.Error) {
defer tok.DoneIfNotMoved(ctx)

// If an error occurs reset the command's MaxLeaseIndex to its initial value.
// Failure to propose will propagate to the client. An invariant of this
// package is that proposals which are finished carry a raft command with a
// MaxLeaseIndex equal to the proposal command's max lease index.
defer func(prev uint64) {
if pErr != nil {
p.command.MaxLeaseIndex = prev
}
}(p.command.MaxLeaseIndex)

// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a MaxLeaseFooter.
p.command.MaxLeaseIndex = 0

// TODO(tbg): extract logging singleton.
func raftCmdToPayload(
ctx context.Context,
command *kvserverpb.RaftCommand,
idKey kvserverbase.CmdIDKey,
replID roachpb.ReplicaID,
ba *roachpb.BatchRequest, // TODO(tbg): only for logging, remove
onAddSSTable func(), // TODO(tbg): only for metrics , remove
) ([]byte, error) {
// Determine the encoding style for the Raft command.
prefix := true
version := kvserverbase.RaftVersionStandard
if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil {
if crt := command.ReplicatedEvalResult.ChangeReplicas; crt != nil {
// EndTxnRequest with a ChangeReplicasTrigger is special because Raft
// needs to understand it; it cannot simply be an opaque command. To
// permit this, the command is proposed by the proposal buffer using
// ProposeConfChange. For that reason, we also don't need a Raft command
// prefix because the command ID is stored in a field in
// raft.ConfChange.
log.Infof(p.ctx, "proposing %s", crt)
log.Infof(ctx, "proposing %s", crt)
prefix = false

// Ensure that we aren't trying to remove ourselves from the range without
Expand All @@ -345,60 +327,58 @@ func (r *Replica) propose(
// since there's no stopping the actual removal/demotion once it's there.
// IsVoterNewConfig checks that the leaseholder is a voter in the
// proposed configuration.
replID := r.ReplicaID()
rDesc, ok := p.command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID)
rDesc, ok := command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID)
for !ok || !rDesc.IsVoterNewConfig() {
if rDesc.ReplicaID == replID {
err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt),
errMarkInvalidReplicationChange)
log.Errorf(p.ctx, "%v", err)
return roachpb.NewError(err)
log.Errorf(ctx, "%v", err)
return nil, err
}
}
} else if p.command.ReplicatedEvalResult.AddSSTable != nil {
log.VEvent(p.ctx, 4, "sideloadable proposal detected")
} else if command.ReplicatedEvalResult.AddSSTable != nil {
log.VEvent(ctx, 4, "sideloadable proposal detected")
version = kvserverbase.RaftVersionSideloaded
r.store.metrics.AddSSTableProposals.Inc(1)
onAddSSTable()

if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return roachpb.NewErrorf("cannot sideload empty SSTable")
if command.ReplicatedEvalResult.AddSSTable.Data == nil {
return nil, errors.New("cannot sideload empty SSTable")
}
} else if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
log.Infof(ctx, "proposing command %x: %s", idKey, ba.Summary())
}

// Create encoding buffer.
preLen := 0
if prefix {
preLen = kvserverbase.RaftCommandPrefixLen
}
cmdLen := p.command.Size()
cmdLen := command.Size()
// Allocate the data slice with enough capacity to eventually hold the two
// "footers" that are filled later.
needed := preLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize()
data := make([]byte, preLen, needed)
// Encode prefix with command ID, if necessary.
if prefix {
kvserverbase.EncodeRaftCommandPrefix(data, version, p.idKey)
kvserverbase.EncodeRaftCommandPrefix(data, version, idKey)
}
// Encode body of command.
data = data[:preLen+cmdLen]
if _, err := protoutil.MarshalTo(p.command, data[preLen:]); err != nil {
return roachpb.NewError(err)
if _, err := protoutil.MarshalTo(command, data[preLen:]); err != nil {
return nil, err
}
p.encodedCommand = data

// Too verbose even for verbose logging, so manually enable if you want to
// debug proposal sizes.
if false {
log.Infof(p.ctx, `%s: proposal: %d
log.Infof(ctx, `%s: proposal: %d
RaftCommand.ReplicatedEvalResult: %d
RaftCommand.ReplicatedEvalResult.Delta: %d
RaftCommand.WriteBatch: %d
`, p.Request.Summary(), cmdLen,
p.command.ReplicatedEvalResult.Size(),
p.command.ReplicatedEvalResult.Delta.Size(),
p.command.WriteBatch.Size(),
`, ba.Summary(), cmdLen,
command.ReplicatedEvalResult.Size(),
command.ReplicatedEvalResult.Delta.Size(),
command.WriteBatch.Size(),
)
}

Expand All @@ -408,17 +388,58 @@ func (r *Replica) propose(
// TODO(tschottdorf): can we mark them so lightstep can group them?
const largeProposalEventThresholdBytes = 2 << 19 // 512kb
if cmdLen > largeProposalEventThresholdBytes {
log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(cmdLen)))
log.Eventf(ctx, "proposal is large: %s", humanizeutil.IBytes(int64(cmdLen)))
}

return data, nil
}

// propose encodes a command, starts tracking it, and proposes it to Raft.
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
//
// propose takes ownership of the supplied token; the caller should tok.Move()
// it into this method. It will be used to untrack the request once it comes out
// of the proposal buffer.
func (r *Replica) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) (pErr *roachpb.Error) {
defer tok.DoneIfNotMoved(ctx)
replID := r.ReplicaID()
onAddSSTable := func() {
r.store.metrics.AddSSTableProposals.Inc(1)
}

// If an error occurs reset the command's MaxLeaseIndex to its initial value.
// Failure to propose will propagate to the client. An invariant of this
// package is that proposals which are finished carry a raft command with a
// MaxLeaseIndex equal to the proposal command's max lease index.
defer func(prev uint64) {
if pErr != nil {
p.command.MaxLeaseIndex = prev
}
}(p.command.MaxLeaseIndex)

// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a MaxLeaseFooter.
p.command.MaxLeaseIndex = 0

payload, err := raftCmdToPayload(p.ctx, p.command, p.idKey, replID, p.Request, onAddSSTable)
if err != nil {
return roachpb.NewError(err)
}
p.encodedCommand = payload

// Insert into the proposal buffer, which passes the command to Raft to be
// proposed. The proposal buffer assigns the command a maximum lease index
// when it sequences it.
//
// NB: we must not hold r.mu while using the proposal buffer, see comment
// on the field.
err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx))
if err != nil {
if err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx)); err != nil {
return roachpb.NewError(err)
}
return nil
Expand Down

0 comments on commit 4e7b8a1

Please sign in to comment.