From bd608fc107ca5924c51c57d6677a82feb1097b8f Mon Sep 17 00:00:00 2001
From: Alex Sarkesian
Date: Wed, 26 Jul 2023 02:29:52 -0400
Subject: [PATCH] kv: enable replay protection for ambiguous writes on commits

Previously, RPC failures were assumed to be retryable, since write
operations (with the notable exception of `EndTxn`) were assumed to be
idempotent. However, as seen in #67765 and documented in #103817, when an
RPC failure occurs on a write operation running in parallel with a commit
(i.e. a partial batch where `withCommit==true`), it is not always possible
to assume idempotency and retry the "ambiguous" write. This is because the
retried write RPC could bump the transaction's `WriteTimestamp`, changing
the commit timestamp of a transaction that may in fact already be
implicitly committed if the initial "ambiguous" write actually succeeded.

This change modifies the DistSender protocol so that, once a batch
containing a commit has experienced an ambiguous error, all subsequent
retries are flagged as such, and modifies the handling of retried writes in
the MVCC layer to detect this flag and reject, as a non-idempotent replay,
any retry that would change the write timestamp. The flag allows subsequent
retries to "remember" the earlier ambiguous write and evaluate accordingly.

This protection is needed because a transaction that is implicitly
committed is eligible to be marked as explicitly committed by contending
transactions via the `RecoverTxn` operation. This creates a race between
retries by the transaction coordinator and recovery by contending
transactions, which could result in either incorrectly reporting the
transaction as having failed with a `RETRY_SERIALIZABLE` error (despite the
possibility that it already was, or could yet be, recovered and
successfully committed), or attempting to explicitly commit an
already-recovered and committed transaction, surfacing an assertion failure
due to `transaction unexpectedly committed`.

The replay protection introduced here avoids both of these situations by
detecting a replay that should be considered non-idempotent and returning
an error, causing the original RPC error remembered by the DistSender to be
propagated as an `AmbiguousResultError`. Application code can handle this
error by validating the success or failure of the transaction.

Depends on #107680, #107323, #108154, #108001

Fixes: #103817

Release note (bug fix): RPC failures on writes that execute in parallel
with the commit operation (as part of the parallel commit protocol) are now
handled correctly, avoiding both incorrect retryable failures and
`transaction unexpectedly committed` assertions: when such a write cannot
be retried idempotently, an `AmbiguousResultError` is returned instead.
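As context for the release note above: once the DistSender propagates the
remembered RPC failure as an `AmbiguousResultError`, a caller of the KV
client API is expected to verify the transaction's outcome rather than
blindly re-issue the write. The snippet below is not part of this patch; it
is a minimal sketch of that caller-side pattern, assuming the current
`kv`/`kvpb` client packages, and `verifyOrRetryTransfer` is a hypothetical
application-level helper.

    // runTransfer moves `amount` from keyA to keyB in a single transaction.
    // If the commit outcome is ambiguous, it defers to a verification step
    // instead of blindly retrying the write.
    func runTransfer(
        ctx context.Context, db *kv.DB, keyA, keyB roachpb.Key, amount int64,
    ) error {
        err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
            a, err := txn.Get(ctx, keyA)
            if err != nil {
                return err
            }
            b, err := txn.Get(ctx, keyB)
            if err != nil {
                return err
            }
            batch := txn.NewBatch()
            batch.Put(keyA, a.ValueInt()-amount)
            batch.Put(keyB, b.ValueInt()+amount)
            // CommitInBatch issues the writes and the EndTxn together
            // (parallel commit); an RPC failure here is where ambiguity
            // can arise.
            return txn.CommitInBatch(ctx, batch)
        })
        if err == nil {
            return nil
        }
        if errors.HasType(err, (*kvpb.AmbiguousResultError)(nil)) {
            // The transaction may or may not have committed. Check durable
            // application state (e.g. re-read the keys, or consult an
            // idempotency record written by the transaction) before
            // deciding whether to re-issue the transfer.
            return verifyOrRetryTransfer(ctx, db, keyA, keyB, amount)
        }
        return err
    }

    // verifyOrRetryTransfer is a placeholder for application-specific logic
    // that determines whether the ambiguous transfer actually applied.
    func verifyOrRetryTransfer(
        ctx context.Context, db *kv.DB, keyA, keyB roachpb.Key, amount int64,
    ) error {
        // For example: re-read keyA/keyB and compare against the expected
        // balances, or look up an idempotency token, then retry if needed.
        return nil
    }

This mirrors what SQL clients already do for the StatementCompletionUnknown
code; the point of the patch is that the coordinator now surfaces the
ambiguity instead of a spurious retryable error or an assertion failure.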
--- pkg/kv/kvclient/kvcoord/dist_sender.go | 44 +- .../kvcoord/dist_sender_ambiguous_test.go | 1001 ++++++++++++++++- pkg/kv/kvpb/api.proto | 11 +- pkg/kv/kvpb/batch.go | 3 + pkg/kv/kvpb/string_test.go | 3 +- .../kvserver/batcheval/cmd_conditional_put.go | 7 +- pkg/kv/kvserver/batcheval/cmd_delete.go | 7 +- pkg/kv/kvserver/batcheval/cmd_delete_range.go | 11 +- pkg/kv/kvserver/batcheval/cmd_increment.go | 7 +- pkg/kv/kvserver/batcheval/cmd_init_put.go | 7 +- pkg/kv/kvserver/batcheval/cmd_put.go | 7 +- pkg/storage/mvcc.go | 28 +- pkg/storage/mvcc_history_test.go | 59 +- .../testdata/mvcc_histories/ambiguous_writes | 394 +++++++ 14 files changed, 1484 insertions(+), 105 deletions(-) create mode 100644 pkg/storage/testdata/mvcc_histories/ambiguous_writes diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index b6203424d82c..3c4838d55ded 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1268,7 +1268,7 @@ func (ds *DistSender) detectIntentMissingDueToIntentResolution( // We weren't able to determine whether the intent missing error is // due to intent resolution or not, so it is still ambiguous whether // the commit succeeded. - return false, kvpb.NewAmbiguousResultErrorf("error=%s [intent missing]", pErr) + return false, kvpb.NewAmbiguousResultErrorf("error=%v [intent missing]", pErr) } resp := br.Responses[0].GetQueryTxn() respTxn := &resp.QueriedTxn @@ -2082,7 +2082,8 @@ func maybeSetResumeSpan( // the error that the last attempt to execute the request returned. func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { if ambiguousErr != nil { - return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted]", ambiguousErr) + return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)", + ambiguousErr, lastAttemptErr) } // TODO(bdarnell): The error from the last attempt is not necessarily the best @@ -2260,6 +2261,23 @@ func (ds *DistSender) sendToReplicas( ba = ba.ShallowCopy() ba.Replica = curReplica ba.RangeID = desc.RangeID + + // When a sub-batch from a batch containing a commit experiences an + // ambiguous error, it is critical to ensure subsequent replay attempts + // do not permit changing the write timestamp, as the transaction may + // already have been considered implicitly committed. + ba.AmbiguousReplayProtection = ambiguousError != nil + + // In the case that the batch has already seen an ambiguous error, in + // addition to enabling ambiguous replay protection, we also need to + // disable the ability for the server to forward the read timestamp. This + // protects against moving the timestamp when we are retrying a successful + // write operation (after an ambiguous error) for which the intent has + // already been cleaned up. + if ambiguousError != nil && ba.CanForwardReadTimestamp { + ba.CanForwardReadTimestamp = false + } + // Communicate to the server the information our cache has about the // range. If it's stale, the server will return an update. ba.ClientRangeInfo = roachpb.ClientRangeInfo{ @@ -2294,10 +2312,13 @@ func (ds *DistSender) sendToReplicas( ds.maybeIncrementErrCounters(br, err) if err != nil { + log.VErrEventf(ctx, 2, "RPC error: %s", err) + if grpcutil.IsAuthError(err) { // Authentication or authorization error. Propagate. 
if ambiguousError != nil { - return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)", + ambiguousError, err) } return nil, err } @@ -2319,10 +2340,6 @@ func (ds *DistSender) sendToReplicas( // ambiguity. // 2) SQL recognizes AmbiguousResultErrors and gives them a special code // (StatementCompletionUnknown). - // TODO(andrei): The use of this code is inconsistent because a) the - // DistSender tries to only return the code for commits, but it'll happily - // forward along AmbiguousResultErrors coming from the replica and b) we - // probably should be returning that code for non-commit statements too. // // We retry requests in order to avoid returning errors (in particular, // AmbiguousResultError). Retrying the batch will either: @@ -2337,6 +2354,12 @@ func (ds *DistSender) sendToReplicas( // can't claim success (and even if we could claim success, we still // wouldn't have the complete result of the successful evaluation). // + // Note that in case c), a request is not idempotent if the retry finds + // the request succeeded the first time around, but requires a change to + // the transaction's write timestamp. This is guarded against by setting + // the AmbiguousReplayProtection flag, so that the replay is aware the + // batch has seen an ambiguous error. + // // Case a) is great - the retry made the request succeed. Case b) is also // good; due to idempotency we managed to swallow a communication error. // Case c) is not great - we'll end up returning an error even though the @@ -2349,10 +2372,12 @@ func (ds *DistSender) sendToReplicas( // evaluating twice, overwriting another unrelated write that fell // in-between. // + // NB: If this partial batch does not contain the EndTxn request but the + // batch contains a commit, the ambiguous error should be caught on + // retrying the writes, should it need to be propagated. if withCommit && !grpcutil.RequestDidNotStart(err) { ambiguousError = err } - log.VErrEventf(ctx, 2, "RPC error: %s", err) // If the error wasn't just a context cancellation and the down replica // is cached as the lease holder, evict it. The only other eviction @@ -2508,7 +2533,8 @@ func (ds *DistSender) sendToReplicas( } default: if ambiguousError != nil { - return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)", + ambiguousError, br.Error.GoError()) } // The error received is likely not specific to this diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 799bbc5fd92c..2841d4e6fd67 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -238,18 +238,33 @@ func (tMu *interceptorHelperMutex) configureSubTest(t *testing.T) (restore func( return restore } +// validateTxnCommitAmbiguousError checks that an error on txn commit is +// ambiguous, rather than an assertion failure or a retryable error. 
+func validateTxnCommitAmbiguousError(t *testing.T, err error, reason string) { + aErr := (*kvpb.AmbiguousResultError)(nil) + rErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.Errorf(t, err, "expected an AmbiguousResultError") + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to %s", reason) + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.As(err, &rErr), + "did not expect incorrect TransactionRetryWithProtoRefreshError due to failed refresh") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + require.ErrorContainsf(t, aErr, reason, + "expected AmbiguousResultError to include message \"%s\"", reason) +} + // TestTransactionUnexpectedlyCommitted validates the handling of the case where // a parallel commit transaction with an ambiguous error on a write races with // a contending transaction's recovery attempt. In the case that the recovery // succeeds prior to the original transaction's retries, an ambiguous error // should be raised. // -// NB: This case encounters a known issue described in #103817 and seen in -// #67765, where it currently is surfaced as an assertion failure that will -// result in a node crash. -// -// TODO(sarkesian): Validate the ambiguous result error once the initial fix as -// outlined in #103817 has been resolved. +// NB: These tests deal with a known issue described in #103817 and seen in +// #67765. func TestTransactionUnexpectedlyCommitted(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -382,7 +397,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { }) } - getInBatch := func(ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { + getInBatch := func(t *testing.T, ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { batch := txn.NewBatch() for _, key := range keys { batch.GetForUpdate(key) @@ -440,8 +455,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Filtering and logging annotations for the request interceptor. tMu.Lock() tMu.filter = func(req *interceptedReq) bool { - // Log all requests on txn1 or txn2, except for heartbeats. - if (req.txnName == "txn1" || req.txnName == "txn2") && !req.ba.IsSingleHeartbeatTxnRequest() { + // Log all requests on txn1/txn2/txn3, except for heartbeats. 
+ if (req.txnName == "txn1" || req.txnName == "txn2" || req.txnName == "txn3") && !req.ba.IsSingleHeartbeatTxnRequest() { return true } @@ -477,10 +492,12 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { WHERE id IN ($1, $2)` const xferAmount = 10 + type opFn func(t *testing.T, name string) error + execWorkloadTxn := func(t *testing.T, name string) error { tCtx := context.Background() txn := db.NewTxn(tCtx, name) - vals := getInBatch(tCtx, txn, keyA, keyB) + vals := getInBatch(t, tCtx, txn, keyA, keyB) batch := txn.NewBatch() batch.Put(keyA, vals[0]-xferAmount) @@ -488,6 +505,14 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return txn.CommitInBatch(tCtx, batch) } + execLeaseMover := func(t *testing.T, name string) error { + desc, err := tc.LookupRange(keyB) + assert.NoError(t, err) + t.Logf("Transferring r%d lease to n%d", desc.RangeID, tc.Target(0).NodeID) + assert.NoError(t, tc.TransferRangeLease(desc, tc.Target(0))) + return nil + } + waitUntilReady := func(t *testing.T, name string, readyCh chan struct{}) (finishedWithoutTimeout bool) { select { case <-readyCh: @@ -499,7 +524,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return true } - runTxn := func(t *testing.T, name string, wg *sync.WaitGroup, readyCh, doneCh chan struct{}, resultCh chan error) { + runConcurrentOp := func(t *testing.T, name string, execOp opFn, wg *sync.WaitGroup, readyCh, doneCh chan struct{}, resultCh chan error) { defer wg.Done() if doneCh != nil { defer close(doneCh) @@ -507,7 +532,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { if !waitUntilReady(t, name, readyCh) { return } - err := execWorkloadTxn(t, name) + err := execOp(t, name) if resultCh != nil { resultCh <- err } else { @@ -515,21 +540,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } } - runLeaseMover := func(t *testing.T, wg *sync.WaitGroup, readyCh chan struct{}, doneCh chan struct{}) { - defer wg.Done() - if !waitUntilReady(t, "lease mover", readyCh) { - return - } - - desc, err := tc.LookupRange(keyB) - assert.NoError(t, err) - t.Logf("Transferring r%d lease to n%d", desc.RangeID, tc.Target(0).NodeID) - assert.NoError(t, tc.TransferRangeLease(desc, tc.Target(0))) - - close(doneCh) - } - - t.Run("workload conflict", func(t *testing.T) { + // The "basic" test case, allowing for multiple variants of txn2. + runBasicTestCase := func(t *testing.T, txn2Ops opFn, txn2PutsAfterTxn1 bool) { defer initSubTest(t)() // Checkpoints in test. @@ -547,9 +559,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Concurrent transactions. var wg sync.WaitGroup wg.Add(3) - go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) - go runTxn(t, "txn2", &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) - go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", txn2Ops, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) // KV Request sequencing. tMu.Lock() @@ -570,8 +582,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { close(txn2Ready) } - // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. - // 6. txn2->n2: Get(b) + // 5. txn2->n1: Get(a) OR Put(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n2: Get(b) OR Put(b) // 7. 
_->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts // recovery. // 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we @@ -594,12 +606,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b) // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so @@ -629,6 +642,337 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { req.pauseUntil(t, recoverComplete, cp) } + // + if txn2PutsAfterTxn1 && req.txnName == "txn2" && hasPut && cp == BeforeSending { + // While txn2's Puts can only occur after txn1 is marked as explicitly + // committed, if the Recovery and the subsequent txn2 Put(b) operations + // happen before txn1 retries its Put(b) on n1, it will encounter txn2's + // intent and get a WriteTooOld error instead of potentially being an + // idempotent replay. + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) + } + + // The set of test cases that use the same request scheduling. + basicVariants := []struct { + name string + txn2Ops opFn + txn2PutsAfterTxn1 bool + }{ + { + name: "writer reader conflict", + txn2Ops: func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple Get on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _, err := txn.Get(tCtx, keyA) + assert.NoError(t, err) + assert.NoError(t, txn.Commit(ctx)) + return nil + }, + }, + { + name: "writer writer conflict", + txn2Ops: func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 performs simple Puts on conflicting keys, causing it to issue a + // PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + batch := txn.NewBatch() + batch.Put(keyA, 0) + batch.Put(keyB, 0) + assert.NoError(t, txn.CommitInBatch(ctx, batch)) + t.Logf("txn2 finished here") + return nil + }, + }, + { + name: "workload conflict", + txn2Ops: execWorkloadTxn, + txn2PutsAfterTxn1: true, + }, + } + + for _, variant := range basicVariants { + t.Run(variant.name, func(t *testing.T) { + runBasicTestCase(t, variant.txn2Ops, variant.txn2PutsAfterTxn1) + }) + } + + // Test cases with custom request scheduling. + + // The txn coordinator shouldn't respond with an incorrect retryable failure + // based on a refresh as the transaction may have already been committed. 
+ t.Run("recovery before refresh fails", func(t *testing.T) { + keyAPrime := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("a'"))) + defer func() { + _, err := db.Del(ctx, keyAPrime) + require.NoError(t, err) + }() + + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Operation functions. + execTxn1 := func(t *testing.T, name string) error { + tCtx := context.Background() + txn := db.NewTxn(tCtx, name) + vals := getInBatch(t, tCtx, txn, keyA, keyB, keyAPrime) + + batch := txn.NewBatch() + batch.Put(keyA, vals[0]-xferAmount) + batch.Put(keyB, vals[1]+xferAmount) + return txn.CommitInBatch(tCtx, batch) + } + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + txn := db.NewTxn(tCtx, name) + + // The intent from txn2 on a' will cause txn1's read refresh to fail. + assert.NoError(t, txn.Put(tCtx, keyAPrime, 100)) + _ = getInBatch(t, tCtx, txn, keyA) + assert.NoError(t, txn.Commit(tCtx)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execTxn1, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, we can + // move the lease. + close(txn2Ready) + } + + // 5. txn2->n1: Put(a') + // 6. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Allow to proceed and finish. Since txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(leaseMoveReady) + } + + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. 
txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b,c) -- This fails due to txn2's intent on c. + // Causes the transaction coordinator to return a retryable error, + // although the transaction has been actually committed during recovery; + // a highly problematic bug. + + // + // This way we can prevent txn1's intents from being resolved too early. + if _, ok := req.ba.GetArg(kvpb.PushTxn); ok && cp == AfterSending && + resp != nil && resp.err == nil { + req.pauseUntil(t, txn1Done, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) + }) + + // The txn coordinator shouldn't respond with an incorrect retryable failure + // based on a refresh as the transaction may yet be recovered/committed. + t.Run("recovery after refresh fails", func(t *testing.T) { + keyC := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("c"))) + defer func() { + _, err := db.Del(ctx, keyC) + require.NoError(t, err) + }() + + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + otherTxnsReady := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + execTxn1 := func(t *testing.T, name string) error { + tCtx := context.Background() + txn := db.NewTxn(tCtx, name) + vals := getInBatch(t, tCtx, txn, keyA, keyB, keyC) + + batch := txn.NewBatch() + batch.Put(keyA, vals[0]-xferAmount) + batch.Put(keyB, vals[1]+xferAmount) + return txn.CommitInBatch(tCtx, batch) + } + execOtherTxns := func(t *testing.T, name string) error { + tCtx := context.Background() + + // The intent from txn3 will cause txn1's read refresh to fail. + txn3 := db.NewTxn(tCtx, "txn3") + batch := txn3.NewBatch() + batch.Put(keyC, 100) + assert.NoError(t, txn3.CommitInBatch(tCtx, batch)) + + txn2 := db.NewTxn(tCtx, "txn2") + vals := getInBatch(t, tCtx, txn2, keyA, keyB) + + batch = txn2.NewBatch() + batch.Put(keyA, vals[0]-xferAmount) + batch.Put(keyB, vals[1]+xferAmount) + assert.NoError(t, txn2.CommitInBatch(tCtx, batch)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execTxn1, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2/txn3", execOtherTxns, &wg, otherTxnsReady, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. 
+ tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b), Get(c) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, txn2/txn3 can start. + close(otherTxnsReady) + } + + // 5. txn3->n2: Put(c), EndTxn(parallel commit) -- Hits 1PC fast path. + // 6. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 7. txn2->n2: Get(b) + // 8. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 9. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries (and refreshes), but + // before its final EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // Once the RecoverTxn request is issued, as part of txn2's PushTxn + // request, the lease can be moved. + close(leaseMoveReady) + } + + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 10. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + + // 11. txn1->n1: Refresh(a) + // 12. txn1->n1: Refresh(b,c) -- This fails due to txn3's write on c. + // Causes the transaction coordinator to return a retryable error, + // although the transaction could be actually committed during recovery; + // a highly problematic bug. + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // The RecoverTxn operation should be evaluated after txn1 completes, + // in this case with a problematic retryable error. + req.pauseUntil(t, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + // if req.txnName == "txn2" && hasPut && cp == BeforeSending { // While txn2's Puts can only occur after txn1 is marked as explicitly @@ -646,20 +990,581 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. 
close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) + }) + + // This test is primarily included for completeness, in order to ensure the + // same behavior regardless of if the recovery occurs before or after the + // lease transfer. + t.Run("recovery after transfer lease", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execWorkloadTxn, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + // + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + close(leaseMoveReady) + req.pauseUntil(t, leaseMoveComplete, cp) + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n1: Get(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed, but + // pauses before returning so that txn1's intents don't get cleaned up. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + close(recoverComplete) + } + + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, recoverComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b) + // 12. 
txn1->n1: EndTxn(commit) -- Recovery has already completed, so this + // request fails with "transaction unexpectedly committed". + + // + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + req.pauseUntil(t, txn1Done, cp) + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) + }) + + // When a retried write happens after another txn's intent already exists on + // the key, we expect to see a WriteTooOld error propagate an ambiguous error. + t.Run("retry sees other intent", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + txn2ETReady := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execWorkloadTxn, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + // + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + close(leaseMoveReady) + req.pauseUntil(t, leaseMoveComplete, cp) + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n1: Get(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed. + // 9. txn2->n1: Put(a), EndTxn(parallel commit) -- Writes intent. + // 10. txn2->n1: Put(b) + // 11. txn2->n1: EndTxn(commit) -- Happens asynchronously, so txn2's + // coordinator has already returned success, but we want recovery of txn1 + // and txn1's retry to complete before txn2's intents have been resolved. + if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(txn2ETReady) + } + + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. 
+ req.pauseUntil(t, txn2ETReady, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry sees intent, so returns a WriteTooOld + // error. Since the transaction had an earlier ambiguous failure on a + // batch with a commit, it should propagate the ambiguous error. + + // -- NB: Ambiguous replay protection is not required, and should not + // come into play here. + + // + if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, txn1Done, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) + }) + + // When a retried write happens after our txn's intent has already been + // resolved post-recovery, even at the same timestamp we expect to see a + // WriteTooOld error propagate an ambiguous error. + t.Run("recovery before retry at same timestamp", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + recoverComplete := make(chan struct{}) + resolveIntentComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Operation functions. + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _ = getInBatch(t, tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + + // KV Request sequencing. + var txn1KeyBWriteCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n1: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 5. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. _->n1: RecoverTxn(txn1) -- Allow to proceed. 
+ if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // 8. _->n1: ResolveIntent(txn1, b) + if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == AfterSending { + t.Logf("%s - complete", req.prefix) + close(resolveIntentComplete) + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. + req.pauseUntil(t, resolveIntentComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry gets WriteTooOld and cannot perform a + // serverside refresh as reads were already returned. Fails, and should + // propagate the ambiguous error. + + // -- NB: Ambiguous replay protection is not required, and should not + // come into play here. + + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) + }) + + // This test is included for completeness, but tests expected behavior; + // when a retried write happens successfully at the same timestamp, and no + // refresh is required, the explicit commit can happen asynchronously and the + // txn coordinator can return early, reporting a successful commit. + // When this occurs, even if the recovery happens before the async EndTxn, + // it is expected behavior for the EndTxn to encounter an already committed + // txn record, and this is not treated as error. + t.Run("recovery after retry at same timestamp", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + putRetryReady := make(chan struct{}) + receivedFinalET := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Operation functions. + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _ = getInBatch(t, tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + + // KV Request sequencing. 
+ var txn1KeyBWriteCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n1: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 5. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries, but before its final + // EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + close(putRetryReady) + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. + req.pauseUntil(t, putRetryReady, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 8. txn1->n1: Put(b) -- Retry gets evaluated as idempotent replay and + // correctly succeeds. Ambiguous replay protection, though enabled, + // should not cause the retried write to error. + + // -- NB: In this case, txn1 returns without error here, as there is no + // need to refresh and the final EndTxn can be split off and run + // asynchronously. + + // 9. txn1->n1: EndTxn(commit) -- Before sending, pause the request so + // that we can allow (8) RecoverTxn(txn1) to proceed, simulating a race + // in which the recovery wins. + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(receivedFinalET) + } + + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // -- even though the + // recovery won, this is allowed. See makeTxnCommitExplicitLocked(..). + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, recoverComplete, cp) + } + + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) + wg.Wait() + + require.NoErrorf(t, err, "expected txn1 to succeed") + }) + + // When a retried write happens after our txn's intent has already been + // resolved post-recovery, it should not be able to perform a serverside + // refresh to "handle" the WriteTooOld error as it may already be committed. 
+ t.Run("recovery before retry with serverside refresh", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + recoverComplete := make(chan struct{}) + resolveIntentComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Operation functions. + execTxn1 := func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + batch := txn.NewBatch() + batch.Put(keyA, 100) + batch.Put(keyB, 100) + return txn.CommitInBatch(tCtx, batch) + } + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _ = getInBatch(t, tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go runConcurrentOp(t, "txn1", execTxn1, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + + // KV Request sequencing. + var txn1KeyBWriteCount int64 + var txn1KeyBResolveIntentCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + var keyBResolveIntentCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 2. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 3. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 4. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 5. _->n1: RecoverTxn(txn1) -- Allow to proceed. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // 6. _->n1: ResolveIntent(txn1, b) + if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBResolveIntentCount = atomic.AddInt64(&txn1KeyBResolveIntentCount, 1) + if keyBResolveIntentCount == 1 { + t.Logf("%s - complete", req.prefix) + close(resolveIntentComplete) + } + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. 
+ req.pauseUntil(t, resolveIntentComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 7. txn1->n1: Put(b) -- Retry gets WriteTooOld; with ambiguous replay + // protection enabled, it should not be able to perform a serverside + // refresh. Fails, and should propagate the ambiguous error. + + // -- NB: Without ambiguous replay protection, txn1 would continue with: + + // 8. txn1->n1: EndTxn(commit) -- Would get "transaction unexpectedly + // committed" due to the recovery completing first. + + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) }) } diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index b2571f46bbc4..4375ab8d9f16 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2766,9 +2766,18 @@ message Header { // sender. repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"]; + // AmbiguousReplayProtection, if set, prevents a retried write operation + // from being considered an idempotent replay of a successful prior attempt + // (of the same operation) if the request's write timestamp is different from + // the prior attempt's. This protection is required when there has been an + // ambiguous write (i.e. RPC error) on a batch that contained a commit, + // as the transaction may have already been considered implicitly committed, + // and/or been explicitly committed by a RecoverTxn request. See #103817. 
+ bool ambiguous_replay_protection = 32; + reserved 7, 10, 12, 14, 20; - // Next ID: 32 + // Next ID: 33 } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index 331164716f3b..c3df7a5eb68f 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -855,6 +855,9 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) { if ba.WaitPolicy != lock.WaitPolicy_Block { s.Printf(", [wait-policy: %s]", ba.WaitPolicy) } + if ba.AmbiguousReplayProtection { + s.Printf(", [protect-ambiguous-replay]") + } if ba.CanForwardReadTimestamp { s.Printf(", [can-forward-ts]") } diff --git a/pkg/kv/kvpb/string_test.go b/pkg/kv/kvpb/string_test.go index efb879051ff6..ddba8de974e9 100644 --- a/pkg/kv/kvpb/string_test.go +++ b/pkg/kv/kvpb/string_test.go @@ -44,6 +44,7 @@ func TestBatchRequestString(t *testing.T) { txn.ID = uuid.NamespaceDNS ba.Txn = &txn ba.WaitPolicy = lock.WaitPolicy_Error + ba.AmbiguousReplayProtection = true ba.CanForwardReadTimestamp = true ba.BoundedStaleness = &kvpb.BoundedStalenessHeader{ MinTimestampBound: hlc.Timestamp{WallTime: 1}, @@ -60,7 +61,7 @@ func TestBatchRequestString(t *testing.T) { ba.Requests = append(ba.Requests, ru) { - exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` + exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 
76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` act := ba.String() require.Equal(t, exp, act) } diff --git a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go index c619a1e4ca30..90768a6b4c69 100644 --- a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go @@ -59,9 +59,10 @@ func ConditionalPut( handleMissing := storage.CPutMissingBehavior(args.AllowIfDoesNotExist) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_delete.go b/pkg/kv/kvserver/batcheval/cmd_delete.go index e5321313c27b..2570f1c55c90 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete.go @@ -32,9 +32,10 @@ func Delete( reply := resp.(*kvpb.DeleteResponse) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index 103b0e76b1b9..969918e53bbd 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -223,6 +223,14 @@ func DeleteRange( if !args.Inline { timestamp = h.Timestamp } + + opts := storage.MVCCWriteOptions{ + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, + } + // NB: Even if args.ReturnKeys is false, we want to know which intents were // written if we're evaluating the DeleteRange for a transaction so that we // can update the Result's AcquiredLocks field. 
@@ -230,8 +238,7 @@ func DeleteRange( deleted, resumeSpan, num, err := storage.MVCCDeleteRange( ctx, readWriter, args.Key, args.EndKey, h.MaxSpanRequestKeys, timestamp, - storage.MVCCWriteOptions{Txn: h.Txn, LocalTimestamp: cArgs.Now, Stats: cArgs.Stats}, - returnKeys) + opts, returnKeys) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_increment.go b/pkg/kv/kvserver/batcheval/cmd_increment.go index be576e3d1c48..53c6f871290b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_increment.go +++ b/pkg/kv/kvserver/batcheval/cmd_increment.go @@ -33,9 +33,10 @@ func Increment( reply := resp.(*kvpb.IncrementResponse) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 7c6301fc7142..078013f45730 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -37,9 +37,10 @@ func InitPut( } opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_put.go b/pkg/kv/kvserver/batcheval/cmd_put.go index 5db6b772a688..5fc6f427cc4d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_put.go @@ -55,9 +55,10 @@ func Put( } opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index dd5012bf53f8..3581a5ad3b1b 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -1712,6 +1712,7 @@ func replayTransactionalWrite( value roachpb.Value, txn *roachpb.Transaction, valueFn func(optionalValue) (roachpb.Value, error), + replayWriteTimestampProtection bool, ) error { var writtenValue optionalValue var err error @@ -1797,6 +1798,14 @@ func replayTransactionalWrite( txn.ID, txn.Sequence, value.RawBytes, writtenValue.RawBytes) } + // If ambiguous replay protection is enabled, a replay that changes the + // timestamp should fail, as this would break idempotency (see #103817). + if replayWriteTimestampProtection && !txn.WriteTimestamp.Equal(meta.Txn.WriteTimestamp) { + return errors.Errorf("transaction %s with sequence %d prevented from changing "+ + "write timestamp from %s to %s due to ambiguous replay protection", + txn.ID, txn.Sequence, meta.Txn.WriteTimestamp, txn.WriteTimestamp) + } + return nil } @@ -1866,6 +1875,9 @@ func mvccPutInternal( if !value.Timestamp.IsEmpty() { return false, errors.Errorf("cannot have timestamp set in value") } + if err := opts.validate(); err != nil { + return false, err + } metaKey := MakeMVCCMetadataKey(key) ok, origMetaKeySize, origMetaValSize, origRealKeyChanged, err := @@ -1965,7 +1977,7 @@ func mvccPutInternal( // The transaction has executed at this sequence before. This is merely a // replay of the transactional write. Assert that all is in order and return // early. 
- return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn) + return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn, opts.ReplayWriteTimestampProtection) } // We're overwriting the intent that was present at this key, before we do @@ -3980,9 +3992,17 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) { // MVCCWriteOptions bundles options for the MVCCPut and MVCCDelete families of functions. type MVCCWriteOptions struct { // See the comment on mvccPutInternal for details on these parameters. - Txn *roachpb.Transaction - LocalTimestamp hlc.ClockTimestamp - Stats *enginepb.MVCCStats + Txn *roachpb.Transaction + LocalTimestamp hlc.ClockTimestamp + Stats *enginepb.MVCCStats + ReplayWriteTimestampProtection bool +} + +func (opts *MVCCWriteOptions) validate() error { + if opts.ReplayWriteTimestampProtection && opts.Txn == nil { + return errors.Errorf("cannot enable replay protection without a transaction") + } + return nil } // MVCCScanOptions bundles options for the MVCCScan family of functions. diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 5bf6ce70375d..1dbf73c18726 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -87,15 +87,14 @@ var ( // check_intent k= [none] // add_lock t= k= // -// cput [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= v= [raw] [cond=] -// del [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= -// del_range [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= [end=] [max=] [returnKeys] +// cput [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= v= [raw] [cond=] +// del [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= +// del_range [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= [end=] [max=] [returnKeys] // del_range_ts [ts=[,]] [localTs=[,]] k= end= [idempotent] [noCoveredStats] // del_range_pred [ts=[,]] [localTs=[,]] k= end= [startTime=,max=,maxBytes=,rangeThreshold=] -// increment [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= [inc=] -// initput [t=] [ts=[,]] [resolve [status=]] k= v= [raw] [failOnTombstones] -// merge [t=] [ts=[,]] [resolve [status=]] k= v= [raw] -// put [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= v= [raw] +// increment [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= [inc=] +// initput [t=] [ts=[,]] [resolve [status=]] [ambiguousReplay] k= v= [raw] [failOnTombstones] +// put [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= v= [raw] // put_rangekey ts=[,] [localTs=[,]] k= end= // put_blind_inline k= v= [prev=] // get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [maxKeys=] [targetBytes=] [allowEmpty] @@ -1104,9 +1103,10 @@ func cmdCPut(e *evalCtx) error { return e.withWriter("cput", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } if err := storage.MVCCConditionalPut(e.ctx, rw, key, ts, val, expVal, behavior, opts); err != nil { return err @@ -1130,9 +1130,10 @@ func cmdInitPut(e *evalCtx) error { return e.withWriter("initput", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + 
LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } if err := storage.MVCCInitPut(e.ctx, rw, key, ts, val, failOnTombstones, opts); err != nil { return err @@ -1152,9 +1153,10 @@ func cmdDelete(e *evalCtx) error { resolve, resolveStatus := e.getResolve() return e.withWriter("del", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } foundKey, err := storage.MVCCDelete(e.ctx, rw, key, ts, opts) if err == nil || errors.HasType(err, &kvpb.WriteTooOldError{}) { @@ -1186,9 +1188,10 @@ func cmdDeleteRange(e *evalCtx) error { resolve, resolveStatus := e.getResolve() return e.withWriter("del_range", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } deleted, resumeSpan, num, err := storage.MVCCDeleteRange( e.ctx, rw, key, endKey, int64(max), ts, opts, returnKeys) @@ -1345,9 +1348,10 @@ func cmdIncrement(e *evalCtx) error { return e.withWriter("increment", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } curVal, err := storage.MVCCIncrement(e.ctx, rw, key, ts, opts, inc) if err != nil { @@ -1386,9 +1390,10 @@ func cmdPut(e *evalCtx) error { return e.withWriter("put", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } if err := storage.MVCCPut(e.ctx, rw, key, ts, val, opts); err != nil { return err @@ -2252,6 +2257,10 @@ func (e *evalCtx) getResolve() (bool, roachpb.TransactionStatus) { return true, e.getTxnStatus() } +func (e *evalCtx) getAmbiguousReplay() bool { + return e.hasArg("ambiguousReplay") +} + func (e *evalCtx) getTs(txn *roachpb.Transaction) hlc.Timestamp { return e.getTsWithTxnAndName(txn, "ts") } diff --git a/pkg/storage/testdata/mvcc_histories/ambiguous_writes b/pkg/storage/testdata/mvcc_histories/ambiguous_writes new file mode 100644 index 000000000000..e59e1f9677d6 --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/ambiguous_writes @@ -0,0 +1,394 @@ +# Ambiguous replay protection is only valid on transactional writes. + +run error +del k=a ambiguousReplay +---- +>> at end: + +error: (*withstack.withStack:) cannot enable replay protection without a transaction + +# Idempotent replays should normally be allowed, even if they move the timestamp. + +run stats ok +with t=A k=a + txn_begin ts=11 + # Lay down an intent. 
+ put v=first +---- +>> put v=first t=A k=a +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+61 live_count=+1 live_bytes=+75 intent_count=+1 intent_bytes=+22 separated_intent_count=+1 intent_age=+89 +>> at end: +txn: "A" meta={id=00000000 key="a" iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=11.000000000,0 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000000 key="a" iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} ts=11.000000000,0 del=false klen=12 vlen=10 mergeTs= txnDidNotUpdateMeta=true +data: "a"/11.000000000,0 -> /BYTES/first +stats: key_count=1 key_bytes=14 val_count=1 val_bytes=61 live_count=1 live_bytes=75 intent_count=1 intent_bytes=22 separated_intent_count=1 intent_age=89 + +run stats ok +with t=A k=a + # Perform an idempotent replay, but at a higher ts. + txn_advance ts=12 + put v=first +---- +>> put v=first t=A k=a +stats: no change +>> at end: +txn: "A" meta={id=00000000 key="a" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=11.000000000,0 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000000 key="a" iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} ts=11.000000000,0 del=false klen=12 vlen=10 mergeTs= txnDidNotUpdateMeta=true +data: "a"/11.000000000,0 -> /BYTES/first +stats: key_count=1 key_bytes=14 val_count=1 val_bytes=61 live_count=1 live_bytes=75 intent_count=1 intent_bytes=22 separated_intent_count=1 intent_age=89 + +run ok +with t=A k=a + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first + +# Ambiguous replay protection should allow initial writes. + +run stats ok +with t=B k=k + txn_begin ts=0,1 + initput k=k ts=0,1 v=k1 ambiguousReplay +---- +>> initput k=k ts=0,1 v=k1 ambiguousReplay t=B k=k +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+50 live_count=+1 live_bytes=+64 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+100 +>> at end: +txn: "B" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,1 min=0,0 seq=0} lock=true stat=PENDING rts=0,1 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,1 min=0,0 seq=0} ts=0,1 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=28 val_count=2 val_bytes=73 live_count=2 live_bytes=101 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=100 + +run ok +with t=B k=k + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection should not affect identically evaluating cputs. 
+ +run stats ok +with t=C k=k + txn_begin ts=0,2 + cput v=k2 cond=k1 +---- +>> cput v=k2 cond=k1 t=C k=k +stats: key_bytes=+12 val_count=+1 val_bytes=+50 live_bytes=+43 gc_bytes_age=+1900 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+100 +>> at end: +txn: "C" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,2 min=0,0 seq=0} lock=true stat=PENDING rts=0,2 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,2 min=0,0 seq=0} ts=0,2 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=40 val_count=3 val_bytes=80 live_count=2 live_bytes=101 gc_bytes_age=1900 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=100 + +run stats ok +with t=C k=k + cput v=k2 cond=k1 ambiguousReplay +---- +>> cput v=k2 cond=k1 ambiguousReplay t=C k=k +stats: no change +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,2 min=0,0 seq=0} ts=0,2 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=40 val_count=3 val_bytes=80 live_count=2 live_bytes=101 gc_bytes_age=1900 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=100 + +run ok +with t=C k=k + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection should prevent a replay from evaluating at a higher timestamp. 
+ +run stats ok +with t=D k=k + txn_begin ts=3,0 + put v=k3 +---- +>> put v=k3 t=D k=k +stats: key_bytes=+12 val_count=+1 val_bytes=+58 live_bytes=+51 gc_bytes_age=+1843 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+97 +>> at end: +txn: "D" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=3.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,0 min=0,0 seq=0} ts=3.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=52 val_count=4 val_bytes=95 live_count=2 live_bytes=109 gc_bytes_age=3743 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=97 + +run stats error +with t=D k=k + txn_advance ts=3,1 + put v=k3 ambiguousReplay +---- +>> put v=k3 ambiguousReplay t=D k=k +stats: no change +>> at end: +txn: "D" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,1 min=0,0 seq=0} lock=true stat=PENDING rts=3.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,0 min=0,0 seq=0} ts=3.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=52 val_count=4 val_bytes=95 live_count=2 live_bytes=109 gc_bytes_age=3743 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=97 +error: (*withstack.withStack:) transaction 00000000-0000-0000-0000-000000000004 with sequence 0 prevented from changing write timestamp from 3.000000000,0 to 3.000000000,1 due to ambiguous replay protection + +run ok +with t=D k=k + # Commit at the original timestamp (i.e. if committed by a recovery operation). + txn_advance ts=3,0 + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection still results in WriteTooOld errors after intent cleanup.
+ +run stats ok +with t=E k=k + txn_begin ts=4,0 + del resolve +---- +>> del resolve t=E k=k +del: "k": found key true +stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-21 gc_bytes_age=+3168 +>> at end: +txn: "E" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=4.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=4.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=64 val_count=5 val_bytes=44 live_count=1 live_bytes=37 gc_bytes_age=6911 + +run stats error +with t=E k=k + del ambiguousReplay +---- +>> del ambiguousReplay t=E k=k +del: "k": found key false +stats: no change +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=64 val_count=5 val_bytes=44 live_count=1 live_bytes=37 gc_bytes_age=6911 +error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "k" at timestamp 4.000000000,0 too old; must write at or above 4.000000000,1 + +run ok +txn_remove t=E +---- +>> at end: + +# Ambiguous replay protects against timestamp change on point delete. + +run stats ok +with t=F k=k + txn_begin ts=5,0 + # Write an initial value at the first sequence number. + put v=k5 +---- +>> put v=k5 t=F k=k +stats: key_bytes=+12 val_count=+1 val_bytes=+58 live_count=+1 live_bytes=+72 gc_bytes_age=-192 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+95 +>> at end: +txn: "F" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=0} ts=5.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/5.000000000,0 -> /BYTES/k5 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=102 live_count=2 live_bytes=109 gc_bytes_age=6719 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=95 + +run stats ok +with t=F k=k + txn_step + # Let's assume we had an RPC error prior to the first successful operation. 
+ del ambiguousReplay +---- +>> del ambiguousReplay t=F k=k +del: "k": found key true +stats: val_bytes=+6 live_count=-1 live_bytes=-72 gc_bytes_age=+7410 intent_bytes=-7 +>> at end: +txn: "F" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} ts=5.000000000,0 del=true klen=12 vlen=0 ih={{0 /BYTES/k5}} mergeTs= txnDidNotUpdateMeta=false +data: "k"/5.000000000,0 -> / +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=108 live_count=1 live_bytes=37 gc_bytes_age=14129 intent_count=1 intent_bytes=12 separated_intent_count=1 intent_age=95 + +run stats ok +with t=F k=k + txn_step n=-1 + # A replay of a lower sequence number with the same timestamp should be allowed. + put v=k5 ambiguousReplay +---- +>> put v=k5 ambiguousReplay t=F k=k +stats: no change +>> at end: +txn: "F" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} ts=5.000000000,0 del=true klen=12 vlen=0 ih={{0 /BYTES/k5}} mergeTs= txnDidNotUpdateMeta=false +data: "k"/5.000000000,0 -> / +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=108 live_count=1 live_bytes=37 gc_bytes_age=14129 intent_count=1 intent_bytes=12 separated_intent_count=1 intent_age=95 + +run stats error +with t=F k=k + txn_step + txn_advance ts=5,1 + del ambiguousReplay +---- +>> del ambiguousReplay t=F k=k +stats: no change +>> at end: +txn: "F" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,1 min=0,0 seq=1} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} ts=5.000000000,0 del=true klen=12 vlen=0 ih={{0 /BYTES/k5}} mergeTs= txnDidNotUpdateMeta=false +data: "k"/5.000000000,0 -> / +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=108 live_count=1 live_bytes=37 gc_bytes_age=14129 intent_count=1 intent_bytes=12 separated_intent_count=1 intent_age=95 +error: (*withstack.withStack:) transaction 00000000-0000-0000-0000-000000000006 with sequence 1 prevented from changing write timestamp from 5.000000000,0 to 5.000000000,1 due to ambiguous replay protection + +run ok +with t=F k=k + resolve_intent status=ABORTED + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection prevents timestamp change on transactional DeleteRange.
+ +run ok +put k=k v=k6 ts=6,0 +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +run stats ok +with t=G k=k + txn_begin ts=12,1 + del_range k=a end=z returnKeys +---- +>> del_range k=a end=z returnKeys t=G k=k +del_range: "a"-"z" -> deleted 2 key(s) +del_range: returned "a" +del_range: returned "k" +stats: key_bytes=+24 val_count=+2 val_bytes=+106 live_count=-2 live_bytes=-58 gc_bytes_age=+16544 intent_count=+2 intent_bytes=+24 separated_intent_count=+2 intent_age=+176 +>> at end: +txn: "G" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} lock=true stat=PENDING rts=12.000000000,1 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "a"/12.000000000,1 -> / +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "k"/12.000000000,1 -> / +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=100 val_count=8 val_bytes=157 gc_bytes_age=23263 intent_count=2 intent_bytes=24 separated_intent_count=2 intent_age=176 + +run stats ok +with t=G k=k + # First attempt a standard idempotent replay at a higher timestamp. + txn_advance ts=12,2 + del_range k=a end=z returnKeys +---- +>> del_range k=a end=z returnKeys t=G k=k +del_range: "a"-"z" -> deleted 2 key(s) +del_range: returned "a" +del_range: returned "k" +stats: no change +>> at end: +txn: "G" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,2 min=0,0 seq=0} lock=true stat=PENDING rts=12.000000000,1 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "a"/12.000000000,1 -> / +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "k"/12.000000000,1 -> / +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=100 val_count=8 val_bytes=157 gc_bytes_age=23263 intent_count=2 intent_bytes=24 separated_intent_count=2 intent_age=176 + +run stats error +with t=G k=k + txn_advance ts=12,3 + # However with ambiguous replay protection, a timestamp change should error. 
+ del_range k=a end=z ambiguousReplay returnKeys +---- +>> del_range k=a end=z ambiguousReplay returnKeys t=G k=k +stats: no change +>> at end: +txn: "G" meta={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,3 min=0,0 seq=0} lock=true stat=PENDING rts=12.000000000,1 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "a"/12.000000000,1 -> / +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000000 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "k"/12.000000000,1 -> / +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=100 val_count=8 val_bytes=157 gc_bytes_age=23263 intent_count=2 intent_bytes=24 separated_intent_count=2 intent_age=176 +error: (*withstack.withStack:) transaction 00000000-0000-0000-0000-000000000007 with sequence 0 prevented from changing write timestamp from 12.000000000,1 to 12.000000000,3 due to ambiguous replay protection + +run ok +with t=G + # Commit at the last "correct" timestamp. + txn_advance ts=12,2 + resolve_intent k=a + resolve_intent k=k + txn_remove +---- +>> at end: +data: "a"/12.000000000,2 -> {localTs=12.000000000,1}/ +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/12.000000000,2 -> {localTs=12.000000000,1}/ +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1