Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
91394: changefeedccl: roachtest refactor and initial-scan-only r=samiskin a=samiskin

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-19057

Changefeed roachtests were setup focused on running a workload for a specific duration and then quitting, making it difficult to run an `initial_scan_only` test that terminated upon Job success.

We as a team have also noticed a greater need to test and observe changefeeds running in production against real sinks to catch issues we are unable to mock or observe from simple unit tests.  This is currently a notable hassle as one has to set up each individual sink and run them, ensure the changefeed is pointing to the right URI, and then be able to monitor the metrics of this long running process.  

This change refactors the cdcBasicTest into distinct pieces that are then put together in a test.  This allows for easier experimentation with live tests, allowing us to spin up a cluster and a workload, run one or more changefeeds on it, set up a poller to print out job details,have an accessible grafana URL to view metrics, and wait for some completion condition.

Changing the specialized `runCDCKafkaAuth`, `runCDCBank`, and `runCDCSchemaRegistry` functions were left out of scope for this first big change.

The main APIs involved in basic roachtests are now:
- `newCDCTester`: This creates a tester struct to run the rest of the APIs and initializes the database
- `tester.runTPCCWorkload(tpccArgs)`: Starts a TPCC workload from the last node in the cluster
- `tester.runLedgerWorkload(ledgerArgs)`: Starts a Ledger workload from the last node in the cluster
- `tester.runFeedLatencyVerifier(changefeedJob, latencyTargets)`: starts a routine that monitors the changefeed latency until the tester is `Close`'d
- `tester.waitForWorkload`: waits for a workload started by `setupAndRunWorkload` to complete its duration
- `tester.startCRDBChaos`: This starts a Chaos routine that periodically shuts nodes down and brings them back up
- `tester.newChangefeed(feedArgs)`: starts a new changefeed on the cluster and returns `changefeedJob` object
- `changefeedJob.waitForCompletion`: waits for a changefeed to complete (either success or failure)
- `tester.startGrafana`: Sets up a grafana instance on the last node of the cluster and prints out a link to a grafana, this automatically runs unless `--skip-init` is provided.  If `--debug` is not used, `StopGrafana` will be called on test teardown to publish prometheus metrics to the artifacts directory.

An API that is going to be more useful for experimentation are:
- `changefeedJob.runFeedPoller(ctx, stopper, onInfo)`: runs a given callback every second with the changefeed info

Roachtests can be ran locally with the `--local` flag or on an existing cluster without destroying it afterwards with `--cluster="my-cluster" --debug`

Ex: After adding a new test (lets say `"cdc/my-test"`) to the `registerCDC` function you can keep running 
```bash
./dev build cockroach --cross # if changes made to crdb
./dev build roachtest         # if changes made to the test

./bin/roachtest run cdc/my-test --cluster="my-cluster" --debug
```
as you try out different changes or options.  If you want to try a set of steps against different versions of the app you could download those binaries and use the `--cockroach="path-to-binary"` flag to test against those instead.

If you want to set up a large TPCC database on a cluster and reuse it for tests this can be done with roachtests's `--wipe` and `--skip-init` flags.

Release note: None

91627: upgrade: introduce "permanent" upgrades r=andreimatei a=andreimatei

This patch introduces "permanent" upgrades - a type of upgrade that is
tied to a particular cluster version (just like the existing upgrades)
but that runs regardless of the version at which the cluster was
bootstrapped (in contrast with the existing upgrades that are not run
when they're associated with a cluster version <= the bootstrap
version). These upgrades are called "permanent" because they cannot be
deleted from the codebase at a later point, in contrast with the others
that are deleted once the version they're tied drops below
BinaryMinSupportedVersion).

Existing upgrades are explicitly or implicitly baked into the bootstrap
image of the binary that introduced them. For example, an upgrade that
creates a system table is only run when upgrading an existing,
older-version, cluster to the new version; it does not run for a cluster
bootstrapped by the binary that introduced the upgrade because the
respective system tables are also included in the bootstrap metadata.
For some upcoming upgrades, though, including them in the bootstrap
image is difficult. For example, creating a job record at bootstrap
time is proving to be difficult (the system.jobs table has indexes, so
you want to insert into it through SQL because figuring out the kv's for
a row is tedious, etc). This is where these new permanent upgrades come
in.

These permanent upgrades replace the `startupmigrations` that don't have
the `includedInBootstrap` field set. All such startupmigrations have
been copied over as upgrades. None of the current `startupmigrations`
have `includedInBootstrap` set (except one but that's dummy one since
the actual migration code has been deleted), so the startupmigrations
package is now deleted. That's a good thing - we had one too many
migrations frameworks.

