Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
68026: kv: add LockTimeout option to BatchRequest r=nvanbenschoten a=nvanbenschoten

Informs cockroachdb#67513.

This commit introduces a new `lock_timeout` field on the BatchRequest Header struct.

lock_timeout specifies the maximum amount of time that the batch request will wait while attempting to acquire a lock on a key or while blocking on an existing lock in order to perform a non-locking read on a key. The time limit applies separately to each lock acquisition attempt. If the timeout elapses when waiting for a lock, a WriteIntentError will be returned.

Unlike in some other systems like PostgreSQL, where non-locking reads do not wait on conflicting locks, in CockroachDB, non-locking reads do wait for conflicting locks to be released. Because of this, the lock_timeout configuration applies to non-locking reads in read-write and read-only transactions as well.

Only the (default) Block wait policy will allow a request to wait on conflicting locks, so the timeout only makes sense in conjunction with the Block wait policy. The Error wait policy will throw an error immediately if a conflicting lock held by an active transaction is encountered, so this timeout can never be hit with an Error wait policy.

A value of zero disables the timeout.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 10, 2021
2 parents f8956ee + 77405a5 commit 847514d
Show file tree
Hide file tree
Showing 19 changed files with 1,836 additions and 876 deletions.
47 changes: 44 additions & 3 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ subtest end



subtest rollback_after_write_intent_error
subtest rollback_after_wait_policy_write_intent_error
# Write intent errors are white-listed to allow a rollback to savepoint afterwards.
# They make their way back up to the kv client when requests are run with an Error
# wait policy.
Expand All @@ -391,15 +391,56 @@ savepoint x

get conflict-key locking nowait
----
(*roachpb.WriteIntentError) conflicting intents on "conflict-key"
(*roachpb.WriteIntentError) conflicting intents on "conflict-key" [reason=wait_policy]

rollback x
----
0 <noignore>

put conflict-key b nowait
----
(*roachpb.WriteIntentError) conflicting intents on "conflict-key"
(*roachpb.WriteIntentError) conflicting intents on "conflict-key" [reason=wait_policy]

rollback x
----
1 [1-1]

subtest end



subtest rollback_after_lock_timeout_write_intent_error
# Write intent errors are white-listed to allow a rollback to savepoint afterwards.
# They make their way back up to the kv client when requests are run with a lock
# timeout.

# NB: we're going to leak this txn, so write to an otherwise unused key.
begin
----
0 <noignore>

put conflict-key-2 a
----

begin
----
0 <noignore>

savepoint x
----
0 <noignore>

get conflict-key-2 lock-timeout
----
(*roachpb.WriteIntentError) conflicting intents on "conflict-key-2" [reason=lock_timeout]

rollback x
----
0 <noignore>

put conflict-key-2 b lock-timeout
----
(*roachpb.WriteIntentError) conflicting intents on "conflict-key-2" [reason=lock_timeout]

rollback x
----
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -128,6 +129,9 @@ func TestSavepoints(t *testing.T) {
if td.HasArg("nowait") {
b.Header.WaitPolicy = lock.WaitPolicy_Error
}
if td.HasArg("lock-timeout") {
b.Header.LockTimeout = 1 * time.Nanosecond
}
if err := txn.Run(ctx, b); err != nil {
fmt.Fprintf(&buf, "(%T) %v\n", err, err)
}
Expand Down Expand Up @@ -163,6 +167,9 @@ func TestSavepoints(t *testing.T) {
if td.HasArg("nowait") {
b.Header.WaitPolicy = lock.WaitPolicy_Error
}
if td.HasArg("lock-timeout") {
b.Header.LockTimeout = 1 * time.Nanosecond
}
if err := txn.Run(ctx, b); err != nil {
fmt.Fprintf(&buf, "(%T) %v\n", err, err)
} else {
Expand Down
25 changes: 24 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,9 @@ func TestTxnWaitPolicies(t *testing.T) {
// Priority does not matter.
err := <-errorC
require.NotNil(t, err)
require.IsType(t, &roachpb.WriteIntentError{}, err)
wiErr := new(roachpb.WriteIntentError)
require.True(t, errors.As(err, &wiErr))
require.Equal(t, roachpb.WriteIntentError_REASON_WAIT_POLICY, wiErr.Reason)

// Let blocked requests proceed.
require.NoError(t, txn.Commit(ctx))
Expand All @@ -813,6 +815,27 @@ func TestTxnWaitPolicies(t *testing.T) {
})
}

// TestTxnLockTimeout verifies that a read issued with a lock timeout does not
// wait indefinitely on a key written by a still-open transaction, and instead
// fails with a WriteIntentError whose reason is REASON_LOCK_TIMEOUT.
func TestTxnLockTimeout(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	s := createTestDB(t)
	defer s.Stop()

	// Write the key from a transaction that is left open for the rest of the
	// test, so the read below conflicts with it.
	contended := []byte("b")
	writer := s.DB.NewTxn(ctx, "test txn")
	require.NoError(t, writer.Put(ctx, contended, "new value"))

	// Read the same key outside of the writing transaction, bounding how long
	// the request may wait on the conflict.
	var reader kv.Batch
	reader.Get(contended)
	reader.Header.LockTimeout = 25 * time.Millisecond

	runErr := s.DB.Run(ctx, &reader)
	require.NotNil(t, runErr)

	// The failure must be a write intent error attributed to the timeout.
	var wiErr *roachpb.WriteIntentError
	require.True(t, errors.As(runErr, &wiErr))
	require.Equal(t, roachpb.WriteIntentError_REASON_LOCK_TIMEOUT, wiErr.Reason)
}

// TestTxnReturnsWriteTooOldErrorOnConflictingDeleteRange tests that if two
// transactions issue delete range operations over the same keys, the later
// transaction eagerly returns a WriteTooOld error instead of deferring the
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package concurrency

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
Expand Down Expand Up @@ -371,6 +372,11 @@ type Request struct {
// transactions.
WaitPolicy lock.WaitPolicy

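// The maximum amount of time that the batch request will wait while
// attempting to acquire a lock on a key or while blocking on an
// existing lock in order to perform a non-locking read on a key. The
// limit applies separately to each lock acquisition attempt. A value
// of zero disables the timeout.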
LockTimeout time.Duration

// The maximum length of a lock wait-queue that the request is willing
// to enter and wait in. Used to provide a release valve and ensure some
// level of quality-of-service under severe per-key contention. If set
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [max-lock-wait-queue-length=<int>]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [lock-timeout] [max-lock-wait-queue-length=<int>]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name> [eval-kind=<pess|opt|pess-after-opt]
// finish req=<req-name>
Expand Down Expand Up @@ -154,6 +154,15 @@ func TestConcurrencyManagerBasic(t *testing.T) {

waitPolicy := scanWaitPolicy(t, d, false /* required */)

var lockTimeout time.Duration
if d.HasArg("lock-timeout") {
// A lock timeout of 1ns will be considered immediately expired
// without a delay by the lockTableWaiter, ensuring that the lock
// timeout logic deterministically fires.
// See (*lockTableWaiterImpl).timeUntilDeadline.
lockTimeout = 1 * time.Nanosecond
}

var maxLockWaitQueueLength int
if d.HasArg("max-lock-wait-queue-length") {
d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength)
Expand All @@ -169,6 +178,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
// TODO(nvanbenschoten): test Priority
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
LockTimeout: lockTimeout,
MaxLockWaitQueueLength: maxLockWaitQueueLength,
Requests: reqUnions,
LatchSpans: latchSpans,
Expand Down
Loading

0 comments on commit 847514d

Please sign in to comment.