Skip to content

Commit

Permalink
raftlog: introduce EntryEncoding{Standard,Sideloaded}WithAC
Browse files Browse the repository at this point in the history
Part of #95563. Predecessor to #95637.

This commit introduces two new encodings for raft log entries,
EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have prefix
byte that informs decoding routines how to interpret the subsequent
bytes. To date we've had two, EntryEncoding{Standard,Sideloaded}[^1], to
indicate whether the entry came with sideloaded data[^2]. Our two
additions here will be used to indicate whether the particular entry is
subject to replication admission control. If so, right as we persist
entries into the raft log storage, we'll "admit the work without
blocking", which is further explained in #95637.

The decision to use replication admission control happens above raft and
a per-entry basis. If using replication admission control, AC-specific
metadata will be plumbed down as part of the marshaled raft command.
This too is explained in in #95637, specifically, the
'RaftAdmissionMeta' section. This commit then adds an unused version
gate (V23_1UseEncodingWithBelowRaftAdmissionData) to use replication
admission control. Since we're using a different prefix byte for raft
commands (see EntryEncodings above), one not recognized in earlier CRDB
versions, we need explicit versioning. We add it out of development
convenience -- adding version gates is most prone to merge conflicts. We
expect to use it shortly, before alpha/beta cuts.

[^1]: Now renamed to EntryEncoding{Standard,Sideloaded}WithoutAC.
[^2]: These are typically AddSSTs, the storage for which is treated
      differently for performance reasons.

Release note: None
  • Loading branch information
irfansharif committed Jan 24, 2023
1 parent b21379b commit 4bcb8c7
Show file tree
Hide file tree
Showing 18 changed files with 175 additions and 102 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-30 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-32 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-30</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-32</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
10 changes: 10 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,12 @@ const (
// chagnefeeds created prior to this version.
V23_1_ChangefeedExpressionProductionReady

// V23_1UseEncodingWithBelowRaftAdmissionData enables the use of raft
// command encodings that include below-raft admission control data.
//
// TODO(irfansharif): Actually use this.
V23_1UseEncodingWithBelowRaftAdmissionData

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -687,6 +693,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_ChangefeedExpressionProductionReady,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30},
},
{
Key: V23_1UseEncodingWithBelowRaftAdmissionData,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 32},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func LoadTerm(
if err != nil {
return 0, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */)
}
return entry.Term, nil
Expand Down Expand Up @@ -405,7 +405,7 @@ func LoadEntries(
if err != nil {
return err
}
if typ == raftlog.EntryEncodingSideloaded {
if typ.IsSideloaded() {
newEnt, err := MaybeInlineSideloadedRaftCommand(
ctx, rangeID, ent, sideloaded, eCache,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data),
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data),
})
stats := &AppendStats{}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/logstore/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func MaybeSideloadEntries(
if err != nil {
return nil, 0, 0, 0, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
otherEntriesSize += int64(len(input[i].Data))
continue
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func MaybeSideloadEntries(
// TODO(tbg): this should be supported by a method as well.
{
data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size())
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID)
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID)
_, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
if err != nil {
return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command")
Expand Down Expand Up @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand(
if err != nil {
return nil, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
return nil, nil
}
log.Event(ctx, "inlining sideloaded SSTable")
Expand Down Expand Up @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand(
// the EntryEncoding.
{
data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size())
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID)
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID)
_, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
if err != nil {
return nil, err
Expand All @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
return
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/logstore/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) {
}

func mkEnt(
v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable,
enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable,
) raftpb.Entry {
cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen)
var cmd kvserverpb.RaftCommand
Expand All @@ -62,7 +62,7 @@ func mkEnt(
}
var ent raftpb.Entry
ent.Index, ent.Term = index, term
ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b)
ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b)
return ent
}

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte
v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC
rangeID := roachpb.RangeID(1)

type testCase struct {
Expand Down Expand Up @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
addSSTStripped := addSST
addSSTStripped.Data = nil

entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil)
entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST)
entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil)
entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST)
entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped)
entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil)
entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST)
entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil)
entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST)
entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped)

