Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
136497: kv: use crtime.Mono in sendPartialBatch r=RaduBerinde a=RaduBerinde

Informs: #133315
Release note: None

136536: kvserver: consolidate RaftTruncatedState handling r=tbg a=pav-kv

This PR simplifies handling of the `RaftTruncatedState` eval result, and adds a TODO to remove a zero check after the next below-raft migration.

`RaftExpectedFirstIndex` used to be zero before f9dee66, which started setting this field behind a `LooselyCoupledRaftLogTruncation` version gate. Since then, the version gate has been removed, and the field is set unconditionally.

But we still need to have done at least one below-raft migration to safely say that this field can't be zero.

Related to #136109

Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
3 people committed Dec 3, 2024
3 parents 0ea9ae6 + 5b363bb + 1f16c56 commit 1836289
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 37 deletions.
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -2145,7 +2144,8 @@ func (ds *DistSender) sendPartialBatch(
// Start a retry loop for sending the batch to the range. Each iteration of
// this loop uses a new descriptor. Attempts to send to multiple replicas in
// this descriptor are done at a lower level.
tBegin, attempts := timeutil.Now(), int64(0) // for slow log message
tBegin := crtime.NowMono() // for slow log message
var attempts int64
// prevTok maintains the EvictionToken used on the previous iteration.
var prevTok rangecache.EvictionToken
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
Expand Down Expand Up @@ -2202,7 +2202,7 @@ func (ds *DistSender) sendPartialBatch(
prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)

if dur := timeutil.Since(tBegin); dur > slowDistSenderRangeThreshold && !tBegin.IsZero() {
if dur := tBegin.Elapsed(); dur > slowDistSenderRangeThreshold && tBegin != 0 {
{
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply)
Expand All @@ -2212,14 +2212,14 @@ func (ds *DistSender) sendPartialBatch(
// RPC is not retried any more.
if err != nil || reply.Error != nil {
ds.metrics.SlowRPCs.Inc(1)
defer func(tBegin time.Time, attempts int64) {
defer func(tBegin crtime.Mono, attempts int64) {
ds.metrics.SlowRPCs.Dec(1)
var s redact.StringBuilder
slowRangeRPCReturnWarningStr(&s, timeutil.Since(tBegin), attempts)
slowRangeRPCReturnWarningStr(&s, tBegin.Elapsed(), attempts)
log.Warningf(ctx, "slow RPC response: %v", &s)
}(tBegin, attempts)
}
tBegin = time.Time{} // prevent reentering branch for this RPC
tBegin = 0 // prevent reentering branch for this RPC
}

if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1643,8 +1643,6 @@ message TruncateLogRequest {
// that is not equal to ExpectedFirstIndex. This is an optimization that
// typically allows the potentially expensive computation of the bytes being
// discarded from the raft log to be performed once, at the leaseholder.
//
// Populated starting at cluster version LooselyCoupledRaftLogTruncation.
uint64 expected_first_index = 4 [(gogoproto.casttype) = "RaftIndex"];
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,15 @@ message ReplicatedEvalResult {
// Sideloaded entries are not expected to be common enough that it is worth
// the optimization to calculate the delta once (at the leaseholder).
int64 raft_log_delta = 13;
// RaftExpectedFirstIndex is populated starting at cluster version
// LooselyCoupledRaftLogTruncation. When this is not populated, the replica
// should not delay enacting the truncation.
// RaftExpectedFirstIndex is the first index of the raft log used for
// computing the RaftLogDelta. It is populated only when evaluating a
// TruncateLogRequest.
//
// Introduced in v22.1. There can be historical truncation proposals with this
// field being zero.
// TODO(pav-kv): the likelihood of this is low, but to err on the safe side we
// should wait for the next below-raft migration, at which point we'll be able
// to say that this field is always populated.
uint64 raft_expected_first_index = 25 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/kv/kvpb.RaftIndex"];

// MVCCHistoryMutation describes mutations of MVCC history that may violate
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,11 @@ func (rlq *raftLogQueue) process(
}
b := &kv.Batch{}
truncRequest := &kvpb.TruncateLogRequest{
RequestHeader: kvpb.RequestHeader{Key: r.Desc().StartKey.AsRawKey()},
Index: decision.NewFirstIndex,
RangeID: r.RangeID,
RequestHeader: kvpb.RequestHeader{Key: r.Desc().StartKey.AsRawKey()},
Index: decision.NewFirstIndex,
RangeID: r.RangeID,
ExpectedFirstIndex: decision.Input.FirstIndex,
}
truncRequest.ExpectedFirstIndex = decision.Input.FirstIndex
b.AddRawRequest(truncRequest)
if err := rlq.db.Run(ctx, b); err != nil {
return false, err
Expand Down
45 changes: 23 additions & 22 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,9 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult(
log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult")
}

isRaftLogTruncationDeltaTrusted := true
handleTruncatedState := func(state *kvserverpb.RaftTruncatedState) {
raftLogDelta, expectedFirstIndexWasAccurate := sm.r.handleTruncatedStateResult(
ctx, state, rResult.RaftExpectedFirstIndex)
if !expectedFirstIndexWasAccurate && rResult.RaftExpectedFirstIndex != 0 {
isRaftLogTruncationDeltaTrusted = false
}
rResult.RaftLogDelta += raftLogDelta
rResult.RaftExpectedFirstIndex = 0
truncState := rResult.RaftTruncatedState
if truncState != nil {
rResult.RaftTruncatedState = nil
}

if rResult.State != nil {
Expand All @@ -300,13 +294,11 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult(
rResult.PriorReadSummary = nil
}

// TODO(#93248): the strongly coupled truncation code will be removed once
// the loosely coupled truncations are the default.
if newTruncState := rResult.State.TruncatedState; newTruncState != nil {
if rResult.RaftTruncatedState != nil {
if truncState != nil {
log.Fatalf(ctx, "double RaftTruncatedState in ReplicatedEvalResult")
}
handleTruncatedState(newTruncState)
truncState = newTruncState
rResult.State.TruncatedState = nil
}

Expand All @@ -324,17 +316,26 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult(
rResult.State = nil
}
}
if newTruncState := rResult.RaftTruncatedState; newTruncState != nil {
handleTruncatedState(newTruncState)
rResult.RaftTruncatedState = nil
}

if rResult.RaftLogDelta != 0 {
// This code path will be taken exactly when one of the preceding blocks has
// invoked handleTruncatedState. It is needlessly confusing that these two
// are not in the same place.
sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta, isRaftLogTruncationDeltaTrusted)
// TODO(#93248): the strongly coupled truncation code will be removed once the
// loosely coupled truncations are the default.
if truncState != nil {
// NB: raftLogDelta reflects removals of any sideloaded entries.
raftLogDelta, expectedFirstIndexWasAccurate := sm.r.handleTruncatedStateResult(
ctx, truncState, rResult.RaftExpectedFirstIndex)
// NB: The RaftExpectedFirstIndex field is zero if this proposal predates
// v22.1, which added the field; back then all truncations were strongly
// coupled. The delta in these historical proposals is thus accurate.
// TODO(pav-kv): remove the zero check after any below-raft migration.
isRaftLogTruncationDeltaTrusted := expectedFirstIndexWasAccurate ||
rResult.RaftExpectedFirstIndex == 0
// The proposer hasn't included the sideloaded entries in the delta. We
// counted these above, and combine the deltas.
raftLogDelta += rResult.RaftLogDelta
sm.r.handleRaftLogDeltaResult(ctx, raftLogDelta, isRaftLogTruncationDeltaTrusted)

rResult.RaftLogDelta = 0
rResult.RaftExpectedFirstIndex = 0
}

// The rest of the actions are "nontrivial" and may have large effects on the
Expand Down

0 comments on commit 1836289

Please sign in to comment.