Skip to content

Commit

Permalink
storage: hoist split side effects
Browse files Browse the repository at this point in the history
Previously, a committed split would have the Store process that split
via a `batch.Defer`. As part of cockroachdb#6286, this `batch.Defer` was removed
and explicit metainformation is now passed up the callchain up to
`(*Replica).processRaftCommand`, where it is translated to calls to
the Store.

A similar mechanism was previously used for passing up skipped intents,
and that mechanism has been folded into this new one (with more side
effects to be extracted in future work).
  • Loading branch information
tbg committed Jul 26, 2016
1 parent c1114f9 commit 76fd805
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 110 deletions.
138 changes: 81 additions & 57 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,8 +1179,8 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) (
// Execute read-only batch command. It checks for matching key range; note
// that holding readMu throughout is important to avoid reads from the
// "wrong" key range being served after the range has been split.
var intents []intentsWithArg
br, intents, pErr = r.executeBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), nil, ba)
var trigger *PostCommitTrigger
br, trigger, pErr = r.executeBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), nil, ba)

if pErr == nil && ba.Txn != nil {
r.assert5725(ba)
Expand All @@ -1189,7 +1189,9 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) (
// described in #2231.
pErr = r.checkIfTxnAborted(ctx, r.store.Engine(), *ba.Txn)
}
r.store.intentResolver.processIntentsAsync(r, intents)
if trigger != nil && len(trigger.intents) > 0 {
r.store.intentResolver.processIntentsAsync(r, trigger.intents)
}
return br, pErr
}

