Skip to content

Commit

Permalink
Merge #123298
Browse files Browse the repository at this point in the history
123298: cli/doctor, doctor: use right jobs table, skip dropped descs r=fqazi a=annrpom

### cli/doctor: doctor should read from the right jobs table
In #97762, we started writing a job's payload (and progress)
information to the `system.jobs_info` table. As a result, we
had to change the parts of our code that relied on the `system.jobs`
table to use `crdb_internal.system_jobs` instead (since that table
would inaccurately report that some `payload`s were `NULL`).
This change did not occur for the in-memory representation of the jobs
table created by debug doctor -- which can result in missing job
false-positives. This patch updates debug doctor's representation of
the jobs table by referring to `crdb_internal.system_jobs` instead.

Epic: none
Fixes: #122675

Release note: None


---

### doctor: skip validation for dropped descriptors
In some cases, dropped descriptors appear in our `system.descriptors`
table with dangling job mutations without an associated job.
This patch teaches debug doctor examine to skip validation
on such dropped descriptors.

Epic: none

Fixes: #123477
Fixes: #122956

Release note: none

Co-authored-by: Annie Pompa <[email protected]>
  • Loading branch information
craig[bot] and annrpom committed May 16, 2024
2 parents 6fa15a6 + 52a697b commit 6a45828
Show file tree
Hide file tree
Showing 20 changed files with 664 additions and 296 deletions.
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlstats",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
Expand Down
36 changes: 18 additions & 18 deletions pkg/cli/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ func fromZipDir(
return errors.Wrapf(err, "failed to parse descriptor id %s", fields[0])
}

