Skip to content

Commit

Permalink
Refactor and centralize opening db connections (flyteorg#345)
Browse files Browse the repository at this point in the history
* good lord

Signed-off-by: Katrina Rogan <[email protected]>

* misc fixes

Signed-off-by: Katrina Rogan <[email protected]>

* scheduler fixes

Signed-off-by: Katrina Rogan <[email protected]>

* more scheduler

Signed-off-by: Katrina Rogan <[email protected]>

* more scheduler

Signed-off-by: Katrina Rogan <[email protected]>

* integration test config

Signed-off-by: Katrina Rogan <[email protected]>

* sad debugging

Signed-off-by: Katrina Rogan <[email protected]>

* lint stand down

Signed-off-by: Katrina Rogan <[email protected]>

* augh

Signed-off-by: Katrina Rogan <[email protected]>

* again

Signed-off-by: Katrina Rogan <[email protected]>

* oy

Signed-off-by: Katrina Rogan <[email protected]>

* om g?

Signed-off-by: Katrina Rogan <[email protected]>

* wip

Signed-off-by: Katrina Rogan <[email protected]>

* big wow

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Feb 17, 2022
1 parent 7356c17 commit 4af6bd5
Show file tree
Hide file tree
Showing 67 changed files with 534 additions and 697 deletions.
20 changes: 14 additions & 6 deletions cmd/entrypoints/clusterresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flyteadmin/pkg/repositories/errors"

"github.com/flyteorg/flyteadmin/pkg/clusterresource/impl"
"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
execClusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyteadmin/pkg/repositories"
repositoryConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flyteidl/clients/go/admin"
Expand Down Expand Up @@ -54,11 +54,19 @@ func getClusterResourceController(ctx context.Context, scope promutils.Scope, co
}
adminDataProvider = impl.NewAdminServiceDataProvider(clientSet.AdminClient())
} else {
dbConfig := repositoryConfig.NewDbConfig(configuration.ApplicationConfiguration().GetDbConfig())
db := repositories.GetRepository(
repositories.POSTGRES, dbConfig, scope.NewSubScope("database"))
dbConfig := runtime.NewConfigurationProvider().ApplicationConfiguration().GetDbConfig()
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, dbConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
dbScope := scope.NewSubScope("db")

repo := repositories.NewGormRepo(
db, errors.NewPostgresErrorTransformer(dbScope.NewSubScope("errors")), dbScope)

adminDataProvider = impl.NewDatabaseAdminDataProvider(db, configuration, resources.NewResourceManager(db, configuration.ApplicationConfiguration()))
adminDataProvider = impl.NewDatabaseAdminDataProvider(repo, configuration, resources.NewResourceManager(repo, configuration.ApplicationConfiguration()))
}

return clusterresource.NewClusterResourceController(adminDataProvider, listTargetsProvider, scope)
Expand Down
77 changes: 11 additions & 66 deletions cmd/entrypoints/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,21 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flyteadmin/pkg/repositories"

"github.com/flyteorg/flyteadmin/pkg/repositories/config"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/go-gormigrate/gormigrate/v2"
"github.com/spf13/cobra"
"gorm.io/driver/postgres"
_ "gorm.io/driver/postgres" // Required to import database driver.
"gorm.io/gorm"
gormLogger "gorm.io/gorm/logger"
)

var parentMigrateCmd = &cobra.Command{
Use: "migrate",
Short: "This command controls migration behavior for the Flyte admin database. Please choose a subcommand.",
}

var migrationsScope = promutils.NewScope("migrations")
var migrateScope = migrationsScope.NewSubScope("migrate")
var rollbackScope = promutils.NewScope("migrations").NewSubScope("rollback")

