Merge pull request #132259 from dt/backport24.1-129750
release-24.1: backupccl: do restore chunk work dispatch/wait in group
dt authored Oct 10, 2024
2 parents 4f58cb4 + 4a7e017 commit 4e87856
Showing 2 changed files with 79 additions and 66 deletions.
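
For orientation, here is a minimal sketch (not CockroachDB code) of the channel-and-worker shape this commit moves to: the same group goroutine that dispatches the chunk workers also waits for them and closes the output channel with a defer, so once a worker fails the downstream consumer is never left blocked on a channel nobody will close. The test added below notes that an error from a split mid-chunk used to deadlock. This sketch uses golang.org/x/sync/errgroup in place of CockroachDB's internal ctxgroup package, and the names (in, out, the injected failure) are illustrative only.

  package main

  import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
  )

  func main() {
    g, ctx := errgroup.WithContext(context.Background())

    in := make(chan int)
    out := make(chan int, 4)

    // Producer: feeds work into in and closes it when done or cancelled.
    g.Go(func() error {
      defer close(in)
      for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
          return ctx.Err()
        case in <- i:
        }
      }
      return nil
    })

    // Dispatch the workers and wait for them inside one group goroutine, so
    // out is closed exactly once no matter which worker fails or when.
    g.Go(func() error {
      defer close(out)
      g2, ctx := errgroup.WithContext(ctx)
      for w := 0; w < 3; w++ {
        g2.Go(func() error {
          for i := range in {
            if i == 7 {
              return fmt.Errorf("injected failure on item %d", i)
            }
            select {
            case <-ctx.Done():
              return ctx.Err()
            case out <- i * i:
            }
          }
          return nil
        })
      }
      return g2.Wait()
    })

    // Consumer: the range loop terminates because out is always closed above,
    // even when a worker returns early with an error.
    g.Go(func() error {
      for v := range out {
        fmt.Println("processed", v)
      }
      return nil
    })

    if err := g.Wait(); err != nil {
      fmt.Println("pipeline error:", err)
    }
  }

Because the wait lives in the same goroutine that owns the close, a worker error still lets the consumer finish its range loop and the whole group unwinds with that error, which is the property the new test in the second file exercises.
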
pkg/ccl/backupccl/generative_split_and_scatter_processor.go (129 changes: 65 additions & 64 deletions)

@@ -539,82 +539,83 @@ func runGenerativeSplitAndScatter(

   importSpanChunksCh := make(chan scatteredChunk, chunkSplitAndScatterWorkers*2)

-  // This group of goroutines processes the chunks from restoreEntryChunksCh.
-  // For each chunk, a split is created at the start key of the next chunk. The
-  // current chunk is then scattered, and the chunk with its destination is
-  // passed to importSpanChunksCh.
-  g2 := ctxgroup.WithContext(ctx)
-  for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
-    worker := worker
-    g2.GoCtx(func(ctx context.Context) error {
-      hash := fnv.New32a()
-
-      // Chunks' leaseholders should be randomly placed throughout the
-      // cluster.
-      for importSpanChunk := range restoreEntryChunksCh {
-        scatterKey := importSpanChunk.entries[0].Span.Key
-        if !importSpanChunk.splitKey.Equal(roachpb.Key{}) {
-          // Split at the start of the next chunk, to partition off a
-          // prefix of the space to scatter.
-          if err := chunkSplitAndScatterers[worker].split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
-            return err
-          }
-        }
-        chunkDestination, err := chunkSplitAndScatterers[worker].scatter(ctx, flowCtx.Codec(), scatterKey)
-        if err != nil {
-          return err
-        }
-        if chunkDestination == 0 {
-          // If scatter failed to find a node for range ingestion, route the
-          // range to a random node that has already been scattered to so far.
-          // The random node is chosen by hashing the scatter key.
-          if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok {
-            cachedNodeIDs := cache.cachedNodeIDs()
-            if len(cachedNodeIDs) > 0 {
-              hash.Reset()
-              if _, err := hash.Write(scatterKey); err != nil {
-                log.Warningf(ctx, "scatter returned node 0. Route span starting at %s to current node %v because of hash error: %v",
-                  scatterKey, nodeID, err)
-              } else {
-                hashedKey := int(hash.Sum32())
-                nodeID = cachedNodeIDs[hashedKey%len(cachedNodeIDs)]
-              }
-
-              log.Warningf(ctx, "scatter returned node 0. "+
-                "Random route span starting at %s node %v", scatterKey, nodeID)
-            } else {
-              log.Warningf(ctx, "scatter returned node 0. "+
-                "Route span starting at %s to current node %v", scatterKey, nodeID)
-            }
-
-            chunkDestination = nodeID
-          } else {
-            // TODO(rui): OptionalNodeID only returns a node if the sql server runs
-            // in the same process as the kv server (e.g., not serverless). Figure
-            // out how to handle this error in serverless restore.
-            log.Warningf(ctx, "scatter returned node 0. "+
-              "Route span starting at %s to default stream", scatterKey)
-          }
-        }
-
-        sc := scatteredChunk{
-          destination: chunkDestination,
-          entries:     importSpanChunk.entries,
-        }
-
-        select {
-        case <-ctx.Done():
-          return errors.Wrap(ctx.Err(), "sending scattered chunk")
-        case importSpanChunksCh <- sc:
-        }
-      }
-      return nil
-    })
-  }
-
-  // This goroutine waits for the chunkSplitAndScatter workers to finish so that
-  // it can close importSpanChunksCh.
-  g.GoCtx(func(ctx context.Context) error {
-    defer close(importSpanChunksCh)
+  g.GoCtx(func(ctx context.Context) error {
+    defer close(importSpanChunksCh)
+
+    // This group of goroutines processes the chunks from restoreEntryChunksCh.
+    // For each chunk, a split is created at the start key of the next chunk. The
+    // current chunk is then scattered, and the chunk with its destination is
+    // passed to importSpanChunksCh.
+    g2 := ctxgroup.WithContext(ctx)
+    for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
+      worker := worker
+      g2.GoCtx(func(ctx context.Context) error {
+        hash := fnv.New32a()
+
+        // Chunks' leaseholders should be randomly placed throughout the
+        // cluster.
+        for importSpanChunk := range restoreEntryChunksCh {
+          scatterKey := importSpanChunk.entries[0].Span.Key
+          if !importSpanChunk.splitKey.Equal(roachpb.Key{}) {
+            // Split at the start of the next chunk, to partition off a
+            // prefix of the space to scatter.
+            if err := chunkSplitAndScatterers[worker].split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
+              return err
+            }
+          }
+          chunkDestination, err := chunkSplitAndScatterers[worker].scatter(ctx, flowCtx.Codec(), scatterKey)
+          if err != nil {
+            return err
+          }
+          if chunkDestination == 0 {
+            // If scatter failed to find a node for range ingestion, route the
+            // range to a random node that has already been scattered to so far.
+            // The random node is chosen by hashing the scatter key.
+            if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok {
+              cachedNodeIDs := cache.cachedNodeIDs()
+              if len(cachedNodeIDs) > 0 {
+                hash.Reset()
+                if _, err := hash.Write(scatterKey); err != nil {
+                  log.Warningf(ctx, "scatter returned node 0. Route span starting at %s to current node %v because of hash error: %v",
+                    scatterKey, nodeID, err)
+                } else {
+                  hashedKey := int(hash.Sum32())
+                  nodeID = cachedNodeIDs[hashedKey%len(cachedNodeIDs)]
+                }
+
+                log.Warningf(ctx, "scatter returned node 0. "+
+                  "Random route span starting at %s node %v", scatterKey, nodeID)
+              } else {
+                log.Warningf(ctx, "scatter returned node 0. "+
+                  "Route span starting at %s to current node %v", scatterKey, nodeID)
+              }
+
+              chunkDestination = nodeID
+            } else {
+              // TODO(rui): OptionalNodeID only returns a node if the sql server runs
+              // in the same process as the kv server (e.g., not serverless). Figure
+              // out how to handle this error in serverless restore.
+              log.Warningf(ctx, "scatter returned node 0. "+
+                "Route span starting at %s to default stream", scatterKey)
+            }
+          }
+
+          sc := scatteredChunk{
+            destination: chunkDestination,
+            entries:     importSpanChunk.entries,
+          }
+
+          select {
+          case <-ctx.Done():
+            return errors.Wrap(ctx.Err(), "sending scattered chunk")
+          case importSpanChunksCh <- sc:
+          }
+        }
+        return nil
+      })
+    }
+
+    // This goroutine waits for the chunkSplitAndScatter workers to finish so that
+    // it can close importSpanChunksCh.
     return errors.Wrap(g2.Wait(), "waiting for chunkSplitAndScatter workers")
   })
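
One detail in the hunk above that is easy to skim past: when scatter reports destination node 0, the worker falls back to a node that has already received chunks, chosen by FNV-hashing the scatter key into the cached node list. Below is a standalone sketch of just that fallback selection; the plain int node IDs and the pickFallbackNode helper are simplifications for illustration, not the backupccl types.

  package main

  import (
    "fmt"
    "hash/fnv"
  )

  // pickFallbackNode mirrors the fallback in the hunk above: when scatter could
  // not name a destination, hash the scatter key into the list of node IDs that
  // have already received chunks. A hash error just keeps the current node.
  func pickFallbackNode(scatterKey []byte, cachedNodeIDs []int, current int) int {
    if len(cachedNodeIDs) == 0 {
      return current
    }
    h := fnv.New32a()
    if _, err := h.Write(scatterKey); err != nil {
      return current
    }
    return cachedNodeIDs[int(h.Sum32())%len(cachedNodeIDs)]
  }

  func main() {
    cached := []int{2, 5, 7}
    fmt.Println(pickFallbackNode([]byte("/Table/106/1/42"), cached, 1))
    // The same key hashes to the same fallback node every time.
    fmt.Println(pickFallbackNode([]byte("/Table/106/1/42"), cached, 1))
    fmt.Println(pickFallbackNode([]byte("/Table/106/1/99"), cached, 1))
  }

Hashing the key rather than drawing randomly means the same span maps to the same fallback node on every attempt, while still spreading different spans across the nodes already in use.
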
pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go (16 changes: 14 additions & 2 deletions)

@@ -26,6 +26,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/cockroach/pkg/util/protoutil"
+  "github.com/cockroachdb/errors"
   "github.com/stretchr/testify/require"
 )

@@ -226,23 +227,34 @@ func TestRunGenerativeSplitAndScatterRandomizedDestOnFailScatter(t *testing.T) {
     // and can break if it changes.
     require.GreaterOrEqual(t, cnt, 2)
   }
+
+  // Also test that errors from split mid-chunk are returned (this deadlocked at
+  // one point).
+  spec.ChunkSize = 2
+  require.Error(t, runGenerativeSplitAndScatter(ctx, &flowCtx, &spec,
+    []splitAndScatterer{&scatterAlwaysFailsSplitScatterer{}},
+    []splitAndScatterer{&scatterAlwaysFailsSplitScatterer{err: errors.New("injected")}},
+    make(chan entryNode, 1000),
+    &cache,
+  ))
 }

 // scatterAlwaysFailsSplitScatterer always fails the scatter and returns 0 as
 // the chunk destination.
 type scatterAlwaysFailsSplitScatterer struct {
+  err error
 }

 func (t *scatterAlwaysFailsSplitScatterer) split(
   ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key,
 ) error {
-  return nil
+  return t.err
 }

 func (t *scatterAlwaysFailsSplitScatterer) scatter(
   ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key,
 ) (roachpb.NodeID, error) {
-  return 0, nil
+  return 0, t.err
 }

 func makeTestingGenerativeSplitAndScatterSpec(
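
The test double in this diff is a small failure-injection fake: its methods return whatever error it was constructed with, so the zero value still models "scatter found no node" while a constructed value models a failed split. A generic sketch of the same pattern, using a hypothetical splitter interface rather than the backupccl splitAndScatterer:

  package main

  import (
    "errors"
    "fmt"
  )

  // splitter stands in for an interface whose failure paths we want to exercise.
  type splitter interface {
    split(key string) error
  }

  // fakeSplitter returns its configured error from every call, so the zero
  // value is a silent no-op and a constructed value injects a failure.
  type fakeSplitter struct {
    err error
  }

  func (f *fakeSplitter) split(key string) error { return f.err }

  // splitAll splits at each key in turn and stops at the first failure.
  func splitAll(s splitter, keys []string) error {
    for _, k := range keys {
      if err := s.split(k); err != nil {
        return fmt.Errorf("splitting at %q: %w", k, err)
      }
    }
    return nil
  }

  func main() {
    fmt.Println(splitAll(&fakeSplitter{}, []string{"a", "b"}))
    fmt.Println(splitAll(&fakeSplitter{err: errors.New("injected")}, []string{"a", "b"}))
  }
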
