Skip to content

Commit

Permalink
kv: add active pushee variants TestLockTableWaiterWithErrorWaitPolicy
Browse files Browse the repository at this point in the history
The test was not exercising the case where a pushee was still active
during the PUSH_TOUCH issued by requests with an Error wait policy.
  • Loading branch information
nvanbenschoten committed Aug 9, 2021
1 parent 8de063f commit 5dbc576
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 53 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
128 changes: 75 additions & 53 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -411,6 +412,8 @@ func notifyUntilDone(t *testing.T, g *mockLockTableGuard) func() {
return func() { <-done }
}

// TestLockTableWaiterWithErrorWaitPolicy tests the lockTableWaiter's behavior
// under different waiting states with an Error wait policy.
func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -470,66 +473,85 @@ func testErrorWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPush
ctx := context.Background()
keyA := roachpb.Key("keyA")
testutils.RunTrueAndFalse(t, "lockHeld", func(t *testing.T, lockHeld bool) {
w, ir, g := setupLockTableWaiterTest()
defer w.stopper.Stop(ctx)
pusheeTxn := makeTxnProto("pushee")
testutils.RunTrueAndFalse(t, "pusheeActive", func(t *testing.T, pusheeActive bool) {
if !lockHeld && !pusheeActive {
// !lockHeld means a lock reservation, so is only possible when
// pusheeActive is true.
skip.IgnoreLint(t, "incompatible params")
}

req := makeReq()
g.state = waitingState{
kind: k,
txn: &pusheeTxn.TxnMeta,
key: keyA,
held: lockHeld,
guardAccess: spanset.SpanReadOnly,
}
g.notify()
w, ir, g := setupLockTableWaiterTest()
defer w.stopper.Stop(ctx)
pusheeTxn := makeTxnProto("pushee")

// If the lock is not held or expPushTS is empty, expect an error
// immediately. The one exception to this is waitElsewhere, which
// expects no error.
if !lockHeld || expPushTS == dontExpectPush {
err := w.WaitOn(ctx, req, g)
if k == waitElsewhere {
require.Nil(t, err)
} else {
require.NotNil(t, err)
require.Regexp(t, "conflicting intents", err)
req := makeReq()
g.state = waitingState{
kind: k,
txn: &pusheeTxn.TxnMeta,
key: keyA,
held: lockHeld,
guardAccess: spanset.SpanReadOnly,
}
return
}
g.notify()

ir.pushTxn = func(
_ context.Context,
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
require.Equal(t, roachpb.PUSH_TOUCH, pushType)

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED}

// Next, we'll try to resolve the lock now that we know the
// holder is ABORTED.
w.lt.(*mockLockTable).txnFinalizedFn = func(txn *roachpb.Transaction) {
require.Equal(t, pusheeTxn.ID, txn.ID)
require.Equal(t, roachpb.ABORTED, txn.Status)
// If the lock is not held or expPushTS is empty, expect an error
// immediately. The one exception to this is waitElsewhere, which
// expects no error.
if !lockHeld || expPushTS == dontExpectPush {
err := w.WaitOn(ctx, req, g)
if k == waitElsewhere {
require.Nil(t, err)
} else {
require.NotNil(t, err)
require.Regexp(t, "conflicting intents", err)
}
return
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil

ir.pushTxn = func(
_ context.Context,
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
require.Equal(t, roachpb.PUSH_TOUCH, pushType)

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, roachpb.NewError(&roachpb.TransactionPushError{
PusheeTxn: *resp,
})
}

// Next, we'll try to resolve the lock now that we know the
// holder is ABORTED.
w.lt.(*mockLockTable).txnFinalizedFn = func(txn *roachpb.Transaction) {
require.Equal(t, pusheeTxn.ID, txn.ID)
require.Equal(t, roachpb.ABORTED, txn.Status)
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
}
return resp, nil
}

err := w.WaitOn(ctx, req, g)
require.Nil(t, err)
err := w.WaitOn(ctx, req, g)
if pusheeActive {
require.NotNil(t, err)
require.Regexp(t, "conflicting intents", err)
} else {
require.Nil(t, err)
}
})
})
}

Expand Down

0 comments on commit 5dbc576

Please sign in to comment.