backupccl: delete old-style split and scatter processor
This processor has not been used since 23.1, so there is
little value in keeping it around. It has been superseded
by the generative split and scatter processor.

Epic: none
Release note: None
adityamaru committed Oct 5, 2023
1 parent 767e135 commit a343158
Showing 9 changed files with 193 additions and 810 deletions.
5 changes: 0 additions & 5 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -29,7 +29,6 @@ go_library(
"schedule_exec.go",
"schedule_pts_chaining.go",
"show.go",
"split_and_scatter_processor.go",
"system_schema.go",
"targets.go",
":gen-targetscope-stringer", # keep
@@ -196,7 +195,6 @@ go_test(
"restore_span_covering_test.go",
"schedule_pts_chaining_test.go",
"show_test.go",
"split_and_scatter_processor_test.go",
"system_schema_test.go",
"tenant_backup_nemesis_test.go",
"utils_test.go",
@@ -267,7 +265,6 @@ go_test(
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catenumpb",
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descbuilder",
"//pkg/sql/catalog/descpb",
@@ -286,7 +283,6 @@ go_test(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowflow",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
@@ -295,7 +291,6 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/kvclientutils",
192 changes: 192 additions & 0 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -10,14 +10,21 @@ package backupccl

import (
"context"
"fmt"
"hash/fnv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
@@ -29,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

@@ -73,6 +81,167 @@ type generativeSplitAndScatterProcessor struct {
scatterErr error
}

// scatteredChunk is a chunk of restore span entries to process, along with
// the node the chunk was scattered to.
type scatteredChunk struct {
destination roachpb.NodeID
entries []execinfrapb.RestoreSpanEntry
}

type splitAndScatterer interface {
// split issues a split request at the given key, which may be rewritten to
// the RESTORE keyspace.
split(ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key) error
// scatter issues a scatter request at the given key. It returns the node ID
// of where the range was scattered to.
scatter(ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key) (roachpb.NodeID, error)
}

type noopSplitAndScatterer struct {
scatterNode roachpb.NodeID
}

var _ splitAndScatterer = noopSplitAndScatterer{}

// split implements splitAndScatterer.
func (n noopSplitAndScatterer) split(_ context.Context, _ keys.SQLCodec, _ roachpb.Key) error {
return nil
}

// scatter implements splitAndScatterer.
func (n noopSplitAndScatterer) scatter(
_ context.Context, _ keys.SQLCodec, _ roachpb.Key,
) (roachpb.NodeID, error) {
return n.scatterNode, nil
}
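
For context on why the interface exists: the noop implementation lets callers (for example, tests or flows that skip pre-splitting) stub out KV traffic entirely. A minimal sketch of exercising it follows; the wrapper function is hypothetical and not part of this commit:

func exampleNoopScatterer(ctx context.Context) {
	// Inject a scatterer that performs no I/O and reports a fixed node.
	var s splitAndScatterer = noopSplitAndScatterer{scatterNode: 1}
	_ = s.split(ctx, keys.SystemSQLCodec, roachpb.Key("a")) // always nil
	node, _ := s.scatter(ctx, keys.SystemSQLCodec, roachpb.Key("a"))
	_ = node // always 1: the configured scatterNode, with no RPC issued.
}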

// dbSplitAndScatterer is the production implementation of this processor's
// scatterer. It actually issues the split and scatter requests against the KV
// layer.
type dbSplitAndScatterer struct {
db *kv.DB
kr *KeyRewriter
}

var _ splitAndScatterer = dbSplitAndScatterer{}

func makeSplitAndScatterer(db *kv.DB, kr *KeyRewriter) splitAndScatterer {
return dbSplitAndScatterer{db: db, kr: kr}
}

// split implements splitAndScatterer.
func (s dbSplitAndScatterer) split(
ctx context.Context, codec keys.SQLCodec, splitKey roachpb.Key,
) error {
if s.kr == nil {
return errors.AssertionFailedf("KeyRewriter was not set when expected to be")
}
if s.db == nil {
return errors.AssertionFailedf("split and scatterer's database was not set when expected")
}

expirationTime := s.db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
newSplitKey, err := rewriteBackupSpanKey(codec, s.kr, splitKey)
if err != nil {
return err
}
if splitAt, err := keys.EnsureSafeSplitKey(newSplitKey); err != nil {
// Ignore the error, not all keys are table keys.
} else if len(splitAt) != 0 {
newSplitKey = splitAt
}
log.VEventf(ctx, 1, "presplitting new key %+v", newSplitKey)
if err := s.db.AdminSplit(ctx, newSplitKey, expirationTime); err != nil {
return errors.Wrapf(err, "splitting key %s", newSplitKey)
}

return nil
}
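
The EnsureSafeSplitKey step above is subtle: SQL row keys end in a column-family suffix, and splitting mid-row would be unsafe, while span-boundary keys (such as bare table prefixes) are not row keys at all and cause EnsureSafeSplitKey to error, which is why the error is deliberately ignored. A hedged sketch of both cases, assuming the standard keys helpers; the wrapper function is illustrative only:

func exampleSafeSplitKey() {
	codec := keys.SystemSQLCodec
	// A SQL row key: table 53, index 1, plus the trailing column-family
	// suffix that every row key carries (family 0 here).
	rowKey := keys.MakeFamilyKey(codec.IndexPrefix(53, 1), 0)
	safe, err := keys.EnsureSafeSplitKey(rowKey)
	// err is nil and safe has the family suffix stripped, so the split
	// cannot land between two column families of the same row.
	_, _ = safe, err

	// A bare table prefix carries no family suffix to strip, so
	// EnsureSafeSplitKey returns an error; split above then keeps the
	// rewritten key unchanged.
	if _, err := keys.EnsureSafeSplitKey(codec.TablePrefix(53)); err != nil {
		_ = err // expected: not a row key.
	}
}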

// scatter implements splitAndScatterer.
func (s dbSplitAndScatterer) scatter(
ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key,
) (roachpb.NodeID, error) {
if s.kr == nil {
return 0, errors.AssertionFailedf("KeyRewriter was not set when expected to be")
}
if s.db == nil {
return 0, errors.AssertionFailedf("split and scatterer's database was not set when expected")
}

newScatterKey, err := rewriteBackupSpanKey(codec, s.kr, scatterKey)
if err != nil {
return 0, err
}
if scatterAt, err := keys.EnsureSafeSplitKey(newScatterKey); err != nil {
// Ignore the error, not all keys are table keys.
} else if len(scatterAt) != 0 {
newScatterKey = scatterAt
}

log.VEventf(ctx, 1, "scattering new key %+v", newScatterKey)
req := &kvpb.AdminScatterRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{
Key: newScatterKey,
EndKey: newScatterKey.Next(),
}),
// This is a bit of a hack, but it seems to be an effective one (see #36665
// for graphs). As of the commit that added this, scatter is not very good
// at actually balancing leases. This is likely for two reasons: 1) there is
// almost certainly some regression in scatter's behavior (it used to work
// much better), and 2) scatter has to balance leases across all ranges in
// a cluster, whereas in RESTORE we really just want it to balance the span
// being restored into.
RandomizeLeases: true,
MaxSize: 1, // don't scatter non-empty ranges on resume.
}

res, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), req)
if pErr != nil {
// TODO(dt): typed error.
if !strings.Contains(pErr.String(), "existing range size") {
// TODO(pbardea): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
// but on the bright side, it doesn't affect correctness, only
// throughput.
log.Errorf(ctx, "failed to scatter span [%s,%s): %+v",
newScatterKey, newScatterKey.Next(), pErr.GoError())
}
return 0, nil
}

return s.findDestination(res.(*kvpb.AdminScatterResponse)), nil
}

// findDestination returns the node ID of the destination node of the
// AdminScatter request. If the destination cannot be found, 0 is returned.
func (s dbSplitAndScatterer) findDestination(res *kvpb.AdminScatterResponse) roachpb.NodeID {
if len(res.RangeInfos) > 0 {
// If the lease is not populated, we return the 0 value anyway. We receive 1
// RangeInfo per range that was scattered. Since we send a scatter request
// to each range that we make, we are only interested in the first range,
// which contains the key at which we're splitting and scattering.
return res.RangeInfos[0].Lease.Replica.NodeID
}

return roachpb.NodeID(0)
}

func routingDatumsForSQLInstance(
sqlInstanceID base.SQLInstanceID,
) (rowenc.EncDatum, rowenc.EncDatum) {
routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
startDatum := rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
endDatum := rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
return startDatum, endDatum
}
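
The routing keys are purely synthetic — "node1", "node2", and so on — and each instance's span is the half-open single-key range [key, key.Next()). A small illustration (the wrapper function and variable names are hypothetical):

func exampleRoutingKeys() {
	start := roachpb.Key(fmt.Sprintf("node%d", 1)) // "node1"
	end := start.Next()                            // appends 0x00: "node1\x00"
	// Instance 12's key "node12" sorts after "node1\x00", so the spans of
	// different instances stay disjoint even when one ID prefixes another.
	_, _ = start, end
}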

type entryNode struct {
entry execinfrapb.RestoreSpanEntry
node roachpb.NodeID
}

var _ execinfra.Processor = &generativeSplitAndScatterProcessor{}

func newGenerativeSplitAndScatterProcessor(
@@ -515,6 +684,29 @@ func newRoutingDatumCache() routingDatumCache {
}
}

var splitAndScatterOutputTypes = []*types.T{
types.Bytes, // Span key for the range router
types.Bytes, // RestoreDataEntry bytes
}

// routingSpanForSQLInstance provides the mapping to be used during distsql planning
// when setting up the output router.
func routingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) {
var alloc tree.DatumAlloc
startDatum, endDatum := routingDatumsForSQLInstance(sqlInstanceID)

startBytes, endBytes := make([]byte, 0), make([]byte, 0)
startBytes, err := startDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, startBytes)
if err != nil {
return nil, nil, err
}
endBytes, err = endDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, endBytes)
if err != nil {
return nil, nil, err
}
return startBytes, endBytes, nil
}
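
For context, a hedged sketch of how these encoded spans could be assembled into a range-router spec during planning — simplified, and an assumption on my part: the actual planning code is outside this diff and also sorts the spans before use:

func exampleRouterSpec(
	instances []base.SQLInstanceID,
) (execinfrapb.OutputRouterSpec_RangeRouterSpec, error) {
	var spec execinfrapb.OutputRouterSpec_RangeRouterSpec
	for i, id := range instances {
		startBytes, endBytes, err := routingSpanForSQLInstance(id)
		if err != nil {
			return spec, err
		}
		// Rows whose routing datum falls in [startBytes, endBytes) are sent
		// to stream i, i.e. to the restore worker on instance id.
		spec.Spans = append(spec.Spans, execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
			Start:  startBytes,
			End:    endBytes,
			Stream: int32(i),
		})
	}
	return spec, nil
}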

func init() {
rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor
}
