From aa5e3539df3b2937f39e5b55e61c8a174a415b55 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Wed, 15 Nov 2023 12:23:20 +0000
Subject: [PATCH 1/2] rangefeed: fix premature checkpoint due to intent
 resolution race

It was possible for rangefeeds to emit a premature checkpoint, before all
writes below its timestamp had been emitted. This in turn would cause
changefeeds to not emit these write events at all.

This could happen because `PushTxn` may return a false `ABORTED` status for a
transaction that has in fact been committed, if the transaction record has
been GCed (after resolving all intents). The timestamp cache does not retain
sufficient information to disambiguate a committed transaction from an aborted
one in this case, so it pessimistically assumes an abort (see
`Replica.CanCreateTxnRecord` and `batcheval.SynthesizeTxnFromMeta`).

However, the rangefeed txn pusher trusted this `ABORTED` status, ignoring the
pending txn intents and allowing the resolved timestamp to advance past them
before emitting the committed intents. This can lead to the following
scenario:

- A rangefeed is running on a lagging follower.
- A txn writes an intent, which is replicated to the follower.
- The closed timestamp advances past the intent.
- The txn commits and resolves the intent at the original write timestamp,
  then GCs its txn record. This is not yet applied on the follower.
- The rangefeed pushes the txn to advance its resolved timestamp.
- The txn is GCed, so the push returns ABORTED (it can't know whether the txn
  was committed or aborted after its record is GCed).
- The rangefeed disregards the "aborted" txn and advances the resolved
  timestamp, emitting a checkpoint.
- The follower applies the resolved intent and emits an event below the
  checkpoint, violating the checkpoint guarantee.
- The changefeed sees an event below its frontier and drops it, never emitting
  these events at all.

This patch fixes the bug by submitting a barrier command to the leaseholder
which waits for all past and ongoing writes (including intent resolution) to
complete and apply, and then waits for the local replica to apply the barrier
as well. This ensures any committed intent resolution will be applied and
emitted before the transaction is removed from resolved timestamp tracking.

Epic: none
Release note (bug fix): fixed a bug where a changefeed could omit events in
rare cases, logging the error "cdc ux violation: detected timestamp ... that
is less or equal to the local frontier". This can happen if a rangefeed runs
on a follower replica that lags significantly behind the leaseholder, a
transaction commits and removes its transaction record before its intent
resolution is applied on the follower, the follower's closed timestamp has
advanced past the transaction commit timestamp, and the rangefeed attempts to
push the transaction to a new timestamp (at least 10 seconds after the
transaction began). This may cause the rangefeed to prematurely emit a
checkpoint before emitting writes at lower timestamps, which in turn may cause
the changefeed to drop these events entirely, never emitting them.
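For illustration, the core of the fix is the new TxnPusher.Barrier step,
sketched below in condensed form from the rangefeedTxnPusher implementation
added in pkg/kv/kvserver/replica_rangefeed.go further down in this patch. This
is a sketch only, not the actual patch code: the real implementation also
verifies the returned range descriptor, degrades gracefully on pre-24.1
binaries, wraps errors, and is gated behind the new
kv.rangefeed.push_txns.barrier.enabled cluster setting.

// Condensed sketch only -- see the full implementation in the diff below.
func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error {
	// Submit a Barrier request to the leaseholder. This waits for all past and
	// ongoing writes (including intent resolution) to complete and apply, and
	// returns the lease applied index (LAI) at which the barrier applied.
	lai, _, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
	if err != nil {
		return err
	}
	// Wait for the local replica (possibly a lagging follower) to reach that
	// LAI, ensuring any committed intent resolution has been applied and its
	// events enqueued before the txn is removed from resolved ts tracking.
	_, err = tp.r.WaitForLeaseAppliedIndex(ctx, lai)
	return err
}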
--- pkg/kv/kvclient/rangefeed/BUILD.bazel | 3 + .../rangefeed/rangefeed_external_test.go | 323 ++++++++++++++++++ pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + pkg/kv/kvserver/rangefeed/processor.go | 13 + pkg/kv/kvserver/rangefeed/processor_test.go | 20 +- .../kvserver/rangefeed/scheduled_processor.go | 2 +- pkg/kv/kvserver/rangefeed/task.go | 61 +++- pkg/kv/kvserver/rangefeed/task_test.go | 16 +- pkg/kv/kvserver/replica_rangefeed.go | 59 +++- 9 files changed, 471 insertions(+), 27 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 2be7adff656d..2dc5b2b4a132 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -74,6 +74,7 @@ go_test( "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts", + "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -85,12 +86,14 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/encoding", + "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 998d9f434320..7e8a6ad66542 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -14,6 +14,7 @@ import ( "context" "runtime/pprof" "sync" + "sync/atomic" "testing" "time" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -32,10 +34,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1229,3 +1233,322 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) { t.Fatal("timed out waiting for event") } } + +// TestRangeFeedIntentResolutionRace is a regression test for +// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the +// following scenario: +// +// - A rangefeed is running on a lagging follower. +// - A txn writes an intent, which is replicated to the follower. +// - The closed timestamp advances past the intent. +// - The txn commits and resolves the intent at the original write timestamp, +// then GCs its txn record. This is not yet applied on the follower. +// - The rangefeed pushes the txn to advance its resolved timestamp. +// - The txn is GCed, so the push returns ABORTED (it can't know whether the +// txn was committed or aborted after its record is GCed). 
+// - The rangefeed disregards the "aborted" txn and advances the resolved +// timestamp, emitting a checkpoint. +// - The follower applies the resolved intent and emits an event below +// the checkpoint, violating the checkpoint guarantee. +// +// This scenario is addressed by running a Barrier request through Raft and +// waiting for it to apply locally before removing the txn from resolved ts +// tracking. This ensures the pending intent resolution is applied before +// the resolved ts can advance. +func TestRangeFeedIntentResolutionRace(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // too slow, times out + skip.UnderDeadlock(t) + + // Use a timeout, to prevent a hung test. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // defer cancel() after Stopper.Stop(), so the context cancels first. + // Otherwise, the stopper will hang waiting for a rangefeed whose context is + // not yet cancelled. + + // Set up an apply filter that blocks Raft application on n3 (follower) for + // the given range. + var blockRangeOnN3 atomic.Int64 + unblockRangeC := make(chan struct{}) + applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) { + if args.StoreID == 3 { + if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) { + t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID) + select { + case <-unblockRangeC: + t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID) + case <-ctx.Done(): + return 0, kvpb.NewError(ctx.Err()) + } + } + } + return 0, nil + } + + // Set up a request filter that blocks transaction pushes for a specific key. + // This is used to prevent the rangefeed txn pusher from pushing the txn + // timestamp above the closed timestamp before it commits, only allowing the + // push to happen after the transaction has committed and GCed below the + // closed timestamp. + var blockPush atomic.Pointer[roachpb.Key] + unblockPushC := make(chan struct{}) + reqFilter := func(ctx context.Context, br *kvpb.BatchRequest) *kvpb.Error { + if br.IsSinglePushTxnRequest() { + req := br.Requests[0].GetPushTxn() + if key := blockPush.Load(); key != nil && req.Key.Equal(*key) { + t.Logf("blocked push for txn %s", req.PusheeTxn) + select { + case <-unblockPushC: + t.Logf("unblocked push for txn %s", req.PusheeTxn) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + } + return nil + } + + // Speed up the test by reducing various closed/resolved timestamp intervals. + const interval = 100 * time.Millisecond + st := cluster.MakeClusterSettings() + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval) + closedts.TargetDuration.Override(ctx, &st.SV, interval) + + // Start a cluster with 3 nodes. + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: reqFilter, + TestingApplyCalledTwiceFilter: applyFilter, + RangeFeedPushTxnsInterval: interval, + RangeFeedPushTxnsAge: interval, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + defer cancel() + + n1 := tc.Server(0) + s3 := tc.GetFirstStoreFromServer(t, 2) + clock := n1.ApplicationLayer().Clock() + + // Determine the key/value we're going to write. + prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...) 
+ key := append(prefix.Clone(), []byte("/foo")...) + value := []byte("value") + + // Split off a range and upreplicate it, with leaseholder on n1. + _, _, err := n1.StorageLayer().SplitRange(prefix) + require.NoError(t, err) + desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...) + t.Logf("split off range %s", desc) + + repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder + repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower + + require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp())) + + // Block pushes of the txn, to ensure it can write at a fixed timestamp. + // Otherwise, the rangefeed or someone else may push it beyond the closed + // timestamp. + blockPush.Store(&key) + + // We'll use n3 as our lagging follower. Start a rangefeed on it directly. + req := kvpb.RangeFeedRequest{ + Header: kvpb.Header{ + RangeID: desc.RangeID, + }, + Span: desc.RSpan().AsRawSpanWithNoLocals(), + } + eventC := make(chan *kvpb.RangeFeedEvent) + sink := newChannelSink(ctx, eventC) + fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink)) + require.NoError(t, fErr.Get()) // check if we've errored yet + t.Logf("started rangefeed on %s", repl3) + + // Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC. + // This uses a buffered channel of size 1, and empties it out before posting a + // new update, such that it contains the latest known checkpoint and does not + // block the rangefeed. It also posts emitted values for our key to valueC, + // which should only happen once. + checkpointC := make(chan *kvpb.RangeFeedCheckpoint, 1) + valueC := make(chan *kvpb.RangeFeedValue, 1) + go func() { + defer close(checkpointC) + defer close(valueC) + for { + select { + case e := <-eventC: + switch { + case e.Checkpoint != nil: + // Clear checkpointC such that it always contains the latest. + select { + case <-checkpointC: + default: + } + checkpointC <- e.Checkpoint + case e.Val != nil && e.Val.Key.Equal(key): + select { + case valueC <- e.Val: + default: + t.Errorf("duplicate value event for %s: %s", key, e) + } + } + case <-ctx.Done(): + return + } + } + }() + + waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + select { + case c := <-checkpointC: + require.NotNil(t, c, "nil checkpoint") + if ts.LessEq(c.ResolvedTS) { + t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts) + return c.ResolvedTS + } + case <-timeoutC: + require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts) + } + } + } + + // Wait for the initial checkpoint. + rts := waitForCheckpoint(t, clock.Now()) + t.Logf("initial checkpoint at %s", rts) + + // Start a new transaction on n1 with a fixed timestamp (to make sure it + // remains below the closed timestamp). Write an intent, and read it back to + // make sure it has applied. + writeTS := clock.Now() + txn := n1.ApplicationLayer().DB().NewTxn(ctx, "test") + require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS)) + require.NoError(t, txn.Put(ctx, key, value)) + _, err = txn.Get(ctx, key) + require.NoError(t, err) + t.Logf("wrote %s", key) + + // Wait for both the leaseholder and the follower to close the transaction's + // write timestamp. 
+ waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) { + t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts) + return closedTS + } + select { + case <-time.After(10 * time.Millisecond): + case <-timeoutC: + require.Fail(t, "timeout out waiting for closed timestamp", "wanted %s", ts) + } + } + } + cts := waitForClosedTimestamp(t, repl1, writeTS) + _ = waitForClosedTimestamp(t, repl3, writeTS) + t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS) + + // Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the + // rangefeed's view of the closed timestamp has been updated, and is now only + // blocked by the intent. + waitTS := writeTS.Prev() + waitTS.Logical = 0 + rts = waitForCheckpoint(t, waitTS) + t.Logf("rangefeed caught up to txn write timestamp at %s", rts) + + // Block Raft application on repl3. + blockRangeOnN3.Store(int64(desc.RangeID)) + + // Commit the transaction, and check its commit timestamp. + require.NoError(t, txn.Commit(ctx)) + commitTS, err := txn.CommitTimestamp() + require.NoError(t, err) + require.Equal(t, commitTS, writeTS) + t.Logf("txn committed at %s", writeTS) + + // Unblock transaction pushes. Since repl3 won't apply the intent resolution + // yet, the rangefeed will keep trying to push the transaction. Once the + // transaction record is GCed (which happens async), the rangefeed will see an + // ABORTED status. + // + // It may see the intermediate COMMITTED state too, but at the time of writing + // that does not matter, since the rangefeed needs to keep tracking the + // intent until it applies the resolution, and it will also see the ABORTED + // status before that happens. + blockPush.Store(nil) + close(unblockPushC) + + // Make sure repl3 does not emit a checkpoint beyond the write timestamp. Its + // closed timestamp has already advanced past it, but the unresolved intent + // should prevent the resolved timestamp from advancing, despite the false + // ABORTED state. We also make sure no value has been emitted. + waitC := time.After(3 * time.Second) + for done := false; !done; { + select { + case c := <-checkpointC: + require.NotNil(t, c) + require.False(t, commitTS.LessEq(c.ResolvedTS), + "repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS) + case v := <-valueC: + require.Fail(t, "repl3 emitted premature value %s", v) + case <-waitC: + done = true + } + } + t.Logf("checkpoint still below write timestamp") + + // Unblock application on repl3. Wait for the checkpoint to catch up to the + // commit timestamp, and the committed value to be emitted. + blockRangeOnN3.Store(0) + close(unblockRangeC) + + rts = waitForCheckpoint(t, writeTS) + t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS) + + select { + case v := <-valueC: + require.Equal(t, v.Key, key) + require.Equal(t, v.Value.Timestamp, writeTS) + t.Logf("received value %s", v) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for event") + } + + // The rangefeed should still be running. + require.NoError(t, fErr.Get()) +} + +// channelSink is a rangefeed sink which posts events to a channel. 
+type channelSink struct { + ctx context.Context + ch chan<- *kvpb.RangeFeedEvent +} + +func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink { + return &channelSink{ctx: ctx, ch: ch} +} + +func (c *channelSink) Context() context.Context { + return c.ctx +} + +func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { + select { + case c.ch <- e: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index de28901454df..55e22eeb168c 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -17,6 +17,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/keys", "//pkg/kv/kvpb", "//pkg/kv/kvserver/concurrency/isolation", diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 150dda770002..58f95cd1b97e 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -47,6 +47,19 @@ var ( "periodically push txn write timestamps to advance rangefeed resolved timestamps", true, ) + + // PushTxnsBarrierEnabled is an escape hatch to disable the txn push barrier + // command in case it causes unexpected problems. This can result in + // violations of the rangefeed checkpoint guarantee, emitting premature + // checkpoints before all writes below it have been emitted in rare cases. + // See: https://github.com/cockroachdb/cockroach/issues/104309 + PushTxnsBarrierEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.rangefeed.push_txns.barrier.enabled", + "flush and apply prior writes when a txn push returns an ambiguous abort "+ + "(disabling may emit premature checkpoints before writes in rare cases)", + true, + ) ) // newErrBufferCapacityExceeded creates an error that is returned to subscribers diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index e23d82f57d70..5b5ee2f0b133 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1000,7 +1000,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -1014,34 +1014,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push does not succeed. Protos not at larger ts. - return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, false, nil case 2: assert.Equal(t, 3, len(txns)) assert.Equal(t, txn1MetaT2Pre, txns[0]) assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. 
- return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil + return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, false, nil case 3: assert.Equal(t, 2, len(txns)) assert.Equal(t, txn2MetaT2Post, txns[0]) assert.Equal(t, txn3MetaT2Post, txns[1]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil + return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, false, nil default: - return nil, nil + return nil, false, nil } }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -1665,11 +1665,11 @@ func TestProcessorContextCancellation(t *testing.T) { var pusher testTxnPusher pusher.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { pushReadyC <- struct{}{} <-ctx.Done() close(pushDoneC) - return nil, ctx.Err() + return nil, false, ctx.Err() }) pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { return nil diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index cc6d6f81408e..f00e670a0616 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -206,7 +206,7 @@ func (p *ScheduledProcessor) processPushTxn(ctx context.Context) { // Launch an async transaction push attempt that pushes the // timestamp of all transactions beneath the push offset. // Ignore error if quiescing. - pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, func() { + pushTxns := newTxnPushAttempt(p.Settings, p.Span, p.TxnPusher, p, toPush, now, func() { p.enqueueRequest(func(ctx context.Context) { p.txnPushActive = false }) diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index d5605d477664..f7c8a6280af3 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -18,11 +18,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -187,10 +189,17 @@ func (s *SeparatedIntentScanner) Close() { s.iter.Close() } // cleaning up the intents of transactions that are found to be committed. type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new - // timestamp. It returns the resulting transaction protos. - PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + // timestamp. It returns the resulting transaction protos, and a + // bool indicating whether any txn aborts were ambiguous (see + // PushTxnResponse.AmbiguousAbort). + // + // NB: anyAmbiguousAbort may be false with nodes <24.1. + PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) // ResolveIntents resolves the specified intents. 
ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error + // Barrier waits for all past and ongoing write commands in the range to have + // applied on the leaseholder and the local replica. + Barrier(ctx context.Context) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -211,6 +220,7 @@ type TxnPusher interface { // - ABORTED: inform the Processor to stop caring about the transaction. // It will never commit and its intents can be safely ignored. type txnPushAttempt struct { + st *cluster.Settings span roachpb.RSpan pusher TxnPusher p processorTaskHelper @@ -220,6 +230,7 @@ type txnPushAttempt struct { } func newTxnPushAttempt( + st *cluster.Settings, span roachpb.RSpan, pusher TxnPusher, p processorTaskHelper, @@ -228,6 +239,7 @@ func newTxnPushAttempt( done func(), ) runnable { return &txnPushAttempt{ + st: st, span: span, pusher: pusher, p: p, @@ -251,7 +263,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { // This may cause transaction restarts, but span refreshing should // prevent a restart for any transaction that has not been written // over at a larger timestamp. - pushedTxns, err := a.pusher.PushTxns(ctx, a.txns, a.ts) + pushedTxns, anyAmbiguousAbort, err := a.pusher.PushTxns(ctx, a.txns, a.ts) if err != nil { return err } @@ -324,6 +336,49 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { } } + // It's possible that the ABORTED state is a false negative, where the + // transaction was in fact committed but the txn record has been removed after + // resolving all intents (see batcheval.SynthesizeTxnFromMeta and + // Replica.CanCreateTxnRecord). If this replica has not applied the intent + // resolution yet, we may prematurely emit an MVCCAbortTxnOp and advance + // the resolved ts before emitting the committed intents. This violates the + // rangefeed checkpoint guarantee, and will at the time of writing cause the + // changefeed to drop these events entirely. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + // + // PushTxns will let us know if it found such an ambiguous abort. To guarantee + // that we've applied all resolved intents in this case, submit a Barrier + // command to the leaseholder and wait for it to apply on the local replica. + // + // By the time the local replica applies the barrier it will have enqueued the + // resolved intents in the rangefeed processor's queue. These updates may not + // yet have been applied to the resolved timestamp intent tracker, but that's + // ok -- our MVCCAbortTxnOp will be enqueued and processed after them. + // + // This incurs an additional Raft write, but so would PushTxns() if we hadn't + // hit the ambiguous abort case. This will also block until ongoing writes + // have completed and applied, but that's fine since we currently run on our + // own goroutine (as opposed to on a rangefeed scheduler goroutine). + // + // NB: We can't try to reduce the span of the barrier, because LockSpans may + // not have the full set of intents. + // + // NB: PushTxnResponse.AmbiguousAbort and BarrierResponse.LeaseAppliedIndex + // are not guaranteed to be populated prior to 24.1. In that case, we degrade + // to the old (buggy) behavior. + if anyAmbiguousAbort && PushTxnsBarrierEnabled.Get(&a.st.SV) { + // The barrier will error out if our context is cancelled (which happens on + // processor shutdown) or if the replica is destroyed. Regardless, use a 1 + // minute backstop to prevent getting wedged. 
+ // + // TODO(erikgrinaker): consider removing this once we have some confidence + // that it won't get wedged. + err := timeutil.RunWithTimeout(ctx, "pushtxns barrier", time.Minute, a.pusher.Barrier) + if err != nil { + return err + } + } + // Inform the processor of all logical ops. a.p.sendEvent(ctx, event{ops: ops}, 0) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 0e2380db8a36..3e27f0163a91 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -360,13 +360,13 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { return tp.pushTxnsFn(ctx, txns, ts) } @@ -374,8 +374,12 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L return tp.resolveIntentsFn(ctx, intents) } +func (tp *testTxnPusher) Barrier(ctx context.Context) error { + return nil +} + func (tp *testTxnPusher) mockPushTxns( - fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error), ) { tp.pushTxnsFn = fn } @@ -434,7 +438,7 @@ func TestTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) @@ -445,7 +449,7 @@ func TestTxnPushAttempt(t *testing.T) { // Return all four protos. The PENDING txn is pushed. txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, false, nil }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { require.Len(t, intents, 7) @@ -490,7 +494,7 @@ func TestTxnPushAttempt(t *testing.T) { txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta} doneC := make(chan struct{}) - pushAttempt := newTxnPushAttempt(p.Span, p.TxnPusher, &p, txns, hlc.Timestamp{WallTime: 15}, + pushAttempt := newTxnPushAttempt(p.Settings, p.Span, p.TxnPusher, &p, txns, hlc.Timestamp{WallTime: 15}, func() { close(doneC) }) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index c21bdd815085..ccb843278acd 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -124,8 +125,9 @@ func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error { // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. 
type rangefeedTxnPusher struct { - ir *intentresolver.IntentResolver - r *Replica + ir *intentresolver.IntentResolver + r *Replica + span roachpb.RSpan } // PushTxns is part of the rangefeed.TxnPusher interface. It performs a @@ -133,7 +135,7 @@ type rangefeedTxnPusher struct { // transactions. func (tp *rangefeedTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { pushTxnMap := make(map[uuid.UUID]*enginepb.TxnMeta, len(txns)) for i := range txns { txn := &txns[i] @@ -149,18 +151,18 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, anyAmbiguousAbort, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, kvpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil { - return nil, pErr.GoError() + return nil, false, pErr.GoError() } pushedTxns := make([]*roachpb.Transaction, 0, len(pushedTxnMap)) for _, txn := range pushedTxnMap { pushedTxns = append(pushedTxns, txn) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // ResolveIntents is part of the rangefeed.TxnPusher interface. @@ -182,6 +184,49 @@ func (tp *rangefeedTxnPusher) ResolveIntents( ).GoError() } +// Barrier is part of the rangefeed.TxnPusher interface. +func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { + // Check for v24.1 before issuing the request, in case the server binary is + // somehow upgraded from 23.2 to 24.1 while the response is in flight. This + // seems very unlikely, but it doesn't really cost us anything. + isV24_1 := tp.r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1Start) + + // Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any + // range changes (e.g. splits/merges) that we haven't applied yet. + lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey) + if err != nil { + if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) { + return errors.Wrap(err, "range barrier failed, range split") + } + return errors.Wrap(err, "range barrier failed") + } + if lai == 0 { + if !isV24_1 { + // We may be talking to a <24.1 binary which doesn't support + // BarrierRequest.WithLeaseAppliedIndex. We don't expect to see this, + // because those nodes won't support PushTxnResponse.AmbiguousAbort either, + // but if we do just return success and degrade to the old behavior. + return nil + } + return errors.AssertionFailedf("barrier response without LeaseAppliedIndex") + } + if desc.RangeID != tp.r.RangeID { + return errors.Errorf("range barrier failed, range ID changed: %d -> %s", tp.r.RangeID, desc) + } + if !desc.RSpan().Equal(tp.span) { + return errors.Errorf("range barrier failed, range span changed: %s -> %s", tp.span, desc) + } + + // Wait for the local replica to apply it. In the common case where we are the + // leaseholder, the Barrier call will already have waited for application, so + // this succeeds immediately. + if _, err = tp.r.WaitForLeaseAppliedIndex(ctx, lai); err != nil { + return errors.Wrap(err, "range barrier failed") + } + + return nil +} + // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns with a future error when the rangefeed is // complete. 
The surrounding store's ConcurrentRequestLimiter is used to limit @@ -422,7 +467,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(isSystemSpan) desc := r.Desc() - tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r} + tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r, span: desc.RSpan()} cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, Clock: r.Clock(), From 6d92cefe2991ac5585ed4e9def6f0b725c4fdbdb Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 23 Jan 2024 19:55:56 +0000 Subject: [PATCH 2/2] rangefeed: assert intent commits above resolved timestamp Epic: none Release note: None --- .../kvserver/rangefeed/resolved_timestamp.go | 54 ++++++-- .../rangefeed/resolved_timestamp_test.go | 122 ++++++++++-------- .../kvserver/rangefeed/scheduled_processor.go | 4 +- 3 files changed, 110 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 6c5e09643e4d..eab7118558c6 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -12,17 +12,26 @@ package rangefeed import ( "bytes" + "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/container/heap" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) +// TODO(erikgrinaker): remove this once we're confident it won't fire. +var DisableCommitIntentTimestampAssertion = envutil.EnvOrDefaultBool( + "COCKROACH_RANGEFEED_DISABLE_COMMIT_INTENT_TIMESTAMP_ASSERTION", false) + // A rangefeed's "resolved timestamp" is defined as the timestamp at which no // future updates will be emitted to the feed at or before. The timestamp is // monotonically increasing and is communicated through RangeFeedCheckpoint @@ -81,11 +90,13 @@ type resolvedTimestamp struct { closedTS hlc.Timestamp resolvedTS hlc.Timestamp intentQ unresolvedIntentQueue + settings *cluster.Settings } -func makeResolvedTimestamp() resolvedTimestamp { +func makeResolvedTimestamp(st *cluster.Settings) resolvedTimestamp { return resolvedTimestamp{ - intentQ: makeUnresolvedIntentQueue(), + intentQ: makeUnresolvedIntentQueue(), + settings: st, } } @@ -129,32 +140,44 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { // operation within its range of tracked keys. This allows the structure to // update its internal intent tracking to reflect the change. The method returns // whether this caused the resolved timestamp to move forward. 
-func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool { - if rts.consumeLogicalOp(op) { +func (rts *resolvedTimestamp) ConsumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { + if rts.consumeLogicalOp(ctx, op) { return rts.recompute() } rts.assertNoChange() return false } -func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { +func (rts *resolvedTimestamp) consumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCDeleteRangeOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCWriteIntentOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnIsoLevel, t.TxnMinTimestamp, t.Timestamp) case *enginepb.MVCCUpdateIntentOp: return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp) case *enginepb.MVCCCommitIntentOp: + // This assertion can be violated in mixed-version clusters, so make it + // fatal only in 24.1, gated by an envvar just in case. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + // + // TODO(erikgrinaker): make this unconditionally fatal. + fatal := rts.settings.Version.IsActive(ctx, clusterversion.V24_1Start) && + !DisableCommitIntentTimestampAssertion + rts.assertOpAboveRTS(ctx, op, t.Timestamp, fatal) return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: @@ -265,10 +288,19 @@ func (rts *resolvedTimestamp) assertNoChange() { // assertOpAboveTimestamp asserts that this operation is at a larger timestamp // than the current resolved timestamp. A violation of this assertion would // indicate a failure of the closed timestamp mechanism. -func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) { +func (rts *resolvedTimestamp) assertOpAboveRTS( + ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, +) { if opTS.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v", - rts.resolvedTS, op)) + err := errors.AssertionFailedf( + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, op) + if fatal { + // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect + // it and to minimize code churn for backports. 
+ panic(err) + } else { + log.Errorf(ctx, "%v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index 4c890bd75780..b13f54c17a64 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -11,10 +11,12 @@ package rangefeed import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -179,7 +181,8 @@ func TestUnresolvedIntentQueue(t *testing.T) { func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() - rts := makeResolvedTimestamp() + ctx := context.Background() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) rts.Init() // Test empty resolved timestamp. @@ -187,13 +190,13 @@ func TestResolvedTimestamp(t *testing.T) { // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -204,16 +207,16 @@ func TestResolvedTimestamp(t *testing.T) { // Write intent at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) + rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) }) // Write value at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 4})) + rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) }) // Write value at later timestamp. No effect on resolved timestamp. - fwd = rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 6})) + fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -224,12 +227,12 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn2. No effect on the resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn1. Resolved timestamp moves forward. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -239,13 +242,13 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at same timestamp. No change. 
- fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at later timestamp. Resolved // timestamp moves forward. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) @@ -256,47 +259,47 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. txn3 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. txn4 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction committed. No effect. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Second transaction observes one intent being resolved at timestamp // above closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) @@ -307,22 +310,22 @@ func TestResolvedTimestamp(t *testing.T) { // Second transaction observes another intent being resolved at timestamp // below closed time. Still one intent left. 
- fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) // Second transaction observes final intent being resolved at timestamp // below closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Fifth transaction at higher timestamp. No effect. txn5 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) @@ -333,7 +336,7 @@ func TestResolvedTimestamp(t *testing.T) { // Fifth transaction bumps epoch and re-writes one of its intents. Resolved // timestamp moves to the new transaction timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) @@ -341,55 +344,56 @@ func TestResolvedTimestamp(t *testing.T) { // its final epoch. Resolved timestamp moves forward after observing the // first intent committing at a higher timestamp and moves to the closed // timestamp after observing the second intent aborting. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get()) } func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() - rts := makeResolvedTimestamp() + ctx := context.Background() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) rts.Init() // Add a value. No closed timestamp so no resolved timestamp. - fwd := rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 1})) + fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Update intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. 
txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort the first intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Commit the second intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) } func TestResolvedTimestampNoIntents(t *testing.T) { defer leaktest.AfterTest(t)() - rts := makeResolvedTimestamp() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) rts.Init() // Set a closed timestamp. Resolved timestamp advances. @@ -421,8 +425,10 @@ func TestResolvedTimestampNoIntents(t *testing.T) { func TestResolvedTimestampInit(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + t.Run("CT Before Init", func(t *testing.T) { - rts := makeResolvedTimestamp() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) // Set a closed timestamp. Not initialized so no resolved timestamp. fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) @@ -435,11 +441,11 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) }) t.Run("No CT Before Init", func(t *testing.T) { - rts := makeResolvedTimestamp() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -449,11 +455,11 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{}, rts.Get()) }) t.Run("Write Before Init", func(t *testing.T) { - rts := makeResolvedTimestamp() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -468,16 +474,16 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 2}, rts.Get()) }) t.Run("Abort + Write Before Init", func(t *testing.T) { - rts := makeResolvedTimestamp() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort that intent's transaction. Not initialized so no-op. 
- fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -485,7 +491,7 @@ func TestResolvedTimestampInit(t *testing.T) { // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would // have created a reference that would never be cleaned up. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -500,11 +506,11 @@ func TestResolvedTimestampInit(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) }) t.Run("Abort Before Init, No Write", func(t *testing.T) { - rts := makeResolvedTimestamp() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -516,7 +522,8 @@ func TestResolvedTimestampInit(t *testing.T) { func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() - rts := makeResolvedTimestamp() + ctx := context.Background() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) rts.Init() // Set a closed timestamp. Resolved timestamp advances. @@ -526,7 +533,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -536,23 +543,23 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Abort txn1 after a periodic txn push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Write another intent as txn1. Should add txn1 back into queue. // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -563,7 +570,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) // Abort txn1 again after another periodic push. Resolved timestamp advances. 
- fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } @@ -572,7 +579,8 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rts := makeResolvedTimestamp() + ctx := context.Background() + rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings()) rts.Init() // Set a new closed timestamp. Resolved timestamp advances. @@ -582,7 +590,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -594,7 +602,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) @@ -603,7 +611,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // and an intent is in the next wall tick; this used to cause an issue because // of the rounding logic. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) } diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index f00e670a0616..3754f410e01f 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -82,7 +82,7 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor { Config: cfg, scheduler: cfg.Scheduler.NewClientScheduler(), reg: makeRegistry(cfg.Metrics), - rts: makeResolvedTimestamp(), + rts: makeResolvedTimestamp(cfg.Settings), processCtx: cfg.AmbientContext.AnnotateCtx(context.Background()), requestQueue: make(chan request, 20), @@ -698,7 +698,7 @@ func (p *ScheduledProcessor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } }