Skip to content

Commit

Permalink
kv: make RaftCommand.ClosedTimestamp nullable
Browse files Browse the repository at this point in the history
Fixes #60852.
Fixes #60833.
Fixes #58298.
Fixes #59428.
Fixes #60756.
Fixes #60848.
Fixes #60849.

In #60852 and related issues, we saw that the introduction of a non-nullable
`RaftCommand.ClosedTimestamp`, coupled with the `ClosedTimestampFooter` encoding
strategy we use, led to encoded `RaftCommand` protos with their ClosedTimestamp
field set twice. This is ok from a correctness perspective, at least as far as
protobuf is concerned, but it led to a subtle interaction where the process of passing
through sideloading (`maybeInlineSideloadedRaftCommand(maybeSideloadEntriesImpl(e))`)
would reduce the size of an encoded RaftCommand by 3 bytes (the encoded size of
an empty `hlc.Timestamp`). This was resulting in an `uncommittedSize` leak in
Raft, which was eventually stalling on its `MaxUncommittedEntriesSize` limit.

This commit fixes this issue by making `RaftCommand.ClosedTimestamp` nullable.
With the field marked as nullable, it will no longer be encoded as an empty
timestamp when unset, ensuring that when the encoded `ClosedTimestampFooter` is
appended, it contains the only instance of the `ClosedTimestamp` field.
  • Loading branch information