These permanent upgrades, though, do not have exactly the same semantics
as the startupmigrations they replace. To the extent that there is a
difference, the new semantics are considered more desirable:
- startupmigrations run when a node that has the code for a particular
  migration startups up for the first time. In other words, the
  startupmigrations were not associated with a cluster version; they were
  associated with a binary version. Migrations can run while old-version
  nodes are still around.  This means that one cannot add a
  migration that is a problem for old nodes - e.g. a migration creating a
  job of a type that the old version wouldn't recognize.
- upgrades are tied to a cluster version - they only run when the
  cluster's active version moves past the upgrade's version. This stays
  the case for the new permanent migrations too, so a v2 node will not
  immediately run the permant migrations introduced since v1 when it joins
  a v1 cluster. Instead, the migrations will run when the cluster version
  is bumped. As such, the migrations can be backwards incompatible.

startupmigrations do arguably have a property that can be desirable:
when there are no backwards compatibility issues, the v2 node can rely
on the effects of the startupmigrations it knows about regardless of the
cluster version. In contrast, with upgrades, not only is a node unable
to simply assume that a particular upgrade has run during startup, but,
more than that, a node is not even able to look at a version gate during
the startup sequence in order to determine whether a particular upgrade
has run or not (because, in clusters that are bootstrapped at v2, the
active cluster version starts as v2 even before the upgrades run). This
is a fact of life for existing upgrades, and now becomes a fact of life
for permanent upgrades too. However, by the time user SQL traffic is
admitted on a node, the node can rely on version gates to correspond to
migrations that have run.

After thinking about it, this possible advantage of startupmigrations
doesn't seem too useful and so it's not reason enough to keep the
startupmigrations machinery around.

Since the relevant startupmigrations have been moved over to upgrades,
and the two libraries use different methods for not running the same
migration twice, a 23.1 node that comes up in a 22.2 cluster will re-run
the several permanent upgrades in question, even though they had already
run as startupmigrations. This is OK since both startupmigrations and
upgrades are idempotent. None of the current permanent upgrades are too
expensive.

Closes cockroachdb#73813

