Skip to content

Commit

Permalink
Flyte native scheduler (flyteorg#228)
Browse files Browse the repository at this point in the history
* Flyte native scheduler

Signed-off-by: Prafulla Mahindrakar <[email protected]>
Co-authored-by: Yuvraj <[email protected]>
  • Loading branch information
pmahindrakar-oss and yindia authored Sep 3, 2021
1 parent 52838b2 commit 5c86bbe
Show file tree
Hide file tree
Showing 72 changed files with 3,183 additions and 121 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ jobs:
registry: ghcr.io
build_extra_args: "--compress=true"

- name: Push Flytescheduler Docker Image to Github Registry
uses: whoan/docker-build-with-cache-action@v5
with:
username: "${{ secrets.FLYTE_BOT_USERNAME }}"
password: "${{ secrets.FLYTE_BOT_PAT }}"
image_name: flytescheduler
image_tag: latest,${{ github.sha }},${{ needs.bump-version.outputs.version }}
push_git_tag: true
dockerfile: Dockerfile.scheduler
registry: ghcr.io
build_extra_args: "--compress=true"
tests-lint:
name: Run tests and lint
runs-on: ubuntu-latest
Expand Down
16 changes: 15 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@ jobs:
docker build -t flyteorg/${{ github.event.repository.name }}:latest --cache-from=flyteorg/${{ github.event.repository.name }}:builder .
- name: Tag and cache docker image
run: mkdir -p /tmp/tmp/docker-images && docker save flyteorg/${{ github.event.repository.name }}:builder -o /tmp/tmp/docker-images/snapshot-builder.tar && docker save flyteorg/${{ github.event.repository.name }}:latest -o /tmp/tmp/docker-images/snapshot.tar
run: mkdir -p /tmp/tmp/docker-images && docker save flyteorg/${{ github.event.repository.name }}:builder -o /tmp/tmp/docker-images/snapshot-builder.tar && docker save flyteorg/${{ github.event.repository.name }}:latest -o /tmp/tmp/docker-images/snapshot.tar

- name: Build Flytescheduler Docker Image
uses: whoan/docker-build-with-cache-action@v5
with:
username: "${{ secrets.FLYTE_BOT_USERNAME }}"
password: "${{ secrets.FLYTE_BOT_PAT }}"
image_name: flytescheduler
image_tag: latest,${{ github.sha }},${{ needs.bump-version.outputs.version }}
push_git_tag: true
push_image_and_stages: false
dockerfile: Dockerfile.scheduler
registry: ghcr.io
build_extra_args: "--compress=true"


endtoend:
name: End to End tests
Expand Down
36 changes: 36 additions & 0 deletions Dockerfile.scheduler
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES.
# ONLY EDIT THIS FILE FROM WITHIN THE 'LYFT/BOILERPLATE' REPOSITORY:
#
# TO OPT OUT OF UPDATES, SEE https://github.com/lyft/boilerplate/blob/master/Readme.rst

FROM golang:1.16.0-alpine3.13 as builder
RUN apk add git openssh-client make curl

# COPY only the go mod files for efficient caching
COPY go.mod go.sum /go/src/github.com/flyteorg/flyteadmin/
WORKDIR /go/src/github.com/flyteorg/flyteadmin

# Pull dependencies
RUN go mod download

# COPY the rest of the source code
COPY . /go/src/github.com/flyteorg/flyteadmin/

# This 'linux_compile_scheduler' target should compile binaries to the /artifacts directory
# The main entrypoint should be compiled to /artifacts/flytescheduler
RUN make linux_compile_scheduler

# update the PATH to include the /artifacts directory
ENV PATH="/artifacts:${PATH}"

# This will eventually move to centurylink/ca-certs:latest for minimum possible image size
FROM alpine:3.13
LABEL org.opencontainers.image.source https://github.com/flyteorg/flyteadmin

COPY --from=builder /artifacts /bin

# Ensure the latest CA certs are present to authenticate SSL connections.
RUN apk --update add ca-certificates

CMD ["flytescheduler"]

20 changes: 20 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export REPOSITORY=flyteadmin
export FLYTE_SCHEDULER_REPOSITORY=flytescheduler
include boilerplate/flyte/docker_build/Makefile
include boilerplate/flyte/golang_test_targets/Makefile
include boilerplate/flyte/end2end/Makefile
Expand All @@ -15,6 +16,10 @@ update_boilerplate:
@curl https://raw.githubusercontent.com/flyteorg/boilerplate/master/boilerplate/update.sh -o boilerplate/update.sh
@boilerplate/update.sh

.PHONY: docker_build_scheduler
docker_build_scheduler:
docker build -t $$FLYTE_SCHEDULER_REPOSITORY:$(GIT_HASH) -f Dockerfile.scheduler .

.PHONY: integration
integration:
CGO_ENABLED=0 GOFLAGS="-count=1" go test -v -tags=integration ./tests/...
Expand All @@ -31,14 +36,29 @@ k8s_integration_execute:
compile:
go build -o flyteadmin -ldflags=$(LD_FLAGS) ./cmd/ && mv ./flyteadmin ${GOPATH}/bin

.PHONY: compile_scheduler
compile_scheduler:
go build -o flytescheduler -ldflags=$(LD_FLAGS) ./cmd/scheduler/ && mv ./flytescheduler ${GOPATH}/bin


.PHONY: linux_compile
linux_compile:
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flyteadmin -ldflags=$(LD_FLAGS) ./cmd/

.PHONY: linux_compile_scheduler
linux_compile_scheduler:
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytescheduler -ldflags=$(LD_FLAGS) ./cmd/scheduler/


.PHONY: server
server:
go run cmd/main.go serve --server.kube-config ~/.kube/config --config flyteadmin_config.yaml

.PHONY: scheduler
scheduler:
go run scheduler/main.go run --server.kube-config ~/.kube/config --config flyteadmin_config.yaml


.PHONY: migrate
migrate:
go run cmd/main.go migrate run --server.kube-config ~/.kube/config --config flyteadmin_config.yaml
Expand Down
74 changes: 74 additions & 0 deletions cmd/scheduler/entrypoints/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package entrypoints

import (
"context"
"flag"
"fmt"
"os"

"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/config/viper"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

var (
cfgFile string
configAccessor = viper.NewAccessor(config.Options{})
)

// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Use: "flytescheduler",
Short: "Flyte native scheduler to run cron and fixed rate scheduled workflows",
Long: `
Use the run subcommand which will start the scheduler by connecting to DB containing schedules
flytescheduler run --config flyteadmin_config.yaml --admin.endpoint dns:///localhost:8089 --admin.insecure
`,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return initConfig(cmd.Flags())
},
}

