diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index eff4bba4fffc..58db7fdb0157 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -847,7 +847,7 @@ func (p *LegacyProcessor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 6c5e09643e4d..f109bdc131bd 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -12,6 +12,7 @@ package rangefeed import ( "bytes" + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" @@ -19,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/container/heap" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -129,32 +131,40 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { // operation within its range of tracked keys. This allows the structure to // update its internal intent tracking to reflect the change. The method returns // whether this caused the resolved timestamp to move forward. -func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool { - if rts.consumeLogicalOp(op) { +func (rts *resolvedTimestamp) ConsumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { + if rts.consumeLogicalOp(ctx, op) { return rts.recompute() } rts.assertNoChange() return false } -func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { +func (rts *resolvedTimestamp) consumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCDeleteRangeOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCWriteIntentOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnIsoLevel, t.TxnMinTimestamp, t.Timestamp) case *enginepb.MVCCUpdateIntentOp: return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp) case *enginepb.MVCCCommitIntentOp: + // This assertion can be violated in mixed-version clusters prior + // to 24.1, so make it non-fatal for now. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + rts.assertOpAboveRTS(ctx, op, t.Timestamp, false /* fatal */) return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: @@ -265,10 +275,22 @@ func (rts *resolvedTimestamp) assertNoChange() { // assertOpAboveTimestamp asserts that this operation is at a larger timestamp // than the current resolved timestamp. A violation of this assertion would // indicate a failure of the closed timestamp mechanism. -func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) { +func (rts *resolvedTimestamp) assertOpAboveRTS( + ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, +) { if opTS.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v", - rts.resolvedTS, op)) + // NB: MVCCLogicalOp.String() is only implemented for pointer receiver. + // We shadow the variable to avoid it escaping to the heap. + op := op + err := errors.AssertionFailedf( + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op) + if fatal { + // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect + // it and to minimize code churn for backports. + panic(err) + } else { + log.Errorf(ctx, "%v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index 4c890bd75780..961457d819f6 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -11,6 +11,7 @@ package rangefeed import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" @@ -179,6 +180,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -187,13 +189,13 @@ func TestResolvedTimestamp(t *testing.T) { // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -204,16 +206,16 @@ func TestResolvedTimestamp(t *testing.T) { // Write intent at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) + rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) }) // Write value at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 4})) + rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) }) // Write value at later timestamp. No effect on resolved timestamp. - fwd = rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 6})) + fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -224,12 +226,12 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn2. No effect on the resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn1. Resolved timestamp moves forward. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -239,13 +241,13 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at same timestamp. No change. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at later timestamp. Resolved // timestamp moves forward. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) @@ -256,47 +258,47 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. txn3 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. txn4 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction committed. No effect. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Second transaction observes one intent being resolved at timestamp // above closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) @@ -307,22 +309,22 @@ func TestResolvedTimestamp(t *testing.T) { // Second transaction observes another intent being resolved at timestamp // below closed time. Still one intent left. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) // Second transaction observes final intent being resolved at timestamp // below closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Fifth transaction at higher timestamp. No effect. txn5 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) @@ -333,7 +335,7 @@ func TestResolvedTimestamp(t *testing.T) { // Fifth transaction bumps epoch and re-writes one of its intents. Resolved // timestamp moves to the new transaction timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) @@ -341,48 +343,49 @@ func TestResolvedTimestamp(t *testing.T) { // its final epoch. Resolved timestamp moves forward after observing the // first intent committing at a higher timestamp and moves to the closed // timestamp after observing the second intent aborting. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get()) } func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() // Add a value. No closed timestamp so no resolved timestamp. - fwd := rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 1})) + fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Update intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort the first intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Commit the second intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) } @@ -421,6 +424,8 @@ func TestResolvedTimestampNoIntents(t *testing.T) { func TestResolvedTimestampInit(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + t.Run("CT Before Init", func(t *testing.T) { rts := makeResolvedTimestamp() @@ -439,7 +444,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -453,7 +458,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -472,12 +477,12 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort that intent's transaction. Not initialized so no-op. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -485,7 +490,7 @@ func TestResolvedTimestampInit(t *testing.T) { // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would // have created a reference that would never be cleaned up. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -504,7 +509,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -516,6 +521,7 @@ func TestResolvedTimestampInit(t *testing.T) { func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -526,7 +532,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -536,23 +542,23 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Abort txn1 after a periodic txn push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Write another intent as txn1. Should add txn1 back into queue. // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -563,7 +569,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) // Abort txn1 again after another periodic push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } @@ -572,6 +578,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -582,7 +589,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -594,7 +601,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) @@ -603,7 +610,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // and an intent is in the next wall tick; this used to cause an issue because // of the rounding logic. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) } diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index f00e670a0616..8cee96698aa0 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -698,7 +698,7 @@ func (p *ScheduledProcessor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } }