Skip to content

Commit

Permalink
storage: facilitate side effect mutation in applyRaftCommand
Browse files Browse the repository at this point in the history
Future code changes (sending snapshots without historical Raft log) will
make it necessary to sometimes alter the ReplicatedEvalResult in
applyRaftCommand (turning a TruncatedState updated into a noop).

Make this possible by returning the modified result back up to the
caller.

Release note: None
  • Loading branch information
tbg committed Mar 13, 2019
1 parent a512e39 commit b40672f
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1932,12 +1932,10 @@ func (r *Replica) processRaftCommand(
tmpBatch.Close()
}

var delta enginepb.MVCCStats
{
var err error
delta, err = r.applyRaftCommand(
raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand(
ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch)
raftCmd.ReplicatedEvalResult.Delta = delta.ToStatsDelta()

// applyRaftCommand returned an error, which usually indicates
// either a serious logic bug in CockroachDB or a disk
Expand Down Expand Up @@ -2206,15 +2204,16 @@ func (r *Replica) acquireMergeLock(
// underlying state machine (i.e. the engine). When the state machine can not be
// updated, an error (which is likely fatal!) is returned and must be handled by
// the caller.
// The returned ReplicatedEvalResult replaces the caller's.
func (r *Replica) applyRaftCommand(
ctx context.Context,
idKey storagebase.CmdIDKey,
rResult storagepb.ReplicatedEvalResult,
raftAppliedIndex, leaseAppliedIndex uint64,
writeBatch *storagepb.WriteBatch,
) (enginepb.MVCCStats, error) {
) (storagepb.ReplicatedEvalResult, error) {
if raftAppliedIndex <= 0 {
return enginepb.MVCCStats{}, errors.New("raft command index is <= 0")
return storagepb.ReplicatedEvalResult{}, errors.New("raft command index is <= 0")
}
if writeBatch != nil && len(writeBatch.Data) > 0 {
// Record the write activity, passing a 0 nodeID because replica.writeStats
Expand Down Expand Up @@ -2253,7 +2252,7 @@ func (r *Replica) applyRaftCommand(
// If we have an out of order index, there's corruption. No sense in
// trying to update anything or running the command. Simply return
// a corruption error.
return enginepb.MVCCStats{}, errors.Errorf("applied index jumped from %d to %d",
return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d",
oldRaftAppliedIndex, raftAppliedIndex)
}

Expand All @@ -2271,7 +2270,7 @@ func (r *Replica) applyRaftCommand(

if writeBatch != nil {
if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to apply WriteBatch")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch")
}
}

Expand All @@ -2290,7 +2289,7 @@ func (r *Replica) applyRaftCommand(
// that this new key is replacing.
err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats)
if err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to migrate to range applied state")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state")
}
usingAppliedStateKey = true
}
Expand All @@ -2305,7 +2304,7 @@ func (r *Replica) applyRaftCommand(
// lease index along with the mvcc stats, all in one key.
if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer,
raftAppliedIndex, leaseAppliedIndex, &ms); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to set range applied state")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state")
}
} else {
// Advance the last applied index. We use a blind write in order to avoid
Expand All @@ -2314,7 +2313,7 @@ func (r *Replica) applyRaftCommand(
var appliedIndexNewMS enginepb.MVCCStats
if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS,
raftAppliedIndex, leaseAppliedIndex); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to set applied index")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index")
}
deltaStats.SysBytes += appliedIndexNewMS.SysBytes -
r.raftMu.stateLoader.CalcAppliedIndexSysBytes(oldRaftAppliedIndex, oldLeaseAppliedIndex)
Expand All @@ -2324,14 +2323,14 @@ func (r *Replica) applyRaftCommand(
// across all deltaStats).
ms.Add(deltaStats)
if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to update MVCCStats")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats")
}
}

if haveTruncatedState {
apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer)
if err != nil {
return enginepb.MVCCStats{}, err
return storagepb.ReplicatedEvalResult{}, err
}
if !apply {
// TODO(tbg): As written, there is low confidence that nil'ing out
Expand Down Expand Up @@ -2365,20 +2364,20 @@ func (r *Replica) applyRaftCommand(
rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
oldHS, err := rsl.LoadHardState(ctx, r.store.Engine())
if err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to load HardState")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
}
assertHS = &oldHS
}
if err := batch.Commit(false); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "could not commit batch")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "could not commit batch")
}

if assertHS != nil {
// Load the HardState that was just committed (if any).
rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
newHS, err := rsl.LoadHardState(ctx, r.store.Engine())
if err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to load HardState")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
}
// Assert that nothing moved "backwards".
if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) {
Expand All @@ -2389,7 +2388,8 @@ func (r *Replica) applyRaftCommand(

elapsed := timeutil.Since(start)
r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
return deltaStats, nil
rResult.Delta = deltaStats.ToStatsDelta()
return rResult, nil
}

func handleTruncatedStateBelowRaft(
Expand Down

0 comments on commit b40672f

Please sign in to comment.