85884: backupccl: introduce BACKUP-LOCK file r=adityamaru a=stevendanna

Only one backup job is allowed to write to a backup location. Prior
to this change, the backup job relied on the presence of a
BACKUP-CHECKPOINT file to detect a concurrent backup job writing to
the same location. This was problematic in subtle ways.

In 22.1, we moved backup destination resolution and the writing of
the checkpoint file to the backup resumer. Before writing the
checkpoint file, we would check whether anyone else had laid claim to
the location. However, all operations in a job resumer need to be
idempotent, because a job can be resumed an arbitrary number of
times, whether due to transient errors or user intervention. One can
imagine (and we have seen more than once in recent roachtests) a
situation where a job:

1. Checks for other BACKUP-CHECKPOINT files in the location, but finds none.
2. Writes its own BACKUP-CHECKPOINT file.
3. Gets resumed before it can update BackupDetails to record that it has
completed 1) and 2).

So, when the job repeats 1), it now sees its own BACKUP-CHECKPOINT
file, concludes that another backup is writing to the location, and
effectively locks itself out.

A similar situation can occur in a mixed-version state where the node
performs 1) and 2) during planning and the planner txn retries.

Before discussing the solution, it is important to highlight the
mixed-version states to consider:

1) Backups planned/executed by 21.2.x and 22.1.0 nodes will continue
to check BACKUP-CHECKPOINT files before laying claim to a location.

2) Backups planned/executed by 21.2.x and 22.1.0 nodes will continue
to write BACKUP-CHECKPOINT files as their way of claiming a location.

This change introduces a `BACKUP-LOCK` file that, going forward, is
used to check for and lay claim to a location. The `BACKUP-LOCK` file
is suffixed with the job ID of the backup job. With this change, a
backup job checks for the existence of `BACKUP-LOCK` files suffixed
with a job ID other than its own before laying claim to a location.
We continue to read the BACKUP-CHECKPOINT file so as to respect
claims laid by backups started on nodes running older binaries.
Naturally, the job also continues to write a BACKUP-CHECKPOINT file,
which prevents older nodes from starting concurrent backups.
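
A minimal sketch of the job-ID-suffixed lock check (the exact
file-name format here is an assumption for illustration; the real
logic lives in `backupinfo.CheckForBackupLock` and
`backupinfo.WriteBackupLock`):

```go
package main

import (
	"fmt"
	"strings"
)

const lockFilePrefix = "BACKUP-LOCK-"

// foreignLockExists reports whether any BACKUP-LOCK file in the backup
// location was written by a job other than ownJobID. In the real code,
// the file list would come from listing the external storage location.
func foreignLockExists(files []string, ownJobID int64) bool {
	ownSuffix := fmt.Sprintf("%d", ownJobID)
	for _, f := range files {
		if !strings.HasPrefix(f, lockFilePrefix) {
			continue
		}
		if strings.TrimPrefix(f, lockFilePrefix) != ownSuffix {
			return true // another job has claimed this location
		}
	}
	return false
}

func main() {
	files := []string{"BACKUP-LOCK-123456"}
	fmt.Println(foreignLockExists(files, 123456)) // false: our own lock, safe to resume
	fmt.Println(foreignLockExists(files, 999999)) // true: someone else's claim
}
```

Because the check ignores the job's own lock file, repeating it on a
later resumption remains idempotent.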

Release note: None

Release justification: This is a forward port of a feature that has
already shipped in 22.1.

Co-authored-by: Aditya Maru <[email protected]>

85940: sql/catalog/typedesc: fix bug which partially hydrated a type r=fqazi a=ajwerner

In rare cases where we hit an error hydrating the contained type of an
array alias type, we'd populate the name but not the contents.

Fixes #85376

Release note (bug fix): Fixed a rare bug where errors could occur related
to the use of arrays of enums.

85941: sql/types: do not panic formatting unhydrated UDTs r=ajwerner a=ajwerner

Before this patch, if a bug resulted in a UDT's types.T being formatted
to a string without having been hydrated, a panic would occur. Now the
type is formatted as an OIDTypeReference, which is slightly less pretty
but is valid.
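
Roughly, the fallback behaves like the following sketch (the `@<oid>`
rendering and the struct below are illustrative assumptions, not the
actual types.T API):

```go
package main

import "fmt"

// udt is a stand-in for a user-defined type's metadata.
type udt struct {
	oid      uint32
	hydrated bool
	name     string // fully-qualified name, only meaningful once hydrated
}

// sqlString prefers the hydrated name but falls back to an OID-based
// reference instead of panicking when the type was never hydrated.
func sqlString(t udt) string {
	if t.hydrated {
		return t.name
	}
	return fmt.Sprintf("@%d", t.oid) // less pretty, but always valid
}

func main() {
	fmt.Println(sqlString(udt{oid: 100052, hydrated: true, name: "public.status"}))
	fmt.Println(sqlString(udt{oid: 100052})) // prints "@100052"
}
```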

Relates to #85447

Release note: None

85945: sql/catalog/schematelemetry/schematelemetrycontroller: exit on quiesce r=ajwerner a=ajwerner

Relates to #85944. 
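
For context, a minimal sketch of the exit-on-quiesce pattern (the
channel names are illustrative; the real controller watches the
server's stopper):

```go
package main

import (
	"fmt"
	"time"
)

// runWorker processes work until the quiesce channel closes, then
// returns instead of blocking forever waiting for more work.
func runWorker(work <-chan string, quiesce <-chan struct{}) {
	for {
		select {
		case <-quiesce:
			fmt.Println("quiescing: worker exiting")
			return
		case item := <-work:
			fmt.Println("processing", item)
		}
	}
}

func main() {
	work := make(chan string)
	quiesce := make(chan struct{})
	go runWorker(work, quiesce)

	work <- "schema telemetry update"
	close(quiesce) // simulate server shutdown
	time.Sleep(10 * time.Millisecond)
}
```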

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Aug 11, 2022
5 parents 33124be + 56fb3d9 + 6b37080 + eeee5a1 + afafb8e commit 47dfddf
Showing 13 changed files with 536 additions and 124 deletions.
80 changes: 73 additions & 7 deletions pkg/ccl/backupccl/backup_job.go
@@ -135,9 +135,6 @@ func backup(
encryption *jobspb.BackupEncryptionOptions,
statsCache *stats.TableStatisticsCache,
) (roachpb.RowCount, error) {
// TODO(dan): Figure out how permissions should work. #6713 is tracking this
// for grpc.

resumerSpan := tracing.SpanFromContext(ctx)
var lastCheckpoint time.Time

@@ -413,16 +410,71 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor)

// Resolve the backup destination. We can skip this step if we
// have already resolved and persisted the destination, either
// during planning or during a previous resumption of this job.
defaultURI := details.URI
var backupDest backupdest.ResolvedDestination
if details.URI == "" {
var err error
backupDest, err = backupdest.ResolveDest(ctx, p.User(), details.Destination, details.EndTime,
details.IncrementalFrom, p.ExecCfg())
if err != nil {
return err
}
defaultURI = backupDest.DefaultURI
}

// The backup job needs to lay claim to the bucket it is writing to, to
// prevent concurrent backups from writing to the same location.
//
// If we have already locked the location, either on a previous resume of the
// job or during planning because `clusterversion.BackupResolutionInJob` isn't
// active, we do not want to lock it again.
foundLockFile, err := backupinfo.CheckForBackupLock(ctx, p.ExecCfg(), defaultURI, b.job.ID(), p.User())
if err != nil {
return err
}

// TODO(ssd): If we restricted how old a resumed job could be,
// we could remove the check for details.URI == "". This is
// present to guard against the case where we have already
// written a BACKUP-LOCK file during planning and do not want
// to re-check and re-write the lock file. In that case
// `details.URI` will be non-empty.
if details.URI == "" && !foundLockFile {
if err := backupinfo.CheckForPreviousBackup(ctx, p.ExecCfg(), backupDest.DefaultURI, b.job.ID(),
p.User()); err != nil {
return err
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_lock"); err != nil {
return err
}

if err := backupinfo.WriteBackupLock(ctx, p.ExecCfg(), backupDest.DefaultURI,
b.job.ID(), p.User()); err != nil {
return err
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_lock"); err != nil {
return err
}
}

var backupManifest *backuppb.BackupManifest

// If the backup job has already resolved the destination in a previous
// resumption, we can skip this step.
// Populate the BackupDetails with the resolved backup
// destination, and construct the BackupManifest to be written
// to external storage as a BACKUP-CHECKPOINT. We can skip
// this step if the job has already persisted the resolved
// details and manifest in a prior resumption.
//
// TODO(adityamaru): Break this code block into helper methods.
if details.URI == "" {
initialDetails := details
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.Txn(), details, p.User(),
ctx, p.ExecCfg(), p.Txn(), details, p.User(), backupDest,
)
if err != nil {
return err
@@ -460,12 +512,20 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil {
return err
}

if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, details.URI, details.EncryptionOptions, &kmsEnv, backupManifest, p.ExecCfg(), p.User(),
); err != nil {
return err
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil {
return err
}

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy())
}); err != nil {
@@ -481,6 +541,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// we should just leave the description as-is, since it is just for humans).
description := b.job.Payload().Description
const unresolvedText = "INTO 'LATEST' IN"
// Note that we are using initialDetails below, which is a copy of
// the BackupDetails before destination resolution.
if initialDetails.Destination.Subdir == "LATEST" && strings.Count(description, unresolvedText) == 1 {
description = strings.ReplaceAll(description, unresolvedText, fmt.Sprintf("INTO '%s' IN", details.Destination.Subdir))
}
@@ -500,6 +562,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil {
return err
}

// Collect telemetry, once per backup after resolving its destination.
lic := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().NodeInfo.LogicalClusterID(), p.ExecCfg().Organization(), "",
@@ -565,7 +631,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
MaxRetries: 5,
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before_flow"); err != nil {
if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil {
return err
}

32 changes: 8 additions & 24 deletions pkg/ccl/backupccl/backup_planning.go
@@ -1132,25 +1132,9 @@ func getBackupDetailAndManifest(
txn *kv.Txn,
initialDetails jobspb.BackupDetails,
user username.SQLUsername,
backupDestination backupdest.ResolvedDestination,
) (jobspb.BackupDetails, backuppb.BackupManifest, error) {
makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI
// TODO(pbardea): Refactor (defaultURI and urisByLocalityKV) pairs into a
// backupDestination struct.
collectionURI, defaultURI, resolvedSubdir, urisByLocalityKV, prevs, err :=
backupdest.ResolveDest(ctx, user, initialDetails.Destination, initialDetails.EndTime, initialDetails.IncrementalFrom, execCfg)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}
defer defaultStore.Close()

if err := backupinfo.CheckForPreviousBackup(ctx, defaultStore, defaultURI); err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig,
execCfg.DB, user, execCfg.InternalExecutor)
@@ -1159,7 +1143,7 @@ func getBackupDetailAndManifest(
defer mem.Close(ctx)

prevBackups, encryptionOptions, memSize, err := backupinfo.FetchPreviousBackups(ctx, &mem, user,
makeCloudStorage, prevs, *initialDetails.EncryptionOptions, &kmsEnv)
makeCloudStorage, backupDestination.PrevBackupURIs, *initialDetails.EncryptionOptions, &kmsEnv)

if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
@@ -1187,9 +1171,9 @@ func getBackupDetailAndManifest(
}
}

localityKVs := make([]string, len(urisByLocalityKV))
localityKVs := make([]string, len(backupDestination.URIsByLocalityKV))
i := 0
for k := range urisByLocalityKV {
for k := range backupDestination.URIsByLocalityKV {
localityKVs[i] = k
i++
}
@@ -1234,10 +1218,10 @@ func getBackupDetailAndManifest(
updatedDetails, err := updateBackupDetails(
ctx,
initialDetails,
collectionURI,
defaultURI,
resolvedSubdir,
urisByLocalityKV,
backupDestination.CollectionURI,
backupDestination.DefaultURI,
backupDestination.ChosenSubdir,
backupDestination.URIsByLocalityKV,
prevBackups,
encryptionOptions,
&kmsEnv)
24 changes: 12 additions & 12 deletions pkg/ccl/backupccl/backup_test.go
@@ -3477,10 +3477,10 @@ func TestBackupTenantsWithRevisionHistory(t *testing.T) {

const msg = "can not backup tenants with revision history"

_, err = sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO 'nodelocal://0/' WITH revision_history`)
_, err = sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO 'nodelocal://0/foo' WITH revision_history`)
require.Contains(t, fmt.Sprint(err), msg)

_, err = sqlDB.DB.ExecContext(ctx, `BACKUP TO 'nodelocal://0/' WITH revision_history`)
_, err = sqlDB.DB.ExecContext(ctx, `BACKUP TO 'nodelocal://0/bar' WITH revision_history`)
require.Contains(t, fmt.Sprint(err), msg)
}

@@ -4016,28 +4016,28 @@ func TestTimestampMismatch(t *testing.T) {
sqlDB.ExpectErr(
t, "backups listed out of order",
`BACKUP DATABASE data TO $1 INCREMENTAL FROM $2`,
localFoo, incrementalT1FromFull,
localFoo+"/missing-initial", incrementalT1FromFull,
)

// Missing an intermediate incremental backup.
sqlDB.ExpectErr(
t, "backups listed out of order",
`BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`,
localFoo, fullBackup, incrementalT2FromT1,
localFoo+"/missing-incremental", fullBackup, incrementalT2FromT1,
)

// Backups specified out of order.
sqlDB.ExpectErr(
t, "out of order",
`BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`,
localFoo, incrementalT1FromFull, fullBackup,
localFoo+"/ooo", incrementalT1FromFull, fullBackup,
)

// Missing data for one table in the most recent backup.
sqlDB.ExpectErr(
t, "previous backup does not contain table",
`BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`,
localFoo, fullBackup, incrementalT3FromT1OneTable,
localFoo+"/missing-table-data", fullBackup, incrementalT3FromT1OneTable,
)
})

@@ -5963,7 +5963,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {

// Kickoff an incremental backup, but pause it just after it writes its
// protected timestamps.
runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`)
runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`)

var jobID int
runner.QueryRow(t,
@@ -6438,7 +6438,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {

// Creating the protected timestamp record should fail because there are too
// many spans. Ensure that we get the appropriate error.
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo/byte-limit'`)
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+30 > 1 bytes")

// TODO(adityamaru): Remove in 22.2 once no records protect spans.
@@ -6459,7 +6459,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {

// Creating the protected timestamp record should fail because there are too
// many spans. Ensure that we get the appropriate error.
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo/spans-limit'`)
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
})
}
@@ -9437,7 +9437,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
return nil
})

_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo)
_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo+"/fail")
testutils.IsError(err, "must be after replica GC threshold")

_, err = conn.Exec(`ALTER TABLE foo SET (exclude_data_from_backup = true)`)
@@ -9449,7 +9449,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
return true, nil
})

_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo)
_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo+"/succeed")
require.NoError(t, err)
}

@@ -9514,7 +9514,7 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
return true, nil
})

runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`)
runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`)
if _, err := conn.Exec(`BACKUP DATABASE test INTO $1`, localFoo); !testutils.IsError(err, "pause") {
t.Fatal(err)
}