Skip to content

Commit

Permalink
backupccl: refactor getBackupDetailsAndManifest
Browse files Browse the repository at this point in the history
This should be a complete no-op from a functionality perspective.
The smaller, more encapsulated pieces should be easier to reason about.

Release note: None
  • Loading branch information
benbardin committed Mar 30, 2022
1 parent a2d43c8 commit 14551a9
Showing 1 changed file with 157 additions and 95 deletions.
252 changes: 157 additions & 95 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,36 +1524,10 @@ func getBackupDetailAndManifest(
user security.SQLUsername,
) (jobspb.BackupDetails, BackupManifest, error) {
makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI

mvccFilter := MVCCFilter_Latest
if initialDetails.RevisionHistory {
mvccFilter = MVCCFilter_All
}
endTime := initialDetails.EndTime
var targetDescs []catalog.Descriptor
var descriptorProtos []descpb.Descriptor
if initialDetails.FullCluster {
var err error
targetDescs, _, err = fullClusterTargetsBackup(ctx, execCfg, endTime)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}
descriptorProtos = make([]descpb.Descriptor, len(targetDescs))
for i, desc := range targetDescs {
descriptorProtos[i] = *desc.DescriptorProto()
}
} else {
descriptorProtos = initialDetails.ResolvedTargets
targetDescs = make([]catalog.Descriptor, len(descriptorProtos))
for i := range descriptorProtos {
targetDescs[i] = descbuilder.NewBuilder(&descriptorProtos[i]).BuildExistingMutable()
}
}

// TODO(pbardea): Refactor (defaultURI and urisByLocalityKV) pairs into a
// backupDestination struct.
collectionURI, defaultURI, resolvedSubdir, urisByLocalityKV, prevs, err :=
resolveDest(ctx, user, initialDetails.Destination, endTime, initialDetails.IncrementalFrom, execCfg)
resolveDest(ctx, user, initialDetails.Destination, initialDetails.EndTime, initialDetails.IncrementalFrom, execCfg)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}
Expand All @@ -1575,6 +1549,7 @@ func getBackupDetailAndManifest(

prevBackups, encryptionOptions, memSize, err := fetchPreviousBackups(ctx, &mem, user,
makeCloudStorage, prevs, *initialDetails.EncryptionOptions, kmsEnv)

if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}
Expand All @@ -1588,91 +1563,163 @@ func getBackupDetailAndManifest(
!initialDetails.FullCluster {
return jobspb.BackupDetails{}, BackupManifest{}, errors.Errorf("cannot append a backup of specific tables or databases to a cluster backup")
}
}

var startTime hlc.Timestamp
if len(prevBackups) > 0 {
if err := requireEnterprise(execCfg, "incremental"); err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}
startTime = prevBackups[len(prevBackups)-1].EndTime
}

var tables []catalog.TableDescriptor
statsFiles := make(map[descpb.ID]string)
for _, desc := range targetDescs {
switch desc := desc.(type) {
case catalog.TableDescriptor:
tables = append(tables, desc)
// TODO (anzo): look into the tradeoffs of having all objects in the array to be in the same file,
// vs having each object in a separate file, or somewhere in between.
statsFiles[desc.GetID()] = backupStatisticsFileName
}
}

clusterID := execCfg.ClusterID()
for i := range prevBackups {
// IDs are how we identify tables, and those are only meaningful in the
// context of their own cluster, so we need to ensure we only allow
// incremental previous backups that we created.
if fromCluster := prevBackups[i].ClusterID; !fromCluster.Equal(clusterID) {
if fromCluster := prevBackups[i].ClusterID; !fromCluster.Equal(execCfg.ClusterID()) {
return jobspb.BackupDetails{}, BackupManifest{}, errors.Newf("previous BACKUP belongs to cluster %s", fromCluster.String())
}
}

var newSpans roachpb.Spans

var priorIDs map[descpb.ID]descpb.ID
// updatedDetails and backupManifest should be treated as read-only after
// they're returned from their respective functions. Future changes to those
// objects should be made within those functions.
updatedDetails, err := updateBackupDetails(
ctx,
initialDetails,
collectionURI,
defaultURI,
resolvedSubdir,
urisByLocalityKV,
prevBackups,
encryptionOptions,
kmsEnv)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}

var revs []BackupManifest_DescriptorRevision
if mvccFilter == MVCCFilter_All {
priorIDs = make(map[descpb.ID]descpb.ID)
revs, err = getRelevantDescChanges(ctx, execCfg, startTime, endTime, targetDescs,
initialDetails.ResolvedCompleteDbs, priorIDs, initialDetails.FullCluster)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}
backupManifest, err := createBackupManifest(
ctx,
execCfg,
txn,
updatedDetails,
prevBackups)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
}

