Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
85500: flowinfra: preserve flowRetryableError correctly across network r=yuzefovich a=yuzefovich

This commit makes it so that `flowinfra.flowRetryableError` type is
correctly preserved across network. Previously, if the error originated
on the remote node, the coordinator node would receive
`errbase.opaqueLeaf` error since the decoder method wasn't registered
for the error, now the error is preserved correctly.

Release note: None

85536: backupccl: remove support for pre-22.1 backups r=dt a=dt

Release note: none.

85555: storage: deduplicate ingested keys in MVCC metamorphic tests r=nicktrav a=jbowens

In the MVCC metamorphic tests, deduplicate the keys intended for ingestion.
It's illegal to add the same key twice to an sstable intended for ingestion,
because all keys in an ingested sstable adopt the same sequence number.

Release note: None

Fix #85515.

85557: connectionpb: remove unnecessary import r=adityamaru a=rickystewart

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
5 people committed Aug 3, 2022
5 parents f353698 + 91acca3 + 71becf9 + 1679618 + 432e849 commit 9650b0d
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 207 deletions.
63 changes: 0 additions & 63 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/cloud/gcp"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -67,7 +66,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -10150,67 +10148,6 @@ func TestBackupNoOverwriteLatest(t *testing.T) {
require.NotEqual(t, firstLatest, thirdLatest)
}

// TestBackupLatestInBaseDirectory tests to see that a LATEST
// file in the base directory can be properly read when one is not found
// in metadata/latest. This can occur when an older version node creates
// the backup.
func TestBackupLatestInBaseDirectory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

disableUpgradeCh := make(chan struct{})
const numAccounts = 1
const userfile = "'userfile:///a'"
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: clusterversion.ByKey(clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint - 1),
DisableAutomaticVersionUpgrade: disableUpgradeCh,
},
},
},
}

tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, args)
defer cleanupFn()
execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
ctx := context.Background()
store, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, "userfile:///a", username.RootUserName())
require.NoError(t, err)

query := fmt.Sprintf("BACKUP INTO %s", userfile)
sqlDB.Exec(t, query)

// Confirm that the LATEST file was written to the base directory.
r, err := store.ReadFile(ctx, backupbase.LatestFileName)
require.NoError(t, err)
r.Close(ctx)

// Close the channel so that the cluster version is upgraded.
close(disableUpgradeCh)
// Check the cluster version is bumped to newVersion.
testutils.SucceedsSoon(t, func() error {
var version string
sqlDB.QueryRow(t, "SELECT value FROM system.settings WHERE name = 'version'").Scan(&version)
var v clusterversion.ClusterVersion
if err := protoutil.Unmarshal([]byte(version), &v); err != nil {
return err
}
version = v.String()
if version != clusterversion.TestingBinaryVersion.String() {
return errors.Errorf("cluster version is still %s, should be %s", version, clusterversion.TestingBinaryVersion.String())
}
return nil
})

// Take an incremental backup on the new version using the latest file
// written by the old version in the base directory.
query = fmt.Sprintf("BACKUP INTO LATEST IN %s", userfile)
sqlDB.Exec(t, query)

}

