Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: introduce jobspb.JobID #61128

Merged
merged 1 commit into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ func backupPlanHook(
// 2. Verifies we can write to destination location.
// This temporary checkpoint file gets renamed to real checkpoint
// file when the backup jobs starts execution.
doWriteBackupManifestCheckpoint := func(ctx context.Context, jobID int64) error {
doWriteBackupManifestCheckpoint := func(ctx context.Context, jobID jobspb.JobID) error {
if err := writeBackupManifest(
ctx, p.ExecCfg().Settings, defaultStore, tempCheckpointFileNameForJob(jobID),
encryptionOptions, &backupManifest,
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func protectTimestampForBackup(
ctx context.Context,
p sql.PlanHookState,
txn *kv.Txn,
jobID int64,
jobID jobspb.JobID,
spans []roachpb.Span,
startTime, endTime hlc.Timestamp,
backupDetails jobspb.BackupDetails,
Expand Down
18 changes: 9 additions & 9 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,8 +1278,8 @@ type inProgressState struct {
dir, name string
}

func (ip inProgressState) latestJobID() (int64, error) {
var id int64
func (ip inProgressState) latestJobID() (jobspb.JobID, error) {
var id jobspb.JobID
if err := ip.QueryRow(
`SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`,
).Scan(&id); err != nil {
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func createAndWaitForJob(
t.Fatal(err)
}

var jobID int64
var jobID jobspb.JobID
db.QueryRow(
t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes,
Expand Down Expand Up @@ -1642,7 +1642,7 @@ func TestBackupRestoreResume(t *testing.T) {
})
}

func getHighWaterMark(jobID int64, sqlDB *gosql.DB) (roachpb.Key, error) {
func getHighWaterMark(jobID jobspb.JobID, sqlDB *gosql.DB) (roachpb.Key, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
Expand Down Expand Up @@ -1897,7 +1897,7 @@ func TestRestoreFailCleanup(t *testing.T) {
// stress test, it is here for safety.
blockGC := make(chan struct{})
params.Knobs.GCJob = &sql.GCJobTestingKnobs{
RunBeforeResume: func(_ int64) error {
RunBeforeResume: func(_ jobspb.JobID) error {
<-blockGC
return nil
},
Expand Down Expand Up @@ -5043,7 +5043,7 @@ func TestFileIOLimits(t *testing.T) {
)
}

func waitForSuccessfulJob(t *testing.T, tc *testcluster.TestCluster, id int64) {
func waitForSuccessfulJob(t *testing.T, tc *testcluster.TestCluster, id jobspb.JobID) {
// Force newly created job to be adopted and verify it succeeds.
tc.Server(0).JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
testutils.SucceedsSoon(t, func() error {
Expand All @@ -5065,7 +5065,7 @@ func TestDetachedBackup(t *testing.T) {
db := sqlDB.DB.(*gosql.DB)

// running backup under transaction requires DETACHED.
var jobID int64
var jobID jobspb.JobID
err := crdb.ExecuteTx(ctx, db, nil /* txopts */, func(tx *gosql.Tx) error {
return tx.QueryRow(`BACKUP DATABASE data TO $1`, LocalFoo).Scan(&jobID)
})
Expand Down Expand Up @@ -5118,7 +5118,7 @@ func TestDetachedRestore(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE test`)

// Running RESTORE under transaction requires DETACHED.
var jobID int64
var jobID jobspb.JobID
err := crdb.ExecuteTx(ctx, db, nil /* txopts */, func(tx *gosql.Tx) error {
return tx.QueryRow(`RESTORE TABLE t FROM $1 WITH INTO_DB=test`, LocalFoo).Scan(&jobID)
})
Expand Down Expand Up @@ -5703,7 +5703,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
func(t *testing.T, query string, sqlDB *sqlutils.SQLRunner) {
backupWithDetachedOption := query + ` WITH DETACHED`
db := sqlDB.DB.(*gosql.DB)
var jobID int64
var jobID jobspb.JobID
err := crdb.ExecuteTx(ctx, db, nil /* txopts */, func(tx *gosql.Tx) error {
return tx.QueryRow(backupWithDetachedOption).Scan(&jobID)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
blockCh := make(chan struct{})
defer close(blockCh)
params.Knobs.GCJob = &sql.GCJobTestingKnobs{
RunBeforeResume: func(_ int64) error { <-blockCh; return nil },
RunBeforeResume: func(_ jobspb.JobID) error { <-blockCh; return nil },
}

const numAccounts = 1000
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,6 @@ func checkForPreviousBackup(
}

// tempCheckpointFileNameForJob returns temporary filename for backup manifest checkpoint.
func tempCheckpointFileNameForJob(jobID int64) string {
func tempCheckpointFileNameForJob(jobID jobspb.JobID) string {
return fmt.Sprintf("%s-%d", backupManifestCheckpointName, jobID)
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func createSchemaChangeJobsFromMutations(
}
newMutationJob := descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: jobID,
JobID: int64(jobID),
}
mutationJobs = append(mutationJobs, newMutationJob)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/schedule_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func planBackup(
// NotifyJobTermination implements jobs.ScheduledJobExecutor interface.
func (e *scheduledBackupExecutor) NotifyJobTermination(
ctx context.Context,
jobID int64,
jobID jobspb.JobID,
jobStatus jobs.Status,
details jobspb.Details,
env scheduledjobs.JobSchedulerEnv,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ type jobFeed struct {
db *gosql.DB
flushCh chan struct{}

JobID int64
JobID jobspb.JobID
jobErr error
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createProtectedTimestampRecord(
codec keys.SQLCodec,
pts protectedts.Storage,
txn *kv.Txn,
jobID int64,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
func distChangefeedFlow(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
resultsCh chan<- tree.Datums,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2158,7 +2158,7 @@ func TestChangefeedDescription(t *testing.T) {
sink.Scheme = changefeedbase.SinkSchemeExperimentalSQL
sink.Path = `d`

var jobID int64
var jobID jobspb.JobID
sqlDB.QueryRow(t,
`CREATE CHANGEFEED FOR foo INTO $1 WITH updated, envelope = $2`, sink.String(), `wrapped`,
).Scan(&jobID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeeddist/distflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var ChangefeedResultTypes = []*types.T{
func StartDistChangefeed(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
trackedSpans []roachpb.Span,
initialHighWater hlc.Timestamp,
Expand Down
16 changes: 8 additions & 8 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ var _ jobs.Resumer = &cancellableImportResumer{}

type cancellableImportResumer struct {
ctx context.Context
jobIDCh chan int64
jobID int64
jobIDCh chan jobspb.JobID
jobID jobspb.JobID
onSuccessBarrier syncBarrier
wrapped *importResumer
}
Expand Down Expand Up @@ -590,7 +590,7 @@ type jobState struct {
prog jobspb.ImportProgress
}

func queryJob(db sqlutils.DBHandle, jobID int64) (js jobState) {
func queryJob(db sqlutils.DBHandle, jobID jobspb.JobID) (js jobState) {
js = jobState{
err: nil,
status: "",
Expand Down Expand Up @@ -623,7 +623,7 @@ func queryJob(db sqlutils.DBHandle, jobID int64) (js jobState) {

// Repeatedly queries job status/progress until specified function returns true.
func queryJobUntil(
t *testing.T, db sqlutils.DBHandle, jobID int64, isDone func(js jobState) bool,
t *testing.T, db sqlutils.DBHandle, jobID jobspb.JobID, isDone func(js jobState) bool,
) (js jobState) {
t.Helper()
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
Expand Down Expand Up @@ -666,8 +666,8 @@ func TestCSVImportCanBeResumed(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)

jobCtx, cancelImport := context.WithCancel(ctx)
jobIDCh := make(chan int64)
var jobID int64 = -1
jobIDCh := make(chan jobspb.JobID)
var jobID jobspb.JobID = -1
var importSummary backupccl.RowCount

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
Expand Down Expand Up @@ -772,10 +772,10 @@ func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
sqlDB.Exec(t, "CREATE TABLE t (id INT, data STRING)")
defer sqlDB.Exec(t, `DROP TABLE t`)

jobIDCh := make(chan int64)
jobIDCh := make(chan jobspb.JobID)
controllerBarrier, importBarrier := newSyncBarrier()

var jobID int64 = -1
var jobID jobspb.JobID = -1
var importSummary backupccl.RowCount

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ func protectTimestampForImport(
ctx context.Context,
p sql.PlanHookState,
txn *kv.Txn,
jobID int64,
jobID jobspb.JobID,
spansToProtect []roachpb.Span,
walltime int64,
importDetails jobspb.ImportDetails,
Expand Down Expand Up @@ -1270,14 +1270,14 @@ func writeNonDropDatabaseChange(
descsCol *descs.Collection,
p sql.JobExecContext,
jobDesc string,
) ([]int64, error) {
) ([]jobspb.JobID, error) {
var job *jobs.Job
var err error
if job, err = createNonDropDatabaseChangeJob(p.User(), desc.ID, jobDesc, p, txn); err != nil {
return nil, err
}

queuedJob := []int64{job.ID()}
queuedJob := []jobspb.JobID{job.ID()}
b := txn.NewBatch()
dg := catalogkv.NewOneLevelUncachedDescGetter(txn, p.ExecCfg().Codec)
if err := desc.Validate(ctx, dg); err != nil {
Expand Down Expand Up @@ -1801,7 +1801,7 @@ type preparedSchemaMetadata struct {
schemaRewrites backupccl.DescRewriteMap
newSchemaIDToName map[descpb.ID]string
oldSchemaIDToName map[descpb.ID]string
queuedSchemaJobs []int64
queuedSchemaJobs []jobspb.JobID
}

// Resume is part of the jobs.Resumer interface.
Expand Down Expand Up @@ -2143,7 +2143,7 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{})
addToFileFormatTelemetry(details.Format.Format.String(), "failed")
cfg := execCtx.(sql.JobExecContext).ExecCfg()
lm, ie, db := cfg.LeaseManager, cfg.InternalExecutor, cfg.DB
var jobsToRunAfterTxnCommit []int64
var jobsToRunAfterTxnCommit []jobspb.JobID
if err := descs.Txn(ctx, cfg.Settings, lm, ie, db, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
Expand Down Expand Up @@ -2205,7 +2205,7 @@ func (r *importResumer) dropSchemas(
descsCol *descs.Collection,
execCfg *sql.ExecutorConfig,
p sql.JobExecContext,
) ([]int64, error) {
) ([]jobspb.JobID, error) {
details := r.job.Details().(jobspb.ImportDetails)

// If the prepare step of the import job was not completed then the
Expand Down
18 changes: 10 additions & 8 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1698,7 +1698,7 @@ func TestImportCSVStmt(t *testing.T) {
SQLMemoryPoolSize: 256 << 20,
ExternalIODir: baseDir,
Knobs: base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ int64) error { <-blockGC; return nil }},
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error { <-blockGC; return nil }},
},
}})
defer tc.Stopper().Stop(ctx)
Expand Down Expand Up @@ -4200,8 +4200,8 @@ func TestImportDefaultWithResume(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))

jobCtx, cancelImport := context.WithCancel(ctx)
jobIDCh := make(chan int64)
var jobID int64 = -1
jobIDCh := make(chan jobspb.JobID)
var jobID jobspb.JobID = -1

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
// Arrange for our special job resumer to be
Expand Down Expand Up @@ -4835,7 +4835,7 @@ func TestImportWorkerFailure(t *testing.T) {
case err := <-errCh:
t.Fatalf("%s: query returned before expected: %s", err, query)
}
var jobID int64
var jobID jobspb.JobID
sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)

// Shut down a node. This should force LoadCSV to fail in its current
Expand Down Expand Up @@ -4934,7 +4934,7 @@ func TestImportLivenessWithRestart(t *testing.T) {
}
}
// Fetch the new job ID and lease since we know it's running now.
var jobID int64
var jobID jobspb.JobID
originalLease := &jobspb.Progress{}
{
var expectedLeaseBytes []byte
Expand Down Expand Up @@ -5066,7 +5066,7 @@ func TestImportLivenessWithLeniency(t *testing.T) {
}
}
// Fetch the new job ID and lease since we know it's running now.
var jobID int64
var jobID jobspb.JobID
originalLease := &jobspb.Payload{}
{
var expectedLeaseBytes []byte
Expand Down Expand Up @@ -6659,7 +6659,9 @@ func putUserfile(
return tx.Commit()
}

func waitForJobResult(t *testing.T, tc *testcluster.TestCluster, id int64, expected jobs.Status) {
func waitForJobResult(
t *testing.T, tc *testcluster.TestCluster, id jobspb.JobID, expected jobs.Status,
) {
// Force newly created job to be adopted and verify its result.
tc.Server(0).JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
testutils.SucceedsSoon(t, func() error {
Expand Down Expand Up @@ -6695,7 +6697,7 @@ func TestDetachedImport(t *testing.T) {
importIntoQueryDetached := importIntoQuery + " WITH DETACHED"

// DETACHED import w/out transaction is okay.
var jobID int64
var jobID jobspb.JobID
sqlDB.QueryRow(t, importQueryDetached, simpleOcf).Scan(&jobID)
waitForJobResult(t, tc, jobID, jobs.StatusSucceeded)

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/partitionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_test(
"//pkg/ccl/utilccl",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{
RunBeforeResume: func(_ int64) error {
RunBeforeResume: func(_ jobspb.JobID) error {
<-asyncNotification
return nil
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func ingest(
execCtx sql.JobExecContext,
streamAddress streamingccl.StreamAddress,
progress jobspb.Progress,
jobID int64,
jobID jobspb.JobID,
) error {
// Initialize a stream client and resolve topology.
client, err := streamclient.NewStreamClient(streamAddress)
Expand Down
Loading