Skip to content

Commit

Permalink
jobs: ensure newly created adoptable jobs populate the job_type column
Browse files Browse the repository at this point in the history
The `job_type` column in the `system.jobs` table is not being populated
when jobs are created by `CreateAdoptableJobWithTxn`. This change updates
that function to populate the column.

Release note: none
Epic: none
  • Loading branch information
jayshrivastava committed Feb 7, 2023
1 parent e75ede3 commit abce491
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
25 changes: 13 additions & 12 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,25 +626,26 @@ func (r *Registry) CreateAdoptableJobWithTxn(
createdByType = j.createdBy.Name
createdByID = j.createdBy.ID
}
typ := j.mu.payload.Type().String()

nCols := 7
cols := [7]string{"id", "status", "payload", "progress", "created_by_type", "created_by_id", "job_type"}
placeholders := [7]string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"}
values := [7]interface{}{jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID, typ}
if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
nCols -= 1
}
// Insert the job row, but do not set a `claim_session_id`. By not
// setting the claim, the job can be adopted by any node and will
// be adopted by the node which next runs the adoption loop.
const stmt = `INSERT
INTO system.jobs (
id,
status,
payload,
progress,
created_by_type,
created_by_id
)
VALUES ($1, $2, $3, $4, $5, $6);`
stmt := fmt.Sprintf(
`INSERT INTO system.jobs (%s) VALUES (%s);`,
strings.Join(cols[:nCols], ","), strings.Join(placeholders[:nCols], ","),
)
_, err = txn.ExecEx(ctx, "job-insert", txn.KV(), sessiondata.InternalExecutorOverride{
User: username.NodeUserName(),
Database: catconstants.SystemDatabaseName,
}, stmt,
jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID)
}, stmt, values[:nCols]...)
return err
}
run := r.db.Txn
Expand Down
41 changes: 41 additions & 0 deletions pkg/upgrade/upgrades/alter_jobs_add_job_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package upgrades_test

import (
"context"
"fmt"
"strings"
"testing"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
Expand All @@ -53,6 +55,45 @@ func (*fakeResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}, job
return jobErr
}

// TestCreateAdoptableJobPopulatesJobType verifies that the job_type column in system.jobs is populated
// by CreateAdoptableJobWithTxn after upgrading to V23_1AddTypeColumnToJobsTable.
func TestCreateAdoptableJobPopulatesJobType(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.V23_1AddTypeColumnToJobsTable),
},
},
},
}

var (
ctx = context.Background()
tc = testcluster.StartTestCluster(t, 1, clusterArgs)
s = tc.Server(0)
sqlDB = tc.ServerConn(0)
)
defer tc.Stopper().Stop(ctx)

record := jobs.Record{
Description: "fake job",
Username: username.TestUserName(),
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
}

j, err := s.JobRegistry().(*jobs.Registry).CreateAdoptableJobWithTxn(ctx, record, 0, nil)
require.NoError(t, err)
runner := sqlutils.MakeSQLRunner(sqlDB)
runner.CheckQueryResults(t, fmt.Sprintf("SELECT job_type from system.jobs WHERE id = %d", j.ID()), [][]string{{"IMPORT"}})
}

// TestAlterSystemJobsTableAddJobTypeColumn verifies that the migrations that add & backfill
// the type column to system.jobs succeed.
func TestAlterSystemJobsTableAddJobTypeColumn(t *testing.T) {
Expand Down

0 comments on commit abce491

Please sign in to comment.