nvanbenschoten committed Feb 23, 2021
1 parent 4317460 commit a2b9c19
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 146 deletions.
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3356,15 +3356,15 @@ func TestProposalOverhead(t *testing.T) {
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
// NB: the expected overhead reflects the space overhead currently
// present in Raft commands. This test will fail if that overhead
// changes. Try to make this number go down and not up. It slightly
// undercounts because our proposal filter is called before
// maxLeaseIndex is filled in. The difference between the user and system
// overhead is that users ranges do not have rangefeeds on by default whereas
// system ranges do.
// NB: the expected overhead reflects the space overhead currently present
// in Raft commands. This test will fail if that overhead changes. Try to
// make this number go down and not up. It slightly undercounts because our
// proposal filter is called before MaxLeaseIndex or ClosedTimestamp are
// filled in. The difference between the user and system overhead is that
// user ranges do not have rangefeeds on by default whereas system ranges
// do.
const (
expectedUserOverhead uint32 = 45
expectedUserOverhead uint32 = 42
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
Expand Down
250 changes: 132 additions & 118 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,11 @@ message RaftCommand {
// in a command-specific way. If the value is not zero, the value is greater
// than or equal to that of the previous command (and all before it).
//
// This field is set through ClosedTimestampFooter hackery.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
// This field is set through ClosedTimestampFooter hackery. Unlike in the
// ClosedTimestampFooter, the field is nullable here so that it does not get
// encoded when empty. This prevents the field from being encoded twice in the
// combined RaftCommand+ClosedTimestampFooter proto.
util.hlc.Timestamp closed_timestamp = 17;

reserved 3;

Expand Down Expand Up @@ -298,5 +301,8 @@ message RaftCommandFooter {
// buffer to fill in the closed_timestamp field after most of the proto has been
// marshaled already.
message ClosedTimestampFooter {
// The closed timestamp carried by the footer. It deliberately uses the same
// field number (17) as RaftCommand.closed_timestamp, so that an encoded footer
// appended to an encoded RaftCommand buffer fills in that field of the
// combined proto.
//
// NOTE: unlike in RaftCommand, there's no reason to make this field nullable.
// If we don't want to include the field, we don't need to append the encoded
// footer to an encoded RaftCommand buffer.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
cmd.raftCmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
cmd.raftCmd.WriteBatch = nil
cmd.raftCmd.LogicalOpLog = nil
cmd.raftCmd.ClosedTimestamp.Reset()
cmd.raftCmd.ClosedTimestamp = nil
} else {
// Assert that we're not writing under the closed timestamp. We can only do
// these checks on IsIntentWrite requests, since others (for example,
Expand Down Expand Up @@ -639,7 +639,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
//
// Alternatively if we discover that the RHS has already been removed
// from this store, clean up its data.
splitPreApply(ctx, b.batch, res.Split.SplitTrigger, b.r, cmd.raftCmd.ClosedTimestamp)
splitPreApply(ctx, b.r, b.batch, res.Split.SplitTrigger, cmd.raftCmd.ClosedTimestamp)

// The rangefeed processor will no longer be provided logical ops for
// its entire range, so it needs to be shut down and all registrations
Expand Down Expand Up @@ -823,13 +823,13 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
if cts := cmd.raftCmd.ClosedTimestamp; !cts.IsEmpty() {
if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
if cts.Less(b.state.ClosedTimestamp) {
log.Fatalf(ctx,
"closed timestamp regressing from %s to %s when applying command %x",
b.state.ClosedTimestamp, cts, cmd.idKey)
}
b.state.ClosedTimestamp = cts
b.state.ClosedTimestamp = *cts
if clockTS, ok := cts.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,12 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
type reqType int
checkClosedTS := func(t *testing.T, r *testProposerRaft, exp hlc.Timestamp) {
require.Len(t, r.lastProps, 1)
require.Equal(t, exp, r.lastProps[0].ClosedTimestamp)
if exp.IsEmpty() {
require.Nil(t, r.lastProps[0].ClosedTimestamp)
} else {
require.NotNil(t, r.lastProps[0].ClosedTimestamp)
require.Equal(t, exp, *r.lastProps[0].ClosedTimestamp)
}
}

// The lease that the proposals are made under.
Expand Down
17 changes: 8 additions & 9 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,23 @@ func (rsl StateLoader) LoadMVCCStats(
// keys. We now deem those keys to be "legacy" because they have been replaced
// by the range applied state key.
//
// TODO(andrei): closedTimestamp is a pointer to avoid an allocation when
// putting it in RangeAppliedState. RangeAppliedState.ClosedTimestamp is made
// non-nullable (see comments on the field), this argument should be taken by
// value.
// TODO(andrei): closedTS is a pointer to avoid an allocation when putting it in
// RangeAppliedState. Once RangeAppliedState.ClosedTimestamp is made non-nullable
// (see comments on the field), this argument should be taken by value.
func (rsl StateLoader) SetRangeAppliedState(
ctx context.Context,
readWriter storage.ReadWriter,
appliedIndex, leaseAppliedIndex uint64,
newMS *enginepb.MVCCStats,
closedTimestamp *hlc.Timestamp,
closedTS *hlc.Timestamp,
) error {
as := enginepb.RangeAppliedState{
RaftAppliedIndex: appliedIndex,
LeaseAppliedIndex: leaseAppliedIndex,
RangeStats: newMS.ToPersistentStats(),
}
if closedTimestamp != nil && !closedTimestamp.IsEmpty() {
as.ClosedTimestamp = closedTimestamp
if closedTS != nil && !closedTS.IsEmpty() {
as.ClosedTimestamp = closedTS
}
// The RangeAppliedStateKey is not included in stats. This is also reflected
// in C.MVCCComputeStats and ComputeStatsForRange.
Expand Down Expand Up @@ -498,15 +497,15 @@ func (rsl StateLoader) SetMVCCStats(

// SetClosedTimestamp overwrites the closed timestamp.
func (rsl StateLoader) SetClosedTimestamp(
ctx context.Context, readWriter storage.ReadWriter, closedTS hlc.Timestamp,
ctx context.Context, readWriter storage.ReadWriter, closedTS *hlc.Timestamp,
) error {
as, err := rsl.LoadRangeAppliedState(ctx, readWriter)
if err != nil {
return err
}
return rsl.SetRangeAppliedState(
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex,
as.RangeStats.ToStatsPtr(), &closedTS)
as.RangeStats.ToStatsPtr(), closedTS)
}

// SetLegacyRaftTruncatedState overwrites the truncated state.
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ import (
// split commit.
func splitPreApply(
ctx context.Context,
r *Replica,
readWriter storage.ReadWriter,
split roachpb.SplitTrigger,
r *Replica,
// The closed timestamp used to initialize the RHS.
closedTS hlc.Timestamp,
initClosedTS *hlc.Timestamp,
) {
// Sanity check that the store is in the split.
//
Expand Down Expand Up @@ -123,7 +122,7 @@ func splitPreApply(
}

// Persist the closed timestamp.
if err := rsl.SetClosedTimestamp(ctx, readWriter, closedTS); err != nil {
if err := rsl.SetClosedTimestamp(ctx, readWriter, initClosedTS); err != nil {
log.Fatalf(ctx, "%s", err)
}

Expand Down

0 comments on commit a2b9c19

Please sign in to comment.