Skip to content

Commit

Permalink
streamingccl: add physical_replication.consumer.split_on_job_retry se…
Browse files Browse the repository at this point in the history
…tting

This patch adds the default off
physical_replication.consumer.split_on_job_retry setting, which, when enabled
issues admin splits over the topology after a job level retry triggered by
distSQL replanning. This setting may help c2c catch up more quickly after a
replanning event, as it would prevent the destination side from issueing admin
splits during catchup scans.

Informs cockroachdb#114706

Release note: none
  • Loading branch information
msbutler committed Nov 28, 2023
1 parent b8c7824 commit e652a54
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 10 deletions.
27 changes: 27 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,30 @@ func SSTMaker(t *testing.T, keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable
WriteTS: batchTS,
}
}

// InitialSplitTester helps test expected behavior around initial splits: we
// always split before the initial scan, and after a job level retry if we just
// observed a replanning error and if
// physical_replication.consumer.split_on_job_retry.enabled is set to true.
type InitialSplitTester struct {
SplitOnRetry bool
InitialSplitCount int
}

func (ist *InitialSplitTester) GenInitialSplitterInspector(t *testing.T) func(noopScatter bool) {
return func(noopScatter bool) {
ist.InitialSplitCount++
if noopScatter {
require.Greater(t, ist.InitialSplitCount, 1)
}
}
}

func (ist *InitialSplitTester) MaybeSetSplitOnRetry(
t *testing.T, rng *rand.Rand, c *testcluster.TestCluster,
) {
ist.SplitOnRetry = rng.Intn(2) == 1
if ist.SplitOnRetry {
serverutils.SetClusterSetting(t, c, "physical_replication.consumer.split_on_job_retry.enabled", true)
}
}
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ var InterNodeLag = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

var SplitOnRetry = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.consumer.split_on_job_retry.enabled",
"controls whether we issue admin splits on the partition spans after a job level retry related to distsql replanning",
false,
)

// DumpFrontierEntries controls the frequency at which we persist the entries in
// the frontier to the `system.job_info` table.
//
Expand Down
26 changes: 25 additions & 1 deletion pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,13 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
ist := replicationtestutils.InitialSplitTester{}
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.TestingKnobs = &sql.StreamingTestingKnobs{
InspectInitialSplitter: ist.GenInitialSplitterInspector(t)}
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()

ist.MaybeSetSplitOnRetry(t, c.Rng, c.DestCluster)
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
Expand Down Expand Up @@ -195,6 +198,9 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) {
srcTime = c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))
c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime())

// We should only observe one set of initial splits, as no replanning error occured.
require.Equal(t, 1, ist.InitialSplitCount)
}

func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) {
Expand Down Expand Up @@ -688,6 +694,8 @@ func TestStreamingAutoReplan(t *testing.T) {
turnOffReplanning := make(chan struct{})
var alreadyReplanned atomic.Bool

ist := replicationtestutils.InitialSplitTester{}

// Track the number of unique addresses that we're connected to.
clientAddresses := make(map[string]struct{})
var addressesMu syncutil.Mutex
Expand All @@ -705,12 +713,14 @@ func TestStreamingAutoReplan(t *testing.T) {
alreadyReplanned.Swap(true)
}
},
InspectInitialSplitter: ist.GenInitialSplitterInspector(t),
}
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()
// Don't allow for replanning until the new nodes and scattered table have been created.
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0)
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_frequency", time.Millisecond*500)
ist.MaybeSetSplitOnRetry(t, c.Rng, c.DestCluster)

// Don't allow inter node lag replanning to affect the test.
serverutils.SetClusterSetting(t, c.DestCluster, "physical_replication.consumer.node_lag_replanning_threshold", 0)
Expand Down Expand Up @@ -748,6 +758,11 @@ func TestStreamingAutoReplan(t *testing.T) {
c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID))

require.Greater(t, len(clientAddresses), 1)
expectedInitialSplits := 1
if ist.SplitOnRetry {
expectedInitialSplits = 2
}
require.Equal(t, expectedInitialSplits, ist.InitialSplitCount)
}

