From 8ee448e329d5eb204d857a12328dae741bb6f30a Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 30 Mar 2020 03:14:58 +0000 Subject: [PATCH 1/2] backupccl: remove some resumer fields Release note: none. Release justification: extremely minor change. --- pkg/ccl/backupccl/backup_job.go | 32 ++++++++++++-------------------- pkg/ccl/backupccl/restore_job.go | 8 +++----- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index f9217c9c08e7..b7dec105d888 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -170,7 +170,6 @@ func backup( job *jobs.Job, backupManifest *BackupManifest, checkpointDesc *BackupManifest, - resultsCh chan<- tree.Datums, makeExternalStorage cloud.ExternalStorageFactory, encryption *roachpb.FileEncryptionOptions, ) (RowCount, error) { @@ -441,10 +440,7 @@ func (b *backupResumer) releaseProtectedTimestamp( } type backupResumer struct { - job *jobs.Job - settings *cluster.Settings - res RowCount - makeExternalStorage cloud.ExternalStorageFactory + job *jobs.Job testingKnobs struct { ignoreProtectedTimestamps bool @@ -457,7 +453,6 @@ func (b *backupResumer) Resume( ) error { details := b.job.Details().(jobspb.BackupDetails) p := phs.(sql.PlanHookState) - b.makeExternalStorage = p.ExecCfg().DistSQLSrv.ExternalStorage ptsID := details.ProtectedTimestampRecord if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps { @@ -487,7 +482,7 @@ func (b *backupResumer) Resume( if err != nil { return errors.Wrapf(err, "export configuration") } - defaultStore, err := b.makeExternalStorage(ctx, defaultConf) + defaultStore, err := p.ExecCfg().DistSQLSrv.ExternalStorage(ctx, defaultConf) if err != nil { return errors.Wrapf(err, "make storage") } @@ -529,28 +524,26 @@ func (b *backupResumer) Resume( b.job, &backupManifest, checkpointDesc, - resultsCh, - b.makeExternalStorage, + p.ExecCfg().DistSQLSrv.ExternalStorage, details.Encryption, ) if err != 
nil { return err } - b.res = res err = b.clearStats(ctx, p.ExecCfg().DB) if err != nil { log.Warningf(ctx, "unable to clear stats from job payload: %+v", err) } - b.deleteCheckpoint(ctx) + b.deleteCheckpoint(ctx, p.ExecCfg()) resultsCh <- tree.Datums{ tree.NewDInt(tree.DInt(*b.job.ID())), tree.NewDString(string(jobs.StatusSucceeded)), tree.NewDFloat(tree.DFloat(1.0)), - tree.NewDInt(tree.DInt(b.res.Rows)), - tree.NewDInt(tree.DInt(b.res.IndexEntries)), - tree.NewDInt(tree.DInt(b.res.DataSize)), + tree.NewDInt(tree.DInt(res.Rows)), + tree.NewDInt(tree.DInt(res.IndexEntries)), + tree.NewDInt(tree.DInt(res.DataSize)), } if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps { @@ -584,13 +577,13 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error { // OnFailOrCancel is part of the jobs.Resumer interface. func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { cfg := phs.(sql.PlanHookState).ExecCfg() - b.deleteCheckpoint(ctx) + b.deleteCheckpoint(ctx, cfg) return cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return b.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider) }) } -func (b *backupResumer) deleteCheckpoint(ctx context.Context) { +func (b *backupResumer) deleteCheckpoint(ctx context.Context, cfg *sql.ExecutorConfig) { // Attempt to delete BACKUP-CHECKPOINT. 
if err := func() error { details := b.job.Details().(jobspb.BackupDetails) @@ -600,7 +593,7 @@ func (b *backupResumer) deleteCheckpoint(ctx context.Context) { if err != nil { return err } - exportStore, err := b.makeExternalStorage(ctx, conf) + exportStore, err := cfg.DistSQLSrv.ExternalStorage(ctx, conf) if err != nil { return err } @@ -615,10 +608,9 @@ var _ jobs.Resumer = &backupResumer{} func init() { jobs.RegisterConstructor( jobspb.TypeBackup, - func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return &backupResumer{ - job: job, - settings: settings, + job: job, } }, ) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 39ed3cb7f2ac..8a59b382bba0 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -789,7 +789,6 @@ func loadBackupSQLDescs( type restoreResumer struct { job *jobs.Job settings *cluster.Settings - res RowCount databases []*sqlbase.DatabaseDescriptor tables []*sqlbase.TableDescriptor descriptorCoverage tree.DescriptorCoverage @@ -991,7 +990,6 @@ func (r *restoreResumer) Resume( r.job, details.Encryption, ) - r.res = res if err != nil { return err } @@ -1014,9 +1012,9 @@ func (r *restoreResumer) Resume( tree.NewDInt(tree.DInt(*r.job.ID())), tree.NewDString(string(jobs.StatusSucceeded)), tree.NewDFloat(tree.DFloat(1.0)), - tree.NewDInt(tree.DInt(r.res.Rows)), - tree.NewDInt(tree.DInt(r.res.IndexEntries)), - tree.NewDInt(tree.DInt(r.res.DataSize)), + tree.NewDInt(tree.DInt(res.Rows)), + tree.NewDInt(tree.DInt(res.IndexEntries)), + tree.NewDInt(tree.DInt(res.DataSize)), } return nil From cd1b2367a2b4292f0237cac8535e5a3161e65877 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 30 Mar 2020 03:20:22 +0000 Subject: [PATCH 2/2] backupccl: add telemetry This adds counters to usage of various features of BACKUP and RESTORE: - if encryption is used - if full-cluster is used - if incremental is used, either 
explicitly or automatically - total, succeeded and failed counts as well as runtime, size and throughput These should help us measure how BACKUP and RESTORE are used and are behaving in the wild and where to direct our future efforts. Release note (enterprise change): BACKUP and RESTORE now collect some anonymous telemetry on throughput and feature usage. Release justification: low-risk, high impact. --- pkg/ccl/backupccl/backup_job.go | 38 ++++++++++++++++++++++++--- pkg/ccl/backupccl/backup_planning.go | 28 ++++++++++++++++++++ pkg/ccl/backupccl/restore_job.go | 33 ++++++++++++++++++++--- pkg/ccl/backupccl/restore_planning.go | 9 +++++++ 4 files changed, 101 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index b7dec105d888..f756dd9cbd6a 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/covering" @@ -163,7 +164,7 @@ type spanAndTime struct { func backup( ctx context.Context, db *kv.DB, - gossip *gossip.Gossip, + numClusterNodes int, settings *cluster.Settings, defaultStore cloud.ExternalStorage, storageByLocalityKV map[string]*roachpb.ExternalStorage, @@ -264,7 +265,7 @@ func backup( // TODO(dan): Make this limiting per node. // // TODO(dan): See if there's some better solution than rate-limiting #14798. 
-	maxConcurrentExports := clusterNodeCount(gossip) * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
+	maxConcurrentExports := numClusterNodes * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
 	exportsSem := make(chan struct{}, maxConcurrentExports)
 
 	g := ctxgroup.WithContext(ctx)
@@ -514,10 +515,13 @@ func (b *backupResumer) Resume(
 		// implementations.
 		log.Warningf(ctx, "unable to load backup checkpoint while resuming job %d: %v", *b.job.ID(), err)
 	}
+
+	numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
+
 	res, err := backup(
 		ctx,
 		p.ExecCfg().DB,
-		p.ExecCfg().Gossip,
+		numClusterNodes,
 		p.ExecCfg().Settings,
 		defaultStore,
 		storageByLocalityKV,
@@ -553,6 +557,30 @@ func (b *backupResumer) Resume(
 			log.Errorf(ctx, "failed to release protected timestamp: %v", err)
 		}
 	}
+
+	// Collect telemetry.
+	{
+		telemetry.Count("backup.total.succeeded")
+		const mb = 1 << 20
+		sizeMb := res.DataSize / mb
+		sec := int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())
+		var mbps int64
+		if sec > 0 {
+			mbps = sizeMb / sec
+		}
+		if details.StartTime.IsEmpty() {
+			telemetry.CountBucketed("backup.duration-sec.full-succeeded", sec)
+			telemetry.CountBucketed("backup.size-mb.full", sizeMb)
+			telemetry.CountBucketed("backup.speed-mbps.full.total", mbps)
+			telemetry.CountBucketed("backup.speed-mbps.full.per-node", mbps/int64(numClusterNodes))
+		} else {
+			telemetry.CountBucketed("backup.duration-sec.inc-succeeded", sec)
+			telemetry.CountBucketed("backup.size-mb.inc", sizeMb)
+			telemetry.CountBucketed("backup.speed-mbps.inc.total", mbps)
+			telemetry.CountBucketed("backup.speed-mbps.inc.per-node", mbps/int64(numClusterNodes))
+		}
+	}
+
 	return nil
 }
 
@@ -576,6 +604,10 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { + telemetry.Count("backup.total.failed") + telemetry.CountBucketed("backup.duration-sec.failed", + int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())) + cfg := phs.(sql.PlanHookState).ExecCfg() b.deleteCheckpoint(ctx, cfg) return cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 57bc0835ddc4..077991e2f001 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/covering" @@ -36,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -664,6 +666,32 @@ func backupPlanHook( } } + // Collect telemetry. 
+ { + telemetry.Count("backup.total.started") + if startTime.IsEmpty() { + telemetry.Count("backup.span.full") + } else { + telemetry.Count("backup.span.incremental") + telemetry.CountBucketed("backup.incremental-span-sec", int64(timeutil.Since(startTime.GoTime()).Seconds())) + if len(incrementalFrom) == 0 { + telemetry.Count("backup.auto-incremental") + } + } + if len(backupStmt.To) > 1 { + telemetry.Count("backup.partitioned") + } + if mvccFilter == MVCCFilter_All { + telemetry.Count("backup.revision-history") + } + if encryption != nil { + telemetry.Count("backup.encrypted") + } + if backupStmt.DescriptorCoverage == tree.AllDescriptors { + telemetry.Count("backup.targets.full_cluster") + } + } + errCh, err := sj.Start(ctx) if err != nil { return err diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 8a59b382bba0..862c7e14af46 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -18,12 +18,12 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/covering" @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/opentracing/opentracing-go" @@ -558,7 +559,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb. 
 func restore(
 	restoreCtx context.Context,
 	db *kv.DB,
-	gossip *gossip.Gossip,
+	numClusterNodes int,
 	settings *cluster.Settings,
 	backupManifests []BackupManifest,
 	backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
@@ -645,7 +646,6 @@ func restore(
 	// that's wrong.
 	//
 	// TODO(dan): Make this limiting per node.
-	numClusterNodes := clusterNodeCount(gossip)
 	maxConcurrentImports := numClusterNodes * runtime.NumCPU()
 	importsSem := make(chan struct{}, maxConcurrentImports)
 
@@ -976,10 +976,11 @@ func (r *restoreResumer) Resume(
 		return nil
 	}
 
+	numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
 	res, err := restore(
 		ctx,
 		p.ExecCfg().DB,
-		p.ExecCfg().Gossip,
+		numClusterNodes,
 		p.ExecCfg().Settings,
 		backupManifests,
 		details.BackupLocalityInfo,
@@ -1017,6 +1018,26 @@ func (r *restoreResumer) Resume(
 		tree.NewDInt(tree.DInt(res.DataSize)),
 	}
 
+	// Collect telemetry.
+	{
+		telemetry.Count("restore.total.succeeded")
+		const mb = 1 << 20
+		sizeMb := res.DataSize / mb
+		sec := int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())
+		var mbps int64
+		if sec > 0 {
+			mbps = sizeMb / sec
+		}
+		telemetry.CountBucketed("restore.duration-sec.succeeded", sec)
+		telemetry.CountBucketed("restore.size-mb.full", sizeMb)
+		telemetry.CountBucketed("restore.speed-mbps.total", mbps)
+		telemetry.CountBucketed("restore.speed-mbps.per-node", mbps/int64(numClusterNodes))
+		// Tiny restores may skew throughput numbers due to overhead.
+		if sizeMb > 10 {
+			telemetry.CountBucketed("restore.speed-mbps.over10mb", mbps)
+			telemetry.CountBucketed("restore.speed-mbps.over10mb.per-node", mbps/int64(numClusterNodes))
+		}
+	}
 	return nil
 }
 
@@ -1101,6 +1122,10 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
 // this by adding the table descriptors in DROP state, which causes the schema
 // change stuff to delete the keys in the background.
func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { + telemetry.Count("restore.total.failed") + telemetry.CountBucketed("restore.duration-sec.failed", + int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())) + return phs.(sql.PlanHookState).ExecCfg().DB.Txn(ctx, r.dropTables) } diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 5a272e6616b8..16468d88549c 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -763,6 +764,14 @@ func doRestorePlan( return err } + // Collect telemetry. + { + telemetry.Count("restore.total.started") + if restoreStmt.DescriptorCoverage == tree.AllDescriptors { + telemetry.Count("restore.full-cluster") + } + } + _, errCh, err := p.ExecCfg().JobRegistry.CreateAndStartJob(ctx, resultsCh, jobs.Record{ Description: description, Username: p.User(),