diff --git a/cmd/scheduler/entrypoints/scheduler.go b/cmd/scheduler/entrypoints/scheduler.go index 617eed5eef..e953ced176 100644 --- a/cmd/scheduler/entrypoints/scheduler.go +++ b/cmd/scheduler/entrypoints/scheduler.go @@ -8,11 +8,12 @@ import ( "github.com/flyteorg/flyteadmin/pkg/common" repositoryCommonConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config" "github.com/flyteorg/flyteadmin/pkg/runtime" - scheduler "github.com/flyteorg/flyteadmin/scheduler" + "github.com/flyteorg/flyteadmin/scheduler" schdulerRepoConfig "github.com/flyteorg/flyteadmin/scheduler/repositories" "github.com/flyteorg/flyteidl/clients/go/admin" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/profutils" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" @@ -27,6 +28,7 @@ var schedulerRunCmd = &cobra.Command{ ctx := context.Background() configuration := runtime.NewConfigurationProvider() applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerConfiguration := configuration.ApplicationConfiguration().GetSchedulerConfig() // Define the schedulerScope for prometheus metrics schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("flytescheduler") @@ -54,7 +56,16 @@ var schedulerRunCmd = &cobra.Command{ scheduleExecutor := scheduler.NewScheduledExecutor(db, configuration.ApplicationConfiguration().GetSchedulerConfig().GetWorkflowExecutorConfig(), schedulerScope, adminServiceClient) - logger.Info(context.Background(), "Successfully initialized a native flyte scheduler") + logger.Info(ctx, "Successfully initialized a native flyte scheduler") + + // Serve profiling endpoints. + go func() { + err := profutils.StartProfilingServerWithDefaultHandlers( + ctx, schedulerConfiguration.ProfilerPort.Port, nil) + if err != nil { + logger.Panicf(ctx, "Failed to Start profiling and Metrics server. Error, %v", err) + } + }() err = scheduleExecutor.Run(ctx) if err != nil { diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index 8b51ba7943..cafb1384a4 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -438,7 +438,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData( return nil, err } signedInputsURLBlob := admin.UrlBlob{} - if nodeExecution.InputUri != "" { + if len(nodeExecution.InputUri) != 0 { signedInputsURLBlob, err = m.urlData.Get(ctx, nodeExecution.InputUri) if err != nil { return nil, err diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index 84755a0224..1bfad5acaf 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -39,6 +39,7 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic AsyncEventsBufferSize: 100, }) var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{ + ProfilerPort: config.Port{Port: 10253}, EventSchedulerConfig: interfaces.EventSchedulerConfig{ Scheme: common.Local, FlyteSchedulerConfig: &interfaces.FlyteSchedulerConfig{}, diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 029fc17116..dc6b11840a 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -1,6 +1,9 @@ package interfaces -import "golang.org/x/time/rate" +import ( + "github.com/flyteorg/flytestdlib/config" + "golang.org/x/time/rate" +) // This configuration section is used to for initiating the database connection with the store that holds registered // entities (e.g. workflows, tasks, launch plans...) @@ -232,6 +235,8 @@ func (f *AdminRateLimit) GetBurst() int { // This configuration is the base configuration for all scheduler-related set-up. type SchedulerConfig struct { + // Determines which port the profiling server used for scheduler monitoring and application debugging uses. + ProfilerPort config.Port `json:"profilerPort"` EventSchedulerConfig EventSchedulerConfig `json:"eventScheduler"` WorkflowExecutorConfig WorkflowExecutorConfig `json:"workflowExecutor"` // Specifies the number of times to attempt recreating a workflow executor client should there be any disruptions.