Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
52836: bulkio: Make incremental scheduled backup wait for full backup. r=miretskiy a=miretskiy

Fixes #52835

Add ability to record schedule groups: set of related schedules.
Use this functionality to makean incremental schedule wait
until the full one completes before it begins its execution.

Release Notes: None

53016: jobs: unskip TestRegistryLifecycle/rollback r=spaskob a=spaskob

The test flakiness was introduced by #52697 and fixed by #52710.

Fixes #52767.

Release note: none.

53018: colexec: create new message to send metadata in unordered synchronizer r=yuzefovich a=asubiotto

This commit fixes a race condition where a metadata message would be
double-freed and therefore the same object returned to two different goroutines
from a sync.Pool.

The root cause of this issue was that input goroutines in the parallel
unordered synchronizer use a single message that is sent repeatedly over a
channel instead of multiple messages to avoid allocations. A scenario could
occur where an input would drain metadata and set its message's metadata field
while its message was still unread in the channel. The message would then be
sent on the channel again, and the synchronizer's DrainMeta method would read
the first message with the metadata field set, followed by the same message a
second time. This results in returning the same metadata message twice to the
distsql receiver, which would release the same metadata twice.

The solution is to instead allocate a new message when draining, which will
leave message already present in the channel untouched.

Release note: None (no release with bug)

Fixes #52890 
Fixes #52948 

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Spas Bojanov <[email protected]>
Co-authored-by: Alfonso Subiotto Marques <[email protected]>
  • Loading branch information
4 people committed Aug 19, 2020
4 parents 935ae2e + f04cc3c + 71288b9 + 43710e5 commit 51b0af4
Show file tree
Hide file tree
Showing 14 changed files with 444 additions and 261 deletions.
276 changes: 153 additions & 123 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ message ScheduledBackupExecutionArgs {
}
BackupType backup_type = 1;
string backup_statement = 2;
int64 unpause_on_success = 3;
}

// RestoreProgress is the information that the RestoreData processor sends back
Expand Down
50 changes: 36 additions & 14 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -32,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
Expand Down Expand Up @@ -577,24 +578,45 @@ func (b *backupResumer) Resume(
}
}

b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg().InternalExecutor)

b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg())
return nil
}

func (b *backupResumer) maybeNotifyScheduledJobCompletion(
ctx context.Context, jobStatus jobs.Status, ex sqlutil.InternalExecutor,
ctx context.Context, jobStatus jobs.Status, exec *sql.ExecutorConfig,
) {
if b.job.CreatedBy() == nil || b.job.CreatedBy().Name != jobs.CreatedByScheduledJobs {
return
}
info := b.job.CreatedBy()
env := scheduledjobs.ProdJobSchedulerEnv

if err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Do not rely on b.job containing created_by_id. Query it directly.
datums, err := exec.InternalExecutor.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
sqlbase.InternalExecutorSessionDataOverride{User: security.NodeUser},
fmt.Sprintf(
"SELECT created_by_id FROM %s WHERE id=$1 AND created_by_type=$2",
env.SystemJobsTableName()),
*b.job.ID(), jobs.CreatedByScheduledJobs)

if err != nil {
return errors.Wrap(err, "schedule info lookup")
}
if datums == nil {
// Not a scheduled backup.
return nil
}

if err := jobs.NotifyJobTermination(
ctx, nil /* env */, *b.job.ID(), jobStatus, info.ID, ex, nil); err != nil {
log.Warningf(ctx,
"failed to notify schedule %d of completion of job %d; err=%s",
info.ID, *b.job.ID(), err)
scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, *b.job.ID(), jobStatus, scheduleID, exec.InternalExecutor, txn); err != nil {
log.Warningf(ctx,
"failed to notify schedule %d of completion of job %d; err=%s",
scheduleID, *b.job.ID(), err)
}
return nil
}); err != nil {
log.Errorf(ctx, "maybeNotifySchedule error: %v", err)
}
}

Expand All @@ -621,7 +643,7 @@ func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) err
defer b.maybeNotifyScheduledJobCompletion(
ctx,
jobs.StatusFailed,
phs.(sql.PlanHookState).ExecCfg().InternalExecutor,
phs.(sql.PlanHookState).ExecCfg(),
)

telemetry.Count("backup.total.failed")
Expand Down
122 changes: 62 additions & 60 deletions pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -152,19 +151,6 @@ func computeScheduleRecurrence(
return &scheduleRecurrence{cron, frequency}, nil
}

var humanDurations = map[time.Duration]string{
time.Hour: "hour",
24 * time.Hour: "day",
7 * 24 * time.Hour: "week",
}

func (r *scheduleRecurrence) Humanize() string {
if d, ok := humanDurations[r.frequency]; ok {
return "every " + d
}
return "every " + r.frequency.String()
}

var forceFullBackup *scheduleRecurrence