// TestBackupRestoreTelemetryEvents tests that BACKUP and RESTORE correctly
// publishes telemetry events.
func TestBackupRestoreTelemetryEvents(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/backupccl/backupdest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ go_test(
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
Expand Down
7 changes: 0 additions & 7 deletions pkg/ccl/backupccl/backupdest/backup_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,6 @@ func FindLatestFile(
func WriteNewLatestFile(
ctx context.Context, settings *cluster.Settings, exportStore cloud.ExternalStorage, suffix string,
) error {
// If the cluster is still running on a mixed version, we want to write
// to the base directory instead of the metadata/latest directory. That
// way an old node can still find the LATEST file.
if !settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) {
return cloud.WriteFile(ctx, exportStore, backupbase.LatestFileName, strings.NewReader(suffix))
}

// HTTP storage does not support listing and so we cannot rely on the
// above-mentioned List method to return us the most recent latest file.
// Instead, we disregard write once semantics and always read and write
Expand Down
63 changes: 13 additions & 50 deletions pkg/ccl/backupccl/backupdest/backup_destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils"
"github.com/cockroachdb/cockroach/pkg/cloud"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -298,7 +296,6 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
inc8Time := full3Time.Add(time.Minute * 30)
inc9Time := inc8Time.Add(time.Minute * 30)
full4Time := inc9Time.Add(time.Minute * 30)
inc10Time := full4Time.Add(time.Minute * 30)

// firstBackupChain is maintained throughout the tests as the history of
// backups that were taken based on the initial full backup.
Expand All @@ -311,14 +308,12 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
// separate path relative to the full backup in their chain. Otherwise,
// it should be an empty array of strings
noIncrementalStorage := []string(nil)
incrementalBackupSubdirEnabled := true
notIncrementalBackupSubdirEnabled := false

firstRemoteBackupChain := []string(nil)

testCollectionBackup := func(t *testing.T, backupTime time.Time,
expectedDefault, expectedSuffix, expectedIncDir string, expectedPrevBackups []string,
appendToLatest bool, subdir string, incrementalTo []string, incrementalBackupSubdirEnabled bool) {
appendToLatest bool, subdir string, incrementalTo []string) {

endTime := hlc.Timestamp{WallTime: backupTime.UnixNano()}
incrementalFrom := []string(nil)
Expand All @@ -336,17 +331,6 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
_, localityCollections, err = backupdest.GetURIsByLocalityKV(incrementalTo, "")
require.NoError(t, err)
}
currentVersion := execCfg.Settings.Version.ActiveVersion(ctx)
if !incrementalBackupSubdirEnabled {
// Downgrading is disallowed in normal operation, but fine for testing here.
err = execCfg.Settings.Version.SetActiveVersion(ctx, clusterversion.ClusterVersion{
Version: roachpb.Version{
Major: 21,
Minor: 2,
},
})
require.NoError(t, err)
}

fullBackupExists := false
if expectedIncDir != "" {
Expand All @@ -360,9 +344,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
incrementalFrom,
&execCfg,
)
clusterErr := execCfg.Settings.Version.SetActiveVersion(ctx, currentVersion)
require.NoError(t, err)
require.NoError(t, clusterErr)

localityDests := make(map[string]string, len(localityCollections))
for locality, localityDest := range localityCollections {
Expand All @@ -389,7 +371,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, fullTime,
expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain,
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, incrementalBackupSubdirEnabled)
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage)
firstBackupChain = append(firstBackupChain, expectedDefault)
firstRemoteBackupChain = append(firstRemoteBackupChain, expectedDefault)
writeManifest(t, expectedDefault)
Expand All @@ -406,7 +388,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc1Time,
expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain,
true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo, incrementalBackupSubdirEnabled)
true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo)
firstBackupChain = append(firstBackupChain, expectedDefault)
writeManifest(t, expectedDefault)
}
Expand All @@ -420,7 +402,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc2Time,
expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain,
true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo, incrementalBackupSubdirEnabled)
true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo)
firstBackupChain = append(firstBackupChain, expectedDefault)
writeManifest(t, expectedDefault)
}
Expand All @@ -435,7 +417,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, full2Time,
expectedDefault, expectedSuffix, expectedIncDir, []string(nil),
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, incrementalBackupSubdirEnabled)
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage)
writeManifest(t, expectedDefault)
// We also wrote a new full backup, so let's update the latest.
writeLatest(t, collectionLoc, expectedSuffix)
Expand All @@ -450,7 +432,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc3Time,
expectedDefault, expectedSuffix, expectedIncDir, []string{backup2Location},
true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo, incrementalBackupSubdirEnabled)
true /* intoLatest */, noExplicitSubDir, defaultIncrementalTo)
writeManifest(t, expectedDefault)
}

Expand All @@ -463,7 +445,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc4Time,
expectedDefault, expectedSuffix, expectedIncDir, firstBackupChain,
false /* intoLatest */, expectedSubdir, defaultIncrementalTo, incrementalBackupSubdirEnabled)
false /* intoLatest */, expectedSubdir, defaultIncrementalTo)
writeManifest(t, expectedDefault)
}

Expand All @@ -479,7 +461,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc5Time,
expectedDefault, expectedSuffix, expectedIncDir, firstRemoteBackupChain,
false /* intoLatest */, expectedSubdir, customIncrementalTo, incrementalBackupSubdirEnabled)
false /* intoLatest */, expectedSubdir, customIncrementalTo)
writeManifest(t, expectedDefault)

firstRemoteBackupChain = append(firstRemoteBackupChain, expectedDefault)
Expand All @@ -497,7 +479,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc6Time,
expectedDefault, expectedSuffix, expectedIncDir, firstRemoteBackupChain,
false /* intoLatest */, expectedSubdir, customIncrementalTo, incrementalBackupSubdirEnabled)
false /* intoLatest */, expectedSubdir, customIncrementalTo)
writeManifest(t, expectedDefault)
}

Expand All @@ -514,7 +496,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc7Time,
expectedDefault, expectedSuffix, expectedIncDir, []string{backup2Location},
true /* intoLatest */, expectedSubdir, customIncrementalTo, incrementalBackupSubdirEnabled)
true /* intoLatest */, expectedSubdir, customIncrementalTo)
writeManifest(t, expectedDefault)
}

