diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 44f34c938df8..9e45e7c51a8d 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -53,7 +53,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" "github.com/cockroachdb/cockroach/pkg/cloud/gcp" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -67,7 +66,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql" @@ -10150,67 +10148,6 @@ func TestBackupNoOverwriteLatest(t *testing.T) { require.NotEqual(t, firstLatest, thirdLatest) } -// TestBackupLatestInBaseDirectory tests to see that a LATEST -// file in the base directory can be properly read when one is not found -// in metadata/latest. This can occur when an older version node creates -// the backup. -func TestBackupLatestInBaseDirectory(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - disableUpgradeCh := make(chan struct{}) - const numAccounts = 1 - const userfile = "'userfile:///a'" - args := base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - BinaryVersionOverride: clusterversion.ByKey(clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint - 1), - DisableAutomaticVersionUpgrade: disableUpgradeCh, - }, - }, - }, - } - - tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, args) - defer cleanupFn() - execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) - ctx := context.Background() - store, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, "userfile:///a", username.RootUserName()) - require.NoError(t, err) - - query := fmt.Sprintf("BACKUP INTO %s", userfile) - sqlDB.Exec(t, query) - - // Confirm that the LATEST file was written to the base directory. - r, err := store.ReadFile(ctx, backupbase.LatestFileName) - require.NoError(t, err) - r.Close(ctx) - - // Close the channel so that the cluster version is upgraded. - close(disableUpgradeCh) - // Check the cluster version is bumped to newVersion. - testutils.SucceedsSoon(t, func() error { - var version string - sqlDB.QueryRow(t, "SELECT value FROM system.settings WHERE name = 'version'").Scan(&version) - var v clusterversion.ClusterVersion - if err := protoutil.Unmarshal([]byte(version), &v); err != nil { - return err - } - version = v.String() - if version != clusterversion.TestingBinaryVersion.String() { - return errors.Errorf("cluster version is still %s, should be %s", version, clusterversion.TestingBinaryVersion.String()) - } - return nil - }) - - // Take an incremental backup on the new version using the latest file - // written by the old version in the base directory. - query = fmt.Sprintf("BACKUP INTO LATEST IN %s", userfile) - sqlDB.Exec(t, query) - -} - // TestBackupRestoreTelemetryEvents tests that BACKUP and RESTORE correctly // publishes telemetry events. func TestBackupRestoreTelemetryEvents(t *testing.T) { diff --git a/pkg/ccl/backupccl/backupdest/BUILD.bazel b/pkg/ccl/backupccl/backupdest/BUILD.bazel index dde48e96258a..f4bab9168bde 100644 --- a/pkg/ccl/backupccl/backupdest/BUILD.bazel +++ b/pkg/ccl/backupccl/backupdest/BUILD.bazel @@ -48,9 +48,7 @@ go_test( "//pkg/ccl/utilccl", "//pkg/cloud", "//pkg/cloud/impl:cloudimpl", - "//pkg/clusterversion", "//pkg/jobs/jobspb", - "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/security/username", diff --git a/pkg/ccl/backupccl/backupdest/backup_destination.go b/pkg/ccl/backupccl/backupdest/backup_destination.go index 9f7e7c3ec479..af38d2093b9a 100644 --- a/pkg/ccl/backupccl/backupdest/backup_destination.go +++ b/pkg/ccl/backupccl/backupdest/backup_destination.go @@ -325,13 +325,6 @@ func FindLatestFile( func WriteNewLatestFile( ctx context.Context, settings *cluster.Settings, exportStore cloud.ExternalStorage, suffix string, ) error { - // If the cluster is still running on a mixed version, we want to write - // to the base directory instead of the metadata/latest directory. That - // way an old node can still find the LATEST file. - if !settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) { - return cloud.WriteFile(ctx, exportStore, backupbase.LatestFileName, strings.NewReader(suffix)) - } - // HTTP storage does not support listing and so we cannot rely on the // above-mentioned List method to return us the most recent latest file. // Instead, we disregard write once semantics and always read and write diff --git a/pkg/ccl/backupccl/backupdest/backup_destination_test.go b/pkg/ccl/backupccl/backupdest/backup_destination_test.go index 5b13b3d956b7..feee9df61758 100644 --- a/pkg/ccl/backupccl/backupdest/backup_destination_test.go +++ b/pkg/ccl/backupccl/backupdest/backup_destination_test.go @@ -21,9 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/cloud" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -298,7 +296,6 @@ func TestBackupRestoreResolveDestination(t *testing.T) { inc8Time := full3Time.Add(time.Minute * 30) inc9Time := inc8Time.Add(time.Minute * 30) full4Time := inc9Time.Add(time.Minute * 30) - inc10Time := full4Time.Add(time.Minute * 30) // firstBackupChain is maintained throughout the tests as the history of // backups that were taken based on the initial full backup. @@ -311,14 +308,12 @@ func TestBackupRestoreResolveDestination(t *testing.T) { // separate path relative to the full backup in their chain. Otherwise, // it should be an empty array of strings noIncrementalStorage := []string(nil) - incrementalBackupSubdirEnabled := true - notIncrementalBackupSubdirEnabled := false firstRemoteBackupChain := []string(nil) testCollectionBackup := func(t *testing.T, backupTime time.Time, expectedDefault, expectedSuffix, expectedIncDir string, expectedPrevBackups []string, - appendToLatest bool, subdir string, incrementalTo []string, incrementalBackupSubdirEnabled bool) { + appendToLatest bool, subdir string, incrementalTo []string) { endTime := hlc.Timestamp{WallTime: backupTime.UnixNano()} incrementalFrom := []string(nil) @@ -336,17 +331,6 @@ func TestBackupRestoreResolveDestination(t *testing.T) { _, localityCollections, err = backupdest.GetURIsByLocalityKV(incrementalTo, "") require.NoError(t, err) } - currentVersion := execCfg.Settings.Version.ActiveVersion(ctx) - if !incrementalBackupSubdirEnabled { - // Downgrading is disallowed in normal operation, but fine for testing here. - err = execCfg.Settings.Version.SetActiveVersion(ctx, clusterversion.ClusterVersion{ - Version: roachpb.Version{ - Major: 21, - Minor: 2, - }, - }) - require.NoError(t, err) - } fullBackupExists := false if expectedIncDir != "" { @@ -360,9 +344,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { incrementalFrom, &execCfg, ) - clusterErr := execCfg.Settings.Version.SetActiveVersion(ctx, currentVersion) require.NoError(t, err) - require.NoError(t, clusterErr) localityDests := make(map[string]string, len(localityCollections)) for locality, localityDest := range localityCollections { @@ -389,7 +371,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, fullTime, expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain, - false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, incrementalBackupSubdirEnabled) + false /* intoLatest */, noExplicitSubDir, noIncrementalStorage) firstBackupChain = append(firstBackupChain, expectedDefault) firstRemoteBackupChain = append(firstRemoteBackupChain, expectedDefault) writeManifest(t, expectedDefault) @@ -406,7 +388,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc1Time, expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain, - true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo, incrementalBackupSubdirEnabled) + true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo) firstBackupChain = append(firstBackupChain, expectedDefault) writeManifest(t, expectedDefault) } @@ -420,7 +402,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc2Time, expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain, - true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo, incrementalBackupSubdirEnabled) + true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo) firstBackupChain = append(firstBackupChain, expectedDefault) writeManifest(t, expectedDefault) } @@ -435,7 +417,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, full2Time, expectedDefault, expectedSuffix, expectedIncDir, []string(nil), - false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, incrementalBackupSubdirEnabled) + false /* intoLatest */, noExplicitSubDir, noIncrementalStorage) writeManifest(t, expectedDefault) // We also wrote a new full backup, so let's update the latest. writeLatest(t, collectionLoc, expectedSuffix) @@ -450,7 +432,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc3Time, expectedDefault, expectedSuffix, expectedIncDir, []string{backup2Location}, - true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo, incrementalBackupSubdirEnabled) + true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo) writeManifest(t, expectedDefault) } @@ -463,7 +445,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc4Time, expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain, - false /* intoLatest */, expectedSubdir, defaultIncrementalTo, incrementalBackupSubdirEnabled) + false /* intoLatest */, expectedSubdir, defaultIncrementalTo) writeManifest(t, expectedDefault) } @@ -479,7 +461,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc5Time, expectedDefault, expectedSuffix, expectedIncDir, firstRemoteBackupChain, - false /* intoLatest */, expectedSubdir, customIncrementalTo, incrementalBackupSubdirEnabled) + false /* intoLatest */, expectedSubdir, customIncrementalTo) writeManifest(t, expectedDefault) firstRemoteBackupChain = append(firstRemoteBackupChain, expectedDefault) @@ -497,7 +479,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc6Time, expectedDefault, expectedSuffix, expectedIncDir, firstRemoteBackupChain, - false /* intoLatest */, expectedSubdir, customIncrementalTo, incrementalBackupSubdirEnabled) + false /* intoLatest */, expectedSubdir, customIncrementalTo) writeManifest(t, expectedDefault) } @@ -514,7 +496,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc7Time, expectedDefault, expectedSuffix, expectedIncDir, []string{backup2Location}, - true /* intoLatest */, expectedSubdir, customIncrementalTo, incrementalBackupSubdirEnabled) + true /* intoLatest */, expectedSubdir, customIncrementalTo) writeManifest(t, expectedDefault) } @@ -528,7 +510,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, full3Time, expectedDefault, expectedSuffix, expectedIncDir, []string(nil), - false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, incrementalBackupSubdirEnabled) + false /* intoLatest */, noExplicitSubDir, noIncrementalStorage) writeManifest(t, expectedDefault) // We also wrote a new full backup, so let's update the latest. writeLatest(t, collectionLoc, expectedSuffix) @@ -557,42 +539,23 @@ func TestBackupRestoreResolveDestination(t *testing.T) { testCollectionBackup(t, inc9Time, expectedDefault, expectedSuffix, expectedIncDir, []string{backup3Location, oldStyleDefault}, - true /* intoLatest */, expectedSubdir, noIncrementalStorage, incrementalBackupSubdirEnabled) + true /* intoLatest */, expectedSubdir, noIncrementalStorage) writeManifest(t, expectedDefault) } // A new full backup: BACKUP INTO collection - var backup4Location string { expectedSuffix := "/2020/12/25-120000.00" expectedIncDir := "" expectedDefault := fmt.Sprintf("nodelocal://1/%s%s?AUTH=implicit", t.Name(), expectedSuffix) - backup4Location = expectedDefault testCollectionBackup(t, full4Time, expectedDefault, expectedSuffix, expectedIncDir, []string(nil), - false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, notIncrementalBackupSubdirEnabled) + false /* intoLatest */, noExplicitSubDir, noIncrementalStorage) writeManifest(t, expectedDefault) // We also wrote a new full backup, so let's update the latest. writeLatest(t, collectionLoc, expectedSuffix) } - - // An automatic incremental into the fourth full backup: BACKUP INTO LATEST - // IN collection, BUT simulating an old cluster that doesn't support - // dedicated incrementals subdir by default yet. - { - expectedSuffix := "/2020/12/25-120000.00" - expectedIncDir := "/20201225/123000.00" - expectedSubdir := expectedSuffix - expectedDefault := fmt.Sprintf("nodelocal://1/%s%s%s?AUTH=implicit", - t.Name(), - expectedSuffix, expectedIncDir) - - testCollectionBackup(t, inc10Time, - expectedDefault, expectedSuffix, expectedIncDir, []string{backup4Location}, - true /* intoLatest */, expectedSubdir, noIncrementalStorage, notIncrementalBackupSubdirEnabled) - writeManifest(t, expectedDefault) - } }) }) } diff --git a/pkg/ccl/backupccl/backupdest/incrementals.go b/pkg/ccl/backupccl/backupdest/incrementals.go index b412fb8233d1..3431cfb68222 100644 --- a/pkg/ccl/backupccl/backupdest/incrementals.go +++ b/pkg/ccl/backupccl/backupdest/incrementals.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/cloud" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/errors" @@ -166,9 +165,8 @@ func ResolveIncrementalsBackupLocation( "Please choose a location manually with the `incremental_location` parameter.") } - // If the cluster isn't fully migrated, or we have backups in the old default - // location, continue to use the old location. - if len(prevOld) > 0 || !execCfg.Settings.Version.IsActive(ctx, clusterversion.IncrementalBackupSubdir) { + // If we have backups in the old default location, continue to use the old location. + if len(prevOld) > 0 { return resolvedIncrementalsBackupLocationOld, nil } diff --git a/pkg/ccl/backupccl/backupinfo/BUILD.bazel b/pkg/ccl/backupccl/backupinfo/BUILD.bazel index 8085531cda19..5692ff983f68 100644 --- a/pkg/ccl/backupccl/backupinfo/BUILD.bazel +++ b/pkg/ccl/backupccl/backupinfo/BUILD.bazel @@ -18,7 +18,6 @@ go_library( "//pkg/ccl/storageccl", "//pkg/cloud", "//pkg/cloud/cloudpb", - "//pkg/clusterversion", "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/roachpb", diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index 9dd6397524d6..976b56daa87c 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -984,26 +983,6 @@ func WriteBackupManifestCheckpoint( } } - // If the cluster is still running on a mixed version, we want to write - // to the base directory instead of the progress directory. That way if - // an old node resumes a backup, it doesn't have to start over. - if !execCfg.Settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) { - // We want to overwrite the latest checkpoint in the base directory, - // just write to the non versioned BACKUP-CHECKPOINT file. - err = cloud.WriteFile(ctx, defaultStore, BackupManifestCheckpointName, bytes.NewReader(descBuf)) - if err != nil { - return err - } - - checksum, err := GetChecksum(descBuf) - if err != nil { - return err - } - - return cloud.WriteFile(ctx, defaultStore, BackupManifestCheckpointName+ - BackupManifestChecksumSuffix, bytes.NewReader(checksum)) - } - // We timestamp the checkpoint files in order to enforce write once backups. // When the job goes to read these timestamped files, it will List // the checkpoints and pick the file whose name is lexicographically diff --git a/pkg/cloud/externalconn/connectionpb/BUILD.bazel b/pkg/cloud/externalconn/connectionpb/BUILD.bazel index d002bf35068c..9d9e9adbd347 100644 --- a/pkg/cloud/externalconn/connectionpb/BUILD.bazel +++ b/pkg/cloud/externalconn/connectionpb/BUILD.bazel @@ -8,10 +8,7 @@ proto_library( srcs = ["connection.proto"], strip_import_prefix = "/pkg", visibility = ["//visibility:public"], - deps = [ - "//pkg/cloud/cloudpb:cloudpb_proto", - "@com_github_gogo_protobuf//gogoproto:gogo_proto", - ], + deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"], ) go_proto_library( @@ -20,10 +17,7 @@ go_proto_library( importpath = "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb", proto = ":connectionpb_proto", visibility = ["//visibility:public"], - deps = [ - "//pkg/cloud/cloudpb", - "@com_github_gogo_protobuf//gogoproto", - ], + deps = ["@com_github_gogo_protobuf//gogoproto"], ) go_library( diff --git a/pkg/cloud/externalconn/connectionpb/connection.proto b/pkg/cloud/externalconn/connectionpb/connection.proto index a68e0f5a702a..d5d22d9ab24a 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.proto +++ b/pkg/cloud/externalconn/connectionpb/connection.proto @@ -13,7 +13,6 @@ package cockroach.cloud.externalconn.connectionpb; option go_package = "connectionpb"; import "gogoproto/gogo.proto"; -import "cloud/cloudpb/external_storage.proto"; // ConnectionType is the type of the External Connection object. enum ConnectionType { diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 497cc32ac7be..72ec94d32ec7 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -239,10 +239,6 @@ const ( // ChangefeedIdleness is the version where changefeed aggregators forward // idleness-related information alnog with resolved spans to the frontier ChangefeedIdleness - // BackupDoesNotOverwriteLatestAndCheckpoint is the version where we - // stop overwriting the LATEST and checkpoint files during backup execution. - // Instead, it writes new files alongside the old in reserved subdirectories. - BackupDoesNotOverwriteLatestAndCheckpoint // EnableDeclarativeSchemaChanger is the version where new declarative schema changer // can be used to construct schema change plan node. EnableDeclarativeSchemaChanger @@ -251,9 +247,6 @@ const ( // PebbleFormatSplitUserKeysMarked performs a Pebble-level migration and // upgrades the Pebble format major version to FormatSplitUserKeysMarked. PebbleFormatSplitUserKeysMarked - // IncrementalBackupSubdir enables backing up new incremental backups to a - // dedicated subdirectory, to make it easier to apply a different ttl. - IncrementalBackupSubdir // EnableNewStoreRebalancer enables the new store rebalancer introduced in // 22.1. EnableNewStoreRebalancer @@ -457,10 +450,6 @@ var versionsSingleton = keyedVersions{ Key: ChangefeedIdleness, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 82}, }, - { - Key: BackupDoesNotOverwriteLatestAndCheckpoint, - Version: roachpb.Version{Major: 21, Minor: 2, Internal: 84}, - }, { Key: EnableDeclarativeSchemaChanger, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 86}, @@ -473,10 +462,6 @@ var versionsSingleton = keyedVersions{ Key: PebbleFormatSplitUserKeysMarked, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 90}, }, - { - Key: IncrementalBackupSubdir, - Version: roachpb.Version{Major: 21, Minor: 2, Internal: 92}, - }, { Key: EnableNewStoreRebalancer, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 96}, diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 73c5f7405ace..2087c66a50a7 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -30,41 +30,39 @@ func _() { _ = x[EnableLeaseHolderRemoval-19] _ = x[LooselyCoupledRaftLogTruncation-20] _ = x[ChangefeedIdleness-21] - _ = x[BackupDoesNotOverwriteLatestAndCheckpoint-22] - _ = x[EnableDeclarativeSchemaChanger-23] - _ = x[RowLevelTTL-24] - _ = x[PebbleFormatSplitUserKeysMarked-25] - _ = x[IncrementalBackupSubdir-26] - _ = x[EnableNewStoreRebalancer-27] - _ = x[ClusterLocksVirtualTable-28] - _ = x[AutoStatsTableSettings-29] - _ = x[SuperRegions-30] - _ = x[EnableNewChangefeedOptions-31] - _ = x[SpanCountTable-32] - _ = x[PreSeedSpanCountTable-33] - _ = x[SeedSpanCountTable-34] - _ = x[V22_1-35] - _ = x[Start22_2-36] - _ = x[LocalTimestamps-37] - _ = x[PebbleFormatSplitUserKeysMarkedCompacted-38] - _ = x[EnsurePebbleFormatVersionRangeKeys-39] - _ = x[EnablePebbleFormatVersionRangeKeys-40] - _ = x[TrigramInvertedIndexes-41] - _ = x[RemoveGrantPrivilege-42] - _ = x[MVCCRangeTombstones-43] - _ = x[UpgradeSequenceToBeReferencedByID-44] - _ = x[SampledStmtDiagReqs-45] - _ = x[AddSSTableTombstones-46] - _ = x[SystemPrivilegesTable-47] - _ = x[EnablePredicateProjectionChangefeed-48] - _ = x[AlterSystemSQLInstancesAddLocality-49] - _ = x[SystemExternalConnectionsTable-50] - _ = x[AlterSystemStatementStatisticsAddIndexRecommendations-51] + _ = x[EnableDeclarativeSchemaChanger-22] + _ = x[RowLevelTTL-23] + _ = x[PebbleFormatSplitUserKeysMarked-24] + _ = x[EnableNewStoreRebalancer-25] + _ = x[ClusterLocksVirtualTable-26] + _ = x[AutoStatsTableSettings-27] + _ = x[SuperRegions-28] + _ = x[EnableNewChangefeedOptions-29] + _ = x[SpanCountTable-30] + _ = x[PreSeedSpanCountTable-31] + _ = x[SeedSpanCountTable-32] + _ = x[V22_1-33] + _ = x[Start22_2-34] + _ = x[LocalTimestamps-35] + _ = x[PebbleFormatSplitUserKeysMarkedCompacted-36] + _ = x[EnsurePebbleFormatVersionRangeKeys-37] + _ = x[EnablePebbleFormatVersionRangeKeys-38] + _ = x[TrigramInvertedIndexes-39] + _ = x[RemoveGrantPrivilege-40] + _ = x[MVCCRangeTombstones-41] + _ = x[UpgradeSequenceToBeReferencedByID-42] + _ = x[SampledStmtDiagReqs-43] + _ = x[AddSSTableTombstones-44] + _ = x[SystemPrivilegesTable-45] + _ = x[EnablePredicateProjectionChangefeed-46] + _ = x[AlterSystemSQLInstancesAddLocality-47] + _ = x[SystemExternalConnectionsTable-48] + _ = x[AlterSystemStatementStatisticsAddIndexRecommendations-49] } -const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendations" +const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendations" -var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 643, 673, 684, 715, 738, 762, 786, 808, 820, 846, 860, 881, 899, 904, 913, 928, 968, 1002, 1036, 1058, 1078, 1097, 1130, 1149, 1169, 1190, 1225, 1259, 1289, 1342} +var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 632, 643, 674, 698, 722, 744, 756, 782, 796, 817, 835, 840, 849, 864, 904, 938, 972, 994, 1014, 1033, 1066, 1085, 1105, 1126, 1161, 1195, 1225, 1278} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/sql/flowinfra/BUILD.bazel b/pkg/sql/flowinfra/BUILD.bazel index f89af91067ca..3657ec52aa13 100644 --- a/pkg/sql/flowinfra/BUILD.bazel +++ b/pkg/sql/flowinfra/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", + "@com_github_gogo_protobuf//proto", "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go index 687d95978399..497e03284c76 100644 --- a/pkg/sql/flowinfra/flow_registry.go +++ b/pkg/sql/flowinfra/flow_registry.go @@ -12,7 +12,6 @@ package flowinfra import ( "context" - "fmt" "sync" "time" @@ -25,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" + "github.com/gogo/protobuf/proto" ) // errNoInboundStreamConnection is the error propagated through the flow when @@ -239,8 +239,20 @@ type flowRetryableError struct { cause error } -func (e *flowRetryableError) Error() string { - return fmt.Sprintf("flow retryable error: %+v", e.cause) +var _ errors.Wrapper = &flowRetryableError{} + +func (e *flowRetryableError) Error() string { return e.cause.Error() } +func (e *flowRetryableError) Cause() error { return e.cause } +func (e *flowRetryableError) Unwrap() error { return e.Cause() } + +func decodeFlowRetryableError( + _ context.Context, cause error, _ string, _ []string, _ proto.Message, +) error { + return &flowRetryableError{cause: cause} +} + +func init() { + errors.RegisterWrapperDecoder(errors.GetTypeKey((*flowRetryableError)(nil)), decodeFlowRetryableError) } // IsFlowRetryableError returns true if an error represents a retryable diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 2479ea91f231..ea5602891b65 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -1468,6 +1468,18 @@ var opGenerators = []opGenerator{ sort.Slice(keys, func(i, j int) bool { return keys[i].Less(keys[j]) }) + // An sstable intended for ingest cannot have the same key appear + // multiple times. Remove any duplicates. + n := len(keys) + for i := 1; i < n; { + if keys[i-1].Equal(keys[i]) { + copy(keys[i:], keys[i+1:]) + n-- + } else { + i++ + } + } + keys = keys[:n] return &ingestOp{ m: m, diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index ad304a62ba7e..ee86c1922cff 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1193,6 +1193,7 @@ func TestLint(t *testing.T) { ":!spanconfig/errors.go", ":!roachpb/replica_unavailable_error.go", ":!roachpb/ambiguous_result_error.go", + ":!sql/flowinfra/flow_registry.go", ":!sql/pgwire/pgerror/constraint_name.go", ":!sql/pgwire/pgerror/severity.go", ":!sql/pgwire/pgerror/with_candidate_code.go",