Skip to content

Commit

Permalink
Merge #95523 #95748
Browse files Browse the repository at this point in the history
95523: insights: record insights when txn contention meets threshold r=xinhaoz a=xinhaoz

Part of #90393

Previously, we only wrote a transaction and its statements to the insights system when an issue was detected for one of the statements in the transaction. However, a transaction should still report high contention when the total amount of contention time is high, even if the contention experienced by each of its statements is not over the threshold.

This commit ensures that transactions which have a recorded contention time above the insights latency threshold get reported to the insights system.

Release note: None

95748: raftlog: introduce EntryEncoding{Standard,Sideloaded}WithAC r=irfansharif a=irfansharif

Part of #95563. Predecessor to #95637.

This commit introduces two new encodings for raft log entries, `EntryEncoding{Standard,Sideloaded}WithAC`. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, `EntryEncoding{Standard,Sideloaded}`[^1], to indicate whether the entry came with sideloaded data[^2]. Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll "admit the work without blocking", which is further explained in #95637.

The decision to use replication admission control happens above raft and on a per-entry basis. If using replication admission control, AC-specific metadata will be plumbed down as part of the marshaled raft command. This too is explained in #95637, specifically, the 'RaftAdmissionMeta' section. This commit then adds an unused version gate (`V23_1UseEncodingWithBelowRaftAdmissionData`) to use replication admission control. Since we're using a different prefix byte for raft commands, one not recognized in earlier CRDB versions, we need explicit versioning. We add it out of development convenience -- adding version gates is most prone to merge conflicts. We expect to use it shortly, before alpha/beta cuts.

[^1]: Now renamed to `EntryEncoding{Standard,Sideloaded}WithoutAC`.
[^2]: These are typically AddSSTs, the storage for which is treated differently for performance reasons.

Release note: None

Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
3 people committed Jan 26, 2023
3 parents 3b5bcf0 + de5fbab + 4df47f5 commit 705d6a1
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 104 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func LoadTerm(
if err != nil {
return 0, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */)
}
return entry.Term, nil
Expand Down Expand Up @@ -405,7 +405,7 @@ func LoadEntries(
if err != nil {
return err
}
if typ == raftlog.EntryEncodingSideloaded {
if typ.IsSideloaded() {
newEnt, err := MaybeInlineSideloadedRaftCommand(
ctx, rangeID, ent, sideloaded, eCache,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data),
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data),
})
stats := &AppendStats{}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/logstore/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func MaybeSideloadEntries(
if err != nil {
return nil, 0, 0, 0, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
otherEntriesSize += int64(len(input[i].Data))
continue
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func MaybeSideloadEntries(
// TODO(tbg): this should be supported by a method as well.
{
data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size())
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID)
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID)
_, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
if err != nil {
return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command")
Expand Down Expand Up @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand(
if err != nil {
return nil, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
return nil, nil
}
log.Event(ctx, "inlining sideloaded SSTable")
Expand Down Expand Up @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand(
// the EntryEncoding.
{
data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size())
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID)
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID)
_, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
if err != nil {
return nil, err
Expand All @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
return
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/logstore/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) {
}

func mkEnt(
v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable,
enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable,
) raftpb.Entry {
cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen)
var cmd kvserverpb.RaftCommand
Expand All @@ -62,7 +62,7 @@ func mkEnt(
}
var ent raftpb.Entry
ent.Index, ent.Term = index, term
ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b)
ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b)
return ent
}

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte
v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC
rangeID := roachpb.RangeID(1)

type testCase struct {
Expand Down Expand Up @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
addSSTStripped := addSST
addSSTStripped.Data = nil

entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil)
entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST)
entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil)
entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST)
entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped)
entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil)
entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST)
entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil)
entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST)
entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped)

type tc struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/recovery_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func raftLogFromPendingDescriptorUpdate(
t.Fatalf("failed to serialize raftCommand: %v", err)
}
data := raftlog.EncodeRaftCommand(
raftlog.EntryEncodingStandardPrefixByte, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
ent := raftpb.Entry{
Term: 1,
Index: replica.RaftCommittedIndex + entryIndex,
Expand Down
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string {
}
// NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods
// that call this take care of unwrapping the ConfChange), and since
// len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and
// they are encoded identically.
cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data)
// len(data)>0 it has to be EntryEncoding{Standard,Sideloaded}With{,out}AC
// and they are encoded identically.
cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data)
return fmt.Sprintf("[%x] [%d]", cmdID, len(data))
}

