Skip to content

Commit

Permalink
spanconfig: verify migration for rangefeed enablement
Browse files Browse the repository at this point in the history
\#74555 starts using the span configs infrastructure to control whether
rangefeeds are enabled over a given range. Before dynamic system table
IDs (#76003), we used the range's key boundaries to determine whether
the range in question was for a system table ID. In mixed-version
clusters, it's possible to have both forms of this check. To ensure
things work in this form (something we suspected in #76331), we add a
test.

NB: The reason things still work is because in #74555 we modified the
system config span to hard code the relevant config fields for constant
system table IDs -- behaving identically to previous version nodes.

Release note: None
  • Loading branch information
irfansharif committed Feb 12, 2022
1 parent f82ce57 commit 0b1ac8b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ go_test(
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/stateloader",
Expand All @@ -96,6 +97,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigsqlwatcher",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand All @@ -118,6 +120,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
106 changes: 106 additions & 0 deletions pkg/migration/migrations/migrate_span_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,127 @@ package migrations_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestMixedVersionClusterEnableRangefeeds tests that clusters that haven't
// migrated into the span configs still support rangefeeds over system table
// ranges.
func TestMixedVersionClusterEnableRangefeeds(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.EnsureSpanConfigReconciliation - 1,
),
},
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)

// We spin up a SQL watcher, which makes use of range feeds internally over
// system tables. By observing SQL descriptor updates through the watcher, we
// know that the rangefeeds are enabled.
noopCheckpointDuration := 100 * time.Millisecond
sqlWatcher := spanconfigsqlwatcher.New(
keys.SystemSQLCodec,
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
ts.Stopper(),
noopCheckpointDuration,
nil, /* knobs */
)

tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
beforeStmtTS := ts.Clock().Now()
tdb.Exec(t, "CREATE TABLE t()")
afterStmtTS := ts.Clock().Now()
var expDescID descpb.ID
row := tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name='t'`)
row.Scan(&expDescID)

var wg sync.WaitGroup
mu := struct {
syncutil.Mutex
lastCheckpoint hlc.Timestamp
}{}

watch := func(ctx context.Context, onCheckpoint func(hlc.Timestamp)) {
defer wg.Done()

receivedIDs := make(map[descpb.ID]struct{})
err := sqlWatcher.WatchForSQLUpdates(ctx, beforeStmtTS,
func(_ context.Context, updates []spanconfig.SQLUpdate, checkpointTS hlc.Timestamp) error {
onCheckpoint(checkpointTS)

for _, update := range updates {
receivedIDs[update.GetDescriptorUpdate().ID] = struct{}{}
}
return nil
})
require.True(t, testutils.IsError(err, "context canceled"))
require.Equal(t, 1, len(receivedIDs))
_, seen := receivedIDs[expDescID]
require.True(t, seen)
}

watcherCtx, watcherCancel := context.WithCancel(ctx)
wg.Add(1)
go watch(watcherCtx, func(ts hlc.Timestamp) {
mu.Lock()
mu.lastCheckpoint = ts
mu.Unlock()
})

testutils.SucceedsSoon(t, func() error {
mu.Lock()
defer mu.Unlock()

if mu.lastCheckpoint.Less(afterStmtTS) {
return errors.New("w1 checkpoint precedes statement timestamp")
}
return nil
})

watcherCancel()
wg.Wait()
}

// TestEnsureSpanConfigReconciliation verifies that the migration waits for a
// span config reconciliation attempt, blocking until it occurs.
func TestEnsureSpanConfigReconciliation(t *testing.T) {
Expand Down

0 comments on commit 0b1ac8b

Please sign in to comment.