diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index 875b84cea6a9..c291a39d42ab 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -47,6 +47,7 @@ func TestEvaluator(t *testing.T) { defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true") sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) sqlDB.Exec(t, ` CREATE TABLE foo ( diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index 2b5d00da3e73..7c64058218d7 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -140,6 +140,7 @@ func TestEventDecoder(t *testing.T) { defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true") sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) sqlDB.Exec(t, ` CREATE TABLE foo ( @@ -419,6 +420,7 @@ func TestEventColumnOrderingWithSchemaChanges(t *testing.T) { defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true") // Use alter column type to force column reordering. sqlDB.Exec(t, `SET enable_experimental_alter_column_type_general = true`) @@ -761,6 +763,7 @@ func BenchmarkEventDecoder(b *testing.B) { defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(b, "SET CLUSTER SETTING kv.rangefeed.enabled = true") sqlDB.Exec(b, ` CREATE TABLE foo ( a INT, diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index 9fc078027ba0..195dab063c15 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/keys", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvpb", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/sql", "//pkg/sql/catalog", diff --git a/pkg/ccl/changefeedccl/cdctest/row.go b/pkg/ccl/changefeedccl/cdctest/row.go index 7bbf19c8ee0f..cde58e42aba7 100644 --- a/pkg/ccl/changefeedccl/cdctest/row.go +++ b/pkg/ccl/changefeedccl/cdctest/row.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -37,6 +38,13 @@ func MakeRangeFeedValueReader( ) (func(t testing.TB) *kvpb.RangeFeedValue, func()) { t.Helper() execCfg := execCfgI.(sql.ExecutorConfig) + + // Rangefeeds might still work even when this setting is false because span + // configs may enable them, but relying on span configs can be prone to + // issues as seen in #109507. Therefore, we assert that the cluster setting + // is set. + require.True(t, kvserver.RangefeedEnabled.Get(&execCfg.Settings.SV)) + rows := make(chan *kvpb.RangeFeedValue) ctx, cleanup := context.WithCancel(context.Background()) diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index c545fed3b3fd..822bb7e88f02 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -58,6 +58,7 @@ func TestParquetRows(t *testing.T) { maxRowGroupSize := int64(2) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true") for _, tc := range []struct { testName string diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index d4e8b5a204fc..30b468a56f02 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1844,8 +1844,8 @@ func (r *Replica) checkExecutionCanProceedForRangeFeed( } else if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil { return err } else if !r.isRangefeedEnabledRLocked() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { - return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", - docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) + return errors.Errorf("[r%d] rangefeeds require the kv.rangefeed.enabled setting. See %s", + r.RangeID, docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } else if err := r.checkTSAboveGCThresholdRLocked(ts, status, false /* isAdmin */); err != nil { return err }