
feat: take the concurrencyLimit from feature flags and keep in dependencies #4564

Merged
merged 1 commit into from Mar 18, 2022
20 changes: 13 additions & 7 deletions execute/dependencies.go
@@ -2,6 +2,7 @@ package execute
 
 import (
 	"context"
+	"math"
 	"time"
 
 	"github.com/influxdata/flux/codes"
@@ -19,8 +20,10 @@ type key int
 const executionDependenciesKey key = iota
 
 type ExecutionOptions struct {
-	OperatorProfiler *OperatorProfiler
-	Profilers        []Profiler
+	OperatorProfiler   *OperatorProfiler
+	Profilers          []Profiler
+	DefaultMemoryLimit int64
+	ConcurrencyLimit   int
 }
 
 // ExecutionDependencies represents the dependencies that a function call
@@ -83,11 +86,14 @@ func NewExecutionDependencies(allocator *memory.Allocator, now *time.Time, logge
 		now = &nowVar
 	}
 	return ExecutionDependencies{
-		Allocator:        allocator,
-		Now:              now,
-		Logger:           logger,
-		Metadata:         make(metadata.Metadata),
-		ExecutionOptions: &ExecutionOptions{},
+		Allocator: allocator,
+		Now:       now,
+		Logger:    logger,
+		Metadata:  make(metadata.Metadata),
+		ExecutionOptions: &ExecutionOptions{
+			DefaultMemoryLimit: math.MaxInt64,
+			ConcurrencyLimit:   0,
+		},
 	}
 }

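As a quick sketch of what these constructor defaults mean for a caller (illustrative only; it assumes nothing beyond the diff above, and the nil arguments mirror the call in the new options_test.go below):

package main

import (
	"fmt"
	"math"

	"github.com/influxdata/flux/execute"
)

func main() {
	deps := execute.NewExecutionDependencies(nil, nil, nil)
	opts := deps.ExecutionOptions
	fmt.Println(opts.DefaultMemoryLimit == math.MaxInt64) // true: memory effectively unbounded
	fmt.Println(opts.ConcurrencyLimit == 0)               // true: concurrency limiting disabled
}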
80 changes: 69 additions & 11 deletions execute/executor.go
@@ -4,6 +4,7 @@ package execute
 import (
 	"context"
 	"fmt"
+	"math"
 	"reflect"
 	"sync"
 	"time"
@@ -77,18 +78,7 @@ func (e *executor) Execute(ctx context.Context, p *plan.Spec, a *memory.Allocato
 	return es.results, es.metaCh, nil
 }
 
-func validatePlan(p *plan.Spec) error {
-	if p.Resources.ConcurrencyQuota == 0 {
-		return errors.New(codes.Invalid, "plan must have a non-zero concurrency quota")
-	}
-	return nil
-}
-
 func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a *memory.Allocator) (*executionState, error) {
-	if err := validatePlan(p); err != nil {
-		return nil, errors.Wrap(err, codes.Invalid, "invalid plan")
-	}
-
 	ctx, cancel := context.WithCancel(ctx)
 	es := &executionState{
 		p: p,
@@ -115,6 +105,13 @@ func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a *me
 	// report metadata.
 	es.metaCh = make(chan metadata.Metadata, len(es.sources))
 
+	// Choose some default resource limits based on execution options, if necessary.
+	es.chooseDefaultResources(ctx, p)
+
+	if err := es.validate(); err != nil {
+		return nil, errors.Wrap(err, codes.Invalid, "execution state")
+	}
+
 	return es, nil
 }

@@ -357,6 +354,67 @@ func getResultName(node plan.Node, spec plan.ProcedureSpec, isParallelMerge boo
 	return name, nil
 }
 
+func (es *executionState) validate() error {
+	if es.resources.ConcurrencyQuota == 0 {
+		return errors.New(codes.Invalid, "execution state must have a non-zero concurrency quota")
+	}
+	return nil
+}
+
+// getResourceLimits returns the DefaultMemoryLimit and ConcurrencyLimit
+// from exec options, if present.
+func getResourceLimits(ctx context.Context) (int64, int) {
+	// Initialize resources from the execution dependencies and/or properties of the plan.
+	if !HaveExecutionDependencies(ctx) {
+		return math.MaxInt64, 0
+	}
+
+	execOptions := GetExecutionDependencies(ctx).ExecutionOptions
+	return execOptions.DefaultMemoryLimit, execOptions.ConcurrencyLimit
+}
+
+func (es *executionState) chooseDefaultResources(ctx context.Context, p *plan.Spec) {
+	defaultMemoryLimit, concurrencyLimit := getResourceLimits(ctx)
+
+	// Update memory quota.
+	if es.resources.MemoryBytesQuota == 0 {
+		es.resources.MemoryBytesQuota = defaultMemoryLimit
+	}
+
+	// Update concurrency quota.
+	if es.resources.ConcurrencyQuota == 0 {
+		es.resources.ConcurrencyQuota = len(p.Roots)
+
+		// If the query concurrency limit is greater than zero,
+		// we will use the new behavior that sets the concurrency
+		// quota equal to the number of transformations and limits
+		// it to the value specified.
+		if concurrencyLimit > 0 {
+			concurrencyQuota := 0
+			_ = p.TopDownWalk(func(node plan.Node) error {
+				// Do not include source nodes in the node list as
+				// they do not use the dispatcher.
+				if len(node.Predecessors()) > 0 {
+					addend := 1
+					ppn := node.(*plan.PhysicalPlanNode)
+					if attr, ok := ppn.OutputAttrs[plan.ParallelRunKey]; ok {
+						addend = attr.(plan.ParallelRunAttribute).Factor
+					}
+					concurrencyQuota += addend
+				}
+				return nil
+			})
+
+			if concurrencyQuota > int(concurrencyLimit) {
+				concurrencyQuota = int(concurrencyLimit)
+			} else if concurrencyQuota == 0 {
+				concurrencyQuota = 1

Review thread on "concurrencyQuota = 1":

Reviewer: Curious about this branch here. If we had a trivial query whose execution graph just had a ReadWindowAggregate node, I guess concurrencyQuota could be 0. Do we require it to be positive even if there are no non-sources?

Contributor (author): I might be mistaken, but my guess is there needs to be at least one goroutine to work the consecutive transport belonging to the source nodes. The source nodes themselves, I think, just deposit messages to the outgoing dataset, and that's as far as the source goroutines take it. There needs to be a dispatcher thread that reads those messages and writes to the CSV writer.

Contributor (@onelson, Mar 16, 2022): Anecdotally, during recent refactors when I failed to mark any nodes as roots (my mistake), the concurrency quota was set to zero, which produced an error. It didn't seem like there was any conditional around that check, since it failed for from/range/filter, which I think would have been rewritten as a single source.

    flux/execute/executor.go, lines 80 to 85 in dc08c57:

    func validatePlan(p *plan.Spec) error {
    	if p.Resources.ConcurrencyQuota == 0 {
    		return errors.New(codes.Invalid, "plan must have a non-zero concurrency quota")
    	}
    	return nil
    }

Reviewer: Right, makes sense. Given Go's convention of having a useful zero value, I wonder if we should change the meaning of concurrency quota to be the number of additional goroutines after the required one. Or maybe 0 should just mean the default of 1. Nothing needs to change here for this PR; it just seems a little weird.

+			}
+			es.resources.ConcurrencyQuota = concurrencyQuota
+		}
+	}
+}
+
 func (es *executionState) abort(err error) {
 	for _, r := range es.results {
 		r.(*result).abort(err)
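For intuition about the quota selection above, here is a self-contained sketch that replaces the plan walk with a precomputed count of non-source nodes (the function name and inputs are hypothetical, not part of the diff):

// pickConcurrencyQuota condenses the quota logic of chooseDefaultResources.
// nonSourceNodes stands in for the TopDownWalk count (including any
// ParallelRunAttribute factors); rootCount stands in for len(p.Roots).
func pickConcurrencyQuota(nonSourceNodes, rootCount, concurrencyLimit int) int {
	if concurrencyLimit <= 0 {
		// Old behavior: one slot per root node.
		return rootCount
	}
	// New behavior: one slot per non-source node, capped by the limit
	// and floored at one so a dispatcher goroutine always exists.
	quota := nonSourceNodes
	if quota > concurrencyLimit {
		quota = concurrencyLimit
	} else if quota == 0 {
		quota = 1
	}
	return quota
}

Against the tests below: pickConcurrencyQuota(5, 2, 16) == 5 and pickConcurrencyQuota(5, 2, 4) == 4, while a source-only plan gives pickConcurrencyQuota(0, 1, 16) == 1, matching the review discussion above.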
227 changes: 227 additions & 0 deletions execute/options_test.go
@@ -0,0 +1,227 @@
package execute

import (
	"context"
	"math"
	"testing"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/plan"
	planspec "github.com/influxdata/flux/plan/plantest/spec"
	"go.uber.org/zap/zaptest"
)

func TestExecuteOptions(t *testing.T) {
	type runWith struct {
		concurrencyQuota int
		memoryBytesQuota int64
	}

	testcases := []struct {
		name               string
		spec               *planspec.PlanSpec
		concurrencyLimit   int
		defaultMemoryLimit int64
		want               runWith
	}{
		{
			// If the concurrency quota and max bytes are set in the plan
			// resources, the execution state always uses those resources.
			// Historically, the values in the plan resources always took
			// precedence.
			name: "via-plan-no-options",
			spec: &planspec.PlanSpec{
				Nodes: []plan.Node{
					planspec.CreatePhysicalMockNode("0"),
					planspec.CreatePhysicalMockNode("1"),
				},
				Resources: flux.ResourceManagement{
					MemoryBytesQuota: 163484,
					ConcurrencyQuota: 4,
				},
				Edges: [][2]int{
					{0, 1},
				},
			},
			want: runWith{
				memoryBytesQuota: 163484,
				concurrencyQuota: 4,
			},
		},
		{
			// Use the plan resources even if the execution options are set.
			name: "via-plan-with-exec-options",
			spec: &planspec.PlanSpec{
				Nodes: []plan.Node{
					planspec.CreatePhysicalMockNode("0"),
					planspec.CreatePhysicalMockNode("1"),
				},
				Resources: flux.ResourceManagement{
					MemoryBytesQuota: 163484,
					ConcurrencyQuota: 4,
				},
				Edges: [][2]int{
					{0, 1},
				},
			},
			defaultMemoryLimit: 8,
			concurrencyLimit:   2,
			want: runWith{
				memoryBytesQuota: 163484,
				concurrencyQuota: 4,
			},
		},
		{
			// Choose resources based on the default execute options. We get
			// the old behaviour of choosing the concurrency quota based on
			// the number of roots in the plan.
			name: "defaults-one-root",
			spec: &planspec.PlanSpec{
				Nodes: []plan.Node{
					planspec.CreatePhysicalMockNode("0"),
					planspec.CreatePhysicalMockNode("1"),
					planspec.CreatePhysicalMockNode("2"),
					planspec.CreatePhysicalMockNode("3"),
				},
				Edges: [][2]int{
					{0, 1},
					{1, 2},
					{2, 3},
				},
			},
			want: runWith{
				memoryBytesQuota: math.MaxInt64,
				concurrencyQuota: 1,
			},
		},
		{
			// Again use the default execute options. Two roots in the plan
			// means we get a concurrency quota of two.
			name: "defaults-two-roots",
			spec: &planspec.PlanSpec{
				Nodes: []plan.Node{
					planspec.CreatePhysicalMockNode("0"),
					planspec.CreatePhysicalMockNode("1"),
					planspec.CreatePhysicalMockNode("root-0"),
					planspec.CreatePhysicalMockNode("root-1"),
				},
				Edges: [][2]int{
					{0, 1},
					{1, 2},
					{1, 3},
				},
			},
			want: runWith{
				memoryBytesQuota: math.MaxInt64,
				concurrencyQuota: 2,
			},
		},
		{
			// Set the execute options. The memory limit passes in verbatim.
			// The concurrency limit is 16, and the new behaviour of choosing
			// the concurrency quota based on the number of non-source nodes
			// is active.
			name: "via-options-new-behaviour-non-source",
			spec: &planspec.PlanSpec{
				Nodes: []plan.Node{
					planspec.CreatePhysicalMockNode("0"),
					planspec.CreatePhysicalMockNode("1"),
					planspec.CreatePhysicalMockNode("2"),
					planspec.CreatePhysicalMockNode("3"),
					planspec.CreatePhysicalMockNode("root-0"),
					planspec.CreatePhysicalMockNode("root-1"),
				},
				Edges: [][2]int{
					{0, 1},
					{1, 2},
					{2, 3},
					{3, 4},
					{3, 5},
				},
			},
			defaultMemoryLimit: 32768,
			concurrencyLimit:   16,
			want: runWith{
				memoryBytesQuota: 32768,
				concurrencyQuota: 5,
			},
		},
		{
			// Set the execute options. We want the new behaviour of setting
			// the concurrency quota based on the number of non-source nodes
			// (5), but limited by the concurrency limit (4).
			name: "via-options-new-behaviour-limited",
			spec: &planspec.PlanSpec{
				Nodes: []plan.Node{
					planspec.CreatePhysicalMockNode("0"),
					planspec.CreatePhysicalMockNode("1"),
					planspec.CreatePhysicalMockNode("2"),
					planspec.CreatePhysicalMockNode("3"),
					planspec.CreatePhysicalMockNode("root-0"),
					planspec.CreatePhysicalMockNode("root-1"),
				},
				Edges: [][2]int{
					{0, 1},
					{1, 2},
					{2, 3},
					{3, 4},
					{3, 5},
				},
			},
			defaultMemoryLimit: 32768,
			concurrencyLimit:   4,
			want: runWith{
				memoryBytesQuota: 32768,
				concurrencyQuota: 4,
			},
		},
	}

	for _, tc := range testcases {
		execDeps := NewExecutionDependencies(nil, nil, nil)
		ctx := execDeps.Inject(context.Background())

		inputPlan := planspec.CreatePlanSpec(tc.spec)

		thePlanner := plan.NewPhysicalPlanner()
		outputPlan, err := thePlanner.Plan(context.Background(), inputPlan)
		if err != nil {
			t.Fatalf("Physical planning failed: %v", err)
		}

		//
		// Modify the execution options. In practice, we would do this from
		// planner rules.
		//
		if tc.defaultMemoryLimit != 0 {
			execDeps.ExecutionOptions.DefaultMemoryLimit = tc.defaultMemoryLimit
		}
		if tc.concurrencyLimit != 0 {
			execDeps.ExecutionOptions.ConcurrencyLimit = tc.concurrencyLimit
		}

		// Construct a basic execution state and choose the default resources.
		es := &executionState{
			p:         outputPlan,
			ctx:       ctx,
			resources: outputPlan.Resources,
			logger:    zaptest.NewLogger(t),
		}
		es.chooseDefaultResources(ctx, outputPlan)

		if err := es.validate(); err != nil {
			t.Fatalf("execution state failed validation: %s", err.Error())
		}

		if es.resources.MemoryBytesQuota != tc.want.memoryBytesQuota {
			t.Errorf("Expected memory quota of %v, but execution state has %v",
				tc.want.memoryBytesQuota, es.resources.MemoryBytesQuota)
		}

		if es.resources.ConcurrencyQuota != tc.want.concurrencyQuota {
			t.Errorf("Expected concurrency quota of %v, but execution state has %v",
				tc.want.concurrencyQuota, es.resources.ConcurrencyQuota)
		}
	}
}
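The memory side of the precedence these cases exercise is simpler; as a sketch (hypothetical helper, not part of the change):

// A non-zero quota in the plan resources always wins; otherwise the
// execution options' DefaultMemoryLimit applies (math.MaxInt64 when unset).
func pickMemoryQuota(planQuota, defaultMemoryLimit int64) int64 {
	if planQuota != 0 {
		return planQuota
	}
	return defaultMemoryLimit
}

// pickMemoryQuota(163484, 8) == 163484               (via-plan-with-exec-options)
// pickMemoryQuota(0, math.MaxInt64) == math.MaxInt64 (defaults-* cases)
// pickMemoryQuota(0, 32768) == 32768                 (via-options-* cases)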
7 changes: 7 additions & 0 deletions lang/compiler.go
@@ -12,6 +12,7 @@ import (
 	"github.com/influxdata/flux/dependency"
 	"github.com/influxdata/flux/execute"
 	"github.com/influxdata/flux/internal/errors"
+	"github.com/influxdata/flux/internal/feature"
 	"github.com/influxdata/flux/internal/jaeger"
 	"github.com/influxdata/flux/internal/spec"
 	"github.com/influxdata/flux/interpreter"
@@ -475,6 +476,12 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q
 	// The program must inject execution dependencies to make it available to
 	// function calls during the evaluation phase (see `tableFind`).
 	deps := execute.NewExecutionDependencies(alloc, &p.Now, p.Logger)
+
+	// The query concurrency limit is taken from the feature flag, then lives
+	// in the dependencies. This gives us an opportunity to modify it before
+	// execution begins.
+	deps.ExecutionOptions.ConcurrencyLimit = feature.QueryConcurrencyLimit().Int(ctx)
+
 	ctx, span := dependency.Inject(ctx, deps)
 	nextPlanNodeID := new(int)
 	ctx = context.WithValue(ctx, plan.NextPlanNodeIDKey, nextPlanNodeID)
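Because the limit now lives in the dependencies instead of being read directly from the flag at execution time, a host holding deps can adjust it before execution starts. A sketch of that window (the clamp and its value are hypothetical):

deps := execute.NewExecutionDependencies(alloc, &p.Now, p.Logger)
deps.ExecutionOptions.ConcurrencyLimit = feature.QueryConcurrencyLimit().Int(ctx)

// Hypothetical: a planner rule or embedding host clamps the
// flag-provided limit before the executor reads it.
if deps.ExecutionOptions.ConcurrencyLimit > 8 { // 8 is illustrative
	deps.ExecutionOptions.ConcurrencyLimit = 8
}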