diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index fdf88ab6f2dd..d9f3232d9956 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -135,9 +135,6 @@ func backup( encryption *jobspb.BackupEncryptionOptions, statsCache *stats.TableStatisticsCache, ) (roachpb.RowCount, error) { - // TODO(dan): Figure out how permissions should work. #6713 is tracking this - // for grpc. - resumerSpan := tracing.SpanFromContext(ctx) var lastCheckpoint time.Time @@ -413,16 +410,71 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings, &p.ExecCfg().ExternalIODirConfig, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor) + // Resolve the backup destination. We can skip this step if we + // have already resolved and persisted the destination either + // during a previous resumption of this job. + defaultURI := details.URI + var backupDest backupdest.ResolvedDestination + if details.URI == "" { + var err error + backupDest, err = backupdest.ResolveDest(ctx, p.User(), details.Destination, details.EndTime, + details.IncrementalFrom, p.ExecCfg()) + if err != nil { + return err + } + defaultURI = backupDest.DefaultURI + } + + // The backup job needs to lay claim to the bucket it is writing to, to + // prevent concurrent backups from writing to the same location. + // + // If we have already locked the location, either on a previous resume of the + // job or during planning because `clusterversion.BackupResolutionInJob` isn't + // active, we do not want to lock it again. + foundLockFile, err := backupinfo.CheckForBackupLock(ctx, p.ExecCfg(), defaultURI, b.job.ID(), p.User()) + if err != nil { + return err + } + + // TODO(ssd): If we restricted how old a resumed job could be, + // we could remove the check for details.URI == "". This is + // present to guard against the case where we have already + // written a BACKUP-LOCK file during planning and do not want + // to re-check and re-write the lock file. In that case + // `details.URI` will non-empty. + if details.URI == "" && !foundLockFile { + if err := backupinfo.CheckForPreviousBackup(ctx, p.ExecCfg(), backupDest.DefaultURI, b.job.ID(), + p.User()); err != nil { + return err + } + + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_lock"); err != nil { + return err + } + + if err := backupinfo.WriteBackupLock(ctx, p.ExecCfg(), backupDest.DefaultURI, + b.job.ID(), p.User()); err != nil { + return err + } + + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_lock"); err != nil { + return err + } + } + var backupManifest *backuppb.BackupManifest - // If the backup job has already resolved the destination in a previous - // resumption, we can skip this step. + // Populate the BackupDetails with the resolved backup + // destination, and construct the BackupManifest to be written + // to external storage as a BACKUP-CHECKPOINT. We can skip + // this step if the job has already persisted the resolved + // details and manifest in a prior resumption. // // TODO(adityamaru: Break this code block into helper methods. 
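As an editorial aside, the claim sequence described in the comments above can be summarized in one condensed sketch. Pausepoints, tracing, and error wrapping are elided; the helper name claimBackupLocation is hypothetical, while the called functions and their signatures are the ones this change introduces.

// claimBackupLocation condenses the resolution and locking steps at the top of
// backupResumer.Resume. It is idempotent across job resumptions: once
// details.URI has been persisted, or once this job's BACKUP-LOCK-<jobID> file
// already exists, the checks and the lock write are skipped.
func claimBackupLocation(
	ctx context.Context, p sql.JobExecContext, b *backupResumer, details jobspb.BackupDetails,
) (string, backupdest.ResolvedDestination, error) {
	defaultURI := details.URI
	var backupDest backupdest.ResolvedDestination
	if details.URI == "" {
		// The destination has not yet been resolved and persisted by a prior
		// resumption of this job.
		var err error
		backupDest, err = backupdest.ResolveDest(
			ctx, p.User(), details.Destination, details.EndTime, details.IncrementalFrom, p.ExecCfg())
		if err != nil {
			return "", backupdest.ResolvedDestination{}, err
		}
		defaultURI = backupDest.DefaultURI
	}

	// Did we already write BACKUP-LOCK-<jobID> on a previous resumption?
	foundLockFile, err := backupinfo.CheckForBackupLock(ctx, p.ExecCfg(), defaultURI, b.job.ID(), p.User())
	if err != nil {
		return "", backupdest.ResolvedDestination{}, err
	}

	if details.URI == "" && !foundLockFile {
		// Refuse to proceed if another backup has already claimed this
		// location, then lay our own claim.
		if err := backupinfo.CheckForPreviousBackup(ctx, p.ExecCfg(), defaultURI, b.job.ID(), p.User()); err != nil {
			return "", backupdest.ResolvedDestination{}, err
		}
		if err := backupinfo.WriteBackupLock(ctx, p.ExecCfg(), defaultURI, b.job.ID(), p.User()); err != nil {
			return "", backupdest.ResolvedDestination{}, err
		}
	}
	return defaultURI, backupDest, nil
}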
if details.URI == "" { initialDetails := details backupDetails, m, err := getBackupDetailAndManifest( - ctx, p.ExecCfg(), p.Txn(), details, p.User(), + ctx, p.ExecCfg(), p.Txn(), details, p.User(), backupDest, ) if err != nil { return err @@ -460,12 +512,20 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil { + return err + } + if err := backupinfo.WriteBackupManifestCheckpoint( ctx, details.URI, details.EncryptionOptions, &kmsEnv, backupManifest, p.ExecCfg(), p.User(), ); err != nil { return err } + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil { + return err + } + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy()) }); err != nil { @@ -481,6 +541,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // we should just leave the description as-is, since it is just for humans). description := b.job.Payload().Description const unresolvedText = "INTO 'LATEST' IN" + // Note, we are using initialDetails below which is a copy of the + // BackupDetails before destination resolution. if initialDetails.Destination.Subdir == "LATEST" && strings.Count(description, unresolvedText) == 1 { description = strings.ReplaceAll(description, unresolvedText, fmt.Sprintf("INTO '%s' IN", details.Destination.Subdir)) } @@ -500,6 +562,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil { + return err + } + // Collect telemetry, once per backup after resolving its destination. lic := utilccl.CheckEnterpriseEnabled( p.ExecCfg().Settings, p.ExecCfg().NodeInfo.LogicalClusterID(), p.ExecCfg().Organization(), "", @@ -565,7 +631,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { MaxRetries: 5, } - if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before_flow"); err != nil { + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil { return err } diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 1077135314a8..f95a48a5d835 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1132,25 +1132,9 @@ func getBackupDetailAndManifest( txn *kv.Txn, initialDetails jobspb.BackupDetails, user username.SQLUsername, + backupDestination backupdest.ResolvedDestination, ) (jobspb.BackupDetails, backuppb.BackupManifest, error) { makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI - // TODO(pbardea): Refactor (defaultURI and urisByLocalityKV) pairs into a - // backupDestination struct. 
- collectionURI, defaultURI, resolvedSubdir, urisByLocalityKV, prevs, err := - backupdest.ResolveDest(ctx, user, initialDetails.Destination, initialDetails.EndTime, initialDetails.IncrementalFrom, execCfg) - if err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } - - defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user) - if err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } - defer defaultStore.Close() - - if err := backupinfo.CheckForPreviousBackup(ctx, defaultStore, defaultURI); err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig, execCfg.DB, user, execCfg.InternalExecutor) @@ -1159,7 +1143,7 @@ func getBackupDetailAndManifest( defer mem.Close(ctx) prevBackups, encryptionOptions, memSize, err := backupinfo.FetchPreviousBackups(ctx, &mem, user, - makeCloudStorage, prevs, *initialDetails.EncryptionOptions, &kmsEnv) + makeCloudStorage, backupDestination.PrevBackupURIs, *initialDetails.EncryptionOptions, &kmsEnv) if err != nil { return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err @@ -1187,9 +1171,9 @@ func getBackupDetailAndManifest( } } - localityKVs := make([]string, len(urisByLocalityKV)) + localityKVs := make([]string, len(backupDestination.URIsByLocalityKV)) i := 0 - for k := range urisByLocalityKV { + for k := range backupDestination.URIsByLocalityKV { localityKVs[i] = k i++ } @@ -1234,10 +1218,10 @@ func getBackupDetailAndManifest( updatedDetails, err := updateBackupDetails( ctx, initialDetails, - collectionURI, - defaultURI, - resolvedSubdir, - urisByLocalityKV, + backupDestination.CollectionURI, + backupDestination.DefaultURI, + backupDestination.ChosenSubdir, + backupDestination.URIsByLocalityKV, prevBackups, encryptionOptions, &kmsEnv) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 9685c2a15c44..15db56a62a51 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -3477,10 +3477,10 @@ func TestBackupTenantsWithRevisionHistory(t *testing.T) { const msg = "can not backup tenants with revision history" - _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO 'nodelocal://0/' WITH revision_history`) + _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO 'nodelocal://0/foo' WITH revision_history`) require.Contains(t, fmt.Sprint(err), msg) - _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TO 'nodelocal://0/' WITH revision_history`) + _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TO 'nodelocal://0/bar' WITH revision_history`) require.Contains(t, fmt.Sprint(err), msg) } @@ -4016,28 +4016,28 @@ func TestTimestampMismatch(t *testing.T) { sqlDB.ExpectErr( t, "backups listed out of order", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2`, - localFoo, incrementalT1FromFull, + localFoo+"/missing-initial", incrementalT1FromFull, ) // Missing an intermediate incremental backup. sqlDB.ExpectErr( t, "backups listed out of order", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`, - localFoo, fullBackup, incrementalT2FromT1, + localFoo+"/missing-incremental", fullBackup, incrementalT2FromT1, ) // Backups specified out of order. sqlDB.ExpectErr( t, "out of order", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`, - localFoo, incrementalT1FromFull, fullBackup, + localFoo+"/ooo", incrementalT1FromFull, fullBackup, ) // Missing data for one table in the most recent backup. 
sqlDB.ExpectErr( t, "previous backup does not contain table", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`, - localFoo, fullBackup, incrementalT3FromT1OneTable, + localFoo+"/missing-table-data", fullBackup, incrementalT3FromT1OneTable, ) }) @@ -5963,7 +5963,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) { // Kickoff an incremental backup, but pause it just after it writes its // protected timestamps. - runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`) + runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`) var jobID int runner.QueryRow(t, @@ -6438,7 +6438,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) { // Creating the protected timestamp record should fail because there are too // many spans. Ensure that we get the appropriate error. - _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`) + _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo/byte-limit'`) require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+30 > 1 bytes") // TODO(adityamaru): Remove in 22.2 once no records protect spans. @@ -6459,7 +6459,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) { // Creating the protected timestamp record should fail because there are too // many spans. Ensure that we get the appropriate error. - _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`) + _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo/spans-limit'`) require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans") }) } @@ -9437,7 +9437,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) { return nil }) - _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo) + _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo+"/fail") testutils.IsError(err, "must be after replica GC threshold") _, err = conn.Exec(`ALTER TABLE foo SET (exclude_data_from_backup = true)`) @@ -9449,7 +9449,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) { return true, nil }) - _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo) + _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo+"/succeed") require.NoError(t, err) } @@ -9514,7 +9514,7 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) { return true, nil }) - runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`) + runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`) if _, err := conn.Exec(`BACKUP DATABASE test INTO $1`, localFoo); !testutils.IsError(err, "pause") { t.Fatal(err) } diff --git a/pkg/ccl/backupccl/backupdest/backup_destination.go b/pkg/ccl/backupccl/backupdest/backup_destination.go index af38d2093b9a..ad3a94cef6f5 100644 --- a/pkg/ccl/backupccl/backupdest/backup_destination.go +++ b/pkg/ccl/backupccl/backupdest/backup_destination.go @@ -82,6 +82,27 @@ func containsManifest(ctx context.Context, exportStore cloud.ExternalStorage) (b return true, nil } +// ResolvedDestination encapsulates information that is populated while +// resolving the destination of a backup. +type ResolvedDestination struct { + // collectionURI is the URI pointing to the backup collection. + CollectionURI string + + // defaultURI is the full path of the defaultURI of the backup. 
+ DefaultURI string + + // ChosenSubdir is the automatically chosen suffix within the collection path + // if we're backing up INTO a collection. + ChosenSubdir string + + // URIsByLocalityKV is a mapping from the locality tag to the corresponding + // locality aware backup URI. + URIsByLocalityKV map[string]string + + // PrevBackupURIs is the list of full paths for previous backups in the chain. + PrevBackupURIs []string +} + // ResolveDest resolves the true destination of a backup. The backup command // provided by the user may point to a backup collection, or a backup location // which auto-appends incremental backups to it. This method checks for these @@ -98,25 +119,16 @@ func ResolveDest( endTime hlc.Timestamp, incrementalFrom []string, execCfg *sql.ExecutorConfig, -) ( - collectionURI string, - plannedBackupDefaultURI string, /* the full path for the planned backup */ - /* chosenSuffix is the automatically chosen suffix within the collection path - if we're backing up INTO a collection. */ - chosenSuffix string, - urisByLocalityKV map[string]string, - prevBackupURIs []string, /* list of full paths for previous backups in the chain */ - err error, -) { +) (ResolvedDestination, error) { makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI defaultURI, _, err := GetURIsByLocalityKV(dest.To, "") if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } - chosenSuffix = dest.Subdir - + var collectionURI string + chosenSuffix := dest.Subdir if chosenSuffix != "" { // The legacy backup syntax, BACKUP TO, leaves the dest.Subdir and collection parameters empty. collectionURI = defaultURI @@ -124,15 +136,15 @@ func ResolveDest( if chosenSuffix == backupbase.LatestFileName { latest, err := ReadLatestFile(ctx, defaultURI, makeCloudStorage, user) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } chosenSuffix = latest } } - plannedBackupDefaultURI, urisByLocalityKV, err = GetURIsByLocalityKV(dest.To, chosenSuffix) + plannedBackupDefaultURI, urisByLocalityKV, err := GetURIsByLocalityKV(dest.To, chosenSuffix) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } // At this point, the plannedBackupDefaultURI is the full path for the backup. For BACKUP @@ -140,28 +152,30 @@ func ResolveDest( // plannedBackupDefaultURI will be the full path for this backup in planning. if len(incrementalFrom) != 0 { // Legacy backup with deprecated BACKUP TO-syntax. 
- prevBackupURIs = incrementalFrom - return collectionURI, plannedBackupDefaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, nil + prevBackupURIs := incrementalFrom + return ResolvedDestination{ + CollectionURI: collectionURI, + DefaultURI: plannedBackupDefaultURI, + ChosenSubdir: chosenSuffix, + URIsByLocalityKV: urisByLocalityKV, + PrevBackupURIs: prevBackupURIs, + }, nil } defaultStore, err := makeCloudStorage(ctx, plannedBackupDefaultURI, user) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } defer defaultStore.Close() exists, err := containsManifest(ctx, defaultStore) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } - if exists && !dest.Exists && chosenSuffix != "" && execCfg.Settings.Version.IsActive(ctx, - clusterversion.Start22_1) { + if exists && !dest.Exists && chosenSuffix != "" && + execCfg.Settings.Version.IsActive(ctx, clusterversion.Start22_1) { // We disallow a user from writing a full backup to a path in a collection containing an // existing backup iff we're 99.9% confident this backup was planned on a 22.1 node. - return "", - "", - "", - nil, - nil, + return ResolvedDestination{}, errors.Newf("A full backup already exists in %s. "+ "Consider running an incremental backup to this full backup via `BACKUP INTO '%s' IN '%s'`", plannedBackupDefaultURI, chosenSuffix, dest.To[0]) @@ -178,7 +192,7 @@ func ResolveDest( // - 22.2+: the backup will fail unconditionally. // TODO (msbutler): throw error in 22.2 if !featureFullBackupUserSubdir.Get(execCfg.SV()) { - return "", "", "", nil, nil, + return ResolvedDestination{}, errors.Errorf("A full backup cannot be written to %q, a user defined subdirectory. "+ "To take a full backup, remove the subdirectory from the backup command "+ "(i.e. run 'BACKUP ... INTO '). "+ @@ -189,7 +203,13 @@ func ResolveDest( } } // There's no full backup in the resolved subdirectory; therefore, we're conducting a full backup. - return collectionURI, plannedBackupDefaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, nil + return ResolvedDestination{ + CollectionURI: collectionURI, + DefaultURI: plannedBackupDefaultURI, + ChosenSubdir: chosenSuffix, + URIsByLocalityKV: urisByLocalityKV, + PrevBackupURIs: nil, + }, nil } // The defaultStore contains a full backup; consequently, we're conducting an incremental backup. 
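With the new signature, callers receive a single ResolvedDestination struct instead of five positional return values. A minimal usage sketch follows; the destination and subdir values are illustrative and error handling is abbreviated.

backupDest, err := backupdest.ResolveDest(
	ctx, username.RootUserName(),
	jobspb.BackupDetails_Destination{To: []string{"nodelocal://0/backups"}, Subdir: "LATEST"},
	endTime, nil /* incrementalFrom */, execCfg)
if err != nil {
	return err
}
// The struct fields replace the old return values (collectionURI, defaultURI,
// chosenSuffix, urisByLocalityKV, prevBackupURIs) in that order.
log.Infof(ctx, "writing to %s (collection %s, subdir %s) on top of %d prior layers",
	backupDest.DefaultURI, backupDest.CollectionURI, backupDest.ChosenSubdir,
	len(backupDest.PrevBackupURIs))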
@@ -201,28 +221,29 @@ func ResolveDest( dest.To, chosenSuffix) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } priorsDefaultURI, _, err := GetURIsByLocalityKV(fullyResolvedIncrementalsLocation, "") if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } incrementalStore, err := makeCloudStorage(ctx, priorsDefaultURI, user) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } defer incrementalStore.Close() priors, err := FindPriorBackups(ctx, incrementalStore, OmitManifest) if err != nil { - return "", "", "", nil, nil, errors.Wrap(err, "adjusting backup destination to append new layer to existing backup") + return ResolvedDestination{}, errors.Wrap(err, "adjusting backup destination to append new layer to existing backup") } + prevBackupURIs := make([]string, 0, len(priors)) for _, prior := range priors { priorURI, err := url.Parse(priorsDefaultURI) if err != nil { - return "", "", "", nil, nil, errors.Wrapf(err, "parsing default backup location %s", + return ResolvedDestination{}, errors.Wrapf(err, "parsing default backup location %s", priorsDefaultURI) } priorURI.Path = backuputils.JoinURLPath(priorURI.Path, prior) @@ -234,9 +255,16 @@ func ResolveDest( partName := endTime.GoTime().Format(backupbase.DateBasedIncFolderName) defaultIncrementalsURI, urisByLocalityKV, err := GetURIsByLocalityKV(fullyResolvedIncrementalsLocation, partName) if err != nil { - return "", "", "", nil, nil, err + return ResolvedDestination{}, err } - return collectionURI, defaultIncrementalsURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, nil + + return ResolvedDestination{ + CollectionURI: collectionURI, + DefaultURI: defaultIncrementalsURI, + ChosenSubdir: chosenSuffix, + URIsByLocalityKV: urisByLocalityKV, + PrevBackupURIs: prevBackupURIs, + }, nil } // ReadLatestFile reads the LATEST file from collectionURI and returns the path diff --git a/pkg/ccl/backupccl/backupdest/backup_destination_test.go b/pkg/ccl/backupccl/backupdest/backup_destination_test.go index feee9df61758..e8204bf4ad45 100644 --- a/pkg/ccl/backupccl/backupdest/backup_destination_test.go +++ b/pkg/ccl/backupccl/backupdest/backup_destination_test.go @@ -123,7 +123,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { defaultDest, localitiesDest, err := backupdest.GetURIsByLocalityKV(to, "") require.NoError(t, err) - collectionURI, defaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, err := backupdest.ResolveDest( + backupDest, err := backupdest.ResolveDest( ctx, username.RootUserName(), jobspb.BackupDetails_Destination{To: to}, endTime, @@ -133,12 +133,12 @@ func TestBackupRestoreResolveDestination(t *testing.T) { require.NoError(t, err) // Not an INTO backup, so no collection of suffix info. - require.Equal(t, "", collectionURI) - require.Equal(t, "", chosenSuffix) + require.Equal(t, "", backupDest.CollectionURI) + require.Equal(t, "", backupDest.ChosenSubdir) - require.Equal(t, defaultDest, defaultURI) - require.Equal(t, localitiesDest, urisByLocalityKV) - require.Equal(t, incrementalFrom, prevBackupURIs) + require.Equal(t, defaultDest, backupDest.DefaultURI) + require.Equal(t, localitiesDest, backupDest.URIsByLocalityKV) + require.Equal(t, incrementalFrom, backupDest.PrevBackupURIs) } // The first initial full backup: BACKUP TO full. 
@@ -190,7 +190,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { ) { endTime := hlc.Timestamp{WallTime: backupTime.UnixNano()} - collectionURI, defaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, err := backupdest.ResolveDest( + backupDest, err := backupdest.ResolveDest( ctx, username.RootUserName(), jobspb.BackupDetails_Destination{To: to}, endTime, @@ -200,11 +200,11 @@ func TestBackupRestoreResolveDestination(t *testing.T) { require.NoError(t, err) // Not a backup collection. - require.Equal(t, "", collectionURI) - require.Equal(t, "", chosenSuffix) - require.Equal(t, expectedDefault, defaultURI) - require.Equal(t, expectedLocalities, urisByLocalityKV) - require.Equal(t, expectedPrevBackups, prevBackupURIs) + require.Equal(t, "", backupDest.CollectionURI) + require.Equal(t, "", backupDest.ChosenSubdir) + require.Equal(t, expectedDefault, backupDest.DefaultURI) + require.Equal(t, expectedLocalities, backupDest.URIsByLocalityKV) + require.Equal(t, expectedPrevBackups, backupDest.PrevBackupURIs) } // Initial full backup: BACKUP TO baseDir. @@ -336,7 +336,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { if expectedIncDir != "" { fullBackupExists = true } - collectionURI, defaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, err := backupdest.ResolveDest( + backupDest, err := backupdest.ResolveDest( ctx, username.RootUserName(), jobspb.BackupDetails_Destination{To: collectionTo, Subdir: subdir, IncrementalStorage: incrementalTo, Exists: fullBackupExists}, @@ -354,13 +354,11 @@ func TestBackupRestoreResolveDestination(t *testing.T) { localityDests[locality] = u.String() } - require.Equal(t, collectionLoc, collectionURI) - require.Equal(t, expectedSuffix, chosenSuffix) - - require.Equal(t, expectedDefault, defaultURI) - require.Equal(t, localityDests, urisByLocalityKV) - - require.Equal(t, expectedPrevBackups, prevBackupURIs) + require.Equal(t, collectionLoc, backupDest.CollectionURI) + require.Equal(t, expectedSuffix, backupDest.ChosenSubdir) + require.Equal(t, expectedDefault, backupDest.DefaultURI) + require.Equal(t, localityDests, backupDest.URIsByLocalityKV) + require.Equal(t, expectedPrevBackups, backupDest.PrevBackupURIs) } // Initial: BACKUP INTO collection diff --git a/pkg/ccl/backupccl/backupinfo/BUILD.bazel b/pkg/ccl/backupccl/backupinfo/BUILD.bazel index 5692ff983f68..a7ca79d41cfc 100644 --- a/pkg/ccl/backupccl/backupinfo/BUILD.bazel +++ b/pkg/ccl/backupccl/backupinfo/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/ioctx", "//pkg/util/json", + "//pkg/util/log", "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/syncutil", diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index 0c8813e7e580..4179efd966fa 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -17,6 +17,7 @@ import ( "fmt" "path" "sort" + "strconv" "strings" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" @@ -42,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -65,6 +67,10 @@ const ( // table statistics for the tables being backed up. 
BackupStatisticsFileName = "BACKUP-STATISTICS" + // BackupLockFile is the prefix of the file name used by the backup job to + // lock the bucket from running concurrent backups to the same destination. + BackupLockFilePrefix = "BACKUP-LOCK-" + // BackupFormatDescriptorTrackingVersion added tracking of complete DBs. BackupFormatDescriptorTrackingVersion uint32 = 1 @@ -486,6 +492,34 @@ func GetStatisticsFromBackup( return tableStatistics, nil } +// WriteBackupLock is responsible for writing a job ID suffixed +// `BACKUP-LOCK` file that will prevent concurrent backups from writing to the +// same location. +func WriteBackupLock( + ctx context.Context, + execCfg *sql.ExecutorConfig, + defaultURI string, + jobID jobspb.JobID, + user username.SQLUsername, +) error { + ctx, sp := tracing.ChildSpan(ctx, "backupinfo.WriteBackupLock") + defer sp.Finish() + + defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user) + if err != nil { + return err + } + defer defaultStore.Close() + + // The lock file name consists of two parts `BACKUP-LOCK-. + // + // The jobID is used in `checkForPreviousBackups` to ensure that we do not + // read our own lock file on job resumption. + lockFileName := fmt.Sprintf("%s%s", BackupLockFilePrefix, strconv.FormatInt(int64(jobID), 10)) + + return cloud.WriteFile(ctx, defaultStore, lockFileName, bytes.NewReader([]byte("lock"))) +} + // WriteBackupManifest compresses and writes the passed in BackupManifest `desc` // to `exportStore`. func WriteBackupManifest( @@ -892,18 +926,72 @@ func SanitizeLocalityKV(kv string) string { return string(sanitizedKV) } +// CheckForBackupLock returns true if a lock file for this job already exists. +func CheckForBackupLock( + ctx context.Context, + execCfg *sql.ExecutorConfig, + defaultURI string, + jobID jobspb.JobID, + user username.SQLUsername, +) (bool, error) { + defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user) + if err != nil { + return false, err + } + defer defaultStore.Close() + + // Check for the existence of a BACKUP-LOCK file written by our job + // corresponding to `jobID`. If present, we have already laid claim on the + // location and do not need to check further. + lockFileName := fmt.Sprintf("%s%s", BackupLockFilePrefix, strconv.FormatInt(int64(jobID), 10)) + r, err := defaultStore.ReadFile(ctx, lockFileName) + if err == nil { + r.Close(ctx) + return true, nil + } else if errors.Is(err, cloud.ErrFileDoesNotExist) { + return false, nil + } + + return false, err +} + // CheckForPreviousBackup ensures that the target location does not already -// contain a BACKUP or checkpoint, locking out accidental concurrent operations -// on that location. Note that the checkpoint file should be written as soon as -// the job actually starts. +// contain a previous or concurrently running backup. It does this by checking +// for the existence of one of: +// +// 1) BACKUP_MANIFEST: Written on completion of a backup. +// +// 2) BACKUP-LOCK: Written by the coordinator node to lay claim on a backup +// location. This file is suffixed with the ID of the backup job to prevent a +// node from reading its own lock file on job resumption. +// +// 3) BACKUP-CHECKPOINT: Prior to 22.1.1, nodes would use the BACKUP-CHECKPOINT +// to lay claim on a backup location. To account for a mixed-version cluster +// where an older coordinator node may be running a concurrent backup to the +// same location, we must continue to check for a BACKUP-CHECKPOINT file. 
+// +// NB: The node will continue to write a BACKUP-CHECKPOINT file later in its +// execution, but we do not have to worry about reading our own +// BACKUP-CHECKPOINT file (and locking ourselves out) since +// `checkForPreviousBackup` is invoked as the first step on job resumption, and +// is not called again. func CheckForPreviousBackup( - ctx context.Context, exportStore cloud.ExternalStorage, defaultURI string, + ctx context.Context, + execCfg *sql.ExecutorConfig, + defaultURI string, + jobID jobspb.JobID, + user username.SQLUsername, ) error { ctx, sp := tracing.ChildSpan(ctx, "backupinfo.CheckForPreviousBackup") defer sp.Finish() + defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user) + if err != nil { + return err + } + defer defaultStore.Close() redactedURI := backuputils.RedactURIForErrorMessage(defaultURI) - r, err := exportStore.ReadFile(ctx, backupbase.BackupManifestName) + r, err := defaultStore.ReadFile(ctx, backupbase.BackupManifestName) if err == nil { r.Close(ctx) return pgerror.Newf(pgcode.FileAlreadyExists, @@ -917,7 +1005,38 @@ func CheckForPreviousBackup( redactedURI, backupbase.BackupManifestName) } - r, err = readLatestCheckpointFile(ctx, exportStore, BackupManifestCheckpointName) + // Check for the presence of a BACKUP-LOCK file with a job ID different from + // that of our job. + if err := defaultStore.List(ctx, "", "", func(s string) error { + s = strings.TrimPrefix(s, "/") + if strings.HasPrefix(s, BackupLockFilePrefix) { + jobIDSuffix := strings.TrimPrefix(s, BackupLockFilePrefix) + if len(jobIDSuffix) == 0 { + return errors.AssertionFailedf("malformed BACKUP-LOCK file %s, expected a job ID suffix", s) + } + if jobIDSuffix != strconv.FormatInt(int64(jobID), 10) { + return pgerror.Newf(pgcode.FileAlreadyExists, + "%s already contains a `BACKUP-LOCK` file written by job %s", + redactedURI, jobIDSuffix) + } + } + return nil + }); err != nil { + // HTTP external storage does not support listing, and so we skip checking + // for a BACKUP-LOCK file. + if !errors.Is(err, cloud.ErrListingUnsupported) { + return errors.Wrap(err, "checking for BACKUP-LOCK file") + } + log.Warningf(ctx, "external storage %s does not support listing: skip checking for BACKUP_LOCK", redactedURI) + } + + // Check for a BACKUP-CHECKPOINT that might have been written by a node + // running a pre-22.1.1 binary. + // + // TODO(adityamaru): Delete in 23.1 since we will no longer need to check for + // BACKUP-CHECKPOINT files as all backups will rely on BACKUP-LOCK to lock a + // location. + r, err = readLatestCheckpointFile(ctx, defaultStore, BackupManifestCheckpointName) if err == nil { r.Close(ctx) return pgerror.Newf(pgcode.FileAlreadyExists, diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index b172e48bbce6..f51a794639aa 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -271,6 +271,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // // Supported arguments: // +// + expect-error-regex=: expects the query to return an error with a string +// matching the provided regex +// // + expect-error-ignore: expects the query to return an error, but we will // ignore it. // @@ -288,9 +291,15 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // // Supported arguments: // +// + resume=: resumes the job referenced by the tag, use in conjunction +// with wait-for-state. 
+// // + cancel=: cancels the job referenced by the tag and waits for it to // reach a CANCELED state. // +// + wait-for-state= tag=: wait for +// the job referenced by the tag to reach the specified state. +// // - "let" [args] // Assigns the returned value of the SQL query to the provided args as variables. // @@ -471,6 +480,17 @@ func TestDataDriven(t *testing.T) { return strings.Join(ret, "\n") } + // Check if we are expecting an error, and want to match it against a + // regex. + if d.HasArg("expect-error-regex") { + require.NotNilf(t, err, "expected error") + var expectErrorRegex string + d.ScanArgs(t, "expect-error-regex", &expectErrorRegex) + testutils.IsError(err, expectErrorRegex) + ret = append(ret, "regex matches error") + return strings.Join(ret, "\n") + } + // Check for other errors. if err != nil { if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) { @@ -674,6 +694,39 @@ func TestDataDriven(t *testing.T) { runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) runner.Exec(t, `CANCEL JOB $1`, jobID) jobutils.WaitForJobToCancel(t, runner, jobID) + } else if d.HasArg("resume") { + var resumeJobTag string + d.ScanArgs(t, "resume", &resumeJobTag) + var jobID jobspb.JobID + var ok bool + if jobID, ok = ds.jobTags[resumeJobTag]; !ok { + t.Fatalf("could not find job with tag %s", resumeJobTag) + } + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + runner.Exec(t, `RESUME JOB $1`, jobID) + } else if d.HasArg("wait-for-state") { + var tag string + d.ScanArgs(t, "tag", &tag) + var jobID jobspb.JobID + var ok bool + if jobID, ok = ds.jobTags[tag]; !ok { + t.Fatalf("could not find job with tag %s", tag) + } + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + var state string + d.ScanArgs(t, "wait-for-state", &state) + switch state { + case "succeeded": + jobutils.WaitForJobToSucceed(t, runner, jobID) + case "cancelled": + jobutils.WaitForJobToCancel(t, runner, jobID) + case "paused": + jobutils.WaitForJobToPause(t, runner, jobID) + case "failed": + jobutils.WaitForJobToFail(t, runner, jobID) + default: + t.Fatalf("unknown state %s", state) + } } return "" diff --git a/pkg/ccl/backupccl/testdata/backup-restore/lock-concurrent-backups b/pkg/ccl/backupccl/testdata/backup-restore/lock-concurrent-backups new file mode 100644 index 000000000000..2aa38aeb8d45 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/lock-concurrent-backups @@ -0,0 +1,137 @@ +new-server name=s1 +---- + +# Test that a backup job does not read its own lock file on resumption, +# effectively locking itself out. We pause the job after it has written its +# BACKUP-LOCK file and then resume it to ensure we don't read our own write. +subtest backup-does-not-read-its-own-lock + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.write_lock'; +---- + +backup expect-pausepoint tag=a +BACKUP INTO 'userfile://defaultdb.public.foo/foo'; +---- +job paused at pausepoint + +# The job should have written a `BACKUP-LOCK` file suffixed with a job ID and a +# timestamp. +query-sql +SELECT regexp_replace(filename, '.*BACKUP-LOCK-[0-9]+$', 'BACKUP-LOCK') FROM defaultdb.public.foo_upload_files; +---- +BACKUP-LOCK + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = ''; +---- + +# Resume the job and expect it to succeed. +job resume=a +---- + +job tag=a wait-for-state=succeeded +---- + +subtest end + + +# Test that a backup job on resume will not rewrite the `BACKUP-LOCK` file if it +# already sees one, thus maintaining write-once semantics. 
+subtest backup-lock-is-write-once
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.write_first_checkpoint';
+----
+
+backup expect-pausepoint tag=b
+BACKUP INTO 'userfile://defaultdb.public.bar/bar';
+----
+job paused at pausepoint
+
+# The job should have written a `BACKUP-LOCK` file suffixed with a job ID and a
+# timestamp.
+query-sql
+SELECT regexp_replace(filename, '.*BACKUP-LOCK-[0-9]+$', 'BACKUP-LOCK') FROM defaultdb.public.bar_upload_files;
+----
+BACKUP-LOCK
+
+# Resume the job and expect it to pause at the checkpoint pausepoint again,
+# this time without rewriting `BACKUP-LOCK`.
+job resume=b
+----
+
+job tag=b wait-for-state=paused
+----
+
+# We expect to see only one lock file since the resumed job would see the
+# previously written one.
+query-sql
+SELECT regexp_replace(filename, '.*BACKUP-LOCK-[0-9]+$', 'BACKUP-LOCK') FROM defaultdb.public.bar_upload_files;
+----
+BACKUP-LOCK
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+----
+
+# Resume the job and expect it to succeed.
+job resume=b
+----
+
+job tag=b wait-for-state=succeeded
+----
+
+subtest end
+
+# Note: `BACKUP TO` is going away, and `BACKUP INTO` picks a timestamped
+# directory, making it *impossible* for two backups to write to the same
+# directory in the future.
+#
+# Backup should fail if it sees a BACKUP-LOCK file in the bucket.
+subtest backup-lock-file-prevents-concurrent-backups
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow';
+----
+
+backup expect-pausepoint
+BACKUP TO 'userfile://defaultdb.public.baz/baz';
+----
+job paused at pausepoint
+
+exec-sql expect-error-regex='userfile://defaultdb.public.baz/baz already contains a `BACKUP-LOCK`'
+BACKUP TO 'userfile://defaultdb.public.baz/baz';
+----
+regex matches error
+
+subtest end
+
+# For mixed-version compatibility the backup job also checks for a
+# `BACKUP-CHECKPOINT` file when ensuring that there are no concurrent backups
+# writing to the same bucket.
+#
+# This test ensures that a backup job does not check for a `BACKUP-CHECKPOINT`
+# lock file after it has written its own `BACKUP-CHECKPOINT`.
+subtest backup-does-not-read-its-own-checkpoint
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.write_first_checkpoint';
+----
+
+backup expect-pausepoint tag=d
+BACKUP TO 'userfile://defaultdb.public.bat/bat';
+----
+job paused at pausepoint
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+----
+
+# Resume the job and expect it to succeed.
+job resume=d
+----
+
+job tag=d wait-for-state=succeeded
+----
+
+subtest end
diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go
index 6b73c49e1255..709add89f5af 100644
--- a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go
+++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go
@@ -99,19 +99,18 @@ func (c *Controller) Start(ctx context.Context, stopper *stop.Stopper) {
 	// ch is used to notify a goroutine to ensure the schedule exists and
 	// update its recurrence.
 	ch := make(chan struct{}, 1)
-	stopper.AddCloser(stop.CloserFn(func() {
-		close(ch)
-		c.mon.Stop(ctx)
-	}))
+	stopper.AddCloser(stop.CloserFn(func() { c.mon.Stop(ctx) }))
 	// Start a goroutine that ensures the presence of the schema telemetry
 	// schedule and updates its recurrence.
_ = stopper.RunAsyncTask(ctx, "schema-telemetry-schedule-updater", func(ctx context.Context) { + stopCtx, cancel := stopper.WithCancelOnQuiesce(ctx) + defer cancel() for { select { case <-stopper.ShouldQuiesce(): return case <-ch: - updateSchedule(ctx, c.db, c.ie, c.st) + updateSchedule(stopCtx, c.db, c.ie, c.st) } } }) @@ -174,8 +173,8 @@ func updateSchedule( } sj.SetScheduleStatus(string(jobs.StatusPending)) return sj.Update(ctx, ie, txn) - }); err != nil { - log.Errorf(ctx, "failed to update SQL schema telemetry schedule: %s", err) + }); err != nil && ctx.Err() == nil { + log.Warningf(ctx, "failed to update SQL schema telemetry schedule: %s", err) } else { return } diff --git a/pkg/sql/catalog/typedesc/type_desc.go b/pkg/sql/catalog/typedesc/type_desc.go index 02e77e5a40d4..4e107b80fa95 100644 --- a/pkg/sql/catalog/typedesc/type_desc.go +++ b/pkg/sql/catalog/typedesc/type_desc.go @@ -876,41 +876,52 @@ func (desc *immutable) HydrateTypeInfoWithName( if typ.IsHydrated() { return nil } - typ.TypeMeta.Name = &types.UserDefinedTypeName{ - Catalog: name.Catalog(), - ExplicitSchema: name.ExplicitSchema, - Schema: name.Schema(), - Name: name.Object(), - } - typ.TypeMeta.Version = uint32(desc.Version) + var enumData *types.EnumMetadata switch desc.Kind { case descpb.TypeDescriptor_ENUM, descpb.TypeDescriptor_MULTIREGION_ENUM: if typ.Family() != types.EnumFamily { return errors.New("cannot hydrate a non-enum type with an enum type descriptor") } - typ.TypeMeta.EnumData = &types.EnumMetadata{ + enumData = &types.EnumMetadata{ LogicalRepresentations: desc.logicalReps, PhysicalRepresentations: desc.physicalReps, IsMemberReadOnly: desc.readOnlyMembers, } - return nil case descpb.TypeDescriptor_ALIAS: if typ.UserDefined() { switch typ.Family() { case types.ArrayFamily: // Hydrate the element type. elemType := typ.ArrayContents() - return EnsureTypeIsHydrated(ctx, elemType, res) + if err := EnsureTypeIsHydrated(ctx, elemType, res); err != nil { + return err + } case types.TupleFamily: - return EnsureTypeIsHydrated(ctx, typ, res) + if err := EnsureTypeIsHydrated(ctx, typ, res); err != nil { + return err + } default: return errors.AssertionFailedf("unhandled alias type family %s", typ.Family()) } } - return nil default: return errors.AssertionFailedf("unknown type descriptor kind %s", desc.Kind) } + + // Only hydrate the type if we did not fail to perform any of the above + // steps. If we were to populate these before something that might fail, + // the type may end up partially hydrated. + typ.TypeMeta = types.UserDefinedTypeMetadata{ + Name: &types.UserDefinedTypeName{ + Catalog: name.Catalog(), + ExplicitSchema: name.ExplicitSchema, + Schema: name.Schema(), + Name: name.Object(), + }, + Version: uint32(desc.Version), + EnumData: enumData, + } + return nil } // NumEnumMembers implements the TypeDescriptor interface. diff --git a/pkg/sql/types/types.go b/pkg/sql/types/types.go index 9872796c589b..497715ab9e61 100644 --- a/pkg/sql/types/types.go +++ b/pkg/sql/types/types.go @@ -1861,6 +1861,13 @@ func (t *T) SQLString() string { if t.Oid() == oid.T_anyenum { return "anyenum" } + // We do not expect to be in a situation where we want to format a + // user-defined type to a string and do not have the TypeMeta hydrated, + // but there have been bugs in the past, and returning a less informative + // string is better than a nil-pointer panic. 
+ if t.TypeMeta.Name == nil { + return fmt.Sprintf("@%d", t.Oid()) + } return t.TypeMeta.Name.FQName() } return strings.ToUpper(t.Name()) diff --git a/pkg/sql/types/types_test.go b/pkg/sql/types/types_test.go index 7a43ddaff869..e9c6aa0a4d64 100644 --- a/pkg/sql/types/types_test.go +++ b/pkg/sql/types/types_test.go @@ -1077,3 +1077,12 @@ func TestDelimiter(t *testing.T) { } } } + +// Prior to the patch which introduced this test, the below calls would +// have panicked. +func TestEnumWithoutTypeMetaNameDoesNotPanicInSQLString(t *testing.T) { + typ := MakeEnum(100100, 100101) + require.Equal(t, "@100100", typ.SQLString()) + arrayType := MakeArray(typ) + require.Equal(t, "@100100[]", arrayType.SQLString()) +}
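The type_desc.go change above is an instance of a broader pattern: perform every step that can fail before mutating the output, so that an error can never leave the value partially hydrated. A small, self-contained illustration of the pattern; the names are hypothetical and are not CockroachDB APIs.

package main

import (
	"errors"
	"fmt"
)

type typeMeta struct {
	name    string
	version uint32
}

// hydrate fills meta only after every fallible step has succeeded, mirroring
// how HydrateTypeInfoWithName now assigns TypeMeta last.
func hydrate(meta *typeMeta, name string, version uint32, resolveElem func() error) error {
	if resolveElem != nil {
		if err := resolveElem(); err != nil {
			// meta is untouched: the caller never observes a half-built value.
			return err
		}
	}
	meta.name = name
	meta.version = version
	return nil
}

func main() {
	var m typeMeta
	err := hydrate(&m, "db.public.greeting", 2, func() error {
		return errors.New("element type not found")
	})
	fmt.Printf("err=%v meta=%+v\n", err, m) // err is set, meta remains zero-valued.
}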