streamingccl: split destination based on source
This adds initial splits and scatters to the stream ingestion job.  We
split based on the topology delivered by the source to align our
initial splits with the splits that are likely coming in from the
source cluster.

These splits substantially improve the throughput of a stream's
initial scan.
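In outline, the approach (condensed from createInitialSplits in
stream_ingestion_dist.go below; the wrapper function name and the
functional rewriteKey parameter are illustrative stand-ins, not code from
this commit) is: rekey each source partition span's start key into the
destination tenant's keyspace, then split and scatter at that key.

// sketchInitialSplits is a condensed sketch of the new split logic. The
// rewriteKey argument stands in for the rekeyer built from
// backupccl.MakeKeyRewriterFromRekeys, as in the full implementation.
func sketchInitialSplits(
	ctx context.Context,
	topology streamclient.Topology,
	rewriteKey func(key []byte, walltimeForImportElision int64) ([]byte, bool, error),
	splitter splitAndScatterer,
) error {
	for _, partition := range topology.Partitions {
		for _, span := range partition.Spans {
			// Map the source-tenant key into the destination tenant's keyspace.
			splitKey, _, err := rewriteKey(span.Key.Clone(), 0)
			if err != nil {
				return err
			}
			// Split at the rekeyed boundary and best-effort scatter the new range.
			if err := splitAndScatter(ctx, roachpb.Key(splitKey), splitter); err != nil {
				return err
			}
		}
	}
	return nil
}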

Careful reviewers may note that we are not calling EnsureSafeSplitKey
here.  That is because the Spans in the source topology should already
be safe split keys.  Since EnsureSafeSplitKey isn't idempotent, if we
were to call EnsureSafeSplitKey, we would end up creating splits at
rather poor locations: any key that ends in an integer would have
components erroneously trimmed from it.
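To make that concrete, here is a small illustrative sketch. The table ID,
index ID, and column values are made up, and it assumes only pkg/keys,
pkg/roachpb, and pkg/util/encoding; it is not code from this commit.

// demoDoubleTrim builds a key that is already a safe split key but whose
// last column happens to be a small integer, as TPCC index keys can be,
// and then runs it through EnsureSafeSplitKey a second time.
func demoDoubleTrim(codec keys.SQLCodec) roachpb.Key {
	k := codec.IndexPrefix(100 /* tableID */, 1 /* indexID */)
	k = encoding.EncodeVarintAscending(k, 123456) // a real row-key column
	k = encoding.EncodeVarintAscending(k, 5)      // trailing integer column
	// EnsureSafeSplitKey decodes the trailing 5 as a column-family length
	// suffix, so bytes that are really part of the row key get trimmed and
	// many distinct spans can collapse onto the same, much shorter key.
	if trimmed, err := keys.EnsureSafeSplitKey(k); err == nil {
		return trimmed
	}
	return k
}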

An alternative here would be to do what the buffering adder does and
create an initial set of splits by distributing them over the keyspace
of the first buffer. That has the advantage of allowing us to call
EnsureSafeSplitKey at the cost of less effective splits.
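For comparison, a rough sketch of that alternative (the helper name and
shape are hypothetical, not the buffering adder's actual API): pick
evenly spaced keys out of the first sorted buffer and run each through
EnsureSafeSplitKey before splitting, which is safe because these are full
row keys rather than pre-computed split keys.

// splitPointsFromBuffer derives up to numSplits split keys from the sorted
// keys of the first buffered batch.
func splitPointsFromBuffer(sorted []roachpb.Key, numSplits int) []roachpb.Key {
	out := make([]roachpb.Key, 0, numSplits)
	if numSplits <= 0 || len(sorted) == 0 {
		return out
	}
	step := len(sorted) / (numSplits + 1)
	if step == 0 {
		step = 1
	}
	for i := step; i < len(sorted) && len(out) < numSplits; i += step {
		// Buffered keys are row keys, so EnsureSafeSplitKey trims exactly
		// the column-family suffix here.
		if safe, err := keys.EnsureSafeSplitKey(sorted[i]); err == nil && len(safe) > 0 {
			out = append(out, safe)
		}
	}
	return out
}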

Note that we face this same problem during bulk operations.  Perhaps
in the future the source cluster should send some metadata about
manual splits that are issued or _something_ that lets the destination
know about the fact that we expect a large amount of data in a
particular span.

Epic: none

Release note: None
stevendanna committed Sep 29, 2023
1 parent d0c8f31 commit 27e87b6
Showing 4 changed files with 263 additions and 8 deletions.
15 changes: 9 additions & 6 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -206,6 +206,7 @@ type randomEventGenerator struct {
config randomStreamConfig
numEventsSinceLastResolved int
sstMaker SSTableMakerFn
codec keys.SQLCodec
tableDesc *tabledesc.Mutable
systemKVs []roachpb.KeyValue
}
@@ -228,6 +229,7 @@ func newRandomEventGenerator(
numEventsSinceLastResolved: 0,
sstMaker: fn,
tableDesc: tableDesc,
codec: keys.MakeSQLCodec(config.tenantID),
systemKVs: systemKVs,
}, nil
}
@@ -238,7 +240,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
// Emit a CheckpointEvent.
resolvedTime := timeutil.Now()
hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(keys.SystemSQLCodec), Timestamp: hlcResolvedTime}
resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(r.codec), Timestamp: hlcResolvedTime}
event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan})
r.numEventsSinceLastResolved = 0
} else {
@@ -257,11 +259,11 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
size := 10 + r.rng.Intn(30)
keyVals := make([]roachpb.KeyValue, 0, size)
for i := 0; i < size; i++ {
keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.tableDesc))
keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
}
event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals))
} else {
event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.tableDesc))
event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
}
r.numEventsSinceLastResolved++
}
@@ -360,6 +362,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
log.Infof(ctx, "planning random stream for tenant %d", m.config.tenantID)

// Allocate table IDs and return one per partition address in the topology.
srcCodec := keys.MakeSQLCodec(m.config.tenantID)
for i := 0; i < m.config.numPartitions; i++ {
tableID := m.getNextTableID()
tableDesc, err := m.tableDescForID(tableID)
@@ -375,7 +378,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
ID: strconv.Itoa(i),
SrcAddr: streamingccl.PartitionAddress(partitionURI),
SubscriptionToken: []byte(partitionURI),
Spans: []roachpb.Span{tableDesc.TableSpan(keys.SystemSQLCodec)},
Spans: []roachpb.Span{tableDesc.TableSpan(srcCodec)},
})
}

@@ -579,7 +582,7 @@ func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
}

func makeRandomKey(
r *rand.Rand, config randomStreamConfig, tableDesc *tabledesc.Mutable,
r *rand.Rand, config randomStreamConfig, codec keys.SQLCodec, tableDesc *tabledesc.Mutable,
) roachpb.KeyValue {
// Create a key holding a random integer.
keyDatum := tree.NewDInt(tree.DInt(r.Intn(config.valueRange)))
@@ -590,7 +593,7 @@ func makeRandomKey(
var colIDToRowIndex catalog.TableColMap
colIDToRowIndex.Set(index.GetKeyColumnID(0), 0)

keyPrefix := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, tableDesc.GetID(), index.GetID())
keyPrefix := rowenc.MakeIndexKeyPrefix(codec, tableDesc.GetID(), index.GetID())
k, _, err := rowenc.EncodeIndexKey(tableDesc, index, colIDToRowIndex, tree.Datums{keyDatum}, keyPrefix)
if err != nil {
panic(err)
156 changes: 155 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -15,12 +15,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -33,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
@@ -211,14 +215,162 @@ func startDistIngestion(
return rw.Err()
}

updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
// We now attempt to create initial splits. We currently do
// this once during initial planning to avoid re-splitting on
// resume since it isn't clear to us at the moment whether
// re-splitting is always going to be useful.
if !streamProgress.InitialSplitComplete {
codec := execCtx.ExtendedEvalContext().Codec
splitter := &dbSplitAndScatter{db: execCtx.ExecCfg().DB}
if err := createInitialSplits(ctx, codec, splitter, planner.initialTopology, details.DestinationTenantID); err != nil {
return err
}
} else {
log.Infof(ctx, "initial splits already complete")
}

if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
md.Progress.GetStreamIngest().ReplicationStatus = jobspb.Replicating
md.Progress.GetStreamIngest().InitialSplitComplete = true
md.Progress.RunningStatus = "physical replication running"
ju.UpdateProgress(md.Progress)
return nil
}); err != nil {
return err
}

err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
if errors.Is(err, sql.ErrPlanChanged) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
}
return err
}

// TODO(ssd): This is duplicative of the split_and_scatter processor in
// backupccl.
type splitAndScatterer interface {
split(
ctx context.Context,
splitKey roachpb.Key,
expirationTime hlc.Timestamp,
) error

scatter(
ctx context.Context,
scatterKey roachpb.Key,
) error

now() hlc.Timestamp
}

type dbSplitAndScatter struct {
db *kv.DB
}

func (s *dbSplitAndScatter) split(
ctx context.Context, splitKey roachpb.Key, expirationTime hlc.Timestamp,
) error {
return s.db.AdminSplit(ctx, splitKey, expirationTime)
}

func (s *dbSplitAndScatter) scatter(ctx context.Context, scatterKey roachpb.Key) error {
_, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), &kvpb.AdminScatterRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{
Key: scatterKey,
EndKey: scatterKey.Next(),
}),
RandomizeLeases: true,
MaxSize: 1, // don't scatter non-empty ranges on resume.
})
return pErr.GoError()
}

func (s *dbSplitAndScatter) now() hlc.Timestamp {
return s.db.Clock().Now()
}

// createInitialSplits creates splits based on the given topology from the
// source.
//
// The idea here is to use the information from the source cluster about
// the distribution of the data to produce split points to help prevent
// ingestion processors from pushing data into the same ranges during
// the initial scan.
func createInitialSplits(
ctx context.Context,
codec keys.SQLCodec,
splitter splitAndScatterer,
topology streamclient.Topology,
destTenantID roachpb.TenantID,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamingest.createInitialSplits")
defer sp.Finish()

rekeyer, err := backupccl.MakeKeyRewriterFromRekeys(codec,
nil /* tableRekeys */, []execinfrapb.TenantRekey{
{
OldID: topology.SourceTenantID,
NewID: destTenantID,
}},
true /* restoreTenantFromStream */)
if err != nil {
return err
}
for _, partition := range topology.Partitions {
for _, span := range partition.Spans {
startKey := span.Key.Clone()
splitKey, _, err := rekeyer.RewriteKey(startKey, 0 /* walltimeForImportElision */)
if err != nil {
return err
}

// NOTE(ssd): EnsureSafeSplitKey called on an arbitrary
// key unfortunately results in many of our split keys
// mapping to the same key for workloads like TPCC where
// the schema of the table includes integers that will
// get erroneously treated as the column family length.
//
// Since the partitions are generated from a call to
// PartitionSpans on the source cluster, they should be
// aligned with the split points in the original cluster
// and thus should be valid split keys. But, we are
// opening ourselves up to replicating bad splits from
// the original cluster.
//
// if newSplitKey, err := keys.EnsureSafeSplitKey(splitKey); err != nil {
// // Ignore the error since keys such as
// // /Tenant/2/Table/13 is an OK start key but
// // returns an error.
// } else if len(newSplitKey) != 0 {
// splitKey = newSplitKey
// }
//
if err := splitAndScatter(ctx, roachpb.Key(splitKey), splitter); err != nil {
return err
}

}
}
return nil
}

var splitAndScatterStickyBitDuration = time.Hour

func splitAndScatter(
ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
) error {
log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
expirationTime := s.now().AddDuration(splitAndScatterStickyBitDuration)
if err := s.split(ctx, splitAndScatterKey, expirationTime); err != nil {
return err
}
if err := s.scatter(ctx, splitAndScatterKey); err != nil {
log.Warningf(ctx, "failed to scatter span starting at %s: %v",
splitAndScatterKey, err)
}
return nil
}

// makeReplicationFlowPlanner creates a replicationFlowPlanner and the initial physical plan.
func makeReplicationFlowPlanner(
ctx context.Context,
@@ -254,6 +406,7 @@ type replicationFlowPlanner struct {
initialPlanCtx *sql.PlanningCtx

initialStreamAddresses []string
initialTopology streamclient.Topology

srcTenantID roachpb.TenantID
}
@@ -287,6 +440,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
return nil, nil, err
}
if !p.containsInitialStreamAddresses() {
p.initialTopology = topology
p.initialStreamAddresses = topology.StreamAddresses()
}

93 changes: 93 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -27,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -324,3 +326,94 @@ func TestSourceDestMatching(t *testing.T) {
})
}
}

type testSplitter struct {
splits []roachpb.Key
scatters []roachpb.Key
splitErr func(key roachpb.Key) error
scatterErr func(key roachpb.Key) error
}

func (ts *testSplitter) split(_ context.Context, splitKey roachpb.Key, _ hlc.Timestamp) error {
ts.splits = append(ts.splits, splitKey)
if ts.splitErr != nil {
return ts.splitErr(splitKey)
}
return nil
}

func (ts *testSplitter) scatter(_ context.Context, scatterKey roachpb.Key) error {
ts.scatters = append(ts.scatters, scatterKey)
if ts.scatterErr != nil {
return ts.scatterErr(scatterKey)
}
return nil
}

func (ts *testSplitter) now() hlc.Timestamp {
return hlc.Timestamp{}
}

func TestCreateInitialSplits(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
sourceTenantID := roachpb.MustMakeTenantID(11)
inputCodec := keys.MakeSQLCodec(sourceTenantID)
destTenantID := roachpb.MustMakeTenantID(12)
outputCodec := keys.MakeSQLCodec(destTenantID)

testSpan := func(codec keys.SQLCodec, tableID uint32) roachpb.Span {
return roachpb.Span{
Key: codec.IndexPrefix(tableID, 1),
EndKey: codec.IndexPrefix(tableID, 2),
}
}
inputSpans := []roachpb.Span{
testSpan(inputCodec, 100),
testSpan(inputCodec, 200),
testSpan(inputCodec, 300),
testSpan(inputCodec, 400),
}
outputSpans := []roachpb.Span{
testSpan(outputCodec, 100),
testSpan(outputCodec, 200),
testSpan(outputCodec, 300),
testSpan(outputCodec, 400),
}
topo := streamclient.Topology{
SourceTenantID: sourceTenantID,
Partitions: []streamclient.PartitionInfo{
{Spans: inputSpans},
},
}

t.Run("rekeys before splitting", func(t *testing.T) {
ts := &testSplitter{}
err := createInitialSplits(ctx, keys.SystemSQLCodec, ts, topo, destTenantID)
require.NoError(t, err)
expectedSplitsAndScatters := make([]roachpb.Key, 0, len(outputSpans))
for _, sp := range outputSpans {
expectedSplitsAndScatters = append(expectedSplitsAndScatters, sp.Key)
}

require.Equal(t, expectedSplitsAndScatters, ts.splits)
require.Equal(t, expectedSplitsAndScatters, ts.scatters)

})
t.Run("split errors are fatal", func(t *testing.T) {
require.Error(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
splitErr: func(_ roachpb.Key) error {
return errors.New("test error")
},
}, topo, destTenantID))
})
t.Run("ignores scatter errors", func(t *testing.T) {
require.NoError(t, createInitialSplits(ctx, keys.SystemSQLCodec, &testSplitter{
scatterErr: func(_ roachpb.Key) error {
return errors.New("test error")
},
}, topo, destTenantID))
})
}
