From 44af22572838f9b9c9533dc254783b42720c297f Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 19 Dec 2023 16:56:43 -0500 Subject: [PATCH] kv: remove unnecessary use of Timestamp.WithSynthetic in tests Informs #101938. These tests were using the Timestamp.WithSynthetic method even when doing so was not necessary. Release note: None --- .../kvclient/kvcoord/txn_coord_sender_test.go | 8 +- .../batcheval/cmd_end_transaction_test.go | 9 +- pkg/kv/kvserver/client_lease_test.go | 3 +- pkg/kv/kvserver/client_merge_test.go | 12 +- pkg/kv/kvserver/client_replica_test.go | 16 +- pkg/kv/kvserver/replica_test.go | 317 +++++++++--------- .../kvserver/txn_recovery_integration_test.go | 5 +- 7 files changed, 171 insertions(+), 199 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 4022059eef58..fff099a1466e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -733,8 +733,8 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { func testTxnCoordSenderTxnUpdatedOnError(t *testing.T, isoLevel isolation.Level) { ctx := context.Background() origTS := makeTS(123, 0) - plus10 := origTS.Add(10, 10).WithSynthetic(false) - plus20 := origTS.Add(20, 0).WithSynthetic(false) + plus10 := origTS.Add(10, 10) + plus20 := origTS.Add(20, 0) testCases := []struct { // The test's name. name string @@ -1526,9 +1526,7 @@ func TestTxnCommitWait(t *testing.T) { // transaction is read-write, it will need to bump its write // timestamp above the other value. if futureTime { - ts := txn.TestingCloneTxn().WriteTimestamp. - Add(futureOffset.Nanoseconds(), 0). - WithSynthetic(true) + ts := txn.TestingCloneTxn().WriteTimestamp.Add(futureOffset.Nanoseconds(), 0) h := kvpb.Header{Timestamp: ts} put := kvpb.NewPut(key, roachpb.Value{}) if _, pErr := kv.SendWrappedWith(ctx, s.DB.NonTransactionalSender(), h, put); pErr != nil { diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go index 7b3d360749a8..bba1b2d9b881 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go @@ -1301,13 +1301,8 @@ func TestCommitWaitBeforeIntentResolutionIfCommitTrigger(t *testing.T) { expError: false, }, { - name: "past-syn", - commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.WithSynthetic(true) }, - expError: false, - }, - { - name: "future-syn", - commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.Add(100, 0).WithSynthetic(true) }, + name: "future", + commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.Add(100, 0) }, // If the EndTxn carried a commit trigger and its transaction will need // to commit-wait because the transaction has a future-time commit // timestamp, evaluating the request should return an error. diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 4ecc5fad509e..3023bb90ddd7 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -621,7 +621,7 @@ func TestStoreLeaseTransferTimestampCacheRead(t *testing.T) { // Determine when to read. readTS := tc.Servers[0].Clock().Now() if futureRead { - readTS = readTS.Add(500*time.Millisecond.Nanoseconds(), 0).WithSynthetic(true) + readTS = readTS.Add(500*time.Millisecond.Nanoseconds(), 0) } // Read the key at readTS. 
@@ -651,7 +651,6 @@ func TestStoreLeaseTransferTimestampCacheRead(t *testing.T) { require.Nil(t, pErr) require.NotEqual(t, readTS, br.Timestamp) require.True(t, readTS.Less(br.Timestamp)) - require.Equal(t, readTS.Synthetic, br.Timestamp.Synthetic) }) } diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 0e109b678ab9..5f4f1c9228e8 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -416,7 +416,7 @@ func mergeWithData(t *testing.T, retries int64) { // // - futureRead: configures whether or not the reads performed on the RHS range // before the merge is initiated are performed in the future of present -// time using synthetic timestamps. +// time. func TestStoreRangeMergeTimestampCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -565,7 +565,7 @@ func mergeCheckingTimestampCaches( readTS := tc.Servers[0].Clock().Now() if futureRead { - readTS = readTS.Add(500*time.Millisecond.Nanoseconds(), 0).WithSynthetic(true) + readTS = readTS.Add(500*time.Millisecond.Nanoseconds(), 0) } // Simulate a read on the RHS from a node with a newer clock. @@ -1178,14 +1178,8 @@ func TestStoreRangeMergeTxnRefresh(t *testing.T) { // Detect the range merge's deletion of the local range descriptor // and use it as an opportunity to bump the merge transaction's // write timestamp. This will necessitate a refresh. - // - // Also mark as synthetic, while we're here, to simulate the - // behavior of a range merge across two ranges with the - // LEAD_FOR_GLOBAL_READS closed timestamp policy. if !v.Value.IsPresent() && bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) { - br.Txn.WriteTimestamp = br.Txn.WriteTimestamp. - Add(100*time.Millisecond.Nanoseconds(), 0). - WithSynthetic(true) + br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(100*time.Millisecond.Nanoseconds(), 0) } case *kvpb.RefreshRequest: if bytes.HasSuffix(v.Key, keys.LocalRangeDescriptorSuffix) { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 6e166540a0a5..3afd3ffb4dce 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -120,10 +120,10 @@ func TestReplicaClockUpdates(t *testing.T) { clock.Pause() } // Pick a timestamp in the future of all nodes by less than the - // MaxOffset. Set the synthetic flag according to the test case. - reqTS := clocks[0].Now().Add(clocks[0].MaxOffset().Nanoseconds()/2, 0).WithSynthetic(synthetic) + // MaxOffset. + reqTS := clocks[0].Now().Add(clocks[0].MaxOffset().Nanoseconds()/2, 0) h := kvpb.Header{Timestamp: reqTS} - if !reqTS.Synthetic { + if !synthetic { h.Now = hlc.ClockTimestamp(reqTS) } @@ -1922,17 +1922,14 @@ func TestLeaseExpirationBelowFutureTimeRequest(t *testing.T) { now := l.tc.Servers[1].Clock().Now() // Construct a future-time request timestamp past the current lease's - // expiration. Remember to set the synthetic bit so that it is not used - // to update the store's clock. See Replica.checkRequestTimeRLocked for - // the exact determination of whether a request timestamp is too far in - // the future or not. + // expiration. See Replica.checkRequestTimeRLocked for the determination + // of whether a request timestamp is too far in the future or not. 
leaseRenewal := l.tc.Servers[1].RaftConfig().RangeLeaseRenewalDuration() leaseRenewalMinusStasis := leaseRenewal - l.tc.Servers[1].Clock().MaxOffset() reqTime := now.Add(leaseRenewalMinusStasis.Nanoseconds()-10, 0) if tooFarInFuture { reqTime = reqTime.Add(20, 0) } - reqTime = reqTime.WithSynthetic(true) // Issue a get with the request timestamp. args := getArgs(l.leftKey) @@ -2834,7 +2831,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // Determine when to read. readTS := tc.Servers[0].Clock().Now() if futureRead { - readTS = readTS.Add(500*time.Millisecond.Nanoseconds(), 0).WithSynthetic(true) + readTS = readTS.Add(500*time.Millisecond.Nanoseconds(), 0) } // Read the key at readTS. @@ -2911,7 +2908,6 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { require.Nil(t, pErr) require.NotEqual(t, readTS, br.Timestamp) require.True(t, readTS.Less(br.Timestamp)) - require.Equal(t, readTS.Synthetic, br.Timestamp.Synthetic) }) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 5d1dc2139bf8..853d77ac1a9a 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2189,54 +2189,52 @@ func TestRequestLeaseLimit(t *testing.T) { } // TestReplicaUpdateTSCache verifies that reads and ranged writes update the -// timestamp cache. The test performs the operations with and without the use -// of synthetic timestamps. +// timestamp cache. func TestReplicaUpdateTSCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { - ctx := context.Background() - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.Start(ctx, t, stopper) - startNanos := tc.Clock().Now().WallTime + ctx := context.Background() + tc := testContext{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) - // Set clock to time 1s and do the read. - tc.manualClock.MustAdvanceTo(timeutil.Unix(1, 0)) - ts1 := tc.Clock().Now().WithSynthetic(synthetic) - gArgs := getArgs([]byte("a")) + startNanos := tc.Clock().Now().WallTime - if _, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: ts1}, &gArgs); pErr != nil { - t.Error(pErr) - } - // Set clock to time 2s for write. - tc.manualClock.MustAdvanceTo(timeutil.Unix(2, 0)) - ts2 := tc.Clock().Now().WithSynthetic(synthetic) - key := roachpb.Key([]byte("b")) - drArgs := kvpb.NewDeleteRange(key, key.Next(), false /* returnKeys */) + // Set clock to time 1s and do the read. + tc.manualClock.MustAdvanceTo(timeutil.Unix(1, 0)) + ts1 := tc.Clock().Now() + gArgs := getArgs([]byte("a")) - if _, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: ts2}, drArgs); pErr != nil { - t.Error(pErr) - } - // Verify the timestamp cache has rTS=1s and wTS=0s for "a". - noID := uuid.UUID{} - rTS, rTxnID := tc.repl.store.tsCache.GetMax(roachpb.Key("a"), nil) - if rTS != ts1 || rTxnID != noID { - t.Errorf("expected rTS=%s but got %s; rTxnID=%s", ts1, rTS, rTxnID) - } - // Verify the timestamp cache has rTS=2s for "b". - rTS, rTxnID = tc.repl.store.tsCache.GetMax(roachpb.Key("b"), nil) - if rTS != ts2 || rTxnID != noID { - t.Errorf("expected rTS=%s but got %s; rTxnID=%s", ts2, rTS, rTxnID) - } - // Verify another key ("c") has 0sec in timestamp cache. 
- rTS, rTxnID = tc.repl.store.tsCache.GetMax(roachpb.Key("c"), nil) - if rTS.WallTime != startNanos || rTxnID != noID { - t.Errorf("expected rTS=0s but got %s; rTxnID=%s", rTS, rTxnID) - } - }) + if _, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: ts1}, &gArgs); pErr != nil { + t.Error(pErr) + } + // Set clock to time 2s for write. + tc.manualClock.MustAdvanceTo(timeutil.Unix(2, 0)) + ts2 := tc.Clock().Now() + key := roachpb.Key("b") + drArgs := kvpb.NewDeleteRange(key, key.Next(), false /* returnKeys */) + + if _, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: ts2}, drArgs); pErr != nil { + t.Error(pErr) + } + // Verify the timestamp cache has rTS=1s and wTS=0s for "a". + noID := uuid.UUID{} + rTS, rTxnID := tc.repl.store.tsCache.GetMax(roachpb.Key("a"), nil) + if rTS != ts1 || rTxnID != noID { + t.Errorf("expected rTS=%s but got %s; rTxnID=%s", ts1, rTS, rTxnID) + } + // Verify the timestamp cache has rTS=2s for "b". + rTS, rTxnID = tc.repl.store.tsCache.GetMax(roachpb.Key("b"), nil) + if rTS != ts2 || rTxnID != noID { + t.Errorf("expected rTS=%s but got %s; rTxnID=%s", ts2, rTS, rTxnID) + } + // Verify another key ("c") has 0sec in timestamp cache. + rTS, rTxnID = tc.repl.store.tsCache.GetMax(roachpb.Key("c"), nil) + if rTS.WallTime != startNanos || rTxnID != noID { + t.Errorf("expected rTS=0s but got %s; rTxnID=%s", rTS, rTxnID) + } } // TestReplicaLatching verifies that reads/writes must wait for @@ -3057,38 +3055,36 @@ func TestReplicaLatchingOptimisticEvaluationSkipLocked(t *testing.T) { } // TestReplicaUseTSCache verifies that write timestamps are upgraded based on -// the timestamp cache. The test performs the operations with and without the -// use of synthetic timestamps. +// the timestamp cache. func TestReplicaUseTSCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { - ctx := context.Background() - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.Start(ctx, t, stopper) - startTS := tc.Clock().Now() - // Set clock to time 1s and do the read. - tc.manualClock.Advance(1) - readTS := tc.Clock().Now().WithSynthetic(synthetic) - args := getArgs([]byte("a")) + ctx := context.Background() + tc := testContext{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) + startTS := tc.Clock().Now() + + // Set clock to time 1s and do the read. + tc.manualClock.Advance(1) + readTS := tc.Clock().Now() + args := getArgs([]byte("a")) - _, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: readTS}, &args) - require.Nil(t, pErr) + _, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: readTS}, &args) + require.Nil(t, pErr) - // Perform a conflicting write. Should get bumped. - pArgs := putArgs([]byte("a"), []byte("value")) - ba := &kvpb.BatchRequest{} - ba.Add(&pArgs) - ba.Timestamp = startTS + // Perform a conflicting write. Should get bumped. 
+ pArgs := putArgs([]byte("a"), []byte("value")) + ba := &kvpb.BatchRequest{} + ba.Add(&pArgs) + ba.Timestamp = startTS - br, pErr := tc.Sender().Send(ctx, ba) - require.Nil(t, pErr) - require.NotEqual(t, startTS, br.Timestamp) - require.Equal(t, readTS.Next(), br.Timestamp) - }) + br, pErr := tc.Sender().Send(ctx, ba) + require.Nil(t, pErr) + require.NotEqual(t, startTS, br.Timestamp) + require.Equal(t, readTS.Next(), br.Timestamp) } // TestReplicaTSCacheForwardsIntentTS verifies that the timestamp cache affects @@ -3096,73 +3092,69 @@ func TestReplicaUseTSCache(t *testing.T) { // write is forwarded by the timestamp cache due to a more recent read, the // written intents must be left at the forwarded timestamp. See the comment on // the enginepb.TxnMeta.Timestamp field for rationale. -// -// The test performs the operations with and without the use of synthetic -// timestamps. func TestReplicaTSCacheForwardsIntentTS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { - ctx := context.Background() - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - sc := TestStoreConfig(nil) - sc.TestingKnobs.DisableCanAckBeforeApplication = true - tc.StartWithStoreConfig(ctx, t, stopper, sc) - tsOld := tc.Clock().Now() - tsNew := tsOld.Add(time.Millisecond.Nanoseconds(), 0).WithSynthetic(synthetic) - - // Read at tNew to populate the timestamp cache. - // DeleteRange at tNew to populate the timestamp cache. - txnNew := newTransaction("new", roachpb.Key("txn-anchor"), roachpb.NormalUserPriority, tc.Clock()) - txnNew.ReadTimestamp = tsNew - txnNew.WriteTimestamp = tsNew - keyGet := roachpb.Key("get") - keyDeleteRange := roachpb.Key("delete-range") - gArgs := getArgs(keyGet) - drArgs := deleteRangeArgs(keyDeleteRange, keyDeleteRange.Next()) - assignSeqNumsForReqs(txnNew, &gArgs, &drArgs) - ba := &kvpb.BatchRequest{} - ba.Header.Txn = txnNew - ba.Add(&gArgs, &drArgs) - if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil { - t.Fatal(pErr) - } + ctx := context.Background() + tc := testContext{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + sc := TestStoreConfig(nil) + sc.TestingKnobs.DisableCanAckBeforeApplication = true + tc.StartWithStoreConfig(ctx, t, stopper, sc) - // Write under the timestamp cache within the transaction, and verify that - // the intents are written above the timestamp cache. 
- txnOld := newTransaction("old", roachpb.Key("txn-anchor"), roachpb.NormalUserPriority, tc.Clock()) - txnOld.ReadTimestamp = tsOld - txnOld.WriteTimestamp = tsOld - for _, key := range []roachpb.Key{keyGet, keyDeleteRange} { - t.Run(string(key), func(t *testing.T) { - pArgs := putArgs(key, []byte("foo")) - assignSeqNumsForReqs(txnOld, &pArgs) - if _, pErr := tc.SendWrappedWith(kvpb.Header{Txn: txnOld}, &pArgs); pErr != nil { - t.Fatal(pErr) - } - iter, err := tc.engine.NewMVCCIterator(context.Background(), storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{Prefix: true}) - if err != nil { - t.Fatal(err) - } - defer iter.Close() - mvccKey := storage.MakeMVCCMetadataKey(key) - iter.SeekGE(mvccKey) - var keyMeta enginepb.MVCCMetadata - if ok, err := iter.Valid(); !ok || !iter.UnsafeKey().Equal(mvccKey) { - t.Fatalf("missing mvcc metadata for %q: %+v", mvccKey, err) - } else if err := iter.ValueProto(&keyMeta); err != nil { - t.Fatalf("failed to unmarshal metadata for %q", mvccKey) - } - if tsNext := tsNew.Next(); keyMeta.Timestamp.ToTimestamp() != tsNext { - t.Errorf("timestamp not forwarded for %q intent: expected %s but got %s", - key, tsNext, keyMeta.Timestamp) - } - }) - } - }) + tsOld := tc.Clock().Now() + tsNew := tsOld.Add(time.Millisecond.Nanoseconds(), 0) + + // Read at tNew to populate the timestamp cache. + // DeleteRange at tNew to populate the timestamp cache. + txnNew := newTransaction("new", roachpb.Key("txn-anchor"), roachpb.NormalUserPriority, tc.Clock()) + txnNew.ReadTimestamp = tsNew + txnNew.WriteTimestamp = tsNew + keyGet := roachpb.Key("get") + keyDeleteRange := roachpb.Key("delete-range") + gArgs := getArgs(keyGet) + drArgs := deleteRangeArgs(keyDeleteRange, keyDeleteRange.Next()) + assignSeqNumsForReqs(txnNew, &gArgs, &drArgs) + ba := &kvpb.BatchRequest{} + ba.Header.Txn = txnNew + ba.Add(&gArgs, &drArgs) + if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil { + t.Fatal(pErr) + } + + // Write under the timestamp cache within the transaction, and verify that + // the intents are written above the timestamp cache. 
+ txnOld := newTransaction("old", roachpb.Key("txn-anchor"), roachpb.NormalUserPriority, tc.Clock()) + txnOld.ReadTimestamp = tsOld + txnOld.WriteTimestamp = tsOld + for _, key := range []roachpb.Key{keyGet, keyDeleteRange} { + t.Run(string(key), func(t *testing.T) { + pArgs := putArgs(key, []byte("foo")) + assignSeqNumsForReqs(txnOld, &pArgs) + if _, pErr := tc.SendWrappedWith(kvpb.Header{Txn: txnOld}, &pArgs); pErr != nil { + t.Fatal(pErr) + } + iter, err := tc.engine.NewMVCCIterator(context.Background(), storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{Prefix: true}) + if err != nil { + t.Fatal(err) + } + defer iter.Close() + mvccKey := storage.MakeMVCCMetadataKey(key) + iter.SeekGE(mvccKey) + var keyMeta enginepb.MVCCMetadata + if ok, err := iter.Valid(); !ok || !iter.UnsafeKey().Equal(mvccKey) { + t.Fatalf("missing mvcc metadata for %q: %+v", mvccKey, err) + } else if err := iter.ValueProto(&keyMeta); err != nil { + t.Fatalf("failed to unmarshal metadata for %q", mvccKey) + } + if tsNext := tsNew.Next(); keyMeta.Timestamp.ToTimestamp() != tsNext { + t.Errorf("timestamp not forwarded for %q intent: expected %s but got %s", + key, tsNext, keyMeta.Timestamp) + } + }) + } } func TestConditionalPutUpdatesTSCacheOnError(t *testing.T) { @@ -6067,45 +6059,44 @@ func TestPushTxnPriorities(t *testing.T) { func TestPushTxnPushTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { - ctx := context.Background() - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.Start(ctx, t, stopper) - pusher := newTransaction("test", roachpb.Key("a"), 1, tc.Clock()) - pushee := newTransaction("test", roachpb.Key("b"), 1, tc.Clock()) - pusher.Priority = enginepb.MaxTxnPriority - pushee.Priority = enginepb.MinTxnPriority // pusher will win - now := tc.Clock().Now() - pusher.WriteTimestamp = now.Add(50, 25).WithSynthetic(synthetic) - pushee.WriteTimestamp = now.Add(5, 1) + ctx := context.Background() + tc := testContext{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) - key := roachpb.Key("a") - put := putArgs(key, key) - assignSeqNumsForReqs(pushee, &put) - if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), kvpb.Header{Txn: pushee}, &put); pErr != nil { - t.Fatal(pErr) - } + pusher := newTransaction("test", roachpb.Key("a"), 1, tc.Clock()) + pushee := newTransaction("test", roachpb.Key("b"), 1, tc.Clock()) + pusher.Priority = enginepb.MaxTxnPriority + pushee.Priority = enginepb.MinTxnPriority // pusher will win + now := tc.Clock().Now() + pusher.WriteTimestamp = now.Add(50, 25) + pushee.WriteTimestamp = now.Add(5, 1) - // Now, push the transaction using a PUSH_TIMESTAMP push request. 
- args := pushTxnArgs(pusher, pushee, kvpb.PUSH_TIMESTAMP) + key := roachpb.Key("a") + put := putArgs(key, key) + assignSeqNumsForReqs(pushee, &put) + if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), kvpb.Header{Txn: pushee}, &put); pErr != nil { + t.Fatal(pErr) + } - resp, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: args.PushTo}, &args) - if pErr != nil { - t.Fatalf("unexpected error on push: %s", pErr) - } - expTS := pusher.WriteTimestamp - expTS.Logical++ - reply := resp.(*kvpb.PushTxnResponse) - if reply.PusheeTxn.WriteTimestamp != expTS { - t.Errorf("expected timestamp to be pushed to %+v; got %+v", expTS, reply.PusheeTxn.WriteTimestamp) - } - if reply.PusheeTxn.Status != roachpb.PENDING { - t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status) - } - }) + // Now, push the transaction using a PUSH_TIMESTAMP push request. + args := pushTxnArgs(pusher, pushee, kvpb.PUSH_TIMESTAMP) + + resp, pErr := tc.SendWrappedWith(kvpb.Header{Timestamp: args.PushTo}, &args) + if pErr != nil { + t.Fatalf("unexpected error on push: %s", pErr) + } + expTS := pusher.WriteTimestamp + expTS.Logical++ + reply := resp.(*kvpb.PushTxnResponse) + if reply.PusheeTxn.WriteTimestamp != expTS { + t.Errorf("expected timestamp to be pushed to %+v; got %+v", expTS, reply.PusheeTxn.WriteTimestamp) + } + if reply.PusheeTxn.Status != roachpb.PENDING { + t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status) + } } // TestPushTxnPushTimestampAlreadyPushed verifies that pushing diff --git a/pkg/kv/kvserver/txn_recovery_integration_test.go b/pkg/kv/kvserver/txn_recovery_integration_test.go index 9e0b9bc1ab44..7e2efdef12e3 100644 --- a/pkg/kv/kvserver/txn_recovery_integration_test.go +++ b/pkg/kv/kvserver/txn_recovery_integration_test.go @@ -47,8 +47,7 @@ func TestTxnRecoveryFromStaging(t *testing.T) { // implicit-commit condition. implicitCommit bool // futureWrites dictates whether the transaction has been writing at the - // present time or whether it has been writing into the future with a - // synthetic timestamp. + // present time or whether it has been writing into the future. futureWrites bool }{ { @@ -85,7 +84,7 @@ func TestTxnRecoveryFromStaging(t *testing.T) { // where it has refreshed up to its write timestamp in preparation // to commit. if tc.futureWrites { - txn.WriteTimestamp = txn.ReadTimestamp.Add(50, 0).WithSynthetic(true) + txn.WriteTimestamp = txn.ReadTimestamp.Add(50, 0) txn.ReadTimestamp = txn.WriteTimestamp // simulate refresh }
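For context, a minimal sketch (not part of the patch) of the before/after pattern the change applies throughout these tests: future-time timestamps are now built with a plain Add, without tagging them as synthetic. The import path and the Add/WithSynthetic signatures are assumed to match the CockroachDB tree at the time of this commit; WithSynthetic appears only in a comment so the sketch compiles even after that method is removed (see #101938).

package example

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// futureReadTimestamp constructs a timestamp slightly in the future of a
// clock reading, as the tests above do before issuing a future-time request.
func futureReadTimestamp(now hlc.Timestamp) hlc.Timestamp {
	// Before this patch the tests wrote:
	//   return now.Add(500*time.Millisecond.Nanoseconds(), 0).WithSynthetic(true)
	// After this patch a plain Add suffices; the synthetic flag is unnecessary here.
	return now.Add(500*time.Millisecond.Nanoseconds(), 0)
}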