From 76fd805519611ad75610c1605f5b6c275f64b6cf Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 26 Jul 2016 05:33:30 -0400 Subject: [PATCH] storage: hoist split side effects Previously, a committed split would have the Store process that split via a `batch.Defer`. As part of #6286, this `batch.Defer` was removed and explicit metainformation is now passed up the callchain up to `(*Replica).processRaftCommand`, where it is translated to calls to the Store. A similar mechanism was previously used for passing up skipped intents, and that mechanism has been folded into this new one (with more side effects to be extracted in future work). --- storage/replica.go | 138 ++++++++++++++++++++++--------------- storage/replica_command.go | 124 +++++++++++++++++++-------------- storage/replica_trigger.go | 65 +++++++++++++++++ 3 files changed, 217 insertions(+), 110 deletions(-) create mode 100644 storage/replica_trigger.go diff --git a/storage/replica.go b/storage/replica.go index 84337dd37f82..34bfae8f8923 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -1179,8 +1179,8 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) ( // Execute read-only batch command. It checks for matching key range; note // that holding readMu throughout is important to avoid reads from the // "wrong" key range being served after the range has been split. - var intents []intentsWithArg - br, intents, pErr = r.executeBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), nil, ba) + var trigger *PostCommitTrigger + br, trigger, pErr = r.executeBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), nil, ba) if pErr == nil && ba.Txn != nil { r.assert5725(ba) @@ -1189,7 +1189,9 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) ( // described in #2231. pErr = r.checkIfTxnAborted(ctx, r.store.Engine(), *ba.Txn) } - r.store.intentResolver.processIntentsAsync(r, intents) + if trigger != nil && len(trigger.intents) > 0 { + r.store.intentResolver.processIntentsAsync(r, trigger.intents) + } return br, pErr } @@ -1552,7 +1554,7 @@ func (r *Replica) handleRaftReady() error { if err := command.Unmarshal(ctx.Payload); err != nil { return err } - if err := r.processRaftCommand(storagebase.CmdIDKey(ctx.CommandID), e.Index, command); err != nil { + if pErr := r.processRaftCommand(storagebase.CmdIDKey(ctx.CommandID), e.Index, command); pErr != nil { // If processRaftCommand failed, tell raft that the config change was aborted. cc = raftpb.ConfChange{} } @@ -1566,7 +1568,6 @@ func (r *Replica) handleRaftReady() error { default: log.Fatalf(context.TODO(), "%s: unexpected Raft entry: %v", r, e) } - } if shouldReproposeCmds { r.mu.Lock() @@ -1746,7 +1747,9 @@ func (r *Replica) refurbishPendingCmdLocked(cmd *pendingCmd) *roachpb.Error { // As a special case, the zero idKey signifies an empty Raft command, // which will apply as a no-op (without accessing raftCmd, via an error), // updating only the applied index. -func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, raftCmd roachpb.RaftCommand) *roachpb.Error { +func (r *Replica) processRaftCommand( + idKey storagebase.CmdIDKey, index uint64, raftCmd roachpb.RaftCommand, +) *roachpb.Error { if index == 0 { log.Fatalf(context.TODO(), "%s: processRaftCommand requires a non-zero index", r) } @@ -1875,18 +1878,46 @@ func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, r log.Infof(context.TODO(), "%s: applying command with forced error: %v", r, forcedErr) } - br, err := r.applyRaftCommand(idKey, ctx, index, leaseIndex, + br, trigger, pErr := r.applyRaftCommand(idKey, ctx, index, leaseIndex, raftCmd.OriginReplica, raftCmd.Cmd, forcedErr) - err = r.maybeSetCorrupt(err) + pErr = r.maybeSetCorrupt(pErr) + + // Handle all returned side effects. This must happen after commit but + // before returning to the client. + if trigger != nil { + if trigger.split != nil { + if pErr != nil { + panic("split trigger emitted but error returned") + } + splitTriggerPostCommit( + context.Background(), + trigger.split.MVCCStats, + &trigger.split.SplitTrigger, + r, + ) + } + + // On the replica on which this command originated, resolve skipped intents + // asynchronously - even on failure. + if raftCmd.OriginReplica.StoreID == r.store.StoreID() { + r.store.intentResolver.processIntentsAsync(r, trigger.intents) + } + } + // On successful write commands handle write-related triggers including + // splitting and raft log truncation. + if pErr == nil && raftCmd.Cmd.IsWrite() { + r.maybeAddToSplitQueue() + r.maybeAddToRaftLogQueue(index) + } if cmd != nil { - cmd.done <- roachpb.ResponseWithError{Reply: br, Err: err} + cmd.done <- roachpb.ResponseWithError{Reply: br, Err: pErr} close(cmd.done) - } else if err != nil && log.V(1) { - log.Errorf(context.TODO(), "%s: error executing raft command: %s", r, err) + } else if pErr != nil && log.V(1) { + log.Errorf(context.TODO(), "%s: error executing raft command: %s", r, pErr) } - return err + return pErr } // applyRaftCommand applies a raft command from the replicated log to the @@ -1900,7 +1931,7 @@ func (r *Replica) applyRaftCommand( originReplica roachpb.ReplicaDescriptor, ba roachpb.BatchRequest, forcedError *roachpb.Error, -) (*roachpb.BatchResponse, *roachpb.Error) { +) (*roachpb.BatchResponse, *PostCommitTrigger, *roachpb.Error) { if index <= 0 { log.Fatalf(ctx, "raft command index is <= 0") } @@ -1917,7 +1948,7 @@ func (r *Replica) applyRaftCommand( r.mu.Unlock() if index != oldIndex+1 { - return nil, roachpb.NewError(newReplicaCorruptionError(errors.Errorf("applied index jumped from %d to %d", oldIndex, index))) + return nil, nil, roachpb.NewError(newReplicaCorruptionError(errors.Errorf("applied index jumped from %d to %d", oldIndex, index))) } // Call the helper, which returns a batch containing data written @@ -1925,7 +1956,7 @@ func (r *Replica) applyRaftCommand( var batch engine.Batch var br *roachpb.BatchResponse var ms enginepb.MVCCStats - var intents []intentsWithArg + var trigger *PostCommitTrigger var rErr *roachpb.Error if forcedError != nil { @@ -1935,7 +1966,7 @@ func (r *Replica) applyRaftCommand( }) br, rErr = nil, forcedError } else { - batch, ms, br, intents, rErr = r.applyRaftCommandInBatch(ctx, idKey, + batch, ms, br, trigger, rErr = r.applyRaftCommandInBatch(ctx, idKey, originReplica, ba) } @@ -1988,20 +2019,7 @@ func (r *Replica) applyRaftCommand( rErr = roachpb.NewError(newReplicaCorruptionError(errors.Errorf("could not commit batch"), err, rErr.GoError())) } - // On successful write commands handle write-related triggers including - // splitting and raft log truncation. - if rErr == nil && ba.IsWrite() { - r.maybeAddToSplitQueue() - r.maybeAddToRaftLogQueue(index) - } - - // On the replica on which this command originated, resolve skipped intents - // asynchronously - even on failure. - if originReplica.StoreID == r.store.StoreID() { - r.store.intentResolver.processIntentsAsync(r, intents) - } - - return br, rErr + return br, trigger, rErr } // applyRaftCommandInBatch executes the command in a batch engine and @@ -2012,7 +2030,12 @@ func (r *Replica) applyRaftCommandInBatch( idKey storagebase.CmdIDKey, originReplica roachpb.ReplicaDescriptor, ba roachpb.BatchRequest, -) (engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, []intentsWithArg, *roachpb.Error) { +) ( + engine.Batch, + enginepb.MVCCStats, + *roachpb.BatchResponse, + *PostCommitTrigger, *roachpb.Error, +) { // Check whether this txn has been aborted. Only applies to transactional // requests which write intents (for example HeartbeatTxn does not get // hindered by this). @@ -2030,7 +2053,7 @@ func (r *Replica) applyRaftCommandInBatch( // Execute the commands. If this returns without an error, the batch must // be committed (EndTransaction with a CommitTrigger may unlock // readOnlyCmdMu via a batch.Defer). - btch, ms, br, intents, pErr := r.executeWriteBatch(ctx, idKey, ba) + btch, ms, br, trigger, pErr := r.executeWriteBatch(ctx, idKey, ba) if ba.IsWrite() { if pErr != nil { @@ -2055,7 +2078,7 @@ func (r *Replica) applyRaftCommandInBatch( } } - return btch, ms, br, intents, pErr + return btch, ms, br, trigger, pErr } // checkIfTxnAborted checks the txn abort cache for the given @@ -2104,14 +2127,18 @@ type intentsWithArg struct { // txn is restored and it's re-executed as transactional. func (r *Replica) executeWriteBatch( ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest) ( - engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, []intentsWithArg, *roachpb.Error) { + engine.Batch, + enginepb.MVCCStats, + *roachpb.BatchResponse, + *PostCommitTrigger, *roachpb.Error, +) { batch := r.store.Engine().NewBatch() ms := enginepb.MVCCStats{} // If not transactional or there are indications that the batch's txn // will require restart or retry, execute as normal. if r.store.TestingKnobs().DisableOnePhaseCommits || !isOnePhaseCommit(ba) { - br, intents, pErr := r.executeBatch(ctx, idKey, batch, &ms, ba) - return batch, ms, br, intents, pErr + br, trigger, pErr := r.executeBatch(ctx, idKey, batch, &ms, ba) + return batch, ms, br, trigger, pErr } // Try executing with transaction stripped. @@ -2120,7 +2147,7 @@ func (r *Replica) executeWriteBatch( strippedBa.Requests = ba.Requests[1 : len(ba.Requests)-1] // strip begin/end txn reqs // If all writes occurred at the intended timestamp, we've succeeded on the fast path. - br, intents, pErr := r.executeBatch(ctx, idKey, batch, &ms, strippedBa) + br, trigger, pErr := r.executeBatch(ctx, idKey, batch, &ms, strippedBa) if pErr == nil && ba.Timestamp == br.Timestamp { clonedTxn := ba.Txn.Clone() clonedTxn.Writing = true @@ -2136,8 +2163,9 @@ func (r *Replica) executeWriteBatch( ms = enginepb.MVCCStats{} } else { // Run commit trigger manually. - if err := r.runCommitTrigger(ctx, batch, &ms, *etArg, &clonedTxn); err != nil { - return batch, ms, br, intents, roachpb.NewErrorf("failed to run commit trigger: %s", err) + var err error + if trigger, err = r.runCommitTrigger(ctx, batch, &ms, *etArg, &clonedTxn); err != nil { + return batch, ms, br, trigger, roachpb.NewErrorf("failed to run commit trigger: %s", err) } } @@ -2145,15 +2173,15 @@ func (r *Replica) executeWriteBatch( // Add placeholder responses for begin & end transaction requests. br.Responses = append([]roachpb.ResponseUnion{{BeginTransaction: &roachpb.BeginTransactionResponse{}}}, br.Responses...) br.Responses = append(br.Responses, roachpb.ResponseUnion{EndTransaction: &roachpb.EndTransactionResponse{OnePhaseCommit: true}}) - return batch, ms, br, intents, nil + return batch, ms, br, trigger, nil } // Otherwise, re-execute with the original, transactional batch. batch.Close() batch = r.store.Engine().NewBatch() ms = enginepb.MVCCStats{} - br, intents, pErr = r.executeBatch(ctx, idKey, batch, &ms, ba) - return batch, ms, br, intents, pErr + br, trigger, pErr = r.executeBatch(ctx, idKey, batch, &ms, ba) + return batch, ms, br, trigger, pErr } // isOnePhaseCommit returns true iff the BatchRequest contains all @@ -2257,9 +2285,9 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct func (r *Replica) executeBatch( ctx context.Context, idKey storagebase.CmdIDKey, batch engine.ReadWriter, ms *enginepb.MVCCStats, ba roachpb.BatchRequest) ( - *roachpb.BatchResponse, []intentsWithArg, *roachpb.Error) { + *roachpb.BatchResponse, *PostCommitTrigger, *roachpb.Error) { br := ba.CreateReply() - var intents []intentsWithArg + var trigger *PostCommitTrigger r.mu.Lock() threshold := r.mu.state.GCThreshold @@ -2287,13 +2315,9 @@ func (r *Replica) executeBatch( ba.Txn.BatchIndex = int32(index) } reply := br.Responses[index].GetInner() - curIntents, pErr := r.executeCmd(ctx, idKey, index, batch, ms, ba.Header, remScanResults, args, reply) + curTrigger, pErr := r.executeCmd(ctx, idKey, index, batch, ms, ba.Header, remScanResults, args, reply) - // Collect intents skipped over the course of execution. - if len(curIntents) > 0 { - // TODO(tschottdorf): see about refactoring the args away. - intents = append(intents, intentsWithArg{args: args, intents: curIntents}) - } + trigger = updateTrigger(trigger, curTrigger) if pErr != nil { switch tErr := pErr.GetDetail().(type) { @@ -2312,7 +2336,7 @@ func (r *Replica) executeBatch( // a txn, intents from earlier commands in the same batch // won't return a WriteTooOldError. if ba.Txn != nil { - return nil, intents, pErr + return nil, trigger, pErr } // If not in a txn, need to make sure we don't propagate the // error unless there are no earlier commands in the batch @@ -2327,7 +2351,7 @@ func (r *Replica) executeBatch( } } if !overlap { - return nil, intents, pErr + return nil, trigger, pErr } } // On WriteTooOldError, we've written a new value or an intent @@ -2346,7 +2370,7 @@ func (r *Replica) executeBatch( default: // Initialize the error index. pErr.SetErrorIndex(int32(index)) - return nil, intents, pErr + return nil, trigger, pErr } } @@ -2380,7 +2404,7 @@ func (r *Replica) executeBatch( br.Timestamp.Forward(ba.Timestamp) } - return br, intents, nil + return br, trigger, nil } // getLeaseForGossip tries to obtain a range lease. Only one of the replicas @@ -2592,16 +2616,16 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) { ba.ReadConsistency = roachpb.INCONSISTENT ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.ScanRequest{Span: keys.SystemConfigSpan}) - br, intents, pErr := + br, trigger, pErr := r.executeBatch(context.Background(), storagebase.CmdIDKey(""), r.store.Engine(), nil, ba) if pErr != nil { return nil, nil, pErr.GoError() } - if len(intents) > 0 { + if trigger != nil && len(trigger.intents) > 0 { // There were intents, so what we read may not be consistent. Attempt // to nudge the intents in case they're expired; next time around we'll // hopefully have more luck. - r.store.intentResolver.processIntentsAsync(r, intents) + r.store.intentResolver.processIntentsAsync(r, trigger.intents) return nil, nil, errSystemConfigIntent } kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows diff --git a/storage/replica_command.go b/storage/replica_command.go index 42912277d14b..f4d0d8a986bd 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -58,7 +58,7 @@ var errTransactionUnsupported = errors.New("not supported within a transaction") func (r *Replica) executeCmd(ctx context.Context, raftCmdID storagebase.CmdIDKey, index int, batch engine.ReadWriter, ms *enginepb.MVCCStats, h roachpb.Header, remScanResults int64, - args roachpb.Request, reply roachpb.Response) ([]roachpb.Intent, *roachpb.Error) { + args roachpb.Request, reply roachpb.Response) (*PostCommitTrigger, *roachpb.Error) { ts := h.Timestamp if _, ok := args.(*roachpb.NoopRequest); ok { @@ -88,6 +88,8 @@ func (r *Replica) executeCmd(ctx context.Context, raftCmdID storagebase.CmdIDKey var intents []roachpb.Intent var err error + var trigger *PostCommitTrigger + // Note that responses are populated even when an error is returned. // TODO(tschottdorf): Change that. IIRC there is nontrivial use of it currently. switch tArgs := args.(type) { @@ -123,7 +125,7 @@ func (r *Replica) executeCmd(ctx context.Context, raftCmdID storagebase.CmdIDKey *resp, err = r.BeginTransaction(ctx, batch, ms, h, *tArgs) case *roachpb.EndTransactionRequest: resp := reply.(*roachpb.EndTransactionResponse) - *resp, intents, err = r.EndTransaction(ctx, batch, ms, h, *tArgs) + *resp, trigger, intents, err = r.EndTransaction(ctx, batch, ms, h, *tArgs) case *roachpb.RangeLookupRequest: resp := reply.(*roachpb.RangeLookupResponse) *resp, intents, err = r.RangeLookup(ctx, batch, h, *tArgs) @@ -182,7 +184,15 @@ func (r *Replica) executeCmd(ctx context.Context, raftCmdID storagebase.CmdIDKey } pErr = roachpb.NewErrorWithTxn(err, txn) } - return intents, pErr + + if len(intents) > 0 { + trigger = updateTrigger( + trigger, + &PostCommitTrigger{intents: []intentsWithArg{{args: args, intents: intents}}}, + ) + } + + return trigger, pErr } // Get returns the value for a specified key. @@ -426,11 +436,11 @@ func (r *Replica) BeginTransaction( func (r *Replica) EndTransaction( ctx context.Context, batch engine.ReadWriter, ms *enginepb.MVCCStats, h roachpb.Header, args roachpb.EndTransactionRequest, -) (roachpb.EndTransactionResponse, []roachpb.Intent, error) { +) (roachpb.EndTransactionResponse, *PostCommitTrigger, []roachpb.Intent, error) { var reply roachpb.EndTransactionResponse if err := verifyTransaction(h, &args); err != nil { - return reply, nil, err + return reply, nil, nil, err } key := keys.TransactionKey(h.Txn.Key, h.Txn.ID) @@ -440,12 +450,12 @@ func (r *Replica) EndTransaction( if ok, err := engine.MVCCGetProto( ctx, batch, key, hlc.ZeroTimestamp, true, nil, reply.Txn, ); err != nil { - return reply, nil, err + return reply, nil, nil, err } else if !ok { // Return a fresh empty reply because there's an empty Transaction // proto in our existing one. return roachpb.EndTransactionResponse{}, - nil, + nil, nil, roachpb.NewTransactionStatusError("does not exist") } @@ -454,7 +464,7 @@ func (r *Replica) EndTransaction( // not suffered regression. switch reply.Txn.Status { case roachpb.COMMITTED: - return reply, nil, roachpb.NewTransactionStatusError("already committed") + return reply, nil, nil, roachpb.NewTransactionStatusError("already committed") case roachpb.ABORTED: if !args.Commit { @@ -465,16 +475,16 @@ func (r *Replica) EndTransaction( if err := updateTxnWithExternalIntents( ctx, batch, ms, args, reply.Txn, externalIntents, ); err != nil { - return reply, nil, err + return reply, nil, nil, err } - return reply, externalIntents, nil + return reply, nil, externalIntents, nil } // If the transaction was previously aborted by a concurrent // writer's push, any intents written are still open. It's only now // that we know them, so we return them all for asynchronous // resolution (we're currently not able to write on error, but // see #1989). - return reply, + return reply, nil, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError() @@ -484,7 +494,7 @@ func (r *Replica) EndTransaction( // importantly, intents) dangling; we can't currently write on // error. Would panic, but that makes TestEndTransactionWithErrors // awkward. - return reply, nil, roachpb.NewTransactionStatusError( + return reply, nil, nil, roachpb.NewTransactionStatusError( fmt.Sprintf("epoch regression: %d", h.Txn.Epoch), ) } else if h.Txn.Epoch == reply.Txn.Epoch && reply.Txn.Timestamp.Less(h.Txn.OrigTimestamp) { @@ -493,13 +503,13 @@ func (r *Replica) EndTransaction( // than the original transaction timestamp. // TODO(tschottdorf): see above comment on epoch regression. - return reply, nil, roachpb.NewTransactionStatusError( + return reply, nil, nil, roachpb.NewTransactionStatusError( fmt.Sprintf("timestamp regression: %s", h.Txn.OrigTimestamp), ) } default: - return reply, nil, roachpb.NewTransactionStatusError( + return reply, nil, nil, roachpb.NewTransactionStatusError( fmt.Sprintf("bad txn status: %s", reply.Txn), ) } @@ -529,14 +539,14 @@ func (r *Replica) EndTransaction( // and (b) not able to write on error (see #1989), we can't write // ABORTED into the master transaction record, which remains // PENDING, and that's pretty bad. - return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError() + return reply, nil, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError() } // Set transaction status to COMMITTED or ABORTED as per the // args.Commit parameter. if args.Commit { if isEndTransactionTriggeringRetryError(h.Txn, reply.Txn) { - return reply, nil, roachpb.NewTransactionRetryError() + return reply, nil, nil, roachpb.NewTransactionRetryError() } reply.Txn.Status = roachpb.COMMITTED } else { @@ -545,16 +555,18 @@ func (r *Replica) EndTransaction( externalIntents := r.resolveLocalIntents(ctx, batch, ms, args, reply.Txn) if err := updateTxnWithExternalIntents(ctx, batch, ms, args, reply.Txn, externalIntents); err != nil { - return reply, nil, err + return reply, nil, nil, err } // Run triggers if successfully committed. + var trigger *PostCommitTrigger if reply.Txn.Status == roachpb.COMMITTED { - if err := r.runCommitTrigger(ctx, batch.(engine.Batch), ms, args, reply.Txn); err != nil { + var err error + if trigger, err = r.runCommitTrigger(ctx, batch.(engine.Batch), ms, args, reply.Txn); err != nil { // TODO(tschottdorf): should an error here always amount to a // ReplicaCorruptionError? log.Error(context.TODO(), errors.Wrapf(err, "%s: commit trigger", r)) - return reply, nil, err + return reply, nil, nil, err } } @@ -576,7 +588,7 @@ func (r *Replica) EndTransaction( // will immediately succeed as a missing txn record on push sets the // transaction to aborted. In both cases, the txn will be GC'd on // the slow path. - return reply, externalIntents, nil + return reply, trigger, externalIntents, nil } // isEndTransactionExceedingDeadline returns true if the transaction @@ -752,8 +764,14 @@ func intersectSpan(span roachpb.Span, desc roachpb.RangeDescriptor) (middle *roa return } -func (r *Replica) runCommitTrigger(ctx context.Context, batch engine.Batch, ms *enginepb.MVCCStats, - args roachpb.EndTransactionRequest, txn *roachpb.Transaction) error { +func (r *Replica) runCommitTrigger( + ctx context.Context, + batch engine.Batch, + ms *enginepb.MVCCStats, + args roachpb.EndTransactionRequest, + txn *roachpb.Transaction, +) (*PostCommitTrigger, error) { + var trigger *PostCommitTrigger ct := args.InternalCommitTrigger if ct != nil { // Assert that the on-disk state doesn't diverge from the in-memory @@ -771,7 +789,8 @@ func (r *Replica) runCommitTrigger(ctx context.Context, batch engine.Batch, ms * if err := func() error { if ct.GetSplitTrigger() != nil { - if err := r.splitTrigger(ctx, batch, ms, ct.SplitTrigger, txn.Timestamp); err != nil { + var err error + if trigger, err = r.splitTrigger(ctx, batch, ms, ct.SplitTrigger, txn.Timestamp); err != nil { return err } } @@ -805,9 +824,9 @@ func (r *Replica) runCommitTrigger(ctx context.Context, batch engine.Batch, ms * return nil }(); err != nil { r.readOnlyCmdMu.Unlock() // since the batch.Defer above won't run - return err + return nil, err } - return nil + return trigger, nil } // RangeLookup is used to look up RangeDescriptors - a RangeDescriptor @@ -2509,8 +2528,12 @@ func splitTriggerPostCommit( // the split. That Raft group is starting from an empty Raft log (positioned at // log entry 10) and a snapshot of the RHS of the split range. func (r *Replica) splitTrigger( - ctx context.Context, batch engine.Batch, ms *enginepb.MVCCStats, split *roachpb.SplitTrigger, ts hlc.Timestamp, -) error { + ctx context.Context, + batch engine.Batch, + ms *enginepb.MVCCStats, + split *roachpb.SplitTrigger, + ts hlc.Timestamp, +) (*PostCommitTrigger, error) { // TODO(tschottdorf): should have an incoming context from the corresponding // EndTransaction, but the plumbing has not been done yet. sp := r.store.Tracer().StartSpan("split") @@ -2518,7 +2541,7 @@ func (r *Replica) splitTrigger( desc := r.Desc() if !bytes.Equal(desc.StartKey, split.LeftDesc.StartKey) || !bytes.Equal(desc.EndKey, split.RightDesc.EndKey) { - return errors.Errorf("range does not match splits: (%s-%s) + (%s-%s) != %s", + return nil, errors.Errorf("range does not match splits: (%s-%s) + (%s-%s) != %s", split.LeftDesc.StartKey, split.LeftDesc.EndKey, split.RightDesc.StartKey, split.RightDesc.EndKey, r) } @@ -2539,7 +2562,7 @@ func (r *Replica) splitTrigger( // Account for MVCCStats' own contribution to the RHS range's statistics. if err := engine.AccountForSelf(&deltaMS, split.RightDesc.RangeID); err != nil { - return errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact") + return nil, errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact") } // TODO(d4l3k): we should check which side of the split is smaller @@ -2549,12 +2572,12 @@ func (r *Replica) splitTrigger( // Compute stats for LHS range. leftMS, err := ComputeStatsForRange(&split.LeftDesc, batch, ts.WallTime) if err != nil { - return errors.Wrap(err, "unable to compute stats for LHS range after split") + return nil, errors.Wrap(err, "unable to compute stats for LHS range after split") } log.Trace(ctx, "computed stats for left hand side range") if err := setMVCCStats(batch, r.RangeID, leftMS); err != nil { - return errors.Wrap(err, "unable to write MVCC stats") + return nil, errors.Wrap(err, "unable to write MVCC stats") } r.mu.Lock() r.mu.state.Stats = leftMS @@ -2565,24 +2588,24 @@ func (r *Replica) splitTrigger( // nil on calls to MVCCPutProto. replicaGCTS, err := r.getLastReplicaGCTimestamp() if err != nil { - return errors.Wrap(err, "unable to fetch last replica GC timestamp") + return nil, errors.Wrap(err, "unable to fetch last replica GC timestamp") } if err := engine.MVCCPutProto(ctx, batch, nil, keys.RangeLastReplicaGCTimestampKey(split.RightDesc.RangeID), hlc.ZeroTimestamp, nil, &replicaGCTS); err != nil { - return errors.Wrap(err, "unable to copy last replica GC timestamp") + return nil, errors.Wrap(err, "unable to copy last replica GC timestamp") } verifyTS, err := r.getLastVerificationTimestamp() if err != nil { - return errors.Wrap(err, "unable to fetch last verification timestamp") + return nil, errors.Wrap(err, "unable to fetch last verification timestamp") } if err := engine.MVCCPutProto(ctx, batch, nil, keys.RangeLastVerificationTimestampKey(split.RightDesc.RangeID), hlc.ZeroTimestamp, nil, &verifyTS); err != nil { - return errors.Wrap(err, "unable to copy last verification timestamp") + return nil, errors.Wrap(err, "unable to copy last verification timestamp") } // Initialize the RHS range's abort cache by copying the LHS's. seqCount, err := r.abortCache.CopyInto(batch, &deltaMS, split.RightDesc.RangeID) if err != nil { // TODO(tschottdorf): ReplicaCorruptionError. - return errors.Wrap(err, "unable to copy abort cache to RHS split range") + return nil, errors.Wrap(err, "unable to copy abort cache to RHS split range") } log.Trace(ctx, fmt.Sprintf("copied abort cache (%d entries)", seqCount)) @@ -2602,7 +2625,7 @@ func (r *Replica) splitTrigger( // RHS' stats from it below. deltaMS, err = writeInitialState(batch, deltaMS, split.RightDesc) if err != nil { - return errors.Wrap(err, "unable to write initial state") + return nil, errors.Wrap(err, "unable to write initial state") } // Initialize the right-hand lease to be the same as the left-hand lease. @@ -2624,12 +2647,12 @@ func (r *Replica) splitTrigger( { leftLease, err := loadLease(r.store.Engine(), r.RangeID) if err != nil { - return errors.Wrap(err, "unable to load lease") + return nil, errors.Wrap(err, "unable to load lease") } replica, found := split.RightDesc.GetReplicaDescriptor(leftLease.Replica.StoreID) if !found { - return errors.Errorf( + return nil, errors.Errorf( "pre-split lease holder %+v not found in post-split descriptor %+v", leftLease.Replica, split.RightDesc, ) @@ -2639,7 +2662,7 @@ func (r *Replica) splitTrigger( if err := setLease( batch, &deltaMS, split.RightDesc.RangeID, rightLease, ); err != nil { - return errors.Wrap(err, "unable to seed right-hand side lease") + return nil, errors.Wrap(err, "unable to seed right-hand side lease") } } @@ -2652,7 +2675,7 @@ func (r *Replica) splitTrigger( // over the keys and counting. rightMS, err = ComputeStatsForRange(&split.RightDesc, batch, ts.WallTime) if err != nil { - return errors.Wrap(err, "unable to compute stats for RHS range after split") + return nil, errors.Wrap(err, "unable to compute stats for RHS range after split") } } else { // Because neither the original stats or the delta stats contain @@ -2671,22 +2694,17 @@ func (r *Replica) splitTrigger( rightMS.Subtract(leftMS) } if err := setMVCCStats(batch, split.RightDesc.RangeID, rightMS); err != nil { - return errors.Wrap(err, "unable to write MVCC stats") + return nil, errors.Wrap(err, "unable to write MVCC stats") } log.Trace(ctx, "computed stats for RHS range") - // This is the part of the split trigger which coordinates the actual split - // with the Store. As such, it tries to avoid using any of the intermediate - // results of the code above, the goal being moving it closer to Store's - // Raft processing goroutine. - - theTrigger := func() { - splitTriggerPostCommit(ctx, deltaMS, split, r) + trigger := &PostCommitTrigger{ + split: &postCommitSplit{ + SplitTrigger: *split, + MVCCStats: deltaMS, + }, } - - batch.Defer(theTrigger) - - return nil + return trigger, nil } // AdminMerge extends this range to subsume the range that comes next diff --git a/storage/replica_trigger.go b/storage/replica_trigger.go new file mode 100644 index 000000000000..e08a6813493f --- /dev/null +++ b/storage/replica_trigger.go @@ -0,0 +1,65 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine/enginepb" +) + +// postCommitSplit is emitted when a Replica commits a split trigger and +// signals that the Replica has prepared the on-disk state for both the left +// and right hand sides of the split, and that the left hand side Replica +// should be updated as well as the right hand side created. +type postCommitSplit struct { + roachpb.SplitTrigger + enginepb.MVCCStats +} + +// PostCommitTrigger is returned from Raft processing as a side effect which +// signals that further action should be taken as part of the processing of the +// Raft command. +// Depending on the content, actions may be executed on all Replicas, the lease +// holder, or a Replica determined by other conditions present in the specific +// trigger. +type PostCommitTrigger struct { + // intents stores any intents encountered but not conflicted with. They + // should be handed off to asynchronous intent processing so that an + // attempt to resolve them is made. + intents []intentsWithArg + // split contains a postCommitSplit trigger emitted on a split. + split *postCommitSplit +} + +// updateTrigger takes a previous and new commit trigger and combines their +// contents into an updated trigger, consuming both inputs. It will panic on +// illegal combinations (such as being asked to combine two split triggers). +func updateTrigger(old, new *PostCommitTrigger) *PostCommitTrigger { + if old == nil { + old = new + } else { + if new.intents != nil { + old.intents = append(old.intents, new.intents...) + } + if old.split == nil { + old.split = new.split + } else if new.split != nil { + panic("more than one split trigger") + } + } + return old +}