Expand Down Expand Up @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase
continue
}
switch typ {
case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded:
id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data)
case raftlog.EntryEncodingStandardWithAC,
raftlog.EntryEncodingSideloadedWithAC,
raftlog.EntryEncodingStandardWithoutAC,
raftlog.EntryEncodingSideloadedWithoutAC:
id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data)
ids = append(ids, id)
case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2:
// Configuration changes don't have the CmdIDKey easily accessible but are
Expand Down
145 changes: 95 additions & 50 deletions pkg/kv/kvserver/raftlog/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,83 +23,128 @@ import (
// and, in some cases, the first byte of the Entry's Data payload.
type EntryEncoding byte

// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since
// they aren't on the wire.

const (
// EntryEncodingStandard is the default encoding for a CockroachDB raft log
// entry.
//
// This is a raftpb.Entry of type EntryNormal whose Data slice is either empty
// or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent
// eight bytes represent a CmdIDKey. The remaining bytes represent a
// kvserverpb.RaftCommand.
EntryEncodingStandard EntryEncoding = 0
// EntryEncodingSideloaded indicates a proposal representing the result of a
// roachpb.AddSSTableRequest for which the payload (the SST) is stored outside
// the storage engine to improve storage performance.
//
// This is a raftpb.Entry of type EntryNormal whose data slice is either empty
// or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent
// eight bytes represent a CmdIDKey. The remaining bytes represent a
// kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a
// nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of
// which is an SST to be ingested (and which is present in memory but made
// durable via direct storage on the filesystem, bypassing the storage
// engine).
EntryEncodingSideloaded EntryEncoding = 1
// EntryEncodingEmpty is an empty entry. These are used by raft after
// leader election. Since they hold no data, there is nothing in them to
// decode.
EntryEncodingEmpty EntryEncoding = 253
EntryEncodingEmpty EntryEncoding = iota
// EntryEncodingStandardWithAC is the default encoding for a CockroachDB
// raft log entry.
//
// This is a raftpb.Entry of type EntryNormal whose Data slice is either
// empty or whose first byte matches entryEncodingStandardWithACPrefixByte.
// The subsequent eight bytes represent a CmdIDKey. The remaining bytes
// represent a kvserverpb.RaftCommand that also includes data used for
// below-raft admission control (Admission{Priority,CreateTime,OriginNode}).
EntryEncodingStandardWithAC
// EntryEncodingSideloadedWithAC indicates a proposal representing the
// result of a roachpb.AddSSTableRequest for which the payload (the SST) is
// stored outside the storage engine to improve storage performance.
//
// This is a raftpb.Entry of type EntryNormal whose data slice is either
// empty or with first byte == entryEncodingSideloadedWithACPrefixByte. The
// subsequent eight bytes represent a CmdIDKey. The remaining bytes
// represent a kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult
// holds a nontrivial kvserverpb.ReplicatedEvalResult_AddSSTable, the Data
// field of which is an SST to be ingested (and which is present in memory
// but made durable via direct storage on the filesystem, bypassing the
// storage engine). Admission{Priority,CreateTime,OriginNode} in the
// kvserverpb.RaftCommand are non-empty, data used for below-raft admission
// control.
EntryEncodingSideloadedWithAC
// EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but
// without the data for below-raft admission control.
EntryEncodingStandardWithoutAC
// EntryEncodingSideloadedWithoutAC is like EntryEncodingSideloadedWithAC
// but without below-raft admission metadata.
EntryEncodingSideloadedWithoutAC
// EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is
// raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange
// whose Context field is a kvserverpb.ConfChangeContext whose Payload is a
// kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of
// protobuf unmarshaling.
EntryEncodingRaftConfChange EntryEncoding = 254
// EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with
// the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2}
// applied.
EntryEncodingRaftConfChangeV2 EntryEncoding = 255
EntryEncodingRaftConfChange
// EntryEncodingRaftConfChangeV2 is analogous to
// EntryEncodingRaftConfChange, with the replacements
// raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied.
EntryEncodingRaftConfChangeV2
)

// IsSideloaded reports whether enc is one of the two sideloaded encodings,
// i.e. EntryEncodingSideloadedWithAC or EntryEncodingSideloadedWithoutAC.
func (enc EntryEncoding) IsSideloaded() bool {
	switch enc {
	case EntryEncodingSideloadedWithAC, EntryEncodingSideloadedWithoutAC:
		return true
	default:
		return false
	}
}

// UsesAdmissionControl reports whether enc is one of the two "WithAC"
// encodings, i.e. EntryEncodingStandardWithAC or
// EntryEncodingSideloadedWithAC.
func (enc EntryEncoding) UsesAdmissionControl() bool {
	switch enc {
	case EntryEncodingStandardWithAC, EntryEncodingSideloadedWithAC:
		return true
	default:
		return false
	}
}

// prefixByte maps an encoding to the byte written at the start of an encoded
// raft command. Only EntryEncoding{Standard,Sideloaded}With{,out}AC have a
// prefix byte; any other encoding causes a panic.
func (enc EntryEncoding) prefixByte() byte {
	if enc == EntryEncodingStandardWithAC {
		return entryEncodingStandardWithACPrefixByte
	}
	if enc == EntryEncodingSideloadedWithAC {
		return entryEncodingSideloadedWithACPrefixByte
	}
	if enc == EntryEncodingStandardWithoutAC {
		return entryEncodingStandardWithoutACPrefixByte
	}
	if enc == EntryEncodingSideloadedWithoutAC {
		return entryEncodingSideloadedWithoutACPrefixByte
	}
	// Every remaining encoding carries no prefix byte, so asking for one is a
	// programming error.
	panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc))
}

