76670: backup: move resolution from planning to exec r=dt a=dt

Release note (ops change): the cluster setting bulkio.backup.resolve_destination_in_job.enabled can be used to delay resolution of backup's destination until the job starts running.
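
For orientation, here is a condensed sketch of how the gate is wired up in the patch below. The setting registration is lifted from `backup_planning.go`; `resolveInJob` is an illustrative wrapper added here for exposition (the patch checks the setting inline in the plan hook), and all surrounding planning/job plumbing is elided.

```go
// Condensed from the patch below: the setting defaults to false but is
// flipped metamorphically in tests.
var resolveDuringExec = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"bulkio.backup.resolve_destination_in_job.enabled",
	"defer the interaction with the external storage used to resolve backup destination until the job starts",
	util.ConstantWithMetamorphicTestBool("backup-resolve-exec", false),
).WithPublic()

// resolveInJob is an illustrative wrapper (the patch checks this inline): when
// true, the plan hook writes the job record with an unresolved destination
// (details.URI == "") and the resumer resolves it once the job starts running.
func resolveInJob(execCfg *sql.ExecutorConfig) bool {
	return resolveDuringExec.Get(execCfg.SV())
}
```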

76721: spanconfig: introduce new read-only system target r=arulajmani a=arulajmani

This patch is motivated by the host tenant's desire to fetch all system
span configurations it has installed over secondary tenants without
explicitly constructing targets for all tenant IDs in the system. This
is handy for the host tenant's reconciliation job, which populates an
in-memory view of all system span configurations to diff against the
state implied by the schema. This, in turn, allows the host tenant to issue
targeted updates/deletes.
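
To illustrate the idea, here is a minimal, self-contained toy sketch (all names invented for exposition; the real types and API live in `pkg/spanconfig`): rather than enumerating one target per tenant ID, the host asks for a single wildcard target that matches every system span configuration it has installed on tenants, and can then diff the result against the state implied by the schema.

```go
package main

import "fmt"

// Toy model for exposition only. A system target addresses either the span
// configs one tenant has installed on a specific other tenant, or -- with
// allTenants set -- everything the source tenant has installed, which models
// the new read-only target described above.
type tenantID uint64

type systemTarget struct {
	sourceTenant tenantID
	targetTenant tenantID
	allTenants   bool
}

type record struct {
	target systemTarget
	config string
}

// kvAccessor stands in for the KVAccessor-backed view of installed system
// span configurations.
type kvAccessor struct{ records []record }

// getSystemRecords returns the records matched by the given target. The
// wildcard target matches everything the source tenant has installed,
// regardless of which tenant each record points at.
func (a *kvAccessor) getSystemRecords(t systemTarget) []record {
	var out []record
	for _, r := range a.records {
		if r.target.sourceTenant != t.sourceTenant {
			continue
		}
		if t.allTenants || r.target.targetTenant == t.targetTenant {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	host := tenantID(1)
	a := &kvAccessor{records: []record{
		{target: systemTarget{sourceTenant: host, targetTenant: 10}, config: "gc.ttl=25h"},
		{target: systemTarget{sourceTenant: host, targetTenant: 11}, config: "gc.ttl=50h"},
	}}
	// One read with the wildcard target replaces one read per tenant ID.
	for _, r := range a.getSystemRecords(systemTarget{sourceTenant: host, allTenants: true}) {
		fmt.Printf("tenant %d: %s\n", r.target.targetTenant, r.config)
	}
}
```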

To achieve this, we introduce a new system target type that allows a
tenant to request all system span configurations it has installed that
target tenants. This is only ever consequential for the host tenant, as
only the host can install system span configurations on other tenants.
We also make this target read-only, in that we disallow persisting its
implied encoding in `system.span_configurations`, and we add validation
to the KVAccessor to this effect.

While here, we also disallow creating span targets that overlap with
the reserved system span configuration keyspace.
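
Continuing the toy model above (still invented names, and requiring `"errors"` in the import list), a rough sketch of the two write-side checks described here: rejecting attempts to persist the read-only wildcard target, and rejecting span targets that stray into the reserved system span configuration keyspace.

```go
// Extending the toy model above: a target is either a span target or a system
// target, and KVAccessor-style validation rejects writes that (a) try to
// persist the read-only wildcard system target or (b) use a span target
// overlapping the keyspace reserved for encoding system span configurations.
type span struct{ key, endKey string }

type target struct {
	span   *span         // set for span targets
	system *systemTarget // set for system targets
}

// reservedKeyspace stands in for the reserved system span config keyspace.
var reservedKeyspace = span{key: "\xffsys-span-cfg/", endKey: "\xffsys-span-cfg0"}

func validateTargetForWrite(t target) error {
	if t.system != nil && t.system.allTenants {
		// The wildcard target is read-only: its implied encoding must never
		// be persisted in system.span_configurations.
		return errors.New("cannot persist read-only system span configuration target")
	}
	if t.span != nil && t.span.endKey > reservedKeyspace.key && t.span.key < reservedKeyspace.endKey {
		return errors.New("span target overlaps the reserved system span configuration keyspace")
	}
	return nil
}
```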

Release note: None

----
Alternative approach to #76704

76757: ui: change invalid lease to expired lease on problem ranges page r=Santamaura a=Santamaura

When invalid leases appear on the problem ranges page, customers are regularly
concerned by the status, interpreting it as a sign that something is wrong with
the cluster, which is often not the case. To improve this, the invalid lease
section has been renamed to expired lease, and a description has been added
below it explaining that this status is often not a cause for concern.

Release note (ui change): change invalid lease to expired lease on problem ranges page

Resolves: #38616

![Screen Shot 2022-02-17 at 3 06 19 PM](https://user-images.githubusercontent.com/17861665/154571094-4454f4e9-9cec-471c-8c41-48c6f1afd5a4.png)


Co-authored-by: David Taylor <[email protected]>
Co-authored-by: arulajmani <[email protected]>
Co-authored-by: Santamaura <[email protected]>
4 people committed Feb 18, 2022
4 parents 89198dd + c017ccb + 02c3370 + e0de475 commit 9cb7e3e
Showing 25 changed files with 912 additions and 250 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -5,6 +5,7 @@ admission.sql_sql_response.enabled boolean true when true, work performed by the
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.backup.resolve_destination_in_job.enabled boolean false defer the interaction with the external storage used to resolve backup destination until the job starts
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -8,6 +8,7 @@
<tr><td><code>bulkio.backup.file_size</code></td><td>byte size</td><td><code>128 MiB</code></td><td>target size for individual data files produced during BACKUP</td></tr>
<tr><td><code>bulkio.backup.read_timeout</code></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.backup.resolve_destination_in_job.enabled</code></td><td>boolean</td><td><code>false</code></td><td>defer the interaction with the external storage used to resolve backup destination until the job starts</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeds</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
87 changes: 83 additions & 4 deletions pkg/ccl/backupccl/backup_job.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -376,6 +377,80 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)

var backupManifest *BackupManifest

// If planning didn't resolve the external destination, then we need to now.
if details.URI == "" {
initialDetails := details
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, details, p.User(),
)
if err != nil {
return err
}
details = backupDetails
backupManifest = &m

if len(backupManifest.Spans) > 0 && p.ExecCfg().Codec.ForSystemTenant() {
protectedtsID := uuid.MakeV4()
details.ProtectedTimestampRecord = &protectedtsID

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return protectTimestampForBackup(
ctx, p.ExecCfg(), txn, b.job.ID(), m, details,
)
}); err != nil {
return err
}
}

if err := writeBackupManifestCheckpoint(
ctx, b.job.ID(), backupDetails, backupManifest, p.ExecCfg(), p.User(),
); err != nil {
return err
}

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy())
}); err != nil {
return err
}

// The description picked during original planning might still say "LATEST",
// if resolving that to the actual directory only just happened above here.
// Ideally we'd re-render the description now that we know the subdir, but
// we don't have the backup AST node anymore to easily call the rendering func.
// Instead we can just do a bit of dirty string replacement iff there is one
// "INTO 'LATEST' IN" (if there's >1, somenoe has a weird table/db names and
// we should just leave the description as-is, since it is just for humans).
description := b.job.Payload().Description
const unresolvedText = "INTO 'LATEST' IN"
if initialDetails.Destination.Subdir == "LATEST" && strings.Count(description, unresolvedText) == 1 {
description = strings.ReplaceAll(description, unresolvedText, fmt.Sprintf("INTO '%s' IN", details.Destination.Subdir))
}

// Update the job payload (non-volatile job definition) once, with the now
// resolved destination, updated description, etc. If we resume again we'll
// skip this whole block so this isn't an excessive update of payload.
if err := b.job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
md.Payload.Details = jobspb.WrapPayloadDetails(details)
md.Payload.Description = description
ju.UpdatePayload(md.Payload)
return nil
}); err != nil {
return err
}

// Collect telemetry, once per backup after resolving its destination.
lic := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "",
) != nil
collectTelemetry(m, details, details, lic)
}

// For all backups, partitioned or not, the main BACKUP manifest is stored at
// details.URI.
defaultConf, err := cloud.ExternalStorageConfFromURI(details.URI, p.User())
@@ -424,16 +499,20 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
mem := p.ExecCfg().RootMemoryMonitor.MakeBoundAccount()
defer mem.Close(ctx)

backupManifest, memSize, err := b.readManifestOnResume(ctx, &mem, p.ExecCfg(), defaultStore, details)
if err != nil {
return err
}
var memSize int64
defer func() {
if memSize != 0 {
mem.Shrink(ctx, memSize)
}
}()

if backupManifest == nil || util.ConstantWithMetamorphicTestBool("backup-read-manifest", false) {
backupManifest, memSize, err = b.readManifestOnResume(ctx, &mem, p.ExecCfg(), defaultStore, details)
if err != nil {
return err
}
}

statsCache := p.ExecCfg().TableStatsCache
// We retry on pretty generic failures -- any rpc error. If a worker node were
// to restart, it would produce this kind of error, but there may be other
159 changes: 123 additions & 36 deletions pkg/ccl/backupccl/backup_planning.go
@@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -88,6 +89,13 @@ var featureBackupEnabled = settings.RegisterBoolSetting(
featureflag.FeatureFlagEnabledDefault,
).WithPublic()

var resolveDuringExec = settings.RegisterBoolSetting(
settings.TenantWritable,
"bulkio.backup.resolve_destination_in_job.enabled",
"defer the interaction with the external storage used to resolve backup destination until the job starts",
util.ConstantWithMetamorphicTestBool("backup-resolve-exec", false),
).WithPublic()

func (p *backupKMSEnv) ClusterSettings() *cluster.Settings {
return p.settings
}
@@ -785,6 +793,75 @@ func backupPlanHook(
initialDetails.SpecificTenantIds = []roachpb.TenantID{backupStmt.Targets.Tenant}
}

jobID := p.ExecCfg().JobRegistry.MakeJobID()

// TODO(dt): replace setting with cluster version gate.
if resolveDuringExec.Get(p.ExecCfg().SV()) {
description, err := backupJobDescription(p,
backupStmt.Backup, to, incrementalFrom,
encryptionParams.RawKmsUris,
initialDetails.Destination.Subdir,
initialDetails.Destination.IncrementalStorage,
)
if err != nil {
return err
}
jr := jobs.Record{
Description: description,
Details: initialDetails,
Progress: jobspb.BackupProgress{},
CreatedBy: backupStmt.CreatedByInfo,
Username: p.User(),
DescriptorIDs: func() (sqlDescIDs []descpb.ID) {
for i := range targetDescs {
sqlDescIDs = append(sqlDescIDs, targetDescs[i].GetID())
}
return sqlDescIDs
}(),
}
plannerTxn := p.ExtendedEvalContext().Txn

if backupStmt.Options.Detached {
// When running inside an explicit transaction, we simply create the job
// record. We do not wait for the job to finish.
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, plannerTxn)
if err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
return nil
}
var sj *jobs.StartableJob
if err := func() (err error) {
defer func() {
if err == nil || sj == nil {
return
}
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
log.Errorf(ctx, "failed to cleanup job: %v", cleanupErr)
}
}()
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, plannerTxn, jr); err != nil {
return err
}
// We commit the transaction here so that the job can be started. This
// is safe because we're in an implicit transaction. If we were in an
// explicit transaction the job would have to be run with the detached
// option and would have been handled above.
return plannerTxn.Commit(ctx)
}(); err != nil {
return err
}
if err := sj.Start(ctx); err != nil {
return err
}
if err := sj.AwaitCompletion(ctx); err != nil {
return err
}
return sj.ReportExecutionResults(ctx, resultsCh)
}

// TODO(dt): move this to job execution phase.
backupDetails, backupManifest, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, initialDetails, p.User(),
@@ -798,6 +875,10 @@ func backupPlanHook(
return err
}

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn

// Write backup manifest into a temporary checkpoint file.
// This accomplishes 2 purposes:
// 1. Persists large state needed for backup job completion.
Expand All @@ -807,23 +888,7 @@ func backupPlanHook(
//
// TODO (pbardea): For partitioned backups, also add verification for other
// stores we are writing to in addition to the default.
doWriteBackupManifestCheckpoint := func(ctx context.Context, jobID jobspb.JobID) error {
defaultStore, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, backupDetails.URI, p.User())
if err != nil {
return err
}
defer defaultStore.Close()

if err := writeBackupManifest(
ctx, p.ExecCfg().Settings, defaultStore, tempCheckpointFileNameForJob(jobID),
backupDetails.EncryptionOptions, &backupManifest,
); err != nil {
return errors.Wrapf(err, "writing checkpoint file")
}
return nil
}

if err := planSchedulePTSChaining(ctx, p, &backupDetails, backupStmt); err != nil {
if err := planSchedulePTSChaining(ctx, p.ExecCfg(), plannerTxn, &backupDetails, backupStmt.CreatedByInfo); err != nil {
return err
}

@@ -852,14 +917,8 @@ func backupPlanHook(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "",
) != nil

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn

jobID := p.ExecCfg().JobRegistry.MakeJobID()

if err := protectTimestampForBackup(
ctx, p, plannerTxn, jobID, backupManifest, backupDetails,
ctx, p.ExecCfg(), plannerTxn, jobID, backupManifest, backupDetails,
); err != nil {
return err
}
@@ -873,7 +932,9 @@ func backupPlanHook(
return err
}

if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
if err := writeBackupManifestCheckpoint(
ctx, jobID, backupDetails, &backupManifest, p.ExecCfg(), p.User(),
); err != nil {
return err
}

@@ -897,7 +958,9 @@ func backupPlanHook(
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, plannerTxn, jr); err != nil {
return err
}
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
if err := writeBackupManifestCheckpoint(
ctx, jobID, backupDetails, &backupManifest, p.ExecCfg(), p.User(),
); err != nil {
return err
}

@@ -1007,30 +1070,54 @@ func getScheduledBackupExecutionArgsFromSchedule(
return sj, args, nil
}

func writeBackupManifestCheckpoint(
ctx context.Context,
jobID jobspb.JobID,
backupDetails jobspb.BackupDetails,
backupManifest *BackupManifest,
execCfg *sql.ExecutorConfig,
user security.SQLUsername,
) error {
defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, backupDetails.URI, user)
if err != nil {
return err
}
defer defaultStore.Close()

if err := writeBackupManifest(
ctx, execCfg.Settings, defaultStore, tempCheckpointFileNameForJob(jobID),
backupDetails.EncryptionOptions, backupManifest,
); err != nil {
return errors.Wrapf(err, "writing checkpoint file")
}
return nil
}

// planSchedulePTSChaining populates backupDetails with information relevant to
// the chaining of protected timestamp records between scheduled backups.
// Depending on whether backupStmt is a full or incremental backup, we populate
// relevant fields that are used to perform this chaining, on successful
// completion of the backup job.
func planSchedulePTSChaining(
ctx context.Context,
p sql.PlanHookState,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
backupDetails *jobspb.BackupDetails,
backupStmt *annotatedBackupStatement,
createdBy *jobs.CreatedByInfo,
) error {
env := scheduledjobs.ProdJobSchedulerEnv
if knobs, ok := p.ExecCfg().DistSQLSrv.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs); ok {
if knobs, ok := execCfg.DistSQLSrv.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs); ok {
if knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}
}
// If this is not a scheduled backup, we do not chain pts records.
if backupStmt.CreatedByInfo == nil || backupStmt.CreatedByInfo.Name != jobs.CreatedByScheduledJobs {
if createdBy == nil || createdBy.Name != jobs.CreatedByScheduledJobs {
return nil
}

_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env,
p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, backupStmt.CreatedByInfo.ID)
_, args, err := getScheduledBackupExecutionArgsFromSchedule(
ctx, env, txn, execCfg.InternalExecutor, createdBy.ID)
if err != nil {
return err
}
@@ -1048,8 +1135,8 @@ func planSchedulePTSChaining(
return nil
}

_, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env,
p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, args.DependentScheduleID)
_, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(
ctx, env, txn, execCfg.InternalExecutor, args.DependentScheduleID)
if err != nil {
// If we are unable to resolve the dependent incremental schedule (it
// could have been dropped) we do not need to perform any chaining.
@@ -1240,7 +1327,7 @@ func getProtectedTimestampTargetForBackup(backupManifest BackupManifest) *ptpb.T

func protectTimestampForBackup(
ctx context.Context,
p sql.PlanHookState,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
jobID jobspb.JobID,
backupManifest BackupManifest,
@@ -1260,7 +1347,7 @@ func protectTimestampForBackup(
target := getProtectedTimestampTargetForBackup(backupManifest)
rec := jobsprotectedts.MakeRecord(*backupDetails.ProtectedTimestampRecord, int64(jobID),
tsToProtect, backupManifest.Spans, jobsprotectedts.Jobs, target)
err := p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, rec)
err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, rec)
if err != nil {
return err
}
1 change: 0 additions & 1 deletion pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel
@@ -13,7 +13,6 @@ go_test(
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
