From 9a4aad11b3b5bb6e4ebeee6d379da81ed032c324 Mon Sep 17 00:00:00 2001 From: Adrian Thurston Date: Tue, 15 Mar 2022 17:06:59 -0700 Subject: [PATCH] feat: take the concurrencyLimit from FF and keep it in the dependencies Pull the concurrencyLimit from feature flags at the start of the execution process, before planning, and stash it in the ExecutionOptions, which live in the ExecutionDependencies. From there it can then be modified by planner rules. This allows parallelization rules to raise the limit if they parallelize a query. At the same time we move the defaultMemoryLimit from the planner and into the execution options. We also move the computation of memory limit and concurrency quota from the planner and into the executor. Included are test cases covering the existing and the new method of determining query concurrency. --- execute/dependencies.go | 20 ++- execute/executor.go | 80 +++++++++-- execute/options_test.go | 227 +++++++++++++++++++++++++++++++ lang/compiler.go | 7 + lang/compiler_test.go | 48 +++---- plan/physical.go | 39 ------ plan/physical_test.go | 39 ------ plan/plantest/spec.go | 27 ++++ plan/plantest/{ => spec}/mock.go | 2 +- plan/plantest/{ => spec}/plan.go | 2 +- 10 files changed, 362 insertions(+), 129 deletions(-) create mode 100644 execute/options_test.go create mode 100644 plan/plantest/spec.go rename plan/plantest/{ => spec}/mock.go (98%) rename plan/plantest/{ => spec}/plan.go (99%) diff --git a/execute/dependencies.go b/execute/dependencies.go index f56aa32571..90bb6c0c5a 100644 --- a/execute/dependencies.go +++ b/execute/dependencies.go @@ -2,6 +2,7 @@ package execute import ( "context" + "math" "time" "github.com/influxdata/flux/codes" @@ -19,8 +20,10 @@ type key int const executionDependenciesKey key = iota type ExecutionOptions struct { - OperatorProfiler *OperatorProfiler - Profilers []Profiler + OperatorProfiler *OperatorProfiler + Profilers []Profiler + DefaultMemoryLimit int64 + ConcurrencyLimit int } // ExecutionDependencies represents the dependencies that a function call @@ -83,11 +86,14 @@ func NewExecutionDependencies(allocator *memory.Allocator, now *time.Time, logge now = &nowVar } return ExecutionDependencies{ - Allocator: allocator, - Now: now, - Logger: logger, - Metadata: make(metadata.Metadata), - ExecutionOptions: &ExecutionOptions{}, + Allocator: allocator, + Now: now, + Logger: logger, + Metadata: make(metadata.Metadata), + ExecutionOptions: &ExecutionOptions{ + DefaultMemoryLimit: math.MaxInt64, + ConcurrencyLimit: 0, + }, } } diff --git a/execute/executor.go b/execute/executor.go index cfed5c94e3..49dac41c6b 100644 --- a/execute/executor.go +++ b/execute/executor.go @@ -4,6 +4,7 @@ package execute import ( "context" "fmt" + "math" "reflect" "sync" "time" @@ -77,18 +78,7 @@ func (e *executor) Execute(ctx context.Context, p *plan.Spec, a *memory.Allocato return es.results, es.metaCh, nil } -func validatePlan(p *plan.Spec) error { - if p.Resources.ConcurrencyQuota == 0 { - return errors.New(codes.Invalid, "plan must have a non-zero concurrency quota") - } - return nil -} - func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a *memory.Allocator) (*executionState, error) { - if err := validatePlan(p); err != nil { - return nil, errors.Wrap(err, codes.Invalid, "invalid plan") - } - ctx, cancel := context.WithCancel(ctx) es := &executionState{ p: p, @@ -115,6 +105,13 @@ func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a *me // report metadata. es.metaCh = make(chan metadata.Metadata, len(es.sources)) + // Choose some default resource limits based on execution options, if necessary. + es.chooseDefaultResources(ctx, p) + + if err := es.validate(); err != nil { + return nil, errors.Wrap(err, codes.Invalid, "execution state") + } + return v.es, nil } @@ -357,6 +354,67 @@ func getResultName(node plan.Node, spec plan.ProcedureSpec, isParallelMerge bool return name, nil } +func (es *executionState) validate() error { + if es.resources.ConcurrencyQuota == 0 { + return errors.New(codes.Invalid, "execution state must have a non-zero concurrency quota") + } + return nil +} + +// getResourceExecOptions returns the DefaultMemoryLimit and ConcurrencyLimit +// from exec options, if present. +func getResourceLimits(ctx context.Context) (int64, int) { + // Initialize resources from the execution dependencies and/or properties of the plan. + if !HaveExecutionDependencies(ctx) { + return 0, math.MaxInt64 + } + + execOptions := GetExecutionDependencies(ctx).ExecutionOptions + return execOptions.DefaultMemoryLimit, execOptions.ConcurrencyLimit +} + +func (es *executionState) chooseDefaultResources(ctx context.Context, p *plan.Spec) { + defaultMemoryLimit, concurrencyLimit := getResourceLimits(ctx) + + // Update memory quota + if es.resources.MemoryBytesQuota == 0 { + es.resources.MemoryBytesQuota = defaultMemoryLimit + } + + // Update concurrency quota + if es.resources.ConcurrencyQuota == 0 { + es.resources.ConcurrencyQuota = len(p.Roots) + + // If the query concurrency limit is greater than zero, + // we will use the new behavior that sets the concurrency + // quota equal to the number of transformations and limits + // it to the value specified. + if concurrencyLimit > 0 { + concurrencyQuota := 0 + _ = p.TopDownWalk(func(node plan.Node) error { + // Do not include source nodes in the node list as + // they do not use the dispatcher. + if len(node.Predecessors()) > 0 { + addend := 1 + ppn := node.(*plan.PhysicalPlanNode) + if attr, ok := ppn.OutputAttrs[plan.ParallelRunKey]; ok { + addend = attr.(plan.ParallelRunAttribute).Factor + } + concurrencyQuota += addend + } + return nil + }) + + if concurrencyQuota > int(concurrencyLimit) { + concurrencyQuota = int(concurrencyLimit) + } else if concurrencyQuota == 0 { + concurrencyQuota = 1 + } + es.resources.ConcurrencyQuota = concurrencyQuota + } + } +} + func (es *executionState) abort(err error) { for _, r := range es.results { r.(*result).abort(err) diff --git a/execute/options_test.go b/execute/options_test.go new file mode 100644 index 0000000000..7159ecbbe4 --- /dev/null +++ b/execute/options_test.go @@ -0,0 +1,227 @@ +package execute + +import ( + "context" + "math" + "testing" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/plan" + planspec "github.com/influxdata/flux/plan/plantest/spec" + "go.uber.org/zap/zaptest" +) + +func TestExecuteOptions(t *testing.T) { + type runWith struct { + concurrencyQuota int + memoryBytesQuota int64 + } + + testcases := []struct { + name string + spec *planspec.PlanSpec + concurrencyLimit int + defaultMemoryLimit int64 + want runWith + }{ + { + // If the concurrency quota and max bytes are set in the plan + // resources, the execution state always uses those resources. + // Historically, the values in the plan resources always took + // precedence. + name: "via-plan-no-options", + spec: &planspec.PlanSpec{ + Nodes: []plan.Node{ + planspec.CreatePhysicalMockNode("0"), + planspec.CreatePhysicalMockNode("1"), + }, + Resources: flux.ResourceManagement{ + MemoryBytesQuota: 163484, + ConcurrencyQuota: 4, + }, + Edges: [][2]int{ + {0, 1}, + }, + }, + want: runWith{ + memoryBytesQuota: 163484, + concurrencyQuota: 4, + }, + }, + { + // Use the plan resources even if the execution options are set. + name: "via-plan-with-exec-options", + spec: &planspec.PlanSpec{ + Nodes: []plan.Node{ + planspec.CreatePhysicalMockNode("0"), + planspec.CreatePhysicalMockNode("1"), + }, + Resources: flux.ResourceManagement{ + MemoryBytesQuota: 163484, + ConcurrencyQuota: 4, + }, + Edges: [][2]int{ + {0, 1}, + }, + }, + defaultMemoryLimit: 8, + concurrencyLimit: 2, + + want: runWith{ + memoryBytesQuota: 163484, + concurrencyQuota: 4, + }, + }, + { + // Choose resources based on the default execute options. We get + // old behaviour of choosing concurrency quota based on the number + // of roots in the plan. + name: "defaults-one-root", + spec: &planspec.PlanSpec{ + Nodes: []plan.Node{ + planspec.CreatePhysicalMockNode("0"), + planspec.CreatePhysicalMockNode("1"), + planspec.CreatePhysicalMockNode("2"), + planspec.CreatePhysicalMockNode("3"), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + {2, 3}, + }, + }, + want: runWith{ + memoryBytesQuota: math.MaxInt64, + concurrencyQuota: 1, + }, + }, + { + // Again use the default execute options. Two roots in the plan + // means we get a concurrency quota of two. + name: "defaults-two-roots", + spec: &planspec.PlanSpec{ + Nodes: []plan.Node{ + planspec.CreatePhysicalMockNode("0"), + planspec.CreatePhysicalMockNode("1"), + planspec.CreatePhysicalMockNode("root-0"), + planspec.CreatePhysicalMockNode("root-1"), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + {1, 3}, + }, + }, + want: runWith{ + memoryBytesQuota: math.MaxInt64, + concurrencyQuota: 2, + }, + }, + { + // Set the execute options. The memory limit passes in verbatim. + // The concurrency limit is 16 and the new behaviour of choosing + // the concurreny quota based on the number of non-source nodes is + // active. + name: "via-options-new-behaviour-non-source", + spec: &planspec.PlanSpec{ + Nodes: []plan.Node{ + planspec.CreatePhysicalMockNode("0"), + planspec.CreatePhysicalMockNode("1"), + planspec.CreatePhysicalMockNode("2"), + planspec.CreatePhysicalMockNode("3"), + planspec.CreatePhysicalMockNode("root-0"), + planspec.CreatePhysicalMockNode("root-1"), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + {2, 3}, + {3, 4}, + {3, 5}, + }, + }, + defaultMemoryLimit: 32768, + concurrencyLimit: 16, + want: runWith{ + memoryBytesQuota: 32768, + concurrencyQuota: 5, + }, + }, + { + // Set the execute options. We want the new behaviour of setting + // concurrency quota based on the number of non-source nodes (5), + // but limited by the concurrency limit (4). + name: "via-options-new-behaviour-limited", + spec: &planspec.PlanSpec{ + Nodes: []plan.Node{ + planspec.CreatePhysicalMockNode("0"), + planspec.CreatePhysicalMockNode("1"), + planspec.CreatePhysicalMockNode("2"), + planspec.CreatePhysicalMockNode("3"), + planspec.CreatePhysicalMockNode("root-0"), + planspec.CreatePhysicalMockNode("root-1"), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + {2, 3}, + {3, 4}, + {3, 5}, + }, + }, + defaultMemoryLimit: 32768, + concurrencyLimit: 4, + want: runWith{ + memoryBytesQuota: 32768, + concurrencyQuota: 4, + }, + }, + } + + for _, tc := range testcases { + execDeps := NewExecutionDependencies(nil, nil, nil) + ctx := execDeps.Inject(context.Background()) + + inputPlan := planspec.CreatePlanSpec(tc.spec) + + thePlanner := plan.NewPhysicalPlanner() + outputPlan, err := thePlanner.Plan(context.Background(), inputPlan) + if err != nil { + t.Fatalf("Physical planning failed: %v", err) + } + + // + // Modify the execution options. In practice, we would do this from + // planner rules + // + if tc.defaultMemoryLimit != 0 { + execDeps.ExecutionOptions.DefaultMemoryLimit = tc.defaultMemoryLimit + } + if tc.concurrencyLimit != 0 { + execDeps.ExecutionOptions.ConcurrencyLimit = tc.concurrencyLimit + } + + // Construct a basic execution state and choose the default resources. + es := &executionState{ + p: outputPlan, + ctx: ctx, + resources: outputPlan.Resources, + logger: zaptest.NewLogger(t), + } + es.chooseDefaultResources(ctx, outputPlan) + + if err := es.validate(); err != nil { + t.Fatalf("execution state failed validation: %s", err.Error()) + } + + if es.resources.MemoryBytesQuota != tc.want.memoryBytesQuota { + t.Errorf("Expected memory quota of %v, but execution state has %v", + tc.want.memoryBytesQuota, es.resources.MemoryBytesQuota) + } + + if es.resources.ConcurrencyQuota != tc.want.concurrencyQuota { + t.Errorf("Expected concurrency quota of %v, but execution state has %v", + tc.want.concurrencyQuota, es.resources.ConcurrencyQuota) + } + } +} diff --git a/lang/compiler.go b/lang/compiler.go index 9e3fffba70..34f3ba109b 100644 --- a/lang/compiler.go +++ b/lang/compiler.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/internal/feature" "github.com/influxdata/flux/internal/jaeger" "github.com/influxdata/flux/internal/spec" "github.com/influxdata/flux/interpreter" @@ -475,6 +476,12 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q // The program must inject execution dependencies to make it available to // function calls during the evaluation phase (see `tableFind`). deps := execute.NewExecutionDependencies(alloc, &p.Now, p.Logger) + + // The query concurrency limit is taken from the feature flag, then lives + // in the depenencies. This gives us an opportunity to modify it before + // execution begins. + deps.ExecutionOptions.ConcurrencyLimit = feature.QueryConcurrencyLimit().Int(ctx) + ctx, span := dependency.Inject(ctx, deps) nextPlanNodeID := new(int) ctx = context.WithValue(ctx, plan.NextPlanNodeIDKey, nextPlanNodeID) diff --git a/lang/compiler_test.go b/lang/compiler_test.go index 06e4162ea8..ae973661c8 100644 --- a/lang/compiler_test.go +++ b/lang/compiler_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "math" "regexp" "strings" "testing" @@ -243,8 +242,7 @@ csv.from(csv: "foo,bar") |> range(start: 2017-10-10T00:00:00Z) Edges: [][2]int{ {0, 1}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: parser.MustParseTime("2017-10-10T00:01:00Z").Value, + Now: parser.MustParseTime("2017-10-10T00:01:00Z").Value, }, }, { @@ -262,8 +260,7 @@ csv.from(csv: "foo,bar") |> range(start: 2017-10-10T00:00:00Z) Edges: [][2]int{ {0, 1}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: parser.MustParseTime("2018-10-10T00:00:00Z").Value, + Now: parser.MustParseTime("2018-10-10T00:00:00Z").Value, }, }, { @@ -294,8 +291,7 @@ csv.from(csv: "foo,bar") |> range(start: 2017-10-10T00:00:00Z) Edges: [][2]int{ {0, 1}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: parser.MustParseTime("2018-10-10T00:00:00Z").Value, + Now: parser.MustParseTime("2018-10-10T00:00:00Z").Value, }, }, { @@ -509,8 +505,7 @@ func TestCompileOptions(t *testing.T) { Edges: [][2]int{ {0, 1}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: parser.MustParseTime("2018-10-10T00:00:00Z").Value, + Now: parser.MustParseTime("2018-10-10T00:00:00Z").Value, }) if err := plantest.ComparePlansShallow(want, program.PlanSpec); err != nil { @@ -552,9 +547,8 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou Nodes: []plan.Node{ &plan.PhysicalPlanNode{Spec: &influxdb.FromRemoteProcedureSpec{}}, }, - Edges: [][2]int{}, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Edges: [][2]int{}, + Now: nowFn(), }), }, { @@ -573,8 +567,7 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou Edges: [][2]int{ {0, 1}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Now: nowFn(), }), }, { @@ -596,8 +589,7 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou {0, 1}, {1, 2}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Now: nowFn(), }), }, { @@ -619,8 +611,7 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou {0, 1}, {1, 2}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Now: nowFn(), }), }, { @@ -636,9 +627,8 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou Nodes: []plan.Node{ &plan.PhysicalPlanNode{Spec: &influxdb.FromRemoteProcedureSpec{}}, }, - Edges: [][2]int{}, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Edges: [][2]int{}, + Now: nowFn(), }), }, { @@ -654,9 +644,8 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou Nodes: []plan.Node{ &plan.PhysicalPlanNode{Spec: &influxdb.FromRemoteProcedureSpec{}}, }, - Edges: [][2]int{}, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Edges: [][2]int{}, + Now: nowFn(), }), }, { @@ -717,9 +706,8 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou Nodes: []plan.Node{ &plan.PhysicalPlanNode{Spec: &influxdb.FromRemoteProcedureSpec{}}, }, - Edges: [][2]int{}, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Edges: [][2]int{}, + Now: nowFn(), }), }, { @@ -741,8 +729,7 @@ from(bucket: "bkt") |> range(start: 0) |> filter(fn: (r) => r._value > 0) |> cou {0, 1}, {1, 2}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Now: nowFn(), }), }, { @@ -768,8 +755,7 @@ option planner.disableLogicalRules = ["removeCountRule"]`}, {0, 1}, {1, 2}, }, - Resources: flux.ResourceManagement{ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64}, - Now: nowFn(), + Now: nowFn(), }), }, } diff --git a/plan/physical.go b/plan/physical.go index 926d8f0ac1..6330b830cb 100644 --- a/plan/physical.go +++ b/plan/physical.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/codes" - "github.com/influxdata/flux/internal/feature" "github.com/influxdata/flux/interpreter" ) @@ -89,44 +88,6 @@ func (pp *physicalPlanner) Plan(ctx context.Context, spec *Spec) (*Spec, error) } } - // Update memory quota - if transformedSpec.Resources.MemoryBytesQuota == 0 { - transformedSpec.Resources.MemoryBytesQuota = pp.defaultMemoryLimit - } - - // Update concurrency quota - if transformedSpec.Resources.ConcurrencyQuota == 0 { - transformedSpec.Resources.ConcurrencyQuota = len(transformedSpec.Roots) - - // If the query concurrency limit is greater than zero, - // we will use the new behavior that sets the concurrency - // quota equal to the number of transformations and limits - // it to the value specified. - if concurrencyLimit := feature.QueryConcurrencyLimit().Int(ctx); concurrencyLimit > 0 { - concurrencyQuota := 0 - _ = transformedSpec.TopDownWalk(func(node Node) error { - // Do not include source nodes in the node list as - // they do not use the dispatcher. - if len(node.Predecessors()) > 0 { - addend := 1 - ppn := node.(*PhysicalPlanNode) - if attr, ok := ppn.OutputAttrs[ParallelRunKey]; ok { - addend = attr.(ParallelRunAttribute).Factor - } - concurrencyQuota += addend - } - return nil - }) - - if concurrencyQuota > int(concurrencyLimit) { - concurrencyQuota = int(concurrencyLimit) - } else if concurrencyQuota == 0 { - concurrencyQuota = 1 - } - transformedSpec.Resources.ConcurrencyQuota = concurrencyQuota - } - } - return transformedSpec, nil } diff --git a/plan/physical_test.go b/plan/physical_test.go index 442c8da9a3..d57f6c1b50 100644 --- a/plan/physical_test.go +++ b/plan/physical_test.go @@ -3,51 +3,12 @@ package plan_test import ( "context" "fmt" - "math" "testing" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/plan/plantest" ) -func TestPhysicalOptions(t *testing.T) { - configs := [][]plan.PhysicalOption{ - {plan.WithDefaultMemoryLimit(16384)}, - {}, - } - - for _, options := range configs { - spec := &plantest.PlanSpec{ - Nodes: []plan.Node{ - plantest.CreatePhysicalMockNode("0"), - plantest.CreatePhysicalMockNode("1"), - }, - Edges: [][2]int{ - {0, 1}, - }, - } - - inputPlan := plantest.CreatePlanSpec(spec) - - thePlanner := plan.NewPhysicalPlanner(options...) - outputPlan, err := thePlanner.Plan(context.Background(), inputPlan) - if err != nil { - t.Fatalf("Physical planning failed: %v", err) - } - - // If option was specified, we should have overridden the default memory quota. - if len(options) > 0 { - if outputPlan.Resources.MemoryBytesQuota != 16384 { - t.Errorf("Expected memory quota of 16384 with option specified") - } - } else { - if outputPlan.Resources.MemoryBytesQuota != math.MaxInt64 { - t.Errorf("Expected memory quota of math.MaxInt64 with no options specified") - } - } - } -} - func TestPhysicalIntegrityCheckOption(t *testing.T) { node0 := plantest.CreatePhysicalMockNode("0") node1 := plantest.CreatePhysicalMockNode("1") diff --git a/plan/plantest/spec.go b/plan/plantest/spec.go new file mode 100644 index 0000000000..256de68b01 --- /dev/null +++ b/plan/plantest/spec.go @@ -0,0 +1,27 @@ +package plantest + +import ( + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/plan/plantest/spec" +) + +// +// Export the plan/plantest/spec types and functions. This is isolated so it +// can be used in testing code for /execute, where the full plantest +// dependencies cause an import cycle. +// + +type PlanSpec = spec.PlanSpec +type MockProcedureSpec = spec.MockProcedureSpec + +func CreatePlanSpec(ps *PlanSpec) *plan.Spec { + return spec.CreatePlanSpec(ps) +} + +func CreateLogicalMockNode(id string) *plan.LogicalNode { + return spec.CreateLogicalMockNode(id) +} + +func CreatePhysicalMockNode(id string) *plan.PhysicalPlanNode { + return spec.CreatePhysicalMockNode(id) +} diff --git a/plan/plantest/mock.go b/plan/plantest/spec/mock.go similarity index 98% rename from plan/plantest/mock.go rename to plan/plantest/spec/mock.go index fb93ef88bb..56de3432e8 100644 --- a/plan/plantest/mock.go +++ b/plan/plantest/spec/mock.go @@ -1,4 +1,4 @@ -package plantest +package spec import "github.com/influxdata/flux/plan" diff --git a/plan/plantest/plan.go b/plan/plantest/spec/plan.go similarity index 99% rename from plan/plantest/plan.go rename to plan/plantest/spec/plan.go index 854e9cdf7e..2be2daa189 100644 --- a/plan/plantest/plan.go +++ b/plan/plantest/spec/plan.go @@ -1,4 +1,4 @@ -package plantest +package spec import ( "fmt"