func pickFullRecurrenceFromIncremental(inc *scheduleRecurrence) *scheduleRecurrence {
Expand Down Expand Up @@ -300,96 +286,110 @@ func doCreateBackupSchedules(

ex := p.ExecCfg().InternalExecutor
return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Create FULL backup schedule.
fullFirstRun := firstRun
if eval.isEnterpriseUser && fullFirstRun == nil && fullRecurrencePicked {
// The enterprise user did not indicate preference when to run full backups,
// and we picked the schedule ourselves.
// Run full backup immediately so that we do not wind up waiting for a long
// time before the first full backup runs. Without full backup, we can't
// execute incrementals.
now := env.Now()
fullFirstRun = &now
}

if err := createBackupSchedule(
ctx, env, p.User(), fullScheduleName, fullRecurrence,
fullFirstRun, details, backupNode, resultsCh, ex, txn,
); err != nil {
return err
}
unpauseOnSuccessID := jobs.InvalidScheduleID

// If needed, create incremental.
if incRecurrence != nil {
backupNode.AppendToLatest = true
inc, err := makeBackupSchedule(
env, p.User(), fullScheduleName+": INCREMENTAL",
incRecurrence, details, unpauseOnSuccessID, backupNode)

if err := createBackupSchedule(
ctx, env, p.User(), fullScheduleName+": INCREMENTAL", incRecurrence,
firstRun, details, backupNode, resultsCh, ex, txn,
); err != nil {
if err != nil {
return err
}
// Incremental is paused until FULL completes.
inc.Pause("Waiting for initial backup to complete")

if err := inc.Create(ctx, ex, txn); err != nil {
return err
}
if err := emitSchedule(inc, tree.AsString(backupNode), resultsCh); err != nil {
return err
}
unpauseOnSuccessID = inc.ScheduleID()
}

return nil
})
// Create FULL backup schedule.
backupNode.AppendToLatest = false
fullBackupStmt := tree.AsString(backupNode)
full, err := makeBackupSchedule(
env, p.User(), fullScheduleName,
fullRecurrence, details, unpauseOnSuccessID, backupNode)
if err != nil {
return err
}

if firstRun != nil {
full.SetNextRun(*firstRun)
} else if eval.isEnterpriseUser && fullRecurrencePicked {
// The enterprise user did not indicate preference when to run full backups,
// and we picked the schedule ourselves.
// Run full backup immediately so that we do not wind up waiting for a long
// time before the first full backup runs. Without full backup, we can't
// execute incremental.
full.SetNextRun(env.Now())
}

// Create the schedule (we need its ID to create incremental below).
if err := full.Create(ctx, ex, txn); err != nil {
return err
}
return emitSchedule(full, fullBackupStmt, resultsCh)
})
}

func createBackupSchedule(
ctx context.Context,
func makeBackupSchedule(
env scheduledjobs.JobSchedulerEnv,
owner string,
name string,
recurrence *scheduleRecurrence,
firstRun *time.Time,
details jobspb.ScheduleDetails,
unpauseOnSuccess int64,
backupNode *tree.Backup,
resultsCh chan<- tree.Datums,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
) (*jobs.ScheduledJob, error) {
sj := jobs.NewScheduledJob(env)
sj.SetScheduleName(name)
sj.SetOwner(owner)

// Prepare arguments for scheduled backup execution.
args := &ScheduledBackupExecutionArgs{}
args := &ScheduledBackupExecutionArgs{UnpauseOnSuccess: unpauseOnSuccess}
if backupNode.AppendToLatest {
args.BackupType = ScheduledBackupExecutionArgs_INCREMENTAL
} else {
args.BackupType = ScheduledBackupExecutionArgs_FULL
}

if err := sj.SetSchedule(recurrence.cron); err != nil {
return err
return nil, err
}

sj.SetScheduleDetails(details)
if firstRun != nil {
sj.SetNextRun(*firstRun)
}

// We do not set backupNode.AsOf: this is done when the scheduler kicks off the backup.
// Serialize backup statement and set schedule executor and its args.
args.BackupStatement = tree.AsString(backupNode)
any, err := pbtypes.MarshalAny(args)
if err != nil {
return err
return nil, err
}
sj.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: any},
)

// Create the schedule.
if err := sj.Create(ctx, ex, txn); err != nil {
return err
}
return sj, nil
}

func emitSchedule(sj *jobs.ScheduledJob, backupStmt string, resultsCh chan<- tree.Datums) error {
var nextRun tree.Datum
status := "ACTIVE"
if sj.IsPaused() {
nextRun = tree.DNull
status = "PAUSED"
if reason := sj.LastChangeReason(); reason != "" {
status += ": " + reason
}
} else {
next, err := tree.MakeDTimestampTZ(sj.NextRun(), time.Microsecond)
if err != nil {
Expand All @@ -400,10 +400,11 @@ func createBackupSchedule(

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(sj.ScheduleID())),
tree.NewDString(name),
tree.NewDString(sj.ScheduleName()),
tree.NewDString(status),
nextRun,
tree.NewDString(recurrence.Humanize()),
tree.NewDString(tree.AsString(backupNode)),
tree.NewDString(sj.ScheduleExpr()),
tree.NewDString(backupStmt),
}
return nil
}
Expand Down Expand Up @@ -506,8 +507,9 @@ func makeScheduledBackupEval(
var scheduledBackupHeader = sqlbase.ResultColumns{
{Name: "schedule_id", Typ: types.Int},
{Name: "name", Typ: types.String},
{Name: "next_run", Typ: types.TimestampTZ},
{Name: "frequency", Typ: types.String},
{Name: "status", Typ: types.String},
{Name: "first_run", Typ: types.TimestampTZ},
{Name: "schedule", Typ: types.String},
{Name: "backup_stmt", Typ: types.String},
}

Expand Down
Loading

0 comments on commit 51b0af4

Please sign in to comment.