Skip to content

Commit

Permalink
cdc/admin: Fix output of Jobs API for Changefeeds
Browse files Browse the repository at this point in the history
Output of the Admin "Jobs" endpoint was broken if any Changefeed type
jobs were present; this happened because crdb_internal.jobs can now
return NULL for the fraction_completed field, and the Jobs Admin API
method was unable to handle this situation.

Release note: None
  • Loading branch information
Matt Tracy committed Aug 9, 2018
1 parent d1540fe commit b76e953
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
20 changes: 18 additions & 2 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,7 @@ func (s *adminServer) Jobs(
}
for i, row := range rows {
job := &resp.Jobs[i]
var fractionCompletedOrNil *float32
if err := scanner.ScanAll(
row,
&job.ID,
Expand All @@ -1163,11 +1164,14 @@ func (s *adminServer) Jobs(
&job.Started,
&job.Finished,
&job.Modified,
&job.FractionCompleted,
&fractionCompletedOrNil,
&job.Error,
); err != nil {
return nil, s.serverError(err)
}
if fractionCompletedOrNil != nil {
job.FractionCompleted = *fractionCompletedOrNil
}
}

return &resp, nil
Expand Down Expand Up @@ -1662,6 +1666,18 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
}
*d = float32(*s)

case **float32:
s, ok := src.(*tree.DFloat)
if !ok {
if src != tree.DNull {
return errors.Errorf("source type assertion failed")
}
*d = nil
break
}
val := float32(*s)
*d = &val

case *int64:
s, ok := tree.AsDInt(src)
if !ok {
Expand Down Expand Up @@ -1698,7 +1714,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
return errors.Errorf("source type assertion failed")
}
*d = nil
return nil
break
}
*d = &s.Time

Expand Down
20 changes: 17 additions & 3 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,14 +1098,28 @@ func TestAdminAPIJobs(t *testing.T) {
{1, jobs.StatusRunning, jobspb.RestoreDetails{}, jobspb.RestoreProgress{}},
{2, jobs.StatusRunning, jobspb.BackupDetails{}, jobspb.BackupProgress{}},
{3, jobs.StatusSucceeded, jobspb.BackupDetails{}, jobspb.BackupProgress{}},
{4, jobs.StatusRunning, jobspb.ChangefeedDetails{}, jobspb.ChangefeedProgress{}},
}
for _, job := range testJobs {
payload := jobspb.Payload{Details: jobspb.WrapPayloadDetails(job.details)}
payloadBytes, err := protoutil.Marshal(&payload)
if err != nil {
t.Fatal(err)
}

progress := jobspb.Progress{Details: jobspb.WrapProgressDetails(job.progress)}
// Populate progress.Progress field with a specific progress type based on
// the job type.
if _, ok := job.progress.(jobspb.ChangefeedProgress); ok {
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &hlc.Timestamp{},
}
} else {
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 1.0,
}
}

progressBytes, err := protoutil.Marshal(&progress)
if err != nil {
t.Fatal(err)
Expand All @@ -1122,9 +1136,9 @@ func TestAdminAPIJobs(t *testing.T) {
uri string
expectedIDs []int64
}{
{"jobs", append([]int64{3, 2, 1}, existingIDs...)},
{"jobs?limit=1", []int64{3}},
{"jobs?status=running", []int64{2, 1}},
{"jobs", append([]int64{4, 3, 2, 1}, existingIDs...)},
{"jobs?limit=1", []int64{4}},
{"jobs?status=running", []int64{4, 2, 1}},
{"jobs?status=succeeded", append([]int64{3}, existingIDs...)},
{"jobs?status=pending", []int64{}},
{"jobs?status=garbage", []int64{}},
Expand Down

0 comments on commit b76e953

Please sign in to comment.