diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index aca1ed748f08..2f1577eb6f8c 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -1932,12 +1932,10 @@ func (r *Replica) processRaftCommand(
 			tmpBatch.Close()
 		}
 
-		var delta enginepb.MVCCStats
 		{
 			var err error
-			delta, err = r.applyRaftCommand(
+			raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand(
 				ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch)
-			raftCmd.ReplicatedEvalResult.Delta = delta.ToStatsDelta()
 
 			// applyRaftCommand returned an error, which usually indicates
 			// either a serious logic bug in CockroachDB or a disk
@@ -2206,15 +2204,16 @@ func (r *Replica) acquireMergeLock(
 // underlying state machine (i.e. the engine). When the state machine can not be
 // updated, an error (which is likely fatal!) is returned and must be handled by
 // the caller.
+// The returned ReplicatedEvalResult replaces the caller's.
 func (r *Replica) applyRaftCommand(
 	ctx context.Context,
 	idKey storagebase.CmdIDKey,
 	rResult storagepb.ReplicatedEvalResult,
 	raftAppliedIndex, leaseAppliedIndex uint64,
 	writeBatch *storagepb.WriteBatch,
-) (enginepb.MVCCStats, error) {
+) (storagepb.ReplicatedEvalResult, error) {
 	if raftAppliedIndex <= 0 {
-		return enginepb.MVCCStats{}, errors.New("raft command index is <= 0")
+		return storagepb.ReplicatedEvalResult{}, errors.New("raft command index is <= 0")
 	}
 	if writeBatch != nil && len(writeBatch.Data) > 0 {
 		// Record the write activity, passing a 0 nodeID because replica.writeStats
@@ -2253,7 +2252,7 @@ func (r *Replica) applyRaftCommand(
 		// If we have an out of order index, there's corruption. No sense in
 		// trying to update anything or running the command. Simply return
 		// a corruption error.
-		return enginepb.MVCCStats{}, errors.Errorf("applied index jumped from %d to %d",
+		return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d",
 			oldRaftAppliedIndex, raftAppliedIndex)
 	}
 
@@ -2271,7 +2270,7 @@ func (r *Replica) applyRaftCommand(
 
 	if writeBatch != nil {
 		if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to apply WriteBatch")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch")
 		}
 	}
 
@@ -2290,7 +2289,7 @@ func (r *Replica) applyRaftCommand(
 		// that this new key is replacing.
 		err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats)
 		if err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to migrate to range applied state")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state")
 		}
 		usingAppliedStateKey = true
 	}
@@ -2305,7 +2304,7 @@ func (r *Replica) applyRaftCommand(
 		// lease index along with the mvcc stats, all in one key.
 		if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer,
 			raftAppliedIndex, leaseAppliedIndex, &ms); err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to set range applied state")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state")
 		}
 	} else {
 		// Advance the last applied index. We use a blind write in order to avoid
@@ -2314,7 +2313,7 @@ func (r *Replica) applyRaftCommand(
 		var appliedIndexNewMS enginepb.MVCCStats
 		if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS,
 			raftAppliedIndex, leaseAppliedIndex); err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to set applied index")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index")
 		}
 		deltaStats.SysBytes += appliedIndexNewMS.SysBytes -
 			r.raftMu.stateLoader.CalcAppliedIndexSysBytes(oldRaftAppliedIndex, oldLeaseAppliedIndex)
@@ -2324,14 +2323,14 @@ func (r *Replica) applyRaftCommand(
 		// across all deltaStats).
 		ms.Add(deltaStats)
 		if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to update MVCCStats")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats")
 		}
 	}
 
 	if haveTruncatedState {
 		apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer)
 		if err != nil {
-			return enginepb.MVCCStats{}, err
+			return storagepb.ReplicatedEvalResult{}, err
 		}
 		if !apply {
 			// TODO(tbg): As written, there is low confidence that nil'ing out
@@ -2365,12 +2364,12 @@ func (r *Replica) applyRaftCommand(
 		rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
 		oldHS, err := rsl.LoadHardState(ctx, r.store.Engine())
 		if err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to load HardState")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
 		}
 		assertHS = &oldHS
 	}
 	if err := batch.Commit(false); err != nil {
-		return enginepb.MVCCStats{}, errors.Wrap(err, "could not commit batch")
+		return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "could not commit batch")
 	}
 
 	if assertHS != nil {
@@ -2378,7 +2377,7 @@ func (r *Replica) applyRaftCommand(
 		rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
 		newHS, err := rsl.LoadHardState(ctx, r.store.Engine())
 		if err != nil {
-			return enginepb.MVCCStats{}, errors.Wrap(err, "unable to load HardState")
+			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
 		}
 		// Assert that nothing moved "backwards".
 		if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) {
@@ -2389,7 +2388,8 @@ func (r *Replica) applyRaftCommand(
 	elapsed := timeutil.Since(start)
 	r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
 
-	return deltaStats, nil
+	rResult.Delta = deltaStats.ToStatsDelta()
+	return rResult, nil
 }
 
 func handleTruncatedStateBelowRaft(
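
The net effect at the call site, shown as a standalone sketch (an illustration of the new contract, not additional code in this diff; the variable names mirror processRaftCommand above): the caller now overwrites its ReplicatedEvalResult wholesale with the copy returned by applyRaftCommand, which already carries the stats delta, instead of patching Delta in by hand afterwards.

	// Sketch of the new calling convention (not part of this diff).
	var err error
	raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand(
		ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch)
	if err != nil {
		// Per the doc comment, an error here is likely fatal: it indicates
		// either a serious logic bug in CockroachDB or a disk-level problem.
	}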