diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 52083e5f4d62..6c5424d40c26 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1944,15 +1944,17 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { changefeedbase.FrontierCheckpointMaxBytes.Override( context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize) - // Note the tableSpan to avoid resolved events that leave no gaps - fooDesc := desctestutils.TestingGetPublicTableDescriptor( - s.SystemServer.DB(), s.Codec, "d", "foo") - tableSpan := fooDesc.PrimaryIndexSpan(s.Codec) + var tableSpan roachpb.Span + refreshTableSpan := func() { + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + s.SystemServer.DB(), s.Codec, "d", "foo") + tableSpan = fooDesc.PrimaryIndexSpan(s.Codec) + } // FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events // that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some // but not all of the time, which results in a checkpoint eventually being created - haveGaps := false + numGaps := 0 var backfillTimestamp hlc.Timestamp var initialCheckpoint roachpb.SpanGroup var foundCheckpoint int32 @@ -1966,6 +1968,11 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { // timestamp such that all backfill spans have a timestamp of // timestamp.Next(). if r.BoundaryType == expectedBoundaryType { + // NB: We wait until the schema change is public before looking + // up the table span. When using the declarative schema changer, + // the table span will be different before and after the schema + // change due to a primary index swap. + refreshTableSpan() backfillTimestamp = r.Timestamp return false, nil } @@ -1988,11 +1995,18 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { return !(backfillTimestamp.IsEmpty() || r.Timestamp.LessEq(backfillTimestamp.Next())), nil } - // Only allow resolving if we definitely won't have a completely resolved table - if !r.Span.Equal(tableSpan) && haveGaps { + // At the end of a backfill, kv feed will emit a resolved span for the whole table. + // Filter this out because we would like to leave gaps. + if r.Span.Equal(tableSpan) { + return true, nil + } + + // Ensure that we have at least 2 gaps, so when a second checkpoint happens later in this test, + // the second checkpoint can still leave at least one gap. + if numGaps >= 2 { return rnd.Intn(10) > 7, nil } - haveGaps = true + numGaps += 1 return true, nil } @@ -2021,7 +2035,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { // as well as the newly resolved ones var secondCheckpoint roachpb.SpanGroup foundCheckpoint = 0 - haveGaps = false + numGaps = 0 knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) { // Stop resolving anything after second checkpoint set to avoid backfill completion if secondCheckpoint.Len() > 0 { @@ -2049,11 +2063,17 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { require.Falsef(t, initialCheckpoint.Encloses(r.Span), "second backfill should not resolve checkpointed span") - // Only allow resolving if we definitely won't have a completely resolved table - if !r.Span.Equal(tableSpan) && haveGaps { + // At the end of a backfill, kv feed will emit a resolved span for the whole table. + // Filter this out because we would like to leave at least one gap. + if r.Span.Equal(tableSpan) { + return true, nil + } + + // Ensure there is at least one gap so that we can receive resolved spans later. + if numGaps >= 1 { return rnd.Intn(10) > 7, nil } - haveGaps = true + numGaps += 1 return true, nil } @@ -2092,15 +2112,10 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { // Pause job to avoid race on the resolved array require.NoError(t, jobFeed.Pause()) - // NB: With the declarative schema changer, there is a primary index swap, - // so the primary index span will change. - freshFooDesc := desctestutils.TestingGetPublicTableDescriptor( - s.SystemServer.DB(), s.Codec, "d", "foo") - tableSpanAfter := freshFooDesc.PrimaryIndexSpan(s.Codec) - // Verify that none of the resolved spans after resume were checkpointed. + t.Logf("Table Span: %s, Second Checkpoint: %v, Resolved Spans: %v", tableSpan, secondCheckpoint, resolved) for _, sp := range resolved { - require.Falsef(t, !sp.Equal(tableSpanAfter) && secondCheckpoint.Contains(sp.Key), "span should not have been resolved: %s", sp) + require.Falsef(t, !sp.Equal(tableSpan) && secondCheckpoint.Contains(sp.Key), "span should not have been resolved: %s", sp) } }