Expand Down Expand Up @@ -1552,7 +1554,7 @@ func (r *Replica) handleRaftReady() error {
if err := command.Unmarshal(ctx.Payload); err != nil {
return err
}
if err := r.processRaftCommand(storagebase.CmdIDKey(ctx.CommandID), e.Index, command); err != nil {
if pErr := r.processRaftCommand(storagebase.CmdIDKey(ctx.CommandID), e.Index, command); pErr != nil {
// If processRaftCommand failed, tell raft that the config change was aborted.
cc = raftpb.ConfChange{}
}
Expand All @@ -1566,7 +1568,6 @@ func (r *Replica) handleRaftReady() error {
default:
log.Fatalf(context.TODO(), "%s: unexpected Raft entry: %v", r, e)
}

}
if shouldReproposeCmds {
r.mu.Lock()
Expand Down Expand Up @@ -1746,7 +1747,9 @@ func (r *Replica) refurbishPendingCmdLocked(cmd *pendingCmd) *roachpb.Error {
// As a special case, the zero idKey signifies an empty Raft command,
// which will apply as a no-op (without accessing raftCmd, via an error),
// updating only the applied index.
func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, raftCmd roachpb.RaftCommand) *roachpb.Error {
func (r *Replica) processRaftCommand(
idKey storagebase.CmdIDKey, index uint64, raftCmd roachpb.RaftCommand,
) *roachpb.Error {
if index == 0 {
log.Fatalf(context.TODO(), "%s: processRaftCommand requires a non-zero index", r)
}
Expand Down Expand Up @@ -1875,18 +1878,46 @@ func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, r
log.Infof(context.TODO(), "%s: applying command with forced error: %v", r, forcedErr)
}

br, err := r.applyRaftCommand(idKey, ctx, index, leaseIndex,
br, trigger, pErr := r.applyRaftCommand(idKey, ctx, index, leaseIndex,
raftCmd.OriginReplica, raftCmd.Cmd, forcedErr)
err = r.maybeSetCorrupt(err)
pErr = r.maybeSetCorrupt(pErr)

// Handle all returned side effects. This must happen after commit but
// before returning to the client.
if trigger != nil {
if trigger.split != nil {
if pErr != nil {
panic("split trigger emitted but error returned")
}
splitTriggerPostCommit(
context.Background(),
trigger.split.MVCCStats,
&trigger.split.SplitTrigger,
r,
)
}

// On the replica on which this command originated, resolve skipped intents
// asynchronously - even on failure.
if raftCmd.OriginReplica.StoreID == r.store.StoreID() {
r.store.intentResolver.processIntentsAsync(r, trigger.intents)
}
}
// On successful write commands handle write-related triggers including
// splitting and raft log truncation.
if pErr == nil && raftCmd.Cmd.IsWrite() {
r.maybeAddToSplitQueue()
r.maybeAddToRaftLogQueue(index)
}

if cmd != nil {
cmd.done <- roachpb.ResponseWithError{Reply: br, Err: err}
cmd.done <- roachpb.ResponseWithError{Reply: br, Err: pErr}
close(cmd.done)
} else if err != nil && log.V(1) {
log.Errorf(context.TODO(), "%s: error executing raft command: %s", r, err)
} else if pErr != nil && log.V(1) {
log.Errorf(context.TODO(), "%s: error executing raft command: %s", r, pErr)
}

return err
return pErr
}

// applyRaftCommand applies a raft command from the replicated log to the
Expand All @@ -1900,7 +1931,7 @@ func (r *Replica) applyRaftCommand(
originReplica roachpb.ReplicaDescriptor,
ba roachpb.BatchRequest,
forcedError *roachpb.Error,
) (*roachpb.BatchResponse, *roachpb.Error) {
) (*roachpb.BatchResponse, *PostCommitTrigger, *roachpb.Error) {
if index <= 0 {
log.Fatalf(ctx, "raft command index is <= 0")
}
Expand All @@ -1917,15 +1948,15 @@ func (r *Replica) applyRaftCommand(
r.mu.Unlock()

if index != oldIndex+1 {
return nil, roachpb.NewError(newReplicaCorruptionError(errors.Errorf("applied index jumped from %d to %d", oldIndex, index)))
return nil, nil, roachpb.NewError(newReplicaCorruptionError(errors.Errorf("applied index jumped from %d to %d", oldIndex, index)))
}

// Call the helper, which returns a batch containing data written
// during command execution and any associated error.
var batch engine.Batch
var br *roachpb.BatchResponse
var ms enginepb.MVCCStats
var intents []intentsWithArg
var trigger *PostCommitTrigger
var rErr *roachpb.Error

if forcedError != nil {
Expand All @@ -1935,7 +1966,7 @@ func (r *Replica) applyRaftCommand(
})
br, rErr = nil, forcedError
} else {
batch, ms, br, intents, rErr = r.applyRaftCommandInBatch(ctx, idKey,
batch, ms, br, trigger, rErr = r.applyRaftCommandInBatch(ctx, idKey,
originReplica, ba)
}

Expand Down Expand Up @@ -1988,20 +2019,7 @@ func (r *Replica) applyRaftCommand(
rErr = roachpb.NewError(newReplicaCorruptionError(errors.Errorf("could not commit batch"), err, rErr.GoError()))
}

// On successful write commands handle write-related triggers including
// splitting and raft log truncation.
if rErr == nil && ba.IsWrite() {
r.maybeAddToSplitQueue()
r.maybeAddToRaftLogQueue(index)
}

// On the replica on which this command originated, resolve skipped intents
// asynchronously - even on failure.
if originReplica.StoreID == r.store.StoreID() {
r.store.intentResolver.processIntentsAsync(r, intents)
}

return br, rErr
return br, trigger, rErr
}

// applyRaftCommandInBatch executes the command in a batch engine and
Expand All @@ -2012,7 +2030,12 @@ func (r *Replica) applyRaftCommandInBatch(
idKey storagebase.CmdIDKey,
originReplica roachpb.ReplicaDescriptor,
ba roachpb.BatchRequest,
) (engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, []intentsWithArg, *roachpb.Error) {
) (
engine.Batch,
enginepb.MVCCStats,
*roachpb.BatchResponse,
*PostCommitTrigger, *roachpb.Error,
) {
// Check whether this txn has been aborted. Only applies to transactional
// requests which write intents (for example HeartbeatTxn does not get
// hindered by this).
Expand All @@ -2030,7 +2053,7 @@ func (r *Replica) applyRaftCommandInBatch(
// Execute the commands. If this returns without an error, the batch must
// be committed (EndTransaction with a CommitTrigger may unlock
// readOnlyCmdMu via a batch.Defer).
btch, ms, br, intents, pErr := r.executeWriteBatch(ctx, idKey, ba)
btch, ms, br, trigger, pErr := r.executeWriteBatch(ctx, idKey, ba)

if ba.IsWrite() {
if pErr != nil {
Expand All @@ -2055,7 +2078,7 @@ func (r *Replica) applyRaftCommandInBatch(
}
}

return btch, ms, br, intents, pErr
return btch, ms, br, trigger, pErr
}

// checkIfTxnAborted checks the txn abort cache for the given
Expand Down Expand Up @@ -2104,14 +2127,18 @@ type intentsWithArg struct {
// txn is restored and it's re-executed as transactional.
func (r *Replica) executeWriteBatch(
ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest) (
engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, []intentsWithArg, *roachpb.Error) {
engine.Batch,
enginepb.MVCCStats,
*roachpb.BatchResponse,
*PostCommitTrigger, *roachpb.Error,
) {
batch := r.store.Engine().NewBatch()
ms := enginepb.MVCCStats{}
// If not transactional or there are indications that the batch's txn
// will require restart or retry, execute as normal.
if r.store.TestingKnobs().DisableOnePhaseCommits || !isOnePhaseCommit(ba) {
br, intents, pErr := r.executeBatch(ctx, idKey, batch, &ms, ba)
return batch, ms, br, intents, pErr
br, trigger, pErr := r.executeBatch(ctx, idKey, batch, &ms, ba)
return batch, ms, br, trigger, pErr
}

// Try executing with transaction stripped.
Expand All @@ -2120,7 +2147,7 @@ func (r *Replica) executeWriteBatch(
strippedBa.Requests = ba.Requests[1 : len(ba.Requests)-1] // strip begin/end txn reqs

// If all writes occurred at the intended timestamp, we've succeeded on the fast path.
br, intents, pErr := r.executeBatch(ctx, idKey, batch, &ms, strippedBa)
br, trigger, pErr := r.executeBatch(ctx, idKey, batch, &ms, strippedBa)
if pErr == nil && ba.Timestamp == br.Timestamp {
clonedTxn := ba.Txn.Clone()
clonedTxn.Writing = true
Expand All @@ -2136,24 +2163,25 @@ func (r *Replica) executeWriteBatch(
ms = enginepb.MVCCStats{}
} else {
// Run commit trigger manually.
if err := r.runCommitTrigger(ctx, batch, &ms, *etArg, &clonedTxn); err != nil {
return batch, ms, br, intents, roachpb.NewErrorf("failed to run commit trigger: %s", err)
var err error
if trigger, err = r.runCommitTrigger(ctx, batch, &ms, *etArg, &clonedTxn); err != nil {
return batch, ms, br, trigger, roachpb.NewErrorf("failed to run commit trigger: %s", err)
}
}

br.Txn = &clonedTxn
// Add placeholder responses for begin & end transaction requests.
br.Responses = append([]roachpb.ResponseUnion{{BeginTransaction: &roachpb.BeginTransactionResponse{}}}, br.Responses...)
br.Responses = append(br.Responses, roachpb.ResponseUnion{EndTransaction: &roachpb.EndTransactionResponse{OnePhaseCommit: true}})
return batch, ms, br, intents, nil
return batch, ms, br, trigger, nil
}

// Otherwise, re-execute with the original, transactional batch.
batch.Close()
batch = r.store.Engine().NewBatch()
ms = enginepb.MVCCStats{}
br, intents, pErr = r.executeBatch(ctx, idKey, batch, &ms, ba)
return batch, ms, br, intents, pErr
br, trigger, pErr = r.executeBatch(ctx, idKey, batch, &ms, ba)
return batch, ms, br, trigger, pErr
}

// isOnePhaseCommit returns true iff the BatchRequest contains all
Expand Down Expand Up @@ -2257,9 +2285,9 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
func (r *Replica) executeBatch(
ctx context.Context, idKey storagebase.CmdIDKey,
batch engine.ReadWriter, ms *enginepb.MVCCStats, ba roachpb.BatchRequest) (
*roachpb.BatchResponse, []intentsWithArg, *roachpb.Error) {
*roachpb.BatchResponse, *PostCommitTrigger, *roachpb.Error) {
br := ba.CreateReply()
var intents []intentsWithArg
var trigger *PostCommitTrigger

r.mu.Lock()
threshold := r.mu.state.GCThreshold
Expand Down Expand Up @@ -2287,13 +2315,9 @@ func (r *Replica) executeBatch(
ba.Txn.BatchIndex = int32(index)
}
reply := br.Responses[index].GetInner()
curIntents, pErr := r.executeCmd(ctx, idKey, index, batch, ms, ba.Header, remScanResults, args, reply)
curTrigger, pErr := r.executeCmd(ctx, idKey, index, batch, ms, ba.Header, remScanResults, args, reply)

// Collect intents skipped over the course of execution.
if len(curIntents) > 0 {
// TODO(tschottdorf): see about refactoring the args away.
intents = append(intents, intentsWithArg{args: args, intents: curIntents})
}
trigger = updateTrigger(trigger, curTrigger)

if pErr != nil {
switch tErr := pErr.GetDetail().(type) {
Expand All @@ -2312,7 +2336,7 @@ func (r *Replica) executeBatch(
// a txn, intents from earlier commands in the same batch
// won't return a WriteTooOldError.
if ba.Txn != nil {
return nil, intents, pErr
return nil, trigger, pErr
}
// If not in a txn, need to make sure we don't propagate the
// error unless there are no earlier commands in the batch
Expand All @@ -2327,7 +2351,7 @@ func (r *Replica) executeBatch(
}
}
if !overlap {
return nil, intents, pErr
return nil, trigger, pErr
}
}
// On WriteTooOldError, we've written a new value or an intent
Expand All @@ -2346,7 +2370,7 @@ func (r *Replica) executeBatch(
default:
// Initialize the error index.
pErr.SetErrorIndex(int32(index))
return nil, intents, pErr
return nil, trigger, pErr
}
}

Expand Down Expand Up @@ -2380,7 +2404,7 @@ func (r *Replica) executeBatch(
br.Timestamp.Forward(ba.Timestamp)
}

return br, intents, nil
return br, trigger, nil
}

// getLeaseForGossip tries to obtain a range lease. Only one of the replicas
Expand Down Expand Up @@ -2592,16 +2616,16 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) {
ba.ReadConsistency = roachpb.INCONSISTENT
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.ScanRequest{Span: keys.SystemConfigSpan})
br, intents, pErr :=
br, trigger, pErr :=
r.executeBatch(context.Background(), storagebase.CmdIDKey(""), r.store.Engine(), nil, ba)
if pErr != nil {
return nil, nil, pErr.GoError()
}
if len(intents) > 0 {
if trigger != nil && len(trigger.intents) > 0 {
// There were intents, so what we read may not be consistent. Attempt
// to nudge the intents in case they're expired; next time around we'll
// hopefully have more luck.
r.store.intentResolver.processIntentsAsync(r, intents)
r.store.intentResolver.processIntentsAsync(r, trigger.intents)
return nil, nil, errSystemConfigIntent
}
kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
Expand Down
Loading

0 comments on commit 76fd805

Please sign in to comment.