// Execute adds all child commands to the root command sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() error {
if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
return err
}
return nil
}

func init() {
// allows `$ flytescheduler --logtostderr` to work
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)

// Add persistent flags - persistent flags persist through all sub-commands
RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is ./flyteadmin_config.yaml)")

RootCmd.AddCommand(viper.GetConfigCommand())

// Allow viper to read the value of the flags
configAccessor.InitializePflags(RootCmd.PersistentFlags())

err := flag.CommandLine.Parse([]string{})
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
}

func initConfig(flags *pflag.FlagSet) error {
configAccessor = viper.NewAccessor(config.Options{
SearchPaths: []string{cfgFile, ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyteadmin"},
StrictMode: false,
})

fmt.Println("Using config file: ", configAccessor.ConfigFilesUsed())

configAccessor.InitializePflags(flags)

return configAccessor.UpdateConfig(context.TODO())
}
75 changes: 75 additions & 0 deletions cmd/scheduler/entrypoints/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package entrypoints

import (
"context"
"fmt"
"runtime/debug"

"github.com/flyteorg/flyteadmin/pkg/common"
repositoryCommonConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config"
"github.com/flyteorg/flyteadmin/pkg/runtime"
scheduler "github.com/flyteorg/flyteadmin/scheduler"
schdulerRepoConfig "github.com/flyteorg/flyteadmin/scheduler/repositories"
"github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

_ "github.com/jinzhu/gorm/dialects/postgres" // Required to import database driver.
"github.com/spf13/cobra"
)

var schedulerRunCmd = &cobra.Command{
Use: "run",
Short: "This command will start the flyte native scheduler and periodically get new schedules from the db for scheduling",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()

// Define the schedulerScope for prometheus metrics
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("flytescheduler")

defer func() {
if err := recover(); err != nil {
schedulerScope.MustNewCounter("initialization_panic",
"panics encountered initializing the flyte native scheduler").Inc()
logger.Fatalf(ctx, fmt.Sprintf("caught panic: %v [%+v]", err, string(debug.Stack())))
}
}()

dbConfigValues := configuration.ApplicationConfiguration().GetDbConfig()
dbConfig := repositoryCommonConfig.NewDbConfig(dbConfigValues)
db := schdulerRepoConfig.GetRepository(
schdulerRepoConfig.POSTGRES, dbConfig, schedulerScope.NewSubScope("database"))

clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)
if err != nil {
logger.Fatalf(ctx, "Flyte native scheduler failed to start due to %v", err)
return err
}
adminServiceClient := clientSet.AdminClient()

scheduleExecutor := scheduler.NewScheduledExecutor(db,
configuration.ApplicationConfiguration().GetSchedulerConfig().GetWorkflowExecutorConfig(), schedulerScope, adminServiceClient)

logger.Info(context.Background(), "Successfully initialized a native flyte scheduler")

err = scheduleExecutor.Run(ctx)
if err != nil {
logger.Fatalf(ctx, "Flyte native scheduler failed to start due to %v", err)
return err
}
return nil
},
}