// Prefix bytes: each constant is the first byte of a raftpb.Entry's Data
// slice for an Entry of the correspondingly named encoding. In the values
// below, the lowest bit is set for the two sideloaded encodings and the
// next bit is set for the two WithAC encodings.
const (
	// entryEncodingStandardWithoutACPrefixByte prefixes entries of encoding
	// EntryEncodingStandardWithoutAC.
	entryEncodingStandardWithoutACPrefixByte byte = 0 // 0b00000000
	// entryEncodingSideloadedWithoutACPrefixByte prefixes entries of encoding
	// EntryEncodingSideloadedWithoutAC.
	entryEncodingSideloadedWithoutACPrefixByte byte = 1 // 0b00000001
	// entryEncodingStandardWithACPrefixByte prefixes entries of encoding
	// EntryEncodingStandardWithAC.
	entryEncodingStandardWithACPrefixByte byte = 2 // 0b00000010
	// entryEncodingSideloadedWithACPrefixByte prefixes entries of encoding
	// EntryEncodingSideloadedWithAC.
	entryEncodingSideloadedWithACPrefixByte byte = 3 // 0b00000011
)

// TODO(tbg): when we have a good library for encoding entries, these should
// no longer be exported.
const (
// RaftCommandIDLen is the length of a command ID.
RaftCommandIDLen = 8
// RaftCommandPrefixLen is the length of the prefix of raft entries that
// use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The
// bytes after the prefix represent the kvserverpb.RaftCommand.
//
// RaftCommandPrefixLen is the length of the prefix of raft entries that use
// the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes
// after the prefix represent the kvserverpb.RaftCommand.
RaftCommandPrefixLen = 1 + RaftCommandIDLen
// EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's
// Data slice for an Entry of encoding EntryEncodingStandard.
EntryEncodingStandardPrefixByte = byte(0)
// EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data
// slice for an Entry of encoding EntryEncodingSideloaded.
EntryEncodingSideloadedPrefixByte = byte(1)
)

// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or
// EntryEncodingSideloaded.
func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte {
// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using
// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte {
b := make([]byte, RaftCommandPrefixLen+len(command))
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID)
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID)
copy(b[RaftCommandPrefixLen:], command)
return b
}

// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type
// EntryEncodingStandard or EntryEncodingSideloaded.
func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) {
// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the
// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) {
if len(commandID) != RaftCommandIDLen {
panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen))
}
if len(b) != RaftCommandPrefixLen {
panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen))
}
b[0] = prefixByte
b[0] = enc.prefixByte()
copy(b[1:], commandID)
}
Loading

0 comments on commit 705d6a1

Please sign in to comment.