Skip to content

Commit

Permalink
cli/doctor: doctor should read from the right jobs table
Browse files Browse the repository at this point in the history
In #97762, we started writing a job's payload (and progress)
information to the `system.jobs_info` table. As a result, we
had to change the parts of our code that relied on the `system.jobs`
table to use `crdb_internal.system_jobs` instead (since that table
would inaccurately report that some `payload`s were `NULL`).
That change was not applied to the in-memory representation of the jobs
table built by debug doctor -- which can result in false-positive
"missing job" reports. This patch updates debug doctor's representation of
the jobs table to read from `crdb_internal.system_jobs` instead.

Epic: none
Fixes: #122675

Release note: None
  • Loading branch information
annrpom committed May 6, 2024
1 parent 37e0fc5 commit 4416ff0
Show file tree
Hide file tree
Showing 14 changed files with 589 additions and 287 deletions.
35 changes: 18 additions & 17 deletions pkg/cli/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ func fromZipDir(
return errors.Wrapf(err, "failed to parse descriptor id %s", fields[0])
}

descBytes, err := hx.DecodeString(fields[last])
if err != nil {
return errors.Wrapf(err, "failed to decode hex descriptor %d", i)
descBytes, ok := interpretString(fields[last])
if !ok {
return errors.Newf("failed to decode hex descriptor %d", i)
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
descTable = append(descTable, doctor.DescriptorTableRow{ID: int64(i), DescBytes: descBytes, ModTime: ts})
Expand Down Expand Up @@ -507,8 +507,8 @@ func fromZipDir(
}

jobsTable = make(doctor.JobsTable, 0)
if checkIfFileExists(zipDirPath, "system.jobs.txt") {
if err := slurp(zipDirPath, "system.jobs.txt", func(row string) error {
if checkIfFileExists(zipDirPath, "crdb_internal.system_jobs.txt") {
if err := slurp(zipDirPath, "crdb_internal.system_jobs.txt", func(row string) error {
fields := strings.Fields(row)
md := jobs.JobMetadata{}
md.Status = jobs.Status(fields[1])
Expand All @@ -519,23 +519,23 @@ func fromZipDir(
}
md.ID = jobspb.JobID(id)

last := len(fields) - 1
payloadBytes, err := hx.DecodeString(fields[last-1])
if err != nil {
// TODO(postamar): remove this check once payload redaction is improved
if fields[last-1] == "NULL" {
return nil
}
return errors.Wrapf(err, "job %d: failed to decode hex payload", id)
// N.B. The "created" column takes 2 positions in our fields array.
payloadBytes, ok := interpretString(fields[4])
if !ok {
return errors.Newf("job %d: failed to decode hex payload", id)
}
md.Payload = &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, md.Payload); err != nil {
return errors.Wrap(err, "failed unmarshalling job payload")
}

progressBytes, err := hx.DecodeString(fields[last])
if err != nil {
return errors.Wrapf(err, "job %d: failed to decode hex progress", id)
// when can progress be null? if it exists but we have never started it?
if fields[5] == "NULL" {
return nil
}
progressBytes, ok := interpretString(fields[5])
if !ok {
return errors.Newf("job %d: failed to decode hex progress", id)
}
md.Progress = &jobspb.Progress{}
if err := protoutil.Unmarshal(progressBytes, md.Progress); err != nil {
Expand All @@ -554,8 +554,9 @@ func fromZipDir(
Payload string `json:"hex_payload"`
Progress string `json:"hex_progress"`
}, 0)
// TODO(before merge): when does this happen?
if err := parseJSONFile(zipDirPath, "system.jobs.json", &jobsTableJSON); err != nil {
return nil, nil, nil, errors.Wrapf(err, "failed to parse system.descriptor.json")
return nil, nil, nil, errors.Wrapf(err, "failed to parse system.jobs.json")
}
for _, job := range jobsTableJSON {
row := jobs.JobMetadata{
Expand Down
152 changes: 151 additions & 1 deletion pkg/cli/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package cli

import (
"fmt"
"strings"
"testing"

Expand All @@ -19,6 +20,125 @@ import (
"github.com/cockroachdb/datadriven"
)

// descriptorWithFKMutation is a JSON-encoded descriptor for a table named
// "bar" (id 104) that carries a pending FOREIGN_KEY constraint mutation
// ("foo_foo_fk", validity "Validating") and a mutationJobs entry pointing at
// job 962952277419655169.
//
// The JSON is wrapped in single quotes so that it can be spliced verbatim
// into a SQL statement as a string literal and cast with ::jsonb, which is
// how TestDoctorClusterBroken feeds it to
// crdb_internal.unsafe_upsert_descriptor.
//
// NOTE(review): presumably the referenced job does not exist in the test
// cluster, so doctor is expected to flag it -- confirm against the
// test_examine_cluster_jobs testdata file.
var descriptorWithFKMutation = `'{
"table": {
"columns": [
{
"id": 1,
"name": "i",
"type": {
"family": "IntFamily",
"oid": 20,
"width": 64
}
}
],
"createAsOfTime": {
"logical": 1,
"wallTime": "1713940112376217646"
},
"families": [
{
"columnIds": [
1
],
"columnNames": [
"i"
],
"name": "primary"
}
],
"formatVersion": 3,
"id": 104,
"modificationTime": {},
"mutationJobs": [
{
"jobId": "962952277419655169",
"mutationId": 1
}
],
"mutations": [
{
"constraint": {
"check": {},
"constraintType": "FOREIGN_KEY",
"foreignKey": {
"constraintId": 2,
"name": "foo_foo_fk",
"onDelete": "CASCADE",
"onUpdate": "CASCADE",
"originColumnIds": [
1
],
"originTableId": 104,
"referencedColumnIds": [
1
],
"referencedTableId": 104,
"validity": "Validating"
},
"name": "foo_foo_fk",
"uniqueWithoutIndexConstraint": {}
},
"direction": "ADD",
"mutationId": 1,
"state": "DELETE_ONLY"
}
],
"name": "bar",
"nextColumnId": 2,
"nextConstraintId": 3,
"nextFamilyId": 1,
"nextIndexId": 2,
"nextMutationId": 2,
"parentId": 183,
"primaryIndex": {
"constraintId": 1,
"createdAtNanos": "1713940112106985000",
"encodingType": 1,
"foreignKey": {},
"geoConfig": {},
"id": 1,
"interleave": {},
"keyColumnDirections": [
"DESC"
],
"keyColumnIds": [
1
],
"keyColumnNames": [
"col143_w3_144"
],
"name": "table_w3_143_pkey",
"partitioning": {},
"sharded": {},
"unique": true,
"version": 4
},
"privileges": {
"ownerProto": "roachprod",
"users": [
{
"privileges": "2",
"userProto": "admin",
"withGrantOption": "2"
},
{
"privileges": "2",
"userProto": "root",
"withGrantOption": "2"
}
],
"version": 3
},
"replacementOf": {
"time": {}
},
"unexposedParentSchemaId": 381,
"version": "2"
}
}'`

// TestDoctorCluster tests doctoring a secure cluster.
func TestDoctorCluster(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down Expand Up @@ -51,7 +171,36 @@ func TestDoctorCluster(t *testing.T) {
})
}

// This test the operation of zip over secure clusters.
// TestDoctorClusterBroken checks the output of `debug doctor examine cluster`
// after a table's descriptor has been overwritten with a corrupt one that
// references a mutation job. The captured output is compared against the
// test_examine_cluster_jobs datadriven file.
func TestDoctorClusterBroken(t *testing.T) {
	defer leaktest.AfterTest(t)()
	c := NewCLITest(TestCLIParams{T: t})
	defer c.Cleanup()

	// Statement that replaces foo's descriptor with the fixture, which has a
	// mutation job attached (along with other issues).
	upsertStmt := fmt.Sprintf("SELECT crdb_internal.unsafe_upsert_descriptor('foo'::regclass::oid::int,"+
		"crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', %s::jsonb), true)", descriptorWithFKMutation)

	// Create the table first, then corrupt it, in a single sql invocation.
	stmts := []string{"CREATE TABLE foo (i INT)", upsertStmt}
	sqlArgs := []string{"sql", "-e", strings.Join(stmts, ";\n")}
	c.RunWithArgs(sqlArgs)

	t.Run("examine", func(t *testing.T) {
		captured, runErr := c.RunWithCapture("debug doctor examine cluster")
		if runErr != nil {
			t.Fatal(runErr)
		}

		// Going through datadriven lets the expected output be regenerated
		// with TESTFLAGS=-rewrite.
		expectedPath := datapathutils.TestDataPath(t, "doctor", "test_examine_cluster_jobs")
		datadriven.RunTest(t, expectedPath, func(*testing.T, *datadriven.TestData) string {
			return captured
		})
	})
}

// TestDoctorZipDir tests the operation of zip over secure clusters.
func TestDoctorZipDir(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCLITest(TestCLIParams{T: t, NoServer: true})
Expand All @@ -69,6 +218,7 @@ func TestDoctorZipDir(t *testing.T) {
})
})

// Regression test (for #104347) to ensure that quoted table names get properly parsed in system.namespace.
t.Run("examine", func(t *testing.T) {
out, err := c.RunWithCapture("debug doctor examine zipdir testdata/doctor/debugzip-with-quotes")
if err != nil {
Expand Down
Loading

0 comments on commit 4416ff0

Please sign in to comment.