func init() {
RootCmd.AddCommand(schedulerRunCmd)

// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey,
contextutils.ExecIDKey, contextutils.WorkflowIDKey, contextutils.NodeIDKey, contextutils.TaskIDKey,
contextutils.TaskTypeKey, common.RuntimeTypeKey, common.RuntimeVersionKey)
}
14 changes: 14 additions & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
entrypoints2 "github.com/flyteorg/flyteadmin/cmd/scheduler/entrypoints"
"github.com/golang/glog"
)

func main() {
glog.V(2).Info("Beginning Flyte Scheduler")
err := entrypoints2.Execute()
if err != nil {
panic(err)
}
}
4 changes: 4 additions & 0 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ scheduler:
scheduleNamePrefix: "flyte"
workflowExecutor:
scheme: local
local:
adminRateLimit:
tps: 100 # per sec how many requests to send to admin
burst: 10 # burst count of request to admin
region: "my-region"
scheduleQueueName: "won't-work-locally"
accountId: "abc123"
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.2.0
github.com/googleapis/gax-go/v2 v2.0.5
github.com/googleapis/gnostic v0.5.4 // indirect
github.com/gorilla/handlers v1.5.1
Expand All @@ -48,6 +49,7 @@ require (
github.com/prometheus/common v0.19.0 // indirect
github.com/qor/qor v1.2.0 // indirect
github.com/qor/validations v0.0.0-20171228122639-f364bca61b46
github.com/robfig/cron/v3 v3.0.0
github.com/sendgrid/rest v2.6.4+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.10.0+incompatible
github.com/sirupsen/logrus v1.8.1 // indirect
Expand All @@ -56,6 +58,7 @@ require (
github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.2 // indirect
google.golang.org/api v0.42.0
google.golang.org/genproto v0.0.0-20210315173758-2651cd453018
Expand All @@ -72,3 +75,5 @@ require (
sigs.k8s.io/controller-runtime v0.8.3
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84
4 changes: 3 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -1159,6 +1158,7 @@ github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -1386,6 +1386,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down
Loading

0 comments on commit 5c86bbe

Please sign in to comment.