return updatedDetails, backupManifest, nil
}

func getTenantInfo(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, jobDetails jobspb.BackupDetails,
) ([]roachpb.Span, []descpb.TenantInfoWithUsage, error) {
var spans []roachpb.Span
var tenants []descpb.TenantInfoWithUsage

if initialDetails.FullCluster && execCfg.Codec.ForSystemTenant() {
var err error
if jobDetails.FullCluster && execCfg.Codec.ForSystemTenant() {
// Include all tenants.
tenants, err = retrieveAllTenantsMetadata(
ctx, execCfg.InternalExecutor, txn,
)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
return nil, nil, err
}
} else if len(initialDetails.SpecificTenantIds) > 0 {
for _, id := range initialDetails.SpecificTenantIds {
} else if len(jobDetails.SpecificTenantIds) > 0 {
for _, id := range jobDetails.SpecificTenantIds {
tenantInfo, err := retrieveSingleTenantMetadata(
ctx, execCfg.InternalExecutor, txn, id,
)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
return nil, nil, err
}
tenants = append(tenants, tenantInfo)
}
}
if len(tenants) > 0 && jobDetails.RevisionHistory {
return spans, tenants, errors.UnimplementedError(
errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/47896"},
"can not backup tenants with revision history",
)
}
for i := range tenants {
prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(tenants[i].ID))
spans = append(spans, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()})
}
return spans, tenants, nil
}

if len(tenants) > 0 {
if initialDetails.RevisionHistory {
return jobspb.BackupDetails{}, BackupManifest{}, errors.UnimplementedError(
errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/47896"},
"can not backup tenants with revision history",
)
func createBackupManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
jobDetails jobspb.BackupDetails,
prevBackups []BackupManifest,
) (BackupManifest, error) {
mvccFilter := MVCCFilter_Latest
if jobDetails.RevisionHistory {
mvccFilter = MVCCFilter_All
}
endTime := jobDetails.EndTime
var targetDescs []catalog.Descriptor
var descriptorProtos []descpb.Descriptor
var err error
if jobDetails.FullCluster {
var err error
targetDescs, _, err = fullClusterTargetsBackup(ctx, execCfg, endTime)
if err != nil {
return BackupManifest{}, err
}
for i := range tenants {
prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(tenants[i].ID))
spans = append(spans, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()})
descriptorProtos = make([]descpb.Descriptor, len(targetDescs))
for i, desc := range targetDescs {
descriptorProtos[i] = *desc.DescriptorProto()
}
} else {
descriptorProtos = jobDetails.ResolvedTargets
targetDescs = make([]catalog.Descriptor, len(descriptorProtos))
for i := range descriptorProtos {
targetDescs[i] = descbuilder.NewBuilder(&descriptorProtos[i]).BuildExistingMutable()
}
}

startTime := jobDetails.StartTime

var tables []catalog.TableDescriptor
statsFiles := make(map[descpb.ID]string)
for _, desc := range targetDescs {
switch desc := desc.(type) {
case catalog.TableDescriptor:
tables = append(tables, desc)
// TODO (anzo): look into the tradeoffs of having all objects in the array to be in the same file,
// vs having each object in a separate file, or somewhere in between.
statsFiles[desc.GetID()] = backupStatisticsFileName
}
}

var newSpans roachpb.Spans
var priorIDs map[descpb.ID]descpb.ID

var revs []BackupManifest_DescriptorRevision
if mvccFilter == MVCCFilter_All {
priorIDs = make(map[descpb.ID]descpb.ID)
revs, err = getRelevantDescChanges(ctx, execCfg, startTime, endTime, targetDescs,
jobDetails.ResolvedCompleteDbs, priorIDs, jobDetails.FullCluster)
if err != nil {
return BackupManifest{}, err
}
}

var spans []roachpb.Span
var tenants []descpb.TenantInfoWithUsage
tenantSpans, tenantInfos, err := getTenantInfo(
ctx, execCfg, txn, jobDetails,
)
if err != nil {
return BackupManifest{}, err
}
spans = append(spans, tenantSpans...)
tenants = append(tenants, tenantInfos...)

tableSpans, err := spansForAllTableIndexes(execCfg, tables, revs)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
return BackupManifest{}, err
}
spans = append(spans, tableSpans...)

Expand All @@ -1689,22 +1736,22 @@ func getBackupDetailAndManifest(
dbsInPrev[d] = struct{}{}
}

