diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index f4a67d457a42..12b68055b2dc 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -87,6 +87,7 @@ go_test( "//pkg/jobs", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvserver", "//pkg/kv/kvserver/stateloader", @@ -96,6 +97,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigsqlwatcher", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", @@ -118,6 +120,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/migration/migrations/migrate_span_configs_test.go b/pkg/migration/migrations/migrate_span_configs_test.go index 1b0658f5b684..1581bdd1b555 100644 --- a/pkg/migration/migrations/migrate_span_configs_test.go +++ b/pkg/migration/migrations/migrate_span_configs_test.go @@ -13,21 +13,127 @@ package migrations_test import ( "context" "fmt" + "sync" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) +// TestMixedVersionClusterEnableRangefeeds tests that clusters that haven't +// migrated into the span configs still support rangefeeds over system table +// ranges. +func TestMixedVersionClusterEnableRangefeeds(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.EnsureSpanConfigReconciliation - 1, + ), + }, + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, + }, + }, + }, + }) + + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + // We spin up a SQL watcher, which makes use of range feeds internally over + // system tables. By observing SQL descriptor updates through the watcher, we + // know that the rangefeeds are enabled. + noopCheckpointDuration := 100 * time.Millisecond + sqlWatcher := spanconfigsqlwatcher.New( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + noopCheckpointDuration, + nil, /* knobs */ + ) + + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */)) + beforeStmtTS := ts.Clock().Now() + tdb.Exec(t, "CREATE TABLE t()") + afterStmtTS := ts.Clock().Now() + var expDescID descpb.ID + row := tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name='t'`) + row.Scan(&expDescID) + + var wg sync.WaitGroup + mu := struct { + syncutil.Mutex + lastCheckpoint hlc.Timestamp + }{} + + watch := func(ctx context.Context, onCheckpoint func(hlc.Timestamp)) { + defer wg.Done() + + receivedIDs := make(map[descpb.ID]struct{}) + err := sqlWatcher.WatchForSQLUpdates(ctx, beforeStmtTS, + func(_ context.Context, updates []spanconfig.SQLUpdate, checkpointTS hlc.Timestamp) error { + onCheckpoint(checkpointTS) + + for _, update := range updates { + receivedIDs[update.GetDescriptorUpdate().ID] = struct{}{} + } + return nil + }) + require.True(t, testutils.IsError(err, "context canceled")) + require.Equal(t, 1, len(receivedIDs)) + _, seen := receivedIDs[expDescID] + require.True(t, seen) + } + + watcherCtx, watcherCancel := context.WithCancel(ctx) + wg.Add(1) + go watch(watcherCtx, func(ts hlc.Timestamp) { + mu.Lock() + mu.lastCheckpoint = ts + mu.Unlock() + }) + + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + + if mu.lastCheckpoint.Less(afterStmtTS) { + return errors.New("w1 checkpoint precedes statement timestamp") + } + return nil + }) + + watcherCancel() + wg.Wait() +} + // TestEnsureSpanConfigReconciliation verifies that the migration waits for a // span config reconciliation attempt, blocking until it occurs. func TestEnsureSpanConfigReconciliation(t *testing.T) {