diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 6564dff16970..d0fa7525b766 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/settings", + "//pkg/settings/cluster", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/util/admission", diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 8c7f003f1905..d9e62a1ddfb0 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -18,6 +18,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/future" @@ -38,6 +40,15 @@ var ( // consider a transaction old enough to push. defaultPushTxnsAge = envutil.EnvOrDefaultDuration( "COCKROACH_RANGEFEED_PUSH_TXNS_AGE", 10*time.Second) + + // PushTxnsEnabled can be used to disable rangefeed txn pushes, typically to + // temporarily alleviate contention. + PushTxnsEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.rangefeed.push_txns.enabled", + "periodically push txn write timestamps to advance rangefeed resolved timestamps", + true, + ) ) // newErrBufferCapacityExceeded creates an error that is returned to subscribers @@ -52,10 +63,11 @@ func newErrBufferCapacityExceeded() *kvpb.Error { // Config encompasses the configuration required to create a Processor. type Config struct { log.AmbientContext - Clock *hlc.Clock - Stopper *stop.Stopper - RangeID roachpb.RangeID - Span roachpb.RSpan + Clock *hlc.Clock + Stopper *stop.Stopper + Settings *cluster.Settings + RangeID roachpb.RangeID + Span roachpb.RSpan TxnPusher TxnPusher // PushTxnsInterval specifies the interval at which a Processor will push @@ -463,9 +475,9 @@ func (p *LegacyProcessor) run( // Check whether any unresolved intents need a push. case <-txnPushTickerC: - // Don't perform transaction push attempts until the resolved timestamp - // has been initialized, or if we're not tracking any intents. - if !p.rts.IsInit() || p.rts.intentQ.Len() == 0 { + // Don't perform transaction push attempts if disabled, until the resolved + // timestamp has been initialized, or if we're not tracking any intents. + if !PushTxnsEnabled.Get(&p.Settings.SV) || !p.rts.IsInit() || p.rts.intentQ.Len() == 0 { continue } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 183e56edd939..9a2e9459623b 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -72,6 +72,11 @@ func writeIntentOpWithDetails( }) } +func writeIntentOpFromMeta(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp { + return writeIntentOpWithDetails( + txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp, txn.WriteTimestamp) +} + func writeIntentOpWithKey( txnID uuid.UUID, key []byte, iso isolation.Level, ts hlc.Timestamp, ) enginepb.MVCCLogicalOp { @@ -291,6 +296,19 @@ func withSpan(span roachpb.RSpan) option { } } +func withSettings(st *cluster.Settings) option { + return func(config *testConfig) { + config.Settings = st + } +} + +func withPushTxnsIntervalAge(interval, age time.Duration) option { + return func(config *testConfig) { + config.PushTxnsInterval = interval + config.PushTxnsAge = age + } +} + // blockingScanner is a test intent scanner that allows test to track lifecycle // of tasks. // 1. it will always block on startup and will wait for block to be closed to @@ -344,11 +362,13 @@ func newTestProcessor( ) (Processor, *processorTestHelper, *stop.Stopper) { t.Helper() stopper := stop.NewStopper() + st := cluster.MakeTestingClusterSettings() cfg := testConfig{ Config: Config{ RangeID: 2, Stopper: stopper, + Settings: st, AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(), Clock: hlc.NewClockForTesting(nil), Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, @@ -1096,10 +1116,6 @@ func TestProcessorTxnPushAttempt(t *testing.T) { defer stopper.Stop(ctx) // Add a few intents and move the closed timestamp forward. - writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp { - return writeIntentOpWithDetails(txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp, - txn.WriteTimestamp) - } p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txn1Meta), writeIntentOpFromMeta(txn2Meta), @@ -1171,6 +1187,64 @@ func TestProcessorTxnPushAttempt(t *testing.T) { }) } +// TestProcessorTxnPushDisabled tests that processors don't attempt txn pushes +// when disabled. +func TestProcessorTxnPushDisabled(t *testing.T) { + defer leaktest.AfterTest(t)() + + const pushInterval = 10 * time.Millisecond + + // Set up a txn to write intents. + ts := hlc.Timestamp{WallTime: 10} + txnID := uuid.MakeV4() + txnMeta := enginepb.TxnMeta{ + ID: txnID, + Key: keyA, + IsoLevel: isolation.Serializable, + WriteTimestamp: ts, + MinTimestamp: ts, + } + + // Disable txn pushes. + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + PushTxnsEnabled.Override(ctx, &st.SV, false) + + // Set up a txn pusher and processor that errors on any pushes. + // + // TODO(kv): We don't test the scheduled processor here, since the setting + // instead controls the Store.startRangefeedTxnPushNotifier() loop which sits + // outside of the processor and can't be tested with this test harness. Write + // a new test when the legacy processor is removed and the scheduled processor + // is used by default. + var tp testTxnPusher + tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts) + t.Errorf("%v", err) + return nil, err + }) + + p, h, stopper := newTestProcessor(t, withSettings(st), withPusher(&tp), + withPushTxnsIntervalAge(pushInterval, time.Millisecond)) + defer stopper.Stop(ctx) + + // Move the resolved ts forward to just before the txn timestamp. + rts := ts.Add(-1, 0) + require.True(t, p.ForwardClosedTS(ctx, rts)) + h.syncEventC() + require.Equal(t, rts, h.rts.Get()) + + // Add a few intents and move the closed timestamp forward. + p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta)) + p.ForwardClosedTS(ctx, ts) + h.syncEventC() + require.Equal(t, rts, h.rts.Get()) + + // Wait for 10x the push txns interval, to make sure pushes are disabled. + // Waiting for something to not happen is a bit smelly, but gets the job done. + time.Sleep(10 * pushInterval) +} + // TestProcessorConcurrentStop tests that all methods in Processor's API // correctly handle the processor concurrently shutting down. If they did // not then it would be possible for them to deadlock. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 6dab2f5d5823..56d5615a3f6d 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -444,6 +444,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( AmbientContext: r.AmbientContext, Clock: r.Clock(), Stopper: r.store.stopper, + Settings: r.store.ClusterSettings(), RangeID: r.RangeID, Span: desc.RSpan(), TxnPusher: &tp, diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 226be5970edf..f6f204cff3bb 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2450,6 +2450,9 @@ func (s *Store) startRangefeedTxnPushNotifier(ctx context.Context) { for { select { case <-ticker.C: + if !rangefeed.PushTxnsEnabled.Get(&s.ClusterSettings().SV) { + continue + } batch := makeSchedulerBatch() s.rangefeedScheduler.EnqueueBatch(batch, rangefeed.PushTxnQueued) batch.Close()