Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.1: storage: avoid copying marshalled RaftCommand when encoding #36670

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ func (r *Replica) submitProposalLocked(p *ProposalData) error {
}

func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
data, err := protoutil.Marshal(p.command)
cmdSize := p.command.Size()
data := make([]byte, raftCommandPrefixLen+cmdSize)
_, err := protoutil.MarshalToWithoutFuzzing(p.command, data[raftCommandPrefixLen:])
if err != nil {
return err
}
Expand All @@ -369,7 +371,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
RaftCommand.ReplicatedEvalResult: %d
RaftCommand.ReplicatedEvalResult.Delta: %d
RaftCommand.WriteBatch: %d
`, p.Request.Summary(), len(data),
`, p.Request.Summary(), cmdSize,
p.command.ProposerReplica.Size(),
p.command.ReplicatedEvalResult.Size(),
p.command.ReplicatedEvalResult.Delta.Size(),
Expand All @@ -383,7 +385,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
// blips or worse, and it's good to be able to pick them from traces.
//
// TODO(tschottdorf): can we mark them so lightstep can group them?
if size := len(data); size > largeProposalEventThresholdBytes {
if size := cmdSize; size > largeProposalEventThresholdBytes {
log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(size)))
}

Expand All @@ -406,7 +408,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {

confChangeCtx := ConfChangeContext{
CommandID: string(p.idKey),
Payload: data,
Payload: data[raftCommandPrefixLen:], // chop off prefix
Replica: crt.Replica,
}
encodedCtx, err := protoutil.Marshal(&confChangeCtx)
Expand All @@ -427,24 +429,26 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
})
}

return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
encode := encodeRaftCommandV1
if p.command.ReplicatedEvalResult.AddSSTable != nil {
if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return false, errors.New("cannot sideload empty SSTable")
}
encode = encodeRaftCommandV2
r.store.metrics.AddSSTableProposals.Inc(1)
log.Event(p.ctx, "sideloadable proposal detected")
}
if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
}

if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
encodingVersion := raftVersionStandard
if p.command.ReplicatedEvalResult.AddSSTable != nil {
if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return errors.New("cannot sideload empty SSTable")
}
encodingVersion = raftVersionSideloaded
r.store.metrics.AddSSTableProposals.Inc(1)
log.Event(p.ctx, "sideloadable proposal detected")
}
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], encodingVersion, p.idKey)

return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
// We're proposing a command so there is no need to wake the leader if
// we're quiesced.
r.unquiesceLocked()
return false /* unquiesceAndWakeLeader */, raftGroup.Propose(encode(p.idKey, data))
return false /* unquiesceAndWakeLeader */, raftGroup.Propose(data)
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (r *Replica) unquiesceAndWakeLeaderLocked() {
r.store.unquiescedReplicas.Unlock()
r.maybeCampaignOnWakeLocked(ctx)
// Propose an empty command which will wake the leader.
_ = r.mu.internalRaftGroup.Propose(encodeRaftCommandV1(makeIDKey(), nil))
data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil)
_ = r.mu.internalRaftGroup.Propose(data)
}
}

Expand Down
35 changes: 18 additions & 17 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,6 @@ type raftCommandEncodingVersion byte
//
// TODO(bdarnell): is this commandID still appropriate for our needs?
const (
// The prescribed length for each command ID.
raftCommandIDLen = 8
// The initial Raft command version, used for all regular Raft traffic.
raftVersionStandard raftCommandEncodingVersion = 0
// A proposal containing an SSTable which preferably should be sideloaded
Expand All @@ -1040,6 +1038,10 @@ const (
// Raft log it is necessary to inline the payload first as it has usually
// been sideloaded.
raftVersionSideloaded raftCommandEncodingVersion = 1
// The prescribed length for each command ID.
raftCommandIDLen = 8
// The prescribed length of each encoded command's prefix.
raftCommandPrefixLen = 1 + raftCommandIDLen
// The no-split bit is now unused, but we still apply the mask to the first
// byte of the command for backward compatibility.
//
Expand All @@ -1048,27 +1050,26 @@ const (
raftCommandNoSplitMask = raftCommandNoSplitBit - 1
)

// encodeRaftCommandV1 encodes command under commandID using the
// raftVersionStandard encoding version. It is a thin wrapper around
// encodeRaftCommand.
func encodeRaftCommandV1(commandID storagebase.CmdIDKey, command []byte) []byte {
return encodeRaftCommand(raftVersionStandard, commandID, command)
}

// encodeRaftCommandV2 encodes command under commandID using the
// raftVersionSideloaded encoding version. It is a thin wrapper around
// encodeRaftCommand.
func encodeRaftCommandV2(commandID storagebase.CmdIDKey, command []byte) []byte {
return encodeRaftCommand(raftVersionSideloaded, commandID, command)
}

// encodeRaftCommand returns a freshly allocated buffer containing the
// raftCommandPrefixLen-byte prefix (filled in by encodeRaftCommandPrefix from
// version and commandID) followed by a copy of command, which is expected to
// be an encoded storagebase.RaftCommand. The input slice is not retained.
func encodeRaftCommand(
version raftCommandEncodingVersion, commandID storagebase.CmdIDKey, command []byte,
) []byte {
// Allocate prefix and payload together so only one buffer is created.
b := make([]byte, raftCommandPrefixLen+len(command))
encodeRaftCommandPrefix(b[:raftCommandPrefixLen], version, commandID)
copy(b[raftCommandPrefixLen:], command)
return b
}

// encodeRaftCommandPrefix writes the command prefix into b: the encoding
// version in b[0], followed by the bytes of commandID. It panics if commandID
// is not exactly raftCommandIDLen bytes long or if b is not exactly
// raftCommandPrefixLen bytes long.
func encodeRaftCommandPrefix(
b []byte, version raftCommandEncodingVersion, commandID storagebase.CmdIDKey,
) {
if len(commandID) != raftCommandIDLen {
panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), raftCommandIDLen))
}
x := make([]byte, 1, 1+raftCommandIDLen+len(command))
x[0] = byte(version)
x = append(x, []byte(commandID)...)
x = append(x, command...)
return x
if len(b) != raftCommandPrefixLen {
panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), raftCommandPrefixLen))
}
b[0] = byte(version)
copy(b[1:], []byte(commandID))
}

// DecodeRaftCommand splits a raftpb.Entry.Data into its commandID and
Expand Down
13 changes: 8 additions & 5 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,15 @@ func maybeSideloadEntriesImpl(
strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil

{
var err error
data, err = protoutil.Marshal(&strippedCmd)
data = make([]byte, raftCommandPrefixLen+strippedCmd.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:])
if err != nil {
return nil, 0, errors.Wrap(err, "while marshaling stripped sideloaded command")
}
ent.Data = data
}

ent.Data = encodeRaftCommandV2(cmdID, data)
log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, 0, err
Expand Down Expand Up @@ -226,11 +227,13 @@ func maybeInlineSideloadedRaftCommand(
}
command.ReplicatedEvalResult.AddSSTable.Data = sideloadedData
{
data, err := protoutil.Marshal(&command)
data := make([]byte, raftCommandPrefixLen+command.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&command, data[raftCommandPrefixLen:])
if err != nil {
return nil, err
}
ent.Data = encodeRaftCommandV2(cmdID, data)
ent.Data = data
}
return &ent, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/split_delay_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
_ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) {
// NB: intentionally ignore the error (which can be ErrProposalDropped
// when there's an SST inflight).
_ = rawNode.Propose(encodeRaftCommandV1(makeIDKey(), nil))
data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil)
_ = rawNode.Propose(data)
// NB: we need to unquiesce as the group might be quiesced.
return true /* unquiesceAndWakeLeader */, nil
})
Expand Down