backupccl: send chunks with failed scatters to random node in generative ssp

For chunks that have failed to scatter, this patch routes the chunk to a
random node instead of the current node. This is necessary because, prior to
the generative version, split and scatter processors ran on every node, so
routing failed-scatter chunks to the current node introduced no imbalance. The
new generative split and scatter processor runs on only one node, so the old
behavior would make that single node process every chunk that failed to
scatter.

Release note: None
Rui Hu committed Mar 30, 2023
1 parent 3de803f commit 210650d
Showing 2 changed files with 184 additions and 10 deletions.
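
As context for the diff below: the fallback destination is chosen by hashing the chunk's scatter key with FNV-1a and indexing into the node IDs already recorded in the routing datum cache; when nothing has been cached yet, the chunk is routed to the current node as before. The following is a minimal, standalone sketch of that selection idea, assuming a 64-bit platform; the function name pickFallbackNode, its signature, and the plain int32 node IDs are illustrative and do not appear in the patch.

package main

import (
	"fmt"
	"hash/fnv"
)

// pickFallbackNode returns the node a failed-scatter chunk should be routed
// to: one of the already-scattered-to nodes, chosen by hashing the scatter
// key, or the current node when no destinations have been cached yet (or the
// hash write fails).
func pickFallbackNode(scatterKey []byte, cachedNodeIDs []int32, currentNodeID int32) int32 {
	if len(cachedNodeIDs) == 0 {
		// No nodes cached yet: keep the pre-patch behavior and use the node
		// running the split and scatter processor.
		return currentNodeID
	}
	h := fnv.New32a()
	if _, err := h.Write(scatterKey); err != nil {
		// Hash writes should not fail, but fall back to the current node if
		// they do.
		return currentNodeID
	}
	return cachedNodeIDs[int(h.Sum32())%len(cachedNodeIDs)]
}

func main() {
	cached := []int32{8, 9}
	// Chunks with different scatter keys spread across the cached nodes.
	fmt.Println(pickFallbackNode([]byte("/Table/106/1/100"), cached, 1))
	fmt.Println(pickFallbackNode([]byte("/Table/106/1/200"), cached, 1))
	// With an empty cache the chunk stays on the current node.
	fmt.Println(pickFallbackNode([]byte("/Table/106/1/100"), nil, 1))
}

In the patch itself the FNV hasher is created once per chunk worker and Reset between chunks, so the failure path does not allocate a new hasher for every chunk.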
69 changes: 60 additions & 9 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -10,6 +10,7 @@ package backupccl

import (
"context"
"hash/fnv"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
@@ -67,7 +68,7 @@ type generativeSplitAndScatterProcessor struct {

doneScatterCh chan entryNode
// A cache for routing datums, so only 1 is allocated per node.
routingDatumCache map[roachpb.NodeID]rowenc.EncDatum
routingDatumCache routingDatumCache
scatterErr error
}

@@ -129,7 +130,7 @@ func newGenerativeSplitAndScatterProcessor(
// in parallel downstream. It has been verified ad-hoc that this
// sizing does not bottleneck restore.
doneScatterCh: make(chan entryNode, int(spec.NumNodes)*maxConcurrentRestoreWorkers),
routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
routingDatumCache: newRoutingDatumCache(),
}
if err := ssp.Init(ctx, ssp, post, generativeSplitAndScatterOutputTypes, flowCtx, processorID, nil, /* memMonitor */
execinfra.ProcStateOpts{
@@ -161,7 +162,8 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) {
TaskName: "generativeSplitAndScatter-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh)
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh,
&gssp.routingDatumCache)
cancel()
close(gssp.doneScatterCh)
close(workerDone)
@@ -191,10 +193,10 @@ func (gssp *generativeSplitAndScatterProcessor) Next() (
}

// The routing datum informs the router which output stream should be used.
routingDatum, ok := gssp.routingDatumCache[scatteredEntry.node]
routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node)
if !ok {
routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
gssp.routingDatumCache[scatteredEntry.node] = routingDatum
gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum)
}

row := rowenc.EncDatumRow{
@@ -263,6 +265,7 @@ func runGenerativeSplitAndScatter(
chunkSplitAndScatterers []splitAndScatterer,
chunkEntrySplitAndScatterers []splitAndScatterer,
doneScatterCh chan<- entryNode,
cache *routingDatumCache,
) error {
log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
spec.NumEntries, spec.ChunkSize, spec.NumNodes)
@@ -359,6 +362,8 @@ func runGenerativeSplitAndScatter(
for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
worker := worker
g2.GoCtx(func(ctx context.Context) error {
hash := fnv.New32a()

// Chunks' leaseholders should be randomly placed throughout the
// cluster.
for importSpanChunk := range restoreEntryChunksCh {
@@ -375,13 +380,32 @@
return err
}
if chunkDestination == 0 {
// If scatter failed to find a node for range ingestion, route the range
// to the node currently running the split and scatter processor.
// If scatter failed to find a node for range ingestion, route the
// range to a random node that has already been scattered to so far.
// The random node is chosen by hashing the scatter key.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok {
cachedNodeIDs := cache.cachedNodeIDs()
if len(cachedNodeIDs) > 0 {
hash.Reset()
if _, err := hash.Write(scatterKey); err != nil {
log.Warningf(ctx, "scatter returned node 0. Route span starting at %s to current node %v because of hash error: %v",
scatterKey, nodeID, err)
} else {
hashedKey := int(hash.Sum32())
nodeID = cachedNodeIDs[hashedKey%len(cachedNodeIDs)]
}

log.Warningf(ctx, "scatter returned node 0. "+
"Random route span starting at %s node %v", scatterKey, nodeID)
} else {
log.Warningf(ctx, "scatter returned node 0. "+
"Route span starting at %s to current node %v", scatterKey, nodeID)
}
chunkDestination = nodeID
log.Warningf(ctx, "scatter returned node 0. "+
"Route span starting at %s to current node %v", scatterKey, nodeID)
} else {
// TODO(rui): OptionalNodeID only returns a node if the sql server runs
// in the same process as the kv server (e.g., not serverless). Figure
// out how to handle this error in serverless restore.
log.Warningf(ctx, "scatter returned node 0. "+
"Route span starting at %s to default stream", scatterKey)
}
@@ -457,6 +481,33 @@ func runGenerativeSplitAndScatter(
return g.Wait()
}

type routingDatumCache struct {
cache map[roachpb.NodeID]rowenc.EncDatum
nodeIDs []roachpb.NodeID
}

func (c *routingDatumCache) getRoutingDatum(nodeID roachpb.NodeID) (rowenc.EncDatum, bool) {
d, ok := c.cache[nodeID]
return d, ok
}

func (c *routingDatumCache) putRoutingDatum(nodeID roachpb.NodeID, datum rowenc.EncDatum) {
if _, ok := c.cache[nodeID]; !ok {
c.nodeIDs = append(c.nodeIDs, nodeID)
}
c.cache[nodeID] = datum
}

func (c *routingDatumCache) cachedNodeIDs() []roachpb.NodeID {
return c.nodeIDs
}

func newRoutingDatumCache() routingDatumCache {
return routingDatumCache{
cache: make(map[roachpb.NodeID]rowenc.EncDatum),
}
}

func init() {
rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor
}
125 changes: 124 additions & 1 deletion pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
@@ -12,6 +12,7 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -22,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -111,13 +113,134 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) {
chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}

cache := routingDatumCache{
cache: make(map[roachpb.NodeID]rowenc.EncDatum),
}

// Large enough so doneScatterCh never blocks.
doneScatterCh := make(chan entryNode, 1000)
err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh)
err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh, &cache)

require.Error(t, err, "context canceled")
}

// TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter verifies that if
// a chunk fails to scatter, then the chunk will be sent to a random destination
// that we've already seen before.
func TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter(t *testing.T) {
defer leaktest.AfterTest(t)()

const numAccounts = 1000
const localFoo = "nodelocal://0/foo"
ctx := context.Background()
tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts,
InitManualReplication)
defer cleanupFn()

st := cluster.MakeTestingClusterSettings()
evalCtx := eval.MakeTestingEvalContext(st)
evalCtx.NodeID = base.NewSQLIDContainerForNode(tc.Server(0).RPCContext().NodeID)

testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
defer testDiskMonitor.Stop(ctx)

s0 := tc.Server(0)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: s0.InternalDB().(descs.DB),
JobRegistry: registry,
ExecutorConfig: &execCfg,
},
EvalCtx: &evalCtx,
Mon: evalCtx.TestingMon,
DiskMonitor: testDiskMonitor,
NodeID: evalCtx.NodeID,
}

sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)

backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo)
require.Equal(t, 1, len(backups))
uri := localFoo + "/" + backups[0][0]

codec := keys.MakeSQLCodec(s0.RPCContext().TenantID)
backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank")
backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key

spec := makeTestingGenerativeSplitAndScatterSpec(
[]string{uri},
[]roachpb.Span{{
Key: backupStartKey,
EndKey: backupStartKey.PrefixEnd(),
}},
)

// These split and scatterers will always fail the scatter and return 0 as the
// chunk destination.
chunkSplitScatterers := []splitAndScatterer{
&scatterAlwaysFailsSplitScatterer{},
&scatterAlwaysFailsSplitScatterer{},
&scatterAlwaysFailsSplitScatterer{},
}

// Fake some entries in the routing datum cache for nodes that we've scattered
// to already.
cache := routingDatumCache{
cache: map[roachpb.NodeID]rowenc.EncDatum{8: {}, 9: {}},
nodeIDs: []roachpb.NodeID{8, 9},
}

// Large enough so doneScatterCh never blocks.
doneScatterCh := make(chan entryNode, 1000)
err := runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkSplitScatterers, doneScatterCh, &cache)
require.NoError(t, err)

close(doneScatterCh)
var doneEntries []entryNode
numEntriesByNode := make(map[roachpb.NodeID]int)

for e := range doneScatterCh {
doneEntries = append(doneEntries, e)
numEntriesByNode[e.node]++
}

// There are at least 10 splits from the original backed up bank table. Plus
// the entries from the system table, etc. Sanity check this.
require.GreaterOrEqual(t, len(doneEntries), 10)

// The failed scatter chunks should be scattered to the nodes that have been
// scattered to before and cached.
for node, cnt := range numEntriesByNode {
require.Contains(t, cache.nodeIDs, node)

// Sanity check that we have at least a couple of entries per node.
// This assertion is dependent on the hashing mechanism of the entries
// and can break if it changes.
require.GreaterOrEqual(t, cnt, 2)
}
}

// scatterAlwaysFailsSplitScatterer always fails the scatter and returns 0 as
// the chunk destination.
type scatterAlwaysFailsSplitScatterer struct {
}

func (t *scatterAlwaysFailsSplitScatterer) split(
ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key,
) error {
return nil
}

func (t *scatterAlwaysFailsSplitScatterer) scatter(
ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key,
) (roachpb.NodeID, error) {
return 0, nil
}

func makeTestingGenerativeSplitAndScatterSpec(
backupURIs []string, requiredSpans []roachpb.Span,
) execinfrapb.GenerativeSplitAndScatterSpec {
