Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: fix flaky test TestImportCSVStmt #34589

Merged
merged 1 commit into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,9 +435,6 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
conn := sqlDB.DB.(*gosql.DB)
defer cleanupFn()

// Get the number of existing jobs.
baseNumJobs := jobutils.GetSystemJobsCount(t, sqlDB)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this still need to be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea -- that was needed to calculate the offset when the code was selecting all jobs. Now that it's just selecting BACKUP/RESTORE jobs, we don't need baseNumJobs


sanitizedIncDir := localFoo + "/inc"
incDir := sanitizedIncDir + "?secretCredentialsHere"

Expand All @@ -462,7 +459,7 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
sqlDB.Exec(t, `SET DATABASE = data`)

sqlDB.Exec(t, `BACKUP TABLE bank TO $1 INCREMENTAL FROM $2`, incDir, fullDir)
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+1, jobspb.TypeBackup, jobs.StatusSucceeded, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, 1, jobspb.TypeBackup, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(
`BACKUP TABLE bank TO '%s' INCREMENTAL FROM '%s'`,
Expand All @@ -477,7 +474,7 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
}

sqlDB.Exec(t, `RESTORE TABLE bank FROM $1, $2 WITH OPTIONS ('into_db'='restoredb')`, fullDir, incDir)
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+2, jobspb.TypeRestore, jobs.StatusSucceeded, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, 0, jobspb.TypeRestore, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(
`RESTORE TABLE bank FROM '%s', '%s' WITH into_db = 'restoredb'`,
Expand Down
23 changes: 12 additions & 11 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,13 +867,18 @@ func TestImportCSVStmt(t *testing.T) {
if testing.Short() {
t.Skip("short")
}
t.Skip(`#34568`)

const (
nodes = 3
numFiles = nodes + 2
rowsPerFile = 1000
)
const nodes = 3

numFiles := nodes + 2
rowsPerFile := 1000
if util.RaceEnabled {
// This test takes a while with the race detector, so reduce the number of
// files and rows per file in an attempt to speed it up.
numFiles = nodes
rowsPerFile = 16
}

ctx := context.Background()
dir, cleanup := testutils.TempDir(t)
defer cleanup()
Expand All @@ -897,9 +902,6 @@ func TestImportCSVStmt(t *testing.T) {
t.Fatal(err)
}

// Get the number of existing jobs.
baseNumJobs := jobutils.GetSystemJobsCount(t, sqlDB)

if err := ioutil.WriteFile(filepath.Join(dir, "empty.csv"), nil, 0666); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1174,7 +1176,7 @@ func TestImportCSVStmt(t *testing.T) {
}
jobPrefix += `t (a INT8 PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s)`

if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(jobPrefix+tc.jobOpts, strings.Join(tc.files, ", ")),
}); err != nil {
Expand All @@ -1188,7 +1190,6 @@ func TestImportCSVStmt(t *testing.T) {
t, "does not exist",
`SELECT count(*) FROM t`,
)
testNum++
sqlDB.QueryRow(
t, `RESTORE csv.* FROM $1 WITH into_db = $2`, backupPath, intodb,
).Scan(
Expand Down
28 changes: 9 additions & 19 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,37 +118,30 @@ func BulkOpResponseFilter(allowProgressIota *chan struct{}) storagebase.ReplicaR
}
}

// GetSystemJobsCount queries the number of entries in the jobs table.
func GetSystemJobsCount(t testing.TB, db *sqlutils.SQLRunner) int {
var jobCount int
db.QueryRow(t, `SELECT count(*) FROM crdb_internal.jobs`).Scan(&jobCount)
return jobCount
}

func verifySystemJob(
t testing.TB,
db *sqlutils.SQLRunner,
offset int,
expectedType jobspb.Type,
filterType jobspb.Type,
expectedStatus string,
expectedRunningStatus string,
expected jobs.Record,
) error {
var actual jobs.Record
var rawDescriptorIDs pq.Int64Array
var actualType string
var statusString string
var runningStatus gosql.NullString
var runningStatusString string
// We have to query for the nth job created rather than filtering by ID,
// because job-generating SQL queries (e.g. BACKUP) do not currently return
// the job ID.
db.QueryRow(t, `
SELECT job_type, description, user_name, descriptor_ids, status, running_status
FROM crdb_internal.jobs ORDER BY created LIMIT 1 OFFSET $1`,
SELECT description, user_name, descriptor_ids, status, running_status
FROM crdb_internal.jobs WHERE job_type = $1 ORDER BY created LIMIT 1 OFFSET $2`,
filterType.String(),
offset,
).Scan(
&actualType, &actual.Description, &actual.Username, &rawDescriptorIDs,
&actual.Description, &actual.Username, &rawDescriptorIDs,
&statusString, &runningStatus,
)
if runningStatus.Valid {
Expand All @@ -173,9 +166,6 @@ func verifySystemJob(
return errors.Errorf("job %d: expected running status %v, got %v",
offset, expectedRunningStatus, runningStatusString)
}
if e, a := expectedType.String(), actualType; e != a {
return errors.Errorf("job %d: expected type %v, got type %v", offset, e, a)
}

return nil
}
Expand All @@ -186,23 +176,23 @@ func VerifyRunningSystemJob(
t testing.TB,
db *sqlutils.SQLRunner,
offset int,
expectedType jobspb.Type,
filterType jobspb.Type,
expectedRunningStatus jobs.RunningStatus,
expected jobs.Record,
) error {
return verifySystemJob(t, db, offset, expectedType, "running", string(expectedRunningStatus), expected)
return verifySystemJob(t, db, offset, filterType, "running", string(expectedRunningStatus), expected)
}

// VerifySystemJob checks that job records are created as expected.
func VerifySystemJob(
t testing.TB,
db *sqlutils.SQLRunner,
offset int,
expectedType jobspb.Type,
filterType jobspb.Type,
expectedStatus jobs.Status,
expected jobs.Record,
) error {
return verifySystemJob(t, db, offset, expectedType, string(expectedStatus), "", expected)
return verifySystemJob(t, db, offset, filterType, string(expectedStatus), "", expected)
}

// GetJobID gets a particular job's ID.
Expand Down