Skip to content

Commit

Permalink
Merge pull request #32223 from andreimatei/backport2.0-32166
Browse files Browse the repository at this point in the history
release-2.0: storage: don't apply local results if cmd processing failed
  • Loading branch information
andreimatei authored Nov 12, 2018
2 parents 174cead + 09f68de commit 2fdb530
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 4 deletions.
23 changes: 23 additions & 0 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package result

import (
"context"
"fmt"

"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -80,6 +81,28 @@ type LocalResult struct {
UpdatedTxns *[]*roachpb.Transaction
}

// String returns a compact, single-line summary of the LocalResult suitable
// for logging and tracing. A nil receiver is rendered as "LocalResult: nil".
// The Intents, EndTxns and UpdatedTxns slices are summarized by their element
// counts rather than being printed in full.
func (lResult *LocalResult) String() string {
	if lResult == nil {
		return "LocalResult: nil"
	}
	// The slice-valued fields are held through pointers; a nil pointer is
	// reported as zero elements.
	var numIntents, numEndTxns, numUpdatedTxns int
	if lResult.Intents != nil {
		numIntents = len(*lResult.Intents)
	}
	if lResult.EndTxns != nil {
		numEndTxns = len(*lResult.EndTxns)
	}
	if lResult.UpdatedTxns != nil {
		numUpdatedTxns = len(*lResult.UpdatedTxns)
	}
	// The counts are comma-separated and the parenthesis opened after
	// "LocalResult" is closed at the end so the summary is well delimited when
	// embedded in a longer trace line.
	return fmt.Sprintf("LocalResult (reply: %v, #intents: %d, #endTxns: %d, #updated txns: %d, "+
		"GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+
		"MaybeGossipNodeLiveness:%s)",
		lResult.Reply, numIntents, numEndTxns, numUpdatedTxns, lResult.GossipFirstRange,
		lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue,
		lResult.MaybeGossipNodeLiveness)
}

// DetachIntents returns (and removes) those intents from the
// LocalEvalResult which are supposed to be handled.
func (lResult *LocalResult) DetachIntents() []IntentsWithArg {
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2612,8 +2612,10 @@ func (r *Replica) executeWriteBatch(
br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba)
switch retry {
case proposalIllegalLeaseIndex:
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
continue // retry
case proposalAmbiguousShouldBeReevaluated:
log.VEventf(ctx, 2, "retry: proposalAmbiguousShouldBeReevaluated")
ambiguousResult = true
continue // retry
case proposalRangeNoLongerExists, proposalErrorReproposing:
Expand Down Expand Up @@ -4917,7 +4919,15 @@ func (r *Replica) processRaftCommand(
}
response.Intents = proposal.Local.DetachIntents()
response.EndTxns = proposal.Local.DetachEndTxns(response.Err != nil)
lResult = proposal.Local
if pErr == nil {
lResult = proposal.Local
}
}
if pErr != nil && lResult != nil {
log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, lResult.String())
}

// Handle the Result, executing any side effects of the last
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ type ProposalData struct {
// Always use ProposalData.finishRaftApplication().
doneCh chan proposalResult

// Local contains the results of evaluating the request
// tying the upstream evaluation of the request to the
// downstream application of the command.
// Local contains the results of evaluating the request tying the upstream
// evaluation of the request to the downstream application of the command.
// Nil when the proposal came from another node (i.e. the evaluation wasn't
// done here).
Local *result.LocalResult

// Request is the client's original BatchRequest.
Expand Down
69 changes: 69 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -8447,6 +8448,74 @@ func TestGCWithoutThreshold(t *testing.T) {
}
}

// TestFailureToProcessCommandClearsLocalResult verifies that, when the Raft
// command resulting from an EndTransaction request fails to be applied, the
// LocalResult associated with that command is cleared.
func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	var tc testContext
	tc.StartWithStoreConfig(t, stopper, TestStoreConfig(nil))

	// Start a transaction by sending a BeginTransaction together with a Put
	// on the same key.
	key := roachpb.Key("a")
	txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
	beginReq, beginHeader := beginTxnArgs(key, txn)
	putReq := putArgs(key, []byte("value"))

	var beginBatch roachpb.BatchRequest
	beginBatch.Header = beginHeader
	beginBatch.Add(&beginReq, &putReq)
	if _, pErr := tc.Sender().Send(ctx, beginBatch); pErr != nil {
		t.Fatal(pErr)
	}

	// Flipped to 1 once the EndTransaction proposal has been tampered with.
	// Accessed atomically.
	var proposalRecognized int64

	repl := tc.repl
	repl.mu.Lock()
	repl.mu.submitProposalFn = func(pd *ProposalData) error {
		// Recognize the first time the command for the EndTransaction is
		// proposed and hackily decrease its MaxLeaseIndex, so that its
		// processing gets rejected further on.
		updated := pd.Local.UpdatedTxns
		if atomic.LoadInt64(&proposalRecognized) == 0 {
			if updated != nil && len(*updated) == 1 && (*updated)[0].ID.Equal(txn.ID) {
				pd.command.MaxLeaseIndex--
				atomic.StoreInt64(&proposalRecognized, 1)
			}
		}
		return defaultSubmitProposalLocked(repl, pd)
	}
	repl.mu.Unlock()

	// Record a trace of the commit so the sequence of events can be inspected.
	traceCtx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
	defer cancel()

	endReq, endHeader := endTxnArgs(txn, true /* commit */)
	endReq.IntentSpans = []roachpb.Span{{Key: key}}
	var commitBatch roachpb.BatchRequest
	commitBatch.Header = endHeader
	commitBatch.Add(&endReq)
	if _, pErr := tc.Sender().Send(traceCtx, commitBatch); pErr != nil {
		t.Fatal(pErr)
	}

	trace := tracing.FormatRecordedSpans(getRecording())
	err := testutils.MatchInOrder(trace,
		// The first proposal is rejected.
		"retry proposal.*applied at lease index.*but required",
		// The LocalResult is nil. This is the important part for this test.
		"LocalResult: nil",
		// The request will be re-evaluated.
		"retry: proposalIllegalLeaseIndex",
		// Re-evaluation succeeds and one txn is to be updated.
		"LocalResult \\(reply.*#updated txns: 1",
	)
	if err != nil {
		t.Fatal(err)
	}
}

// TestCommandTimeThreshold verifies that commands outside the replica GC
// threshold fail.
func TestCommandTimeThreshold(t *testing.T) {
Expand Down

0 comments on commit 2fdb530

Please sign in to comment.