diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 3640db004da8..55605108004a 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1151,6 +1151,7 @@ func (s *adminServer) Jobs( } for i, row := range rows { job := &resp.Jobs[i] + var fractionCompletedOrNil *float32 if err := scanner.ScanAll( row, &job.ID, @@ -1163,11 +1164,14 @@ func (s *adminServer) Jobs( &job.Started, &job.Finished, &job.Modified, - &job.FractionCompleted, + &fractionCompletedOrNil, &job.Error, ); err != nil { return nil, s.serverError(err) } + if fractionCompletedOrNil != nil { + job.FractionCompleted = *fractionCompletedOrNil + } } return &resp, nil @@ -1662,6 +1666,18 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e } *d = float32(*s) + case **float32: + s, ok := src.(*tree.DFloat) + if !ok { + if src != tree.DNull { + return errors.Errorf("source type assertion failed") + } + *d = nil + break + } + val := float32(*s) + *d = &val + case *int64: s, ok := tree.AsDInt(src) if !ok { @@ -1698,7 +1714,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e return errors.Errorf("source type assertion failed") } *d = nil - return nil + break } *d = &s.Time diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index 88d3bcbd523e..5514f47b585e 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -1098,6 +1098,7 @@ func TestAdminAPIJobs(t *testing.T) { {1, jobs.StatusRunning, jobspb.RestoreDetails{}, jobspb.RestoreProgress{}}, {2, jobs.StatusRunning, jobspb.BackupDetails{}, jobspb.BackupProgress{}}, {3, jobs.StatusSucceeded, jobspb.BackupDetails{}, jobspb.BackupProgress{}}, + {4, jobs.StatusRunning, jobspb.ChangefeedDetails{}, jobspb.ChangefeedProgress{}}, } for _, job := range testJobs { payload := jobspb.Payload{Details: jobspb.WrapPayloadDetails(job.details)} @@ -1105,7 +1106,20 @@ func TestAdminAPIJobs(t *testing.T) { if err != nil { t.Fatal(err) } + progress := jobspb.Progress{Details: jobspb.WrapProgressDetails(job.progress)} + // Populate progress.Progress field with a specific progress type based on + // the job type. + if _, ok := job.progress.(jobspb.ChangefeedProgress); ok { + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &hlc.Timestamp{}, + } + } else { + progress.Progress = &jobspb.Progress_FractionCompleted{ + FractionCompleted: 1.0, + } + } + progressBytes, err := protoutil.Marshal(&progress) if err != nil { t.Fatal(err) @@ -1122,9 +1136,9 @@ func TestAdminAPIJobs(t *testing.T) { uri string expectedIDs []int64 }{ - {"jobs", append([]int64{3, 2, 1}, existingIDs...)}, - {"jobs?limit=1", []int64{3}}, - {"jobs?status=running", []int64{2, 1}}, + {"jobs", append([]int64{4, 3, 2, 1}, existingIDs...)}, + {"jobs?limit=1", []int64{4}}, + {"jobs?status=running", []int64{4, 2, 1}}, {"jobs?status=succeeded", append([]int64{3}, existingIDs...)}, {"jobs?status=pending", []int64{}}, {"jobs?status=garbage", []int64{}}, diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index a424726d7f61..1624af6a0af8 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -394,19 +394,20 @@ func tsOrNull(micros int64) tree.Datum { var crdbInternalJobsTable = virtualSchemaTable{ schema: ` CREATE TABLE crdb_internal.jobs ( - job_id INT, - job_type STRING, - description STRING, - user_name STRING, - descriptor_ids INT[], - status STRING, - created TIMESTAMP, - started TIMESTAMP, - finished TIMESTAMP, - modified TIMESTAMP, - fraction_completed FLOAT, - error STRING, - coordinator_id INT + job_id INT, + job_type STRING, + description STRING, + user_name STRING, + descriptor_ids INT[], + status STRING, + created TIMESTAMP, + started TIMESTAMP, + finished TIMESTAMP, + modified TIMESTAMP, + fraction_completed FLOAT, + high_water_timestamp DECIMAL, + error STRING, + coordinator_id INT ); `, populate: func(ctx context.Context, p *planner, _ *DatabaseDescriptor, addRow func(...tree.Datum) error) error { @@ -422,9 +423,9 @@ CREATE TABLE crdb_internal.jobs ( id, status, created, payloadBytes, progressBytes := r[0], r[1], r[2], r[3], r[4] var jobType, description, username, descriptorIDs, started, - finished, modified, fractionCompleted, errorStr, leaseNode = tree.DNull, + finished, modified, fractionCompleted, highWaterTimestamp, errorStr, leaseNode = tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, - tree.DNull, tree.DNull + tree.DNull, tree.DNull, tree.DNull // Extract data from the payload. payload, err := jobs.UnmarshalPayload(payloadBytes) @@ -462,7 +463,13 @@ CREATE TABLE crdb_internal.jobs ( } errorStr = tree.NewDString(fmt.Sprintf("%serror decoding progress: %v", baseErr, err)) } else { - fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted())) + // Progress contains either fractionCompleted for traditional jobs, + // or the highWaterTimestamp for change feeds. + if highwater := progress.GetHighWater(); highwater != nil { + highWaterTimestamp = tree.TimestampToDecimal(*highwater) + } else { + fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted())) + } modified = tsOrNull(progress.ModifiedMicros) } } @@ -480,6 +487,7 @@ CREATE TABLE crdb_internal.jobs ( finished, modified, fractionCompleted, + highWaterTimestamp, errorStr, leaseNode, ); err != nil { diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 4cc832c385b8..0a1ba38aa21e 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -103,10 +103,10 @@ Channel # The validity of the rows in this table are tested elsewhere; we merely assert the columns. -query ITTTTTTTTTRTI colnames +query ITTTTTTTTTRTTI colnames SELECT * FROM crdb_internal.jobs WHERE false ---- -job_id job_type description user_name descriptor_ids status created started finished modified fraction_completed error coordinator_id +job_id job_type description user_name descriptor_ids status created started finished modified fraction_completed high_water_timestamp error coordinator_id query IITTITTT colnames SELECT * FROM crdb_internal.schema_changes WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/planner_test/explain b/pkg/sql/logictest/testdata/planner_test/explain index 0823844aab02..d715f5e26a17 100644 --- a/pkg/sql/logictest/testdata/planner_test/explain +++ b/pkg/sql/logictest/testdata/planner_test/explain @@ -112,7 +112,7 @@ EXPLAIN SHOW JOBS ---- render · · └── values · · -· size 13 columns, 0 rows +· size 14 columns, 0 rows statement ok CREATE INDEX a ON foo(x) @@ -223,7 +223,7 @@ sort · · ├── render · · │ └── filter · · │ └── values · · - │ size 18 columns, 907 rows + │ size 18 columns, 908 rows └── render · · └── filter · · └── values · · diff --git a/pkg/sql/opt/exec/execbuilder/testdata/explain b/pkg/sql/opt/exec/execbuilder/testdata/explain index 7ac8e250f2ea..714e1f69e7cb 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/explain +++ b/pkg/sql/opt/exec/execbuilder/testdata/explain @@ -105,7 +105,7 @@ EXPLAIN SHOW JOBS ---- render · · └── values · · -· size 13 columns, 0 rows +· size 14 columns, 0 rows statement ok CREATE INDEX a ON foo(x) @@ -216,7 +216,7 @@ sort · · ├── render · · │ └── filter · · │ └── values · · - │ size 18 columns, 907 rows + │ size 18 columns, 908 rows └── render · · └── filter · · └── values · · diff --git a/pkg/sql/show_test.go b/pkg/sql/show_test.go index c72b5380298f..48595fcb678a 100644 --- a/pkg/sql/show_test.go +++ b/pkg/sql/show_test.go @@ -25,6 +25,9 @@ import ( "time" "unicode/utf8" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + + "github.com/cockroachdb/apd" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -32,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -738,70 +742,125 @@ func TestShowJobs(t *testing.T) { finished time.Time modified time.Time fractionCompleted float32 + highWater hlc.Timestamp coordinatorID roachpb.NodeID + details jobspb.Details } - in := row{ - id: 42, - typ: "SCHEMA CHANGE", - status: "superfailed", - description: "failjob", - username: "failure", - err: "boom", - // lib/pq returns time.Time objects with goofy locations, which breaks - // reflect.DeepEqual without this time.FixedZone song and dance. - // See: https://github.com/lib/pq/issues/329 - created: timeutil.Unix(1, 0).In(time.FixedZone("", 0)), - started: timeutil.Unix(2, 0).In(time.FixedZone("", 0)), - finished: timeutil.Unix(3, 0).In(time.FixedZone("", 0)), - modified: timeutil.Unix(4, 0).In(time.FixedZone("", 0)), - fractionCompleted: 0.42, - coordinatorID: 7, - } - - // system.jobs is part proper SQL columns, part protobuf, so we can't use the - // row struct directly. - inPayload, err := protoutil.Marshal(&jobspb.Payload{ - Description: in.description, - StartedMicros: in.started.UnixNano() / time.Microsecond.Nanoseconds(), - FinishedMicros: in.finished.UnixNano() / time.Microsecond.Nanoseconds(), - Username: in.username, - Lease: &jobspb.Lease{ - NodeID: 7, + for _, in := range []row{ + { + id: 42, + typ: "SCHEMA CHANGE", + status: "superfailed", + description: "failjob", + username: "failure", + err: "boom", + // lib/pq returns time.Time objects with goofy locations, which breaks + // reflect.DeepEqual without this time.FixedZone song and dance. + // See: https://github.com/lib/pq/issues/329 + created: timeutil.Unix(1, 0).In(time.FixedZone("", 0)), + started: timeutil.Unix(2, 0).In(time.FixedZone("", 0)), + finished: timeutil.Unix(3, 0).In(time.FixedZone("", 0)), + modified: timeutil.Unix(4, 0).In(time.FixedZone("", 0)), + fractionCompleted: 0.42, + coordinatorID: 7, + details: jobspb.SchemaChangeDetails{}, }, - Error: in.err, - Details: jobspb.WrapPayloadDetails(jobspb.SchemaChangeDetails{}), - }) - if err != nil { - t.Fatal(err) - } - - inProgress, err := protoutil.Marshal(&jobspb.Progress{ - ModifiedMicros: in.modified.UnixNano() / time.Microsecond.Nanoseconds(), - Progress: &jobspb.Progress_FractionCompleted{ - FractionCompleted: in.fractionCompleted, + { + id: 43, + typ: "CHANGEFEED", + status: "running", + description: "persistent feed", + username: "persistent", + err: "", + // lib/pq returns time.Time objects with goofy locations, which breaks + // reflect.DeepEqual without this time.FixedZone song and dance. + // See: https://github.com/lib/pq/issues/329 + created: timeutil.Unix(1, 0).In(time.FixedZone("", 0)), + started: timeutil.Unix(2, 0).In(time.FixedZone("", 0)), + finished: timeutil.Unix(3, 0).In(time.FixedZone("", 0)), + modified: timeutil.Unix(4, 0).In(time.FixedZone("", 0)), + highWater: hlc.Timestamp{ + WallTime: 1533143242000000, + Logical: 4, + }, + coordinatorID: 7, + details: jobspb.ChangefeedDetails{}, }, - }) - if err != nil { - t.Fatal(err) - } - sqlDB.Exec(t, - `INSERT INTO system.jobs (id, status, created, payload, progress) VALUES ($1, $2, $3, $4, $5)`, - in.id, in.status, in.created, inPayload, inProgress, - ) + } { + t.Run("", func(t *testing.T) { + // system.jobs is part proper SQL columns, part protobuf, so we can't use the + // row struct directly. + inPayload, err := protoutil.Marshal(&jobspb.Payload{ + Description: in.description, + StartedMicros: in.started.UnixNano() / time.Microsecond.Nanoseconds(), + FinishedMicros: in.finished.UnixNano() / time.Microsecond.Nanoseconds(), + Username: in.username, + Lease: &jobspb.Lease{ + NodeID: 7, + }, + Error: in.err, + Details: jobspb.WrapPayloadDetails(in.details), + }) + if err != nil { + t.Fatal(err) + } - var out row - sqlDB.QueryRow(t, ` + progress := &jobspb.Progress{ + ModifiedMicros: in.modified.UnixNano() / time.Microsecond.Nanoseconds(), + } + if in.highWater != (hlc.Timestamp{}) { + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &in.highWater, + } + } else { + progress.Progress = &jobspb.Progress_FractionCompleted{ + FractionCompleted: in.fractionCompleted, + } + } + inProgress, err := protoutil.Marshal(progress) + if err != nil { + t.Fatal(err) + } + sqlDB.Exec(t, + `INSERT INTO system.jobs (id, status, created, payload, progress) VALUES ($1, $2, $3, $4, $5)`, + in.id, in.status, in.created, inPayload, inProgress, + ) + + var out row + var maybeFractionCompleted *float32 + var decimalHighWater *apd.Decimal + sqlDB.QueryRow(t, ` SELECT job_id, job_type, status, created, description, started, finished, modified, - fraction_completed, user_name, ifnull(error, ''), coordinator_id - FROM crdb_internal.jobs`).Scan( - &out.id, &out.typ, &out.status, &out.created, &out.description, &out.started, - &out.finished, &out.modified, &out.fractionCompleted, &out.username, - &out.err, &out.coordinatorID, - ) - if !reflect.DeepEqual(in, out) { - diff := strings.Join(pretty.Diff(in, out), "\n") - t.Fatalf("in job did not match out job:\n%s", diff) + fraction_completed, high_water_timestamp, user_name, ifnull(error, ''), coordinator_id + FROM crdb_internal.jobs WHERE job_id = $1`, in.id).Scan( + &out.id, &out.typ, &out.status, &out.created, &out.description, &out.started, + &out.finished, &out.modified, &maybeFractionCompleted, &decimalHighWater, &out.username, + &out.err, &out.coordinatorID, + ) + + if decimalHighWater != nil { + var err error + out.highWater, err = tree.DecimalToHLC(decimalHighWater) + if err != nil { + t.Fatal(err) + } + } + + if maybeFractionCompleted != nil { + out.fractionCompleted = *maybeFractionCompleted + } + + // details field is not explicitly checked for equality; its value is + // confirmed via the job_type field, which is dependent on the details + // field. + out.details = in.details + + if !reflect.DeepEqual(in, out) { + diff := strings.Join(pretty.Diff(in, out), "\n") + t.Fatalf("in job did not match out job:\n%s", diff) + } + }) } }