if !initialDetails.FullCluster {
if !jobDetails.FullCluster {
if err := checkForNewTables(ctx, execCfg.Codec, execCfg.DB, targetDescs, tablesInPrev, dbsInPrev, priorIDs, startTime, endTime); err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
return BackupManifest{}, err
}
// Let's check that we're not widening the scope of this backup to an
// entire database, even if no tables were created in the meantime.
if err := checkForNewCompleteDatabases(targetDescs, initialDetails.ResolvedCompleteDbs, dbsInPrev); err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
if err := checkForNewCompleteDatabases(targetDescs, jobDetails.ResolvedCompleteDbs, dbsInPrev); err != nil {
return BackupManifest{}, err
}
}

newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans)

tableSpans, err := getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime)
if err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, err
return BackupManifest{}, err
}
newSpans = append(newSpans, tableSpans...)
}
Expand All @@ -1717,7 +1764,7 @@ func getBackupDetailAndManifest(
// of requiring full backups after schema changes remains.

coverage := tree.RequestedDescriptors
if initialDetails.FullCluster {
if jobDetails.FullCluster {
coverage = tree.AllDescriptors
}

Expand All @@ -1728,7 +1775,7 @@ func getBackupDetailAndManifest(
Descriptors: descriptorProtos,
Tenants: tenants,
DescriptorChanges: revs,
CompleteDbs: initialDetails.ResolvedCompleteDbs,
CompleteDbs: jobDetails.ResolvedCompleteDbs,
Spans: spans,
IntroducedSpans: newSpans,
FormatVersion: BackupFormatDescriptorTrackingVersion,
Expand All @@ -1738,33 +1785,48 @@ func getBackupDetailAndManifest(
StatisticsFilenames: statsFiles,
DescriptorCoverage: coverage,
}
if err := checkCoverage(ctx, backupManifest.Spans, append(prevBackups, backupManifest)); err != nil {
return BackupManifest{}, errors.Wrap(err, "new backup would not cover expected time")
}
return backupManifest, nil
}

// Verify this backup on its prior chain cover its spans up to its end time,
// as restore would do if it tried to restore this backup.
if err := checkCoverage(ctx, spans, append(prevBackups, backupManifest)); err != nil {
return jobspb.BackupDetails{}, BackupManifest{}, errors.Wrap(err, "new backup would not cover expected time")
// updateBackupDetails fills in the planning-time fields of the job's
// BackupDetails: the resolved destination (collection URI, default URI,
// subdir, per-locality URIs), the start time derived from the last previous
// backup in the chain (zero for a full backup), and the encryption options.
// If no encryption options were recovered from prior backups, fresh
// encryption metadata is generated via makeNewEncryptionOptions.
//
// The returned details should be treated as read-only by callers; future
// changes to the details belong inside this function.
//
// NOTE(review): this span previously contained interleaved old/new diff
// lines (the pre-refactor returns alongside the refactored ones); only the
// post-commit version is kept here.
func updateBackupDetails(
	ctx context.Context,
	details jobspb.BackupDetails,
	collectionURI string,
	defaultURI string,
	resolvedSubdir string,
	urisByLocalityKV map[string]string,
	prevBackups []BackupManifest,
	encryptionOptions *jobspb.BackupEncryptionOptions,
	kmsEnv *backupKMSEnv,
) (jobspb.BackupDetails, error) {
	var err error
	var startTime hlc.Timestamp
	// An incremental backup starts where the previous backup in the chain
	// ended; a full backup (no previous backups) keeps the zero timestamp.
	if len(prevBackups) > 0 {
		startTime = prevBackups[len(prevBackups)-1].EndTime
	}

	// If we didn't load any prior backups from which get encryption info, we
	// need to generate encryption specific data.
	var encryptionInfo *jobspb.EncryptionInfo
	if encryptionOptions == nil {
		encryptionOptions, encryptionInfo, err = makeNewEncryptionOptions(ctx, *details.EncryptionOptions, kmsEnv)
		if err != nil {
			return jobspb.BackupDetails{}, err
		}
	}

	details.Destination = jobspb.BackupDetails_Destination{Subdir: resolvedSubdir}
	details.StartTime = startTime
	details.URI = defaultURI
	details.URIsByLocalityKV = urisByLocalityKV
	details.EncryptionOptions = encryptionOptions
	details.EncryptionInfo = encryptionInfo
	details.CollectionURI = collectionURI

	return details, nil
}

func init() {
Expand Down

0 comments on commit 14551a9

Please sign in to comment.