forked from flyteorg/flyte
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Flyte native scheduler (flyteorg#228)
* Flyte native scheduler Signed-off-by: Prafulla Mahindrakar <[email protected]> Co-authored-by: Yuvraj <[email protected]>
- Loading branch information
1 parent
dc871d9
commit ef6db4f
Showing
72 changed files
with
3,183 additions
and
121 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES. | ||
# ONLY EDIT THIS FILE FROM WITHIN THE 'LYFT/BOILERPLATE' REPOSITORY: | ||
# | ||
# TO OPT OUT OF UPDATES, SEE https://github.com/lyft/boilerplate/blob/master/Readme.rst | ||
|
||
FROM golang:1.16.0-alpine3.13 as builder | ||
RUN apk add git openssh-client make curl | ||
|
||
# COPY only the go mod files for efficient caching | ||
COPY go.mod go.sum /go/src/github.com/flyteorg/flyteadmin/ | ||
WORKDIR /go/src/github.com/flyteorg/flyteadmin | ||
|
||
# Pull dependencies | ||
RUN go mod download | ||
|
||
# COPY the rest of the source code | ||
COPY . /go/src/github.com/flyteorg/flyteadmin/ | ||
|
||
# This 'linux_compile_scheduler' target should compile binaries to the /artifacts directory | ||
# The main entrypoint should be compiled to /artifacts/flytescheduler | ||
RUN make linux_compile_scheduler | ||
|
||
# update the PATH to include the /artifacts directory | ||
ENV PATH="/artifacts:${PATH}" | ||
|
||
# This will eventually move to centurylink/ca-certs:latest for minimum possible image size | ||
FROM alpine:3.13 | ||
LABEL org.opencontainers.image.source https://github.com/flyteorg/flyteadmin | ||
|
||
COPY --from=builder /artifacts /bin | ||
|
||
# Ensure the latest CA certs are present to authenticate SSL connections. | ||
RUN apk --update add ca-certificates | ||
|
||
CMD ["flytescheduler"] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package entrypoints | ||
|
||
import ( | ||
"context" | ||
"flag" | ||
"fmt" | ||
"os" | ||
|
||
"github.com/flyteorg/flytestdlib/config" | ||
"github.com/flyteorg/flytestdlib/config/viper" | ||
"github.com/spf13/cobra" | ||
"github.com/spf13/pflag" | ||
) | ||
|
||
var ( | ||
cfgFile string | ||
configAccessor = viper.NewAccessor(config.Options{}) | ||
) | ||
|
||
// RootCmd represents the base command when called without any subcommands | ||
var RootCmd = &cobra.Command{ | ||
Use: "flytescheduler", | ||
Short: "Flyte native scheduler to run cron and fixed rate scheduled workflows", | ||
Long: ` | ||
Use the run subcommand which will start the scheduler by connecting to DB containing schedules | ||
flytescheduler run --config flyteadmin_config.yaml --admin.endpoint dns:///localhost:8089 --admin.insecure | ||
`, | ||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error { | ||
return initConfig(cmd.Flags()) | ||
}, | ||
} | ||
|
||
// Execute adds all child commands to the root command sets flags appropriately. | ||
// This is called by main.main(). It only needs to happen once to the rootCmd. | ||
func Execute() error { | ||
if err := RootCmd.Execute(); err != nil { | ||
fmt.Println(err) | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func init() { | ||
// allows `$ flytescheduler --logtostderr` to work | ||
pflag.CommandLine.AddGoFlagSet(flag.CommandLine) | ||
|
||
// Add persistent flags - persistent flags persist through all sub-commands | ||
RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is ./flyteadmin_config.yaml)") | ||
|
||
RootCmd.AddCommand(viper.GetConfigCommand()) | ||
|
||
// Allow viper to read the value of the flags | ||
configAccessor.InitializePflags(RootCmd.PersistentFlags()) | ||
|
||
err := flag.CommandLine.Parse([]string{}) | ||
if err != nil { | ||
fmt.Println(err) | ||
os.Exit(-1) | ||
} | ||
} | ||
|
||
func initConfig(flags *pflag.FlagSet) error { | ||
configAccessor = viper.NewAccessor(config.Options{ | ||
SearchPaths: []string{cfgFile, ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyteadmin"}, | ||
StrictMode: false, | ||
}) | ||
|
||
fmt.Println("Using config file: ", configAccessor.ConfigFilesUsed()) | ||
|
||
configAccessor.InitializePflags(flags) | ||
|
||
return configAccessor.UpdateConfig(context.TODO()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package entrypoints | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"runtime/debug" | ||
|
||
"github.com/flyteorg/flyteadmin/pkg/common" | ||
repositoryCommonConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config" | ||
"github.com/flyteorg/flyteadmin/pkg/runtime" | ||
scheduler "github.com/flyteorg/flyteadmin/scheduler" | ||
schdulerRepoConfig "github.com/flyteorg/flyteadmin/scheduler/repositories" | ||
"github.com/flyteorg/flyteidl/clients/go/admin" | ||
"github.com/flyteorg/flytestdlib/contextutils" | ||
"github.com/flyteorg/flytestdlib/logger" | ||
"github.com/flyteorg/flytestdlib/promutils" | ||
"github.com/flyteorg/flytestdlib/promutils/labeled" | ||
|
||
_ "github.com/jinzhu/gorm/dialects/postgres" // Required to import database driver. | ||
"github.com/spf13/cobra" | ||
) | ||
|
||
var schedulerRunCmd = &cobra.Command{ | ||
Use: "run", | ||
Short: "This command will start the flyte native scheduler and periodically get new schedules from the db for scheduling", | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
ctx := context.Background() | ||
configuration := runtime.NewConfigurationProvider() | ||
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() | ||
|
||
// Define the schedulerScope for prometheus metrics | ||
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("flytescheduler") | ||
|
||
defer func() { | ||
if err := recover(); err != nil { | ||
schedulerScope.MustNewCounter("initialization_panic", | ||
"panics encountered initializing the flyte native scheduler").Inc() | ||
logger.Fatalf(ctx, fmt.Sprintf("caught panic: %v [%+v]", err, string(debug.Stack()))) | ||
} | ||
}() | ||
|
||
dbConfigValues := configuration.ApplicationConfiguration().GetDbConfig() | ||
dbConfig := repositoryCommonConfig.NewDbConfig(dbConfigValues) | ||
db := schdulerRepoConfig.GetRepository( | ||
schdulerRepoConfig.POSTGRES, dbConfig, schedulerScope.NewSubScope("database")) | ||
|
||
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx) | ||
if err != nil { | ||
logger.Fatalf(ctx, "Flyte native scheduler failed to start due to %v", err) | ||
return err | ||
} | ||
adminServiceClient := clientSet.AdminClient() | ||
|
||
scheduleExecutor := scheduler.NewScheduledExecutor(db, | ||
configuration.ApplicationConfiguration().GetSchedulerConfig().GetWorkflowExecutorConfig(), schedulerScope, adminServiceClient) | ||
|
||
logger.Info(context.Background(), "Successfully initialized a native flyte scheduler") | ||
|
||
err = scheduleExecutor.Run(ctx) | ||
if err != nil { | ||
logger.Fatalf(ctx, "Flyte native scheduler failed to start due to %v", err) | ||
return err | ||
} | ||
return nil | ||
}, | ||
} | ||
|
||
func init() { | ||
RootCmd.AddCommand(schedulerRunCmd) | ||
|
||
// Set Keys | ||
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey, | ||
contextutils.ExecIDKey, contextutils.WorkflowIDKey, contextutils.NodeIDKey, contextutils.TaskIDKey, | ||
contextutils.TaskTypeKey, common.RuntimeTypeKey, common.RuntimeVersionKey) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package main | ||
|
||
import ( | ||
entrypoints2 "github.com/flyteorg/flyteadmin/cmd/scheduler/entrypoints" | ||
"github.com/golang/glog" | ||
) | ||
|
||
func main() { | ||
glog.V(2).Info("Beginning Flyte Scheduler") | ||
err := entrypoints2.Execute() | ||
if err != nil { | ||
panic(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.