Skip to content

Commit

Permalink
sql,stats: use the jobs framework for CREATE STATISTICS
Browse files Browse the repository at this point in the history
Prior to this commit, CREATE STATISTICS was a regular SQL statement
and followed the standard logic of AST -> planNode -> DistSQL physical
plan -> execution. This commit changes CREATE STATISTICS to use the
jobs framework, and as a result the createStats planNode has been
removed.

There are several advantages to using the jobs framework:
- Now CREATE STATISTICS jobs can easily be cancelled, paused and resumed
  from the Admin UI.
- Nodes can adopt the job if the original gateway node fails.
- We will be able to use the JobID to lock creation of automatic
  statistics, so that only one automatic statistics job can run at
  a time. Job adoption will ensure that a dead node never prevents progress
  by holding a lock on stats creation (implementation of locking will be
  saved for the next PR).

Release note (sql change): CREATE STATISTICS now runs as a job instead
of as a regular SQL statement.
  • Loading branch information
rytaft committed Jan 27, 2019
1 parent 8cbeb53 commit 22a3b48
Show file tree
Hide file tree
Showing 20 changed files with 2,138 additions and 677 deletions.
1,258 changes: 1,061 additions & 197 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,26 @@ message ChangefeedProgress {
repeated ResolvedSpan resolved_spans = 2 [(gogoproto.nullable) = false];
}

// CreateStatsDetails is the job payload for a CREATE STATISTICS job.
message CreateStatsDetails {
  // Columns is one multi-column statistic target: the ordered set of
  // column IDs the statistic is collected over.
  message Columns {
    repeated uint32 ids = 1 [
      (gogoproto.customname) = "IDs",
      (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ColumnID"
    ];
  }
  // name is the name given to the statistic being created.
  string name = 1 [
    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.Name"
  ];
  // table is the descriptor of the table statistics are collected on.
  sqlbase.TableDescriptor table = 2 [(gogoproto.nullable) = false];
  // columns holds one entry per requested statistic (each entry is a
  // column set — see Columns above).
  repeated Columns columns = 3 [(gogoproto.nullable) = false];
  // statement is presumably the SQL statement text that created this job
  // (TODO confirm against the job creation site). Renamed from
  // "Statement" to follow proto lower_snake_case field-naming convention;
  // the generated Go field name is still "Statement", so Go callers are
  // unaffected (only the JSON/text-format name changes).
  string statement = 4;
}

// CreateStatsProgress tracks the progress of a CREATE STATISTICS job.
message CreateStatsProgress {
// sampler_progress holds one slot per distSQL sampler processor; each
// processor updates only its own slot, and the overall completed
// fraction is the sum of all slots (see CreateStatsProgress.Completed
// in wrap.go).
repeated float sampler_progress = 1;
}


message Payload {
string description = 1;
string username = 2;
Expand All @@ -215,6 +235,7 @@ message Payload {
SchemaChangeDetails schemaChange = 12;
ImportDetails import = 13;
ChangefeedDetails changefeed = 14;
CreateStatsDetails createStats = 15;
}
}

Expand All @@ -232,6 +253,7 @@ message Progress {
SchemaChangeProgress schemaChange = 12;
ImportProgress import = 13;
ChangefeedProgress changefeed = 14;
CreateStatsProgress createStats = 15;
}
}

Expand All @@ -245,4 +267,5 @@ enum Type {
SCHEMA_CHANGE = 3 [(gogoproto.enumvalue_customname) = "TypeSchemaChange"];
IMPORT = 4 [(gogoproto.enumvalue_customname) = "TypeImport"];
CHANGEFEED = 5 [(gogoproto.enumvalue_customname) = "TypeChangefeed"];
CREATE_STATS = 6 [(gogoproto.enumvalue_customname) = "TypeCreateStats"];
}
31 changes: 31 additions & 0 deletions pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var _ Details = BackupDetails{}
var _ Details = RestoreDetails{}
var _ Details = SchemaChangeDetails{}
var _ Details = ChangefeedDetails{}
var _ Details = CreateStatsDetails{}

// ProgressDetails is a marker interface for job progress details proto structs.
type ProgressDetails interface{}
Expand All @@ -36,6 +37,7 @@ var _ ProgressDetails = BackupProgress{}
var _ ProgressDetails = RestoreProgress{}
var _ ProgressDetails = SchemaChangeProgress{}
var _ ProgressDetails = ChangefeedProgress{}
var _ ProgressDetails = CreateStatsProgress{}

// Type returns the payload's job type.
func (p *Payload) Type() Type {
Expand All @@ -55,6 +57,8 @@ func DetailsType(d isPayload_Details) Type {
return TypeImport
case *Payload_Changefeed:
return TypeChangefeed
case *Payload_CreateStats:
return TypeCreateStats
default:
panic(fmt.Sprintf("Payload.Type called on a payload with an unknown details type: %T", d))
}
Expand All @@ -79,6 +83,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_Import{Import: &d}
case ChangefeedProgress:
return &Progress_Changefeed{Changefeed: &d}
case CreateStatsProgress:
return &Progress_CreateStats{CreateStats: &d}
default:
panic(fmt.Sprintf("WrapProgressDetails: unknown details type %T", d))
}
Expand All @@ -98,6 +104,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.Import
case *Payload_Changefeed:
return *d.Changefeed
case *Payload_CreateStats:
return *d.CreateStats
default:
return nil
}
Expand All @@ -117,6 +125,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.Import
case *Progress_Changefeed:
return *d.Changefeed
case *Progress_CreateStats:
return *d.CreateStats
default:
return nil
}
Expand Down Expand Up @@ -149,6 +159,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_Import{Import: &d}
case ChangefeedDetails:
return &Payload_Changefeed{Changefeed: &d}
case CreateStatsDetails:
return &Payload_CreateStats{CreateStats: &d}
default:
panic(fmt.Sprintf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -195,5 +207,24 @@ func (d ImportProgress) Completed() float32 {
return completed
}

// Completed returns the fraction of this table that has been processed,
// in [0, 1]. Multiple distSQL processors run sampling concurrently; each
// one owns a slot of SamplerProgress and reports its own share, so the
// overall progress is the sum across all slots.
func (d CreateStatsProgress) Completed() float32 {
	var total float32
	for _, frac := range d.SamplerProgress {
		total += frac
	}
	// Rounding during float addition can push the sum slightly above 1;
	// clamp so callers always see a valid fraction.
	if total > 1 {
		total = 1
	}
	return total
}

// ChangefeedTargets is a set of id targets with metadata, keyed by
// descriptor ID (sqlbase.ID) with per-target metadata in the
// ChangefeedTarget value.
type ChangefeedTargets map[sqlbase.ID]ChangefeedTarget
Loading

0 comments on commit 22a3b48

Please sign in to comment.