release-24.1: cli/doctor, doctor: use right jobs table, skip dropped descs #124302

Merged: 2 commits, merged on May 22, 2024
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
@@ -175,6 +175,7 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlstats",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
36 changes: 18 additions & 18 deletions pkg/cli/doctor.go
@@ -387,9 +387,9 @@ func fromZipDir(
return errors.Wrapf(err, "failed to parse descriptor id %s", fields[0])
}

- descBytes, err := hx.DecodeString(fields[last])
- if err != nil {
- return errors.Wrapf(err, "failed to decode hex descriptor %d", i)
+ descBytes, ok := interpretString(fields[last])
+ if !ok {
+ return errors.Newf("failed to decode hex descriptor %d", i)
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
descTable = append(descTable, doctor.DescriptorTableRow{ID: int64(i), DescBytes: descBytes, ModTime: ts})
@@ -507,8 +507,8 @@ func fromZipDir(
}

jobsTable = make(doctor.JobsTable, 0)
- if checkIfFileExists(zipDirPath, "system.jobs.txt") {
- if err := slurp(zipDirPath, "system.jobs.txt", func(row string) error {
+ if checkIfFileExists(zipDirPath, "crdb_internal.system_jobs.txt") {
+ if err := slurp(zipDirPath, "crdb_internal.system_jobs.txt", func(row string) error {
fields := strings.Fields(row)
md := jobs.JobMetadata{}
md.Status = jobs.Status(fields[1])
@@ -519,23 +519,23 @@
}
md.ID = jobspb.JobID(id)

- last := len(fields) - 1
- payloadBytes, err := hx.DecodeString(fields[last-1])
- if err != nil {
- // TODO(postamar): remove this check once payload redaction is improved
- if fields[last-1] == "NULL" {
- return nil
- }
- return errors.Wrapf(err, "job %d: failed to decode hex payload", id)
+ // N.B. The "created" column takes 2 positions in our fields array.
+ payloadBytes, ok := interpretString(fields[4])
+ if !ok {
+ return errors.Newf("job %d: failed to decode hex payload", id)
}
md.Payload = &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, md.Payload); err != nil {
return errors.Wrap(err, "failed unmarshalling job payload")
}

- progressBytes, err := hx.DecodeString(fields[last])
- if err != nil {
- return errors.Wrapf(err, "job %d: failed to decode hex progress", id)
+ // Skip NULL job payloads.
+ if fields[5] == "NULL" {
+ return nil
+ }
+ progressBytes, ok := interpretString(fields[5])
+ if !ok {
+ return errors.Newf("job %d: failed to decode hex progress", id)
}
md.Progress = &jobspb.Progress{}
if err := protoutil.Unmarshal(progressBytes, md.Progress); err != nil {
@@ -554,8 +554,8 @@
Payload string `json:"hex_payload"`
Progress string `json:"hex_progress"`
}, 0)
- if err := parseJSONFile(zipDirPath, "system.jobs.json", &jobsTableJSON); err != nil {
- return nil, nil, nil, errors.Wrapf(err, "failed to parse system.descriptor.json")
+ if err := parseJSONFile(zipDirPath, "crdb_internal.system_jobs.json", &jobsTableJSON); err != nil {
+ return nil, nil, nil, errors.Wrapf(err, "failed to parse crdb_internal.system_jobs.json")
}
for _, job := range jobsTableJSON {
row := jobs.JobMetadata{
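For context on the indexes used above: the new parsing reads a whitespace-split row dumped to crdb_internal.system_jobs.txt, and because the created timestamp contains a space it occupies two fields, which pushes the hex payload to fields[4] and the progress to fields[5]. Below is a minimal, self-contained Go sketch of that row handling; decodeField and the sample row are invented stand-ins for illustration (the real interpretString helper in pkg/cli may handle escaped strings and NULLs differently).

package main

import (
    "encoding/hex"
    "fmt"
    "strings"
)

// decodeField is an illustrative stand-in for the interpretString helper:
// it treats a dumped column as hex bytes and reports whether decoding worked.
func decodeField(s string) ([]byte, bool) {
    b, err := hex.DecodeString(strings.TrimPrefix(s, `\x`))
    return b, err == nil
}

func main() {
    // A hypothetical row from crdb_internal.system_jobs.txt, whitespace-separated.
    // Leading columns: id, status, created (date + time, i.e. two fields), payload, progress.
    row := `964312901798199297 succeeded 2024-05-21 17:51:23.540024 \x0a2a120853 NULL`
    fields := strings.Fields(row)

    // Because "created" occupies fields[2] and fields[3], the payload sits at
    // fields[4] and the progress at fields[5], matching fromZipDir above.
    if payload, ok := decodeField(fields[4]); ok {
        fmt.Printf("payload: %d bytes\n", len(payload))
    }
    if fields[5] == "NULL" {
        fmt.Println("progress is NULL; the row is skipped")
    }
}

Splitting on whitespace rather than parsing columns by position is what makes the two-field timestamp matter; a tab- or CSV-delimited dump would not have this quirk.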
199 changes: 198 additions & 1 deletion pkg/cli/doctor_test.go
@@ -11,6 +11,7 @@
package cli

import (
"fmt"
"strings"
"testing"

@@ -19,9 +20,139 @@ import (
"github.com/cockroachdb/datadriven"
)

func descriptorWithFKMutation(isDropped bool) string {
dropState := ``
dropTime := ``
if isDropped {
dropState = `"state": "DROP",`
dropTime = `"dropTime": "1713940113794672911",`
}
return `'{
"table": {
"columns": [
{
"id": 1,
"name": "i",
"type": {
"family": "IntFamily",
"oid": 20,
"width": 64
}
}
],` +
dropTime +
`"createAsOfTime": {
"logical": 1,
"wallTime": "1713940112376217646"
},
"families": [
{
"columnIds": [
1
],
"columnNames": [
"i"
],
"name": "primary"
}
],
"formatVersion": 3,
"id": 104,
"modificationTime": {},
"mutationJobs": [
{
"jobId": "962952277419655169",
"mutationId": 1
}
],
"mutations": [
{
"constraint": {
"check": {},
"constraintType": "FOREIGN_KEY",
"foreignKey": {
"constraintId": 2,
"name": "foo_foo_fk",
"onDelete": "CASCADE",
"onUpdate": "CASCADE",
"originColumnIds": [
1
],
"originTableId": 104,
"referencedColumnIds": [
1
],
"referencedTableId": 104,
"validity": "Validating"
},
"name": "foo_foo_fk",
"uniqueWithoutIndexConstraint": {}
},
"direction": "ADD",
"mutationId": 1,
"state": "DELETE_ONLY"
}
],
"name": "foo",
"nextColumnId": 2,
"nextConstraintId": 3,
"nextFamilyId": 1,
"nextIndexId": 2,
"nextMutationId": 2,
"parentId": 183,
"primaryIndex": {
"constraintId": 1,
"createdAtNanos": "1713940112106985000",
"encodingType": 1,
"foreignKey": {},
"geoConfig": {},
"id": 1,
"interleave": {},
"keyColumnDirections": [
"DESC"
],
"keyColumnIds": [
1
],
"keyColumnNames": [
"i"
],
"name": "table_w3_143_pkey",
"partitioning": {},
"sharded": {},
"unique": true,
"version": 4
},
"privileges": {
"ownerProto": "roachprod",
"users": [
{
"privileges": "2",
"userProto": "admin",
"withGrantOption": "2"
},
{
"privileges": "2",
"userProto": "root",
"withGrantOption": "2"
}
],
"version": 3
},
"replacementOf": {
"time": {}
},` +
dropState +
`"unexposedParentSchemaId": 381,
"version": "2"
}
}'`
}

// This test doctoring a secure cluster.
func TestDoctorCluster(t *testing.T) {
defer leaktest.AfterTest(t)()
//
c := NewCLITest(TestCLIParams{T: t})
defer c.Cleanup()

@@ -51,7 +182,72 @@ func TestDoctorCluster(t *testing.T) {
})
}

- // This test the operation of zip over secure clusters.
// TestDoctorClusterBroken tests that debug doctor examine will pick up a multitude of issues on a corrupt descriptor.
func TestDoctorClusterBroken(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCLITest(TestCLIParams{T: t, DisableAutoStats: true})
defer c.Cleanup()

desc := fmt.Sprintf("SELECT crdb_internal.unsafe_upsert_descriptor('foo'::regclass::oid::int,"+
"crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', %s::jsonb), true)", descriptorWithFKMutation(false /* isDropped */))

// Introduce a descriptor with an attached job mutation (along with other issues). We want to ensure that the number of
// jobs created is deterministic (auto table stats will be collected due to the "schema change" on foo); therefore,
// we should disable automatic stats collection and instead create our own.
c.RunWithArgs([]string{"sql", "-e", strings.Join([]string{
"CREATE TABLE foo (i INT)",
desc,
"CREATE STATISTICS foo_stats FROM foo",
"SELECT pg_catalog.pg_sleep(1)",
}, ";\n"),
})

t.Run("examine", func(t *testing.T) {
out, err := c.RunWithCapture("debug doctor examine cluster")
if err != nil {
t.Fatal(err)
}

// Using datadriven allows TESTFLAGS=-rewrite.
datadriven.RunTest(t, datapathutils.TestDataPath(t, "doctor", "test_examine_cluster_jobs"), func(t *testing.T, td *datadriven.TestData) string {
return out
})
})
}

// TestDoctorClusterDropped tests that debug doctor examine will avoid validating dropped descriptors.
func TestDoctorClusterDropped(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCLITest(TestCLIParams{T: t})
defer c.Cleanup()

desc := fmt.Sprintf("SELECT crdb_internal.unsafe_upsert_descriptor('foo'::regclass::oid::int,"+
"crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', %s::jsonb), true)", descriptorWithFKMutation(true /* isDropped */))
// Introduce a dropped descriptor with an attached job mutation (along with other issues).
c.RunWithArgs([]string{"sql", "-e", strings.Join([]string{
"CREATE TABLE foo (i INT)",
desc,
"INSERT INTO system.users VALUES ('node', NULL, true, 3)",
"GRANT node TO root",
"DELETE FROM system.namespace WHERE name = 'foo'",
"SELECT pg_catalog.pg_sleep(1)",
}, ";\n"),
})

t.Run("examine", func(t *testing.T) {
out, err := c.RunWithCapture("debug doctor examine cluster")
if err != nil {
t.Fatal(err)
}

// Using datadriven allows TESTFLAGS=-rewrite.
datadriven.RunTest(t, datapathutils.TestDataPath(t, "doctor", "test_examine_cluster_dropped"), func(t *testing.T, td *datadriven.TestData) string {
return out
})
})
}

// TestDoctorZipDir tests the operation of zip over secure clusters.
func TestDoctorZipDir(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCLITest(TestCLIParams{T: t, NoServer: true})
@@ -69,6 +265,7 @@ func TestDoctorZipDir(t *testing.T) {
})
})

// Regression test (for #104347) to ensure that quoted table names get properly parsed in system.namespace.
t.Run("examine", func(t *testing.T) {
out, err := c.RunWithCapture("debug doctor examine zipdir testdata/doctor/debugzip-with-quotes")
if err != nil {
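The doctor-side behavior implied by the PR title ("skip dropped descs") is what TestDoctorClusterDropped above asserts: a descriptor carrying "state": "DROP" should not be validated at all, so its dangling mutation job produces no report. The following is a schematic, self-contained Go sketch of that skip pattern, not CockroachDB's actual doctor code; the desc type and examine function are invented for illustration.

package main

import "fmt"

// desc is a schematic stand-in for a table descriptor as doctor sees it;
// the real descriptors carry a drop state and dropTime, as in the test
// fixture above.
type desc struct {
    id      int64
    name    string
    dropped bool
}

// examine mirrors the behavior TestDoctorClusterDropped checks for:
// descriptors in the DROP state are skipped instead of being validated.
func examine(descs []desc) []string {
    var problems []string
    for _, d := range descs {
        if d.dropped {
            continue // skip dropped descriptors entirely
        }
        // Stand-in for real validation; here we only flag a placeholder issue.
        if d.name == "" {
            problems = append(problems, fmt.Sprintf("descriptor %d: missing name", d.id))
        }
    }
    return problems
}

func main() {
    descs := []desc{
        {id: 104, name: "foo", dropped: true}, // like the fixture with "state": "DROP"
        {id: 105, name: ""},
    }
    fmt.Println(examine(descs)) // only descriptor 105 is reported
}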