From e7362167ff3f0fed2573e93630ab0a74a8b55505 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 28 Feb 2019 12:53:22 -0500 Subject: [PATCH] intentresolver: fix test flake for CleanupIntents The test before was brittle and assumed that batching was always triggered on size rather than on timing. This mean that the test would fail if something slowed test execution even though there was no correctness violation. This PR makes the test less brittle by allowing intents to be resolved in an arbitrary number of batches. This change has the nice side effect of allowing the wait time for the requestbatcher to be shortened. Fixes #35166. Release note: None --- .../intentresolver/intent_resolver_test.go | 239 +++++++++++------- 1 file changed, 151 insertions(+), 88 deletions(-) diff --git a/pkg/storage/intentresolver/intent_resolver_test.go b/pkg/storage/intentresolver/intent_resolver_test.go index c8e02ef323fe..602cfaaebb8b 100644 --- a/pkg/storage/intentresolver/intent_resolver_test.go +++ b/pkg/storage/intentresolver/intent_resolver_test.go @@ -144,8 +144,8 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { }, }, sendFuncs: []sendFunc{ - singlePushTxnSendFunc, - resolveIntentsSendFunc, + singlePushTxnSendFunc(t), + resolveIntentsSendFunc(t), failSendFunc, }, expectPushed: true, @@ -165,10 +165,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { {Span: roachpb.Span{Key: key, EndKey: roachpb.Key("b")}, Txn: txn1.TxnMeta}, }, sendFuncs: []sendFunc{ - singlePushTxnSendFunc, - resolveIntentsSendFunc, - resolveIntentsSendFunc, - gcSendFunc, + singlePushTxnSendFunc(t), + resolveIntentsSendFunc(t), + resolveIntentsSendFunc(t), + gcSendFunc(t), }, expectPushed: true, expectSucceed: true, @@ -179,7 +179,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { { txn: txn2, intents: []roachpb.Intent{}, - sendFuncs: []sendFunc{gcSendFunc}, + sendFuncs: []sendFunc{gcSendFunc(t)}, expectSucceed: true, }, } @@ -356,7 +356,7 @@ func TestContendedIntent(t *testing.T) { go func() { cleanupFunc, pErr := ir.ProcessWriteIntentError(testCtx, roachpb.NewError(wiErr), nil, h, roachpb.PUSH_ABORT) if pErr != nil { - panic(pErr) + t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr) } cleanupFuncs <- cleanupFunc wg.Done() @@ -437,9 +437,9 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) { Clock: clock, } txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */) - sf := newSendFuncs( - pushTxnSendFunc(1), - resolveIntentsSendFunc, + sf := newSendFuncs(t, + pushTxnSendFunc(t, 1), + resolveIntentsSendFunc(t), ) ir := newIntentResolverWithSendFuncs(cfg, sf) // Run defaultTaskLimit tasks which will block until blocker is closed. @@ -492,14 +492,14 @@ func TestCleanupIntentsAsync(t *testing.T) { { intents: testIntentsWithArg, sendFuncs: []sendFunc{ - singlePushTxnSendFunc, - resolveIntentsSendFunc, + singlePushTxnSendFunc(t), + resolveIntentsSendFunc(t), }, }, { intents: testIntentsWithArg, sendFuncs: []sendFunc{ - singlePushTxnSendFunc, + singlePushTxnSendFunc(t), failSendFunc, }, }, @@ -513,7 +513,7 @@ func TestCleanupIntentsAsync(t *testing.T) { for _, c := range cases { t.Run("", func(t *testing.T) { stopper := stop.NewStopper() - sf := newSendFuncs(c.sendFuncs...) + sf := newSendFuncs(t, c.sendFuncs...) cfg := Config{ Stopper: stopper, Clock: clock, @@ -532,11 +532,12 @@ func TestCleanupIntentsAsync(t *testing.T) { } } -func newSendFuncs(sf ...sendFunc) *sendFuncs { - return &sendFuncs{sendFuncs: sf} +func newSendFuncs(t *testing.T, sf ...sendFunc) *sendFuncs { + return &sendFuncs{t: t, sendFuncs: sf} } type sendFuncs struct { + t *testing.T mu syncutil.Mutex sendFuncs []sendFunc } @@ -547,11 +548,13 @@ func (sf *sendFuncs) len() int { return len(sf.sendFuncs) } -func (sf *sendFuncs) pop() sendFunc { - sf.mu.Lock() - defer sf.mu.Unlock() +func (sf *sendFuncs) pushFrontLocked(f ...sendFunc) { + sf.sendFuncs = append(f, sf.sendFuncs...) +} + +func (sf *sendFuncs) popLocked() sendFunc { if len(sf.sendFuncs) == 0 { - panic("no send funcs left!") + sf.t.Errorf("No send funcs left!") } ret := sf.sendFuncs[0] sf.sendFuncs = sf.sendFuncs[1:] @@ -592,8 +595,8 @@ func TestCleanupTxnIntentsAsync(t *testing.T) { { intents: testEndTxnIntents, sendFuncs: []sendFunc{ - resolveIntentsSendFunc, - gcSendFunc, + resolveIntentsSendFunc(t), + gcSendFunc(t), }, }, } @@ -608,7 +611,7 @@ func TestCleanupTxnIntentsAsync(t *testing.T) { } var sendFuncCalled int64 numSendFuncs := int64(len(c.sendFuncs)) - sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...) + sf := newSendFuncs(t, counterSendFuncs(&sendFuncCalled, c.sendFuncs)...) ir := newIntentResolverWithSendFuncs(cfg, sf) if c.before != nil { defer c.before(&c, ir)() @@ -654,7 +657,7 @@ func TestCleanupIntents(t *testing.T) { } type testCase struct { intents []roachpb.Intent - sendFuncs []sendFunc + sendFuncs *sendFuncs expectedErr bool expectedNum int cfg Config @@ -662,17 +665,17 @@ func TestCleanupIntents(t *testing.T) { cases := []testCase{ { intents: testIntents, - sendFuncs: []sendFunc{ - singlePushTxnSendFunc, - resolveIntentsSendFunc, - }, + sendFuncs: newSendFuncs(t, + singlePushTxnSendFunc(t), + resolveIntentsSendFunc(t), + ), expectedNum: 1, }, { intents: testIntents, - sendFuncs: []sendFunc{ + sendFuncs: newSendFuncs(t, failSendFunc, - }, + ), expectedErr: true, }, { @@ -680,25 +683,24 @@ func TestCleanupIntents(t *testing.T) { // Three intents with the same transaction will only attempt to push the // txn 1 time. Hence 3 full batches plus 1 extra. testIntents[0], testIntents[0], testIntents[0]), - sendFuncs: []sendFunc{ - pushTxnSendFunc(intentResolverBatchSize), - resolveIntentsSendFunc, - resolveIntentsSendFunc, - pushTxnSendFunc(intentResolverBatchSize), - resolveIntentsSendFunc, - pushTxnSendFunc(intentResolverBatchSize), - resolveIntentsSendFunc, - pushTxnSendFunc(1), - resolveIntentsSendFunc, - }, + sendFuncs: func() *sendFuncs { + sf := newSendFuncs(t) + sf.pushFrontLocked( // don't need to lock + pushTxnSendFuncs(sf, intentResolverBatchSize), + resolveIntentsSendFuncs(sf, 102 /* numIntents */, 2 /* minNumReqs */), + pushTxnSendFuncs(sf, intentResolverBatchSize), + resolveIntentsSendFuncs(sf, 100 /* numIntents */, 1 /* minNumReqs */), + pushTxnSendFuncs(sf, intentResolverBatchSize), + resolveIntentsSendFuncs(sf, 100 /* numIntents */, 1 /* minNumReqs */), + pushTxnSendFuncs(sf, 1), + resolveIntentsSendFuncs(sf, 1 /* numIntents */, 1 /* minNumReqs */), + ) + return sf + }(), expectedNum: 3*intentResolverBatchSize + 3, - // Under stress sometimes it can take more than 10ms to even call send. - // The batch wait is disabled and batch idle increased during this test - // to eliminate flakiness and ensure that all requests make it to the - // batcher in a timely manner. cfg: Config{ MaxIntentResolutionBatchWait: -1, // disabled - MaxIntentResolutionBatchIdle: 20 * time.Millisecond, + MaxIntentResolutionBatchIdle: 1 * time.Microsecond, }, }, } @@ -708,7 +710,7 @@ func TestCleanupIntents(t *testing.T) { t.Run("", func(t *testing.T) { c.cfg.Stopper = stopper c.cfg.Clock = clock - ir := newIntentResolverWithSendFuncs(c.cfg, newSendFuncs(c.sendFuncs...)) + ir := newIntentResolverWithSendFuncs(c.cfg, c.sendFuncs) num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT) assert.Equal(t, num, c.expectedNum, "number of resolved intents") assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err) @@ -798,7 +800,9 @@ type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Er func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver { txnSenderFactory := client.NonTransactionalFactoryFunc( func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - f := sf.pop() + sf.mu.Lock() + defer sf.mu.Unlock() + f := sf.popLocked() return f(ba) }) db := client.NewDB(log.AmbientContext{ @@ -809,53 +813,112 @@ func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver { return New(c) } -var ( - pushTxnSendFunc = func(numPushes int) sendFunc { - return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - if len(ba.Requests) != numPushes { - panic(fmt.Errorf("expected %d PushTxnRequests in batch, got %d", - numPushes, len(ba.Requests))) - } - resp := &roachpb.BatchResponse{} - for _, r := range ba.Requests { - req := r.GetInner().(*roachpb.PushTxnRequest) - txn := req.PusheeTxn - resp.Add(&roachpb.PushTxnResponse{ - PusheeTxn: roachpb.Transaction{ - Status: roachpb.ABORTED, - TxnMeta: txn, - }, - }) - } - return resp, nil +// pushTxnSendFuncs allows the pushing of N txns across several invocations. +func pushTxnSendFuncs(sf *sendFuncs, N int) sendFunc { + toPush := int64(N) + var f sendFunc + f = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + if remaining := atomic.LoadInt64(&toPush); len(ba.Requests) > int(remaining) { + sf.t.Errorf("expected at most %d PushTxnRequests in batch, got %d", + remaining, len(ba.Requests)) } + nowRemaining := atomic.AddInt64(&toPush, -1*int64(len(ba.Requests))) + if nowRemaining > 0 { + sf.pushFrontLocked(f) + } + return respForPushTxnBatch(sf.t, ba), nil } - singlePushTxnSendFunc = pushTxnSendFunc(1) - resolveIntentsSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - resp := &roachpb.BatchResponse{} - for _, r := range ba.Requests { - if _, ok := r.GetInner().(*roachpb.ResolveIntentRequest); ok { - resp.Add(&roachpb.ResolveIntentResponse{}) - } else if _, ok := r.GetInner().(*roachpb.ResolveIntentRangeRequest); ok { - resp.Add(&roachpb.ResolveIntentRangeResponse{}) - } else { - panic(fmt.Errorf("Unexpected request in batch for intent resolution: %T", r.GetInner())) - } + return f +} + +func pushTxnSendFunc(t *testing.T, numPushes int) sendFunc { + return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + if len(ba.Requests) != numPushes { + t.Errorf("expected %d PushTxnRequests in batch, got %d", + numPushes, len(ba.Requests)) } - return resp, nil + return respForPushTxnBatch(t, ba), nil } - failSendFunc sendFunc = func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - return nil, roachpb.NewError(fmt.Errorf("boom")) +} + +func singlePushTxnSendFunc(t *testing.T) sendFunc { + return pushTxnSendFunc(t, 1) +} + +func resolveIntentsSendFuncs(sf *sendFuncs, numIntents int, minRequests int) sendFunc { + toResolve := int64(numIntents) + reqsSeen := int64(0) + var f sendFunc + f = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + if remaining := atomic.LoadInt64(&toResolve); len(ba.Requests) > int(remaining) { + sf.t.Errorf("expected at most %d ResolveIntentRequests in batch, got %d", + remaining, len(ba.Requests)) + } + nowRemaining := atomic.AddInt64(&toResolve, -1*int64(len(ba.Requests))) + seen := atomic.AddInt64(&reqsSeen, 1) + if nowRemaining > 0 { + sf.pushFrontLocked(f) + } else if seen < int64(minRequests) { + sf.t.Errorf("expected at least %d requests to resolve %d intents, only saw %d", + minRequests, numIntents, seen) + } + return respForResolveIntentBatch(sf.t, ba), nil + } + return f +} + +func resolveIntentsSendFunc(t *testing.T) sendFunc { + return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return respForResolveIntentBatch(t, ba), nil } - gcSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { +} + +func failSendFunc(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return nil, roachpb.NewError(fmt.Errorf("boom")) +} + +func gcSendFunc(t *testing.T) sendFunc { + return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { resp := &roachpb.BatchResponse{} for _, r := range ba.Requests { - if _, ok := r.GetInner().(*roachpb.GCRequest); ok { - resp.Add(&roachpb.GCResponse{}) - } else { - panic(fmt.Errorf("Unexpected request type %T, expecte GCRequest", r.GetInner())) + if _, ok := r.GetInner().(*roachpb.GCRequest); !ok { + t.Errorf("Unexpected request type %T, expected GCRequest", r.GetInner()) } + resp.Add(&roachpb.GCResponse{}) } return resp, nil } -) +} + +func respForPushTxnBatch(t *testing.T, ba roachpb.BatchRequest) *roachpb.BatchResponse { + resp := &roachpb.BatchResponse{} + for _, r := range ba.Requests { + var txn enginepb.TxnMeta + if req, ok := r.GetInner().(*roachpb.PushTxnRequest); ok { + txn = req.PusheeTxn + } else { + t.Errorf("Unexpected request type %T, expected PushTxnRequest", r.GetInner()) + } + resp.Add(&roachpb.PushTxnResponse{ + PusheeTxn: roachpb.Transaction{ + Status: roachpb.ABORTED, + TxnMeta: txn, + }, + }) + } + return resp +} + +func respForResolveIntentBatch(t *testing.T, ba roachpb.BatchRequest) *roachpb.BatchResponse { + resp := &roachpb.BatchResponse{} + for _, r := range ba.Requests { + if _, ok := r.GetInner().(*roachpb.ResolveIntentRequest); ok { + resp.Add(&roachpb.ResolveIntentResponse{}) + } else if _, ok := r.GetInner().(*roachpb.ResolveIntentRangeRequest); ok { + resp.Add(&roachpb.ResolveIntentRangeResponse{}) + } else { + t.Errorf("Unexpected request in batch for intent resolution: %T", r.GetInner()) + } + } + return resp +}