Skip to content

Commit

Permalink
kv: add WaitPolicy option to BatchRequest
Browse files Browse the repository at this point in the history
Informs #40476.
Informs #51624.

This commit introduces the concept of request "wait policies". A
WaitPolicy specifies the behavior of a request when it encounters
conflicting locks held by other active transactions. The default
behavior is to block until the conflicting lock is released, but other
policies can make sense in special situations.

Within this new formalization, the commit creates two initial wait
policy variants:
```
// Block indicates that if a request encounters a conflicting locks held by
// another active transaction, it should wait for the conflicting lock to be
// released before proceeding.
Block = 0;

// Error indicates that if a request encounters a conflicting locks held by
// another active transaction, it should raise an error instead of blocking.
Error = 1;
```

`Block` is equivalent to the current behavior, so there's no further
action needed for that variant.

However, the `Error` policy is new, and this commit teaches the
`lockTableWaiter` about it. It ensures that when a request with the
`Error` wait policy encounters a conflicting lock, it uses a PUSH_TOUCH
PushRequest to determine whether the lock is abandoned or whether its
holder is still active. If the holder is abandoned, the request cleans
up the lock and proceeds. If the holder is still active, a
WriteIntentError is returned to the client.

This will unblock both of the referenced issues.
  • Loading branch information
nvanbenschoten committed Aug 7, 2020
1 parent 9ab5dbc commit 1682c40
Show file tree
Hide file tree
Showing 19 changed files with 1,396 additions and 689 deletions.
10 changes: 10 additions & 0 deletions c-deps/libroach/protos/kv/kvserver/concurrency/lock/locking.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions c-deps/libroach/protos/kv/kvserver/concurrency/lock/locking.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 49 additions & 17 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 28 additions & 7 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -722,3 +723,66 @@ func TestTxnContinueAfterCputError(t *testing.T) {
require.NoError(t, txn.Put(ctx, "a", "b'"))
require.NoError(t, txn.Commit(ctx))
}

func TestTxnWaitPolicies(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s := createTestDB(t)
defer s.Stop()

testutils.RunTrueAndFalse(t, "highPriority", func(t *testing.T, highPriority bool) {
key := []byte("b")
txn := s.DB.NewTxn(ctx, "test txn")
require.NoError(t, txn.Put(ctx, key, "new value"))

pri := roachpb.NormalUserPriority
if highPriority {
pri = roachpb.MaxUserPriority
}

// Block wait policy.
blockC := make(chan error)
go func() {
var b kv.Batch
b.Header.UserPriority = pri
b.Header.WaitPolicy = lock.WaitPolicy_Block
b.Get(key)
blockC <- s.DB.Run(ctx, &b)
}()

if highPriority {
// Should push txn and not block.
require.NoError(t, <-blockC)
} else {
// Should block.
select {
case err := <-blockC:
t.Fatalf("blocking wait policy unexpected returned with err=%v", err)
case <-time.After(10 * time.Millisecond):
}
}

// Error wait policy.
errorC := make(chan error)
go func() {
var b kv.Batch
b.Header.UserPriority = pri
b.Header.WaitPolicy = lock.WaitPolicy_Error
b.Get(key)
errorC <- s.DB.Run(ctx, &b)
}()

// Should return error immediately, without blocking.
// Priority does not matter.
err := <-errorC
require.NotNil(t, err)
require.IsType(t, &roachpb.WriteIntentError{}, err)

// Let blocked requests proceed.
require.NoError(t, txn.Commit(ctx))
if !highPriority {
require.NoError(t, <-blockC)
}
})
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ type Request struct {
// The consistency level of the request. Only set if Txn is nil.
ReadConsistency roachpb.ReadConsistencyType

// The wait policy of the request. Signifies how the request should
// behave if it encounters conflicting locks held by other active
// transactions.
WaitPolicy lock.WaitPolicy

// The individual requests in the batch.
Requests []roachpb.RequestUnion

Expand Down
13 changes: 11 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [maxts=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [consistency]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name>
// finish req=<req-name>
Expand Down Expand Up @@ -149,6 +149,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
readConsistency = roachpb.INCONSISTENT
}

waitPolicy := scanWaitPolicy(t, d, false /* required */)

// Each roachpb.Request is provided on an indented line.
var reqs []roachpb.Request
singleReqLines := strings.Split(d.Input, "\n")
Expand All @@ -167,6 +169,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
Timestamp: ts,
// TODO(nvanbenschoten): test Priority
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
Requests: reqUnions,
LatchSpans: latchSpans,
LockSpans: lockSpans,
Expand Down Expand Up @@ -563,14 +566,20 @@ func (c *cluster) PushTransaction(
switch pushType {
case roachpb.PUSH_TIMESTAMP:
pushed = h.Timestamp.Less(pusheeTxn.WriteTimestamp) || pusheeTxn.Status.IsFinalized()
case roachpb.PUSH_ABORT:
case roachpb.PUSH_ABORT, roachpb.PUSH_TOUCH:
pushed = pusheeTxn.Status.IsFinalized()
default:
return nil, roachpb.NewErrorf("unexpected push type: %s", pushType)
}
if pushed {
return pusheeTxn, nil
}
// If PUSH_TOUCH, return error instead of waiting.
if pushType == roachpb.PUSH_TOUCH {
log.Eventf(ctx, "pushee not abandoned")
err := roachpb.NewTransactionPushError(*pusheeTxn)
return nil, roachpb.NewError(err)
}
// Or the pusher aborted?
var pusherRecordSig chan struct{}
if pusherRecord != nil {
Expand Down
Loading

0 comments on commit 1682c40

Please sign in to comment.