From 1045f2d593bd7cb53453ea8d23ae2be6ddb8ac07 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 23 Jan 2024 19:55:56 +0000 Subject: [PATCH] rangefeed: assert intent commits above resolved timestamp Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 2 +- .../kvserver/rangefeed/resolved_timestamp.go | 40 +++++-- .../rangefeed/resolved_timestamp_test.go | 101 ++++++++++-------- 3 files changed, 86 insertions(+), 57 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 8cdeff5c4b47..b817df38c985 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -790,7 +790,7 @@ func (p *Processor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index f1f9e8d57a8f..c6793040e9b0 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -13,11 +13,13 @@ package rangefeed import ( "bytes" "container/heap" + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -128,32 +130,40 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { // operation within its range of tracked keys. This allows the structure to // update its internal intent tracking to reflect the change. The method returns // whether this caused the resolved timestamp to move forward. -func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool { - if rts.consumeLogicalOp(op) { +func (rts *resolvedTimestamp) ConsumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { + if rts.consumeLogicalOp(ctx, op) { return rts.recompute() } rts.assertNoChange() return false } -func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { +func (rts *resolvedTimestamp) consumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCDeleteRangeOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCWriteIntentOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) case *enginepb.MVCCUpdateIntentOp: return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp) case *enginepb.MVCCCommitIntentOp: + // This assertion can be violated in mixed-version clusters prior + // to 24.1, so make it non-fatal for now. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + rts.assertOpAboveRTS(ctx, op, t.Timestamp, false /* fatal */) return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: @@ -264,10 +274,22 @@ func (rts *resolvedTimestamp) assertNoChange() { // assertOpAboveTimestamp asserts that this operation is at a larger timestamp // than the current resolved timestamp. A violation of this assertion would // indicate a failure of the closed timestamp mechanism. -func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) { +func (rts *resolvedTimestamp) assertOpAboveRTS( + ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, +) { if opTS.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v", - rts.resolvedTS, op)) + // NB: MVCCLogicalOp.String() is only implemented for pointer receiver. + // We shadow the variable to avoid it escaping to the heap. + op := op + err := errors.AssertionFailedf( + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op) + if fatal { + // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect + // it and to minimize code churn for backports. + panic(err) + } else { + log.Errorf(ctx, "%v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index ff5f9795131d..7496c7475bd9 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -11,6 +11,7 @@ package rangefeed import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -176,6 +177,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -184,13 +186,13 @@ func TestResolvedTimestamp(t *testing.T) { // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -201,16 +203,16 @@ func TestResolvedTimestamp(t *testing.T) { // Write intent at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) + rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) }) // Write value at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 4})) + rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) }) // Write value at later timestamp. No effect on resolved timestamp. - fwd = rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 6})) + fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -221,12 +223,12 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn2. No effect on the resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn1. Resolved timestamp moves forward. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -236,13 +238,13 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at same timestamp. No change. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at later timestamp. Resolved // timestamp moves forward. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) @@ -253,47 +255,47 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. txn3 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. txn4 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction committed. No effect. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Second transaction observes one intent being resolved at timestamp // above closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) @@ -304,22 +306,22 @@ func TestResolvedTimestamp(t *testing.T) { // Second transaction observes another intent being resolved at timestamp // below closed time. Still one intent left. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) // Second transaction observes final intent being resolved at timestamp // below closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Fifth transaction at higher timestamp. No effect. txn5 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) @@ -330,7 +332,7 @@ func TestResolvedTimestamp(t *testing.T) { // Fifth transaction bumps epoch and re-writes one of its intents. Resolved // timestamp moves to the new transaction timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) @@ -338,48 +340,49 @@ func TestResolvedTimestamp(t *testing.T) { // its final epoch. Resolved timestamp moves forward after observing the // first intent committing at a higher timestamp and moves to the closed // timestamp after observing the second intent aborting. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get()) } func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() // Add a value. No closed timestamp so no resolved timestamp. - fwd := rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 1})) + fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Update intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort the first intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Commit the second intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) } @@ -418,6 +421,8 @@ func TestResolvedTimestampNoIntents(t *testing.T) { func TestResolvedTimestampInit(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + t.Run("CT Before Init", func(t *testing.T) { rts := makeResolvedTimestamp() @@ -436,7 +441,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -450,7 +455,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -469,12 +474,12 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort that intent's transaction. Not initialized so no-op. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -482,7 +487,7 @@ func TestResolvedTimestampInit(t *testing.T) { // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would // have created a reference that would never be cleaned up. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -501,7 +506,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -513,6 +518,7 @@ func TestResolvedTimestampInit(t *testing.T) { func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -523,7 +529,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -533,23 +539,23 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Abort txn1 after a periodic txn push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Write another intent as txn1. Should add txn1 back into queue. // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -560,7 +566,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) // Abort txn1 again after another periodic push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } @@ -569,6 +575,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -579,7 +586,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -591,7 +598,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) @@ -600,7 +607,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // and an intent is in the next wall tick; this used to cause an issue because // of the rounding logic. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) }