Skip to content

Commit

Permalink
kvserver: move LAI assignment to flush time
Browse files Browse the repository at this point in the history
We used to assign LAIs on proposals when the proposals were inserted
into the proposal buffer. Knowing the LAI at evaluation time was needed
by the old closed timestamp mechanism. Now that that mechanism is gone,
the assignment can move to the proposal buffer flush. Moving it there
mirrors the assignment of the closed timestamp, which is in relationship
with the LAI. This move also lets us simplify the proposal buffer code
significantly since atomically assigning an index into the propBuf's
array and an LAI is no longer needed. These assignments were done in a
semi-lock-free manner, and so doing it atomically was complex.

Release note: None
  • Loading branch information
andreimatei committed Aug 30, 2021
1 parent 1311032 commit 6d35693
Show file tree
Hide file tree
Showing 15 changed files with 451 additions and 776 deletions.
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,19 @@ func (r *Replica) GetLastIndex() (uint64, error) {
return r.raftLastIndexLocked()
}

// LastAssignedLeaseIndexRLocked returns the last assigned lease index.
func (r *Replica) LastAssignedLeaseIndex() uint64 {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.proposalBuf.LastAssignedLeaseIndexRLocked()
}

// LastAssignedLeaseIndexRLocked is like LastAssignedLeaseIndex, but requires
// b.mu to be held in read mode.
func (b *propBuf) LastAssignedLeaseIndexRLocked() uint64 {
return b.assignedLAI
}

