Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: make RaftCommand.ClosedTimestamp nullable #60992

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3356,15 +3356,15 @@ func TestProposalOverhead(t *testing.T) {
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
// NB: the expected overhead reflects the space overhead currently
// present in Raft commands. This test will fail if that overhead
// changes. Try to make this number go down and not up. It slightly
// undercounts because our proposal filter is called before
// maxLeaseIndex is filled in. The difference between the user and system
// overhead is that users ranges do not have rangefeeds on by default whereas
// system ranges do.
// NB: the expected overhead reflects the space overhead currently present
// in Raft commands. This test will fail if that overhead changes. Try to
// make this number go down and not up. It slightly undercounts because our
// proposal filter is called before MaxLeaseIndex or ClosedTimestamp are
// filled in. The difference between the user and system overhead is that
// user ranges do not have rangefeeds on by default whereas system ranges
// do.
const (
expectedUserOverhead uint32 = 45
expectedUserOverhead uint32 = 42
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

var maxRaftCommandFooterSize = (&RaftCommandFooter{
var maxMaxLeaseFooterSize = (&MaxLeaseFooter{
MaxLeaseIndex: math.MaxUint64,
}).Size()

Expand All @@ -28,10 +28,10 @@ var maxClosedTimestampFooterSize = (&ClosedTimestampFooter{
},
}).Size()

// MaxRaftCommandFooterSize returns the maximum possible size of an
// encoded RaftCommandFooter proto.
func MaxRaftCommandFooterSize() int {
return maxRaftCommandFooterSize
// MaxMaxLeaseFooterSize returns the maximum possible size of an encoded
// MaxLeaseFooter proto.
func MaxMaxLeaseFooterSize() int {
return maxMaxLeaseFooterSize
}

// MaxClosedTimestampFooterSize returns the maximum possible size of an encoded
Expand Down
301 changes: 160 additions & 141 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,12 @@ message RaftCommand {
// updated accordingly. Managing retry of proposals becomes trickier as
// well as that uproots whatever ordering was originally envisioned.
//
// This field is set through RaftCommandFooter hackery.
// This field is set through MaxLeaseFooter hackery. Unlike with the
// ClosedTimestamp, which needs to be nullable in this proto (see comment),
// there are no nullability concerns with this field. This is because
// max_lease_index is a primitive type, so it does not get encoded when zero.
// This alone ensures that the field is not encoded twice in the combined
// RaftCommand+MaxLeaseFooter proto.
uint64 max_lease_index = 4;

// The closed timestamp carried by this command. Once a follower is told to
Expand All @@ -255,8 +260,11 @@ message RaftCommand {
// in a command-specific way. If the value is not zero, the value is greater
// or equal to that of the previous commands (and all before it).
//
// This field is set through ClosedTimestampFooter hackery.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
// This field is set through ClosedTimestampFooter hackery. Unlike in the
// ClosedTimestampFooter, the field is nullable here so that it does not get
// encoded when empty. This prevents the field from being encoded twice in the
// combined RaftCommand+ClosedTimestampFooter proto.
util.hlc.Timestamp closed_timestamp = 17;

reserved 3;

Expand Down Expand Up @@ -284,19 +292,22 @@ message RaftCommand {
reserved 1, 2, 10001 to 10014;
}

// RaftCommandFooter contains a subset of the fields in RaftCommand. It is used
// MaxLeaseFooter contains a subset of the fields in RaftCommand. It is used
// to optimize a pattern where most of the fields in RaftCommand are marshaled
// outside of a heavily contended critical section, except for the fields in the
// footer, which are assigned and marshaled inside of the critical section and
// appended to the marshaled byte buffer. This minimizes the memory allocation
// and marshaling work performed under lock.
message RaftCommandFooter {
message MaxLeaseFooter {
uint64 max_lease_index = 4;
}

// ClosedTimestampFooter is similar to RaftCommandFooter, allowing the proposal
// ClosedTimestampFooter is similar to MaxLeaseFooter, allowing the proposal
// buffer to fill in the closed_timestamp field after most of the proto has been
// marshaled already.
message ClosedTimestampFooter {
// NOTE: unlike in RaftCommand, there's no reason to make this field nullable.
// If we don't want to include the field, we don't need to append the encoded
// footer to an encoded RaftCommand buffer.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
cmd.raftCmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
cmd.raftCmd.WriteBatch = nil
cmd.raftCmd.LogicalOpLog = nil
cmd.raftCmd.ClosedTimestamp.Reset()
cmd.raftCmd.ClosedTimestamp = nil
} else {
// Assert that we're not writing under the closed timestamp. We can only do
// these checks on IsIntentWrite requests, since others (for example,
Expand Down Expand Up @@ -639,7 +639,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
//
// Alternatively if we discover that the RHS has already been removed
// from this store, clean up its data.
splitPreApply(ctx, b.batch, res.Split.SplitTrigger, b.r, cmd.raftCmd.ClosedTimestamp)
splitPreApply(ctx, b.r, b.batch, res.Split.SplitTrigger, cmd.raftCmd.ClosedTimestamp)

// The rangefeed processor will no longer be provided logical ops for
// its entire range, so it needs to be shut down and all registrations
Expand Down Expand Up @@ -823,13 +823,13 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
if cts := cmd.raftCmd.ClosedTimestamp; !cts.IsEmpty() {
if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
if cts.Less(b.state.ClosedTimestamp) {
log.Fatalf(ctx,
"closed timestamp regressing from %s to %s when applying command %x",
b.state.ClosedTimestamp, cts, cmd.idKey)
}
b.state.ClosedTimestamp = cts
b.state.ClosedTimestamp = *cts
if clockTS, ok := cts.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ProposalData struct {
quotaAlloc *quotapool.IntAlloc

// tmpFooter is used to avoid an allocation.
tmpFooter kvserverpb.RaftCommandFooter
tmpFooter kvserverpb.MaxLeaseFooter

// ec.done is called after command application to update the timestamp
// cache and optionally release latches and exits lock wait-queues.
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (pc proposalCreator) newProposal(ba roachpb.BatchRequest) (*ProposalData, [
func (pc proposalCreator) encodeProposal(p *ProposalData) []byte {
cmdLen := p.command.Size()
needed := raftCommandPrefixLen + cmdLen +
kvserverpb.MaxRaftCommandFooterSize() +
kvserverpb.MaxMaxLeaseFooterSize() +
kvserverpb.MaxClosedTimestampFooterSize()
data := make([]byte, raftCommandPrefixLen, needed)
encodeRaftCommandPrefix(data, raftVersionStandard, p.idKey)
Expand Down Expand Up @@ -716,7 +716,12 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
type reqType int
checkClosedTS := func(t *testing.T, r *testProposerRaft, exp hlc.Timestamp) {
require.Len(t, r.lastProps, 1)
require.Equal(t, exp, r.lastProps[0].ClosedTimestamp)
if exp.IsEmpty() {
require.Nil(t, r.lastProps[0].ClosedTimestamp)
} else {
require.NotNil(t, r.lastProps[0].ClosedTimestamp)
require.Equal(t, exp, *r.lastProps[0].ClosedTimestamp)
}
}

// The lease that the proposals are made under.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *Replica) propose(

// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a RaftCommandFooter.
// buffer as a MaxLeaseFooter.
p.command.MaxLeaseIndex = 0

// Determine the encoding style for the Raft command.
Expand Down Expand Up @@ -320,7 +320,7 @@ func (r *Replica) propose(
// Allocate the data slice with enough capacity to eventually hold the two
// "footers" that are filled later.
needed := preLen + cmdLen +
kvserverpb.MaxRaftCommandFooterSize() +
kvserverpb.MaxMaxLeaseFooterSize() +
kvserverpb.MaxClosedTimestampFooterSize()
data := make([]byte, preLen, needed)
// Encode prefix with command ID, if necessary.
Expand Down
17 changes: 8 additions & 9 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,23 @@ func (rsl StateLoader) LoadMVCCStats(
// keys. We now deem those keys to be "legacy" because they have been replaced
// by the range applied state key.
//
// TODO(andrei): closedTimestamp is a pointer to avoid an allocation when
// putting it in RangeAppliedState. RangeAppliedState.ClosedTimestamp is made
// non-nullable (see comments on the field), this argument should be taken by
// value.
// TODO(andrei): closedTS is a pointer to avoid an allocation when putting it in
// RangeAppliedState. Once RangeAppliedState.ClosedTimestamp is made
// non-nullable (see comments on the field), this argument should be taken by
// value.
func (rsl StateLoader) SetRangeAppliedState(
ctx context.Context,
readWriter storage.ReadWriter,
appliedIndex, leaseAppliedIndex uint64,
newMS *enginepb.MVCCStats,
closedTimestamp *hlc.Timestamp,
closedTS *hlc.Timestamp,
) error {
as := enginepb.RangeAppliedState{
RaftAppliedIndex: appliedIndex,
LeaseAppliedIndex: leaseAppliedIndex,
RangeStats: newMS.ToPersistentStats(),
}
if closedTimestamp != nil && !closedTimestamp.IsEmpty() {
as.ClosedTimestamp = closedTimestamp
if closedTS != nil && !closedTS.IsEmpty() {
as.ClosedTimestamp = closedTS
}
// The RangeAppliedStateKey is not included in stats. This is also reflected
// in C.MVCCComputeStats and ComputeStatsForRange.
Expand Down Expand Up @@ -498,15 +497,15 @@ func (rsl StateLoader) SetMVCCStats(

// SetClosedTimestamp overwrites the closed timestamp.
func (rsl StateLoader) SetClosedTimestamp(
ctx context.Context, readWriter storage.ReadWriter, closedTS hlc.Timestamp,
ctx context.Context, readWriter storage.ReadWriter, closedTS *hlc.Timestamp,
) error {
as, err := rsl.LoadRangeAppliedState(ctx, readWriter)
if err != nil {
return err
}
return rsl.SetRangeAppliedState(
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex,
as.RangeStats.ToStatsPtr(), &closedTS)
as.RangeStats.ToStatsPtr(), closedTS)
}

// SetLegacyRaftTruncatedState overwrites the truncated state.
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ import (
// split commit.
func splitPreApply(
ctx context.Context,
r *Replica,
readWriter storage.ReadWriter,
split roachpb.SplitTrigger,
r *Replica,
// The closed timestamp used to initialize the RHS.
closedTS hlc.Timestamp,
initClosedTS *hlc.Timestamp,
) {
// Sanity check that the store is in the split.
//
Expand Down Expand Up @@ -123,7 +122,7 @@ func splitPreApply(
}

// Persist the closed timestamp.
if err := rsl.SetClosedTimestamp(ctx, readWriter, closedTS); err != nil {
if err := rsl.SetClosedTimestamp(ctx, readWriter, initClosedTS); err != nil {
log.Fatalf(ctx, "%s", err)
}

Expand Down