diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
index f48fed6fba18..2a9831773e58 100644
--- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
+++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -10,6 +10,7 @@ package backupccl
 
 import (
 	"context"
+	"hash/fnv"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
@@ -67,7 +68,7 @@ type generativeSplitAndScatterProcessor struct {
 	doneScatterCh chan entryNode
 
 	// A cache for routing datums, so only 1 is allocated per node.
-	routingDatumCache map[roachpb.NodeID]rowenc.EncDatum
+	routingDatumCache routingDatumCache
 
 	scatterErr error
 }
@@ -129,7 +130,7 @@ func newGenerativeSplitAndScatterProcessor(
 		// in parallel downstream. It has been verified ad-hoc that this
 		// sizing does not bottleneck restore.
 		doneScatterCh:     make(chan entryNode, int(spec.NumNodes)*maxConcurrentRestoreWorkers),
-		routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
+		routingDatumCache: newRoutingDatumCache(),
 	}
 	if err := ssp.Init(ctx, ssp, post, generativeSplitAndScatterOutputTypes, flowCtx, processorID,
 		nil, /* memMonitor */ execinfra.ProcStateOpts{
@@ -161,7 +162,8 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) {
 			TaskName: "generativeSplitAndScatter-worker",
 			SpanOpt:  stop.ChildSpan,
 		}, func(ctx context.Context) {
-			gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh)
+			gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh,
+				&gssp.routingDatumCache)
 			cancel()
 			close(gssp.doneScatterCh)
 			close(workerDone)
@@ -191,10 +193,10 @@ func (gssp *generativeSplitAndScatterProcessor) Next() (
 		}
 
 		// The routing datums informs the router which output stream should be used.
-		routingDatum, ok := gssp.routingDatumCache[scatteredEntry.node]
+		routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node)
 		if !ok {
 			routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
-			gssp.routingDatumCache[scatteredEntry.node] = routingDatum
+			gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum)
 		}
 
 		row := rowenc.EncDatumRow{
@@ -263,6 +265,7 @@ func runGenerativeSplitAndScatter(
 	chunkSplitAndScatterers []splitAndScatterer,
 	chunkEntrySplitAndScatterers []splitAndScatterer,
 	doneScatterCh chan<- entryNode,
+	cache *routingDatumCache,
 ) error {
 	log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
 		spec.NumEntries, spec.ChunkSize, spec.NumNodes)
@@ -359,6 +362,8 @@ func runGenerativeSplitAndScatter(
 	for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
 		worker := worker
 		g2.GoCtx(func(ctx context.Context) error {
+			hash := fnv.New32a()
+
 			// Chunks' leaseholders should be randomly placed throughout the
 			// cluster.
 			for importSpanChunk := range restoreEntryChunksCh {
@@ -375,13 +380,32 @@ func runGenerativeSplitAndScatter(
 					return err
 				}
 				if chunkDestination == 0 {
-					// If scatter failed to find a node for range ingestion, route the range
-					// to the node currently running the split and scatter processor.
+					// If scatter failed to find a node for range ingestion, route the
+					// range to a random node that has already been scattered to so far.
+					// The random node is chosen by hashing the scatter key.
 					if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok {
+						cachedNodeIDs := cache.cachedNodeIDs()
+						if len(cachedNodeIDs) > 0 {
+							hash.Reset()
+							if _, err := hash.Write(scatterKey); err != nil {
+								log.Warningf(ctx, "scatter returned node 0. Route span starting at %s to current node %v because of hash error: %v",
+									scatterKey, nodeID, err)
+							} else {
+								hashedKey := int(hash.Sum32())
+								nodeID = cachedNodeIDs[hashedKey%len(cachedNodeIDs)]
+							}
+
+							log.Warningf(ctx, "scatter returned node 0. "+
+								"Random route span starting at %s node %v", scatterKey, nodeID)
+						} else {
+							log.Warningf(ctx, "scatter returned node 0. "+
+								"Route span starting at %s to current node %v", scatterKey, nodeID)
+						}
 						chunkDestination = nodeID
-						log.Warningf(ctx, "scatter returned node 0. "+
-							"Route span starting at %s to current node %v", scatterKey, nodeID)
 					} else {
+						// TODO(rui): OptionalNodeID only returns a node if the sql server runs
+						// in the same process as the kv server (e.g., not serverless). Figure
+						// out how to handle this error in serverless restore.
 						log.Warningf(ctx, "scatter returned node 0. "+
 							"Route span starting at %s to default stream", scatterKey)
 					}
@@ -457,6 +481,33 @@ func runGenerativeSplitAndScatter(
 	return g.Wait()
 }
 
+type routingDatumCache struct {
+	cache   map[roachpb.NodeID]rowenc.EncDatum
+	nodeIDs []roachpb.NodeID
+}
+
+func (c *routingDatumCache) getRoutingDatum(nodeID roachpb.NodeID) (rowenc.EncDatum, bool) {
+	d, ok := c.cache[nodeID]
+	return d, ok
+}
+
+func (c *routingDatumCache) putRoutingDatum(nodeID roachpb.NodeID, datum rowenc.EncDatum) {
+	if _, ok := c.cache[nodeID]; !ok {
+		c.nodeIDs = append(c.nodeIDs, nodeID)
+	}
+	c.cache[nodeID] = datum
+}
+
+func (c *routingDatumCache) cachedNodeIDs() []roachpb.NodeID {
+	return c.nodeIDs
+}
+
+func newRoutingDatumCache() routingDatumCache {
+	return routingDatumCache{
+		cache: make(map[roachpb.NodeID]rowenc.EncDatum),
+	}
+}
+
 func init() {
 	rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor
 }
diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
index a19d53bce1d6..6de4fef2cce0 100644
--- a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
+++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -22,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -111,13 +113,134 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) {
 	chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
 	chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
 
+	cache := routingDatumCache{
+		cache: make(map[roachpb.NodeID]rowenc.EncDatum),
+	}
+
 	// Large enough so doneScatterCh never blocks.
 	doneScatterCh := make(chan entryNode, 1000)
-	err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh)
+	err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh, &cache)
 
 	require.Error(t, err, "context canceled")
 }
 
+// TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter verifies that if
+// a chunk fails to scatter, then the chunk will be sent to a random destination
+// that we've already seen before.
+func TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const numAccounts = 1000
+	const localFoo = "nodelocal://0/foo"
+	ctx := context.Background()
+	tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts,
+		InitManualReplication)
+	defer cleanupFn()
+
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := eval.MakeTestingEvalContext(st)
+	evalCtx.NodeID = base.NewSQLIDContainerForNode(tc.Server(0).RPCContext().NodeID)
+
+	testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
+	defer testDiskMonitor.Stop(ctx)
+
+	s0 := tc.Server(0)
+	registry := tc.Server(0).JobRegistry().(*jobs.Registry)
+	execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
+	flowCtx := execinfra.FlowCtx{
+		Cfg: &execinfra.ServerConfig{
+			Settings:       st,
+			DB:             s0.InternalDB().(descs.DB),
+			JobRegistry:    registry,
+			ExecutorConfig: &execCfg,
+		},
+		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
+		DiskMonitor: testDiskMonitor,
+		NodeID:      evalCtx.NodeID,
+	}
+
+	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
+	sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)
+
+	backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo)
+	require.Equal(t, 1, len(backups))
+	uri := localFoo + "/" + backups[0][0]
+
+	codec := keys.MakeSQLCodec(s0.RPCContext().TenantID)
+	backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank")
+	backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key
+
+	spec := makeTestingGenerativeSplitAndScatterSpec(
+		[]string{uri},
+		[]roachpb.Span{{
+			Key:    backupStartKey,
+			EndKey: backupStartKey.PrefixEnd(),
+		}},
+	)
+
+	// These split and scatterers will always fail the scatter and return 0 as the
+	// chunk destination.
+	chunkSplitScatterers := []splitAndScatterer{
+		&scatterAlwaysFailsSplitScatterer{},
+		&scatterAlwaysFailsSplitScatterer{},
+		&scatterAlwaysFailsSplitScatterer{},
+	}
+
+	// Fake some entries in the routing datum cache for nodes that we've scattered
+	// to already.
+	cache := routingDatumCache{
+		cache:   map[roachpb.NodeID]rowenc.EncDatum{8: {}, 9: {}},
+		nodeIDs: []roachpb.NodeID{8, 9},
+	}
+
+	// Large enough so doneScatterCh never blocks.
+	doneScatterCh := make(chan entryNode, 1000)
+	err := runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkSplitScatterers, doneScatterCh, &cache)
+	require.NoError(t, err)
+
+	close(doneScatterCh)
+	var doneEntries []entryNode
+	numEntriesByNode := make(map[roachpb.NodeID]int)
+
+	for e := range doneScatterCh {
+		doneEntries = append(doneEntries, e)
+		numEntriesByNode[e.node]++
+	}
+
+	// There are at least 10 splits from the original backed up bank table. Plus
+	// the entries from the system table, etc. Sanity check this.
+	require.GreaterOrEqual(t, len(doneEntries), 10)
+
+	// The failed scatter chunks should be scattered to the nodes that have been
+	// scattered to before and cached.
+	for node, cnt := range numEntriesByNode {
+		require.Contains(t, cache.nodeIDs, node)
+
+		// Sanity check that we have at least a couple of entries per node.
+		// This assertion is dependent on the hashing mechanism of the entries
+		// and can break if it changes.
+		require.GreaterOrEqual(t, cnt, 2)
+	}
+}
+
+// scatterAlwaysFailsSplitScatterer always fails the scatter and returns 0 as
+// the chunk destination.
+type scatterAlwaysFailsSplitScatterer struct {
+}
+
+func (t *scatterAlwaysFailsSplitScatterer) split(
+	ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key,
+) error {
+	return nil
+}
+
+func (t *scatterAlwaysFailsSplitScatterer) scatter(
+	ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key,
+) (roachpb.NodeID, error) {
+	return 0, nil
+}
+
 func makeTestingGenerativeSplitAndScatterSpec(
 	backupURIs []string, requiredSpans []roachpb.Span,
 ) execinfrapb.GenerativeSplitAndScatterSpec {