Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: Add highwater timestamp to crdb_internal.jobs #28156

Merged
merged 1 commit into from
Aug 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,7 @@ func (s *adminServer) Jobs(
}
for i, row := range rows {
job := &resp.Jobs[i]
var fractionCompletedOrNil *float32
if err := scanner.ScanAll(
row,
&job.ID,
Expand All @@ -1163,11 +1164,14 @@ func (s *adminServer) Jobs(
&job.Started,
&job.Finished,
&job.Modified,
&job.FractionCompleted,
&fractionCompletedOrNil,
&job.Error,
); err != nil {
return nil, s.serverError(err)
}
if fractionCompletedOrNil != nil {
job.FractionCompleted = *fractionCompletedOrNil
}
}

return &resp, nil
Expand Down Expand Up @@ -1662,6 +1666,18 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
}
*d = float32(*s)

case **float32:
s, ok := src.(*tree.DFloat)
if !ok {
if src != tree.DNull {
return errors.Errorf("source type assertion failed")
}
*d = nil
break
}
val := float32(*s)
*d = &val

case *int64:
s, ok := tree.AsDInt(src)
if !ok {
Expand Down Expand Up @@ -1698,7 +1714,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
return errors.Errorf("source type assertion failed")
}
*d = nil
return nil
break
}
*d = &s.Time

Expand Down
20 changes: 17 additions & 3 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,14 +1098,28 @@ func TestAdminAPIJobs(t *testing.T) {
{1, jobs.StatusRunning, jobspb.RestoreDetails{}, jobspb.RestoreProgress{}},
{2, jobs.StatusRunning, jobspb.BackupDetails{}, jobspb.BackupProgress{}},
{3, jobs.StatusSucceeded, jobspb.BackupDetails{}, jobspb.BackupProgress{}},
{4, jobs.StatusRunning, jobspb.ChangefeedDetails{}, jobspb.ChangefeedProgress{}},
}
for _, job := range testJobs {
payload := jobspb.Payload{Details: jobspb.WrapPayloadDetails(job.details)}
payloadBytes, err := protoutil.Marshal(&payload)
if err != nil {
t.Fatal(err)
}

progress := jobspb.Progress{Details: jobspb.WrapProgressDetails(job.progress)}
// Populate progress.Progress field with a specific progress type based on
// the job type.
if _, ok := job.progress.(jobspb.ChangefeedProgress); ok {
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &hlc.Timestamp{},
}
} else {
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 1.0,
}
}

progressBytes, err := protoutil.Marshal(&progress)
if err != nil {
t.Fatal(err)
Expand All @@ -1122,9 +1136,9 @@ func TestAdminAPIJobs(t *testing.T) {
uri string
expectedIDs []int64
}{
{"jobs", append([]int64{3, 2, 1}, existingIDs...)},
{"jobs?limit=1", []int64{3}},
{"jobs?status=running", []int64{2, 1}},
{"jobs", append([]int64{4, 3, 2, 1}, existingIDs...)},
{"jobs?limit=1", []int64{4}},
{"jobs?status=running", []int64{4, 2, 1}},
{"jobs?status=succeeded", append([]int64{3}, existingIDs...)},
{"jobs?status=pending", []int64{}},
{"jobs?status=garbage", []int64{}},
Expand Down
40 changes: 24 additions & 16 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,19 +394,20 @@ func tsOrNull(micros int64) tree.Datum {
var crdbInternalJobsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.jobs (
job_id INT,
job_type STRING,
description STRING,
user_name STRING,
descriptor_ids INT[],
status STRING,
created TIMESTAMP,
started TIMESTAMP,
finished TIMESTAMP,
modified TIMESTAMP,
fraction_completed FLOAT,
error STRING,
coordinator_id INT
job_id INT,
job_type STRING,
description STRING,
user_name STRING,
descriptor_ids INT[],
status STRING,
created TIMESTAMP,
started TIMESTAMP,
finished TIMESTAMP,
modified TIMESTAMP,
fraction_completed FLOAT,
high_water_timestamp DECIMAL,
error STRING,
coordinator_id INT
);
`,
populate: func(ctx context.Context, p *planner, _ *DatabaseDescriptor, addRow func(...tree.Datum) error) error {
Expand All @@ -422,9 +423,9 @@ CREATE TABLE crdb_internal.jobs (
id, status, created, payloadBytes, progressBytes := r[0], r[1], r[2], r[3], r[4]

var jobType, description, username, descriptorIDs, started,
finished, modified, fractionCompleted, errorStr, leaseNode = tree.DNull,
finished, modified, fractionCompleted, highWaterTimestamp, errorStr, leaseNode = tree.DNull,
tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull,
tree.DNull, tree.DNull
tree.DNull, tree.DNull, tree.DNull

// Extract data from the payload.
payload, err := jobs.UnmarshalPayload(payloadBytes)
Expand Down Expand Up @@ -462,7 +463,13 @@ CREATE TABLE crdb_internal.jobs (
}
errorStr = tree.NewDString(fmt.Sprintf("%serror decoding progress: %v", baseErr, err))
} else {
fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
// Progress contains either fractionCompleted for traditional jobs,
// or the highWaterTimestamp for change feeds.
if highwater := progress.GetHighWater(); highwater != nil {
highWaterTimestamp = tree.TimestampToDecimal(*highwater)
} else {
fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
}
modified = tsOrNull(progress.ModifiedMicros)
}
}
Expand All @@ -480,6 +487,7 @@ CREATE TABLE crdb_internal.jobs (
finished,
modified,
fractionCompleted,
highWaterTimestamp,
errorStr,
leaseNode,
); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ Channel


# The validity of the rows in this table are tested elsewhere; we merely assert the columns.
query ITTTTTTTTTRTI colnames
query ITTTTTTTTTRTTI colnames
SELECT * FROM crdb_internal.jobs WHERE false
----
job_id job_type description user_name descriptor_ids status created started finished modified fraction_completed error coordinator_id
job_id job_type description user_name descriptor_ids status created started finished modified fraction_completed high_water_timestamp error coordinator_id

query IITTITTT colnames
SELECT * FROM crdb_internal.schema_changes WHERE table_id < 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/planner_test/explain
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ EXPLAIN SHOW JOBS
----
render · ·
└── values · ·
· size 13 columns, 0 rows
· size 14 columns, 0 rows

statement ok
CREATE INDEX a ON foo(x)
Expand Down Expand Up @@ -223,7 +223,7 @@ sort · ·
├── render · ·
│ └── filter · ·
│ └── values · ·
│ size 18 columns, 907 rows
│ size 18 columns, 908 rows
└── render · ·
└── filter · ·
└── values · ·
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/testdata/explain
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ EXPLAIN SHOW JOBS
----
render · ·
└── values · ·
· size 13 columns, 0 rows
· size 14 columns, 0 rows

statement ok
CREATE INDEX a ON foo(x)
Expand Down Expand Up @@ -216,7 +216,7 @@ sort · ·
├── render · ·
│ └── filter · ·
│ └── values · ·
│ size 18 columns, 907 rows
│ size 18 columns, 908 rows
└── render · ·
└── filter · ·
└── values · ·
Expand Down
Loading