Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Added default admin config for MaxParallelism #262

Merged
merged 1 commit into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request

// Produces execution-time attributes for workflow execution.
// Defaults to overridable execution values set in the execution create request, then looks at the launch plan values
// (if any) before defaulting to values set in the matchable resource db.
// (if any) before defaulting to values set in the matchable resource db and further if matchable resources don't
// exist then defaults to one set in application configuration
func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest,
launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) {
if request.Spec.MaxParallelism > 0 {
Expand Down Expand Up @@ -565,7 +566,10 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
if resource != nil && resource.Attributes.GetWorkflowExecutionConfig() != nil {
return resource.Attributes.GetWorkflowExecutionConfig(), nil
}
return nil, nil
// Defaults to one from the application config
return &admin.WorkflowExecutionConfig{
MaxParallelism: m.config.ApplicationConfiguration().GetTopLevelConfig().GetMaxParallelism(),
}, nil
}

func (m *ExecutionManager) launchSingleTaskExecution(
Expand Down
24 changes: 23 additions & 1 deletion pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
managerMocks "github.com/flyteorg/flyteadmin/pkg/manager/mocks"
"github.com/flyteorg/flyteadmin/pkg/runtime"

"github.com/flyteorg/flyteidl/clients/go/coreutils"

Expand Down Expand Up @@ -3300,9 +3301,10 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
t.Errorf("When a user specifies max parallelism in a spec, the db should not be queried")
return nil, nil
}

applicationConfig := runtime.NewConfigurationProvider()
executionManager := ExecutionManager{
resourceManager: &resourceManager,
config: applicationConfig,
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Expand All @@ -3329,6 +3331,26 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, execConfig.MaxParallelism, int32(50))

resourceManager = managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
return nil, nil
}
executionManager = ExecutionManager{
resourceManager: &resourceManager,
config: applicationConfig,
}

execConfig, err = executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}, &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{},
})
assert.NoError(t, err)
assert.Equal(t, execConfig.MaxParallelism, int32(25))
}

func TestResolvePermissions(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()

adminScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("admin")
adminScope := promutils.NewScope(applicationConfiguration.GetMetricsScope()).NewSubScope("admin")

defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -92,10 +92,10 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
configuration,
db)
workflowExecutor := workflowengine.NewFlytePropeller(
applicationConfiguration.RoleNameKey,
applicationConfiguration.GetRoleNameKey(),
execCluster,
adminScope.NewSubScope("executor").NewSubScope("flytepropeller"),
configuration.NamespaceMappingConfiguration(), applicationConfiguration.EventVersion)
configuration.NamespaceMappingConfiguration(), applicationConfiguration.GetEventVersion())
logger.Info(context.Background(), "Successfully created a workflow executor engine")
dataStorageClient, err := storage.NewDataStore(storeConfig, adminScope.NewSubScope("storage"))
if err != nil {
Expand Down Expand Up @@ -135,11 +135,11 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
}).GetRemoteURLInterface()

workflowManager := manager.NewWorkflowManager(
db, configuration, workflowengine.NewCompiler(), dataStorageClient, applicationConfiguration.MetadataStoragePrefix,
db, configuration, workflowengine.NewCompiler(), dataStorageClient, applicationConfiguration.GetMetadataStoragePrefix(),
adminScope.NewSubScope("workflow_manager"))
namedEntityManager := manager.NewNamedEntityManager(db, configuration, adminScope.NewSubScope("named_entity_manager"))

executionEventWriter := eventWriter.NewWorkflowExecutionEventWriter(db, applicationConfiguration.AsyncEventsBufferSize)
executionEventWriter := eventWriter.NewWorkflowExecutionEventWriter(db, applicationConfiguration.GetAsyncEventsBufferSize())
go func() {
executionEventWriter.Run()
}()
Expand All @@ -159,13 +159,13 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
// Serve profiling endpoints.
go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(
context.Background(), applicationConfiguration.ProfilerPort, nil)
context.Background(), applicationConfiguration.GetProfilerPort(), nil)
if err != nil {
logger.Panicf(context.Background(), "Failed to Start profiling and Metrics server. Error, %v", err)
}
}()

nodeExecutionEventWriter := eventWriter.NewNodeExecutionEventWriter(db, applicationConfiguration.AsyncEventsBufferSize)
nodeExecutionEventWriter := eventWriter.NewNodeExecutionEventWriter(db, applicationConfiguration.GetAsyncEventsBufferSize())
go func() {
nodeExecutionEventWriter.Run()
}()
Expand All @@ -179,7 +179,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
ExecutionManager: executionManager,
NamedEntityManager: namedEntityManager,
VersionManager: versionManager,
NodeExecutionManager: manager.NewNodeExecutionManager(db, configuration, applicationConfiguration.MetadataStoragePrefix, dataStorageClient,
NodeExecutionManager: manager.NewNodeExecutionManager(db, configuration, applicationConfiguration.GetMetadataStoragePrefix(), dataStorageClient,
adminScope.NewSubScope("node_execution_manager"), urlData, eventPublisher, nodeExecutionEventWriter),
TaskExecutionManager: manager.NewTaskExecutionManager(db, configuration, dataStorageClient,
adminScope.NewSubScope("task_execution_manager"), urlData, eventPublisher),
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
MetadataStoragePrefix: []string{"metadata", "admin"},
EventVersion: 1,
AsyncEventsBufferSize: 100,
MaxParallelism: 25,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have kept this as default for now .
cc : @katrogan @kumare3

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh thanks! i missed this earlier

})

var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
EventSchedulerConfig: interfaces.EventSchedulerConfig{
Scheme: common.Local,
Expand Down
32 changes: 32 additions & 0 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,38 @@ type ApplicationConfig struct {
EventVersion int `json:"eventVersion"`
// Specifies the shared buffer size which is used to queue asynchronous event writes.
AsyncEventsBufferSize int `json:"asyncEventsBufferSize"`
// Controls the maximum number of task nodes that can be run in parallel for the entire workflow.
// This is useful to achieve fairness. Note: MapTasks are regarded as one unit,
// and parallelism/concurrency of MapTasks is independent from this.
MaxParallelism int32 `json:"maxParallelism"`
}

func (a *ApplicationConfig) GetRoleNameKey() string {
return a.RoleNameKey
}

func (a *ApplicationConfig) GetMetricsScope() string {
return a.MetricsScope
}

func (a *ApplicationConfig) GetProfilerPort() int {
return a.ProfilerPort
}

func (a *ApplicationConfig) GetMetadataStoragePrefix() []string {
return a.MetadataStoragePrefix
}

func (a *ApplicationConfig) GetEventVersion() int {
return a.EventVersion
}

func (a *ApplicationConfig) GetAsyncEventsBufferSize() int {
return a.AsyncEventsBufferSize
}

func (a *ApplicationConfig) GetMaxParallelism() int32 {
return a.MaxParallelism
}

// This section holds common config for AWS
Expand Down