From b76e953ee2d06628cb997bff98d086ef972370fc Mon Sep 17 00:00:00 2001 From: Matt Tracy Date: Thu, 9 Aug 2018 14:11:47 -0700 Subject: [PATCH] cdc/admin: Fix output of Jobs API for Changefeeds Output of the Admin "Jobs" endpoint was broken if any Changefeed type jobs were present; this happened because crdb_internal.jobs can now return NULL for the fraction_completed field, and the Jobs Admin API method was unable to handle this situation. Release note: None --- pkg/server/admin.go | 20 ++++++++++++++++++-- pkg/server/admin_test.go | 20 +++++++++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 3640db004da8..55605108004a 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1151,6 +1151,7 @@ func (s *adminServer) Jobs( } for i, row := range rows { job := &resp.Jobs[i] + var fractionCompletedOrNil *float32 if err := scanner.ScanAll( row, &job.ID, @@ -1163,11 +1164,14 @@ func (s *adminServer) Jobs( &job.Started, &job.Finished, &job.Modified, - &job.FractionCompleted, + &fractionCompletedOrNil, &job.Error, ); err != nil { return nil, s.serverError(err) } + if fractionCompletedOrNil != nil { + job.FractionCompleted = *fractionCompletedOrNil + } } return &resp, nil @@ -1662,6 +1666,18 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e } *d = float32(*s) + case **float32: + s, ok := src.(*tree.DFloat) + if !ok { + if src != tree.DNull { + return errors.Errorf("source type assertion failed") + } + *d = nil + break + } + val := float32(*s) + *d = &val + case *int64: s, ok := tree.AsDInt(src) if !ok { @@ -1698,7 +1714,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e return errors.Errorf("source type assertion failed") } *d = nil - return nil + break } *d = &s.Time diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index 88d3bcbd523e..5514f47b585e 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -1098,6 +1098,7 @@ func TestAdminAPIJobs(t *testing.T) { {1, jobs.StatusRunning, jobspb.RestoreDetails{}, jobspb.RestoreProgress{}}, {2, jobs.StatusRunning, jobspb.BackupDetails{}, jobspb.BackupProgress{}}, {3, jobs.StatusSucceeded, jobspb.BackupDetails{}, jobspb.BackupProgress{}}, + {4, jobs.StatusRunning, jobspb.ChangefeedDetails{}, jobspb.ChangefeedProgress{}}, } for _, job := range testJobs { payload := jobspb.Payload{Details: jobspb.WrapPayloadDetails(job.details)} @@ -1105,7 +1106,20 @@ func TestAdminAPIJobs(t *testing.T) { if err != nil { t.Fatal(err) } + progress := jobspb.Progress{Details: jobspb.WrapProgressDetails(job.progress)} + // Populate progress.Progress field with a specific progress type based on + // the job type. + if _, ok := job.progress.(jobspb.ChangefeedProgress); ok { + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &hlc.Timestamp{}, + } + } else { + progress.Progress = &jobspb.Progress_FractionCompleted{ + FractionCompleted: 1.0, + } + } + progressBytes, err := protoutil.Marshal(&progress) if err != nil { t.Fatal(err) @@ -1122,9 +1136,9 @@ func TestAdminAPIJobs(t *testing.T) { uri string expectedIDs []int64 }{ - {"jobs", append([]int64{3, 2, 1}, existingIDs...)}, - {"jobs?limit=1", []int64{3}}, - {"jobs?status=running", []int64{2, 1}}, + {"jobs", append([]int64{4, 3, 2, 1}, existingIDs...)}, + {"jobs?limit=1", []int64{4}}, + {"jobs?status=running", []int64{4, 2, 1}}, {"jobs?status=succeeded", append([]int64{3}, existingIDs...)}, {"jobs?status=pending", []int64{}}, {"jobs?status=garbage", []int64{}},