descBytes, err := hx.DecodeString(fields[last])
if err != nil {
return errors.Wrapf(err, "failed to decode hex descriptor %d", i)
descBytes, ok := interpretString(fields[last])
if !ok {
return errors.Newf("failed to decode hex descriptor %d", i)
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
descTable = append(descTable, doctor.DescriptorTableRow{ID: int64(i), DescBytes: descBytes, ModTime: ts})
Expand Down Expand Up @@ -507,8 +507,8 @@ func fromZipDir(
}

jobsTable = make(doctor.JobsTable, 0)
if checkIfFileExists(zipDirPath, "system.jobs.txt") {
if err := slurp(zipDirPath, "system.jobs.txt", func(row string) error {
if checkIfFileExists(zipDirPath, "crdb_internal.system_jobs.txt") {
if err := slurp(zipDirPath, "crdb_internal.system_jobs.txt", func(row string) error {
fields := strings.Fields(row)
md := jobs.JobMetadata{}
md.Status = jobs.Status(fields[1])
Expand All @@ -519,23 +519,23 @@ func fromZipDir(
}
md.ID = jobspb.JobID(id)

last := len(fields) - 1
payloadBytes, err := hx.DecodeString(fields[last-1])
if err != nil {
// TODO(postamar): remove this check once payload redaction is improved
if fields[last-1] == "NULL" {
return nil
}
return errors.Wrapf(err, "job %d: failed to decode hex payload", id)
// N.B. The "created" column takes 2 positions in our fields array.
payloadBytes, ok := interpretString(fields[4])
if !ok {
return errors.Newf("job %d: failed to decode hex payload", id)
}
md.Payload = &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, md.Payload); err != nil {
return errors.Wrap(err, "failed unmarshalling job payload")
}

progressBytes, err := hx.DecodeString(fields[last])
if err != nil {
return errors.Wrapf(err, "job %d: failed to decode hex progress", id)
// Skip NULL job payloads.
if fields[5] == "NULL" {
return nil
}
progressBytes, ok := interpretString(fields[5])
if !ok {
return errors.Newf("job %d: failed to decode hex progress", id)
}
md.Progress = &jobspb.Progress{}
if err := protoutil.Unmarshal(progressBytes, md.Progress); err != nil {
Expand All @@ -554,8 +554,8 @@ func fromZipDir(
Payload string `json:"hex_payload"`
Progress string `json:"hex_progress"`
}, 0)
if err := parseJSONFile(zipDirPath, "system.jobs.json", &jobsTableJSON); err != nil {
return nil, nil, nil, errors.Wrapf(err, "failed to parse system.descriptor.json")
if err := parseJSONFile(zipDirPath, "crdb_internal.system_jobs.json", &jobsTableJSON); err != nil {
return nil, nil, nil, errors.Wrapf(err, "failed to parse crdb_internal.system_jobs.json")
}
for _, job := range jobsTableJSON {
row := jobs.JobMetadata{
Expand Down
199 changes: 198 additions & 1 deletion pkg/cli/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package cli

import (
"fmt"
"strings"
"testing"

Expand All @@ -19,9 +20,139 @@ import (
"github.com/cockroachdb/datadriven"
)

// descriptorWithFKMutation returns a SQL-quoted JSON encoding of a table
// descriptor named "foo" (id 104) that carries an in-progress FOREIGN KEY
// constraint mutation together with a mutationJobs entry referencing job
// 962952277419655169 — a job that is never created, i.e. a dangling job
// mutation. When isDropped is true, the descriptor is additionally marked
// as dropped ("state": "DROP" plus a "dropTime"), letting tests exercise
// doctor's handling of dropped descriptors with orphaned mutation jobs.
// The result is meant to be spliced into crdb_internal.json_to_pb(...) via
// fmt.Sprintf, hence the surrounding single quotes.
func descriptorWithFKMutation(isDropped bool) string {
// These fragments are concatenated into the JSON below only for the
// dropped variant; otherwise they contribute nothing.
dropState := ``
dropTime := ``
if isDropped {
dropState = `"state": "DROP",`
dropTime = `"dropTime": "1713940113794672911",`
}
return `'{
"table": {
"columns": [
{
"id": 1,
"name": "i",
"type": {
"family": "IntFamily",
"oid": 20,
"width": 64
}
}
],` +
dropTime +
`"createAsOfTime": {
"logical": 1,
"wallTime": "1713940112376217646"
},
"families": [
{
"columnIds": [
1
],
"columnNames": [
"i"
],
"name": "primary"
}
],
"formatVersion": 3,
"id": 104,
"modificationTime": {},
"mutationJobs": [
{
"jobId": "962952277419655169",
"mutationId": 1
}
],
"mutations": [
{
"constraint": {
"check": {},
"constraintType": "FOREIGN_KEY",
"foreignKey": {
"constraintId": 2,
"name": "foo_foo_fk",
"onDelete": "CASCADE",
"onUpdate": "CASCADE",
"originColumnIds": [
1
],
"originTableId": 104,
"referencedColumnIds": [
1
],
"referencedTableId": 104,
"validity": "Validating"
},
"name": "foo_foo_fk",
"uniqueWithoutIndexConstraint": {}
},
"direction": "ADD",
"mutationId": 1,
"state": "DELETE_ONLY"
}
],
"name": "foo",
"nextColumnId": 2,
"nextConstraintId": 3,
"nextFamilyId": 1,
"nextIndexId": 2,
"nextMutationId": 2,
"parentId": 183,
"primaryIndex": {
"constraintId": 1,
"createdAtNanos": "1713940112106985000",
"encodingType": 1,
"foreignKey": {},
"geoConfig": {},
"id": 1,
"interleave": {},
"keyColumnDirections": [
"DESC"
],
"keyColumnIds": [
1
],
"keyColumnNames": [
"i"
],
"name": "table_w3_143_pkey",
"partitioning": {},
"sharded": {},
"unique": true,
"version": 4
},
"privileges": {
"ownerProto": "roachprod",
"users": [
{
"privileges": "2",
"userProto": "admin",
"withGrantOption": "2"
},
{
"privileges": "2",
"userProto": "root",
"withGrantOption": "2"
}
],
"version": 3
},
"replacementOf": {
"time": {}
},` +
dropState +
`"unexposedParentSchemaId": 381,
"version": "2"
}
}'`
}

// This tests doctoring a secure cluster.
func TestDoctorCluster(t *testing.T) {
defer leaktest.AfterTest(t)()
//
c := NewCLITest(TestCLIParams{T: t})
defer c.Cleanup()

Expand Down Expand Up @@ -51,7 +182,72 @@ func TestDoctorCluster(t *testing.T) {
})
}

// This tests the operation of zip over secure clusters.
// TestDoctorClusterBroken verifies that `debug doctor examine cluster`
// reports the multitude of issues carried by a deliberately corrupted
// descriptor (dangling mutation job, FK mutation, etc.).
func TestDoctorClusterBroken(t *testing.T) {
	defer leaktest.AfterTest(t)()
	c := NewCLITest(TestCLIParams{T: t, DisableAutoStats: true})
	defer c.Cleanup()

	corruptDesc := fmt.Sprintf("SELECT crdb_internal.unsafe_upsert_descriptor('foo'::regclass::oid::int,"+
		"crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', %s::jsonb), true)", descriptorWithFKMutation(false /* isDropped */))

	// Introduce a descriptor with an attached job mutation (along with other issues). We want to ensure that the number of
	// jobs created is deterministic (auto table stats will be collected due to the "schema change" on foo); therefore,
	// we should disable automatic stats collection and instead create our own.
	stmts := []string{
		"CREATE TABLE foo (i INT)",
		corruptDesc,
		"CREATE STATISTICS foo_stats FROM foo",
		"SELECT pg_catalog.pg_sleep(1)",
	}
	c.RunWithArgs([]string{"sql", "-e", strings.Join(stmts, ";\n")})

	t.Run("examine", func(t *testing.T) {
		output, err := c.RunWithCapture("debug doctor examine cluster")
		if err != nil {
			t.Fatal(err)
		}

		// Using datadriven allows TESTFLAGS=-rewrite.
		datadriven.RunTest(t, datapathutils.TestDataPath(t, "doctor", "test_examine_cluster_jobs"),
			func(t *testing.T, td *datadriven.TestData) string {
				return output
			})
	})
}

// TestDoctorClusterDropped verifies that `debug doctor examine cluster`
// skips validation of dropped descriptors rather than reporting their
// dangling job mutations as corruption.
func TestDoctorClusterDropped(t *testing.T) {
	defer leaktest.AfterTest(t)()
	c := NewCLITest(TestCLIParams{T: t})
	defer c.Cleanup()

	droppedDesc := fmt.Sprintf("SELECT crdb_internal.unsafe_upsert_descriptor('foo'::regclass::oid::int,"+
		"crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', %s::jsonb), true)", descriptorWithFKMutation(true /* isDropped */))

	// Introduce a dropped descriptor with an attached job mutation (along with other issues).
	stmts := []string{
		"CREATE TABLE foo (i INT)",
		droppedDesc,
		"INSERT INTO system.users VALUES ('node', NULL, true, 3)",
		"GRANT node TO root",
		"DELETE FROM system.namespace WHERE name = 'foo'",
		"SELECT pg_catalog.pg_sleep(1)",
	}
	c.RunWithArgs([]string{"sql", "-e", strings.Join(stmts, ";\n")})

	t.Run("examine", func(t *testing.T) {
		output, err := c.RunWithCapture("debug doctor examine cluster")
		if err != nil {
			t.Fatal(err)
		}

		// Using datadriven allows TESTFLAGS=-rewrite.
		datadriven.RunTest(t, datapathutils.TestDataPath(t, "doctor", "test_examine_cluster_dropped"),
			func(t *testing.T, td *datadriven.TestData) string {
				return output
			})
	})
}

// TestDoctorZipDir tests the operation of zip over secure clusters.
func TestDoctorZipDir(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCLITest(TestCLIParams{T: t, NoServer: true})
Expand All @@ -69,6 +265,7 @@ func TestDoctorZipDir(t *testing.T) {
})
})

// Regression test (for #104347) to ensure that quoted table names get properly parsed in system.namespace.
t.Run("examine", func(t *testing.T) {
out, err := c.RunWithCapture("debug doctor examine zipdir testdata/doctor/debugzip-with-quotes")
if err != nil {
Expand Down
Loading

0 comments on commit 6a45828

Please sign in to comment.