From 4bcb8c73b25e3624d753330390fab4fd36d52cf7 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 24 Jan 2023 09:37:21 -0500 Subject: [PATCH] raftlog: introduce EntryEncoding{Standard,Sideloaded}WithAC Part of #95563. Predecessor to #95637. This commit introduces two new encodings for raft log entries, EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, EntryEncoding{Standard,Sideloaded}[^1], to indicate whether the entry came with sideloaded data[^2]. Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll "admit the work without blocking", which is further explained in #95637. The decision to use replication admission control happens above raft and a per-entry basis. If using replication admission control, AC-specific metadata will be plumbed down as part of the marshaled raft command. This too is explained in in #95637, specifically, the 'RaftAdmissionMeta' section. This commit then adds an unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData) to use replication admission control. Since we're using a different prefix byte for raft commands (see EntryEncodings above), one not recognized in earlier CRDB versions, we need explicit versioning. We add it out of development convenience -- adding version gates is most prone to merge conflicts. We expect to use it shortly, before alpha/beta cuts. [^1]: Now renamed to EntryEncoding{Standard,Sideloaded}WithoutAC. [^2]: These are typically AddSSTs, the storage for which is treated differently for performance reasons. Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 10 ++ pkg/kv/kvserver/logstore/logstore.go | 4 +- .../kvserver/logstore/logstore_bench_test.go | 2 +- pkg/kv/kvserver/logstore/sideload.go | 10 +- pkg/kv/kvserver/logstore/sideload_test.go | 16 +- .../kvserver/loqrecovery/recovery_env_test.go | 2 +- pkg/kv/kvserver/raft.go | 13 +- pkg/kv/kvserver/raftlog/encoding.go | 139 ++++++++++++------ pkg/kv/kvserver/raftlog/entry.go | 39 +++-- pkg/kv/kvserver/raftlog/entry_test.go | 2 +- pkg/kv/kvserver/raftlog/iter_bench_test.go | 18 ++- pkg/kv/kvserver/raftlog/iter_test.go | 6 +- pkg/kv/kvserver/replica_proposal_buf_test.go | 2 +- pkg/kv/kvserver/replica_raft.go | 6 +- pkg/kv/kvserver/replica_raft_quiesce.go | 2 +- pkg/kv/kvserver/replica_sideload_test.go | 2 +- 18 files changed, 175 insertions(+), 102 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 1d256d3e947e..529b3cafc7d2 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 1000022.2-30 set the active cluster version in the format '.' +version version 1000022.2-32 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index bb9b50924c4f..0ef19edc2bac 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -235,6 +235,6 @@
trace.opentelemetry.collector
stringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. -
version
version1000022.2-30set the active cluster version in the format '<major>.<minor>' +
version
version1000022.2-32set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 92bb5cc6d747..d6a0cbbbfa49 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -401,6 +401,12 @@ const ( // chagnefeeds created prior to this version. V23_1_ChangefeedExpressionProductionReady + // V23_1UseEncodingWithBelowRaftAdmissionData enables the use of raft + // command encodings that include below-raft admission control data. + // + // TODO(irfansharif): Actually use this. + V23_1UseEncodingWithBelowRaftAdmissionData + // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -687,6 +693,10 @@ var rawVersionsSingleton = keyedVersions{ Key: V23_1_ChangefeedExpressionProductionReady, Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30}, }, + { + Key: V23_1UseEncodingWithBelowRaftAdmissionData, + Version: roachpb.Version{Major: 22, Minor: 2, Internal: 32}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index e53e9c17f756..f1784431b0f5 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -328,7 +328,7 @@ func LoadTerm( if err != nil { return 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */) } return entry.Term, nil @@ -405,7 +405,7 @@ func LoadEntries( if err != nil { return err } - if typ == raftlog.EntryEncodingSideloaded { + if typ.IsSideloaded() { newEnt, err := MaybeInlineSideloadedRaftCommand( ctx, rangeID, ent, sideloaded, eCache, ) diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index e2e31c257ff8..0c3f8e0473e2 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { Term: 1, Index: 1, Type: raftpb.EntryNormal, - Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data), + Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data), }) stats := &AppendStats{} diff --git a/pkg/kv/kvserver/logstore/sideload.go b/pkg/kv/kvserver/logstore/sideload.go index 9b6f60f0e899..2c21a24fae44 100644 --- a/pkg/kv/kvserver/logstore/sideload.go +++ b/pkg/kv/kvserver/logstore/sideload.go @@ -86,7 +86,7 @@ func MaybeSideloadEntries( if err != nil { return nil, 0, 0, 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { otherEntriesSize += int64(len(input[i].Data)) continue } @@ -124,7 +124,7 @@ func MaybeSideloadEntries( // TODO(tbg): this should be supported by a method as well. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command") @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand( if err != nil { return nil, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return nil, nil } log.Event(ctx, "inlining sideloaded SSTable") @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand( // the EntryEncoding. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, err @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) if err != nil { log.Fatalf(ctx, "%v", err) } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return } diff --git a/pkg/kv/kvserver/logstore/sideload_test.go b/pkg/kv/kvserver/logstore/sideload_test.go index 4d9cc93e36f1..9867e1b20870 100644 --- a/pkg/kv/kvserver/logstore/sideload_test.go +++ b/pkg/kv/kvserver/logstore/sideload_test.go @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) { } func mkEnt( - v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, + enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, ) raftpb.Entry { cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen) var cmd kvserverpb.RaftCommand @@ -62,7 +62,7 @@ func mkEnt( } var ent raftpb.Entry ent.Index, ent.Term = index, term - ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b) + ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b) return ent } @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte + v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC rangeID := roachpb.RangeID(1) type testCase struct { @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { addSSTStripped := addSST addSSTStripped.Data = nil - entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil) - entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST) - entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil) - entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST) - entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped) + entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil) + entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST) + entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil) + entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST) + entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped) type tc struct { name string diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index b0bf5a05cb1a..f90dd1072d91 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -380,7 +380,7 @@ func raftLogFromPendingDescriptorUpdate( t.Fatalf("failed to serialize raftCommand: %v", err) } data := raftlog.EncodeRaftCommand( - raftlog.EntryEncodingStandardPrefixByte, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) + raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) ent := raftpb.Entry{ Term: 1, Index: replica.RaftCommittedIndex + entryIndex, diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 8ba284ad6e06..6e1b6fdb62a1 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string { } // NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods // that call this take care of unwrapping the ConfChange), and since - // len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and - // they are encoded identically. - cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data) + // len(data)>0 it has to be {Deprecated,}EntryEncoding{Standard,Sideloaded} + // and they are encoded identically. + cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data) return fmt.Sprintf("[%x] [%d]", cmdID, len(data)) } @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase continue } switch typ { - case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded: - id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data) + case raftlog.EntryEncodingStandardWithAC, + raftlog.EntryEncodingSideloadedWithAC, + raftlog.EntryEncodingStandardWithoutAC, + raftlog.EntryEncodingSideloadedWithoutAC: + id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data) ids = append(ids, id) case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2: // Configuration changes don't have the CmdIDKey easily accessible but are diff --git a/pkg/kv/kvserver/raftlog/encoding.go b/pkg/kv/kvserver/raftlog/encoding.go index dd52176a0607..988e8a063299 100644 --- a/pkg/kv/kvserver/raftlog/encoding.go +++ b/pkg/kv/kvserver/raftlog/encoding.go @@ -23,83 +23,128 @@ import ( // and, in some cases, the first byte of the Entry's Data payload. type EntryEncoding byte -// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since -// they aren't on the wire. - const ( - // EntryEncodingStandard is the default encoding for a CockroachDB raft log - // entry. + // EntryEncodingEmpty is an empty entry. These are used by raft after + // leader election. Since they hold no data, there is nothing in them to + // decode. + EntryEncodingEmpty EntryEncoding = iota + // EntryEncodingStandardWithAC is the default encoding for a CockroachDB + // raft log entry. // - // This is a raftpb.Entry of type EntryNormal whose Data slice is either empty - // or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a - // kvserverpb.RaftCommand. - EntryEncodingStandard EntryEncoding = 0 - // EntryEncodingSideloaded indicates a proposal representing the result of a - // roachpb.AddSSTableRequest for which the payload (the SST) is stored outside - // the storage engine to improve storage performance. + // This is a raftpb.Entry of type EntryNormal whose Data slice is either + // empty or whose first byte matches entryEncodingStandardWithACPrefixByte. + // The subsequent eight bytes represent a CmdIDKey. The remaining bytes + // represent a kvserverpb.RaftCommand that also includes below-raft + // admission control metadata. + EntryEncodingStandardWithAC + // EntryEncodingSideloadedWithAC indicates a proposal representing the + // result of a roachpb.AddSSTableRequest for which the payload (the SST) is + // stored outside the storage engine to improve storage performance. // - // This is a raftpb.Entry of type EntryNormal whose data slice is either empty - // or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a + // This is a raftpb.Entry of type EntryNormal whose data slice is either + // empty or whose first byte matches + // entryEncodingSideloadedWithACPrefixByte. The subsequent eight bytes + // represent a CmdIDKey. The remaining bytes represent a // kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a // nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of // which is an SST to be ingested (and which is present in memory but made // durable via direct storage on the filesystem, bypassing the storage - // engine). - EntryEncodingSideloaded EntryEncoding = 1 - // EntryEncodingEmpty is an empty entry. These are used by raft after - // leader election. Since they hold no data, there is nothing in them to - // decode. - EntryEncodingEmpty EntryEncoding = 253 + // engine). The kvserverpb.RaftCommand also includes below-raft admission + // control metadata. + EntryEncodingSideloadedWithAC + // EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but + // without below-raft admission metadata. + EntryEncodingStandardWithoutAC + // EntryEncodingSideloadedWithoutAC is like EntryEncodingStandardWithoutAC + // but without below-raft admission metadata. + EntryEncodingSideloadedWithoutAC // EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is // raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange // whose Context field is a kvserverpb.ConfChangeContext whose Payload is a // kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of // protobuf unmarshaling. - EntryEncodingRaftConfChange EntryEncoding = 254 - // EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with - // the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} - // applied. - EntryEncodingRaftConfChangeV2 EntryEncoding = 255 + EntryEncodingRaftConfChange + // EntryEncodingRaftConfChangeV2 is analogous to + // EntryEncodingRaftConfChange, with the replacements + // raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied. + EntryEncodingRaftConfChangeV2 +) + +// IsSideloaded returns true if the encoding is +// EntryEncodingSideloadedWith{,out}AC. +func (enc EntryEncoding) IsSideloaded() bool { + return enc == EntryEncodingSideloadedWithAC || enc == EntryEncodingSideloadedWithoutAC +} + +// UsesAdmissionControl returns true if the encoding is +// EntryEncoding{Standard,Sideloaded}WithAC. +func (enc EntryEncoding) UsesAdmissionControl() bool { + return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC +} + +// prefixByte returns the prefix byte used during encoding, applicable only to +// EntryEncoding{Standard,Sideloaded}With{,out}AC. +func (enc EntryEncoding) prefixByte() byte { + switch enc { + case EntryEncodingStandardWithAC: + return entryEncodingStandardWithACPrefixByte + case EntryEncodingSideloadedWithAC: + return entryEncodingSideloadedWithACPrefixByte + case EntryEncodingStandardWithoutAC: + return entryEncodingStandardWithoutACPrefixByte + case EntryEncodingSideloadedWithoutAC: + return entryEncodingSideloadedWithoutACPrefixByte + default: + panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc)) + } +} + +const ( + // entryEncodingStandardWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithAC. + entryEncodingStandardWithACPrefixByte = byte(2) // 0b00000010 + // entryEncodingSideloadedWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithAC. + entryEncodingSideloadedWithACPrefixByte = byte(3) // 0b00000011 + // entryEncodingStandardWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithoutAC. + entryEncodingStandardWithoutACPrefixByte = byte(0) // 0b00000000 + // entryEncodingSideloadedWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithoutAC. + entryEncodingSideloadedWithoutACPrefixByte = byte(1) // 0b00000001 ) -// TODO(tbg): when we have a good library for encoding entries, these should -// no longer be exported. const ( // RaftCommandIDLen is the length of a command ID. RaftCommandIDLen = 8 - // RaftCommandPrefixLen is the length of the prefix of raft entries that - // use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The - // bytes after the prefix represent the kvserverpb.RaftCommand. - // + // RaftCommandPrefixLen is the length of the prefix of raft entries that use + // the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes + // after the prefix represent the kvserverpb.RaftCommand. RaftCommandPrefixLen = 1 + RaftCommandIDLen - // EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's - // Data slice for an Entry of encoding EntryEncodingStandard. - EntryEncodingStandardPrefixByte = byte(0) - // EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data - // slice for an Entry of encoding EntryEncodingSideloaded. - EntryEncodingSideloadedPrefixByte = byte(1) ) -// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or -// EntryEncodingSideloaded. -func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte { +// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using +// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte { b := make([]byte, RaftCommandPrefixLen+len(command)) - EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID) + EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID) copy(b[RaftCommandPrefixLen:], command) return b } -// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type -// EntryEncodingStandard or EntryEncodingSideloaded. -func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) { +// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the +// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) { if len(commandID) != RaftCommandIDLen { panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen)) } if len(b) != RaftCommandPrefixLen { panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen)) } - b[0] = prefixByte + b[0] = enc.prefixByte() copy(b[1:], commandID) } diff --git a/pkg/kv/kvserver/raftlog/entry.go b/pkg/kv/kvserver/raftlog/entry.go index d0d04ba2378d..f05d10baa3dd 100644 --- a/pkg/kv/kvserver/raftlog/entry.go +++ b/pkg/kv/kvserver/raftlog/entry.go @@ -36,36 +36,40 @@ func EncodingOf(ent raftpb.Entry) (EntryEncoding, error) { } switch ent.Type { - case raftpb.EntryNormal: case raftpb.EntryConfChange: return EntryEncodingRaftConfChange, nil case raftpb.EntryConfChangeV2: return EntryEncodingRaftConfChangeV2, nil + case raftpb.EntryNormal: default: return 0, errors.AssertionFailedf("unknown EntryType %d", ent.Type) } switch ent.Data[0] { - case EntryEncodingStandardPrefixByte: - return EntryEncodingStandard, nil - case EntryEncodingSideloadedPrefixByte: - return EntryEncodingSideloaded, nil + case entryEncodingStandardWithACPrefixByte: + return EntryEncodingStandardWithAC, nil + case entryEncodingSideloadedWithACPrefixByte: + return EntryEncodingSideloadedWithAC, nil + case entryEncodingStandardWithoutACPrefixByte: + return EntryEncodingStandardWithoutAC, nil + case entryEncodingSideloadedWithoutACPrefixByte: + return EntryEncodingSideloadedWithoutAC, nil default: return 0, errors.AssertionFailedf("unknown command encoding version %d", ent.Data[0]) } } -// DecomposeRaftVersionStandardOrSideloaded extracts the CmdIDKey and the -// marshaled kvserverpb.RaftCommand from a slice which is known to have come -// from a raftpb.Entry of type raftlog.EntryEncodingStandard or -// raftlog.EntryEncodingSideloaded (which, mod the prefix byte, share an -// encoding). -func DecomposeRaftVersionStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { +// DecomposeRaftEncodingStandardOrSideloaded extracts the CmdIDKey and the +// marshaled kvserverpb.RaftCommand from a raftpb.Entry slice known to have +// Entry with type EntryEncoding{Standard,Sideloaded}With{,out}AC. +// All these variants, mod the prefix byte, share an encoding. +func DecomposeRaftEncodingStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { return kvserverbase.CmdIDKey(data[1 : 1+RaftCommandIDLen]), data[1+RaftCommandIDLen:] } // Entry contains data related to a raft log entry. This is the raftpb.Entry -// itself but also all encapsulated data relevant for command application. +// itself but also all encapsulated data relevant for command application and +// admission control. type Entry struct { raftpb.Entry ID kvserverbase.CmdIDKey // may be empty for zero Entry @@ -73,6 +77,10 @@ type Entry struct { ConfChangeV1 *raftpb.ConfChange // only set for config change ConfChangeV2 *raftpb.ConfChangeV2 // only set for config change ConfChangeContext *kvserverpb.ConfChangeContext // only set for config change + // ApplyAdmissionControl determines whether this entry is subject to + // replication admission control. Only applies for entries with encoding + // EntryEncoding{Standard,Sideloaded}WithAC. + ApplyAdmissionControl bool } var entryPool = sync.Pool{ @@ -145,8 +153,11 @@ func (e *Entry) load() error { AsV2() raftpb.ConfChangeV2 } switch typ { - case EntryEncodingStandard, EntryEncodingSideloaded: - e.ID, raftCmdBytes = DecomposeRaftVersionStandardOrSideloaded(e.Entry.Data) + case EntryEncodingStandardWithAC, EntryEncodingSideloadedWithAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) + e.ApplyAdmissionControl = true + case EntryEncodingStandardWithoutAC, EntryEncodingSideloadedWithoutAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) case EntryEncodingEmpty: // Nothing to load, the empty raftpb.Entry is represented by a trivial // Entry. diff --git a/pkg/kv/kvserver/raftlog/entry_test.go b/pkg/kv/kvserver/raftlog/entry_test.go index f7ac225e81c3..6893d4b3541c 100644 --- a/pkg/kv/kvserver/raftlog/entry_test.go +++ b/pkg/kv/kvserver/raftlog/entry_test.go @@ -25,7 +25,7 @@ func TestLoadInvalidEntry(t *testing.T) { Data: EncodeRaftCommand( // It would be nice to have an "even more invalid" command here but it // turns out that DecodeRaftCommand "handles" errors via panic(). - EntryEncodingStandardPrefixByte, "foobarzz", []byte("definitely not a protobuf"), + EntryEncodingStandardWithAC, "foobarzz", []byte("definitely not a protobuf"), ), } ent, err := NewEntry(invalidEnt) diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index 722510919d8f..adc0850b9c37 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -59,10 +59,9 @@ func (m *mockReader) NewMVCCIterator( return m.iter } -func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { +func mkRaftCommand(keySize, valSize, writeBatchSize int) *kvserverpb.RaftCommand { r := rand.New(rand.NewSource(123)) - // A realistic-ish raft command for a ~1kb write. - cmd := &kvserverpb.RaftCommand{ + return &kvserverpb.RaftCommand{ ProposerLeaseSequence: 1, MaxLeaseIndex: 1159192591, ClosedTimestamp: &hlc.Timestamp{WallTime: 12512591925, Logical: 1}, @@ -79,20 +78,25 @@ func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { }, RaftLogDelta: 1300, }, - WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, 2000)}, + WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, writeBatchSize)}, LogicalOpLog: &kvserverpb.LogicalOpLog{Ops: []enginepb.MVCCLogicalOp{ { WriteValue: &enginepb.MVCCWriteValueOp{ - Key: roachpb.Key(randutil.RandBytes(r, 100)), + Key: roachpb.Key(randutil.RandBytes(r, keySize)), Timestamp: hlc.Timestamp{WallTime: 1284581285}, - Value: roachpb.Key(randutil.RandBytes(r, 1800)), + Value: roachpb.Key(randutil.RandBytes(r, valSize)), }, }, }}, } +} + +func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { + // A realistic-ish raft command for a ~1kb write. + cmd := mkRaftCommand(100, 1800, 2000) cmdB, err := protoutil.Marshal(cmd) require.NoError(b, err) - data := EncodeRaftCommand(EntryEncodingStandardPrefixByte, "cmd12345", cmdB) + data := EncodeRaftCommand(EntryEncodingStandardWithoutAC, "cmd12345", cmdB) ent := raftpb.Entry{ Term: 1, diff --git a/pkg/kv/kvserver/raftlog/iter_test.go b/pkg/kv/kvserver/raftlog/iter_test.go index 189c4f965f95..6d372e654885 100644 --- a/pkg/kv/kvserver/raftlog/iter_test.go +++ b/pkg/kv/kvserver/raftlog/iter_test.go @@ -44,11 +44,11 @@ func ents(inds ...uint64) []raftpb.Entry { typ := raftpb.EntryType(ind % 3) switch typ { case raftpb.EntryNormal: - prefixByte := EntryEncodingStandardPrefixByte + enc := EntryEncodingStandardWithAC if ind%2 == 0 { - prefixByte = EntryEncodingSideloadedPrefixByte + enc = EntryEncodingSideloadedWithAC } - data = EncodeRaftCommand(prefixByte, cmdID, b) + data = EncodeRaftCommand(enc, cmdID, b) case raftpb.EntryConfChangeV2: c := kvserverpb.ConfChangeContext{ CommandID: string(cmdID), diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 05be5ad0d3da..5b872d6e2771 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -303,7 +303,7 @@ func (pc proposalCreator) encodeProposal(p *ProposalData) []byte { cmdLen := p.command.Size() needed := raftlog.RaftCommandPrefixLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize() data := make([]byte, raftlog.RaftCommandPrefixLen, needed) - raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardWithoutAC, p.idKey) data = data[:raftlog.RaftCommandPrefixLen+p.command.Size()] if _, err := protoutil.MarshalTo(p.command, data[raftlog.RaftCommandPrefixLen:]); err != nil { panic(err) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e6f8b4369a93..690c2e72445b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -344,7 +344,7 @@ func (r *Replica) propose( // Determine the encoding style for the Raft command. prefix := true - encodingPrefixByte := raftlog.EntryEncodingStandardPrefixByte + entryEncoding := raftlog.EntryEncodingStandardWithoutAC if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil { // EndTxnRequest with a ChangeReplicasTrigger is special because Raft // needs to understand it; it cannot simply be an opaque command. To @@ -416,7 +416,7 @@ func (r *Replica) propose( } } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") - encodingPrefixByte = raftlog.EntryEncodingSideloadedPrefixByte + entryEncoding = raftlog.EntryEncodingSideloadedWithoutAC r.store.metrics.AddSSTableProposals.Inc(1) if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { @@ -438,7 +438,7 @@ func (r *Replica) propose( data := make([]byte, preLen, needed) // Encode prefix with command ID, if necessary. if prefix { - raftlog.EncodeRaftCommandPrefix(data, encodingPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, entryEncoding, p.idKey) } // Encode body of command. data = data[:preLen+cmdLen] diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 1fb80c595813..147ee7447721 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -87,7 +87,7 @@ func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool { r.store.unquiescedReplicas.Unlock() r.maybeCampaignOnWakeLocked(ctx) // Propose an empty command which will wake the leader. - data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, makeIDKey(), nil) + data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil) _ = r.mu.internalRaftGroup.Propose(data) return true } diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index 2c7de2197518..74501c638a78 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -199,7 +199,7 @@ func TestRaftSSTableSideloading(t *testing.T) { var idx int for idx = 0; idx < len(ents); idx++ { // Get the SST back from the raft log. - if typ, _ := raftlog.EncodingOf(ents[idx]); typ != raftlog.EntryEncodingSideloaded { + if typ, _ := raftlog.EncodingOf(ents[idx]); !typ.IsSideloaded() { continue } ent, err := logstore.MaybeInlineSideloadedRaftCommand(ctx, tc.repl.RangeID, ents[idx], tc.repl.raftMu.sideloaded, tc.store.raftEntryCache)