From d8937112833242c17b648e71c5efadcfda3c5d3f Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 11 Feb 2022 21:04:09 -0500 Subject: [PATCH 1/2] spanconfig: squash benign data race Fixes #75849. Release note: None --- .../migrations/migrate_span_configs_test.go | 4 ++-- pkg/spanconfig/spanconfigjob/BUILD.bazel | 1 + pkg/spanconfig/spanconfigjob/job.go | 24 ++++++++++++++----- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/migration/migrations/migrate_span_configs_test.go b/pkg/migration/migrations/migrate_span_configs_test.go index 956052920fa7..1b0658f5b684 100644 --- a/pkg/migration/migrations/migrate_span_configs_test.go +++ b/pkg/migration/migrations/migrate_span_configs_test.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -33,7 +32,6 @@ import ( // span config reconciliation attempt, blocking until it occurs. func TestEnsureSpanConfigReconciliation(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 75849, "flaky test") defer log.Scope(t).Close(t) ctx := context.Background() @@ -99,6 +97,8 @@ func TestEnsureSpanConfigReconciliation(t *testing.T) { } } +// TestEnsureSpanConfigReconciliationMultiNode verifies that the span config +// reconciliation migration works in a multi-node setting. 
func TestEnsureSpanConfigReconciliationMultiNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/spanconfig/spanconfigjob/BUILD.bazel b/pkg/spanconfig/spanconfigjob/BUILD.bazel index b9a314698752..995675888a53 100644 --- a/pkg/spanconfig/spanconfigjob/BUILD.bazel +++ b/pkg/spanconfig/spanconfigjob/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/retry", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/spanconfig/spanconfigjob/job.go b/pkg/spanconfig/spanconfigjob/job.go index a8d277523de1..5afc9025a0af 100644 --- a/pkg/spanconfig/spanconfigjob/job.go +++ b/pkg/spanconfig/spanconfigjob/job.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -91,18 +92,25 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro // timestamp. 
settingValues := &execCtx.ExecCfg().Settings.SV - persistCheckpoints := util.Every(reconciliationJobCheckpointInterval.Get(settingValues)) + persistCheckpointsMu := struct { + syncutil.Mutex + util.EveryN + }{} + persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues)) + reconciliationJobCheckpointInterval.SetOnChange(settingValues, func(ctx context.Context) { - persistCheckpoints = util.Every(reconciliationJobCheckpointInterval.Get(settingValues)) + persistCheckpointsMu.Lock() + defer persistCheckpointsMu.Unlock() + persistCheckpointsMu.EveryN = util.Every(reconciliationJobCheckpointInterval.Get(settingValues)) }) - shouldPersistCheckpoint := true + checkpointingDisabled := false shouldSkipRetry := false var onCheckpointInterceptor func() error if knobs := execCtx.ExecCfg().SpanConfigTestingKnobs; knobs != nil { if knobs.JobDisablePersistingCheckpoints { - shouldPersistCheckpoint = false + checkpointingDisabled = true } shouldSkipRetry = knobs.JobDisableInternalRetry onCheckpointInterceptor = knobs.JobOnCheckpointInterceptor @@ -125,10 +133,14 @@ func (r *resumer) Resume(ctx context.Context, execCtxI interface{}) (jobErr erro } } - if !shouldPersistCheckpoint { + if checkpointingDisabled { return nil } - if !persistCheckpoints.ShouldProcess(timeutil.Now()) { + + persistCheckpointsMu.Lock() + shouldPersistCheckpoint := persistCheckpointsMu.ShouldProcess(timeutil.Now()) + persistCheckpointsMu.Unlock() + if !shouldPersistCheckpoint { return nil } From 9178688a7d874250fd25ca2af8a4d2933ec34da1 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 11 Feb 2022 22:22:42 -0500 Subject: [PATCH 2/2] spanconfig: verify migration for rangefeed enablement \#74555 starts using the span configs infrastructure to control whether rangefeeds are enabled over a given range. Before dynamic system table IDs (#76003), we used the range's key boundaries to determine whether the range in question was for a system table ID. 
In mixed-version clusters, it's possible to have both forms of this check. To ensure rangefeeds over system table ranges keep working in such clusters (something we suspected in #76331), we add a test. NB: The reason things still work is that in #74555 we modified the system config span to hard-code the relevant config fields for constant system table IDs -- behaving identically to previous-version nodes. Release note: None --- pkg/migration/migrations/BUILD.bazel | 3 + .../migrations/migrate_span_configs_test.go | 106 ++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index f4a67d457a42..12b68055b2dc 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -87,6 +87,7 @@ go_test( "//pkg/jobs", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvserver", "//pkg/kv/kvserver/stateloader", @@ -96,6 +97,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigsqlwatcher", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", @@ -118,6 +120,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/migration/migrations/migrate_span_configs_test.go b/pkg/migration/migrations/migrate_span_configs_test.go index 1b0658f5b684..1581bdd1b555 100644 --- a/pkg/migration/migrations/migrate_span_configs_test.go +++ b/pkg/migration/migrations/migrate_span_configs_test.go @@ -13,21 +13,127 @@ package migrations_test import ( "context" "fmt" + "sync" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" 
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) +// TestMixedVersionClusterEnableRangefeeds tests that clusters that haven't +// migrated into the span configs still support rangefeeds over system table +// ranges. +func TestMixedVersionClusterEnableRangefeeds(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.EnsureSpanConfigReconciliation - 1, + ), + }, + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, + }, + }, + }, + }) + + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + // We spin up a SQL watcher, which makes use of range feeds internally over + // system tables. By observing SQL descriptor updates through the watcher, we + // know that the rangefeeds are enabled. 
+ noopCheckpointDuration := 100 * time.Millisecond + sqlWatcher := spanconfigsqlwatcher.New( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + noopCheckpointDuration, + nil, /* knobs */ + ) + + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */)) + beforeStmtTS := ts.Clock().Now() + tdb.Exec(t, "CREATE TABLE t()") + afterStmtTS := ts.Clock().Now() + var expDescID descpb.ID + row := tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name='t'`) + row.Scan(&expDescID) + + var wg sync.WaitGroup + mu := struct { + syncutil.Mutex + lastCheckpoint hlc.Timestamp + }{} + + watch := func(ctx context.Context, onCheckpoint func(hlc.Timestamp)) { + defer wg.Done() + + receivedIDs := make(map[descpb.ID]struct{}) + err := sqlWatcher.WatchForSQLUpdates(ctx, beforeStmtTS, + func(_ context.Context, updates []spanconfig.SQLUpdate, checkpointTS hlc.Timestamp) error { + onCheckpoint(checkpointTS) + + for _, update := range updates { + receivedIDs[update.GetDescriptorUpdate().ID] = struct{}{} + } + return nil + }) + require.True(t, testutils.IsError(err, "context canceled")) + require.Equal(t, 1, len(receivedIDs)) + _, seen := receivedIDs[expDescID] + require.True(t, seen) + } + + watcherCtx, watcherCancel := context.WithCancel(ctx) + wg.Add(1) + go watch(watcherCtx, func(ts hlc.Timestamp) { + mu.Lock() + mu.lastCheckpoint = ts + mu.Unlock() + }) + + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + + if mu.lastCheckpoint.Less(afterStmtTS) { + return errors.New("w1 checkpoint precedes statement timestamp") + } + return nil + }) + + watcherCancel() + wg.Wait() +} + // TestEnsureSpanConfigReconciliation verifies that the migration waits for a // span config reconciliation attempt, blocking until it occurs. func TestEnsureSpanConfigReconciliation(t *testing.T) {