46716: backupccl: add telemetry r=dt a=dt

This adds counters for usage of various features of BACKUP and RESTORE:
- whether encryption is used
- whether full-cluster backup or restore is used
- whether an incremental backup is used, either explicitly or automatically
- total, succeeded, and failed counts, as well as runtime, size, and throughput

These should help us measure how BACKUP and RESTORE are used and how they behave in the wild, and where to direct future efforts.
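For reference, here is a minimal sketch of the counter shape this change introduces. The wrapper function is hypothetical; the counter names and the telemetry.Count entry point are the ones used in the diff below.

```go
package backupccl

import "github.com/cockroachdb/cockroach/pkg/server/telemetry"

// recordBackupStartTelemetry is a hypothetical wrapper illustrating the
// feature counters bumped when a BACKUP is planned; each call increments
// a named counter in the node's telemetry registry.
func recordBackupStartTelemetry(encrypted, fullCluster, incremental bool) {
	telemetry.Count("backup.total.started")
	if encrypted {
		telemetry.Count("backup.encrypted")
	}
	if fullCluster {
		telemetry.Count("backup.targets.full_cluster")
	}
	if incremental {
		telemetry.Count("backup.span.incremental")
	} else {
		telemetry.Count("backup.span.full")
	}
}
```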

Fixes cockroachdb#46518 
Fixes cockroachdb#46520 
Fixes cockroachdb#46521

Release note (enterprise change): BACKUP and RESTORE now collect some anonymous telemetry on throughput and feature usage.

Release justification: low-risk, high impact.

Co-authored-by: David Taylor <[email protected]>
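To make the throughput math in backup_job.go and restore_job.go concrete, here is a runnable worked example under assumed inputs (a 2 GiB backup completing in 100 seconds on a 4-node cluster); the constants mirror the telemetry blocks in the diff:

```go
package main

import "fmt"

func main() {
	const mb = 1 << 20
	dataSize := int64(2 << 30)  // assumption: 2 GiB of data backed up
	sec := int64(100)           // assumption: the job ran for 100 seconds
	numClusterNodes := int64(4) // assumption: 4-node cluster

	sizeMb := dataSize / mb // 2048 MB
	var mbps int64
	if sec > 0 { // guard against division by zero for sub-second jobs
		mbps = sizeMb / sec // 20 MB/s aggregate throughput
	}
	perNode := mbps / numClusterNodes // 5 MB/s per node
	fmt.Println(sizeMb, mbps, perNode) // 2048 20 5
}
```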
craig[bot] and dt committed Mar 30, 2020 · 2 parents 93cb2eb + cd1b236 · commit f5ebd5e
Showing 4 changed files with 116 additions and 32 deletions.
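One note before the diff: telemetry.CountBucketed does not record raw values. It folds the value into a coarse bucket and increments a counter named for that bucket, so each prefix yields only a handful of counter names. The sketch below assumes power-of-ten rounding; the exact rounding is an implementation detail of pkg/server/telemetry.

```go
package main

import "fmt"

// bucket10 rounds v down to the nearest power of ten, preserving sign.
// It is an assumed approximation of the bucketing CountBucketed applies.
func bucket10(v int64) int64 {
	if v == 0 {
		return 0
	}
	sign := int64(1)
	if v < 0 {
		sign, v = -1, -v
	}
	b := int64(1)
	for v >= 10 {
		v /= 10
		b *= 10
	}
	return sign * b
}

func main() {
	// A 2048 MB full backup would be counted as "backup.size-mb.full.1000".
	fmt.Printf("backup.size-mb.full.%d\n", bucket10(2048))
}
```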
70 changes: 47 additions & 23 deletions pkg/ccl/backupccl/backup_job.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
@@ -163,14 +164,13 @@ type spanAndTime struct {
func backup(
ctx context.Context,
db *kv.DB,
gossip *gossip.Gossip,
numClusterNodes int,
settings *cluster.Settings,
defaultStore cloud.ExternalStorage,
storageByLocalityKV map[string]*roachpb.ExternalStorage,
job *jobs.Job,
backupManifest *BackupManifest,
checkpointDesc *BackupManifest,
resultsCh chan<- tree.Datums,
makeExternalStorage cloud.ExternalStorageFactory,
encryption *roachpb.FileEncryptionOptions,
) (RowCount, error) {
@@ -265,7 +265,7 @@ func backup(
// TODO(dan): Make this limiting per node.
//
// TODO(dan): See if there's some better solution than rate-limiting #14798.
maxConcurrentExports := clusterNodeCount(gossip) * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
maxConcurrentExports := numClusterNodes * int(kvserver.ExportRequestsLimit.Get(&settings.SV)) * 10
exportsSem := make(chan struct{}, maxConcurrentExports)

g := ctxgroup.WithContext(ctx)
@@ -441,10 +441,7 @@ func (b *backupResumer) releaseProtectedTimestamp(
}

type backupResumer struct {
job *jobs.Job
settings *cluster.Settings
res RowCount
makeExternalStorage cloud.ExternalStorageFactory
job *jobs.Job

testingKnobs struct {
ignoreProtectedTimestamps bool
@@ -457,7 +454,6 @@ func (b *backupResumer) Resume(
) error {
details := b.job.Details().(jobspb.BackupDetails)
p := phs.(sql.PlanHookState)
b.makeExternalStorage = p.ExecCfg().DistSQLSrv.ExternalStorage

ptsID := details.ProtectedTimestampRecord
if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps {
@@ -487,7 +483,7 @@ func (b *backupResumer) Resume(
if err != nil {
return errors.Wrapf(err, "export configuration")
}
defaultStore, err := b.makeExternalStorage(ctx, defaultConf)
defaultStore, err := p.ExecCfg().DistSQLSrv.ExternalStorage(ctx, defaultConf)
if err != nil {
return errors.Wrapf(err, "make storage")
}
@@ -519,38 +515,39 @@ func (b *backupResumer) Resume(
// implementations.
log.Warningf(ctx, "unable to load backup checkpoint while resuming job %d: %v", *b.job.ID(), err)
}

numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)

res, err := backup(
ctx,
p.ExecCfg().DB,
p.ExecCfg().Gossip,
numClusterNodes,
p.ExecCfg().Settings,
defaultStore,
storageByLocalityKV,
b.job,
&backupManifest,
checkpointDesc,
resultsCh,
b.makeExternalStorage,
p.ExecCfg().DistSQLSrv.ExternalStorage,
details.Encryption,
)
if err != nil {
return err
}
b.res = res

err = b.clearStats(ctx, p.ExecCfg().DB)
if err != nil {
log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
}
b.deleteCheckpoint(ctx)
b.deleteCheckpoint(ctx, p.ExecCfg())

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*b.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(b.res.Rows)),
tree.NewDInt(tree.DInt(b.res.IndexEntries)),
tree.NewDInt(tree.DInt(b.res.DataSize)),
tree.NewDInt(tree.DInt(res.Rows)),
tree.NewDInt(tree.DInt(res.IndexEntries)),
tree.NewDInt(tree.DInt(res.DataSize)),
}

if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps {
@@ -560,6 +557,30 @@ func (b *backupResumer) Resume(
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
}

// Collect telemetry.
{
telemetry.Count("backup.total.succeeded")
const mb = 1 << 20
sizeMb := res.DataSize / mb
sec := int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())
var mbps int64
if sec > 0 {
mbps = sizeMb / sec
}
if details.StartTime.IsEmpty() {
telemetry.CountBucketed("backup.duration-sec.full-succeeded", sec)
telemetry.CountBucketed("backup.size-mb.full", sizeMb)
telemetry.CountBucketed("backup.speed-mbps.full.total", mbps)
telemetry.CountBucketed("backup.speed-mbps.full.per-node", mbps/int64(numClusterNodes))
} else {
telemetry.CountBucketed("backup.duration-sec.inc-succeeded", sec)
telemetry.CountBucketed("backup.size-mb.inc", sizeMb)
telemetry.CountBucketed("backup.speed-mbps.inc.total", mbps)
telemetry.CountBucketed("backup.speed-mbps.inc.per-node", mbps/int64(numClusterNodes))
}
}

return nil
}

@@ -583,14 +604,18 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
telemetry.Count("backup.total.failed")
telemetry.CountBucketed("backup.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds()))

cfg := phs.(sql.PlanHookState).ExecCfg()
b.deleteCheckpoint(ctx)
b.deleteCheckpoint(ctx, cfg)
return cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return b.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider)
})
}

func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
func (b *backupResumer) deleteCheckpoint(ctx context.Context, cfg *sql.ExecutorConfig) {
// Attempt to delete BACKUP-CHECKPOINT.
if err := func() error {
details := b.job.Details().(jobspb.BackupDetails)
@@ -600,7 +625,7 @@ func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
if err != nil {
return err
}
exportStore, err := b.makeExternalStorage(ctx, conf)
exportStore, err := cfg.DistSQLSrv.ExternalStorage(ctx, conf)
if err != nil {
return err
}
@@ -615,10 +640,9 @@ var _ jobs.Resumer = &backupResumer{}
func init() {
jobs.RegisterConstructor(
jobspb.TypeBackup,
func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return &backupResumer{
job: job,
settings: settings,
job: job,
}
},
)
28 changes: 28 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
@@ -36,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
@@ -664,6 +666,32 @@ func backupPlanHook(
}
}

// Collect telemetry.
{
telemetry.Count("backup.total.started")
if startTime.IsEmpty() {
telemetry.Count("backup.span.full")
} else {
telemetry.Count("backup.span.incremental")
telemetry.CountBucketed("backup.incremental-span-sec", int64(timeutil.Since(startTime.GoTime()).Seconds()))
if len(incrementalFrom) == 0 {
telemetry.Count("backup.auto-incremental")
}
}
if len(backupStmt.To) > 1 {
telemetry.Count("backup.partitioned")
}
if mvccFilter == MVCCFilter_All {
telemetry.Count("backup.revision-history")
}
if encryption != nil {
telemetry.Count("backup.encrypted")
}
if backupStmt.DescriptorCoverage == tree.AllDescriptors {
telemetry.Count("backup.targets.full_cluster")
}
}

errCh, err := sj.Start(ctx)
if err != nil {
return err
41 changes: 32 additions & 9 deletions pkg/ccl/backupccl/restore_job.go
@@ -18,12 +18,12 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
@@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/opentracing/opentracing-go"
@@ -558,7 +559,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.
func restore(
restoreCtx context.Context,
db *kv.DB,
gossip *gossip.Gossip,
numClusterNodes int,
settings *cluster.Settings,
backupManifests []BackupManifest,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
@@ -645,7 +646,6 @@ func restore(
// that's wrong.
//
// TODO(dan): Make this limiting per node.
numClusterNodes := clusterNodeCount(gossip)
maxConcurrentImports := numClusterNodes * runtime.NumCPU()
importsSem := make(chan struct{}, maxConcurrentImports)

@@ -789,7 +789,6 @@ func loadBackupSQLDescs(
type restoreResumer struct {
job *jobs.Job
settings *cluster.Settings
res RowCount
databases []*sqlbase.DatabaseDescriptor
tables []*sqlbase.TableDescriptor
descriptorCoverage tree.DescriptorCoverage
@@ -977,10 +976,11 @@ func (r *restoreResumer) Resume(
return nil
}

numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
res, err := restore(
ctx,
p.ExecCfg().DB,
p.ExecCfg().Gossip,
numClusterNodes,
p.ExecCfg().Settings,
backupManifests,
details.BackupLocalityInfo,
@@ -991,7 +991,6 @@ func (r *restoreResumer) Resume(
r.job,
details.Encryption,
)
r.res = res
if err != nil {
return err
}
@@ -1014,11 +1013,31 @@ func (r *restoreResumer) Resume(
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
tree.NewDInt(tree.DInt(res.Rows)),
tree.NewDInt(tree.DInt(res.IndexEntries)),
tree.NewDInt(tree.DInt(res.DataSize)),
}

// Collect telemetry.
{
telemetry.Count("restore.total.succeeded")
const mb = 1 << 20
sizeMb := res.DataSize / mb
sec := int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())
var mbps int64
if sec > 0 {
mbps = sizeMb / sec
}
telemetry.CountBucketed("restore.duration-sec.succeeded", sec)
telemetry.CountBucketed("restore.size-mb.full", sizeMb)
telemetry.CountBucketed("restore.speed-mbps.total", mbps)
telemetry.CountBucketed("restore.speed-mbps.per-node", mbps/int64(numClusterNodes))
// Tiny restores may skew throughput numbers due to overhead.
if sizeMb > 10 {
telemetry.CountBucketed("restore.speed-mbps.over10mb", mbps)
telemetry.CountBucketed("restore.speed-mbps.over10mb.per-node", mbps/int64(numClusterNodes))
}
}
return nil
}

@@ -1103,6 +1122,10 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
// this by adding the table descriptors in DROP state, which causes the schema
// change stuff to delete the keys in the background.
func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
telemetry.Count("restore.total.failed")
telemetry.CountBucketed("restore.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))

return phs.(sql.PlanHookState).ExecCfg().DB.Txn(ctx, r.dropTables)
}

9 changes: 9 additions & 0 deletions pkg/ccl/backupccl/restore_planning.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
@@ -763,6 +764,14 @@ func doRestorePlan(
return err
}

// Collect telemetry.
{
telemetry.Count("restore.total.started")
if restoreStmt.DescriptorCoverage == tree.AllDescriptors {
telemetry.Count("restore.full-cluster")
}
}

_, errCh, err := p.ExecCfg().JobRegistry.CreateAndStartJob(ctx, resultsCh, jobs.Record{
Description: description,
Username: p.User(),