Skip to content

Commit

Permalink
kv: support STAGING transactions in kv protocol and client
Browse files Browse the repository at this point in the history
This PR polishes off the protocol and client level changes from #35165
to allow transactions to be moved to the STAGING state during parallel
commits. To do so, the change makes two major contributions.

First, it adds support to EndTransaction request and response for moving
a transaction record to the STAGING status in addition to the ABORTED
and COMMITTED status. This is done in a way that interacts elegantly
with a transaction's in-flight writes and allows the request to move a
transaction record directly to the COMMITTED status when all of the
in-flight writes are on the same range as the transaction record.

The second contribution is that the change adds full support to the
txnCommitter interceptor to issue parallel commits when possible and
then to launch the asynchronous portion of marking a transaction as
explicitly committed if it determines that a transaction is implicitly
committed. This is likely the area of the change where reviewers will
want to look first, as it incorporates the bulk of the parallel commits
algorithm and also serves as its primary documentation.

The final remaining piece of work after this change is to adjust
DistSender to actually send EndTransaction requests in parallel
with writes in the same batch and with QueryIntents for previous
writes. This change makes it safe to do so, but doesn't actually
add any logic to DistSender. There are a few complications to that
piece and it's also the least interesting part of this change,
which is why I have saved it for last.

Release note: None
  • Loading branch information
nvanbenschoten committed May 15, 2019

Verified

