Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

intentresolver: fix test flake for CleanupIntents #35273

Merged
merged 1 commit into from
Mar 6, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 151 additions & 88 deletions pkg/storage/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
},
},
sendFuncs: []sendFunc{
singlePushTxnSendFunc,
resolveIntentsSendFunc,
singlePushTxnSendFunc(t),
resolveIntentsSendFunc(t),
failSendFunc,
},
expectPushed: true,
Expand All @@ -165,10 +165,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
{Span: roachpb.Span{Key: key, EndKey: roachpb.Key("b")}, Txn: txn1.TxnMeta},
},
sendFuncs: []sendFunc{
singlePushTxnSendFunc,
resolveIntentsSendFunc,
resolveIntentsSendFunc,
gcSendFunc,
singlePushTxnSendFunc(t),
resolveIntentsSendFunc(t),
resolveIntentsSendFunc(t),
gcSendFunc(t),
},
expectPushed: true,
expectSucceed: true,
Expand All @@ -179,7 +179,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
{
txn: txn2,
intents: []roachpb.Intent{},
sendFuncs: []sendFunc{gcSendFunc},
sendFuncs: []sendFunc{gcSendFunc(t)},
expectSucceed: true,
},
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestContendedIntent(t *testing.T) {
go func() {
cleanupFunc, pErr := ir.ProcessWriteIntentError(testCtx, roachpb.NewError(wiErr), nil, h, roachpb.PUSH_ABORT)
if pErr != nil {
panic(pErr)
t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr)
}
cleanupFuncs <- cleanupFunc
wg.Done()
Expand Down Expand Up @@ -437,9 +437,9 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) {
Clock: clock,
}
txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */)
sf := newSendFuncs(
pushTxnSendFunc(1),
resolveIntentsSendFunc,
sf := newSendFuncs(t,
pushTxnSendFunc(t, 1),
resolveIntentsSendFunc(t),
)
ir := newIntentResolverWithSendFuncs(cfg, sf)
// Run defaultTaskLimit tasks which will block until blocker is closed.
Expand Down Expand Up @@ -492,14 +492,14 @@ func TestCleanupIntentsAsync(t *testing.T) {
{
intents: testIntentsWithArg,
sendFuncs: []sendFunc{
singlePushTxnSendFunc,
resolveIntentsSendFunc,
singlePushTxnSendFunc(t),
resolveIntentsSendFunc(t),
},
},
{
intents: testIntentsWithArg,
sendFuncs: []sendFunc{
singlePushTxnSendFunc,
singlePushTxnSendFunc(t),
failSendFunc,
},
},
Expand All @@ -513,7 +513,7 @@ func TestCleanupIntentsAsync(t *testing.T) {
for _, c := range cases {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
sf := newSendFuncs(c.sendFuncs...)
sf := newSendFuncs(t, c.sendFuncs...)
cfg := Config{
Stopper: stopper,
Clock: clock,
Expand All @@ -532,11 +532,12 @@ func TestCleanupIntentsAsync(t *testing.T) {
}
}

func newSendFuncs(sf ...sendFunc) *sendFuncs {
return &sendFuncs{sendFuncs: sf}
func newSendFuncs(t *testing.T, sf ...sendFunc) *sendFuncs {
return &sendFuncs{t: t, sendFuncs: sf}
}

// sendFuncs is a queue of canned sendFunc handlers used by the tests in this
// file: each batch sent through the test sender is answered by the handler at
// the front of the queue.
type sendFuncs struct {
// t is the test handle used to report failures (via t.Errorf) from the queue
// and from handlers that were built around this sendFuncs.
t *testing.T
// mu is locked by the test sender around popping and running a handler; the
// *Locked methods do not acquire it themselves.
mu syncutil.Mutex
// sendFuncs holds the remaining handlers; index 0 is the next one to run.
sendFuncs []sendFunc
}
Expand All @@ -547,11 +548,13 @@ func (sf *sendFuncs) len() int {
return len(sf.sendFuncs)
}

func (sf *sendFuncs) pop() sendFunc {
sf.mu.Lock()
defer sf.mu.Unlock()
// pushFrontLocked inserts f, in order, at the front of the handler queue so
// that f[0] becomes the next handler to run. It does not acquire sf.mu; the
// caller is expected to hold it or to otherwise have exclusive access to sf.
func (sf *sendFuncs) pushFrontLocked(f ...sendFunc) {
	if len(f) == 0 {
		return
	}
	// Build a fresh slice instead of appending onto f: when the caller passes
	// an existing slice (pushFrontLocked(s...)) that has spare capacity,
	// append(f, ...) would write into the caller's backing array.
	queue := make([]sendFunc, 0, len(f)+len(sf.sendFuncs))
	queue = append(queue, f...)
	queue = append(queue, sf.sendFuncs...)
	sf.sendFuncs = queue
}

func (sf *sendFuncs) popLocked() sendFunc {
if len(sf.sendFuncs) == 0 {
panic("no send funcs left!")
sf.t.Errorf("No send funcs left!")
}
ret := sf.sendFuncs[0]
sf.sendFuncs = sf.sendFuncs[1:]
Expand Down Expand Up @@ -592,8 +595,8 @@ func TestCleanupTxnIntentsAsync(t *testing.T) {
{
intents: testEndTxnIntents,
sendFuncs: []sendFunc{
resolveIntentsSendFunc,
gcSendFunc,
resolveIntentsSendFunc(t),
gcSendFunc(t),
},
},
}
Expand All @@ -608,7 +611,7 @@ func TestCleanupTxnIntentsAsync(t *testing.T) {
}
var sendFuncCalled int64
numSendFuncs := int64(len(c.sendFuncs))
sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...)
sf := newSendFuncs(t, counterSendFuncs(&sendFuncCalled, c.sendFuncs)...)
ir := newIntentResolverWithSendFuncs(cfg, sf)
if c.before != nil {
defer c.before(&c, ir)()
Expand Down Expand Up @@ -654,51 +657,50 @@ func TestCleanupIntents(t *testing.T) {
}
type testCase struct {
intents []roachpb.Intent
sendFuncs []sendFunc
sendFuncs *sendFuncs
expectedErr bool
expectedNum int
cfg Config
}
cases := []testCase{
{
intents: testIntents,
sendFuncs: []sendFunc{
singlePushTxnSendFunc,
resolveIntentsSendFunc,
},
sendFuncs: newSendFuncs(t,
singlePushTxnSendFunc(t),
resolveIntentsSendFunc(t),
),
expectedNum: 1,
},
{
intents: testIntents,
sendFuncs: []sendFunc{
sendFuncs: newSendFuncs(t,
failSendFunc,
},
),
expectedErr: true,
},
{
intents: append(makeTxnIntents(t, clock, 3*intentResolverBatchSize),
// Three intents with the same transaction will only attempt to push the
// txn 1 time. Hence 3 full batches plus 1 extra.
testIntents[0], testIntents[0], testIntents[0]),
sendFuncs: []sendFunc{
pushTxnSendFunc(intentResolverBatchSize),
resolveIntentsSendFunc,
resolveIntentsSendFunc,
pushTxnSendFunc(intentResolverBatchSize),
resolveIntentsSendFunc,
pushTxnSendFunc(intentResolverBatchSize),
resolveIntentsSendFunc,
pushTxnSendFunc(1),
resolveIntentsSendFunc,
},
sendFuncs: func() *sendFuncs {
sf := newSendFuncs(t)
sf.pushFrontLocked( // don't need to lock
pushTxnSendFuncs(sf, intentResolverBatchSize),
resolveIntentsSendFuncs(sf, 102 /* numIntents */, 2 /* minNumReqs */),
pushTxnSendFuncs(sf, intentResolverBatchSize),
resolveIntentsSendFuncs(sf, 100 /* numIntents */, 1 /* minNumReqs */),
pushTxnSendFuncs(sf, intentResolverBatchSize),
resolveIntentsSendFuncs(sf, 100 /* numIntents */, 1 /* minNumReqs */),
pushTxnSendFuncs(sf, 1),
resolveIntentsSendFuncs(sf, 1 /* numIntents */, 1 /* minNumReqs */),
)
return sf
}(),
expectedNum: 3*intentResolverBatchSize + 3,
// Under stress sometimes it can take more than 10ms to even call send.
// The batch wait is disabled and batch idle increased during this test
// to eliminate flakiness and ensure that all requests make it to the
// batcher in a timely manner.
cfg: Config{
MaxIntentResolutionBatchWait: -1, // disabled
MaxIntentResolutionBatchIdle: 20 * time.Millisecond,
MaxIntentResolutionBatchIdle: 1 * time.Microsecond,
},
},
}
Expand All @@ -708,7 +710,7 @@ func TestCleanupIntents(t *testing.T) {
t.Run("", func(t *testing.T) {
c.cfg.Stopper = stopper
c.cfg.Clock = clock
ir := newIntentResolverWithSendFuncs(c.cfg, newSendFuncs(c.sendFuncs...))
ir := newIntentResolverWithSendFuncs(c.cfg, c.sendFuncs)
num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT)
assert.Equal(t, num, c.expectedNum, "number of resolved intents")
assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err)
Expand Down Expand Up @@ -798,7 +800,9 @@ type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Er
func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver {
txnSenderFactory := client.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
f := sf.pop()
sf.mu.Lock()
defer sf.mu.Unlock()
f := sf.popLocked()
return f(ba)
})
db := client.NewDB(log.AmbientContext{
Expand All @@ -809,53 +813,112 @@ func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver {
return New(c)
}

var (
pushTxnSendFunc = func(numPushes int) sendFunc {
return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if len(ba.Requests) != numPushes {
panic(fmt.Errorf("expected %d PushTxnRequests in batch, got %d",
numPushes, len(ba.Requests)))
}
resp := &roachpb.BatchResponse{}
for _, r := range ba.Requests {
req := r.GetInner().(*roachpb.PushTxnRequest)
txn := req.PusheeTxn
resp.Add(&roachpb.PushTxnResponse{
PusheeTxn: roachpb.Transaction{
Status: roachpb.ABORTED,
TxnMeta: txn,
},
})
}
return resp, nil
// pushTxnSendFuncs allows the pushing of N txns across several invocations.
func pushTxnSendFuncs(sf *sendFuncs, N int) sendFunc {
toPush := int64(N)
var f sendFunc
f = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if remaining := atomic.LoadInt64(&toPush); len(ba.Requests) > int(remaining) {
sf.t.Errorf("expected at most %d PushTxnRequests in batch, got %d",
remaining, len(ba.Requests))
}
nowRemaining := atomic.AddInt64(&toPush, -1*int64(len(ba.Requests)))
if nowRemaining > 0 {
sf.pushFrontLocked(f)
}
return respForPushTxnBatch(sf.t, ba), nil
}
singlePushTxnSendFunc = pushTxnSendFunc(1)
resolveIntentsSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
resp := &roachpb.BatchResponse{}
for _, r := range ba.Requests {
if _, ok := r.GetInner().(*roachpb.ResolveIntentRequest); ok {
resp.Add(&roachpb.ResolveIntentResponse{})
} else if _, ok := r.GetInner().(*roachpb.ResolveIntentRangeRequest); ok {
resp.Add(&roachpb.ResolveIntentRangeResponse{})
} else {
panic(fmt.Errorf("Unexpected request in batch for intent resolution: %T", r.GetInner()))
}
return f
}

func pushTxnSendFunc(t *testing.T, numPushes int) sendFunc {
return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if len(ba.Requests) != numPushes {
t.Errorf("expected %d PushTxnRequests in batch, got %d",
numPushes, len(ba.Requests))
}
return resp, nil
return respForPushTxnBatch(t, ba), nil
}
failSendFunc sendFunc = func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return nil, roachpb.NewError(fmt.Errorf("boom"))
}

// singlePushTxnSendFunc is shorthand for pushTxnSendFunc with an expected
// batch size of exactly one push request.
func singlePushTxnSendFunc(t *testing.T) sendFunc {
	const numPushes = 1
	return pushTxnSendFunc(t, numPushes)
}

// resolveIntentsSendFuncs returns a sendFunc that answers intent resolution
// batches until numIntents requests have been seen in total, however they are
// split across batches.
//
// While requests remain outstanding after a batch, the handler re-inserts
// itself at the front of sf so that it also serves the next batch. Once the
// count is exhausted it reports a test error if fewer than minRequests batches
// were needed. A batch carrying more requests than are still outstanding is
// reported as a test error as well.
//
// The handler calls sf.pushFrontLocked, so it expects to run with sf.mu held.
func resolveIntentsSendFuncs(sf *sendFuncs, numIntents int, minRequests int) sendFunc {
	var (
		outstanding = int64(numIntents) // requests not yet accounted for
		batches     int64               // number of batches handled so far
		handler     sendFunc
	)
	handler = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
		if before := atomic.LoadInt64(&outstanding); len(ba.Requests) > int(before) {
			sf.t.Errorf("expected at most %d ResolveIntentRequests in batch, got %d",
				before, len(ba.Requests))
		}
		left := atomic.AddInt64(&outstanding, -int64(len(ba.Requests)))
		count := atomic.AddInt64(&batches, 1)
		switch {
		case left > 0:
			// More intents are expected; stay at the head of the queue.
			sf.pushFrontLocked(handler)
		case count < int64(minRequests):
			sf.t.Errorf("expected at least %d requests to resolve %d intents, only saw %d",
				minRequests, numIntents, count)
		}
		return respForResolveIntentBatch(sf.t, ba), nil
	}
	return handler
}

func resolveIntentsSendFunc(t *testing.T) sendFunc {
return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return respForResolveIntentBatch(t, ba), nil
}
gcSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
}

// failSendFunc is a sendFunc that ignores the batch it is given and always
// returns a nil response together with a "boom" error.
func failSendFunc(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
	boom := roachpb.NewError(fmt.Errorf("boom"))
	return nil, boom
}

func gcSendFunc(t *testing.T) sendFunc {
return func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
resp := &roachpb.BatchResponse{}
for _, r := range ba.Requests {
if _, ok := r.GetInner().(*roachpb.GCRequest); ok {
resp.Add(&roachpb.GCResponse{})
} else {
panic(fmt.Errorf("Unexpected request type %T, expecte GCRequest", r.GetInner()))
if _, ok := r.GetInner().(*roachpb.GCRequest); !ok {
t.Errorf("Unexpected request type %T, expected GCRequest", r.GetInner())
}
resp.Add(&roachpb.GCResponse{})
}
return resp, nil
}
)
}

// respForPushTxnBatch builds the response for a batch of push requests: one
// PushTxnResponse per request, each reporting the pushee as ABORTED.
//
// A request that is not a *roachpb.PushTxnRequest is reported through
// t.Errorf and still receives a response, carrying a zero-valued TxnMeta.
func respForPushTxnBatch(t *testing.T, ba roachpb.BatchRequest) *roachpb.BatchResponse {
	br := &roachpb.BatchResponse{}
	for _, union := range ba.Requests {
		inner := union.GetInner()
		// pushee stays at its zero value when the request has the wrong type.
		var pushee enginepb.TxnMeta
		pushReq, isPush := inner.(*roachpb.PushTxnRequest)
		if !isPush {
			t.Errorf("Unexpected request type %T, expected PushTxnRequest", inner)
		} else {
			pushee = pushReq.PusheeTxn
		}
		aborted := roachpb.Transaction{
			Status:  roachpb.ABORTED,
			TxnMeta: pushee,
		}
		br.Add(&roachpb.PushTxnResponse{PusheeTxn: aborted})
	}
	return br
}

// respForResolveIntentBatch builds the response for a batch of intent
// resolution requests, adding the matching empty response for every
// ResolveIntentRequest and ResolveIntentRangeRequest in ba.
//
// Any other request type is reported through t.Errorf and gets no response
// entry.
func respForResolveIntentBatch(t *testing.T, ba roachpb.BatchRequest) *roachpb.BatchResponse {
	br := &roachpb.BatchResponse{}
	for _, union := range ba.Requests {
		switch inner := union.GetInner().(type) {
		case *roachpb.ResolveIntentRequest:
			br.Add(&roachpb.ResolveIntentResponse{})
		case *roachpb.ResolveIntentRangeRequest:
			br.Add(&roachpb.ResolveIntentRangeResponse{})
		default:
			t.Errorf("Unexpected request in batch for intent resolution: %T", inner)
		}
	}
	return br
}