From 3051f65f353d112f97b60570c9e3e2fd75a377a6 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Fri, 21 Jul 2017 10:16:52 -0400 Subject: [PATCH] sqlccl: rework split+scatter+import concurrency in RESTORE This was largely motivated by how long it takes to presplit in very large restores. Doing them all upfront required rate-limiting, but no number was well-tuned for every cluster. Additionally, even at a high presplit rate of 100, a 10 TB cluster would take 52 minutes before it even started scattering. Now, one goroutine iterates through every span being imported, presplitting and scattering before moving on to the next one. Upon split+scatter, the span is sent into a buffered channel read by the Import goroutines, which prevents it from getting too far ahead of the Imports. This both acts as a natural rate limiter for the splits as well as bounds the number of empty ranges created if a RESTORE fails or is cancelled. Overall tpch-10 RESTORE time remains 12:30 on a 4-node cluster. Since each range is now scattered individually, we no longer need the jitter in the scatter implementation (plus it now slows down the RESTORE), so it's removed. Restore really needs a refactor, but I'm going to be making a couple more changes leading up to 1.1 so I'll leave cleanup until after they go in. This removes most tunable constants in RESTORE and the remaining ones are defined in terms of the number of nodes in the cluster and the number of cpus on a node, so this: Closes #14798. --- pkg/ccl/sqlccl/backup.go | 2 +- pkg/ccl/sqlccl/backup_test.go | 60 ------ pkg/ccl/sqlccl/restore.go | 349 ++++++++++++--------------------- pkg/storage/replica_command.go | 14 -- 4 files changed, 124 insertions(+), 301 deletions(-) diff --git a/pkg/ccl/sqlccl/backup.go b/pkg/ccl/sqlccl/backup.go index 5db533e9cb01..f2ef4851d407 100644 --- a/pkg/ccl/sqlccl/backup.go +++ b/pkg/ccl/sqlccl/backup.go @@ -109,7 +109,7 @@ func ValidatePreviousBackups(ctx context.Context, uris []string) (hlc.Timestamp, // This reuses Restore's logic for lining up all the start and end // timestamps to validate the previous backups that this one is incremental // from. - _, endTime, err := makeImportRequests(nil, backups) + _, endTime, err := makeImportSpans(nil, backups) return endTime, err } diff --git a/pkg/ccl/sqlccl/backup_test.go b/pkg/ccl/sqlccl/backup_test.go index e90d2e193863..c55feda7c239 100644 --- a/pkg/ccl/sqlccl/backup_test.go +++ b/pkg/ccl/sqlccl/backup_test.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/sqlccl" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl" - "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" @@ -49,7 +48,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -1481,64 +1479,6 @@ func TestTimestampMismatch(t *testing.T) { }) } -func TestPresplitRanges(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx, _, tc, _, cleanupFn := backupRestoreTestSetup(t, multiNode, 0) - defer cleanupFn() - kvDB := tc.Server(0).KVClient().(*client.DB) - - numRangesTests := []int{0, 1, 2, 3, 4, 10} - for testNum, numRanges := range numRangesTests { - t.Run(strconv.Itoa(numRanges), func(t *testing.T) { - baseKey := keys.MakeTablePrefix(uint32(keys.MaxReservedDescID + testNum)) - var splitPoints []roachpb.Key - for i := 0; i < numRanges; i++ { - key := encoding.EncodeUvarintAscending(append([]byte(nil), baseKey...), uint64(i)) - splitPoints = append(splitPoints, key) - } - if err := sqlccl.PresplitRanges(ctx, *kvDB, splitPoints); err != nil { - t.Error(err) - } - - // Verify that the splits exist. - // Note that PresplitRanges adds the row sentinel to make a valid table - // key, but AdminSplit internally removes it (via EnsureSafeSplitKey). So - // we expect splits that match the splitPoints exactly. - for _, splitKey := range splitPoints { - // Scan the meta range for splitKey. - rk, err := keys.Addr(splitKey) - if err != nil { - t.Fatal(err) - } - - startKey := keys.RangeMetaKey(rk) - endKey := keys.Meta2Prefix.PrefixEnd() - - kvs, err := kvDB.Scan(context.Background(), startKey, endKey, 1) - if err != nil { - t.Fatal(err) - } - if len(kvs) != 1 { - t.Fatalf("expected 1 KV, got %v", kvs) - } - desc := &roachpb.RangeDescriptor{} - if err := kvs[0].ValueProto(desc); err != nil { - t.Fatal(err) - } - if !desc.EndKey.Equal(rk) { - t.Errorf( - "missing split %s: range %s to %s", - keys.PrettyPrint(splitKey), - keys.PrettyPrint(desc.StartKey.AsRawKey()), - keys.PrettyPrint(desc.EndKey.AsRawKey()), - ) - } - } - }) - } -} - func TestBackupLevelDB(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/sqlccl/restore.go b/pkg/ccl/sqlccl/restore.go index 41d113b2af86..8024ac93312b 100644 --- a/pkg/ccl/sqlccl/restore.go +++ b/pkg/ccl/sqlccl/restore.go @@ -14,13 +14,11 @@ import ( "github.com/pkg/errors" "golang.org/x/net/context" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl/intervalccl" "github.com/cockroachdb/cockroach/pkg/internal/client" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/jobs" @@ -41,57 +39,6 @@ const ( restoreOptSkipMissingFKs = "skip_missing_foreign_keys" ) -// Import loads some data in sstables into an empty range. Only the keys between -// startKey and endKey are loaded. Every row's key is rewritten to be for -// newTableID. -func Import( - ctx context.Context, - db client.DB, - startKey, endKey roachpb.Key, - files []roachpb.ImportRequest_File, - kr *storageccl.KeyRewriter, - rekeys []roachpb.ImportRequest_TableRekey, -) (*roachpb.ImportResponse, error) { - var newStartKey, newEndKey roachpb.Key - { - var ok bool - newStartKey, ok, _ = kr.RewriteKey(append([]byte(nil), startKey...)) - if !ok { - return nil, errors.Errorf("could not rewrite key: %s", newStartKey) - } - newEndKey, ok, _ = kr.RewriteKey(append([]byte(nil), endKey...)) - if !ok { - return nil, errors.Errorf("could not rewrite key: %s", newEndKey) - } - } - - if log.V(1) { - log.Infof(ctx, "import [%s,%s) (%d files)", newStartKey, newEndKey, len(files)) - } - if len(files) == 0 { - return &roachpb.ImportResponse{}, nil - } - - req := &roachpb.ImportRequest{ - // Import is a point request because we don't want DistSender to split - // it. Assume (but don't require) the entire post-rewrite span is on the - // same range. - Span: roachpb.Span{Key: newStartKey}, - DataSpan: roachpb.Span{ - Key: startKey, - EndKey: endKey, - }, - Files: files, - Rekeys: rekeys, - } - res, pErr := client.SendWrapped(ctx, db.GetSender(), req) - if pErr != nil { - return nil, pErr.GoError() - } - - return res.(*roachpb.ImportResponse), nil -} - func loadBackupDescs(ctx context.Context, uris []string) ([]BackupDescriptor, error) { backupDescs := make([]BackupDescriptor, len(uris)) @@ -331,7 +278,7 @@ type intervalSpan roachpb.Span var _ interval.Interface = intervalSpan{} -// ID is part of `interval.Interface` but unused in makeImportRequests. +// ID is part of `interval.Interface` but unused in makeImportSpans. func (ie intervalSpan) ID() uintptr { return 0 } // Range is part of `interval.Interface`. @@ -363,8 +310,8 @@ type importEntry struct { files []roachpb.ImportRequest_File } -// makeImportRequests pivots the backups, which are grouped by time, into -// requests for import, which are grouped by keyrange. +// makeImportSpans pivots the backups, which are grouped by time, into +// spans for import, which are grouped by keyrange. // // The core logic of this is in OverlapCoveringMerge, which accepts sets of // non-overlapping key ranges (aka coverings) each with a payload, and returns @@ -383,13 +330,13 @@ type importEntry struct { // - [B, C) -> /file1, /file4, requested (note that file1 was split into two ranges) // - [C, D) -> /file2, /file5, requested // -// This would be turned into two Import requests, one restoring [B, C) out of +// This would be turned into two Import spans, one restoring [B, C) out of // /file1 and /file3, the other restoring [C, D) out of /file2 and /file5. // Nothing is restored out of /file3 and only part of /file1 is used. // // NB: All grouping operates in the pre-rewrite keyspace, meaning the keyranges // as they were backed up, not as they're being restored. -func makeImportRequests( +func makeImportSpans( tableSpans []roachpb.Span, backups []BackupDescriptor, ) ([]importEntry, hlc.Timestamp, error) { // Put the merged table data covering first into the OverlapCoveringMerge @@ -497,65 +444,6 @@ func makeImportRequests( return requestEntries, maxEndTime, nil } -// PresplitRanges concurrently creates the splits described by `input`. It does -// this by finding the middle key, splitting and recursively presplitting the -// resulting left and right hand ranges. NB: The split code assumes that the LHS -// of the resulting ranges is the smaller, so normally you'd split from the -// left, but this method should only be called on empty keyranges, so it's okay. -// -// The `input` parameter expected to be sorted. -func PresplitRanges(baseCtx context.Context, db client.DB, input []roachpb.Key) error { - // TODO(dan): This implementation does nothing to control the maximum - // parallelization or number of goroutines spawned. Revisit (possibly via a - // semaphore) if this becomes a problem in practice. - - ctx, span := tracing.ChildSpan(baseCtx, "PresplitRanges") - defer tracing.FinishSpan(span) - log.Infof(ctx, "presplitting %d ranges", len(input)) - - if len(input) == 0 { - return nil - } - - // 100 was picked because it's small enough that a ~16000 presplit restore - // finishes smoothly on a 4-node azure production cluster. - // - // TODO(dan): See if there's some better solution #14798. - const splitsPerSecond, splitsBurst = 100, 1 - limiter := rate.NewLimiter(splitsPerSecond, splitsBurst) - - g, ctx := errgroup.WithContext(ctx) - var splitFn func([]roachpb.Key) error - splitFn = func(splitPoints []roachpb.Key) error { - if err := limiter.Wait(ctx); err != nil { - return err - } - - // Pick the index such that it's 0 if len(splitPoints) == 1. - splitIdx := len(splitPoints) / 2 - if err := db.AdminSplit(ctx, splitPoints[splitIdx], splitPoints[splitIdx]); err != nil { - return err - } - - splitPointsLeft, splitPointsRight := splitPoints[:splitIdx], splitPoints[splitIdx+1:] - if len(splitPointsLeft) > 0 { - g.Go(func() error { - return splitFn(splitPointsLeft) - }) - } - if len(splitPointsRight) > 0 { - // Save a few goroutines by reusing this one. - return splitFn(splitPointsRight) - } - return nil - } - - g.Go(func() error { - return splitFn(input) - }) - return g.Wait() -} - // Write the new descriptors. First the ID -> TableDescriptor for the new table, // then flip (or initialize) the name -> ID entry so any new queries will use // the new one. @@ -604,7 +492,7 @@ func restoreJobDescription(restore *parser.Restore, from []string) (string, erro // Restore imports a SQL table (or tables) from sets of non-overlapping sstable // files. func Restore( - ctx context.Context, + restoreCtx context.Context, p sql.PlanHookState, uris []string, targets parser.TargetList, @@ -624,7 +512,7 @@ func Restore( // A note about contexts and spans in this method: the top-level context // `ctx` is used for orchestration logging. All operations that carry out // work get their individual contexts. - initCtx, initSpan := tracing.ChildSpan(ctx, "init") + initCtx, initSpan := tracing.ChildSpan(restoreCtx, "init") defer func() { tracing.FinishSpan(initSpan) // want late binding }() @@ -657,12 +545,12 @@ func Restore( } } - log.Eventf(ctx, "starting restore for %d tables", len(tables)) + log.Eventf(restoreCtx, "starting restore for %d tables", len(tables)) // Fail fast if the necessary databases don't exist since the below logic // leaks table IDs when Restore fails. - if err := db.Txn(initCtx, func(ctx context.Context, txn *client.Txn) error { - return reassignParentIDs(ctx, txn, p, databasesByID, tables, opt) + if err := db.Txn(initCtx, func(txnCtx context.Context, txn *client.Txn) error { + return reassignParentIDs(txnCtx, txn, p, databasesByID, tables, opt) }); err != nil { return failed, err } @@ -686,7 +574,7 @@ func Restore( // Pivot the backups, which are grouped by time, into requests for import, // which are grouped by keyrange. - importRequests, _, err := makeImportRequests(spans, backupDescs) + importSpans, _, err := makeImportSpans(spans, backupDescs) if err != nil { return failed, errors.Wrapf(err, "making import requests for %d backups", len(backupDescs)) } @@ -704,59 +592,34 @@ func Restore( tracing.FinishSpan(initSpan) initCtx, initSpan = nil, nil - splitCtx, splitSpan := tracing.ChildSpan(ctx, "presplit") - defer func() { - tracing.FinishSpan(splitSpan) // want late binding - }() - - // The Import (and resulting WriteBatch) requests made below run on - // leaseholders, so presplit the ranges to balance the work among many - // nodes - splitKeys := make([]roachpb.Key, len(importRequests)+1) - for i, r := range importRequests { - var ok bool - splitKeys[i], ok, _ = kr.RewriteKey(append([]byte(nil), r.Key...)) - if !ok { - return failed, errors.Errorf("failed to rewrite key: %s", r.Key) - } - } - // Split at the end of the last table; otherwise, the last range will span - // to MaxKey and potentially contain data, which breaks scatter. - var maxID sqlbase.ID - for _, id := range newTableIDs { - if id > maxID { - maxID = id - } - } - splitKeys[len(splitKeys)-1] = keys.MakeTablePrefix(uint32(maxID + 1)) - if err := PresplitRanges(splitCtx, db, splitKeys); err != nil { - return failed, errors.Wrapf(err, "presplitting %d ranges", len(importRequests)) + mu := struct { + syncutil.Mutex + res roachpb.BulkOpSummary + requestsCompleted []bool + lowWaterMark int + }{ + requestsCompleted: make([]bool, len(importSpans)), + lowWaterMark: -1, } - log.Eventf(ctx, "presplit ranges along %d keys", len(splitKeys)) - tracing.FinishSpan(splitSpan) - splitCtx, splitSpan = nil, nil - { - newSpans := spansForAllTableIndexes(tables) - g, gCtx := errgroup.WithContext(ctx) - for i := range newSpans { - span := newSpans[i] - g.Go(func() error { - scatterCtx, sp := tracing.ChildSpan(gCtx, "scatter") - defer tracing.FinishSpan(sp) - - req := &roachpb.AdminScatterRequest{ - Span: roachpb.Span{Key: span.Key, EndKey: span.EndKey}, + progressLogger := jobProgressLogger{ + job: job, + totalChunks: len(importSpans), + progressedFn: func(progressedCtx context.Context, details interface{}) { + switch d := details.(type) { + case *jobs.Payload_Restore: + mu.Lock() + if mu.lowWaterMark >= 0 { + d.Restore.LowWaterMark = importSpans[mu.lowWaterMark].Key } - _, pErr := client.SendWrapped(scatterCtx, db.GetSender(), req) - return pErr.GoError() - }) - } - if err := g.Wait(); err != nil { - log.Errorf(ctx, "failed scattering %d ranges: %s", len(importRequests), err) - } - log.Eventf(ctx, "scattered lease holders for %d key spans", len(newSpans)) + mu.Unlock() + default: + log.Errorf(progressedCtx, "job payload had unexpected type %T", d) + } + }, } + progressCtx, progressSpan := tracing.ChildSpan(restoreCtx, "progress-log") + defer tracing.FinishSpan(progressSpan) // We're already limiting these on the server-side, but sending all the // Import requests at once would fill up distsender/grpc/something and cause @@ -775,64 +638,102 @@ func Restore( maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip) * runtime.NumCPU() importsSem := make(chan struct{}, maxConcurrentImports) - log.Eventf(ctx, "commencing import of data with concurrency %d", maxConcurrentImports) + log.Eventf(restoreCtx, "commencing import of data with concurrency %d", maxConcurrentImports) tBegin := timeutil.Now() - mu := struct { - syncutil.Mutex - res roachpb.BulkOpSummary - requestsCompleted []bool - lowWaterMark int - }{ - requestsCompleted: make([]bool, len(importRequests)), - lowWaterMark: -1, - } - - progressLogger := jobProgressLogger{ - job: job, - totalChunks: len(importRequests), - progressedFn: func(ctx context.Context, details interface{}) { - switch d := details.(type) { - case *jobs.Payload_Restore: - mu.Lock() - if mu.lowWaterMark >= 0 { - d.Restore.LowWaterMark = importRequests[mu.lowWaterMark].Key + // We're about to start off one goroutine that serially presplits & scatters + // each import span. Once split and scattered, the span is submitted to + // importRequestsCh to indicate it's ready for Import. Since import is so + // much slower, we buffer the channel to keep the split/scatter work from + // getting too far ahead. This both naturally rate limits the split/scatters + // and bounds the number of empty ranges crated if the RESTORE fails (or is + // cancelled). + const presplitLeadLimit = 10 + importRequestsCh := make(chan *roachpb.ImportRequest, presplitLeadLimit) + + g, gCtx := errgroup.WithContext(restoreCtx) + g.Go(func() error { + splitScatterCtx, splitScatterSpan := tracing.ChildSpan(gCtx, "presplit-scatter") + defer tracing.FinishSpan(splitScatterSpan) + defer close(importRequestsCh) + + // The Import (and resulting AddSSTable) requests made below run on + // leaseholders, so presplit and scatter the ranges to balance the work + // among many nodes. + for i, importSpan := range importSpans { + var newSpan roachpb.Span + { + var ok bool + newSpan.Key, ok, _ = kr.RewriteKey(append([]byte(nil), importSpan.Key...)) + if !ok { + return errors.Errorf("could not rewrite key: %s", importSpan.Key) + } + newSpan.EndKey, ok, _ = kr.RewriteKey(append([]byte(nil), importSpan.EndKey...)) + if !ok { + return errors.Errorf("could not rewrite key: %s", importSpan.EndKey) } - mu.Unlock() - default: - log.Errorf(ctx, "job payload had unexpected type %T", d) } - }, - } - progressCtx, progressSpan := tracing.ChildSpan(ctx, "progress-log") - defer tracing.FinishSpan(progressSpan) + log.VEventf(restoreCtx, 1, "presplitting %d of %d", i+1, len(importSpans)) + if err := db.AdminSplit(splitScatterCtx, newSpan.Key, newSpan.Key); err != nil { + return err + } + + log.VEventf(restoreCtx, 1, "scattering %d of %d", i+1, len(importSpans)) + scatterReq := &roachpb.AdminScatterRequest{Span: newSpan} + if _, pErr := client.SendWrapped(splitScatterCtx, db.GetSender(), scatterReq); pErr != nil { + // TODO(dan): Unfortunately, Scatter is still too unreliable to + // fail the RESTORE when Scatter fails. I'm uncomfortable that + // this could break entirely and not start failing the tests, + // but on the bright side, it doesn't affect correctness, only + // throughput. + log.Errorf(restoreCtx, "failed to scatter %d: %s", i, pErr.GoError()) + } + + importReq := &roachpb.ImportRequest{ + // Import is a point request because we don't want DistSender to split + // it. Assume (but don't require) the entire post-rewrite span is on the + // same range. + Span: roachpb.Span{Key: newSpan.Key}, + DataSpan: importSpan.Span, + Files: importSpan.files, + Rekeys: rekeys, + } + select { + case <-gCtx.Done(): + return gCtx.Err() + case importRequestsCh <- importReq: + } + } + + return nil + }) + + var importIdx int + for importRequest := range importRequestsCh { + importCtx, importSpan := tracing.ChildSpan(gCtx, "import") + idx := importIdx + importIdx++ + log.VEventf(restoreCtx, 1, "importing %d of %d", idx, len(importSpans)) - g, gCtx := errgroup.WithContext(ctx) - for i := range importRequests { select { case importsSem <- struct{}{}: - case <-ctx.Done(): - return failed, ctx.Err() + case <-gCtx.Done(): + return failed, gCtx.Err() } - log.Eventf(ctx, "importing %d of %d", i+1, len(importRequests)) - - ir := importRequests[i] - idx := i + log.Event(importCtx, "acquired semaphore") g.Go(func() error { - importCtx, span := tracing.ChildSpan(gCtx, "import") - defer tracing.FinishSpan(span) + defer tracing.FinishSpan(importSpan) defer func() { <-importsSem }() - log.Event(importCtx, "acquired semaphore") - res, err := Import(importCtx, db, ir.Key, ir.EndKey, ir.files, kr, rekeys) - if err != nil { - return err + importRes, pErr := client.SendWrapped(importCtx, db.GetSender(), importRequest) + if pErr != nil { + return pErr.GoError() } mu.Lock() - mu.res.Add(res.Imported) + mu.res.Add(importRes.(*roachpb.ImportResponse).Imported) mu.requestsCompleted[idx] = true for j := mu.lowWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { mu.lowWaterMark = j @@ -849,22 +750,20 @@ func Restore( }) } - log.Event(ctx, "wait for outstanding imports to finish") + log.Event(restoreCtx, "wait for outstanding imports to finish") if err := g.Wait(); err != nil { // This leaves the data that did get imported in case the user wants to // retry. // TODO(dan): Build tooling to allow a user to restart a failed restore. - return failed, errors.Wrapf(err, "importing %d ranges", len(importRequests)) + return failed, errors.Wrapf(err, "importing %d ranges", len(importSpans)) } - log.Event(ctx, "making tables live") + log.Event(restoreCtx, "making tables live") - makeLiveCtx, makeLiveSpan := tracing.ChildSpan(ctx, "make-live") - defer tracing.FinishSpan(makeLiveSpan) // Write the new TableDescriptors and flip the namespace entries over to // them. After this call, any queries on a table will be served by the newly // restored data. - if err := restoreTableDescs(makeLiveCtx, db, tables); err != nil { + if err := restoreTableDescs(restoreCtx, db, tables); err != nil { return failed, errors.Wrapf(err, "restoring %d TableDescriptors", len(tables)) } @@ -873,7 +772,7 @@ func Restore( // everything works but the table data is left abandoned. // Don't need the lock any more; we're the only moving part at this stage. - log.Eventf(ctx, "restore completed: ingested %s of data (before replication) at %s/sec", + log.Eventf(restoreCtx, "restore completed: ingested %s of data (before replication) at %s/sec", humanizeutil.IBytes(mu.res.DataSize), humanizeutil.IBytes(mu.res.DataSize/int64(1+timeutil.Since(tBegin).Seconds())), ) @@ -937,16 +836,14 @@ func restorePlanHook( restore.Options, job, ) - jobCtx, jobSpan := tracing.ChildSpan(ctx, "log-job") - defer tracing.FinishSpan(jobSpan) if err != nil { - job.Failed(jobCtx, err) + job.Failed(ctx, err) return nil, err } - if err := job.Succeeded(jobCtx); err != nil { + if err := job.Succeeded(ctx); err != nil { // An error while marking the job as successful is not important enough to // merit failing the entire restore. - log.Errorf(jobCtx, "RESTORE ignoring error while marking job %d (%s) as successful: %+v", + log.Errorf(ctx, "RESTORE ignoring error while marking job %d (%s) as successful: %+v", job.ID(), description, err) } // TODO(benesch): emit periodic progress updates once we have the diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 8c78e8012566..5c46d6006c51 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -26,7 +26,6 @@ import ( "fmt" "io" "math" - "math/rand" "strings" "sync" "sync/atomic" @@ -3961,19 +3960,6 @@ func (r *Replica) adminScatter( return nil } - // Sleep for a random duration to avoid dumping too many leases or replicas on - // one underfull store when many ranges are scattered simultaneously. It's - // unfortunate this sleep is server-side instead of client-side, but since - // scatter is the only command that needs it, it's not worth building jitter - // support into DistSender. - const maxJitter = 3 * time.Second - jitter := time.Duration(rand.Int63n(maxJitter.Nanoseconds())) * time.Nanosecond - select { - case <-time.After(jitter): - case <-ctx.Done(): - return roachpb.AdminScatterResponse{}, ctx.Err() - } - if err := refreshDescAndZone(); err != nil { return roachpb.AdminScatterResponse{}, err }