diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go index 5290961a5ed6..fe816f6b074b 100644 --- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go +++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go @@ -12,6 +12,7 @@ import ( "context" gosql "database/sql" "fmt" + "regexp" "strconv" "strings" "testing" @@ -163,7 +164,8 @@ func TestDataDriven(t *testing.T) { } var as string d.ScanArgs(t, "as", &as) - output := ds.queryAs(t, as, d.Input) + output, err := ds.queryAs(ctx, t, as, d.Input) + require.NoError(t, err) output = strings.TrimSpace(output) values := strings.Split(output, "\n") if len(values) != len(d.CmdArgs)-1 { @@ -207,11 +209,18 @@ func TestDataDriven(t *testing.T) { case "query-sql": var as string + var regexError string + d.MaybeScanArgs(t, "regex-error", ®exError) d.ScanArgs(t, "as", &as) if d.HasArg("retry") { - ds.queryAsWithRetry(t, as, d.Input, d.Expected) + ds.queryAsWithRetry(ctx, t, as, d.Input, d.Expected, regexError) + } + output, err := ds.queryAs(ctx, t, as, d.Input) + if regexError != "" { + require.NoError(t, handleRegex(t, err, regexError)) + return "" } - return ds.queryAs(t, as, d.Input) + return output case "compare-replication-results": ds.replicationClusters.CompareResult(d.Input) @@ -346,7 +355,7 @@ func TestDataDriven(t *testing.T) { FROM system.span_configurations WHERE start_key >= '\x%x' AND start_key <= '\x%x' ORDER BY start_key;`, startKey, endKey) - return ds.queryAsWithRetry(t, as, listQuery, d.Expected) + return ds.queryAsWithRetry(ctx, t, as, listQuery, d.Expected, "") default: t.Fatalf("unsupported instruction: %s", d.Cmd) @@ -378,30 +387,43 @@ func (d *datadrivenTestState) cleanup(t *testing.T) { } } -func (d *datadrivenTestState) queryAs(t *testing.T, as, query string) string { +func (d *datadrivenTestState) queryAs( + ctx context.Context, t *testing.T, as, query string, +) (string, error) { var rows *gosql.Rows + var err error switch as { case "source-system": - rows = d.replicationClusters.SrcSysSQL.Query(t, query) + rows, err = d.replicationClusters.SrcSysSQL.DB.QueryContext(ctx, query) case "source-tenant": - rows = d.replicationClusters.SrcTenantSQL.Query(t, query) + rows, err = d.replicationClusters.SrcTenantSQL.DB.QueryContext(ctx, query) case "destination-system": - rows = d.replicationClusters.DestSysSQL.Query(t, query) + rows, err = d.replicationClusters.DestSysSQL.DB.QueryContext(ctx, query) case "destination-tenant": - rows = d.replicationClusters.DestTenantSQL.Query(t, query) + rows, err = d.replicationClusters.DestTenantSQL.DB.QueryContext(ctx, query) default: t.Fatalf("unsupported value to run SQL query as: %s", as) } + if err != nil { + return "", err + } output, err := sqlutils.RowsToDataDrivenOutput(rows) require.NoError(t, err) - return output + return output, nil } -func (d *datadrivenTestState) queryAsWithRetry(t *testing.T, as, query, expected string) string { +func (d *datadrivenTestState) queryAsWithRetry( + ctx context.Context, t *testing.T, as, query, expected string, regexError string, +) string { var output string + var err error testutils.SucceedsSoon(t, func() error { - output = d.queryAs(t, as, query) + output, err = d.queryAs(ctx, t, as, query) + if regexError != "" { + output = "" + return handleRegex(t, err, regexError) + } if output != expected { return errors.Newf("latest output: %s\n expected: %s", output, expected) } @@ -425,6 +447,19 @@ func (d *datadrivenTestState) execAs(t *testing.T, as, query string) { } } +func handleRegex(t *testing.T, err error, regexError string) error { + if err == nil { + return errors.Newf("expected non nil error to match %s", regexError) + } + matched, matchStringErr := regexp.MatchString(regexError, err.Error()) + require.NoError(t, matchStringErr) + if matched { + return nil + } else { + return errors.Wrapf(err, "error does not match regex error %s", regexError) + } +} + func newDatadrivenTestState() datadrivenTestState { return datadrivenTestState{ cleanupFns: make([]func() error, 0), diff --git a/pkg/ccl/streamingccl/streamingest/testdata/jobs_post_cutover b/pkg/ccl/streamingccl/streamingest/testdata/jobs_post_cutover new file mode 100644 index 000000000000..6d028a2a9e5f --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/testdata/jobs_post_cutover @@ -0,0 +1,48 @@ + +create-replication-clusters +---- + +start-replication-stream +---- + +exec-sql as=source-tenant +SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.write_first_checkpoint'; +---- + +query-sql as=source-tenant regex-error=(backup.after.write_first_checkpoint) +BACKUP INTO 'nodelocal://1/example-schedule'; +---- + +let $backupID as=source-tenant +WITH jobs AS (SHOW JOBS) SELECT job_id FROM jobs WHERE job_type='BACKUP'; +---- + +query-sql retry as=source-tenant +WITH jobs AS (SHOW JOBS) SELECT status FROM jobs WHERE job_id=$backupID; +---- +paused + +let $ts as=source-system +SELECT clock_timestamp()::timestamp::string +---- + +cutover ts=$ts +---- + +start-replicated-tenant +---- + +exec-sql as=destination-tenant +SET CLUSTER SETTING jobs.debug.pausepoints = ''; +---- + +exec-sql as=destination-tenant +RESUME JOB $backupID +---- + + +query-sql retry as=destination-tenant +WITH jobs AS (SHOW JOBS) SELECT status FROM jobs WHERE job_id=$backupID; +---- +failed +