diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index d5c6da43345b..b8491d1a4283 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -355,7 +355,9 @@ func (r *Replica) submitProposalLocked(p *ProposalData) error { } func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error { - data, err := protoutil.Marshal(p.command) + cmdSize := p.command.Size() + data := make([]byte, raftCommandPrefixLen+cmdSize) + _, err := protoutil.MarshalToWithoutFuzzing(p.command, data[raftCommandPrefixLen:]) if err != nil { return err } @@ -369,7 +371,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error { RaftCommand.ReplicatedEvalResult: %d RaftCommand.ReplicatedEvalResult.Delta: %d RaftCommand.WriteBatch: %d -`, p.Request.Summary(), len(data), +`, p.Request.Summary(), cmdSize, p.command.ProposerReplica.Size(), p.command.ReplicatedEvalResult.Size(), p.command.ReplicatedEvalResult.Delta.Size(), @@ -383,7 +385,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error { // blips or worse, and it's good to be able to pick them from traces. // // TODO(tschottdorf): can we mark them so lightstep can group them? - if size := len(data); size > largeProposalEventThresholdBytes { + if size := cmdSize; size > largeProposalEventThresholdBytes { log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(size))) } @@ -406,7 +408,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error { confChangeCtx := ConfChangeContext{ CommandID: string(p.idKey), - Payload: data, + Payload: data[raftCommandPrefixLen:], // chop off prefix Replica: crt.Replica, } encodedCtx, err := protoutil.Marshal(&confChangeCtx) @@ -427,24 +429,26 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error { }) } - return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { - encode := encodeRaftCommandV1 - if p.command.ReplicatedEvalResult.AddSSTable != nil { - if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { - return false, errors.New("cannot sideload empty SSTable") - } - encode = encodeRaftCommandV2 - r.store.metrics.AddSSTableProposals.Inc(1) - log.Event(p.ctx, "sideloadable proposal detected") - } + if log.V(4) { + log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary()) + } - if log.V(4) { - log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary()) + encodingVersion := raftVersionStandard + if p.command.ReplicatedEvalResult.AddSSTable != nil { + if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { + return errors.New("cannot sideload empty SSTable") } + encodingVersion = raftVersionSideloaded + r.store.metrics.AddSSTableProposals.Inc(1) + log.Event(p.ctx, "sideloadable proposal detected") + } + encodeRaftCommandPrefix(data[:raftCommandPrefixLen], encodingVersion, p.idKey) + + return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command so there is no need to wake the leader if // we're quiesced. r.unquiesceLocked() - return false /* unquiesceAndWakeLeader */, raftGroup.Propose(encode(p.idKey, data)) + return false /* unquiesceAndWakeLeader */, raftGroup.Propose(data) }) } diff --git a/pkg/storage/replica_raft_quiesce.go b/pkg/storage/replica_raft_quiesce.go index 8dd5c07d0d2e..6f0327238b4a 100644 --- a/pkg/storage/replica_raft_quiesce.go +++ b/pkg/storage/replica_raft_quiesce.go @@ -97,7 +97,8 @@ func (r *Replica) unquiesceAndWakeLeaderLocked() { r.store.unquiescedReplicas.Unlock() r.maybeCampaignOnWakeLocked(ctx) // Propose an empty command which will wake the leader. - _ = r.mu.internalRaftGroup.Propose(encodeRaftCommandV1(makeIDKey(), nil)) + data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil) + _ = r.mu.internalRaftGroup.Propose(data) } } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 47ad37fcd4b5..65bd758c584a 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -1030,8 +1030,6 @@ type raftCommandEncodingVersion byte // // TODO(bdarnell): is this commandID still appropriate for our needs? const ( - // The prescribed length for each command ID. - raftCommandIDLen = 8 // The initial Raft command version, used for all regular Raft traffic. raftVersionStandard raftCommandEncodingVersion = 0 // A proposal containing an SSTable which preferably should be sideloaded @@ -1040,6 +1038,10 @@ const ( // Raft log it necessary to inline the payload first as it has usually // been sideloaded. raftVersionSideloaded raftCommandEncodingVersion = 1 + // The prescribed length for each command ID. + raftCommandIDLen = 8 + // The prescribed length of each encoded command's prefix. + raftCommandPrefixLen = 1 + raftCommandIDLen // The no-split bit is now unused, but we still apply the mask to the first // byte of the command for backward compatibility. // @@ -1048,27 +1050,26 @@ const ( raftCommandNoSplitMask = raftCommandNoSplitBit - 1 ) -func encodeRaftCommandV1(commandID storagebase.CmdIDKey, command []byte) []byte { - return encodeRaftCommand(raftVersionStandard, commandID, command) -} - -func encodeRaftCommandV2(commandID storagebase.CmdIDKey, command []byte) []byte { - return encodeRaftCommand(raftVersionSideloaded, commandID, command) -} - -// encode a command ID, an encoded storagebase.RaftCommand, and -// whether the command contains a split. func encodeRaftCommand( version raftCommandEncodingVersion, commandID storagebase.CmdIDKey, command []byte, ) []byte { + b := make([]byte, raftCommandPrefixLen+len(command)) + encodeRaftCommandPrefix(b[:raftCommandPrefixLen], version, commandID) + copy(b[raftCommandPrefixLen:], command) + return b +} + +func encodeRaftCommandPrefix( + b []byte, version raftCommandEncodingVersion, commandID storagebase.CmdIDKey, +) { if len(commandID) != raftCommandIDLen { panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), raftCommandIDLen)) } - x := make([]byte, 1, 1+raftCommandIDLen+len(command)) - x[0] = byte(version) - x = append(x, []byte(commandID)...) - x = append(x, command...) - return x + if len(b) != raftCommandPrefixLen { + panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), raftCommandPrefixLen)) + } + b[0] = byte(version) + copy(b[1:], []byte(commandID)) } // DecodeRaftCommand splits a raftpb.Entry.Data into its commandID and diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index cdbcb9de40b0..63b373443ae6 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -147,14 +147,15 @@ func maybeSideloadEntriesImpl( strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil { - var err error - data, err = protoutil.Marshal(&strippedCmd) + data = make([]byte, raftCommandPrefixLen+strippedCmd.Size()) + encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID) + _, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:]) if err != nil { return nil, 0, errors.Wrap(err, "while marshaling stripped sideloaded command") } + ent.Data = data } - ent.Data = encodeRaftCommandV2(cmdID, data) log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term) if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil { return nil, 0, err @@ -226,11 +227,13 @@ func maybeInlineSideloadedRaftCommand( } command.ReplicatedEvalResult.AddSSTable.Data = sideloadedData { - data, err := protoutil.Marshal(&command) + data := make([]byte, raftCommandPrefixLen+command.Size()) + encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID) + _, err := protoutil.MarshalToWithoutFuzzing(&command, data[raftCommandPrefixLen:]) if err != nil { return nil, err } - ent.Data = encodeRaftCommandV2(cmdID, data) + ent.Data = data } return &ent, nil } diff --git a/pkg/storage/split_delay_helper.go b/pkg/storage/split_delay_helper.go index f8b59f31b951..09208ca7f1d3 100644 --- a/pkg/storage/split_delay_helper.go +++ b/pkg/storage/split_delay_helper.go @@ -52,7 +52,8 @@ func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) { _ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) { // NB: intentionally ignore the error (which can be ErrProposalDropped // when there's an SST inflight). - _ = rawNode.Propose(encodeRaftCommandV1(makeIDKey(), nil)) + data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil) + _ = rawNode.Propose(data) // NB: we need to unquiesce as the group might be quiesced. return true /* unquiesceAndWakeLeader */, nil })