type tc struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/recovery_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func raftLogFromPendingDescriptorUpdate(
t.Fatalf("failed to serialize raftCommand: %v", err)
}
data := raftlog.EncodeRaftCommand(
raftlog.EntryEncodingStandardPrefixByte, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
ent := raftpb.Entry{
Term: 1,
Index: replica.RaftCommittedIndex + entryIndex,
Expand Down
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string {
}
// NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods
// that call this take care of unwrapping the ConfChange), and since
// len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and
// they are encoded identically.
cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data)
// len(data)>0 it has to be {Deprecated,}EntryEncoding{Standard,Sideloaded}
// and they are encoded identically.
cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data)
return fmt.Sprintf("[%x] [%d]", cmdID, len(data))
}

Expand Down Expand Up @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase
continue
}
switch typ {
case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded:
id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data)
case raftlog.EntryEncodingStandardWithAC,
raftlog.EntryEncodingSideloadedWithAC,
raftlog.EntryEncodingStandardWithoutAC,
raftlog.EntryEncodingSideloadedWithoutAC:
id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data)
ids = append(ids, id)
case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2:
// Configuration changes don't have the CmdIDKey easily accessible but are
Expand Down
139 changes: 92 additions & 47 deletions pkg/kv/kvserver/raftlog/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,83 +23,128 @@ import (
// and, in some cases, the first byte of the Entry's Data payload.
type EntryEncoding byte

// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since
// they aren't on the wire.

const (
// EntryEncodingStandard is the default encoding for a CockroachDB raft log
// entry.
// EntryEncodingEmpty is an empty entry. These are used by raft after
// leader election. Since they hold no data, there is nothing in them to
// decode.
EntryEncodingEmpty EntryEncoding = iota
// EntryEncodingStandardWithAC is the default encoding for a CockroachDB
// raft log entry.
//
// This is a raftpb.Entry of type EntryNormal whose Data slice is either empty
// or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent
// eight bytes represent a CmdIDKey. The remaining bytes represent a
// kvserverpb.RaftCommand.
EntryEncodingStandard EntryEncoding = 0
// EntryEncodingSideloaded indicates a proposal representing the result of a
// roachpb.AddSSTableRequest for which the payload (the SST) is stored outside
// the storage engine to improve storage performance.
// This is a raftpb.Entry of type EntryNormal whose Data slice is either
// empty or whose first byte matches entryEncodingStandardWithACPrefixByte.
// The subsequent eight bytes represent a CmdIDKey. The remaining bytes
// represent a kvserverpb.RaftCommand that also includes below-raft
// admission control metadata.
EntryEncodingStandardWithAC
// EntryEncodingSideloadedWithAC indicates a proposal representing the
// result of a roachpb.AddSSTableRequest for which the payload (the SST) is
// stored outside the storage engine to improve storage performance.
//
// This is a raftpb.Entry of type EntryNormal whose data slice is either empty
// or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent
// eight bytes represent a CmdIDKey. The remaining bytes represent a
// This is a raftpb.Entry of type EntryNormal whose data slice is either
// empty or whose first byte matches
// entryEncodingSideloadedWithACPrefixByte. The subsequent eight bytes
// represent a CmdIDKey. The remaining bytes represent a
// kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a
// nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of
// which is an SST to be ingested (and which is present in memory but made
// durable via direct storage on the filesystem, bypassing the storage
// engine).
EntryEncodingSideloaded EntryEncoding = 1
// EntryEncodingEmpty is an empty entry. These are used by raft after
// leader election. Since they hold no data, there is nothing in them to
// decode.
EntryEncodingEmpty EntryEncoding = 253
// engine). The kvserverpb.RaftCommand also includes below-raft admission
// control metadata.
EntryEncodingSideloadedWithAC
// EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but
// without below-raft admission metadata.
EntryEncodingStandardWithoutAC
// EntryEncodingSideloadedWithoutAC is like EntryEncodingStandardWithoutAC
// but without below-raft admission metadata.
EntryEncodingSideloadedWithoutAC
// EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is
// raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange
// whose Context field is a kvserverpb.ConfChangeContext whose Payload is a
// kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of
// protobuf unmarshaling.
EntryEncodingRaftConfChange EntryEncoding = 254
// EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with
// the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2}
// applied.
EntryEncodingRaftConfChangeV2 EntryEncoding = 255
EntryEncodingRaftConfChange
// EntryEncodingRaftConfChangeV2 is analogous to
// EntryEncodingRaftConfChange, with the replacements
// raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied.
EntryEncodingRaftConfChangeV2
)

// IsSideloaded returns true if the encoding is
// EntryEncodingSideloadedWith{,out}AC.
func (enc EntryEncoding) IsSideloaded() bool {
return enc == EntryEncodingSideloadedWithAC || enc == EntryEncodingSideloadedWithoutAC
}

// UsesAdmissionControl returns true if the encoding is
// EntryEncoding{Standard,Sideloaded}WithAC.
func (enc EntryEncoding) UsesAdmissionControl() bool {
return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC
}

// prefixByte returns the prefix byte used during encoding, applicable only to
// EntryEncoding{Standard,Sideloaded}With{,out}AC.
func (enc EntryEncoding) prefixByte() byte {
switch enc {
case EntryEncodingStandardWithAC:
return entryEncodingStandardWithACPrefixByte
case EntryEncodingSideloadedWithAC:
return entryEncodingSideloadedWithACPrefixByte
case EntryEncodingStandardWithoutAC:
return entryEncodingStandardWithoutACPrefixByte
case EntryEncodingSideloadedWithoutAC:
return entryEncodingSideloadedWithoutACPrefixByte
default:
panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc))
}
}

