Skip to content

Commit

Permalink
Merge pull request #11802 from influxdata/flux-staging
Browse files Browse the repository at this point in the history
Update to Flux v0.19.0
  • Loading branch information
nathanielc authored Feb 11, 2019
2 parents 20c15ff + 020a11f commit 0213db3
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 107 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
github.com/hashicorp/vault v0.11.5
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/influxdata/flux v0.18.0
github.com/influxdata/flux v0.19.0
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.18.0 h1:tcbvJrOqnCcbyvXz7cbwEtIGMzr/JEBSDcRbdnRJCDk=
github.com/influxdata/flux v0.18.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/flux v0.19.0 h1:Aoc8piw66y8g5eVKhoGDkH5HBJ9ijtCMc0cuhp9TFXA=
github.com/influxdata/flux v0.19.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
Expand Down
2 changes: 1 addition & 1 deletion http/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ var influxqlParseErrorRE = regexp.MustCompile(`^(.+) at line (\d+), char (\d+)$`

func nowFunc(now time.Time) values.Function {
timeVal := values.NewTime(values.ConvertTime(now))
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
ftype := semantic.NewFunctionPolyType(semantic.FunctionPolySignature{
Return: semantic.Time,
})
call := func(args values.Object) (values.Value, error) {
Expand Down
2 changes: 1 addition & 1 deletion query/influxql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ If the aggregate is combined with conditions, the column name of `_value` is rep

#### <a name="normalize-time"></a> Normalize the time column

If a function was evaluated and the query type is an aggregate type, then all of the functions need to have their time normalized. If the function is an aggregate, the following is added:
If a function was evaluated and the query type is an aggregate type or if we are grouping by time, then all of the functions need to have their time normalized. If the function is an aggregate, the following is added:

```
... |> mean() |> duplicate(column: "_start", as: "_time")
Expand Down
2 changes: 0 additions & 2 deletions query/influxql/end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ var skipTests = map[string]string{
"selector_2": "Transpiler: first function uses different series than influxQL (https://github.com/influxdata/platform/issues/1605)",
"selector_6": "Transpiler: first function uses different series than influxQL (https://github.com/influxdata/platform/issues/1605)",
"selector_7": "Transpiler: first function uses different series than influxQL (https://github.com/influxdata/platform/issues/1605)",
"selector_8": "Transpiler: selectors with group by produce different time values than influxQL (https://github.com/influxdata/platform/issues/1606)",
"selector_9": "Transpiler: selectors with group by produce different time values than influxQL (https://github.com/influxdata/platform/issues/1606)",
"series_agg_0": "Transpiler: Implement difference (https://github.com/influxdata/platform/issues/1609)",
"series_agg_1": "Transpiler: Implement stddev (https://github.com/influxdata/platform/issues/1610)",
"series_agg_2": "Transpiler: Implement spread (https://github.com/influxdata/platform/issues/1611)",
Expand Down
2 changes: 1 addition & 1 deletion query/influxql/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (gr *groupInfo) createCursor(t *transpilerState) (cursor, error) {

// If a function call is present, evaluate the function call.
if gr.call != nil {
c, err := createFunctionCursor(t, gr.call, cur, !gr.selector)
c, err := createFunctionCursor(t, gr.call, cur, !gr.selector || interval > 0)
if err != nil {
return nil, err
}
Expand Down
142 changes: 99 additions & 43 deletions query/stdlib/influxdata/influxdb/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ func init() {
flux.RegisterOpSpec(FromKind, newFromOp)
plan.RegisterProcedureSpec(FromKind, newFromProcedure, FromKind)
plan.RegisterPhysicalRules(
FromConversionRule{},
MergeFromRangeRule{},
MergeFromFilterRule{},
FromDistinctRule{},
MergeFromGroupRule{},
FromKeysRule{},
)
execute.RegisterSource(FromKind, createFromSource)
execute.RegisterSource(PhysicalFromKind, createFromSource)
}

func createFromOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
Expand Down Expand Up @@ -79,10 +80,52 @@ func (s *FromOpSpec) Kind() flux.OperationKind {
}

type FromProcedureSpec struct {
plan.DefaultCost
Bucket string
BucketID string
}

func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*FromOpSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", qs)
}

return &FromProcedureSpec{
Bucket: spec.Bucket,
BucketID: spec.BucketID,
}, nil
}

func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
return FromKind
}

func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(FromProcedureSpec)

ns.Bucket = s.Bucket
ns.BucketID = s.BucketID

return ns
}

func (s FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
// FromProcedureSpec has no bounds, so must be invalid.
var bucket string
if len(s.Bucket) > 0 {
bucket = s.Bucket
} else {
bucket = s.BucketID
}
return fmt.Errorf(`%s: results from "%s" must be bounded`, id, bucket)
}

const PhysicalFromKind = "physFrom"

type PhysicalFromProcedureSpec struct {
FromProcedureSpec

plan.DefaultCost
BoundsSet bool
Bounds flux.Bounds

Expand All @@ -109,24 +152,12 @@ type FromProcedureSpec struct {
AggregateMethod string
}

func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*FromOpSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", qs)
}

return &FromProcedureSpec{
Bucket: spec.Bucket,
BucketID: spec.BucketID,
}, nil
func (PhysicalFromProcedureSpec) Kind() plan.ProcedureKind {
return PhysicalFromKind
}

func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
return FromKind
}

func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(FromProcedureSpec)
func (s *PhysicalFromProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(PhysicalFromProcedureSpec)

ns.Bucket = s.Bucket
ns.BucketID = s.BucketID
Expand Down Expand Up @@ -160,7 +191,7 @@ func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
}

// TimeBounds implements plan.BoundsAwareProcedureSpec.
func (s *FromProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
func (s *PhysicalFromProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
if s.BoundsSet {
bounds := &plan.Bounds{
Start: values.ConvertTime(s.Bounds.Start.Time(s.Bounds.Now)),
Expand All @@ -171,7 +202,7 @@ func (s *FromProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bou
return nil
}

func (s FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
func (s PhysicalFromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
if !s.BoundsSet || (s.Bounds.Start.IsZero() && s.Bounds.Stop.IsZero()) {
var bucket string
if len(s.Bucket) > 0 {
Expand All @@ -185,6 +216,30 @@ func (s FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
return nil
}

// FromConversionRule converts a logical `from` node into a physical `from` node.
// TODO(cwolff): this rule can go away when we require a `range`
// to be pushed into a logical `from` to create a physical `from.`
type FromConversionRule struct {
}

func (FromConversionRule) Name() string {
return "FromConversionRule"
}

func (FromConversionRule) Pattern() plan.Pattern {
return plan.Pat(FromKind)
}

func (FromConversionRule) Rewrite(pn plan.PlanNode) (plan.PlanNode, bool, error) {
logicalFromSpec := pn.ProcedureSpec().(*FromProcedureSpec)
newNode := plan.CreatePhysicalNode(pn.ID(), &PhysicalFromProcedureSpec{
FromProcedureSpec: *logicalFromSpec,
})

plan.ReplaceNode(pn, newNode)
return newNode, true, nil
}

// MergeFromRangeRule pushes a `range` into a `from`.
type MergeFromRangeRule struct{}

Expand All @@ -195,15 +250,15 @@ func (rule MergeFromRangeRule) Name() string {

// Pattern returns the pattern that matches `from -> range`.
func (rule MergeFromRangeRule) Pattern() plan.Pattern {
return plan.Pat(universe.RangeKind, plan.Pat(FromKind))
return plan.Pat(universe.RangeKind, plan.Pat(PhysicalFromKind))
}

// Rewrite attempts to rewrite a `from -> range` into a `FromRange`.
func (rule MergeFromRangeRule) Rewrite(node plan.PlanNode) (plan.PlanNode, bool, error) {
from := node.Predecessors()[0]
fromSpec := from.ProcedureSpec().(*FromProcedureSpec)
fromSpec := from.ProcedureSpec().(*PhysicalFromProcedureSpec)
rangeSpec := node.ProcedureSpec().(*universe.RangeProcedureSpec)
fromRange := fromSpec.Copy().(*FromProcedureSpec)
fromRange := fromSpec.Copy().(*PhysicalFromProcedureSpec)

// Set new bounds to `range` bounds initially
fromRange.Bounds = rangeSpec.Bounds
Expand Down Expand Up @@ -240,7 +295,7 @@ func (rule MergeFromRangeRule) Rewrite(node plan.PlanNode) (plan.PlanNode, bool,
fromRange.BoundsSet = true

// Finally merge nodes into single operation
merged, err := plan.MergePhysicalPlanNodes(node, from, fromRange)
merged, err := plan.MergeToPhysicalPlanNode(node, from, fromRange)
if err != nil {
return nil, false, err
}
Expand All @@ -258,13 +313,13 @@ func (MergeFromFilterRule) Name() string {
}

func (MergeFromFilterRule) Pattern() plan.Pattern {
return plan.Pat(universe.FilterKind, plan.Pat(FromKind))
return plan.Pat(universe.FilterKind, plan.Pat(PhysicalFromKind))
}

func (MergeFromFilterRule) Rewrite(filterNode plan.PlanNode) (plan.PlanNode, bool, error) {
filterSpec := filterNode.ProcedureSpec().(*universe.FilterProcedureSpec)
fromNode := filterNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)

if fromSpec.AggregateSet || fromSpec.GroupingSet {
return filterNode, false, nil
Expand Down Expand Up @@ -294,7 +349,7 @@ func (MergeFromFilterRule) Rewrite(filterNode plan.PlanNode) (plan.PlanNode, boo
return filterNode, false, nil
}

newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
if newFromSpec.FilterSet {
newBody := semantic.ExprsToConjunction(newFromSpec.Filter.Block.Body.(semantic.Expression), pushable)
newFromSpec.Filter.Block.Body = newBody
Expand All @@ -306,7 +361,7 @@ func (MergeFromFilterRule) Rewrite(filterNode plan.PlanNode) (plan.PlanNode, boo

if notPushable == nil {
// All predicates could be pushed down, so eliminate the filter
mergedNode, err := plan.MergePhysicalPlanNodes(filterNode, fromNode, newFromSpec)
mergedNode, err := plan.MergeToPhysicalPlanNode(filterNode, fromNode, newFromSpec)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -468,13 +523,13 @@ func (FromDistinctRule) Name() string {
}

func (FromDistinctRule) Pattern() plan.Pattern {
return plan.Pat(universe.DistinctKind, plan.Pat(FromKind))
return plan.Pat(universe.DistinctKind, plan.Pat(PhysicalFromKind))
}

func (FromDistinctRule) Rewrite(distinctNode plan.PlanNode) (plan.PlanNode, bool, error) {
fromNode := distinctNode.Predecessors()[0]
distinctSpec := distinctNode.ProcedureSpec().(*universe.DistinctProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)

if fromSpec.LimitSet && fromSpec.PointsLimit == -1 {
return distinctNode, false, nil
Expand All @@ -485,7 +540,7 @@ func (FromDistinctRule) Rewrite(distinctNode plan.PlanNode) (plan.PlanNode, bool
((fromSpec.GroupMode == flux.GroupModeBy && execute.ContainsStr(fromSpec.GroupKeys, distinctSpec.Column)) ||
(fromSpec.GroupMode == flux.GroupModeExcept && !execute.ContainsStr(fromSpec.GroupKeys, distinctSpec.Column)))
if groupStar || groupByColumn {
newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
newFromSpec.LimitSet = true
newFromSpec.PointsLimit = -1
if err := fromNode.ReplaceSpec(newFromSpec); err != nil {
Expand All @@ -505,13 +560,13 @@ func (MergeFromGroupRule) Name() string {
}

func (MergeFromGroupRule) Pattern() plan.Pattern {
return plan.Pat(universe.GroupKind, plan.Pat(FromKind))
return plan.Pat(universe.GroupKind, plan.Pat(PhysicalFromKind))
}

func (MergeFromGroupRule) Rewrite(groupNode plan.PlanNode) (plan.PlanNode, bool, error) {
fromNode := groupNode.Predecessors()[0]
groupSpec := groupNode.ProcedureSpec().(*universe.GroupProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)

if fromSpec.GroupingSet ||
fromSpec.LimitSet ||
Expand All @@ -527,11 +582,11 @@ func (MergeFromGroupRule) Rewrite(groupNode plan.PlanNode) (plan.PlanNode, bool,
}
}

newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
newFromSpec.GroupingSet = true
newFromSpec.GroupMode = groupSpec.GroupMode
newFromSpec.GroupKeys = groupSpec.GroupKeys
merged, err := plan.MergePhysicalPlanNodes(groupNode, fromNode, newFromSpec)
merged, err := plan.MergeToPhysicalPlanNode(groupNode, fromNode, newFromSpec)
if err != nil {
return nil, false, err
}
Expand All @@ -546,18 +601,18 @@ func (FromKeysRule) Name() string {
}

func (FromKeysRule) Pattern() plan.Pattern {
return plan.Pat(universe.KeysKind, plan.Pat(FromKind))
return plan.Pat(universe.KeysKind, plan.Pat(PhysicalFromKind))
}

func (FromKeysRule) Rewrite(keysNode plan.PlanNode) (plan.PlanNode, bool, error) {
fromNode := keysNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)

if fromSpec.LimitSet && fromSpec.PointsLimit == -1 {
return keysNode, false, nil
}

newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
newFromSpec.LimitSet = true
newFromSpec.PointsLimit = -1

Expand All @@ -572,29 +627,30 @@ func (FromKeysRule) Rewrite(keysNode plan.PlanNode) (plan.PlanNode, bool, error)
// https://github.com/influxdata/flux/issues/114

func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
spec := prSpec.(*FromProcedureSpec)
spec := prSpec.(*PhysicalFromProcedureSpec)
var w execute.Window
bounds := a.StreamContext().Bounds()
if bounds == nil {
return nil, errors.New("nil bounds passed to from")
}

// Note: currently no planner rules will push a window() into from()
// so the following is dead code.
if spec.WindowSet {
w = execute.Window{
Every: execute.Duration(spec.Window.Every),
Period: execute.Duration(spec.Window.Period),
Round: execute.Duration(spec.Window.Round),
Start: bounds.Start,
Offset: execute.Duration(spec.Window.Offset),
}
} else {
duration := execute.Duration(bounds.Stop) - execute.Duration(bounds.Start)
w = execute.Window{
Every: duration,
Period: duration,
Start: bounds.Start,
Offset: bounds.Start.Remainder(duration),
}
}
currentTime := w.Start + execute.Time(w.Period)
currentTime := bounds.Start + execute.Time(w.Period)

deps := a.Dependencies()[FromKind].(Dependencies)
req := query.RequestFromContext(a.Context())
Expand Down
Loading

0 comments on commit 0213db3

Please sign in to comment.