From 875be84bc98fddb5f3fce96c302cf94d6db44e7c Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Mon, 14 Nov 2016 21:46:44 +0800 Subject: [PATCH] storage: Call evaluateProposal only once This method was previously called both before and after raft, and the after-raft logic relied on fields that were set in the ReplicatedProposalData by the before-raft logic. This caused inconsistencies when #10327 was deployed without freeze-cluster. Now, evaluateProposal is called only once, either before or after raft, and the post-raft logic handles the command in whichever fashion is appropriate for the version that proposed it. Fixes #10602 --- pkg/cli/debug.go | 2 +- pkg/roachpb/string_test.go | 2 +- pkg/storage/raft.go | 2 +- pkg/storage/replica.go | 244 +++++--- pkg/storage/replica_proposal.go | 15 +- pkg/storage/replica_test.go | 14 +- pkg/storage/storagebase/proposer_kv.go | 12 +- pkg/storage/storagebase/proposer_kv.pb.go | 710 +++++++++++++--------- pkg/storage/storagebase/proposer_kv.proto | 96 +-- 9 files changed, 647 insertions(+), 450 deletions(-) diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 92a24aa1ef2f..c10e5619c8b0 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -424,7 +424,7 @@ func tryRaftLogEntry(kv engine.MVCCKeyValue) (string, error) { if ent.Type == raftpb.EntryNormal { if len(ent.Data) > 0 { _, cmdData := storage.DecodeRaftCommand(ent.Data) - var cmd storagebase.ReplicatedProposalData + var cmd storagebase.RaftCommand if err := cmd.Unmarshal(cmdData); err != nil { return "", err } diff --git a/pkg/roachpb/string_test.go b/pkg/roachpb/string_test.go index 8d0738ee4bd0..e84cd2a9ec8b 100644 --- a/pkg/roachpb/string_test.go +++ b/pkg/roachpb/string_test.go @@ -62,7 +62,7 @@ func TestTransactionString(t *testing.T) { var txnEmpty roachpb.Transaction _ = txnEmpty.String() // prevent regression of NPE - cmd := storagebase.ReplicatedProposalData{ + cmd := storagebase.RaftCommand{ Cmd: &roachpb.BatchRequest{}, } cmd.Cmd.Txn = &txn diff --git 
a/pkg/storage/raft.go b/pkg/storage/raft.go index 0cd1c87ef814..0127c3475ec6 100644 --- a/pkg/storage/raft.go +++ b/pkg/storage/raft.go @@ -184,7 +184,7 @@ func raftEntryFormatter(data []byte) string { // large snapshot entries. return fmt.Sprintf("[%x] [%d]", commandID, len(data)) } - var cmd storagebase.ReplicatedProposalData + var cmd storagebase.RaftCommand if err := proto.Unmarshal(encodedCmd, &cmd); err != nil { return fmt.Sprintf("[error parsing entry: %s]", err) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 1b9d72372e6b..ec6e5f1afb33 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -1808,6 +1808,27 @@ func (r *Replica) tryAddWriteCmd( } } +// requestToProposal converts a BatchRequest into a ProposalData, +// evaluating it or not according to the propEvalKV setting. +func (r *Replica) requestToProposal( + ctx context.Context, + idKey storagebase.CmdIDKey, + replica roachpb.ReplicaDescriptor, + ba roachpb.BatchRequest, +) (*ProposalData, *roachpb.Error) { + if propEvalKV { + return r.evaluateProposal(ctx, idKey, replica, ba) + } + return &ProposalData{ + Cmd: &ba, + LocalProposalData: LocalProposalData{ + ctx: ctx, + idKey: idKey, + done: make(chan proposalResult, 1), + }, + }, nil +} + // evaluateProposal generates ProposalData from the given request by evaluating // it, returning both state which is held only on the proposer and that which // is to be replicated through Raft. The return value is ready to be inserted @@ -1818,17 +1839,8 @@ func (r *Replica) tryAddWriteCmd( // handling LocalProposalData. // // Replica.mu must not be held. -// -// reallyEvaluate is a temporary parameter aiding the transition to -// proposer-evaluated kv. It is true iff the method is called in a pre-Raft -// (i.e. proposer-evaluated) context, in which case a WriteBatch will be -// prepared. In the other mode, the BatchRequest is put on the returned -// ProposalData and is not evaluated. 
The intention is that in that case, the -// same invocation with reallyEvaluate=true will be carried out downstream of -// Raft, simulating the "old" follower-evaluated behavior. func (r *Replica) evaluateProposal( ctx context.Context, - reallyEvaluate bool, idKey storagebase.CmdIDKey, replica roachpb.ReplicaDescriptor, ba roachpb.BatchRequest, @@ -1838,49 +1850,18 @@ func (r *Replica) evaluateProposal( // evaluated KV). var pd ProposalData - if !reallyEvaluate { - // Not using proposer-evaluated KV. Stick the Batch on - // ReplicatedProposalData and (mostly) call it a day. - pd.Cmd = &ba - - // Populating these fields here avoids making code in - // processRaftCommand more awkward to deal with both cases. - if union, ok := ba.GetArg(roachpb.EndTransaction); ok { - ict := union.(*roachpb.EndTransactionRequest).InternalCommitTrigger - if tr := ict.GetChangeReplicasTrigger(); tr != nil { - pd.ChangeReplicas = &storagebase.ChangeReplicas{ - ChangeReplicasTrigger: *tr, - } - } - if tr := ict.GetSplitTrigger(); tr != nil { - pd.Split = &storagebase.Split{ - SplitTrigger: *tr, - } - } - if tr := ict.GetMergeTrigger(); tr != nil { - pd.Merge = &storagebase.Merge{ - MergeTrigger: *tr, - } - } - } - // Set a bogus WriteBatch so that we know below that this isn't - // a failfast proposal (we didn't evaluate anything, so we can't fail - // fast). - pd.WriteBatch = &storagebase.ReplicatedProposalData_WriteBatch{} - } else { - if ba.Timestamp == hlc.ZeroTimestamp { - return nil, roachpb.NewErrorf("can't propose Raft command with zero timestamp") - } - - pd = r.applyRaftCommandInBatch(ctx, idKey, ba) - // TODO(tschottdorf): tests which use TestingCommandFilter use this. - // Decide how that will work in the future, presumably the - // CommandFilter would run at proposal time or we allow an opaque - // struct to be attached to a proposal which is then available as it - // applies. 
- pd.Cmd = &ba + if ba.Timestamp == hlc.ZeroTimestamp { + return nil, roachpb.NewErrorf("can't propose Raft command with zero timestamp") } + pd = r.applyRaftCommandInBatch(ctx, idKey, ba) + // TODO(tschottdorf): tests which use TestingCommandFilter use this. + // Decide how that will work in the future, presumably the + // CommandFilter would run at proposal time or we allow an opaque + // struct to be attached to a proposal which is then available as it + // applies. + pd.Cmd = &ba + if pd.Err != nil { // Failed proposals (whether they're failfast or not) can't have any // ProposalData except what's whitelisted here. @@ -1894,8 +1875,6 @@ func (r *Replica) evaluateProposal( } } - pd.RangeID = r.RangeID - pd.OriginReplica = replica pd.ctx = ctx pd.idKey = idKey pd.done = make(chan proposalResult, 1) @@ -1916,7 +1895,7 @@ func (r *Replica) evaluateProposal( return &pd, nil } -func (r *Replica) insertProposalLocked(pd *ProposalData) { +func (r *Replica) insertProposalLocked(pd *ProposalData, originReplica roachpb.ReplicaDescriptor) { // Assign a lease index. 
Note that we do this as late as possible // to make sure (to the extent that we can) that we don't assign // (=predict) the index differently from the order in which commands are @@ -1924,10 +1903,11 @@ func (r *Replica) insertProposalLocked(pd *ProposalData) { if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex { r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex } - if !pd.IsLeaseRequest { + if !pd.Cmd.IsLeaseRequest() { r.mu.lastAssignedLeaseIndex++ } pd.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex + pd.OriginReplica = originReplica if log.V(4) { log.Infof(pd.ctx, "submitting proposal %x: maxLeaseIndex=%d", pd.idKey, pd.MaxLeaseIndex) @@ -1988,12 +1968,13 @@ func (r *Replica) propose( r.raftMu.Lock() defer r.raftMu.Unlock() - pCmd, pErr := r.evaluateProposal(ctx, propEvalKV, makeIDKey(), repDesc, ba) + idKey := makeIDKey() + pCmd, pErr := r.requestToProposal(ctx, idKey, repDesc, ba) // An error here corresponds to a failfast-proposal: The command resulted // in an error and did not need to commit a batch (the common error case). if pErr != nil { r.handleProposalData( - ctx, pCmd.LocalProposalData, pCmd.ReplicatedProposalData, + ctx, repDesc, pCmd.LocalProposalData, pCmd.ReplicatedProposalData, ) ch := make(chan proposalResult, 1) ch <- proposalResult{Err: pErr} @@ -2003,7 +1984,7 @@ func (r *Replica) propose( r.mu.Lock() defer r.mu.Unlock() - r.insertProposalLocked(pCmd) + r.insertProposalLocked(pCmd, repDesc) if err := r.submitProposalLocked(pCmd); err != nil { delete(r.mu.proposals, pCmd.idKey) @@ -2011,7 +1992,6 @@ func (r *Replica) propose( } // Must not use `pCmd` in the closure below as a proposal which is not // present in r.mu.proposals is no longer protected by the mutex. 
- idKey := pCmd.idKey tryAbandon := func() bool { r.mu.Lock() _, ok := r.mu.proposals[idKey] @@ -2041,13 +2021,37 @@ func (r *Replica) isSoloReplicaLocked() bool { func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error { ctx := r.AnnotateCtx(context.TODO()) - data, err := protoutil.Marshal(&p.ReplicatedProposalData) + raftCmd := storagebase.RaftCommand{ + Cmd: p.Cmd, + OriginReplica: p.OriginReplica, + MaxLeaseIndex: p.MaxLeaseIndex, + } + if p.ReplicatedProposalData != (storagebase.ReplicatedProposalData{}) { + raftCmd.ReplicatedProposalData = &p.ReplicatedProposalData + raftCmd.WriteBatch = p.WriteBatch + } + + data, err := protoutil.Marshal(&raftCmd) if err != nil { return err } defer r.store.enqueueRaftUpdateCheck(r.RangeID) - if crt := p.ChangeReplicas; crt != nil { + var changeReplicas *storagebase.ChangeReplicas + if p.ReplicatedProposalData != (storagebase.ReplicatedProposalData{}) { + changeReplicas = p.ChangeReplicas + } else { + if union, ok := p.Cmd.GetArg(roachpb.EndTransaction); ok { + ict := union.(*roachpb.EndTransactionRequest).InternalCommitTrigger + if tr := ict.GetChangeReplicasTrigger(); tr != nil { + changeReplicas = &storagebase.ChangeReplicas{ + ChangeReplicasTrigger: *tr, + } + } + } + } + + if crt := changeReplicas; crt != nil { // EndTransactionRequest with a ChangeReplicasTrigger is special // because raft needs to understand it; it cannot simply be an // opaque command. @@ -2310,8 +2314,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error { case raftpb.EntryNormal: var commandID storagebase.CmdIDKey - // TODO(tschottdorf): rename to `rpd`. - var command storagebase.ReplicatedProposalData + var command storagebase.RaftCommand // Process committed entries. etcd raft occasionally adds a nil entry // (our own commands are never empty). 
This happens in two situations: @@ -2349,8 +2352,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error { if err := ccCtx.Unmarshal(cc.Context); err != nil { return err } - // TODO(tschottdorf): rename to `rpd`. - var command storagebase.ReplicatedProposalData + var command storagebase.RaftCommand if err := command.Unmarshal(ccCtx.Payload); err != nil { return err } @@ -2930,13 +2932,8 @@ func (r *Replica) reportSnapshotStatus(to uint64, snapErr error) { // TODO(tschottdorf): once we properly check leases and lease requests etc, // make sure that the error returned from this method is always populated in // those cases, as one of the callers uses it to abort replica changes. -// -// TODO(tschottdorf): rename raftCmd to `rpd` func (r *Replica) processRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - index uint64, - raftCmd storagebase.ReplicatedProposalData, + ctx context.Context, idKey storagebase.CmdIDKey, index uint64, raftCmd storagebase.RaftCommand, ) (pErr *roachpb.Error) { if index == 0 { log.Fatalf(ctx, "processRaftCommand requires a non-zero index") @@ -2946,16 +2943,32 @@ func (r *Replica) processRaftCommand( log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", idKey, raftCmd.MaxLeaseIndex) } + // TODO(bdarnell): the isConsistencyRelated field is insufficiently tested; + // no tests fail if it is always set to false. 
+ var isLeaseRequest, isFreeze, isConsistencyRelated bool + var ts hlc.Timestamp + if raftCmd.ReplicatedProposalData != nil { + isLeaseRequest = raftCmd.ReplicatedProposalData.IsLeaseRequest + isFreeze = raftCmd.ReplicatedProposalData.IsFreeze + isConsistencyRelated = raftCmd.ReplicatedProposalData.IsConsistencyRelated + ts = raftCmd.ReplicatedProposalData.Timestamp + } else if idKey != "" { + isLeaseRequest = raftCmd.Cmd.IsLeaseRequest() + isFreeze = raftCmd.Cmd.IsFreeze() + ts = raftCmd.Cmd.Timestamp + isConsistencyRelated = raftCmd.Cmd.IsConsistencyRelated() + } + r.mu.Lock() cmd, cmdProposedLocally := r.mu.proposals[idKey] isLeaseError := func() bool { l, origin := r.mu.state.Lease, raftCmd.OriginReplica - if l.Replica != origin && !raftCmd.IsLeaseRequest { + if l.Replica != origin && !isLeaseRequest { return true } - notCovered := !l.OwnedBy(origin.StoreID) || !l.Covers(raftCmd.Timestamp) - if notCovered && !raftCmd.IsFreeze && !raftCmd.IsLeaseRequest { + notCovered := !l.OwnedBy(origin.StoreID) || !l.Covers(ts) + if notCovered && !isFreeze && !isLeaseRequest { // Verify the range lease is held, unless this command is trying // to obtain it or is a freeze change (which can be proposed by any // Replica). Any other Raft command has had the range lease held @@ -2997,7 +3010,7 @@ func (r *Replica) processRaftCommand( ) forcedErr = roachpb.NewError(newNotLeaseHolderError( r.mu.state.Lease, raftCmd.OriginReplica.StoreID, r.mu.state.Desc)) - } else if raftCmd.IsLeaseRequest { + } else if isLeaseRequest { // Lease commands are ignored by the counter (and their MaxLeaseIndex // is ignored). This makes sense since lease commands are proposed by // anyone, so we can't expect a coherent MaxLeaseIndex. Also, lease @@ -3050,7 +3063,7 @@ func (r *Replica) processRaftCommand( // TODO(tschottdorf): move up to processRaftCommand and factor it out from // there so that proposer-evaluated KV can run this check too before even // proposing. 
- if mayApply := !r.mu.state.IsFrozen() || cmd.IsFreeze || cmd.IsConsistencyRelated; !mayApply { + if mayApply := !r.mu.state.IsFrozen() || isFreeze || isConsistencyRelated; !mayApply { forcedErr = roachpb.NewError(roachpb.NewRangeFrozenError(*r.mu.state.Desc)) } r.mu.Unlock() @@ -3063,7 +3076,7 @@ func (r *Replica) processRaftCommand( } else { log.Event(ctx, "applying command") - if splitMergeUnlock := r.maybeAcquireSplitMergeLock(&raftCmd); splitMergeUnlock != nil { + if splitMergeUnlock := r.maybeAcquireSplitMergeLock(raftCmd); splitMergeUnlock != nil { // Close over pErr to capture its value at execution time. defer func() { splitMergeUnlock(pErr) @@ -3072,14 +3085,13 @@ func (r *Replica) processRaftCommand( } var response proposalResult + var writeBatch *storagebase.WriteBatch { - if !propEvalKV && forcedErr == nil { + if raftCmd.ReplicatedProposalData == nil && forcedErr == nil { // If not proposer-evaluating, then our raftCmd consists only of - // the BatchRequest and some metadata. Call the evaluation step - // (again), but this time passing reallyEvaluate=true. + // the BatchRequest and some metadata. innerPD, pErr := r.evaluateProposal( ctx, - true, // reallyEvaluate idKey, raftCmd.OriginReplica, *raftCmd.Cmd, @@ -3092,7 +3104,8 @@ func (r *Replica) processRaftCommand( // Note that this (intentionally) overwrites the LocalProposalData, // so we must salvage the done channel if we have a client waiting // on it. - raftCmd = innerPD.ReplicatedProposalData + raftCmd.ReplicatedProposalData = &innerPD.ReplicatedProposalData + writeBatch = innerPD.WriteBatch if cmdProposedLocally { done := cmd.LocalProposalData.done cmd.LocalProposalData = innerPD.LocalProposalData @@ -3107,24 +3120,32 @@ func (r *Replica) processRaftCommand( if forcedErr != nil { // Apply an empty entry. 
- raftCmd.Strip() + if raftCmd.ReplicatedProposalData != nil { + raftCmd.ReplicatedProposalData.Strip() + } else { + raftCmd.ReplicatedProposalData = &storagebase.ReplicatedProposalData{} + } + raftCmd.WriteBatch = nil } - raftCmd.State.RaftAppliedIndex = index - raftCmd.State.LeaseAppliedIndex = leaseIndex + raftCmd.ReplicatedProposalData.State.RaftAppliedIndex = index + raftCmd.ReplicatedProposalData.State.LeaseAppliedIndex = leaseIndex // Update the node clock with the serviced request. This maintains // a high water mark for all ops serviced, so that received ops without // a timestamp specified are guaranteed one higher than any op already // executed for overlapping keys. - r.store.Clock().Update(raftCmd.Timestamp) + r.store.Clock().Update(ts) var pErr *roachpb.Error - raftCmd.Delta, pErr = r.applyRaftCommand(ctx, idKey, raftCmd) + if raftCmd.WriteBatch != nil { + writeBatch = raftCmd.WriteBatch + } + raftCmd.ReplicatedProposalData.Delta, pErr = r.applyRaftCommand(ctx, idKey, *raftCmd.ReplicatedProposalData, writeBatch) if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; pErr == nil && filter != nil { pErr = filter(storagebase.ApplyFilterArgs{ CmdID: idKey, - ReplicatedProposalData: raftCmd, + ReplicatedProposalData: *raftCmd.ReplicatedProposalData, StoreID: r.store.StoreID(), RangeID: r.RangeID, }) @@ -3159,7 +3180,7 @@ func (r *Replica) processRaftCommand( // // Note that this must happen after committing (the engine.Batch), but // before notifying a potentially waiting client. 
- r.handleProposalData(ctx, lpd, raftCmd) + r.handleProposalData(ctx, raftCmd.OriginReplica, lpd, *raftCmd.ReplicatedProposalData) } if cmdProposedLocally { @@ -3173,12 +3194,34 @@ func (r *Replica) processRaftCommand( } func (r *Replica) maybeAcquireSplitMergeLock( - rpd *storagebase.ReplicatedProposalData, + raftCmd storagebase.RaftCommand, ) func(pErr *roachpb.Error) { - if rpd.Split != nil { - return r.acquireSplitLock(&rpd.Split.SplitTrigger) - } else if rpd.Merge != nil { - return r.acquireMergeLock(&rpd.Merge.MergeTrigger) + var split *storagebase.Split + var merge *storagebase.Merge + if raftCmd.ReplicatedProposalData != nil { + split = raftCmd.ReplicatedProposalData.Split + merge = raftCmd.ReplicatedProposalData.Merge + } else { + if union, ok := raftCmd.Cmd.GetArg(roachpb.EndTransaction); ok { + ict := union.(*roachpb.EndTransactionRequest).InternalCommitTrigger + if tr := ict.GetSplitTrigger(); tr != nil { + split = &storagebase.Split{ + SplitTrigger: *tr, + } + } + if tr := ict.GetMergeTrigger(); tr != nil { + merge = &storagebase.Merge{ + MergeTrigger: *tr, + } + } + + } + } + + if split != nil { + return r.acquireSplitLock(&split.SplitTrigger) + } else if merge != nil { + return r.acquireMergeLock(&merge.MergeTrigger) } return nil } @@ -3246,7 +3289,10 @@ func (r *Replica) acquireMergeLock(merge *roachpb.MergeTrigger) func(pErr *roach // be updated, an error (which is likely a ReplicaCorruptionError) is returned // and must be handled by the caller. 
func (r *Replica) applyRaftCommand( - ctx context.Context, idKey storagebase.CmdIDKey, rpd storagebase.ReplicatedProposalData, + ctx context.Context, + idKey storagebase.CmdIDKey, + rpd storagebase.ReplicatedProposalData, + writeBatch *storagebase.WriteBatch, ) (enginepb.MVCCStats, *roachpb.Error) { if rpd.State.RaftAppliedIndex <= 0 { log.Fatalf(ctx, "raft command index is <= 0") @@ -3267,8 +3313,8 @@ func (r *Replica) applyRaftCommand( batch := r.store.Engine().NewBatch() defer batch.Close() - if rpd.WriteBatch != nil { - if err := batch.ApplyBatchRepr(rpd.WriteBatch.Data); err != nil { + if writeBatch != nil { + if err := batch.ApplyBatchRepr(writeBatch.Data); err != nil { return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( errors.Wrap(err, "unable to apply WriteBatch"))) } @@ -3381,6 +3427,10 @@ func (r *Replica) applyRaftCommandInBatch( // TODO(tschottdorf): we're mutating the client's original // memory erroneously when proposer-evaluated KV is on, failing // TestTxnDBLostDeleteAnomaly (and likely others). + // + // TODO(bdarnell): we shouldn't be looking at the propEvalKV + // variable downstream of raft, we should look at the request + // to see whether this request was proposer-evaluated or not. if propEvalKV { ba.Txn.Writing = wasWriting } @@ -3394,7 +3444,7 @@ func (r *Replica) applyRaftCommandInBatch( pd.Reply = nil } - pd.WriteBatch = &storagebase.ReplicatedProposalData_WriteBatch{ + pd.WriteBatch = &storagebase.WriteBatch{ Data: pd.Batch.Repr(), } // TODO(tschottdorf): could keep this open and commit as the proposal diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index e9673babad59..edbab45a35d1 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -104,7 +104,11 @@ type LocalProposalData struct { // it must run when the command has applied (such as resolving intents). 
type ProposalData struct { LocalProposalData + MaxLeaseIndex uint64 + OriginReplica roachpb.ReplicaDescriptor + Cmd *roachpb.BatchRequest storagebase.ReplicatedProposalData + WriteBatch *storagebase.WriteBatch } // coalesceBool ORs rhs into lhs and then zeroes rhs. @@ -398,13 +402,9 @@ func (r *Replica) handleReplicatedProposalData( // they don't trigger an assertion at the end of the method (which checks // that all fields were handled). { - rpd.WriteBatch = nil rpd.IsLeaseRequest = false rpd.IsConsistencyRelated = false rpd.IsFreeze = false - rpd.RangeID = 0 - rpd.Cmd = nil - rpd.MaxLeaseIndex = 0 rpd.Timestamp = hlc.ZeroTimestamp } @@ -440,7 +440,6 @@ func (r *Replica) handleReplicatedProposalData( rpd.State.Stats = enginepb.MVCCStats{} rpd.State.LeaseAppliedIndex = 0 rpd.State.RaftAppliedIndex = 0 - rpd.OriginReplica = roachpb.ReplicaDescriptor{} // The above are always present, so we assert only if there are // "nontrivial" actions below. @@ -670,9 +669,11 @@ func (r *Replica) handleLocalProposalData( } func (r *Replica) handleProposalData( - ctx context.Context, lpd LocalProposalData, rpd storagebase.ReplicatedProposalData, + ctx context.Context, + originReplica roachpb.ReplicaDescriptor, + lpd LocalProposalData, + rpd storagebase.ReplicatedProposalData, ) { - originReplica := rpd.OriginReplica // Careful: `shouldAssert = f() || g()` will not run both if `f()` is true. 
shouldAssert := r.handleReplicatedProposalData(ctx, rpd) shouldAssert = r.handleLocalProposalData(ctx, originReplica, lpd) || shouldAssert diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 47e2946e22e0..d7abc651d953 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -6122,14 +6122,14 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { ba.Timestamp = tc.Clock().Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{ Key: roachpb.Key(fmt.Sprintf("k%d", i))}}) - cmd, pErr := repl.evaluateProposal( - context.Background(), propEvalKV, makeIDKey(), repDesc, ba, + cmd, pErr := repl.requestToProposal( + context.Background(), makeIDKey(), repDesc, ba, ) if pErr != nil { t.Fatal(pErr) } repl.mu.Lock() - repl.insertProposalLocked(cmd) + repl.insertProposalLocked(cmd, repDesc) // We actually propose the command only if we don't // cancel it to simulate the case in which Raft loses // the command and it isn't reproposed due to the @@ -6200,13 +6200,13 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { ba.Timestamp = tc.Clock().Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{ Key: roachpb.Key(fmt.Sprintf("k%d", i))}}) - cmd, pErr := tc.repl.evaluateProposal(ctx, propEvalKV, makeIDKey(), repDesc, ba) + cmd, pErr := tc.repl.requestToProposal(ctx, makeIDKey(), repDesc, ba) if pErr != nil { t.Fatal(pErr) } tc.repl.mu.Lock() - tc.repl.insertProposalLocked(cmd) + tc.repl.insertProposalLocked(cmd, repDesc) chs = append(chs, cmd.done) tc.repl.mu.Unlock() } @@ -6311,7 +6311,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { var ba roachpb.BatchRequest ba.Timestamp = tc.Clock().Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{Key: roachpb.Key(id)}}) - cmd, pErr := r.evaluateProposal(context.Background(), propEvalKV, + cmd, pErr := r.requestToProposal(context.Background(), storagebase.CmdIDKey(id), repDesc, ba) if pErr != nil { t.Fatal(pErr) @@ -6322,7 +6322,7 @@ func 
TestReplicaRefreshPendingCommandsTicks(t *testing.T) { dropProposals.Unlock() r.mu.Lock() - r.insertProposalLocked(cmd) + r.insertProposalLocked(cmd, repDesc) if err := r.submitProposalLocked(cmd); err != nil { t.Error(err) } diff --git a/pkg/storage/storagebase/proposer_kv.go b/pkg/storage/storagebase/proposer_kv.go index 39593b2dec2d..497429afff0d 100644 --- a/pkg/storage/storagebase/proposer_kv.go +++ b/pkg/storage/storagebase/proposer_kv.go @@ -14,11 +14,11 @@ package storagebase -// Strip removes all state changes from the ReplicatedProposalData, leaving -// only metadata behind. +// Strip removes all state changes from the ReplicatedProposalData, +// leaving only metadata behind. +// +// TODO(bdarnell): this method is insufficiently tested; there are +// no tests that fail if it becomes a no-op. func (rpd *ReplicatedProposalData) Strip() { - *rpd = ReplicatedProposalData{ - OriginReplica: rpd.OriginReplica, - RangeID: rpd.RangeID, - } + *rpd = ReplicatedProposalData{} } diff --git a/pkg/storage/storagebase/proposer_kv.pb.go b/pkg/storage/storagebase/proposer_kv.pb.go index a4e61bc3bd6a..8a86efec52a5 100644 --- a/pkg/storage/storagebase/proposer_kv.pb.go +++ b/pkg/storage/storagebase/proposer_kv.pb.go @@ -14,6 +14,8 @@ Merge ChangeReplicas ReplicatedProposalData + WriteBatch + RaftCommand ReplicaState RangeInfo */ @@ -28,8 +30,6 @@ import cockroach_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" import cockroach_storage_engine_enginepb "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" import cockroach_util_hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" -import github_com_cockroachdb_cockroach_pkg_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" - import io "io" // Reference imports to suppress errors if they are not otherwise used. 
@@ -83,7 +83,7 @@ func (m *ChangeReplicas) String() string { return proto.CompactTextSt func (*ChangeReplicas) ProtoMessage() {} func (*ChangeReplicas) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{2} } -// ReplicaProposalData is the structured information which together with +// ReplicatedProposalData is the structured information which together with // a RocksDB WriteBatch constitutes the proposal payload in proposer-evaluated // KV. For the majority of proposals, we expect ReplicatedProposalData to be // trivial; only changes to the metadata state (splits, merges, rebalances, @@ -94,9 +94,56 @@ func (*ChangeReplicas) Descriptor() ([]byte, []int) { return fileDescriptorPropo // followers to reliably produce errors for proposals which apply after a // lease change. type ReplicatedProposalData struct { - RangeID github_com_cockroachdb_cockroach_pkg_roachpb.RangeID `protobuf:"varint,1,opt,name=range_id,json=rangeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.RangeID" json:"range_id"` - OriginReplica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,2,opt,name=origin_replica,json=originReplica" json:"origin_replica"` - Cmd *cockroach_roachpb3.BatchRequest `protobuf:"bytes,3,opt,name=cmd" json:"cmd,omitempty"` + // Whether to block concurrent readers while processing the proposal data. + BlockReads bool `protobuf:"varint,10001,opt,name=block_reads,json=blockReads" json:"block_reads"` + // Updates to the Replica's ReplicaState. By convention and as outlined on + // the comment on the ReplicaState message, this field is sparsely populated + // and any field set overwrites the corresponding field in the state, perhaps + // which additional side effects (for instance on a descriptor update). 
+ State ReplicaState `protobuf:"bytes,10002,opt,name=state" json:"state"` + Split *Split `protobuf:"bytes,10003,opt,name=split" json:"split,omitempty"` + Merge *Merge `protobuf:"bytes,10004,opt,name=merge" json:"merge,omitempty"` + // TODO(tschottdorf): trim this down; we shouldn't need the whole request. + ComputeChecksum *cockroach_roachpb3.ComputeChecksumRequest `protobuf:"bytes,10005,opt,name=compute_checksum,json=computeChecksum" json:"compute_checksum,omitempty"` + IsLeaseRequest bool `protobuf:"varint,10006,opt,name=is_lease_request,json=isLeaseRequest" json:"is_lease_request"` + IsFreeze bool `protobuf:"varint,10007,opt,name=is_freeze,json=isFreeze" json:"is_freeze"` + // Denormalizes BatchRequest.Timestamp during the transition period for + // proposer-evaluated KV. Only used to verify lease coverage. + Timestamp cockroach_util_hlc.Timestamp `protobuf:"bytes,10008,opt,name=timestamp" json:"timestamp"` + IsConsistencyRelated bool `protobuf:"varint,10009,opt,name=is_consistency_related,json=isConsistencyRelated" json:"is_consistency_related"` + // The stats delta corresponding to the data in this WriteBatch. On + // a split, contains only the contributions to the left-hand side. + Delta cockroach_storage_engine_enginepb.MVCCStats `protobuf:"bytes,10010,opt,name=delta" json:"delta"` + ChangeReplicas *ChangeReplicas `protobuf:"bytes,10012,opt,name=change_replicas,json=changeReplicas" json:"change_replicas,omitempty"` +} + +func (m *ReplicatedProposalData) Reset() { *m = ReplicatedProposalData{} } +func (m *ReplicatedProposalData) String() string { return proto.CompactTextString(m) } +func (*ReplicatedProposalData) ProtoMessage() {} +func (*ReplicatedProposalData) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{3} } + +// WriteBatch is the serialized representation of a RocksDB write +// batch. 
A wrapper message is used so that the absence of the field +// can be distinguished from a zero-length batch, and so structs +// containing pointers to it can be compared with the == operator (we +// rely on this in storage.ProposalData) +type WriteBatch struct { + Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"` +} + +func (m *WriteBatch) Reset() { *m = WriteBatch{} } +func (m *WriteBatch) String() string { return proto.CompactTextString(m) } +func (*WriteBatch) ProtoMessage() {} +func (*WriteBatch) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{4} } + +// RaftCommand is the message written to the raft log. It contains +// some metadata about the proposal itself, then either a BatchRequest +// (legacy mode) or a ReplicatedProposalData + WriteBatch +// (proposer-evaluated KV mode). +type RaftCommand struct { + // origin_replica is the replica which proposed this command, to be + // used for lease validation. + OriginReplica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,2,opt,name=origin_replica,json=originReplica" json:"origin_replica"` // When the command is applied, its result is an error if the lease log // counter has already reached (or exceeded) max_lease_index. // @@ -124,55 +171,29 @@ type ReplicatedProposalData struct { // updated accordingly. Managing retry of proposals becomes trickier as // well as that uproots whatever ordering was originally envisioned. MaxLeaseIndex uint64 `protobuf:"varint,4,opt,name=max_lease_index,json=maxLeaseIndex" json:"max_lease_index"` - // Whether to block concurrent readers while processing the proposal data. - BlockReads bool `protobuf:"varint,10001,opt,name=block_reads,json=blockReads" json:"block_reads"` - // Updates to the Replica's ReplicaState. 
By convention and as outlined on - // the comment on the ReplicaState message, this field is sparsely populated - // and any field set overwrites the corresponding field in the state, perhaps - // which additional side effects (for instance on a descriptor update). - State ReplicaState `protobuf:"bytes,10002,opt,name=state" json:"state"` - Split *Split `protobuf:"bytes,10003,opt,name=split" json:"split,omitempty"` - Merge *Merge `protobuf:"bytes,10004,opt,name=merge" json:"merge,omitempty"` - // TODO(tschottdorf): trim this down; we shouldn't need the whole request. - ComputeChecksum *cockroach_roachpb3.ComputeChecksumRequest `protobuf:"bytes,10005,opt,name=compute_checksum,json=computeChecksum" json:"compute_checksum,omitempty"` - IsLeaseRequest bool `protobuf:"varint,10006,opt,name=is_lease_request,json=isLeaseRequest" json:"is_lease_request"` - IsFreeze bool `protobuf:"varint,10007,opt,name=is_freeze,json=isFreeze" json:"is_freeze"` - // Denormalizes BatchRequest.Timestamp during the transition period for - // proposer-evaluated KV. Only used to verify lease coverage. - Timestamp cockroach_util_hlc.Timestamp `protobuf:"bytes,10008,opt,name=timestamp" json:"timestamp"` - IsConsistencyRelated bool `protobuf:"varint,10009,opt,name=is_consistency_related,json=isConsistencyRelated" json:"is_consistency_related"` - // The stats delta corresponding to the data in this WriteBatch. On - // a split, contains only the contributions to the left-hand side. - Delta cockroach_storage_engine_enginepb.MVCCStats `protobuf:"bytes,10010,opt,name=delta" json:"delta"` + // cmd is the KV command to apply. + // TODO(bdarnell): Should not be set when propEvalKV is used, but is currently + // required to support test filters. 
+ Cmd *cockroach_roachpb3.BatchRequest `protobuf:"bytes,3,opt,name=cmd" json:"cmd,omitempty"` + ReplicatedProposalData *ReplicatedProposalData `protobuf:"bytes,10013,opt,name=replicated_proposal_data,json=replicatedProposalData" json:"replicated_proposal_data,omitempty"` // TODO(tschottdorf): using an extra message here (and not just `bytes`) to - // allow the generated ReplicatedProposalData to be compared directly. If + // allow the generated RaftCommand to be compared directly. If // this costs an extra large allocation, we need to do something different. - WriteBatch *ReplicatedProposalData_WriteBatch `protobuf:"bytes,10011,opt,name=write_batch,json=writeBatch" json:"write_batch,omitempty"` - ChangeReplicas *ChangeReplicas `protobuf:"bytes,10012,opt,name=change_replicas,json=changeReplicas" json:"change_replicas,omitempty"` + WriteBatch *WriteBatch `protobuf:"bytes,10011,opt,name=write_batch,json=writeBatch" json:"write_batch,omitempty"` } -func (m *ReplicatedProposalData) Reset() { *m = ReplicatedProposalData{} } -func (m *ReplicatedProposalData) String() string { return proto.CompactTextString(m) } -func (*ReplicatedProposalData) ProtoMessage() {} -func (*ReplicatedProposalData) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{3} } - -type ReplicatedProposalData_WriteBatch struct { - Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"` -} - -func (m *ReplicatedProposalData_WriteBatch) Reset() { *m = ReplicatedProposalData_WriteBatch{} } -func (m *ReplicatedProposalData_WriteBatch) String() string { return proto.CompactTextString(m) } -func (*ReplicatedProposalData_WriteBatch) ProtoMessage() {} -func (*ReplicatedProposalData_WriteBatch) Descriptor() ([]byte, []int) { - return fileDescriptorProposerKv, []int{3, 0} -} +func (m *RaftCommand) Reset() { *m = RaftCommand{} } +func (m *RaftCommand) String() string { return proto.CompactTextString(m) } +func (*RaftCommand) ProtoMessage() {} +func (*RaftCommand) Descriptor() 
([]byte, []int) { return fileDescriptorProposerKv, []int{5} } func init() { proto.RegisterType((*Split)(nil), "cockroach.storage.storagebase.Split") proto.RegisterType((*Merge)(nil), "cockroach.storage.storagebase.Merge") proto.RegisterType((*ChangeReplicas)(nil), "cockroach.storage.storagebase.ChangeReplicas") proto.RegisterType((*ReplicatedProposalData)(nil), "cockroach.storage.storagebase.ReplicatedProposalData") - proto.RegisterType((*ReplicatedProposalData_WriteBatch)(nil), "cockroach.storage.storagebase.ReplicatedProposalData.WriteBatch") + proto.RegisterType((*WriteBatch)(nil), "cockroach.storage.storagebase.WriteBatch") + proto.RegisterType((*RaftCommand)(nil), "cockroach.storage.storagebase.RaftCommand") } func (m *Split) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -275,30 +296,6 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - dAtA[i] = 0x8 - i++ - i = encodeVarintProposerKv(dAtA, i, uint64(m.RangeID)) - dAtA[i] = 0x12 - i++ - i = encodeVarintProposerKv(dAtA, i, uint64(m.OriginReplica.Size())) - n5, err := m.OriginReplica.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n5 - if m.Cmd != nil { - dAtA[i] = 0x1a - i++ - i = encodeVarintProposerKv(dAtA, i, uint64(m.Cmd.Size())) - n6, err := m.Cmd.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n6 - } - dAtA[i] = 0x20 - i++ - i = encodeVarintProposerKv(dAtA, i, uint64(m.MaxLeaseIndex)) dAtA[i] = 0x88 i++ dAtA[i] = 0xf1 @@ -318,11 +315,11 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.State.Size())) - n7, err := m.State.MarshalTo(dAtA[i:]) + n5, err := m.State.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n5 if m.Split != nil { dAtA[i] = 0x9a i++ @@ -331,11 +328,11 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = encodeVarintProposerKv(dAtA, i, 
uint64(m.Split.Size())) - n8, err := m.Split.MarshalTo(dAtA[i:]) + n6, err := m.Split.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n8 + i += n6 } if m.Merge != nil { dAtA[i] = 0xa2 @@ -345,11 +342,11 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Merge.Size())) - n9, err := m.Merge.MarshalTo(dAtA[i:]) + n7, err := m.Merge.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n9 + i += n7 } if m.ComputeChecksum != nil { dAtA[i] = 0xaa @@ -359,11 +356,11 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.ComputeChecksum.Size())) - n10, err := m.ComputeChecksum.MarshalTo(dAtA[i:]) + n8, err := m.ComputeChecksum.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n10 + i += n8 } dAtA[i] = 0xb0 i++ @@ -396,11 +393,11 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Timestamp.Size())) - n11, err := m.Timestamp.MarshalTo(dAtA[i:]) + n9, err := m.Timestamp.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n11 + i += n9 dAtA[i] = 0xc8 i++ dAtA[i] = 0xf1 @@ -420,25 +417,11 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Delta.Size())) - n12, err := m.Delta.MarshalTo(dAtA[i:]) + n10, err := m.Delta.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n12 - if m.WriteBatch != nil { - dAtA[i] = 0xda - i++ - dAtA[i] = 0xf1 - i++ - dAtA[i] = 0x4 - i++ - i = encodeVarintProposerKv(dAtA, i, uint64(m.WriteBatch.Size())) - n13, err := m.WriteBatch.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n13 - } + i += n10 if m.ChangeReplicas != nil { dAtA[i] = 0xe2 i++ @@ -447,16 +430,16 @@ func (m *ReplicatedProposalData) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4 i++ i = 
encodeVarintProposerKv(dAtA, i, uint64(m.ChangeReplicas.Size())) - n14, err := m.ChangeReplicas.MarshalTo(dAtA[i:]) + n11, err := m.ChangeReplicas.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n14 + i += n11 } return i, nil } -func (m *ReplicatedProposalData_WriteBatch) Marshal() (dAtA []byte, err error) { +func (m *WriteBatch) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -466,7 +449,7 @@ func (m *ReplicatedProposalData_WriteBatch) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *ReplicatedProposalData_WriteBatch) MarshalTo(dAtA []byte) (int, error) { +func (m *WriteBatch) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -480,6 +463,73 @@ func (m *ReplicatedProposalData_WriteBatch) MarshalTo(dAtA []byte) (int, error) return i, nil } +func (m *RaftCommand) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RaftCommand) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0x12 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.OriginReplica.Size())) + n12, err := m.OriginReplica.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n12 + if m.Cmd != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.Cmd.Size())) + n13, err := m.Cmd.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n13 + } + dAtA[i] = 0x20 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.MaxLeaseIndex)) + if m.WriteBatch != nil { + dAtA[i] = 0xda + i++ + dAtA[i] = 0xf1 + i++ + dAtA[i] = 0x4 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.WriteBatch.Size())) + n14, err := m.WriteBatch.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n14 + } + if m.ReplicatedProposalData != nil { + dAtA[i] = 0xea + i++ + dAtA[i] = 0xf1 + i++ + dAtA[i] = 
0x4 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.ReplicatedProposalData.Size())) + n15, err := m.ReplicatedProposalData.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n15 + } + return i, nil +} + func encodeFixed64ProposerKv(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) dAtA[offset+1] = uint8(v >> 8) @@ -536,14 +586,6 @@ func (m *ChangeReplicas) Size() (n int) { func (m *ReplicatedProposalData) Size() (n int) { var l int _ = l - n += 1 + sovProposerKv(uint64(m.RangeID)) - l = m.OriginReplica.Size() - n += 1 + l + sovProposerKv(uint64(l)) - if m.Cmd != nil { - l = m.Cmd.Size() - n += 1 + l + sovProposerKv(uint64(l)) - } - n += 1 + sovProposerKv(uint64(m.MaxLeaseIndex)) n += 4 l = m.State.Size() n += 3 + l + sovProposerKv(uint64(l)) @@ -566,10 +608,6 @@ func (m *ReplicatedProposalData) Size() (n int) { n += 4 l = m.Delta.Size() n += 3 + l + sovProposerKv(uint64(l)) - if m.WriteBatch != nil { - l = m.WriteBatch.Size() - n += 3 + l + sovProposerKv(uint64(l)) - } if m.ChangeReplicas != nil { l = m.ChangeReplicas.Size() n += 3 + l + sovProposerKv(uint64(l)) @@ -577,7 +615,7 @@ func (m *ReplicatedProposalData) Size() (n int) { return n } -func (m *ReplicatedProposalData_WriteBatch) Size() (n int) { +func (m *WriteBatch) Size() (n int) { var l int _ = l if m.Data != nil { @@ -587,6 +625,27 @@ func (m *ReplicatedProposalData_WriteBatch) Size() (n int) { return n } +func (m *RaftCommand) Size() (n int) { + var l int + _ = l + l = m.OriginReplica.Size() + n += 1 + l + sovProposerKv(uint64(l)) + if m.Cmd != nil { + l = m.Cmd.Size() + n += 1 + l + sovProposerKv(uint64(l)) + } + n += 1 + sovProposerKv(uint64(m.MaxLeaseIndex)) + if m.WriteBatch != nil { + l = m.WriteBatch.Size() + n += 3 + l + sovProposerKv(uint64(l)) + } + if m.ReplicatedProposalData != nil { + l = m.ReplicatedProposalData.Size() + n += 3 + l + sovProposerKv(uint64(l)) + } + return n +} + func sovProposerKv(x uint64) (n int) { for { n++ @@ -899,107 +958,6 @@ func (m 
*ReplicatedProposalData) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: ReplicatedProposalData: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field RangeID", wireType) - } - m.RangeID = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowProposerKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.RangeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.RangeID(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field OriginReplica", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowProposerKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthProposerKv - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if err := m.OriginReplica.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Cmd", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowProposerKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthProposerKv - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Cmd == nil { - m.Cmd = &cockroach_roachpb3.BatchRequest{} - } - if err := m.Cmd.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 4: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d 
for field MaxLeaseIndex", wireType) - } - m.MaxLeaseIndex = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowProposerKv - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.MaxLeaseIndex |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } case 10001: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field BlockReads", wireType) @@ -1269,9 +1227,9 @@ func (m *ReplicatedProposalData) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 10011: + case 10012: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field WriteBatch", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ChangeReplicas", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1295,18 +1253,68 @@ func (m *ReplicatedProposalData) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.WriteBatch == nil { - m.WriteBatch = &ReplicatedProposalData_WriteBatch{} + if m.ChangeReplicas == nil { + m.ChangeReplicas = &ChangeReplicas{} } - if err := m.WriteBatch.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.ChangeReplicas.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex - case 10012: + default: + iNdEx = preIndex + skippy, err := skipProposerKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProposerKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *WriteBatch) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } 
+ } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteBatch: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteBatch: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ChangeReplicas", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) } - var msglen int + var byteLen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowProposerKv @@ -1316,23 +1324,21 @@ func (m *ReplicatedProposalData) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { + if byteLen < 0 { return ErrInvalidLengthProposerKv } - postIndex := iNdEx + msglen + postIndex := iNdEx + byteLen if postIndex > l { return io.ErrUnexpectedEOF } - if m.ChangeReplicas == nil { - m.ChangeReplicas = &ChangeReplicas{} - } - if err := m.ChangeReplicas.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) 
+ if m.Data == nil { + m.Data = []byte{} } iNdEx = postIndex default: @@ -1356,7 +1362,7 @@ func (m *ReplicatedProposalData) Unmarshal(dAtA []byte) error { } return nil } -func (m *ReplicatedProposalData_WriteBatch) Unmarshal(dAtA []byte) error { +func (m *RaftCommand) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -1379,17 +1385,17 @@ func (m *ReplicatedProposalData_WriteBatch) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: WriteBatch: wiretype end group for non-group") + return fmt.Errorf("proto: RaftCommand: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: WriteBatch: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: RaftCommand: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { - case 1: + case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OriginReplica", wireType) } - var byteLen int + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowProposerKv @@ -1399,21 +1405,138 @@ func (m *ReplicatedProposalData_WriteBatch) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= (int(b) & 0x7F) << shift + msglen |= (int(b) & 0x7F) << shift if b < 0x80 { break } } - if byteLen < 0 { + if msglen < 0 { return ErrInvalidLengthProposerKv } - postIndex := iNdEx + byteLen + postIndex := iNdEx + msglen if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) 
- if m.Data == nil { - m.Data = []byte{} + if err := m.OriginReplica.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Cmd", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Cmd == nil { + m.Cmd = &cockroach_roachpb3.BatchRequest{} + } + if err := m.Cmd.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxLeaseIndex", wireType) + } + m.MaxLeaseIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxLeaseIndex |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 10011: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WriteBatch", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.WriteBatch == nil { + m.WriteBatch = &WriteBatch{} + } + if err := m.WriteBatch.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 10013: + if wireType != 2 { + return fmt.Errorf("proto: wrong 
wireType = %d for field ReplicatedProposalData", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ReplicatedProposalData == nil { + m.ReplicatedProposalData = &ReplicatedProposalData{} + } + if err := m.ReplicatedProposalData.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err } iNdEx = postIndex default: @@ -1547,56 +1670,57 @@ func init() { } var fileDescriptorProposerKv = []byte{ - // 812 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x55, 0xdb, 0x6e, 0xe3, 0x44, - 0x18, 0xae, 0x69, 0xa2, 0xa6, 0x13, 0x48, 0xab, 0xd1, 0x6a, 0x65, 0x55, 0xda, 0x24, 0x8a, 0x96, - 0x55, 0x10, 0xbb, 0x36, 0xa7, 0xbb, 0xbd, 0x41, 0x49, 0x04, 0x8d, 0xd4, 0x46, 0xe0, 0x16, 0x2a, - 0x81, 0x84, 0x35, 0x1e, 0x0f, 0xf6, 0x28, 0x76, 0x6c, 0x66, 0x26, 0x6d, 0xe1, 0x29, 0x38, 0x9f, - 0x5f, 0x80, 0x37, 0xe9, 0x65, 0x2f, 0xb9, 0x8a, 0x20, 0x3c, 0x02, 0x77, 0x5c, 0xa1, 0x39, 0x38, - 0x87, 0xd6, 0x3d, 0x68, 0xaf, 0x3c, 0x99, 0xf9, 0xbe, 0xef, 0x3f, 0xcc, 0x37, 0x7f, 0xc0, 0xdb, - 0x38, 0xc3, 0x63, 0x96, 0x21, 0x1c, 0xbb, 0xf9, 0x38, 0x72, 0xb9, 0xc8, 0x18, 0x8a, 0x48, 0xf1, - 0x0d, 0x10, 0x27, 0x6e, 0xce, 0xb2, 0x3c, 0xe3, 0x84, 0xf9, 0xe3, 0x53, 0x27, 0x67, 0x99, 0xc8, - 0xe0, 0xa3, 0x05, 0xc9, 0x31, 0x40, 0x67, 0x85, 0xb0, 0xd7, 0x5a, 0xd7, 0x54, 0xab, 0x3c, 0x70, - 0x51, 0x4e, 0x35, 0x7f, 0xaf, 0x5d, 0x0e, 0x08, 0x91, 0x40, 0x06, 0xf1, 0xb8, 0x1c, 0x91, 0x12, - 0x81, 0x56, 0x50, 0x6f, 0x94, 0x27, 0x4f, 0x26, 0x11, 0x9d, 0x14, 0x1f, 0xc9, 0x3a, 0xc5, 0xd8, - 0x30, 0x9e, 0xdd, 0x5d, 0x2e, 0x17, 0x48, 0x10, 0x03, 0x7f, 0xb2, 0x0e, 0x9f, 0x0a, 0x9a, 0xb8, - 
0x71, 0x82, 0x5d, 0x41, 0x53, 0xc2, 0x05, 0x4a, 0x73, 0x83, 0x7b, 0x10, 0x65, 0x51, 0xa6, 0x96, - 0xae, 0x5c, 0xe9, 0xdd, 0xce, 0x1f, 0x16, 0xa8, 0x1e, 0xe5, 0x09, 0x15, 0xb0, 0x0f, 0xb6, 0x04, - 0xa3, 0x51, 0x44, 0x98, 0x6d, 0xb5, 0xad, 0x6e, 0xfd, 0xad, 0x96, 0xb3, 0x6c, 0xa1, 0x29, 0xce, - 0x51, 0xd0, 0x63, 0x0d, 0xeb, 0xd5, 0x2e, 0x66, 0xad, 0x8d, 0xcb, 0x59, 0xcb, 0xf2, 0x0a, 0x26, - 0xfc, 0x14, 0x6c, 0xb3, 0x98, 0xfb, 0x21, 0x49, 0x04, 0xb2, 0x5f, 0x52, 0x32, 0x4f, 0x9d, 0xeb, - 0x37, 0xa1, 0xcb, 0x76, 0x8a, 0xea, 0x9d, 0xc3, 0x8f, 0xfb, 0xfd, 0x23, 0x81, 0x04, 0xef, 0xed, - 0x4a, 0xcd, 0xf9, 0xac, 0x55, 0xf3, 0xf6, 0x8f, 0x06, 0x52, 0xc5, 0xab, 0xb1, 0x98, 0xab, 0x55, - 0xe7, 0x00, 0x54, 0x0f, 0x09, 0x8b, 0xc8, 0xfd, 0x52, 0x55, 0xd0, 0x9b, 0x53, 0xed, 0x7c, 0x06, - 0x1a, 0xfd, 0x18, 0x4d, 0x22, 0xe2, 0x91, 0x3c, 0xa1, 0x18, 0x71, 0x78, 0x70, 0x55, 0xb6, 0x5b, - 0x22, 0xbb, 0xce, 0xb9, 0x45, 0xff, 0xdf, 0x1a, 0x78, 0x68, 0x60, 0x82, 0x84, 0x1f, 0x28, 0x83, - 0xa2, 0x64, 0x80, 0x04, 0x82, 0x01, 0xa8, 0x31, 0xa9, 0xe2, 0xd3, 0x50, 0x45, 0xda, 0xec, 0xbd, - 0x6f, 0xca, 0xde, 0xf2, 0xe4, 0xfe, 0x70, 0xf0, 0xdf, 0xac, 0xf5, 0x4e, 0x44, 0x45, 0x3c, 0x0d, - 0x1c, 0x9c, 0xa5, 0xee, 0x22, 0x8d, 0x30, 0x70, 0x4b, 0x5d, 0xe7, 0x18, 0x9e, 0xb7, 0xa5, 0x84, - 0x87, 0x21, 0xfc, 0x10, 0x34, 0x32, 0x46, 0x23, 0x3a, 0xf1, 0x99, 0x4e, 0xc2, 0x5c, 0xc7, 0xe3, - 0x92, 0x9a, 0x4c, 0x9a, 0x03, 0xc2, 0x31, 0xa3, 0xb9, 0xc8, 0x58, 0xaf, 0x22, 0xf3, 0xf1, 0x5e, - 0xd1, 0x0a, 0xe6, 0x18, 0xbe, 0x09, 0x36, 0x71, 0x1a, 0xda, 0x9b, 0x37, 0xb6, 0xbc, 0x87, 0x04, - 0x8e, 0x3d, 0xf2, 0xc5, 0x94, 0x70, 0xe1, 0x49, 0x2c, 0x7c, 0x0a, 0x76, 0x52, 0x74, 0xee, 0x27, - 0x04, 0x71, 0xe2, 0xd3, 0x49, 0x48, 0xce, 0xed, 0x4a, 0xdb, 0xea, 0x56, 0x8a, 0x00, 0x29, 0x3a, - 0x3f, 0x90, 0x67, 0x43, 0x79, 0x04, 0x9f, 0x80, 0x7a, 0x90, 0x64, 0x78, 0xec, 0x33, 0x82, 0x42, - 0x6e, 0x7f, 0x33, 0x6a, 0x5b, 0xdd, 0x9a, 0x81, 0x02, 0x75, 0xe2, 0xc9, 0x03, 0xb8, 0x0f, 0xaa, - 0xea, 0x05, 0xd8, 0xdf, 0x8e, 0x54, 
0x2e, 0xaf, 0x3b, 0xb7, 0x3e, 0xf6, 0xa2, 0x3e, 0xe9, 0x30, - 0x62, 0xe4, 0xb4, 0x00, 0x7c, 0x0e, 0xaa, 0x5c, 0x5a, 0xda, 0xfe, 0x6e, 0x74, 0xad, 0x3b, 0x65, - 0x4a, 0xca, 0xff, 0x9e, 0xe6, 0x48, 0x72, 0x2a, 0x4d, 0x66, 0x7f, 0x7f, 0x3f, 0xb2, 0x72, 0xa4, - 0xa7, 0x39, 0xf0, 0x23, 0xb0, 0x8b, 0xb3, 0x34, 0x9f, 0x0a, 0xe2, 0xe3, 0x98, 0xe0, 0x31, 0x9f, - 0xa6, 0xf6, 0x0f, 0x5a, 0xe7, 0xb5, 0x32, 0xdb, 0x69, 0x6c, 0xdf, 0x40, 0x8b, 0x26, 0xef, 0xe0, - 0xf5, 0x7d, 0xe8, 0x82, 0x5d, 0xca, 0x4d, 0xbf, 0x99, 0x06, 0xd9, 0x3f, 0xae, 0xf6, 0xb1, 0x41, - 0xb9, 0xea, 0xb8, 0x51, 0x80, 0x1d, 0xb0, 0x4d, 0xb9, 0xff, 0x39, 0x23, 0xe4, 0x2b, 0x62, 0xff, - 0xb4, 0x8a, 0xac, 0x51, 0xfe, 0x9e, 0xda, 0x86, 0x3d, 0xb0, 0xbd, 0x98, 0x26, 0xf6, 0xcf, 0x3a, - 0xc9, 0x47, 0x2b, 0x49, 0xca, 0x99, 0xe3, 0xc4, 0x09, 0x76, 0x8e, 0x0b, 0x94, 0x91, 0x58, 0xd2, - 0xe0, 0x73, 0xf0, 0x90, 0x72, 0x1f, 0x67, 0x13, 0x4e, 0xb9, 0x20, 0x13, 0xfc, 0xa5, 0xcf, 0x48, - 0x22, 0x5f, 0x86, 0xfd, 0xcb, 0x6a, 0xd0, 0x07, 0x94, 0xf7, 0x97, 0x18, 0x4f, 0x43, 0xe0, 0x10, - 0x54, 0xf5, 0x48, 0xf9, 0x75, 0xf4, 0x02, 0x33, 0xc5, 0xdc, 0xb8, 0x52, 0x80, 0x01, 0xa8, 0x9f, - 0x31, 0x2a, 0x88, 0x1f, 0x48, 0xb3, 0xda, 0xbf, 0x69, 0xc1, 0x77, 0xef, 0xe7, 0xa0, 0x2b, 0x0f, - 0xd9, 0x39, 0x91, 0x4a, 0xda, 0xf5, 0xe0, 0x6c, 0xb1, 0x86, 0x27, 0x60, 0x07, 0xab, 0x31, 0x51, - 0xbc, 0x3d, 0x6e, 0xff, 0xae, 0xe3, 0x3c, 0xbb, 0x23, 0xce, 0xfa, 0x74, 0xf1, 0x1a, 0x78, 0xed, - 0xf7, 0x5e, 0x1b, 0x80, 0x65, 0x48, 0x08, 0x41, 0x45, 0xfe, 0xd1, 0xa8, 0x11, 0xf2, 0xb2, 0xa7, - 0xd6, 0xbd, 0x57, 0x2f, 0xfe, 0x6e, 0x6e, 0x5c, 0xcc, 0x9b, 0xd6, 0xe5, 0xbc, 0x69, 0xfd, 0x39, - 0x6f, 0x5a, 0x7f, 0xcd, 0x9b, 0xd6, 0xd7, 0xff, 0x34, 0x37, 0x3e, 0xa9, 0xaf, 0xc4, 0xf9, 0x3f, - 0x00, 0x00, 0xff, 0xff, 0xc0, 0x1b, 0x18, 0xde, 0x53, 0x07, 0x00, 0x00, + // 823 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0xdb, 0x6e, 0x1b, 0x45, + 0x18, 0xce, 0x36, 0x76, 0xd9, 0x8c, 0x21, 0xb1, 
0x46, 0x55, 0x34, 0xaa, 0x54, 0xdb, 0xb2, 0x42, + 0x15, 0x44, 0xbb, 0xcb, 0x41, 0x5c, 0xf5, 0xce, 0x8e, 0x50, 0x1b, 0x52, 0x0b, 0x26, 0x85, 0x4a, + 0x20, 0xb1, 0x1a, 0x8f, 0xa7, 0xbb, 0x23, 0xef, 0x89, 0x99, 0x71, 0x1b, 0x78, 0x0a, 0xca, 0xa1, + 0x1c, 0xda, 0x3e, 0x00, 0x6f, 0x92, 0xcb, 0xde, 0x20, 0x71, 0x15, 0x81, 0x79, 0x11, 0x34, 0x87, + 0x8d, 0xed, 0x76, 0x9b, 0x44, 0x5c, 0xed, 0xec, 0xcc, 0xf7, 0x7d, 0xf3, 0x1f, 0xbe, 0x7f, 0xc0, + 0x87, 0xb4, 0xa0, 0x53, 0x51, 0x10, 0x9a, 0x84, 0xe5, 0x34, 0x0e, 0xa5, 0x2a, 0x04, 0x89, 0x59, + 0xf5, 0x1d, 0x13, 0xc9, 0xc2, 0x52, 0x14, 0x65, 0x21, 0x99, 0x88, 0xa6, 0x0f, 0x83, 0x52, 0x14, + 0xaa, 0x80, 0xd7, 0x4e, 0x49, 0x81, 0x03, 0x06, 0x4b, 0x84, 0xab, 0xdd, 0x55, 0x4d, 0xb3, 0x2a, + 0xc7, 0x21, 0x29, 0xb9, 0xe5, 0x5f, 0xed, 0xd5, 0x03, 0x26, 0x44, 0x11, 0x87, 0xd8, 0xa9, 0x47, + 0x64, 0x4c, 0x91, 0x25, 0xd4, 0x7b, 0xf5, 0xc1, 0xb3, 0x3c, 0xe6, 0x79, 0xf5, 0xd1, 0xac, 0x87, + 0x94, 0x3a, 0xc6, 0xcd, 0xf3, 0xd3, 0x95, 0x8a, 0x28, 0xe6, 0xe0, 0xd7, 0x57, 0xe1, 0x33, 0xc5, + 0xd3, 0x30, 0x49, 0x69, 0xa8, 0x78, 0xc6, 0xa4, 0x22, 0x59, 0xe9, 0x70, 0x57, 0xe2, 0x22, 0x2e, + 0xcc, 0x32, 0xd4, 0x2b, 0xbb, 0xdb, 0xff, 0xc3, 0x03, 0xcd, 0xc3, 0x32, 0xe5, 0x0a, 0x0e, 0xc1, + 0x1b, 0x4a, 0xf0, 0x38, 0x66, 0x02, 0x79, 0x3d, 0x6f, 0xb7, 0xf5, 0x41, 0x37, 0x58, 0x94, 0xd0, + 0x25, 0x17, 0x18, 0xe8, 0x3d, 0x0b, 0x1b, 0xf8, 0xc7, 0x27, 0xdd, 0xb5, 0x17, 0x27, 0x5d, 0x0f, + 0x57, 0x4c, 0xf8, 0x15, 0xd8, 0x10, 0x89, 0x8c, 0x26, 0x2c, 0x55, 0x04, 0x5d, 0x32, 0x32, 0x37, + 0x82, 0x57, 0x3b, 0x61, 0xd3, 0x0e, 0xaa, 0xec, 0x83, 0xbb, 0x5f, 0x0c, 0x87, 0x87, 0x8a, 0x28, + 0x39, 0x68, 0x6b, 0xcd, 0xf9, 0x49, 0xd7, 0xc7, 0xb7, 0x0f, 0xf7, 0xb4, 0x0a, 0xf6, 0x45, 0x22, + 0xcd, 0xaa, 0x7f, 0x00, 0x9a, 0x77, 0x99, 0x88, 0xd9, 0xc5, 0x42, 0x35, 0xd0, 0xd7, 0x87, 0xda, + 0xff, 0x1a, 0x6c, 0x0e, 0x13, 0x92, 0xc7, 0x0c, 0xb3, 0x32, 0xe5, 0x94, 0x48, 0x78, 0xf0, 0xb2, + 0xec, 0x6e, 0x8d, 0xec, 0x2a, 0xe7, 0x0c, 0xfd, 0x3f, 0x9b, 0x60, 0xdb, 0xc1, 0x14, 
0x9b, 0x7c, + 0x6a, 0x0c, 0x4a, 0xd2, 0x3d, 0xa2, 0x08, 0xbc, 0x0e, 0x5a, 0xe3, 0xb4, 0xa0, 0xd3, 0x48, 0x30, + 0x32, 0x91, 0xe8, 0xf1, 0xa8, 0xe7, 0xed, 0xfa, 0x83, 0x86, 0xd6, 0xc0, 0xc0, 0x9c, 0x60, 0x7d, + 0x00, 0x6f, 0x83, 0xa6, 0xe9, 0x34, 0xfa, 0x61, 0x64, 0xe2, 0x79, 0x37, 0x38, 0xd3, 0xd4, 0x81, + 0xbb, 0x4e, 0x57, 0x92, 0x39, 0x39, 0x2b, 0x00, 0x6f, 0x81, 0xa6, 0xd4, 0xad, 0x43, 0x3f, 0x5a, + 0xa5, 0x9d, 0x73, 0x94, 0x4c, 0x9f, 0xb1, 0xe5, 0x68, 0x72, 0xa6, 0x8b, 0x89, 0x7e, 0xba, 0x18, + 0xd9, 0x54, 0x1e, 0x5b, 0x0e, 0xfc, 0x1c, 0xb4, 0x69, 0x91, 0x95, 0x33, 0xc5, 0x22, 0x9a, 0x30, + 0x3a, 0x95, 0xb3, 0x0c, 0xfd, 0x6c, 0x75, 0xde, 0xa9, 0x2b, 0xaf, 0xc5, 0x0e, 0x1d, 0x14, 0xb3, + 0x6f, 0x66, 0x4c, 0x2a, 0xbc, 0x45, 0x57, 0xf7, 0x61, 0x08, 0xda, 0x5c, 0x46, 0x29, 0x23, 0x92, + 0x45, 0xc2, 0x82, 0xd0, 0x93, 0xe5, 0x3a, 0x6e, 0x72, 0x79, 0xa0, 0x4f, 0x9d, 0x02, 0xec, 0x83, + 0x0d, 0x2e, 0xa3, 0x07, 0x82, 0xb1, 0xef, 0x18, 0xfa, 0x65, 0x19, 0xe9, 0x73, 0xf9, 0xb1, 0xd9, + 0x86, 0x03, 0xb0, 0x71, 0x3a, 0x35, 0xe8, 0x57, 0x1b, 0xe4, 0xb5, 0xa5, 0x20, 0xf5, 0x6c, 0x05, + 0x49, 0x4a, 0x83, 0x7b, 0x15, 0xca, 0x49, 0x2c, 0x68, 0xf0, 0x16, 0xd8, 0xe6, 0x32, 0xa2, 0x45, + 0x2e, 0xb9, 0x54, 0x2c, 0xa7, 0xdf, 0x46, 0x82, 0xa5, 0xda, 0x01, 0xe8, 0xb7, 0xe5, 0x4b, 0xaf, + 0x70, 0x39, 0x5c, 0x60, 0xb0, 0x85, 0xc0, 0x3b, 0xa0, 0x69, 0x47, 0xe7, 0xf7, 0xd1, 0xff, 0x98, + 0x1d, 0xd7, 0x71, 0xa3, 0x00, 0xef, 0x83, 0x2d, 0x6a, 0xac, 0x1a, 0x09, 0xe7, 0x55, 0xf4, 0xcc, + 0x8a, 0xde, 0x3c, 0xa7, 0x7d, 0xab, 0x0e, 0xc7, 0x9b, 0x74, 0xe5, 0xbf, 0xdf, 0x03, 0xe0, 0xbe, + 0xe0, 0x8a, 0x0d, 0x88, 0xa2, 0x09, 0x84, 0xa0, 0xa1, 0x1f, 0x3b, 0x33, 0x30, 0x6f, 0x62, 0xb3, + 0xee, 0x3f, 0x59, 0x07, 0x2d, 0x4c, 0x1e, 0xa8, 0x61, 0x91, 0x65, 0x24, 0x9f, 0xc0, 0xcf, 0xc0, + 0x66, 0x21, 0x78, 0xcc, 0xf3, 0x2a, 0x14, 0xf7, 0x32, 0xec, 0xd4, 0xf4, 0xdf, 0x5d, 0xb3, 0xc7, + 0x24, 0x15, 0xbc, 0x54, 0x85, 0x70, 0x59, 0xbd, 0x65, 0x15, 0xdc, 0x31, 0x7c, 0x1f, 0xac, 0xd3, + 0x6c, 0x82, 0xd6, 0x5f, 
0x3b, 0xfd, 0x26, 0xba, 0xca, 0x3d, 0x1a, 0x0b, 0x6f, 0x80, 0xad, 0x8c, + 0x1c, 0x39, 0xcb, 0xf0, 0x7c, 0xc2, 0x8e, 0x50, 0xa3, 0xe7, 0xed, 0x36, 0xaa, 0x0b, 0x32, 0x72, + 0x64, 0x0c, 0x73, 0x47, 0x1f, 0xc1, 0x4f, 0x40, 0xeb, 0x91, 0xce, 0x32, 0x1a, 0x6b, 0x21, 0xf4, + 0xf4, 0x55, 0xc7, 0xd6, 0x95, 0x6e, 0x51, 0x18, 0x0c, 0x1e, 0x2d, 0x8a, 0x54, 0x02, 0x24, 0x4e, + 0x5f, 0x82, 0xa8, 0x74, 0x4f, 0x41, 0x64, 0x0a, 0xf7, 0xdc, 0x2a, 0x7f, 0x74, 0xb1, 0xd1, 0x7e, + 0xe9, 0x25, 0xc1, 0xdb, 0xa2, 0x76, 0x7f, 0xbf, 0xe1, 0x7b, 0xed, 0x4b, 0xfb, 0x97, 0xfd, 0xc7, + 0xa3, 0xf6, 0xd3, 0xd1, 0xfe, 0x65, 0xff, 0xd9, 0xa8, 0xfd, 0x7c, 0x34, 0x78, 0xfb, 0xf8, 0x9f, + 0xce, 0xda, 0xf1, 0xbc, 0xe3, 0xbd, 0x98, 0x77, 0xbc, 0xbf, 0xe6, 0x1d, 0xef, 0xef, 0x79, 0xc7, + 0xfb, 0xfe, 0xdf, 0xce, 0xda, 0x97, 0xad, 0xa5, 0xbb, 0xfe, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x4a, + 0x1d, 0x65, 0x5b, 0x70, 0x07, 0x00, 0x00, } diff --git a/pkg/storage/storagebase/proposer_kv.proto b/pkg/storage/storagebase/proposer_kv.proto index 60c3a0ba398d..b0e38d52ca15 100644 --- a/pkg/storage/storagebase/proposer_kv.proto +++ b/pkg/storage/storagebase/proposer_kv.proto @@ -59,7 +59,7 @@ message ChangeReplicas { (gogoproto.embed) = true]; } -// ReplicaProposalData is the structured information which together with +// ReplicatedProposalData is the structured information which together with // a RocksDB WriteBatch constitutes the proposal payload in proposer-evaluated // KV. For the majority of proposals, we expect ReplicatedProposalData to be // trivial; only changes to the metadata state (splits, merges, rebalances, @@ -70,15 +70,51 @@ message ChangeReplicas { // followers to reliably produce errors for proposals which apply after a // lease change. message ReplicatedProposalData { - // ====================================== - // Beginning of what was formerly RaftCommand. - // ====================================== + // Whether to block concurrent readers while processing the proposal data. 
+ optional bool block_reads = 10001 [(gogoproto.nullable) = false]; + // Updates to the Replica's ReplicaState. By convention and as outlined on + // the comment on the ReplicaState message, this field is sparsely populated + // and any field set overwrites the corresponding field in the state, perhaps + // with additional side effects (for instance on a descriptor update). + optional storage.storagebase.ReplicaState state = 10002 [(gogoproto.nullable) = false]; + optional Split split = 10003; + optional Merge merge = 10004; + // TODO(tschottdorf): trim this down; we shouldn't need the whole request. + optional roachpb.ComputeChecksumRequest compute_checksum = 10005; + optional bool is_lease_request = 10006 [(gogoproto.nullable) = false]; + optional bool is_freeze = 10007 [(gogoproto.nullable) = false]; + // Denormalizes BatchRequest.Timestamp during the transition period for + // proposer-evaluated KV. Only used to verify lease coverage. + optional util.hlc.Timestamp timestamp = 10008 [(gogoproto.nullable) = false]; + optional bool is_consistency_related = 10009 [(gogoproto.nullable) = false]; + // The stats delta corresponding to the data in this WriteBatch. On + // a split, contains only the contributions to the left-hand side. + optional storage.engine.enginepb.MVCCStats delta = 10010 [(gogoproto.nullable) = false]; + optional ChangeReplicas change_replicas = 10012; +} + +// WriteBatch is the serialized representation of a RocksDB write +// batch. A wrapper message is used so that the absence of the field +// can be distinguished from a zero-length batch, and so structs +// containing pointers to it can be compared with the == operator (we +// rely on this in storage.ProposalData) +message WriteBatch { + optional bytes data = 1; +} + +// RaftCommand is the message written to the raft log. 
It contains +// some metadata about the proposal itself, then either a BatchRequest +// (legacy mode) or a ReplicatedProposalData + WriteBatch +// (proposer-evaluated KV mode). +message RaftCommand { + // Metadata about the proposal itself. These fields exist at + // top-level instead of being grouped in a sub-message for + // backwards-compatibility. - optional int64 range_id = 1 [(gogoproto.nullable) = false, - (gogoproto.customname) = "RangeID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + // origin_replica is the replica which proposed this command, to be + // used for lease validation. optional roachpb.ReplicaDescriptor origin_replica = 2 [(gogoproto.nullable) = false]; - optional roachpb.BatchRequest cmd = 3; + // When the command is applied, its result is an error if the lease log // counter has already reached (or exceeded) max_lease_index. // @@ -107,39 +143,25 @@ message ReplicatedProposalData { // well as that uproots whatever ordering was originally envisioned. optional uint64 max_lease_index = 4 [(gogoproto.nullable) = false]; - // ====================================== - // End of what was formerly RaftCommand and beginning of proposer-evaluated - // KV protos. These are not stable. While general proto compatibility rules + // Legacy mode (post-raft evaluation): + + // cmd is the KV command to apply. + // TODO(bdarnell): Should not be set when propEvalKV is used, but is currently + // required to support test filters. + optional roachpb.BatchRequest cmd = 3; + + // Proposer-evaluated KV mode. + // These are not stable. While general proto compatibility rules // apply, these are intentionally kept at high tag numbers for now so that // a stabilized version can be inserted at low tag numbers in the future. - // ====================================== + // These fields are only populated if proposer-evaluated KV was in effect when + // the command was proposed. 
- // Whether to block concurrent readers while processing the proposal data. - optional bool block_reads = 10001 [(gogoproto.nullable) = false]; - // Updates to the Replica's ReplicaState. By convention and as outlined on - // the comment on the ReplicaState message, this field is sparsely populated - // and any field set overwrites the corresponding field in the state, perhaps - // which additional side effects (for instance on a descriptor update). - optional storage.storagebase.ReplicaState state = 10002 [(gogoproto.nullable) = false]; - optional Split split = 10003; - optional Merge merge = 10004; - // TODO(tschottdorf): trim this down; we shouldn't need the whole request. - optional roachpb.ComputeChecksumRequest compute_checksum = 10005; - optional bool is_lease_request = 10006 [(gogoproto.nullable) = false]; - optional bool is_freeze = 10007 [(gogoproto.nullable) = false]; - // Denormalizes BatchRequest.Timestamp during the transition period for - // proposer-evaluated KV. Only used to verify lease coverage. - optional util.hlc.Timestamp timestamp = 10008 [(gogoproto.nullable) = false]; - optional bool is_consistency_related = 10009 [(gogoproto.nullable) = false]; - // The stats delta corresponding to the data in this WriteBatch. On - // a split, contains only the contributions to the left-hand side. - optional storage.engine.enginepb.MVCCStats delta = 10010 [(gogoproto.nullable) = false]; - message WriteBatch { - optional bytes data = 1; - } + optional ReplicatedProposalData replicated_proposal_data = 10013; // TODO(tschottdorf): using an extra message here (and not just `bytes`) to - // allow the generated ReplicatedProposalData to be compared directly. If + // allow the generated RaftCommand to be compared directly. If // this costs an extra large allocation, we need to do something different. optional WriteBatch write_batch = 10011; - optional ChangeReplicas change_replicas = 10012; + + reserved 1, 10001 to 10010, 10012; }