From 545d3f58dcb10e1faf33dcb294cf88cb186c2dc9 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 19 Mar 2023 12:57:09 +0100 Subject: [PATCH 1/2] upgrades: add a missing unit test My previous change in this area failed to add the unit test. Release note: None --- pkg/upgrade/upgrades/BUILD.bazel | 1 + .../create_task_system_tables_test.go | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 pkg/upgrade/upgrades/create_task_system_tables_test.go diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index b05acdb18d6f..6a170a3b6d7e 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -94,6 +94,7 @@ go_test( "create_auto_config_runner_job_test.go", "create_index_usage_statement_statistics_test.go", "create_jobs_metrics_polling_job_test.go", + "create_task_system_tables_test.go", "database_role_settings_table_user_id_migration_test.go", "delete_descriptors_of_dropped_functions_test.go", "desc_id_sequence_for_system_tenant_test.go", diff --git a/pkg/upgrade/upgrades/create_task_system_tables_test.go b/pkg/upgrade/upgrades/create_task_system_tables_test.go new file mode 100644 index 000000000000..b1c6db9bbae1 --- /dev/null +++ b/pkg/upgrade/upgrades/create_task_system_tables_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func TestTaskTablesMigration(t *testing.T) { + skip.UnderStressRace(t) + defer leaktest.AfterTest(t)() + ctx := context.Background() + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.V23_1_TaskSystemTables - 1), + }, + }, + }, + } + + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + + defer tc.Stopper().Stop(ctx) + db := tc.ServerConn(0) + defer db.Close() + + upgrades.Upgrade( + t, + db, + clusterversion.V23_1_TaskSystemTables, + nil, + false, + ) + + _, err := db.Exec("SELECT * FROM system.task_payloads") + assert.NoError(t, err, "system.task_payloads exists") + + _, err = db.Exec("SELECT * FROM system.tenant_tasks") + assert.NoError(t, err, "system.tenant_tasks exists") +} From 210650d5e7ebc2145409b2fef290597956650daf Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Thu, 23 Feb 2023 14:57:54 -0500 Subject: [PATCH 2/2] backupccl: send chunks with fail scatters to random node in generative ssp For chunks that have failed to scatter, this patch routes the chunk to a random node instead of the current node. 
This is necessary as prior to the generative version, split and scatter processors were on every node, thus there was no imbalance introduced from routing chunks that have failed to scatter to the current node. The new generative split and scatter processor is only on 1 node, and thus would cause the same node to process all chunks that have failed to scatter. Release note: None --- .../generative_split_and_scatter_processor.go | 69 ++++++++-- ...rative_split_and_scatter_processor_test.go | 125 +++++++++++++++++- 2 files changed, 184 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index f48fed6fba18..2a9831773e58 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -10,6 +10,7 @@ package backupccl import ( "context" + "hash/fnv" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" @@ -67,7 +68,7 @@ type generativeSplitAndScatterProcessor struct { doneScatterCh chan entryNode // A cache for routing datums, so only 1 is allocated per node. - routingDatumCache map[roachpb.NodeID]rowenc.EncDatum + routingDatumCache routingDatumCache scatterErr error } @@ -129,7 +130,7 @@ func newGenerativeSplitAndScatterProcessor( // in parallel downstream. It has been verified ad-hoc that this // sizing does not bottleneck restore. doneScatterCh: make(chan entryNode, int(spec.NumNodes)*maxConcurrentRestoreWorkers), - routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum), + routingDatumCache: newRoutingDatumCache(), } if err := ssp.Init(ctx, ssp, post, generativeSplitAndScatterOutputTypes, flowCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{ @@ -161,7 +162,8 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) { TaskName: "generativeSplitAndScatter-worker", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { - gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh) + gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh, + &gssp.routingDatumCache) cancel() close(gssp.doneScatterCh) close(workerDone) @@ -191,10 +193,10 @@ func (gssp *generativeSplitAndScatterProcessor) Next() ( } // The routing datums informs the router which output stream should be used. 
- routingDatum, ok := gssp.routingDatumCache[scatteredEntry.node] + routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node) if !ok { routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node)) - gssp.routingDatumCache[scatteredEntry.node] = routingDatum + gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum) } row := rowenc.EncDatumRow{ @@ -263,6 +265,7 @@ func runGenerativeSplitAndScatter( chunkSplitAndScatterers []splitAndScatterer, chunkEntrySplitAndScatterers []splitAndScatterer, doneScatterCh chan<- entryNode, + cache *routingDatumCache, ) error { log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes", spec.NumEntries, spec.ChunkSize, spec.NumNodes) @@ -359,6 +362,8 @@ func runGenerativeSplitAndScatter( for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ { worker := worker g2.GoCtx(func(ctx context.Context) error { + hash := fnv.New32a() + // Chunks' leaseholders should be randomly placed throughout the // cluster. for importSpanChunk := range restoreEntryChunksCh { @@ -375,13 +380,32 @@ func runGenerativeSplitAndScatter( return err } if chunkDestination == 0 { - // If scatter failed to find a node for range ingestion, route the range - // to the node currently running the split and scatter processor. + // If scatter failed to find a node for range ingestion, route the + // range to a random node that has already been scattered to so far. + // The random node is chosen by hashing the scatter key. if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok { + cachedNodeIDs := cache.cachedNodeIDs() + if len(cachedNodeIDs) > 0 { + hash.Reset() + if _, err := hash.Write(scatterKey); err != nil { + log.Warningf(ctx, "scatter returned node 0. Route span starting at %s to current node %v because of hash error: %v", + scatterKey, nodeID, err) + } else { + hashedKey := int(hash.Sum32()) + nodeID = cachedNodeIDs[hashedKey%len(cachedNodeIDs)] + } + + log.Warningf(ctx, "scatter returned node 0. "+ + "Random route span starting at %s node %v", scatterKey, nodeID) + } else { + log.Warningf(ctx, "scatter returned node 0. "+ + "Route span starting at %s to current node %v", scatterKey, nodeID) + } chunkDestination = nodeID - log.Warningf(ctx, "scatter returned node 0. "+ - "Route span starting at %s to current node %v", scatterKey, nodeID) } else { + // TODO(rui): OptionalNodeID only returns a node if the sql server runs + // in the same process as the kv server (e.g., not serverless). Figure + // out how to handle this error in serverless restore. log.Warningf(ctx, "scatter returned node 0. 
"+ "Route span starting at %s to default stream", scatterKey) } @@ -457,6 +481,33 @@ func runGenerativeSplitAndScatter( return g.Wait() } +type routingDatumCache struct { + cache map[roachpb.NodeID]rowenc.EncDatum + nodeIDs []roachpb.NodeID +} + +func (c *routingDatumCache) getRoutingDatum(nodeID roachpb.NodeID) (rowenc.EncDatum, bool) { + d, ok := c.cache[nodeID] + return d, ok +} + +func (c *routingDatumCache) putRoutingDatum(nodeID roachpb.NodeID, datum rowenc.EncDatum) { + if _, ok := c.cache[nodeID]; !ok { + c.nodeIDs = append(c.nodeIDs, nodeID) + } + c.cache[nodeID] = datum +} + +func (c *routingDatumCache) cachedNodeIDs() []roachpb.NodeID { + return c.nodeIDs +} + +func newRoutingDatumCache() routingDatumCache { + return routingDatumCache{ + cache: make(map[roachpb.NodeID]rowenc.EncDatum), + } +} + func init() { rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor } diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go index a19d53bce1d6..6de4fef2cce0 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go @@ -12,6 +12,7 @@ import ( "context" "testing" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -111,13 +113,134 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)} chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)} + cache := routingDatumCache{ + cache: make(map[roachpb.NodeID]rowenc.EncDatum), + } + // Large enough so doneScatterCh never blocks. doneScatterCh := make(chan entryNode, 1000) - err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh) + err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh, &cache) require.Error(t, err, "context canceled") } +// TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter verifies that if +// a chunk fails to scatter, then the chunk will be sent to a random destination +// that we've already seen before. 
+func TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter(t *testing.T) { + defer leaktest.AfterTest(t)() + + const numAccounts = 1000 + const localFoo = "nodelocal://0/foo" + ctx := context.Background() + tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, + InitManualReplication) + defer cleanupFn() + + st := cluster.MakeTestingClusterSettings() + evalCtx := eval.MakeTestingEvalContext(st) + evalCtx.NodeID = base.NewSQLIDContainerForNode(tc.Server(0).RPCContext().NodeID) + + testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st) + defer testDiskMonitor.Stop(ctx) + + s0 := tc.Server(0) + registry := tc.Server(0).JobRegistry().(*jobs.Registry) + execCfg := s0.ExecutorConfig().(sql.ExecutorConfig) + flowCtx := execinfra.FlowCtx{ + Cfg: &execinfra.ServerConfig{ + Settings: st, + DB: s0.InternalDB().(descs.DB), + JobRegistry: registry, + ExecutorConfig: &execCfg, + }, + EvalCtx: &evalCtx, + Mon: evalCtx.TestingMon, + DiskMonitor: testDiskMonitor, + NodeID: evalCtx.NodeID, + } + + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`) + sqlDB.Exec(t, `BACKUP INTO $1`, localFoo) + + backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo) + require.Equal(t, 1, len(backups)) + uri := localFoo + "/" + backups[0][0] + + codec := keys.MakeSQLCodec(s0.RPCContext().TenantID) + backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank") + backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key + + spec := makeTestingGenerativeSplitAndScatterSpec( + []string{uri}, + []roachpb.Span{{ + Key: backupStartKey, + EndKey: backupStartKey.PrefixEnd(), + }}, + ) + + // These split and scatterers will always fail the scatter and return 0 as the + // chunk destination. + chunkSplitScatterers := []splitAndScatterer{ + &scatterAlwaysFailsSplitScatterer{}, + &scatterAlwaysFailsSplitScatterer{}, + &scatterAlwaysFailsSplitScatterer{}, + } + + // Fake some entries in the routing datum cache for nodes that we've scattered + // to already. + cache := routingDatumCache{ + cache: map[roachpb.NodeID]rowenc.EncDatum{8: {}, 9: {}}, + nodeIDs: []roachpb.NodeID{8, 9}, + } + + // Large enough so doneScatterCh never blocks. + doneScatterCh := make(chan entryNode, 1000) + err := runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkSplitScatterers, doneScatterCh, &cache) + require.NoError(t, err) + + close(doneScatterCh) + var doneEntries []entryNode + numEntriesByNode := make(map[roachpb.NodeID]int) + + for e := range doneScatterCh { + doneEntries = append(doneEntries, e) + numEntriesByNode[e.node]++ + } + + // There are at least 10 splits from the original backed up bank table. Plus + // the entries from the system table, etc. Sanity check this. + require.GreaterOrEqual(t, len(doneEntries), 10) + + // The failed scatter chunks should be scattered to the nodes that have been + // scattered to before and cached. + for node, cnt := range numEntriesByNode { + require.Contains(t, cache.nodeIDs, node) + + // Sanity check that we have at least a couple of entries per node. + // This assertion is dependent on the hashing mechanism of the entries + // and can break if it changes. + require.GreaterOrEqual(t, cnt, 2) + } +} + +// scatterAlwaysFailsSplitScatterer always fails the scatter and returns 0 as +// the chunk destination. 
+type scatterAlwaysFailsSplitScatterer struct { +} + +func (t *scatterAlwaysFailsSplitScatterer) split( + ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key, +) error { + return nil +} + +func (t *scatterAlwaysFailsSplitScatterer) scatter( + ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key, +) (roachpb.NodeID, error) { + return 0, nil +} + func makeTestingGenerativeSplitAndScatterSpec( backupURIs []string, requiredSpans []roachpb.Span, ) execinfrapb.GenerativeSplitAndScatterSpec {
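
For illustration, a minimal standalone sketch (not part of the patch) of the fallback routing that runGenerativeSplitAndScatter applies above when scatter returns node 0: the scatter key is hashed over the node IDs already recorded in the routing datum cache and the chunk is routed to the selected node, falling back to the current node while the cache is still empty or if hashing fails. The helper name pickFallbackNode and its simplified signature are illustrative only; the processor works with roachpb.NodeID values and its routingDatumCache directly.

package main

import (
	"fmt"
	"hash/fnv"
)

// pickFallbackNode sketches the fallback in runGenerativeSplitAndScatter: when
// scatter returns node 0, hash the scatter key over the nodes that already
// appear in the routing datum cache and pick one of them. If no nodes have
// been cached yet (or hashing fails), keep the pre-change behavior and use the
// node currently running the processor.
func pickFallbackNode(scatterKey []byte, cachedNodeIDs []int32, currentNodeID int32) int32 {
	if len(cachedNodeIDs) == 0 {
		return currentNodeID
	}
	h := fnv.New32a()
	if _, err := h.Write(scatterKey); err != nil {
		// fnv writes do not fail in practice; the processor logs a warning and
		// routes to the current node in this case, so do the same here.
		return currentNodeID
	}
	return cachedNodeIDs[h.Sum32()%uint32(len(cachedNodeIDs))]
}

func main() {
	cached := []int32{8, 9} // node IDs already seen by the routing datum cache
	fmt.Println(pickFallbackNode([]byte("/Table/106/1/42"), cached, 1))
	fmt.Println(pickFallbackNode([]byte("/Table/106/1/43"), cached, 1))
}

Spreading failed-scatter chunks over the already-cached nodes keeps them distributed across the cluster rather than concentrating them on the single node hosting the generative split and scatter processor, which is the imbalance described in the commit message.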