diff --git a/pkg/ccl/sqlccl/backup.go b/pkg/ccl/sqlccl/backup.go
index 1880591bf2ff..29a4ec781e6e 100644
--- a/pkg/ccl/sqlccl/backup.go
+++ b/pkg/ccl/sqlccl/backup.go
@@ -109,7 +109,7 @@ func ValidatePreviousBackups(ctx context.Context, uris []string) (hlc.Timestamp,
 	// This reuses Restore's logic for lining up all the start and end
 	// timestamps to validate the previous backups that this one is incremental
 	// from.
-	_, endTime, err := makeImportRequests(nil, backups)
+	_, endTime, err := makeImportSpans(nil, backups)
 	return endTime, err
 }
 
diff --git a/pkg/ccl/sqlccl/backup_test.go b/pkg/ccl/sqlccl/backup_test.go
index e90d2e193863..c55feda7c239 100644
--- a/pkg/ccl/sqlccl/backup_test.go
+++ b/pkg/ccl/sqlccl/backup_test.go
@@ -37,7 +37,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/sqlccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl"
-	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
@@ -49,7 +48,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util"
-	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -1481,64 +1479,6 @@ func TestTimestampMismatch(t *testing.T) {
 	})
 }
 
-func TestPresplitRanges(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
-	ctx, _, tc, _, cleanupFn := backupRestoreTestSetup(t, multiNode, 0)
-	defer cleanupFn()
-	kvDB := tc.Server(0).KVClient().(*client.DB)
-
-	numRangesTests := []int{0, 1, 2, 3, 4, 10}
-	for testNum, numRanges := range numRangesTests {
-		t.Run(strconv.Itoa(numRanges), func(t *testing.T) {
-			baseKey := keys.MakeTablePrefix(uint32(keys.MaxReservedDescID + testNum))
-			var splitPoints []roachpb.Key
-			for i := 0; i < numRanges; i++ {
-				key := encoding.EncodeUvarintAscending(append([]byte(nil), baseKey...), uint64(i))
-				splitPoints = append(splitPoints, key)
-			}
-			if err := sqlccl.PresplitRanges(ctx, *kvDB, splitPoints); err != nil {
-				t.Error(err)
-			}
-
-			// Verify that the splits exist.
-			// Note that PresplitRanges adds the row sentinel to make a valid table
-			// key, but AdminSplit internally removes it (via EnsureSafeSplitKey). So
-			// we expect splits that match the splitPoints exactly.
-			for _, splitKey := range splitPoints {
-				// Scan the meta range for splitKey.
-				rk, err := keys.Addr(splitKey)
-				if err != nil {
-					t.Fatal(err)
-				}
-
-				startKey := keys.RangeMetaKey(rk)
-				endKey := keys.Meta2Prefix.PrefixEnd()
-
-				kvs, err := kvDB.Scan(context.Background(), startKey, endKey, 1)
-				if err != nil {
-					t.Fatal(err)
-				}
-				if len(kvs) != 1 {
-					t.Fatalf("expected 1 KV, got %v", kvs)
-				}
-				desc := &roachpb.RangeDescriptor{}
-				if err := kvs[0].ValueProto(desc); err != nil {
-					t.Fatal(err)
-				}
-				if !desc.EndKey.Equal(rk) {
-					t.Errorf(
-						"missing split %s: range %s to %s",
-						keys.PrettyPrint(splitKey),
-						keys.PrettyPrint(desc.StartKey.AsRawKey()),
-						keys.PrettyPrint(desc.EndKey.AsRawKey()),
-					)
-				}
-			}
-		})
-	}
-}
-
 func TestBackupLevelDB(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
diff --git a/pkg/ccl/sqlccl/restore.go b/pkg/ccl/sqlccl/restore.go
index 9129cb356e3d..4734fec03fa9 100644
--- a/pkg/ccl/sqlccl/restore.go
+++ b/pkg/ccl/sqlccl/restore.go
@@ -14,13 +14,11 @@ import (
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 	"golang.org/x/sync/errgroup"
-	"golang.org/x/time/rate"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/intervalccl"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
-	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/jobs"
@@ -41,57 +39,6 @@ const (
 	restoreOptSkipMissingFKs = "skip_missing_foreign_keys"
 )
 
-// Import loads some data in sstables into an empty range. Only the keys between
-// startKey and endKey are loaded. Every row's key is rewritten to be for
-// newTableID.
-func Import(
-	ctx context.Context,
-	db client.DB,
-	startKey, endKey roachpb.Key,
-	files []roachpb.ImportRequest_File,
-	kr *storageccl.KeyRewriter,
-	rekeys []roachpb.ImportRequest_TableRekey,
-) (*roachpb.ImportResponse, error) {
-	var newStartKey, newEndKey roachpb.Key
-	{
-		var ok bool
-		newStartKey, ok, _ = kr.RewriteKey(append([]byte(nil), startKey...))
-		if !ok {
-			return nil, errors.Errorf("could not rewrite key: %s", newStartKey)
-		}
-		newEndKey, ok, _ = kr.RewriteKey(append([]byte(nil), endKey...))
-		if !ok {
-			return nil, errors.Errorf("could not rewrite key: %s", newEndKey)
-		}
-	}
-
-	if log.V(1) {
-		log.Infof(ctx, "import [%s,%s) (%d files)", newStartKey, newEndKey, len(files))
-	}
-	if len(files) == 0 {
-		return &roachpb.ImportResponse{}, nil
-	}
-
-	req := &roachpb.ImportRequest{
-		// Import is a point request because we don't want DistSender to split
-		// it. Assume (but don't require) the entire post-rewrite span is on the
-		// same range.
-		Span: roachpb.Span{Key: newStartKey},
-		DataSpan: roachpb.Span{
-			Key:    startKey,
-			EndKey: endKey,
-		},
-		Files:  files,
-		Rekeys: rekeys,
-	}
-	res, pErr := client.SendWrapped(ctx, db.GetSender(), req)
-	if pErr != nil {
-		return nil, pErr.GoError()
-	}
-
-	return res.(*roachpb.ImportResponse), nil
-}
-
 func loadBackupDescs(ctx context.Context, uris []string) ([]BackupDescriptor, error) {
 	backupDescs := make([]BackupDescriptor, len(uris))
 
@@ -331,7 +278,7 @@ type intervalSpan roachpb.Span
 
 var _ interval.Interface = intervalSpan{}
 
-// ID is part of `interval.Interface` but unused in makeImportRequests.
+// ID is part of `interval.Interface` but unused in makeImportSpans.
 func (ie intervalSpan) ID() uintptr { return 0 }
 
 // Range is part of `interval.Interface`.
@@ -363,8 +310,8 @@ type importEntry struct {
 	files []roachpb.ImportRequest_File
 }
 
-// makeImportRequests pivots the backups, which are grouped by time, into
-// requests for import, which are grouped by keyrange.
+// makeImportSpans pivots the backups, which are grouped by time, into
+// spans for import, which are grouped by keyrange.
 //
 // The core logic of this is in OverlapCoveringMerge, which accepts sets of
 // non-overlapping key ranges (aka coverings) each with a payload, and returns
@@ -383,13 +330,13 @@ type importEntry struct {
 // - [B, C) -> /file1, /file4, requested (note that file1 was split into two ranges)
 // - [C, D) -> /file2, /file5, requested
 //
-// This would be turned into two Import requests, one restoring [B, C) out of
+// This would be turned into two Import spans, one restoring [B, C) out of
 // /file1 and /file4, the other restoring [C, D) out of /file2 and /file5.
 // Nothing is restored out of /file3 and only part of /file1 is used.
 //
 // NB: All grouping operates in the pre-rewrite keyspace, meaning the keyranges
 // as they were backed up, not as they're being restored.
-func makeImportRequests(
+func makeImportSpans(
 	tableSpans []roachpb.Span, backups []BackupDescriptor,
 ) ([]importEntry, hlc.Timestamp, error) {
 	// Put the merged table data covering first into the OverlapCoveringMerge
@@ -497,65 +444,6 @@ func makeImportRequests(
 	return requestEntries, maxEndTime, nil
 }
 
-// PresplitRanges concurrently creates the splits described by `input`. It does
-// this by finding the middle key, splitting and recursively presplitting the
-// resulting left and right hand ranges. NB: The split code assumes that the LHS
-// of the resulting ranges is the smaller, so normally you'd split from the
-// left, but this method should only be called on empty keyranges, so it's okay.
-//
-// The `input` parameter expected to be sorted.
-func PresplitRanges(baseCtx context.Context, db client.DB, input []roachpb.Key) error {
-	// TODO(dan): This implementation does nothing to control the maximum
-	// parallelization or number of goroutines spawned. Revisit (possibly via a
-	// semaphore) if this becomes a problem in practice.
-
-	ctx, span := tracing.ChildSpan(baseCtx, "PresplitRanges")
-	defer tracing.FinishSpan(span)
-	log.Infof(ctx, "presplitting %d ranges", len(input))
-
-	if len(input) == 0 {
-		return nil
-	}
-
-	// 100 was picked because it's small enough that a ~16000 presplit restore
-	// finishes smoothly on a 4-node azure production cluster.
-	//
-	// TODO(dan): See if there's some better solution #14798.
-	const splitsPerSecond, splitsBurst = 100, 1
-	limiter := rate.NewLimiter(splitsPerSecond, splitsBurst)
-
-	g, ctx := errgroup.WithContext(ctx)
-	var splitFn func([]roachpb.Key) error
-	splitFn = func(splitPoints []roachpb.Key) error {
-		if err := limiter.Wait(ctx); err != nil {
-			return err
-		}
-
-		// Pick the index such that it's 0 if len(splitPoints) == 1.
-		splitIdx := len(splitPoints) / 2
-		if err := db.AdminSplit(ctx, splitPoints[splitIdx], splitPoints[splitIdx]); err != nil {
-			return err
-		}
-
-		splitPointsLeft, splitPointsRight := splitPoints[:splitIdx], splitPoints[splitIdx+1:]
-		if len(splitPointsLeft) > 0 {
-			g.Go(func() error {
-				return splitFn(splitPointsLeft)
-			})
-		}
-		if len(splitPointsRight) > 0 {
-			// Save a few goroutines by reusing this one.
-			return splitFn(splitPointsRight)
-		}
-		return nil
-	}
-
-	g.Go(func() error {
-		return splitFn(input)
-	})
-	return g.Wait()
-}
-
 // Write the new descriptors. First the ID -> TableDescriptor for the new table,
 // then flip (or initialize) the name -> ID entry so any new queries will use
 // the new one.
@@ -604,7 +492,7 @@ func restoreJobDescription(restore *parser.Restore, from []string) (string, erro
 // restore imports a SQL table (or tables) from sets of non-overlapping sstable
 // files.
 func restore(
-	ctx context.Context,
+	restoreCtx context.Context,
 	p sql.PlanHookState,
 	uris []string,
 	targets parser.TargetList,
@@ -624,7 +512,7 @@ func restore(
 	// A note about contexts and spans in this method: the top-level context
 	// `ctx` is used for orchestration logging. All operations that carry out
 	// work get their individual contexts.
-	initCtx, initSpan := tracing.ChildSpan(ctx, "init")
+	initCtx, initSpan := tracing.ChildSpan(restoreCtx, "init")
 	defer func() {
 		tracing.FinishSpan(initSpan) // want late binding
 	}()
@@ -657,12 +545,12 @@ func restore(
 		}
 	}
 
-	log.Eventf(ctx, "starting restore for %d tables", len(tables))
+	log.Eventf(restoreCtx, "starting restore for %d tables", len(tables))
 
 	// Fail fast if the necessary databases don't exist since the below logic
 	// leaks table IDs when Restore fails.
-	if err := db.Txn(initCtx, func(ctx context.Context, txn *client.Txn) error {
-		return reassignParentIDs(ctx, txn, p, databasesByID, tables, opt)
+	if err := db.Txn(initCtx, func(txnCtx context.Context, txn *client.Txn) error {
+		return reassignParentIDs(txnCtx, txn, p, databasesByID, tables, opt)
 	}); err != nil {
 		return failed, err
 	}
@@ -686,7 +574,7 @@ func restore(
 
 	// Pivot the backups, which are grouped by time, into requests for import,
 	// which are grouped by keyrange.
-	importRequests, _, err := makeImportRequests(spans, backupDescs)
+	importSpans, _, err := makeImportSpans(spans, backupDescs)
 	if err != nil {
 		return failed, errors.Wrapf(err, "making import requests for %d backups", len(backupDescs))
 	}
@@ -704,59 +592,34 @@ func restore(
 	tracing.FinishSpan(initSpan)
 	initCtx, initSpan = nil, nil
 
-	splitCtx, splitSpan := tracing.ChildSpan(ctx, "presplit")
-	defer func() {
-		tracing.FinishSpan(splitSpan) // want late binding
-	}()
-
-	// The Import (and resulting WriteBatch) requests made below run on
-	// leaseholders, so presplit the ranges to balance the work among many
-	// nodes
-	splitKeys := make([]roachpb.Key, len(importRequests)+1)
-	for i, r := range importRequests {
-		var ok bool
-		splitKeys[i], ok, _ = kr.RewriteKey(append([]byte(nil), r.Key...))
-		if !ok {
-			return failed, errors.Errorf("failed to rewrite key: %s", r.Key)
-		}
-	}
-	// Split at the end of the last table; otherwise, the last range will span
-	// to MaxKey and potentially contain data, which breaks scatter.
-	var maxID sqlbase.ID
-	for _, id := range newTableIDs {
-		if id > maxID {
-			maxID = id
-		}
-	}
-	splitKeys[len(splitKeys)-1] = keys.MakeTablePrefix(uint32(maxID + 1))
-	if err := PresplitRanges(splitCtx, db, splitKeys); err != nil {
-		return failed, errors.Wrapf(err, "presplitting %d ranges", len(importRequests))
+	mu := struct {
+		syncutil.Mutex
+		res               roachpb.BulkOpSummary
+		requestsCompleted []bool
+		lowWaterMark      int
+	}{
+		requestsCompleted: make([]bool, len(importSpans)),
+		lowWaterMark:      -1,
 	}
-	log.Eventf(ctx, "presplit ranges along %d keys", len(splitKeys))
-	tracing.FinishSpan(splitSpan)
-	splitCtx, splitSpan = nil, nil
 
-	{
-		newSpans := spansForAllTableIndexes(tables)
-		g, gCtx := errgroup.WithContext(ctx)
-		for i := range newSpans {
-			span := newSpans[i]
-			g.Go(func() error {
-				scatterCtx, sp := tracing.ChildSpan(gCtx, "scatter")
-				defer tracing.FinishSpan(sp)
-
-				req := &roachpb.AdminScatterRequest{
-					Span: roachpb.Span{Key: span.Key, EndKey: span.EndKey},
+	progressLogger := jobProgressLogger{
+		job:         job,
+		totalChunks: len(importSpans),
+		progressedFn: func(progressedCtx context.Context, details interface{}) {
+			switch d := details.(type) {
+			case *jobs.Payload_Restore:
+				mu.Lock()
+				if mu.lowWaterMark >= 0 {
+					d.Restore.LowWaterMark = importSpans[mu.lowWaterMark].Key
 				}
-				_, pErr := client.SendWrapped(scatterCtx, db.GetSender(), req)
-				return pErr.GoError()
-			})
-		}
-		if err := g.Wait(); err != nil {
-			log.Errorf(ctx, "failed scattering %d ranges: %s", len(importRequests), err)
-		}
-		log.Eventf(ctx, "scattered lease holders for %d key spans", len(newSpans))
+				mu.Unlock()
+			default:
+				log.Errorf(progressedCtx, "job payload had unexpected type %T", d)
+			}
+		},
 	}
+	progressCtx, progressSpan := tracing.ChildSpan(restoreCtx, "progress-log")
+	defer tracing.FinishSpan(progressSpan)
 
 	// We're already limiting these on the server-side, but sending all the
 	// Import requests at once would fill up distsender/grpc/something and cause
@@ -775,64 +638,102 @@ func restore(
 	maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip) * runtime.NumCPU()
 	importsSem := make(chan struct{}, maxConcurrentImports)
 
-	log.Eventf(ctx, "commencing import of data with concurrency %d", maxConcurrentImports)
+	log.Eventf(restoreCtx, "commencing import of data with concurrency %d", maxConcurrentImports)
 	tBegin := timeutil.Now()
 
-	mu := struct {
-		syncutil.Mutex
-		res               roachpb.BulkOpSummary
-		requestsCompleted []bool
-		lowWaterMark      int
-	}{
-		requestsCompleted: make([]bool, len(importRequests)),
-		lowWaterMark:      -1,
-	}
-
-	progressLogger := jobProgressLogger{
-		job:         job,
-		totalChunks: len(importRequests),
-		progressedFn: func(ctx context.Context, details interface{}) {
-			switch d := details.(type) {
-			case *jobs.Payload_Restore:
-				mu.Lock()
-				if mu.lowWaterMark >= 0 {
-					d.Restore.LowWaterMark = importRequests[mu.lowWaterMark].Key
+	// We're about to start off one goroutine that serially presplits & scatters
+	// each import span. Once split and scattered, the span is submitted to
+	// importRequestsCh to indicate it's ready for Import. Since import is so
+	// much slower, we buffer the channel to keep the split/scatter work from
+	// getting too far ahead. This both naturally rate limits the split/scatters
+	// and bounds the number of empty ranges created if the RESTORE fails (or is
	// cancelled).
+	const presplitLeadLimit = 10
+	importRequestsCh := make(chan *roachpb.ImportRequest, presplitLeadLimit)
+
+	g, gCtx := errgroup.WithContext(restoreCtx)
+	g.Go(func() error {
+		splitScatterCtx, splitScatterSpan := tracing.ChildSpan(gCtx, "presplit-scatter")
+		defer tracing.FinishSpan(splitScatterSpan)
+		defer close(importRequestsCh)
+
+		// The Import (and resulting AddSSTable) requests made below run on
+		// leaseholders, so presplit and scatter the ranges to balance the work
+		// among many nodes.
+		for i, importSpan := range importSpans {
+			var newSpan roachpb.Span
+			{
+				var ok bool
+				newSpan.Key, ok, _ = kr.RewriteKey(append([]byte(nil), importSpan.Key...))
+				if !ok {
+					return errors.Errorf("could not rewrite key: %s", importSpan.Key)
+				}
+				newSpan.EndKey, ok, _ = kr.RewriteKey(append([]byte(nil), importSpan.EndKey...))
+				if !ok {
+					return errors.Errorf("could not rewrite key: %s", importSpan.EndKey)
 				}
-				mu.Unlock()
-			default:
-				log.Errorf(ctx, "job payload had unexpected type %T", d)
 			}
-		},
-	}
-	progressCtx, progressSpan := tracing.ChildSpan(ctx, "progress-log")
-	defer tracing.FinishSpan(progressSpan)
 
+			log.VEventf(restoreCtx, 1, "presplitting %d of %d", i+1, len(importSpans))
+			if err := db.AdminSplit(splitScatterCtx, newSpan.Key, newSpan.Key); err != nil {
+				return err
+			}
+
+			log.VEventf(restoreCtx, 1, "scattering %d of %d", i+1, len(importSpans))
+			scatterReq := &roachpb.AdminScatterRequest{Span: newSpan}
+			if _, pErr := client.SendWrapped(splitScatterCtx, db.GetSender(), scatterReq); pErr != nil {
+				// TODO(dan): Unfortunately, Scatter is still too unreliable to
+				// fail the RESTORE when Scatter fails. I'm uncomfortable that
+				// this could break entirely and not start failing the tests,
+				// but on the bright side, it doesn't affect correctness, only
+				// throughput.
+				log.Errorf(restoreCtx, "failed to scatter %d: %s", i, pErr.GoError())
+			}
+
+			importReq := &roachpb.ImportRequest{
+				// Import is a point request because we don't want DistSender to split
+				// it. Assume (but don't require) the entire post-rewrite span is on the
+				// same range.
+				Span:     roachpb.Span{Key: newSpan.Key},
+				DataSpan: importSpan.Span,
+				Files:    importSpan.files,
+				Rekeys:   rekeys,
+			}
+			select {
+			case <-gCtx.Done():
+				return gCtx.Err()
+			case importRequestsCh <- importReq:
+			}
+		}
+
+		return nil
+	})
+
+	var importIdx int
+	for importRequest := range importRequestsCh {
+		importCtx, importSpan := tracing.ChildSpan(gCtx, "import")
+		idx := importIdx
+		importIdx++
+		log.VEventf(restoreCtx, 1, "importing %d of %d", idx, len(importSpans))
-	g, gCtx := errgroup.WithContext(ctx)
-	for i := range importRequests {
 		select {
 		case importsSem <- struct{}{}:
-		case <-ctx.Done():
-			return failed, ctx.Err()
+		case <-gCtx.Done():
+			return failed, gCtx.Err()
 		}
-		log.Eventf(ctx, "importing %d of %d", i+1, len(importRequests))
-
-		ir := importRequests[i]
-		idx := i
+		log.Event(importCtx, "acquired semaphore")
 
 		g.Go(func() error {
-			importCtx, span := tracing.ChildSpan(gCtx, "import")
-			defer tracing.FinishSpan(span)
+			defer tracing.FinishSpan(importSpan)
 			defer func() { <-importsSem }()
 
-			log.Event(importCtx, "acquired semaphore")
-			res, err := Import(importCtx, db, ir.Key, ir.EndKey, ir.files, kr, rekeys)
-			if err != nil {
-				return err
+			importRes, pErr := client.SendWrapped(importCtx, db.GetSender(), importRequest)
+			if pErr != nil {
+				return pErr.GoError()
 			}
 
 			mu.Lock()
-			mu.res.Add(res.Imported)
+			mu.res.Add(importRes.(*roachpb.ImportResponse).Imported)
 			mu.requestsCompleted[idx] = true
 			for j := mu.lowWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ {
 				mu.lowWaterMark = j
@@ -849,22 +750,20 @@ func restore(
 		})
 	}
 
-	log.Event(ctx, "wait for outstanding imports to finish")
+	log.Event(restoreCtx, "wait for outstanding imports to finish")
 	if err := g.Wait(); err != nil {
 		// This leaves the data that did get imported in case the user wants to
 		// retry.
 		// TODO(dan): Build tooling to allow a user to restart a failed restore.
-		return failed, errors.Wrapf(err, "importing %d ranges", len(importRequests))
+		return failed, errors.Wrapf(err, "importing %d ranges", len(importSpans))
 	}
 
-	log.Event(ctx, "making tables live")
+	log.Event(restoreCtx, "making tables live")
 
-	makeLiveCtx, makeLiveSpan := tracing.ChildSpan(ctx, "make-live")
-	defer tracing.FinishSpan(makeLiveSpan)
 	// Write the new TableDescriptors and flip the namespace entries over to
 	// them. After this call, any queries on a table will be served by the newly
 	// restored data.
-	if err := restoreTableDescs(makeLiveCtx, db, tables); err != nil {
+	if err := restoreTableDescs(restoreCtx, db, tables); err != nil {
 		return failed, errors.Wrapf(err, "restoring %d TableDescriptors", len(tables))
 	}
 
@@ -873,7 +772,7 @@ func restore(
 	// everything works but the table data is left abandoned.
 
 	// Don't need the lock any more; we're the only moving part at this stage.
-	log.Eventf(ctx, "restore completed: ingested %s of data (before replication) at %s/sec",
+	log.Eventf(restoreCtx, "restore completed: ingested %s of data (before replication) at %s/sec",
 		humanizeutil.IBytes(mu.res.DataSize),
 		humanizeutil.IBytes(mu.res.DataSize/int64(1+timeutil.Since(tBegin).Seconds())),
 	)
@@ -937,16 +836,14 @@ func restorePlanHook(
 			restoreStmt.Options,
 			job,
 		)
-		jobCtx, jobSpan := tracing.ChildSpan(ctx, "log-job")
-		defer tracing.FinishSpan(jobSpan)
 		if err != nil {
-			job.Failed(jobCtx, err)
+			job.Failed(ctx, err)
 			return nil, err
 		}
-		if err := job.Succeeded(jobCtx); err != nil {
+		if err := job.Succeeded(ctx); err != nil {
 			// An error while marking the job as successful is not important enough to
 			// merit failing the entire restore.
-			log.Errorf(jobCtx, "RESTORE ignoring error while marking job %d (%s) as successful: %+v",
+			log.Errorf(ctx, "RESTORE ignoring error while marking job %d (%s) as successful: %+v",
 				job.ID(), description, err)
 		}
 		// TODO(benesch): emit periodic progress updates once we have the
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 8c78e8012566..5c46d6006c51 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -26,7 +26,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"math/rand"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -3961,19 +3960,6 @@ func (r *Replica) adminScatter(
 		return nil
 	}
 
-	// Sleep for a random duration to avoid dumping too many leases or replicas on
-	// one underfull store when many ranges are scattered simultaneously. It's
-	// unfortunate this sleep is server-side instead of client-side, but since
-	// scatter is the only command that needs it, it's not worth building jitter
-	// support into DistSender.
-	const maxJitter = 3 * time.Second
-	jitter := time.Duration(rand.Int63n(maxJitter.Nanoseconds())) * time.Nanosecond
-	select {
-	case <-time.After(jitter):
-	case <-ctx.Done():
-		return roachpb.AdminScatterResponse{}, ctx.Err()
-	}
-
 	if err := refreshDescAndZone(); err != nil {
 		return roachpb.AdminScatterResponse{}, err
 	}
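
Reviewer note (not part of the patch): the restructured restore loop above is a bounded producer/consumer pipeline -- one goroutine splits and scatters each span serially and hands it off over a buffered channel, while the main loop drains the channel and fans the much slower imports out under a semaphore. The following is a minimal, self-contained Go sketch of that shape only; readyCh, sem, and the Sleep calls standing in for AdminSplit/AdminScatter/Import are invented for illustration and are not CockroachDB's API.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// The producer may run at most leadLimit items ahead of the consumers,
	// which bounds how many "ranges" are split/scattered but not yet imported.
	const leadLimit = 3
	const maxConcurrent = 2

	spans := []string{"a-b", "b-c", "c-d", "d-e", "e-f"}

	readyCh := make(chan string, leadLimit)
	sem := make(chan struct{}, maxConcurrent)
	g, gCtx := errgroup.WithContext(context.Background())

	// Producer: serially "split and scatter" each span, then hand it off.
	g.Go(func() error {
		defer close(readyCh)
		for _, span := range spans {
			time.Sleep(10 * time.Millisecond) // stands in for AdminSplit + AdminScatter
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case readyCh <- span:
			}
		}
		return nil
	})

	// Consumers: drain the channel, running up to maxConcurrent imports at once.
	for span := range readyCh {
		span := span
		sem <- struct{}{}
		g.Go(func() error {
			defer func() { <-sem }()
			time.Sleep(50 * time.Millisecond) // stands in for the (much slower) Import
			fmt.Println("imported", span)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("restore failed:", err)
	}
}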
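
The progressedFn in the patch keeps a requestsCompleted slice plus a lowWaterMark so that job progress can be checkpointed as a single key: everything at or below the mark is known to be imported. Here is a small sketch of that bookkeeping in isolation, assuming the same contiguous-prefix rule as the loop in the diff (lowWaterTracker and its methods are hypothetical names, not part of the patch).

package main

import (
	"fmt"
	"sync"
)

// lowWaterTracker marks work items complete and maintains a low water mark:
// every index at or below the mark is known finished, so progress can be
// persisted as a single position (in the diff, a single key).
type lowWaterTracker struct {
	mu           sync.Mutex
	completed    []bool
	lowWaterMark int // -1 until a contiguous prefix has completed
}

func newLowWaterTracker(n int) *lowWaterTracker {
	return &lowWaterTracker{completed: make([]bool, n), lowWaterMark: -1}
}

// done records item idx as complete and advances the mark across any
// contiguous prefix of completed items, mirroring the loop in the import
// callback above.
func (t *lowWaterTracker) done(idx int) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.completed[idx] = true
	for j := t.lowWaterMark + 1; j < len(t.completed) && t.completed[j]; j++ {
		t.lowWaterMark = j
	}
	return t.lowWaterMark
}

func main() {
	t := newLowWaterTracker(5)
	for _, idx := range []int{2, 0, 1, 4, 3} {
		fmt.Printf("finished %d, low water mark now %d\n", idx, t.done(idx))
	}
	// Prints -1, 0, 2, 2, 4: the mark only advances over a contiguous prefix.
}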
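
makeImportSpans' doc comment describes pivoting per-backup file coverings into per-keyrange import spans via OverlapCoveringMerge. The sketch below is a deliberately simplified stand-in for that merge -- string keys, a quadratic scan, and no time dimension or "requested" covering -- that reproduces the /file1../file5 example from the comment; overlapMerge here is an illustration, not intervalccl's implementation.

package main

import (
	"fmt"
	"sort"
)

// entry is a keyrange [start, end) with an associated payload (a file list).
type entry struct {
	start, end string
	payload    string
}

// overlapMerge flattens several coverings into one covering whose boundaries
// are the union of all input boundaries; each output span carries every
// payload that overlaps it.
func overlapMerge(coverings [][]entry) []entry {
	boundarySet := map[string]struct{}{}
	for _, c := range coverings {
		for _, e := range c {
			boundarySet[e.start] = struct{}{}
			boundarySet[e.end] = struct{}{}
		}
	}
	var boundaries []string
	for k := range boundarySet {
		boundaries = append(boundaries, k)
	}
	sort.Strings(boundaries)

	var out []entry
	for i := 0; i+1 < len(boundaries); i++ {
		span := entry{start: boundaries[i], end: boundaries[i+1]}
		for _, c := range coverings {
			for _, e := range c {
				if e.start < span.end && span.start < e.end { // half-open overlap
					if span.payload != "" {
						span.payload += ", "
					}
					span.payload += e.payload
				}
			}
		}
		if span.payload != "" {
			out = append(out, span)
		}
	}
	return out
}

func main() {
	// The example from makeImportSpans' comment: two backups covering [a, d).
	backup1 := []entry{{"a", "c", "/file1"}, {"c", "d", "/file2"}}
	backup2 := []entry{{"a", "b", "/file3"}, {"b", "c", "/file4"}, {"c", "d", "/file5"}}
	for _, s := range overlapMerge([][]entry{backup1, backup2}) {
		fmt.Printf("[%s, %s) -> %s\n", s.start, s.end, s.payload)
	}
	// Prints:
	// [a, b) -> /file1, /file3
	// [b, c) -> /file1, /file4
	// [c, d) -> /file2, /file5
}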