// This runs all the migrations
var migrateCmd = &cobra.Command{
Use: "run",
Expand All @@ -33,26 +26,9 @@ var migrateCmd = &cobra.Command{
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
dbLogLevel := gormLogger.Silent
if databaseConfig.Debug {
dbLogLevel = gormLogger.Info
}
postgresConfigProvider := config.NewPostgresConfigProvider(config.DbConfig{
BaseConfig: config.BaseConfig{
LogLevel: dbLogLevel,
DisableForeignKeyConstraintWhenMigrating: true,
},
Host: databaseConfig.Host,
Port: databaseConfig.Port,
DbName: databaseConfig.DbName,
User: databaseConfig.User,
Password: databaseConfig.Password,
ExtraOptions: databaseConfig.ExtraOptions,
}, migrateScope)
db, err := gorm.Open(postgres.Open(postgresConfigProvider.GetDSN()), &gorm.Config{
Logger: gormLogger.Default.LogMode(postgresConfigProvider.GetDBConfig().LogLevel),
DisableForeignKeyConstraintWhenMigrating: postgresConfigProvider.GetDBConfig().DisableForeignKeyConstraintWhenMigrating,
})
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
Expand Down Expand Up @@ -87,25 +63,9 @@ var rollbackCmd = &cobra.Command{
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
dbLogLevel := gormLogger.Silent
if databaseConfig.Debug {
dbLogLevel = gormLogger.Info
}
postgresConfigProvider := config.NewPostgresConfigProvider(config.DbConfig{
BaseConfig: config.BaseConfig{
LogLevel: dbLogLevel,
},
Host: databaseConfig.Host,
Port: databaseConfig.Port,
DbName: databaseConfig.DbName,
User: databaseConfig.User,
Password: databaseConfig.Password,
ExtraOptions: databaseConfig.ExtraOptions,
}, rollbackScope)

db, err := gorm.Open(postgres.Open(postgresConfigProvider.GetDSN()), &gorm.Config{
Logger: gormLogger.Default.LogMode(postgresConfigProvider.GetDBConfig().LogLevel),
})
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
Expand Down Expand Up @@ -140,24 +100,9 @@ var seedProjectsCmd = &cobra.Command{
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
dbLogLevel := gormLogger.Silent
if databaseConfig.Debug {
dbLogLevel = gormLogger.Info
}
postgresConfigProvider := config.NewPostgresConfigProvider(config.DbConfig{
BaseConfig: config.BaseConfig{
LogLevel: dbLogLevel,
},
Host: databaseConfig.Host,
Port: databaseConfig.Port,
DbName: databaseConfig.DbName,
User: databaseConfig.User,
Password: databaseConfig.Password,
ExtraOptions: databaseConfig.ExtraOptions,
}, migrateScope)
db, err := gorm.Open(postgres.Open(postgresConfigProvider.GetDSN()), &gorm.Config{
Logger: gormLogger.Default.LogMode(postgresConfigProvider.GetDBConfig().LogLevel),
})
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/entrypoints/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func newGRPCServer(ctx context.Context, cfg *config.ServerConfig, authCtx interf
serverOpts = append(serverOpts, opts...)
grpcServer := grpc.NewServer(serverOpts...)
grpcPrometheus.Register(grpcServer)
flyteService.RegisterAdminServiceServer(grpcServer, adminservice.NewAdminServer(cfg.KubeConfig, cfg.Master))
flyteService.RegisterAdminServiceServer(grpcServer, adminservice.NewAdminServer(ctx, cfg.KubeConfig, cfg.Master))
if cfg.Security.UseAuth {
flyteService.RegisterAuthMetadataServiceServer(grpcServer, authCtx.AuthMetadataService())
flyteService.RegisterIdentityServiceServer(grpcServer, authCtx.IdentityService())
Expand Down
21 changes: 14 additions & 7 deletions cmd/scheduler/entrypoints/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"fmt"
"runtime/debug"

"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/errors"

"github.com/flyteorg/flyteadmin/pkg/common"
repositoryCommonConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flyteadmin/scheduler"
schdulerRepoConfig "github.com/flyteorg/flyteadmin/scheduler/repositories"
"github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
Expand Down Expand Up @@ -42,10 +43,16 @@ var schedulerRunCmd = &cobra.Command{
}
}()

dbConfigValues := configuration.ApplicationConfiguration().GetDbConfig()
dbConfig := repositoryCommonConfig.NewDbConfig(dbConfigValues)
db := schdulerRepoConfig.GetRepository(
schdulerRepoConfig.POSTGRES, dbConfig, schedulerScope.NewSubScope("database"))
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
dbScope := schedulerScope.NewSubScope("database")
repo := repositories.NewGormRepo(
db, errors.NewPostgresErrorTransformer(schedulerScope.NewSubScope("errors")), dbScope)

clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)
if err != nil {
Expand All @@ -54,7 +61,7 @@ var schedulerRunCmd = &cobra.Command{
}
adminServiceClient := clientSet.AdminClient()

scheduleExecutor := scheduler.NewScheduledExecutor(db,
scheduleExecutor := scheduler.NewScheduledExecutor(repo,
configuration.ApplicationConfiguration().GetSchedulerConfig().GetWorkflowExecutorConfig(), schedulerScope, adminServiceClient)

logger.Info(ctx, "Successfully initialized a native flyte scheduler")
Expand Down
11 changes: 6 additions & 5 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ flyteadmin:
- "metadata"
- "admin"
database:
port: 5432
username: postgres
host: localhost
dbname: postgres
options: "sslmode=disable"
postgres:
port: 5432
username: postgres
host: localhost
dbname: postgres
options: "sslmode=disable"
scheduler:
eventScheduler:
scheme: local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package implementations
import (
"context"

repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"

"github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
Expand All @@ -13,7 +14,7 @@ import (
// This event writer acts to asynchronously persist node execution events. As flytepropeller sends node
// events, node execution processing doesn't have to wait on these to be committed.
type nodeExecutionEventWriter struct {
db repositories.RepositoryInterface
db repositoryInterfaces.Repository
events chan admin.NodeExecutionEventRequest
}

Expand All @@ -37,7 +38,7 @@ func (w *nodeExecutionEventWriter) Run() {
}
}

func NewNodeExecutionEventWriter(db repositories.RepositoryInterface, bufferSize int) interfaces.NodeExecutionEventWriter {
func NewNodeExecutionEventWriter(db repositoryInterfaces.Repository, bufferSize int) interfaces.NodeExecutionEventWriter {
return &nodeExecutionEventWriter{
db: db,
events: make(chan admin.NodeExecutionEventRequest, bufferSize),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package implementations
import (
"context"

repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"

"github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
Expand All @@ -13,7 +14,7 @@ import (
// This event writer acts to asynchronously persist workflow execution events. As flytepropeller sends workflow
// events, workflow execution processing doesn't have to wait on these to be committed.
type workflowExecutionEventWriter struct {
db repositories.RepositoryInterface
db repositoryInterfaces.Repository
events chan admin.WorkflowExecutionEventRequest
}

Expand All @@ -37,7 +38,7 @@ func (w *workflowExecutionEventWriter) Run() {
}
}

func NewWorkflowExecutionEventWriter(db repositories.RepositoryInterface, bufferSize int) interfaces.WorkflowExecutionEventWriter {
func NewWorkflowExecutionEventWriter(db repositoryInterfaces.Repository, bufferSize int) interfaces.WorkflowExecutionEventWriter {
return &workflowExecutionEventWriter{
db: db,
events: make(chan admin.WorkflowExecutionEventRequest, bufferSize),
Expand Down
5 changes: 3 additions & 2 deletions pkg/async/schedule/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"

gizmoConfig "github.com/NYTimes/gizmo/pubsub/aws"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -13,7 +15,6 @@ import (
"github.com/flyteorg/flyteadmin/pkg/async/schedule/noop"
"github.com/flyteorg/flyteadmin/pkg/common"
managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
flytescheduler "github.com/flyteorg/flyteadmin/scheduler/dbapi"
"github.com/flyteorg/flytestdlib/logger"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (w *workflowScheduler) GetWorkflowExecutor(
return w.workflowExecutor
}

func NewWorkflowScheduler(db repositories.RepositoryInterface, cfg WorkflowSchedulerConfig) WorkflowScheduler {
func NewWorkflowScheduler(db repoInterfaces.Repository, cfg WorkflowSchedulerConfig) WorkflowScheduler {
var eventScheduler interfaces.EventScheduler
var workflowExecutor interfaces.WorkflowExecutor

Expand Down
9 changes: 4 additions & 5 deletions pkg/clusterresource/impl/db_admin_data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import (
"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
"github.com/flyteorg/flyteadmin/pkg/common"
managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
repositoriesInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

// Implementation of an interfaces.FlyteAdminDataProvider which fetches data directly from the provided database connection.
type dbAdminProvider struct {
db repositories.RepositoryInterface
db repositoryInterfaces.Repository
config runtimeInterfaces.Configuration
resourceManager managerInterfaces.ResourceInterface
}
Expand Down Expand Up @@ -52,7 +51,7 @@ func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, erro
if err != nil {
return nil, err
}
projectModels, err := p.db.ProjectRepo().List(ctx, repositoriesInterfaces.ListResourceInput{
projectModels, err := p.db.ProjectRepo().List(ctx, repositoryInterfaces.ListResourceInput{
SortParameter: descCreatedAtSortDBParam,
InlineFilters: []common.InlineFilter{filter},
})
Expand All @@ -65,7 +64,7 @@ func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, erro
}, nil
}

func NewDatabaseAdminDataProvider(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, resourceManager managerInterfaces.ResourceInterface) interfaces.FlyteAdminDataProvider {
func NewDatabaseAdminDataProvider(db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, resourceManager managerInterfaces.ResourceInterface) interfaces.FlyteAdminDataProvider {
return &dbAdminProvider{
db: db,
config: config,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ServerSecurityOptions struct {
AllowedOrigins []string `json:"allowedOrigins"`
// These are the Access-Control-Request-Headers that the server will respond to.
// By default, the server will allow Accept, Accept-Language, Content-Language, and Content-Type.
// User this setting to add any additional headers which are needed
// DeprecatedUser this setting to add any additional headers which are needed
AllowedHeaders []string `json:"allowedHeaders"`
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/executioncluster/impl/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package impl

import (
executioncluster_interface "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/promutils"
)

func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration, db repositories.RepositoryInterface) executioncluster_interface.ClusterInterface {
func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration, db repositoryInterfaces.Repository) executioncluster_interface.ClusterInterface {
initializationErrorCounter := scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
Expand Down
Loading

0 comments on commit 4af6bd5

Please sign in to comment.