diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index f9217c9c08e7..f756dd9cbd6a 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
@@ -163,14 +164,13 @@ type spanAndTime struct {
 func backup(
 	ctx context.Context,
 	db *kv.DB,
-	gossip *gossip.Gossip,
+	numClusterNodes int,
 	settings *cluster.Settings,
 	defaultStore cloud.ExternalStorage,
 	storageByLocalityKV map[string]*roachpb.ExternalStorage,
 	job *jobs.Job,
 	backupManifest *BackupManifest,
 	checkpointDesc *BackupManifest,
-	resultsCh chan<- tree.Datums,
 	makeExternalStorage cloud.ExternalStorageFactory,
 	encryption *roachpb.FileEncryptionOptions,
 ) (RowCount, error) {
@@ -265,7 +265,7 @@ func backup(
 	// TODO(dan): Make this limiting per node.
 	//
 	// TODO(dan): See if there's some better solution than rate-limiting #14798.
-	maxConcurrentExports := clusterNodeCount(gossip) * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
+	maxConcurrentExports := numClusterNodes * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
 	exportsSem := make(chan struct{}, maxConcurrentExports)
 
 	g := ctxgroup.WithContext(ctx)
@@ -441,10 +441,7 @@ func (b *backupResumer) releaseProtectedTimestamp(
 }
 
 type backupResumer struct {
-	job                 *jobs.Job
-	settings            *cluster.Settings
-	res                 RowCount
-	makeExternalStorage cloud.ExternalStorageFactory
+	job *jobs.Job
 
 	testingKnobs struct {
 		ignoreProtectedTimestamps bool
@@ -457,7 +454,6 @@ func (b *backupResumer) Resume(
 ) error {
 	details := b.job.Details().(jobspb.BackupDetails)
 	p := phs.(sql.PlanHookState)
-	b.makeExternalStorage = p.ExecCfg().DistSQLSrv.ExternalStorage
 
 	ptsID := details.ProtectedTimestampRecord
 	if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps {
@@ -487,7 +483,7 @@ func (b *backupResumer) Resume(
 	if err != nil {
 		return errors.Wrapf(err, "export configuration")
 	}
-	defaultStore, err := b.makeExternalStorage(ctx, defaultConf)
+	defaultStore, err := p.ExecCfg().DistSQLSrv.ExternalStorage(ctx, defaultConf)
 	if err != nil {
 		return errors.Wrapf(err, "make storage")
 	}
@@ -519,38 +515,39 @@ func (b *backupResumer) Resume(
 		// implementations.
 		log.Warningf(ctx, "unable to load backup checkpoint while resuming job %d: %v", *b.job.ID(), err)
 	}
+
+	numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
+
 	res, err := backup(
 		ctx,
 		p.ExecCfg().DB,
-		p.ExecCfg().Gossip,
+		numClusterNodes,
 		p.ExecCfg().Settings,
 		defaultStore,
 		storageByLocalityKV,
 		b.job,
 		&backupManifest,
 		checkpointDesc,
-		resultsCh,
-		b.makeExternalStorage,
+		p.ExecCfg().DistSQLSrv.ExternalStorage,
 		details.Encryption,
 	)
 	if err != nil {
 		return err
 	}
-	b.res = res
 
 	err = b.clearStats(ctx, p.ExecCfg().DB)
 	if err != nil {
 		log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
 	}
-	b.deleteCheckpoint(ctx)
+	b.deleteCheckpoint(ctx, p.ExecCfg())
 
 	resultsCh <- tree.Datums{
 		tree.NewDInt(tree.DInt(*b.job.ID())),
 		tree.NewDString(string(jobs.StatusSucceeded)),
 		tree.NewDFloat(tree.DFloat(1.0)),
-		tree.NewDInt(tree.DInt(b.res.Rows)),
-		tree.NewDInt(tree.DInt(b.res.IndexEntries)),
-		tree.NewDInt(tree.DInt(b.res.DataSize)),
+		tree.NewDInt(tree.DInt(res.Rows)),
+		tree.NewDInt(tree.DInt(res.IndexEntries)),
+		tree.NewDInt(tree.DInt(res.DataSize)),
 	}
 
 	if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps {
@@ -560,6 +557,30 @@ func (b *backupResumer) Resume(
 			log.Errorf(ctx, "failed to release protected timestamp: %v", err)
 		}
 	}
+
+	// Collect telemetry.
+	{
+		telemetry.Count("backup.total.succeeded")
+		const mb = 1 << 20
+		sizeMb := res.DataSize / mb
+		sec := int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())
+		var mbps int64
+		if sec > 0 {
+			mbps = sizeMb / sec
+		}
+		if details.StartTime.IsEmpty() {
+			telemetry.CountBucketed("backup.duration-sec.full-succeeded", sec)
+			telemetry.CountBucketed("backup.size-mb.full", sizeMb)
+			telemetry.CountBucketed("backup.speed-mbps.full.total", mbps)
+			telemetry.CountBucketed("backup.speed-mbps.full.per-node", mbps/int64(numClusterNodes))
+		} else {
+			telemetry.CountBucketed("backup.duration-sec.inc-succeeded", sec)
+			telemetry.CountBucketed("backup.size-mb.inc", sizeMb)
+			telemetry.CountBucketed("backup.speed-mbps.inc.total", mbps)
+			telemetry.CountBucketed("backup.speed-mbps.inc.per-node", mbps/int64(numClusterNodes))
+		}
+	}
+
 	return nil
 }
 
@@ -583,14 +604,18 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
 func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
+	telemetry.Count("backup.total.failed")
+	telemetry.CountBucketed("backup.duration-sec.failed",
+		int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds()))
+
 	cfg := phs.(sql.PlanHookState).ExecCfg()
-	b.deleteCheckpoint(ctx)
+	b.deleteCheckpoint(ctx, cfg)
 	return cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
 		return b.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider)
 	})
 }
 
-func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
+func (b *backupResumer) deleteCheckpoint(ctx context.Context, cfg *sql.ExecutorConfig) {
 	// Attempt to delete BACKUP-CHECKPOINT.
 	if err := func() error {
 		details := b.job.Details().(jobspb.BackupDetails)
@@ -600,7 +625,7 @@ func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
 		if err != nil {
 			return err
 		}
-		exportStore, err := b.makeExternalStorage(ctx, conf)
+		exportStore, err := cfg.DistSQLSrv.ExternalStorage(ctx, conf)
 		if err != nil {
 			return err
 		}
@@ -615,10 +640,9 @@ var _ jobs.Resumer = &backupResumer{}
 func init() {
 	jobs.RegisterConstructor(
 		jobspb.TypeBackup,
-		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
+		func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
 			return &backupResumer{
-				job:      job,
-				settings: settings,
+				job: job,
 			}
 		},
 	)
diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go
index 57bc0835ddc4..077991e2f001 100644
--- a/pkg/ccl/backupccl/backup_planning.go
+++ b/pkg/ccl/backupccl/backup_planning.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
@@ -36,6 +37,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/interval"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
@@ -664,6 +666,32 @@ func backupPlanHook(
 			}
 		}
 
+		// Collect telemetry.
+		{
+			telemetry.Count("backup.total.started")
+			if startTime.IsEmpty() {
+				telemetry.Count("backup.span.full")
+			} else {
+				telemetry.Count("backup.span.incremental")
+				telemetry.CountBucketed("backup.incremental-span-sec", int64(timeutil.Since(startTime.GoTime()).Seconds()))
+				if len(incrementalFrom) == 0 {
+					telemetry.Count("backup.auto-incremental")
+				}
+			}
+			if len(backupStmt.To) > 1 {
+				telemetry.Count("backup.partitioned")
+			}
+			if mvccFilter == MVCCFilter_All {
+				telemetry.Count("backup.revision-history")
+			}
+			if encryption != nil {
+				telemetry.Count("backup.encrypted")
+			}
+			if backupStmt.DescriptorCoverage == tree.AllDescriptors {
+				telemetry.Count("backup.targets.full_cluster")
+			}
+		}
+
 		errCh, err := sj.Start(ctx)
 		if err != nil {
 			return err
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 39ed3cb7f2ac..862c7e14af46 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -18,12 +18,12 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
-	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	"github.com/opentracing/opentracing-go"
@@ -558,7 +559,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.
 func restore(
 	restoreCtx context.Context,
 	db *kv.DB,
-	gossip *gossip.Gossip,
+	numClusterNodes int,
 	settings *cluster.Settings,
 	backupManifests []BackupManifest,
 	backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
@@ -645,7 +646,6 @@ func restore(
 	// that's wrong.
 	//
 	// TODO(dan): Make this limiting per node.
-	numClusterNodes := clusterNodeCount(gossip)
 	maxConcurrentImports := numClusterNodes * runtime.NumCPU()
 	importsSem := make(chan struct{}, maxConcurrentImports)
 
@@ -789,7 +789,6 @@ func loadBackupSQLDescs(
 type restoreResumer struct {
 	job                *jobs.Job
 	settings           *cluster.Settings
-	res                RowCount
 	databases          []*sqlbase.DatabaseDescriptor
 	tables             []*sqlbase.TableDescriptor
 	descriptorCoverage tree.DescriptorCoverage
@@ -977,10 +976,11 @@ func (r *restoreResumer) Resume(
 		return nil
 	}
 
+	numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
 	res, err := restore(
 		ctx,
 		p.ExecCfg().DB,
-		p.ExecCfg().Gossip,
+		numClusterNodes,
 		p.ExecCfg().Settings,
 		backupManifests,
 		details.BackupLocalityInfo,
@@ -991,7 +991,6 @@ func (r *restoreResumer) Resume(
 		r.job,
 		details.Encryption,
 	)
-	r.res = res
 	if err != nil {
 		return err
 	}
@@ -1014,11 +1013,31 @@ func (r *restoreResumer) Resume(
 		tree.NewDInt(tree.DInt(*r.job.ID())),
 		tree.NewDString(string(jobs.StatusSucceeded)),
 		tree.NewDFloat(tree.DFloat(1.0)),
-		tree.NewDInt(tree.DInt(r.res.Rows)),
-		tree.NewDInt(tree.DInt(r.res.IndexEntries)),
-		tree.NewDInt(tree.DInt(r.res.DataSize)),
+		tree.NewDInt(tree.DInt(res.Rows)),
+		tree.NewDInt(tree.DInt(res.IndexEntries)),
+		tree.NewDInt(tree.DInt(res.DataSize)),
 	}
 
+	// Collect telemetry.
+	{
+		telemetry.Count("restore.total.succeeded")
+		const mb = 1 << 20
+		sizeMb := res.DataSize / mb
+		sec := int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())
+		var mbps int64
+		if sec > 0 {
+			mbps = sizeMb / sec
+		}
+		telemetry.CountBucketed("restore.duration-sec.succeeded", sec)
+		telemetry.CountBucketed("restore.size-mb.full", sizeMb)
+		telemetry.CountBucketed("restore.speed-mbps.total", mbps)
+		telemetry.CountBucketed("restore.speed-mbps.per-node", mbps/int64(numClusterNodes))
+		// Tiny restores may skew throughput numbers due to overhead.
+		if sizeMb > 10 {
+			telemetry.CountBucketed("restore.speed-mbps.over10mb", mbps)
+			telemetry.CountBucketed("restore.speed-mbps.over10mb.per-node", mbps/int64(numClusterNodes))
+		}
+	}
 	return nil
 }
 
@@ -1103,6 +1122,10 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
 // this by adding the table descriptors in DROP state, which causes the schema
 // change stuff to delete the keys in the background.
 func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
+	telemetry.Count("restore.total.failed")
+	telemetry.CountBucketed("restore.duration-sec.failed",
+		int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))
+
 	return phs.(sql.PlanHookState).ExecCfg().DB.Txn(ctx, r.dropTables)
 }
 
diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go
index 5a272e6616b8..16468d88549c 100644
--- a/pkg/ccl/backupccl/restore_planning.go
+++ b/pkg/ccl/backupccl/restore_planning.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
@@ -763,6 +764,14 @@ func doRestorePlan(
 		return err
 	}
 
+	// Collect telemetry.
+	{
+		telemetry.Count("restore.total.started")
+		if restoreStmt.DescriptorCoverage == tree.AllDescriptors {
+			telemetry.Count("restore.full-cluster")
+		}
+	}
+
 	_, errCh, err := p.ExecCfg().JobRegistry.CreateAndStartJob(ctx, resultsCh, jobs.Record{
 		Description: description,
 		Username:    p.User(),
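
For reference, the throughput figures fed to telemetry.CountBucketed above reduce to integer arithmetic over the job's RowCount.DataSize and its elapsed wall time. The Go sketch below is illustrative only and is not part of the patch: the helper name throughputMBps and the plain time.Duration argument are assumptions made for this example, while the 1 << 20 constant, the whole-second division, and the guard against a zero-second job mirror the telemetry blocks added to the Resume methods.

package main

import (
	"fmt"
	"time"
)

// throughputMBps mirrors the arithmetic in the telemetry blocks above: convert
// bytes to whole megabytes, divide by whole elapsed seconds, and report zero
// throughput for sub-second jobs rather than dividing by zero. Hypothetical
// helper for illustration; the patch inlines this calculation in Resume.
func throughputMBps(dataSizeBytes int64, elapsed time.Duration) (sizeMB, mbps int64) {
	const mb = 1 << 20
	sizeMB = dataSizeBytes / mb
	sec := int64(elapsed.Seconds())
	if sec > 0 {
		mbps = sizeMB / sec
	}
	return sizeMB, mbps
}

func main() {
	// A 4 GiB backup that ran for two minutes: 4096 MB / 120 s = 34 MB/s
	// (integer division, matching the int64 values passed to the counters).
	sizeMB, mbps := throughputMBps(4<<30, 2*time.Minute)
	fmt.Printf("size=%d MB, throughput=%d MB/s\n", sizeMB, mbps)
}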