// SetQuotaPool allows the caller to set a replica's quota pool initialized to
// a given quota. Additionally it initializes the replica's quota release queue
// and its command sizes map. Only safe to call on the replica that is both
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/util/hlc",
"//pkg/util/quotapool",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/kvserverbase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -68,10 +69,11 @@ type FilterArgs struct {

// ProposalFilterArgs groups the arguments to ReplicaProposalFilter.
type ProposalFilterArgs struct {
Ctx context.Context
Cmd kvserverpb.RaftCommand
CmdID CmdIDKey
Req roachpb.BatchRequest
Ctx context.Context
Cmd kvserverpb.RaftCommand
QuotaAlloc *quotapool.IntAlloc
CmdID CmdIDKey
Req roachpb.BatchRequest
}

// ApplyFilterArgs groups the arguments to a ReplicaApplyFilter.
Expand Down
19 changes: 5 additions & 14 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

var maxMaxLeaseFooterSize = (&MaxLeaseFooter{
var maxRaftCommandFooterSize = (&RaftCommandFooter{
MaxLeaseIndex: math.MaxUint64,
}).Size()

var maxClosedTimestampFooterSize = (&ClosedTimestampFooter{
ClosedTimestamp: hlc.Timestamp{
WallTime: math.MaxInt64,
Logical: math.MaxInt32,
Synthetic: true,
},
}).Size()

// MaxMaxLeaseFooterSize returns the maximum possible size of an encoded
// MaxLeaseFooter proto.
func MaxMaxLeaseFooterSize() int {
return maxMaxLeaseFooterSize
}

// MaxClosedTimestampFooterSize returns the maximmum possible size of an encoded
// ClosedTimestampFooter.
func MaxClosedTimestampFooterSize() int {
return maxClosedTimestampFooterSize
// MaxRaftCommandFooterSize returns the maximum possible size of an encoded
// RaftCommandFooter proto.
func MaxRaftCommandFooterSize() int {
return maxRaftCommandFooterSize
}

// IsZero returns whether all fields are set to their zero value.
Expand Down
384 changes: 134 additions & 250 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Large diffs are not rendered by default.

29 changes: 13 additions & 16 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,12 @@ message RaftCommand {
// update. If the value is not zero, the value is greater or equal to that of
// the previous commands (and all before it).
//
// This field is set through ClosedTimestampFooter hackery. Unlike in the
// ClosedTimestampFooter, the field is nullable here so that it does not get
// encoded when empty. This prevents the field from being encoded twice in the
// combined RaftCommand+ClosedTimestampFooter proto.
// This field is set through ClosedTimestampFooter hackery. The field is
// nullable so that it does not get encoded when empty. This prevents the
// field from being encoded twice in the combined
// RaftCommand+ClosedTimestampFooter proto (encoding it twice is not illegal
// as far as proto goes - the last value wins when decoding - but it is a
// problem for sideloading, which reduces the size of the proto).
util.hlc.Timestamp closed_timestamp = 17;

reserved 3;
Expand All @@ -318,22 +320,17 @@ message RaftCommand {
reserved 1, 2, 10001 to 10014;
}

// MaxLeaseFooter contains a subset of the fields in RaftCommand. It is used
// RaftCommandFooter contains a subset of the fields in RaftCommand. It is used
// to optimize a pattern where most of the fields in RaftCommand are marshaled
// outside of a heavily contended critical section, except for the fields in the
// footer, which are assigned and marhsaled inside of the critical section and
// footer, which are assigned and marshaled inside of the critical section and
// appended to the marshaled byte buffer. This minimizes the memory allocation
// and marshaling work performed under lock.
message MaxLeaseFooter {
message RaftCommandFooter {
uint64 max_lease_index = 4;
}

// ClosedTimestampFooter is similar to MaxLeaseFooter, allowing the proposal
// buffer to fill in the closed_timestamp field after most of the proto has been
// marshaled already.
message ClosedTimestampFooter {
// NOTE: unlike in RaftCommand, there's no reason to make this field nullable.
// If we don't want to include the field, we don't need to append the encoded
// footer to an encoded RaftCommand buffer.
// NOTE: unlike in RaftCommand, there's no reason to make this field nullable
// and so we make it non-nullable in order to save allocations. This means
// that the field on a decoded RaftCommand will also never be nil, but we
// don't rely on that.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
}
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,8 +1180,6 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo {
}
}
ri.RangeMaxBytes = r.mu.conf.RangeMaxBytes
if desc := ri.ReplicaState.Desc; desc != nil {
}
if r.mu.tenantID != (roachpb.TenantID{}) {
ri.TenantID = r.mu.tenantID.ToUint64()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

maxLeaseIndex, pErr := r.propose(ctx, p, tok.Move(ctx))
pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x at maxLeaseIndex=%d", cmd.idKey, maxLeaseIndex)
log.VEventf(ctx, 2, "reproposed command %x", cmd.idKey)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_closedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
},
{
// We can't close all the way up to the lease expiration. See
// propBuf.assignClosedTimestampToProposalLocked.
// propBuf.assignClosedTimestampAndLAIToProposalLocked.
name: "close lease expiration",
computeTarget: func(r *kvserver.Replica) (target hlc.Timestamp, exp bool) {
ls := r.LeaseStatusAt(context.Background(), r.Clock().NowAsClockTimestamp())
Expand Down
14 changes: 6 additions & 8 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ type ProposalData struct {
sp *tracing.Span

// idKey uniquely identifies this proposal.
// TODO(andreimatei): idKey is legacy at this point: We could easily key
// commands by their MaxLeaseIndex, and doing so should be ok with a stop-
// the-world migration. However, various test facilities depend on the
// command ID for e.g. replay protection.
// TODO(andrei): idKey is legacy at this point: We could easily key commands
// by their MaxLeaseIndex, and doing so should be ok with a stop- the-world
// migration. However, various test facilities depend on the command ID for
// e.g. replay protection. Later edit: the MaxLeaseIndex assignment has,
// however, moved to happen later, at proposal time.
idKey kvserverbase.CmdIDKey

// proposedAtTicks is the (logical) time at which this command was
Expand All @@ -82,9 +83,6 @@ type ProposalData struct {
// passed to r.mu.quotaReleaseQueue.
quotaAlloc *quotapool.IntAlloc

// tmpFooter is used to avoid an allocation.
tmpFooter kvserverpb.MaxLeaseFooter

// ec.done is called after command application to update the timestamp
// cache and optionally release latches and exits lock wait-queues.
ec endCmds
Expand Down Expand Up @@ -444,7 +442,7 @@ func (r *Replica) leasePostApplyLocked(

// Inform the propBuf about the new lease so that it can initialize its closed
// timestamp tracking.
r.mu.proposalBuf.OnLeaseChangeLocked(iAmTheLeaseHolder, r.mu.state.RaftClosedTimestamp)
r.mu.proposalBuf.OnLeaseChangeLocked(iAmTheLeaseHolder, r.mu.state.RaftClosedTimestamp, r.mu.state.LeaseAppliedIndex)

// Ordering is critical here. We only install the new lease after we've
// checked for an in-progress merge and updated the timestamp cache. If the
Expand Down
Loading

0 comments on commit 6d35693

Please sign in to comment.