Merge #97589 #98978
97589: backupccl: send chunks with fail scatters to random node in generative ssp r=rhu713 a=rhu713

For chunks that fail to scatter, this patch routes the chunk to a random
node instead of the current node. This is necessary because, prior to the
generative version, split and scatter processors ran on every node, so routing
failed-to-scatter chunks to the current node introduced no imbalance. The new
generative split and scatter processor runs on only one node, and routing to
the current node would therefore make that single node process every chunk
that fails to scatter.

Addresses runs 6 and 9 of #99206

Release note: None
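
As an illustration of the routing change above, here is a minimal, hedged Go sketch of the fallback: when scatter returns node 0, the destination is picked by hashing the scatter key (FNV-32a, as in the processor) over the node IDs the routing datum cache has already seen. The `nodeID` type, `pickFallbackNode` helper, and the sample keys are simplified stand-ins for this sketch, not the real `roachpb`/`backupccl` types.

```go
// A minimal sketch (not the actual backupccl code) of the hash-based
// fallback: route a failed-to-scatter chunk to one of the node IDs that
// the routing datum cache has already seen.
package main

import (
	"fmt"
	"hash/fnv"
)

// nodeID is a simplified stand-in for roachpb.NodeID.
type nodeID int32

// pickFallbackNode hashes scatterKey with FNV-32a and maps the result onto
// the cached node IDs. It reports false when no nodes have been cached yet,
// in which case the caller would fall back to the current node or the
// default output stream.
func pickFallbackNode(scatterKey []byte, cachedNodeIDs []nodeID) (nodeID, bool) {
	if len(cachedNodeIDs) == 0 {
		return 0, false
	}
	h := fnv.New32a()
	_, _ = h.Write(scatterKey) // fnv's Write never returns an error.
	idx := int(h.Sum32() % uint32(len(cachedNodeIDs)))
	return cachedNodeIDs[idx], true
}

func main() {
	cached := []nodeID{8, 9} // nodes already scattered to
	for _, key := range [][]byte{
		[]byte("/Table/104/1/42"),
		[]byte("/Table/104/1/977"),
	} {
		if dest, ok := pickFallbackNode(key, cached); ok {
			fmt.Printf("chunk with scatter key %s -> node %d\n", key, dest)
		}
	}
}
```

When the cache is still empty, the processor instead falls back to the current node (or the default output stream), as shown in the diff below.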

98978: upgrades: add a missing unit test r=adityamaru a=knz

My previous change in this area failed to add the unit test.

Epic: CRDB-23559
Release note: None

Co-authored-by: Rui Hu <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
3 people committed Apr 3, 2023
3 parents 82284bd + 210650d + 545d3f5 commit 66dfb23
Showing 4 changed files with 248 additions and 10 deletions.
69 changes: 60 additions & 9 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -10,6 +10,7 @@ package backupccl

import (
"context"
"hash/fnv"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
@@ -67,7 +68,7 @@ type generativeSplitAndScatterProcessor struct {

doneScatterCh chan entryNode
// A cache for routing datums, so only 1 is allocated per node.
routingDatumCache map[roachpb.NodeID]rowenc.EncDatum
routingDatumCache routingDatumCache
scatterErr error
}

@@ -129,7 +130,7 @@ func newGenerativeSplitAndScatterProcessor(
// in parallel downstream. It has been verified ad-hoc that this
// sizing does not bottleneck restore.
doneScatterCh: make(chan entryNode, int(spec.NumNodes)*maxConcurrentRestoreWorkers),
routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
routingDatumCache: newRoutingDatumCache(),
}
if err := ssp.Init(ctx, ssp, post, generativeSplitAndScatterOutputTypes, flowCtx, processorID, nil, /* memMonitor */
execinfra.ProcStateOpts{
@@ -161,7 +162,8 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) {
TaskName: "generativeSplitAndScatter-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh)
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh,
&gssp.routingDatumCache)
cancel()
close(gssp.doneScatterCh)
close(workerDone)
@@ -191,10 +193,10 @@ func (gssp *generativeSplitAndScatterProcessor) Next() (
}

// The routing datum informs the router which output stream should be used.
routingDatum, ok := gssp.routingDatumCache[scatteredEntry.node]
routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node)
if !ok {
routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
gssp.routingDatumCache[scatteredEntry.node] = routingDatum
gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum)
}

row := rowenc.EncDatumRow{
@@ -263,6 +265,7 @@ func runGenerativeSplitAndScatter(
chunkSplitAndScatterers []splitAndScatterer,
chunkEntrySplitAndScatterers []splitAndScatterer,
doneScatterCh chan<- entryNode,
cache *routingDatumCache,
) error {
log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
spec.NumEntries, spec.ChunkSize, spec.NumNodes)
@@ -359,6 +362,8 @@
for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
worker := worker
g2.GoCtx(func(ctx context.Context) error {
hash := fnv.New32a()

// Chunks' leaseholders should be randomly placed throughout the
// cluster.
for importSpanChunk := range restoreEntryChunksCh {
@@ -375,13 +380,32 @@
return err
}
if chunkDestination == 0 {
// If scatter failed to find a node for range ingestion, route the range
// to the node currently running the split and scatter processor.
// If scatter failed to find a node for range ingestion, route the
// range to a random node that has already been scattered to so far.
// The random node is chosen by hashing the scatter key.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok {
cachedNodeIDs := cache.cachedNodeIDs()
if len(cachedNodeIDs) > 0 {
hash.Reset()
if _, err := hash.Write(scatterKey); err != nil {
log.Warningf(ctx, "scatter returned node 0. Route span starting at %s to current node %v because of hash error: %v",
scatterKey, nodeID, err)
} else {
hashedKey := int(hash.Sum32())
nodeID = cachedNodeIDs[hashedKey%len(cachedNodeIDs)]
}

log.Warningf(ctx, "scatter returned node 0. "+
"Random route span starting at %s node %v", scatterKey, nodeID)
} else {
log.Warningf(ctx, "scatter returned node 0. "+
"Route span starting at %s to current node %v", scatterKey, nodeID)
}
chunkDestination = nodeID
log.Warningf(ctx, "scatter returned node 0. "+
"Route span starting at %s to current node %v", scatterKey, nodeID)
} else {
// TODO(rui): OptionalNodeID only returns a node if the sql server runs
// in the same process as the kv server (e.g., not serverless). Figure
// out how to handle this error in serverless restore.
log.Warningf(ctx, "scatter returned node 0. "+
"Route span starting at %s to default stream", scatterKey)
}
@@ -457,6 +481,33 @@ func runGenerativeSplitAndScatter(
return g.Wait()
}

type routingDatumCache struct {
cache map[roachpb.NodeID]rowenc.EncDatum
nodeIDs []roachpb.NodeID
}

func (c *routingDatumCache) getRoutingDatum(nodeID roachpb.NodeID) (rowenc.EncDatum, bool) {
d, ok := c.cache[nodeID]
return d, ok
}

func (c *routingDatumCache) putRoutingDatum(nodeID roachpb.NodeID, datum rowenc.EncDatum) {
if _, ok := c.cache[nodeID]; !ok {
c.nodeIDs = append(c.nodeIDs, nodeID)
}
c.cache[nodeID] = datum
}

func (c *routingDatumCache) cachedNodeIDs() []roachpb.NodeID {
return c.nodeIDs
}

func newRoutingDatumCache() routingDatumCache {
return routingDatumCache{
cache: make(map[roachpb.NodeID]rowenc.EncDatum),
}
}

func init() {
rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor
}
125 changes: 124 additions & 1 deletion pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
@@ -12,6 +12,7 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -22,6 +23,7 @@
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -111,13 +113,134 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) {
chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}

cache := routingDatumCache{
cache: make(map[roachpb.NodeID]rowenc.EncDatum),
}

// Large enough so doneScatterCh never blocks.
doneScatterCh := make(chan entryNode, 1000)
err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh)
err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh, &cache)

require.Error(t, err, "context canceled")
}

// TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter verifies that if
// a chunk fails to scatter, then the chunk will be sent to a random destination
// that we've already seen before.
func TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter(t *testing.T) {
defer leaktest.AfterTest(t)()

const numAccounts = 1000
const localFoo = "nodelocal://0/foo"
ctx := context.Background()
tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts,
InitManualReplication)
defer cleanupFn()

st := cluster.MakeTestingClusterSettings()
evalCtx := eval.MakeTestingEvalContext(st)
evalCtx.NodeID = base.NewSQLIDContainerForNode(tc.Server(0).RPCContext().NodeID)

testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
defer testDiskMonitor.Stop(ctx)

s0 := tc.Server(0)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: s0.InternalDB().(descs.DB),
JobRegistry: registry,
ExecutorConfig: &execCfg,
},
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
DiskMonitor: testDiskMonitor,
NodeID: evalCtx.NodeID,
}

sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)

backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo)
require.Equal(t, 1, len(backups))
uri := localFoo + "/" + backups[0][0]

codec := keys.MakeSQLCodec(s0.RPCContext().TenantID)
backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank")
backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key

spec := makeTestingGenerativeSplitAndScatterSpec(
[]string{uri},
[]roachpb.Span{{
Key: backupStartKey,
EndKey: backupStartKey.PrefixEnd(),
}},
)

// These split and scatterers will always fail the scatter and return 0 as the
// chunk destination.
chunkSplitScatterers := []splitAndScatterer{
&scatterAlwaysFailsSplitScatterer{},
&scatterAlwaysFailsSplitScatterer{},
&scatterAlwaysFailsSplitScatterer{},
}

// Fake some entries in the routing datum cache for nodes that we've scattered
// to already.
cache := routingDatumCache{
cache: map[roachpb.NodeID]rowenc.EncDatum{8: {}, 9: {}},
nodeIDs: []roachpb.NodeID{8, 9},
}

// Large enough so doneScatterCh never blocks.
doneScatterCh := make(chan entryNode, 1000)
err := runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkSplitScatterers, doneScatterCh, &cache)
require.NoError(t, err)

close(doneScatterCh)
var doneEntries []entryNode
numEntriesByNode := make(map[roachpb.NodeID]int)

for e := range doneScatterCh {
doneEntries = append(doneEntries, e)
numEntriesByNode[e.node]++
}

// There are at least 10 splits from the original backed-up bank table, plus
// entries from the system tables, etc. Sanity check this.
require.GreaterOrEqual(t, len(doneEntries), 10)

// The failed scatter chunks should be scattered to the nodes that have been
// scattered to before and cached.
for node, cnt := range numEntriesByNode {
require.Contains(t, cache.nodeIDs, node)

// Sanity check that we have at least a couple of entries per node.
// This assertion is dependent on the hashing mechanism of the entries
// and can break if it changes.
require.GreaterOrEqual(t, cnt, 2)
}
}

// scatterAlwaysFailsSplitScatterer always fails the scatter and returns 0 as
// the chunk destination.
type scatterAlwaysFailsSplitScatterer struct {
}

func (t *scatterAlwaysFailsSplitScatterer) split(
ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key,
) error {
return nil
}

func (t *scatterAlwaysFailsSplitScatterer) scatter(
ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key,
) (roachpb.NodeID, error) {
return 0, nil
}

func makeTestingGenerativeSplitAndScatterSpec(
backupURIs []string, requiredSpans []roachpb.Span,
) execinfrapb.GenerativeSplitAndScatterSpec {
1 change: 1 addition & 0 deletions pkg/upgrade/upgrades/BUILD.bazel
@@ -101,6 +101,7 @@ go_test(
"create_computed_indexes_sql_statistics_test.go",
"create_index_usage_statement_statistics_test.go",
"create_jobs_metrics_polling_job_test.go",
"create_task_system_tables_test.go",
"database_role_settings_table_user_id_migration_test.go",
"delete_descriptors_of_dropped_functions_test.go",
"desc_id_sequence_for_system_tenant_test.go",
63 changes: 63 additions & 0 deletions pkg/upgrade/upgrades/create_task_system_tables_test.go
@@ -0,0 +1,63 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package upgrades_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
)

func TestTaskTablesMigration(t *testing.T) {
skip.UnderStressRace(t)
defer leaktest.AfterTest(t)()
ctx := context.Background()

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.V23_1_TaskSystemTables - 1),
},
},
},
}

tc := testcluster.StartTestCluster(t, 1, clusterArgs)

defer tc.Stopper().Stop(ctx)
db := tc.ServerConn(0)
defer db.Close()

upgrades.Upgrade(
t,
db,
clusterversion.V23_1_TaskSystemTables,
nil,
false,
)

_, err := db.Exec("SELECT * FROM system.task_payloads")
assert.NoError(t, err, "system.task_payloads exists")

_, err = db.Exec("SELECT * FROM system.tenant_tasks")
assert.NoError(t, err, "system.tenant_tasks exists")
}
