Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
100997: jobs: add some more debugging to TestJobControlByType r=adityamaru a=knz

This test is currently flaky (cockroachdb#99200, cockroachdb#98558). However, its failure output does not give enough information to diagnose the cause.

This PR adds more debugging output to the test's log and failure messages.

Epic: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Apr 10, 2023
2 parents 6bbb865 + f59ae77 commit f59767c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 17 deletions.
74 changes: 58 additions & 16 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"strings"
"testing"
"text/tabwriter"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -338,6 +340,8 @@ func TestJobControlByType(t *testing.T) {
defer log.Scope(t).Close(t)
defer ResetConstructors()()

skip.UnderShort(t) // Test runs for more than 30s.

argsFn := func(args *base.TestServerArgs) {
// Prevent registry from changing job state while running this test.
interval := 24 * time.Hour
Expand All @@ -346,6 +350,31 @@ func TestJobControlByType(t *testing.T) {
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, argsFn)
defer cleanup()

// delayedShowJobs returns a fmt.Stringer that, when formatted, runs the
// given query plus a dump of crdb_internal.jobs and renders both result
// sets as tab-aligned tables. Nothing is queried until String() is
// called on the returned value.
delayedShowJobs := func(t *testing.T, query string) fmt.Stringer {
	// render appends header followed by rows, aligned in columns, to out.
	render := func(out *strings.Builder, header string, rows [][]string) {
		w := tabwriter.NewWriter(out, 2, 1, 2, ' ', 0)
		fmt.Fprint(w, header)
		for i := range rows {
			fmt.Fprintln(w, strings.Join(rows[i], "\t"))
		}
		// The destination is an in-memory strings.Builder, so the
		// flush error is deliberately ignored.
		_ = w.Flush()
	}

	return delayedStringer{
		func() string {
			// Run both queries up front, before any formatting.
			queryRows := th.sqlDB.QueryStr(t, query)
			jobRows := th.sqlDB.QueryStr(t, "TABLE crdb_internal.jobs")

			var out strings.Builder
			render(&out, fmt.Sprintf("output of %q:\n", query), queryRows)
			render(&out, "current jobs:\n", jobRows)
			return out.String()
		},
	}
}

registry := th.server.JobRegistry().(*Registry)
blockResume := make(chan struct{})
defer close(blockResume)
Expand All @@ -359,10 +388,10 @@ func TestJobControlByType(t *testing.T) {
sessiondata.RootUserSessionDataOverride,
invalidTypeQuery,
)
require.Error(t, err)
require.Error(t, err, "unsupported job type")
})

// To test the commands on valid job types, one job of every type in every state will be created
// To test the commands on valid job types, one job of every type in every state will be created.
var allJobTypes = []jobspb.Type{jobspb.TypeChangefeed, jobspb.TypeImport, jobspb.TypeBackup, jobspb.TypeRestore}
var jobspbTypeToString = map[jobspb.Type]string{
jobspb.TypeChangefeed: "CHANGEFEED",
Expand All @@ -374,7 +403,7 @@ func TestJobControlByType(t *testing.T) {
var allJobStates = []Status{StatusPending, StatusRunning, StatusPaused, StatusFailed,
StatusReverting, StatusSucceeded, StatusCanceled, StatusCancelRequested, StatusPauseRequested}

// This is required to make the jobs of each type controllable
// Make the jobs of each type controllable.
for _, jobType := range allJobTypes {
RegisterConstructor(jobType, func(job *Job, _ *cluster.Settings) Resumer {
return FakeResumer{
Expand All @@ -397,10 +426,11 @@ func TestJobControlByType(t *testing.T) {
{"cancel", []Status{StatusPending, StatusRunning, StatusPaused}, StatusCancelRequested},
} {
commandQuery := fmt.Sprintf("%s ALL %s JOBS", tc.command, jobspbTypeToString[jobType])
subJobs := "SHOW JOBS SELECT id FROM system.jobs WHERE job_type='" + jobspbTypeToString[jobType] + "'"
t.Run(commandQuery, func(t *testing.T) {
var jobIDStrings []string

// Make multiple jobs of every permutation of job type and job state
// Make multiple jobs of every permutation of job type and job state.
const numJobsPerStatus = 3
for _, jobInfo := range []struct {
jobDetails jobspb.Details
Expand All @@ -425,32 +455,38 @@ func TestJobControlByType(t *testing.T) {
_, err := registry.CreateAdoptableJobWithTxn(context.Background(), record, jobID, nil /* txn */)
require.NoError(t, err)
th.sqlDB.Exec(t, "UPDATE system.jobs SET status=$1 WHERE id=$2", status, jobID)
t.Logf("created job %d (%T) with status %s", jobID, record.Details, status)
}
}
}

jobIdsClause := fmt.Sprint(strings.Join(jobIDStrings, ", "))
jobIdsClause := strings.Join(jobIDStrings, ", ")
defer func() {
// Clear the system.jobs table for the next test run.
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.jobs WHERE id IN (%s)", jobIdsClause))
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.job_info WHERE job_id IN (%s)", jobIdsClause))
}()

// Execute the command and verify its executed on the expected number of rows
// Execute the command and verify it is executed on the expected number of rows.
numEffected, err := th.cfg.DB.Executor().ExecEx(
context.Background(),
"test-num-effected",
nil,
sessiondata.RootUserSessionDataOverride,
commandQuery,
)
require.NoError(t, err)
require.NoError(t, err, "%s", delayedShowJobs(t, subJobs))

// Jobs in the starting state should be affected
// Jobs in the starting state should be affected.
numExpectedJobsAffected := numJobsPerStatus * len(tc.startingStates)
require.Equal(t, numExpectedJobsAffected, numEffected)
require.Equal(t, numExpectedJobsAffected, numEffected, "%s", delayedShowJobs(t, subJobs))

// Both the affected jobs + the jobs originally in the target state should be in that state
// Both the affected jobs + the jobs originally in the target state should be in that state.
numExpectedJobsWithEndState := numExpectedJobsAffected + numJobsPerStatus

// By verifying that the correct number of jobs are in the expected end state and
// the expected number of jobs were affected by the command, we guarantee that
// only the expected jobs have changed
// only the expected jobs have changed.
var numJobs = 0
th.sqlDB.QueryRow(
t,
Expand All @@ -459,12 +495,18 @@ func TestJobControlByType(t *testing.T) {
tc.endState, jobType, jobIdsClause,
),
).Scan(&numJobs)
require.Equal(t, numJobs, numExpectedJobsWithEndState)

// Clear the system.jobs table for the next test run.
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.jobs WHERE id IN (%s)", jobIdsClause))
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.job_info WHERE job_id IN (%s)", jobIdsClause))
require.Equal(t, numJobs, numExpectedJobsWithEndState, "%s", delayedShowJobs(t, subJobs))
})
}
}
}

// delayedStringer wraps a func() string so that it can be used wherever
// a value with a String() method is expected. The wrapped function is
// not run at construction time; it runs only when String() is invoked,
// which defers the cost of building the text until it is needed.
type delayedStringer struct {
	// stringFn produces the text on demand; it is invoked on every
	// String() call.
	stringFn func() string
}

// String runs the wrapped function and returns whatever it produces.
func (ds delayedStringer) String() string {
	text := ds.stringFn()
	return text
}
2 changes: 1 addition & 1 deletion pkg/sql/delegate/job_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ AND jobs.status IN (%s) AND jobs.created_by_id IN (%s)`,

if stmt.Type != "" {
if _, ok := protobufNameForType[stmt.Type]; !ok {
return nil, errors.New("Unsupported job type")
return nil, errors.New("unsupported job type")
}
queryStrFormat := `%s JOBS (
SELECT id
Expand Down

0 comments on commit f59767c

Please sign in to comment.