const (
// entryEncodingStandardWithACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingStandardWithAC.
entryEncodingStandardWithACPrefixByte = byte(2) // 0b00000010
// entryEncodingSideloadedWithACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingSideloadedWithAC.
entryEncodingSideloadedWithACPrefixByte = byte(3) // 0b00000011
// entryEncodingStandardWithoutACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingStandardWithoutAC.
entryEncodingStandardWithoutACPrefixByte = byte(0) // 0b00000000
// entryEncodingSideloadedWithoutACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingSideloadedWithoutAC.
entryEncodingSideloadedWithoutACPrefixByte = byte(1) // 0b00000001
)

// TODO(tbg): when we have a good library for encoding entries, these should
// no longer be exported.
const (
// RaftCommandIDLen is the length of a command ID.
RaftCommandIDLen = 8
// RaftCommandPrefixLen is the length of the prefix of raft entries that
// use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The
// bytes after the prefix represent the kvserverpb.RaftCommand.
//
// RaftCommandPrefixLen is the length of the prefix of raft entries that use
// the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes
// after the prefix represent the kvserverpb.RaftCommand.
RaftCommandPrefixLen = 1 + RaftCommandIDLen
// EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's
// Data slice for an Entry of encoding EntryEncodingStandard.
EntryEncodingStandardPrefixByte = byte(0)
// EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data
// slice for an Entry of encoding EntryEncodingSideloaded.
EntryEncodingSideloadedPrefixByte = byte(1)
)

// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or
// EntryEncodingSideloaded.
func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte {
// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using
// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte {
b := make([]byte, RaftCommandPrefixLen+len(command))
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID)
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID)
copy(b[RaftCommandPrefixLen:], command)
return b
}

// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type
// EntryEncodingStandard or EntryEncodingSideloaded.
func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) {
// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the
// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) {
if len(commandID) != RaftCommandIDLen {
panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen))
}
if len(b) != RaftCommandPrefixLen {
panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen))
}
b[0] = prefixByte
b[0] = enc.prefixByte()
copy(b[1:], commandID)
}
Loading

0 comments on commit 4bcb8c7

Please sign in to comment.