// TestStreamingReplanOnLag asserts that the c2c job retries if a node lags far
Expand All @@ -766,6 +781,8 @@ func TestStreamingReplanOnLag(t *testing.T) {
turnOffReplanning := make(chan struct{})
var alreadyReplanned atomic.Bool

ist := replicationtestutils.InitialSplitTester{}

// Track the number of unique addresses that we're connected to, to ensure
// that all destination nodes participate in the replication stream.
clientAddresses := make(map[string]struct{})
Expand All @@ -792,11 +809,13 @@ func TestStreamingReplanOnLag(t *testing.T) {
}
return false
},
InspectInitialSplitter: ist.GenInitialSplitterInspector(t),
}
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()
// Don't allow for replanning based on node participation.
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0)
ist.MaybeSetSplitOnRetry(t, c.Rng, c.DestCluster)

replicationtestutils.CreateScatteredTable(t, c, 3)

Expand All @@ -821,6 +840,11 @@ func TestStreamingReplanOnLag(t *testing.T) {

cutoverTime := c.DestSysServer.Clock().Now()
c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID))
expectedInitialSplits := 1
if ist.SplitOnRetry {
expectedInitialSplits = 2
}
require.Equal(t, expectedInitialSplits, ist.InitialSplitCount)
}

// TestProtectedTimestampManagement tests the active protected
Expand Down
31 changes: 23 additions & 8 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,24 @@ func startDistIngestion(
return rw.Err()
}

// We now attempt to create initial splits. We currently do
// this once during initial planning to avoid re-splitting on
// resume since it isn't clear to us at the moment whether
// re-splitting is always going to be useful.
if !streamProgress.InitialSplitComplete {
isReplanErr := func(err error) bool {
return errors.Is(err, sql.ErrPlanChanged) || errors.Is(err, ErrNodeLagging)
}

// We now attempt to issue splits over the topology if we've never done so
// (i.e. we're beginning the c2c job), or if we allow splits after a job level
// retry.
if !streamProgress.InitialSplitComplete ||
(streamingccl.SplitOnRetry.Get(&execCtx.ExecCfg().Settings.SV) && isReplanErr(resumer.lastRetryableIngestionError)) {
codec := execCtx.ExtendedEvalContext().Codec
splitter := &dbSplitAndScatter{db: execCtx.ExecCfg().DB}
splitter := &dbSplitAndScatter{
db: execCtx.ExecCfg().DB,
// If we've already created initial splits, don't issue scatters.
noopScatter: streamProgress.InitialSplitComplete,
}
if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.InspectInitialSplitter != nil {
knobs.InspectInitialSplitter(splitter.noopScatter)
}
if err := createInitialSplits(ctx, codec, splitter, planner.initialTopology, details.DestinationTenantID); err != nil {
return err
}
Expand All @@ -226,7 +237,7 @@ func startDistIngestion(
}

err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs)
if errors.Is(err, sql.ErrPlanChanged) {
if isReplanErr(err) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
}
return err
Expand All @@ -250,7 +261,8 @@ type splitAndScatterer interface {
}

type dbSplitAndScatter struct {
db *kv.DB
db *kv.DB
noopScatter bool
}

func (s *dbSplitAndScatter) split(
Expand All @@ -260,6 +272,9 @@ func (s *dbSplitAndScatter) split(
}

func (s *dbSplitAndScatter) scatter(ctx context.Context, scatterKey roachpb.Key) error {
if s.noopScatter {
return nil
}
_, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), &kvpb.AdminScatterRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{
Key: scatterKey,
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
type streamIngestionResumer struct {
job *jobs.Job

lastRetryableIngestionError error

mu struct {
syncutil.Mutex
// perNodeAggregatorStats is a per component running aggregate of trace
Expand Down Expand Up @@ -223,6 +225,7 @@ func ingestWithRetries(
break
}
status := redact.Sprintf("waiting before retrying error: %s", err)
resumer.lastRetryableIngestionError = err
updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
if lastReplicatedTime.Less(newReplicatedTime) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,8 @@ func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration
db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
`SET CLUSTER SETTING stream_replication.replan_flow_threshold = 0.1;`,
`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`)
`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`,
`SET CLUSTER SETTING physical_replication.consumer.split_on_job_retry.enabled = true;`)

if additionalDuration != 0 {
replanFrequency := additionalDuration / 2
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,8 @@ type StreamingTestingKnobs struct {
AfterReplicationFlowPlan func(map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec,
*execinfrapb.StreamIngestionFrontierSpec)

InspectInitialSplitter func(noopScatter bool)

AfterPersistingPartitionSpecs func()

// OverrideRevertRangeBatchSize allows overriding the `MaxSpanRequestKeys`
Expand Down

0 comments on commit e652a54

Please sign in to comment.