This commit was signed with the committer’s verified signature.
tombruijn Tom de Bruijn
1 parent 054b4bc commit dbed085
Showing 29 changed files with 2,992 additions and 1,325 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.parallel_commits_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional commits will be parallelized with transactional writes</td></tr>
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_outstanding_size</code></td><td>byte size</td><td><code>256 KiB</code></td><td>maximum number of bytes used to track in-flight pipelined writes before disabling pipelining</td></tr>
12 changes: 8 additions & 4 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
@@ -1816,14 +1816,13 @@ func TestAsyncAbortPoisons(t *testing.T) {
// Add a testing request filter which pauses a get request for the
// key until after the signal channel is closed.
var storeKnobs storage.StoreTestingKnobs
keyA := roachpb.Key("a")
var expectPoison int64
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
commitCh := make(chan error, 1)
storeKnobs.TestingRequestFilter = func(ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
switch r := req.GetInner().(type) {
case *roachpb.EndTransactionRequest:
if r.Key.Equal(keyA) && atomic.LoadInt64(&expectPoison) == 1 {
if r.Key.Equal(keyA) {
if r.Poison {
close(commitCh)
} else {
@@ -1855,12 +1854,17 @@ func TestAsyncAbortPoisons(t *testing.T) {
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
// Write to keyB first to locate this txn's record on a different key
// than the initial txn's record. This allows the request filter to
// trivially ignore this transaction.
if err := txn.Put(ctx, keyB, []byte("value2")); err != nil {
return err
}
return txn.Put(ctx, keyA, []byte("value2"))
}); err != nil {
t.Fatal(err)
}

atomic.StoreInt64(&expectPoison, 1)
expErr := regexp.QuoteMeta("TransactionAbortedError(ABORT_REASON_ABORT_SPAN)")
if _, err := txn.Get(ctx, keyA); !testutils.IsError(err, expErr) {
t.Fatalf("expected %s, got: %v", expErr, err)
5 changes: 5 additions & 0 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
@@ -493,6 +493,11 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
tcs.stopper,
tcs.cleanupTxnLocked,
)
tcs.interceptorAlloc.txnCommitter = txnCommitter{
st: tcf.st,
stopper: tcs.stopper,
mu: &tcs.mu.Mutex,
}
tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{
metrics: &tcs.metrics,
clock: tcs.clock,
71 changes: 54 additions & 17 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
@@ -232,14 +232,18 @@ func TestTxnCoordSenderCondenseIntentSpans(t *testing.T) {
var sendFn simpleSendFn = func(
_ context.Context, _ SendOptions, _ ReplicaSlice, args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
resp := args.CreateReply()
resp.Txn = args.Txn
if req, ok := args.GetArg(roachpb.EndTransaction); ok {
if !req.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
}
et := req.(*roachpb.EndTransactionRequest)
if a, e := et.IntentSpans, expIntents; !reflect.DeepEqual(a, e) {
t.Errorf("expected end transaction to have intents %+v; got %+v", e, a)
}
resp.Txn.Status = roachpb.COMMITTED
}
resp := args.CreateReply()
resp.Txn = args.Txn
return resp, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
@@ -917,14 +921,15 @@ func TestTxnCoordSenderNoDuplicateIntents(t *testing.T) {

var senderFn client.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) (
*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
if rArgs, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := rArgs.(*roachpb.EndTransactionRequest)
if !reflect.DeepEqual(et.IntentSpans, expectedIntents) {
t.Errorf("Invalid intents: %+v; expected %+v", et.IntentSpans, expectedIntents)
}
br.Txn.Status = roachpb.COMMITTED
}
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
return br, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
@@ -1279,7 +1284,6 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
union := &br.Responses[0] // avoid operating on copy
union.MustSetInner(&roachpb.PutResponse{})
if ba.Txn != nil && br.Txn == nil {
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.PENDING
}
} else if et, hasET := ba.GetArg(roachpb.EndTransaction); hasET {
@@ -1578,14 +1582,20 @@ func TestCommitMutatingTransaction(t *testing.T) {

var calls []roachpb.Method
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()

calls = append(calls, ba.Methods()...)
if !bytes.Equal(ba.Txn.Key, roachpb.Key("a")) {
t.Errorf("expected transaction key to be \"a\"; got %s", ba.Txn.Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok && !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
if et, ok := ba.GetArg(roachpb.EndTransaction); ok {
if !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
}
br.Txn.Status = roachpb.COMMITTED
}
return nil, nil
return br, nil
})

factory := NewTxnCoordSenderFactory(
@@ -1668,14 +1678,20 @@ func TestTxnInsertBeginTransaction(t *testing.T) {

var calls []roachpb.Method
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()

calls = append(calls, ba.Methods()...)
if bt, ok := ba.GetArg(roachpb.BeginTransaction); ok && !bt.Header().Key.Equal(roachpb.Key("a")) {
t.Errorf("expected begin transaction key to be \"a\"; got %s", bt.Header().Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok && !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
if et, ok := ba.GetArg(roachpb.EndTransaction); ok {
if !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
}
br.Txn.Status = roachpb.COMMITTED
}
return nil, nil
return br, nil
})

v := cluster.VersionByKey(cluster.Version2_1)
@@ -1822,13 +1838,19 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {

var calls []roachpb.Method
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()

calls = append(calls, ba.Methods()...)
if _, ok := ba.GetArg(roachpb.Put); ok {
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
if _, ok := ba.GetArg(roachpb.EndTransaction); ok {
br.Txn.Status = roachpb.COMMITTED
}
return br, nil
})

factory := NewTxnCoordSenderFactory(
@@ -1885,11 +1907,13 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
keys := []string{"first", "second"}
attempt := 0
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()

// Ignore the final EndTxnRequest.
if _, ok := ba.GetArg(roachpb.EndTransaction); ok {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
return nil, nil
br.Txn.Status = roachpb.COMMITTED
return br, nil
}

// Both attempts should have a PutRequest.
@@ -1908,7 +1932,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
return br, nil
})
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
@@ -2005,7 +2029,13 @@ func TestConcurrentTxnRequests(t *testing.T) {
callCounts[m]++
}
callCountsMu.Unlock()
return nil, nil

br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
if _, ok := ba.GetArg(roachpb.EndTransaction); ok {
br.Txn.Status = roachpb.COMMITTED
}
return br, nil
})
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
@@ -2186,6 +2216,10 @@ func TestTxnCoordSenderPipelining(t *testing.T) {
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if et, ok := ba.GetArg(roachpb.EndTransaction); ok {
// Ensure that no transactions enter a STAGING state.
et.(*roachpb.EndTransactionRequest).InFlightWrites = nil
}
return distSender.Send(ctx, ba)
}

@@ -2259,6 +2293,9 @@ func TestAnchorKey(t *testing.T) {
}
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
if _, ok := ba.GetArg(roachpb.EndTransaction); ok {
br.Txn.Status = roachpb.COMMITTED
}
return br, nil
}

18 changes: 15 additions & 3 deletions pkg/kv/txn_correctness_test.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/localtestcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -649,7 +650,7 @@ func (hv *historyVerifier) runHistory(
}
// Execute pre-history if applicable.
if hv.preHistoryCmds != nil {
if str, _, err := hv.runCmds(hv.preHistoryCmds, db, t); err != nil {
if str, _, err := hv.runCmds("pre-history", hv.preHistoryCmds, db, t); err != nil {
t.Errorf("failed on execution of pre history %s: %s", str, err)
return err
}
@@ -711,7 +712,7 @@ func (hv *historyVerifier) runHistory(
actualStr := strings.Join(hv.mu.actual, " ")

// Verify history.
verifyStr, verifyEnv, err := hv.runCmds(hv.verifyCmds, db, t)
verifyStr, verifyEnv, err := hv.runCmds("verify", hv.verifyCmds, db, t)
if err != nil {
t.Errorf("failed on execution of verification history %s: %s", verifyStr, err)
return err
@@ -730,11 +731,12 @@ func (hv *historyVerifier) runHistory(
}

func (hv *historyVerifier) runCmds(
cmds []*cmd, db *client.DB, t *testing.T,
txnName string, cmds []*cmd, db *client.DB, t *testing.T,
) (string, map[string]int64, error) {
var strs []string
env := map[string]int64{}
err := db.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error {
txn.SetDebugName(txnName)
for _, c := range cmds {
c.historyIdx = hv.idx
c.env = env
@@ -821,6 +823,16 @@ func checkConcurrency(name string, txns []string, verify *verifier, t *testing.T
s := &localtestcluster.LocalTestCluster{
StoreTestingKnobs: &storage.StoreTestingKnobs{
DontRetryPushTxnFailures: true,
// Immediately attempt to recover pushed transactions with STAGING
// statuses, even if the push would otherwise fail because the
// pushee has not yet expired. This prevents low-priority pushes from
// occasionally throwing retry errors due to DontRetryPushTxnFailures
// after the pushee's commit has already returned successfully. This
// is a result of the asynchronous nature of making transaction commits
// explicit after a parallel commit.
EvalKnobs: storagebase.BatchEvalTestingKnobs{
RecoverIndeterminateCommitsOnFailedPushes: true,
},
},
}
s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster)
Loading

0 comments on commit dbed085

Please sign in to comment.