Release note: None
Epic: None

Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
3 people committed Nov 23, 2022
3 parents 3bf4498 + 46c75a5 + 399e56b commit 1d04cec
Show file tree
Hide file tree
Showing 72 changed files with 1,704 additions and 2,900 deletions.
1 change: 0 additions & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@
/pkg/security/clientsecopts/ @cockroachdb/unowned @cockroachdb/server-prs @cockroachdb/sql-experience @cockroachdb/prodsec
/pkg/settings/ @cockroachdb/unowned
/pkg/spanconfig/ @cockroachdb/kv-prs
/pkg/startupmigrations/ @cockroachdb/unowned @cockroachdb/sql-schema
/pkg/repstream/ @cockroachdb/disaster-recovery
/pkg/testutils/ @cockroachdb/test-eng-noreview
/pkg/testutils/reduce/ @cockroachdb/sql-queries
Expand Down
2 changes: 0 additions & 2 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3044,8 +3044,6 @@ may increase either contention or retry errors, or both.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.cluster_setting_encoded_default"></a><code>crdb_internal.cluster_setting_encoded_default(setting: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the encoded default value of the given cluster setting.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.completed_migrations"></a><code>crdb_internal.completed_migrations() &rarr; <a href="string.html">string</a>[]</code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.create_join_token"></a><code>crdb_internal.create_join_token() &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Creates a join token for use when adding a new node to a secure cluster.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.create_session_revival_token"></a><code>crdb_internal.create_session_revival_token() &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>Generate a token that can be used to create a new session for the current user.</p>
Expand Down
12 changes: 4 additions & 8 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,6 @@ ALL_TESTS = [
"//pkg/sql/types:types_test",
"//pkg/sql:sql_disallowed_imports_test",
"//pkg/sql:sql_test",
"//pkg/startupmigrations/leasemanager:leasemanager_test",
"//pkg/startupmigrations:startupmigrations_test",
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs_test",
"//pkg/storage/metamorphic:metamorphic_test",
Expand Down Expand Up @@ -1829,10 +1827,6 @@ GO_TARGETS = [
"//pkg/sql/vtable:vtable",
"//pkg/sql:sql",
"//pkg/sql:sql_test",
"//pkg/startupmigrations/leasemanager:leasemanager",
"//pkg/startupmigrations/leasemanager:leasemanager_test",
"//pkg/startupmigrations:startupmigrations",
"//pkg/startupmigrations:startupmigrations_test",
"//pkg/storage/enginepb:enginepb",
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs",
Expand Down Expand Up @@ -1919,7 +1913,9 @@ GO_TARGETS = [
"//pkg/ts:ts_test",
"//pkg/ui:ui",
"//pkg/ui:ui_test",
"//pkg/upgrade/migrationstable:migrationstable",
"//pkg/upgrade/nodelivenesstest:nodelivenesstest",
"//pkg/upgrade/upgradebase:upgradebase",
"//pkg/upgrade/upgradecluster:upgradecluster",
"//pkg/upgrade/upgradecluster:upgradecluster_test",
"//pkg/upgrade/upgradejob:upgrade_job",
Expand Down Expand Up @@ -2909,8 +2905,6 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/ttl/ttlschedule:get_x_data",
"//pkg/sql/types:get_x_data",
"//pkg/sql/vtable:get_x_data",
"//pkg/startupmigrations:get_x_data",
"//pkg/startupmigrations/leasemanager:get_x_data",
"//pkg/storage:get_x_data",
"//pkg/storage/enginepb:get_x_data",
"//pkg/storage/fs:get_x_data",
Expand Down Expand Up @@ -2968,7 +2962,9 @@ GET_X_DATA_TARGETS = [
"//pkg/ts/tspb:get_x_data",
"//pkg/ui:get_x_data",
"//pkg/upgrade:get_x_data",
"//pkg/upgrade/migrationstable:get_x_data",
"//pkg/upgrade/nodelivenesstest:get_x_data",
"//pkg/upgrade/upgradebase:get_x_data",
"//pkg/upgrade/upgradecluster:get_x_data",
"//pkg/upgrade/upgradejob:get_x_data",
"//pkg/upgrade/upgrademanager:get_x_data",
Expand Down
7 changes: 7 additions & 0 deletions pkg/base/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ var CheckEnterpriseEnabled = func(_ *cluster.Settings, _ uuid.UUID, feature stri
return errEnterpriseNotEnabled // nb: this is squarely in the hot path on OSS builds
}

// CCLDistributionAndEnterpriseEnabled is a simpler version of
// CheckEnterpriseEnabled which doesn't take in feature-related info and doesn't
// return an error with a nice message.
var CCLDistributionAndEnterpriseEnabled = func(st *cluster.Settings, clusterID uuid.UUID) bool {
return CheckEnterpriseEnabled(st, clusterID, "" /* feature */) == nil
}

var licenseTTLMetadata = metric.Metadata{
// This metric name isn't namespaced for backwards
// compatibility. The prior version of this metric was manually
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestBackupAndRestoreJobDescription(t *testing.T) {
asOf1 := strings.TrimPrefix(matches[1], "/full")

sqlDB.CheckQueryResults(
t, "SELECT description FROM [SHOW JOBS] WHERE status != 'failed'",
t, "SELECT description FROM [SHOW JOBS] WHERE job_type != 'MIGRATION' AND status != 'failed'",
[][]string{
{fmt.Sprintf("BACKUP TO ('%s', '%s', '%s')", backups[0].(string), backups[1].(string),
backups[2].(string))},
Expand Down Expand Up @@ -5608,7 +5608,7 @@ func TestBackupRestoreShowJob(t *testing.T) {
// TODO (lucy): Update this if/when we decide to change how these jobs queued by
// the startup migration are handled.
sqlDB.CheckQueryResults(
t, "SELECT description FROM [SHOW JOBS] WHERE description != 'updating privileges' ORDER BY description",
t, "SELECT description FROM [SHOW JOBS] WHERE job_type != 'MIGRATION' AND description != 'updating privileges' ORDER BY description",
[][]string{
{"BACKUP DATABASE data TO 'nodelocal://0/foo' WITH revision_history = true"},
{"RESTORE TABLE data.bank FROM 'nodelocal://0/foo' WITH into_db = 'data 2', skip_missing_foreign_keys"},
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error {
rowenc.EncDatum{Datum: tree.DNull}, // key
rowenc.EncDatum{Datum: tree.DNull}, // value
})
ca.metrics.ResolvedMessages.Inc(1)

ca.recentKVCount = 0
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/upgrade",
"//pkg/upgrade/upgrades",
"//pkg/upgrade/upgradebase",
"//pkg/util",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -251,16 +251,16 @@ func TestTenantUpgradeFailure(t *testing.T) {
TenantID: roachpb.MustMakeTenantID(id),
TestingKnobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
UpgradeManager: &upgrade.TestingKnobs{
UpgradeManager: &upgradebase.TestingKnobs{
ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
return []roachpb.Version{v1, v2}
},
RegistryOverride: func(v roachpb.Version) (upgrade.Upgrade, bool) {
RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) {
switch v {
case v1:
return upgrade.NewTenantUpgrade("testing",
v1,
upgrades.NoPrecondition,
upgrade.NoPrecondition,
func(
ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps,
) error {
Expand All @@ -269,7 +269,7 @@ func TestTenantUpgradeFailure(t *testing.T) {
case v2:
return upgrade.NewTenantUpgrade("testing next",
v2,
upgrades.NoPrecondition,
upgrade.NoPrecondition,
func(
ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps,
) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/upgrade/upgradebase",
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/leaktest",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
Expand All @@ -53,6 +54,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
// be adopted as the processors are being manually executed in the test.
DisableAdoptions: true,
},
// DisableAdoptions needs this.
UpgradeManager: &upgradebase.TestingKnobs{
DontUseJobs: true,
},
},
},
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlstats",
"//pkg/startupmigrations",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/testutils/serverutils",
"//pkg/ts",
"//pkg/ts/tspb",
"//pkg/upgrade/upgrades",
"//pkg/util",
"//pkg/util/cgroups",
"//pkg/util/contextutil",
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cli/clisqlexec"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/startupmigrations"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/errors/oserror"
"github.com/spf13/cobra"
"github.com/spf13/cobra/doc"
Expand Down Expand Up @@ -239,7 +239,7 @@ Output the list of cluster settings known to this binary.
defaultVal = sm.SettingsListDefault()
} else {
defaultVal = setting.String(&s.SV)
if override, ok := startupmigrations.SettingsDefaultOverrides[name]; ok {
if override, ok := upgrades.SettingsDefaultOverrides[name]; ok {
defaultVal = override
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 48 descriptors and 47 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
Examining 4 jobs...
Examining 12 jobs...
ERROR: validation failed
59 changes: 58 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ type Key int
const (
invalidVersionKey Key = iota - 1 // want first named one to start at zero

// VPrimordial versions are used by upgrades below BinaryMinSupportedVersion,
// for whom the exact version they were associated with no longer matters.

VPrimordial1
VPrimordial2
VPrimordial3
VPrimordial4
VPrimordial5
VPrimordial6
VPrimordial7
VPrimordial8
VPrimordialMax

// V22_1 is CockroachDB v22.1. It's used for all v22.1.x patch releases.
V22_1

Expand Down Expand Up @@ -360,6 +373,42 @@ const TODOPreV22_1 = V22_1
// large number to every major if building from master, so as to ensure that
// master builds cannot be upgraded to release-branch builds.
var rawVersionsSingleton = keyedVersions{
{
Key: VPrimordial1,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 2},
},
{
Key: VPrimordial2,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 4},
},
{
Key: VPrimordial3,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 6},
},
{
Key: VPrimordial4,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 8},
},
{
Key: VPrimordial5,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 10},
},
{
Key: VPrimordial6,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 12},
},
{
Key: VPrimordial7,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 14},
},
{
Key: VPrimordial8,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 16},
},
{
Key: VPrimordialMax,
Version: roachpb.Version{Major: 0, Minor: 0, Internal: 424242},
},
{
Key: V22_1,
Version: roachpb.Version{Major: 22, Minor: 1},
Expand Down Expand Up @@ -585,8 +634,16 @@ var versionsSingleton = func() keyedVersions {
// then on to 1000004, etc.
skipFirst := envutil.EnvOrDefaultBool("COCKROACH_UPGRADE_TO_DEV_VERSION", false)
const devOffset = 1000000
first := true
for i := range rawVersionsSingleton {
if i == 0 && skipFirst {
// VPrimordial versions are not offset; they don't matter for the logic
// offsetting is used for.
if rawVersionsSingleton[i].Major == rawVersionsSingleton.MustByKey(VPrimordialMax).Major {
continue
}

if skipFirst && first {
first = false
continue
}
rawVersionsSingleton[i].Major += devOffset
Expand Down
4 changes: 4 additions & 0 deletions pkg/clusterversion/keyed_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,7 @@ func (kv keyedVersions) Validate() error {

return nil
}

func (kv keyedVersions) LastVersion() roachpb.Version {
return kv[len(kv)-1].Version
}
Loading

0 comments on commit 1d04cec

Please sign in to comment.