From abce491c3793522bbf83ccbf8585bb2eb8aaab77 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 7 Feb 2023 16:21:13 -0500 Subject: [PATCH] jobs: ensure newly created adoptable jobs populate the job_type column The `job_type` column in the `system.jobs` table is not being populated when jobs are created by `CreateAdoptableJobWithTxn`. This change updates that function to populate the column. Release note: none Epic: none --- pkg/jobs/registry.go | 25 +++++------ .../upgrades/alter_jobs_add_job_type_test.go | 41 +++++++++++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index a8f47562e251..382b32d2cbc7 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -626,25 +626,26 @@ func (r *Registry) CreateAdoptableJobWithTxn( createdByType = j.createdBy.Name createdByID = j.createdBy.ID } + typ := j.mu.payload.Type().String() + nCols := 7 + cols := [7]string{"id", "status", "payload", "progress", "created_by_type", "created_by_id", "job_type"} + placeholders := [7]string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"} + values := [7]interface{}{jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID, typ} + if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) { + nCols -= 1 + } // Insert the job row, but do not set a `claim_session_id`. By not // setting the claim, the job can be adopted by any node and will // be adopted by the node which next runs the adoption loop. - const stmt = `INSERT - INTO system.jobs ( - id, - status, - payload, - progress, - created_by_type, - created_by_id - ) -VALUES ($1, $2, $3, $4, $5, $6);` + stmt := fmt.Sprintf( + `INSERT INTO system.jobs (%s) VALUES (%s);`, + strings.Join(cols[:nCols], ","), strings.Join(placeholders[:nCols], ","), + ) _, err = txn.ExecEx(ctx, "job-insert", txn.KV(), sessiondata.InternalExecutorOverride{ User: username.NodeUserName(), Database: catconstants.SystemDatabaseName, - }, stmt, - jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID) + }, stmt, values[:nCols]...) return err } run := r.db.Txn diff --git a/pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go b/pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go index 3738ee88931a..06b8f19ebc6d 100644 --- a/pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go +++ b/pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go @@ -12,6 +12,7 @@ package upgrades_test import ( "context" + "fmt" "strings" "testing" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades" "github.com/cockroachdb/cockroach/pkg/util/intsets" @@ -53,6 +55,45 @@ func (*fakeResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}, job return jobErr } +// TestCreateAdoptableJobPopulatesJobType verifies that the job_type column in system.jobs is populated +// by CreateAdoptableJobWithTxn after upgrading to V23_1AddTypeColumnToJobsTable. +func TestCreateAdoptableJobPopulatesJobType(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.V23_1AddTypeColumnToJobsTable), + }, + }, + }, + } + + var ( + ctx = context.Background() + tc = testcluster.StartTestCluster(t, 1, clusterArgs) + s = tc.Server(0) + sqlDB = tc.ServerConn(0) + ) + defer tc.Stopper().Stop(ctx) + + record := jobs.Record{ + Description: "fake job", + Username: username.TestUserName(), + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + } + + j, err := s.JobRegistry().(*jobs.Registry).CreateAdoptableJobWithTxn(ctx, record, 0, nil) + require.NoError(t, err) + runner := sqlutils.MakeSQLRunner(sqlDB) + runner.CheckQueryResults(t, fmt.Sprintf("SELECT job_type from system.jobs WHERE id = %d", j.ID()), [][]string{{"IMPORT"}}) +} + // TestAlterSystemJobsTableAddJobTypeColumn verifies that the migrations that add & backfill // the type column to system.jobs succeed. func TestAlterSystemJobsTableAddJobTypeColumn(t *testing.T) {