Expand All @@ -528,7 +510,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, full3Time,
expectedDefault, expectedSuffix, expectedIncDir, []string(nil),
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, incrementalBackupSubdirEnabled)
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage)
writeManifest(t, expectedDefault)
// We also wrote a new full backup, so let's update the latest.
writeLatest(t, collectionLoc, expectedSuffix)
Expand Down Expand Up @@ -557,42 +539,23 @@ func TestBackupRestoreResolveDestination(t *testing.T) {

testCollectionBackup(t, inc9Time,
expectedDefault, expectedSuffix, expectedIncDir, []string{backup3Location, oldStyleDefault},
true /* intoLatest */, expectedSubdir, noIncrementalStorage, incrementalBackupSubdirEnabled)
true /* intoLatest */, expectedSubdir, noIncrementalStorage)
writeManifest(t, expectedDefault)
}

// A new full backup: BACKUP INTO collection
var backup4Location string
{
expectedSuffix := "/2020/12/25-120000.00"
expectedIncDir := ""
expectedDefault := fmt.Sprintf("nodelocal://1/%s%s?AUTH=implicit", t.Name(), expectedSuffix)
backup4Location = expectedDefault

testCollectionBackup(t, full4Time,
expectedDefault, expectedSuffix, expectedIncDir, []string(nil),
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage, notIncrementalBackupSubdirEnabled)
false /* intoLatest */, noExplicitSubDir, noIncrementalStorage)
writeManifest(t, expectedDefault)
// We also wrote a new full backup, so let's update the latest.
writeLatest(t, collectionLoc, expectedSuffix)
}

// An automatic incremental into the fourth full backup: BACKUP INTO LATEST
// IN collection, BUT simulating an old cluster that doesn't support
// dedicated incrementals subdir by default yet.
{
expectedSuffix := "/2020/12/25-120000.00"
expectedIncDir := "/20201225/123000.00"
expectedSubdir := expectedSuffix
expectedDefault := fmt.Sprintf("nodelocal://1/%s%s%s?AUTH=implicit",
t.Name(),
expectedSuffix, expectedIncDir)

testCollectionBackup(t, inc10Time,
expectedDefault, expectedSuffix, expectedIncDir, []string{backup4Location},
true /* intoLatest */, expectedSubdir, noIncrementalStorage, notIncrementalBackupSubdirEnabled)
writeManifest(t, expectedDefault)
}
})
})
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/backupccl/backupdest/incrementals.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -166,9 +165,8 @@ func ResolveIncrementalsBackupLocation(
"Please choose a location manually with the `incremental_location` parameter.")
}

// If the cluster isn't fully migrated, or we have backups in the old default
// location, continue to use the old location.
if len(prevOld) > 0 || !execCfg.Settings.Version.IsActive(ctx, clusterversion.IncrementalBackupSubdir) {
// If we have backups in the old default location, continue to use the old location.
if len(prevOld) > 0 {
return resolvedIncrementalsBackupLocationOld, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/backupinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
Expand Down
21 changes: 0 additions & 21 deletions pkg/ccl/backupccl/backupinfo/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
Expand Down Expand Up @@ -984,26 +983,6 @@ func WriteBackupManifestCheckpoint(
}
}

// If the cluster is still running on a mixed version, we want to write
// to the base directory instead of the progress directory. That way if
// an old node resumes a backup, it doesn't have to start over.
if !execCfg.Settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) {
// We want to overwrite the latest checkpoint in the base directory,
// just write to the non versioned BACKUP-CHECKPOINT file.
err = cloud.WriteFile(ctx, defaultStore, BackupManifestCheckpointName, bytes.NewReader(descBuf))
if err != nil {
return err
}

checksum, err := GetChecksum(descBuf)
if err != nil {
return err
}

return cloud.WriteFile(ctx, defaultStore, BackupManifestCheckpointName+
BackupManifestChecksumSuffix, bytes.NewReader(checksum))
}

// We timestamp the checkpoint files in order to enforce write once backups.
// When the job goes to read these timestamped files, it will List
// the checkpoints and pick the file whose name is lexicographically
Expand Down
10 changes: 2 additions & 8 deletions pkg/cloud/externalconn/connectionpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ proto_library(
srcs = ["connection.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = [
"//pkg/cloud/cloudpb:cloudpb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
],
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
)

go_proto_library(
Expand All @@ -20,10 +17,7 @@ go_proto_library(
importpath = "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb",
proto = ":connectionpb_proto",
visibility = ["//visibility:public"],
deps = [
"//pkg/cloud/cloudpb",
"@com_github_gogo_protobuf//gogoproto",
],
deps = ["@com_github_gogo_protobuf//gogoproto"],
)

go_library(
Expand Down
1 change: 0 additions & 1 deletion pkg/cloud/externalconn/connectionpb/connection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package cockroach.cloud.externalconn.connectionpb;
option go_package = "connectionpb";

import "gogoproto/gogo.proto";
import "cloud/cloudpb/external_storage.proto";

// ConnectionType is the type of the External Connection object.
enum ConnectionType {
Expand Down
Loading

0 comments on commit 9650b0d

Please sign in to comment.