From 859c4b18220c57f51683c3437888be581a7f81c9 Mon Sep 17 00:00:00 2001 From: Shivaprasad Bhat Date: Fri, 24 Mar 2023 14:08:21 +0530 Subject: [PATCH] feat: add stop flag support (#13) --- Makefile | 2 +- cli/config.go | 16 +- cli/migrate.go | 2 +- cli/serve.go | 62 +--- core/core.go | 9 +- core/core_test.go | 2 +- core/mocks/async_worker.go | 7 +- core/mocks/driver.go | 42 ++- core/mocks/loggable_module.go | 52 ++- core/mocks/module_registry.go | 14 +- core/mocks/module_service.go | 52 ++- core/mocks/module_store.go | 37 +- core/mocks/resource_store.go | 90 ++++- core/module/driver.go | 10 +- core/module/service.go | 2 +- core/read.go | 24 +- core/read_test.go | 10 +- core/resource/resource.go | 4 + core/resource/state.go | 2 + core/sync.go | 101 ++---- core/write.go | 72 ++-- core/write_test.go | 160 ++++----- internal/server/v1/mocks/module_service.go | 47 ++- internal/server/v1/mocks/resource_service.go | 77 +++- internal/store/postgres/postgres.go | 22 +- internal/store/postgres/resource_store.go | 84 +++++ internal/store/postgres/revision_model.go | 4 +- internal/store/postgres/revision_store.go | 118 +++--- internal/store/postgres/schema.sql | 51 +-- modules/firehose/config.go | 3 + modules/firehose/driver.go | 38 +- modules/firehose/driver_output_test.go | 4 +- modules/firehose/driver_plan.go | 54 +-- modules/firehose/driver_plan_test.go | 359 +++++++++---------- modules/firehose/driver_sync.go | 42 ++- modules/firehose/driver_sync_test.go | 32 +- modules/firehose/module.go | 3 +- modules/kubernetes/driver.go | 6 +- pkg/worker/mocks/job_queue.go | 12 +- 39 files changed, 1020 insertions(+), 708 deletions(-) diff --git a/Makefile b/Makefile index e5d3f9f2..a8d73626 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ tidy: install: ## install required dependencies @echo "> installing dependencies" - go install github.com/vektra/mockery/v2@v2.14.0 + go install github.com/vektra/mockery/v2@latest go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1 go get -d google.golang.org/protobuf/proto@v1.28.1 go get -d google.golang.org/grpc@v1.49.0 diff --git a/cli/config.go b/cli/config.go index f44c8802..52b320e9 100644 --- a/cli/config.go +++ b/cli/config.go @@ -20,12 +20,18 @@ const configFlag = "config" // Config contains the application configuration. type Config struct { Log logger.LogConfig `mapstructure:"log"` - Worker workerConf `mapstructure:"worker"` + Syncer syncerConf `mapstructure:"syncer"` Service serveConfig `mapstructure:"service"` PGConnStr string `mapstructure:"pg_conn_str" default:"postgres://postgres@localhost:5432/entropy?sslmode=disable"` Telemetry telemetry.Config `mapstructure:"telemetry"` } +type syncerConf struct { + SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"` + RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"` + ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"` +} + type serveConfig struct { Host string `mapstructure:"host" default:""` Port int `mapstructure:"port" default:"8080"` @@ -33,14 +39,6 @@ type serveConfig struct { HTTPAddr string `mapstructure:"http_addr" default:":8081"` } -type workerConf struct { - QueueName string `mapstructure:"queue_name" default:"entropy_jobs"` - QueueSpec string `mapstructure:"queue_spec" default:"postgres://postgres@localhost:5432/entropy?sslmode=disable"` - - Threads int `mapstructure:"threads" default:"1"` - PollInterval time.Duration `mapstructure:"poll_interval" default:"100ms"` -} - func (serveCfg serveConfig) httpAddr() string { return serveCfg.HTTPAddr } func (serveCfg serveConfig) grpcAddr() string { diff --git a/cli/migrate.go b/cli/migrate.go index ee0393e0..350c60d0 100644 --- a/cli/migrate.go +++ b/cli/migrate.go @@ -36,6 +36,6 @@ func cmdMigrate() *cobra.Command { } func runMigrations(ctx context.Context, zapLog *zap.Logger, cfg Config) error { - store := setupStorage(zapLog, cfg.PGConnStr) + store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer) return store.Migrate(ctx) } diff --git a/cli/serve.go b/cli/serve.go index 855430ef..5e057bbd 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -1,7 +1,6 @@ package cli import ( - "context" "time" "github.com/newrelic/go-agent/v3/newrelic" @@ -17,8 +16,6 @@ import ( "github.com/goto/entropy/modules/kubernetes" "github.com/goto/entropy/pkg/logger" "github.com/goto/entropy/pkg/telemetry" - "github.com/goto/entropy/pkg/worker" - "github.com/goto/entropy/pkg/worker/pgq" ) func cmdServe() *cobra.Command { @@ -52,49 +49,33 @@ func cmdServe() *cobra.Command { newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey), ) + store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer) + moduleService := module.NewService(setupRegistry(zapLog), store) + resourceService := core.New(store, moduleService, time.Now, zapLog) + if migrate { if migrateErr := runMigrations(cmd.Context(), zapLog, cfg); migrateErr != nil { return migrateErr } } - asyncWorker := setupWorker(zapLog, cfg.Worker) if spawnWorker { go func() { - if runErr := asyncWorker.Run(cmd.Context()); runErr != nil { - zapLog.Error("worker exited with error", zap.Error(err)) + if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil { + zapLog.Error("syncer exited with error", zap.Error(err)) } }() } - return runServer(cmd.Context(), nrApp, zapLog, cfg, asyncWorker) + return entropyserver.Serve(cmd.Context(), + cfg.Service.httpAddr(), cfg.Service.grpcAddr(), + nrApp, zapLog, resourceService, moduleService, + ) }) return cmd } -func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap.Logger, cfg Config, asyncWorker *worker.Worker) error { - ctx, cancel := context.WithCancel(baseCtx) - defer cancel() - - store := setupStorage(zapLog, cfg.PGConnStr) - moduleService := module.NewService(setupRegistry(zapLog), store) - resourceService := core.New(store, moduleService, asyncWorker, time.Now, zapLog) - - if err := asyncWorker.Register(core.JobKindSyncResource, resourceService.HandleSyncJob); err != nil { - return err - } - - if err := asyncWorker.Register(core.JobKindScheduledSyncResource, resourceService.HandleSyncJob); err != nil { - return err - } - - return entropyserver.Serve(ctx, - cfg.Service.httpAddr(), cfg.Service.grpcAddr(), - nrApp, zapLog, resourceService, moduleService, - ) -} - func setupRegistry(logger *zap.Logger) module.Registry { supported := []module.Descriptor{ kubernetes.Module, @@ -113,27 +94,8 @@ func setupRegistry(logger *zap.Logger) module.Registry { return registry } -func setupWorker(logger *zap.Logger, conf workerConf) *worker.Worker { - pgQueue, err := pgq.Open(conf.QueueSpec, conf.QueueName) - if err != nil { - logger.Fatal("failed to init postgres job-queue", zap.Error(err)) - } - - opts := []worker.Option{ - worker.WithLogger(logger.Named("worker")), - worker.WithRunConfig(conf.Threads, conf.PollInterval), - } - - asyncWorker, err := worker.New(pgQueue, opts...) - if err != nil { - logger.Fatal("failed to init worker instance", zap.Error(err)) - } - - return asyncWorker -} - -func setupStorage(logger *zap.Logger, pgConStr string) *postgres.Store { - store, err := postgres.Open(pgConStr) +func setupStorage(logger *zap.Logger, pgConStr string, syncCfg syncerConf) *postgres.Store { + store, err := postgres.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy) if err != nil { logger.Fatal("failed to connect to Postgres database", zap.Error(err), zap.String("conn_str", pgConStr)) diff --git a/core/core.go b/core/core.go index e5b4c0b9..bd20ecd2 100644 --- a/core/core.go +++ b/core/core.go @@ -25,7 +25,7 @@ type Service struct { } type ModuleService interface { - PlanAction(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) + PlanAction(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) SyncState(ctx context.Context, res module.ExpandedResource) (*resource.State, error) StreamLogs(ctx context.Context, res module.ExpandedResource, filter map[string]string) (<-chan module.LogChunk, error) GetOutput(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error) @@ -35,7 +35,7 @@ type AsyncWorker interface { Enqueue(ctx context.Context, jobs ...worker.Job) error } -func New(repo resource.Store, moduleSvc ModuleService, asyncWorker AsyncWorker, clockFn func() time.Time, lg *zap.Logger) *Service { +func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, lg *zap.Logger) *Service { if clockFn == nil { clockFn = time.Now } @@ -44,19 +44,18 @@ func New(repo resource.Store, moduleSvc ModuleService, asyncWorker AsyncWorker, logger: lg, clock: clockFn, store: repo, - worker: asyncWorker, moduleSvc: moduleSvc, } } -func (s *Service) generateModuleSpec(ctx context.Context, res resource.Resource) (*module.ExpandedResource, error) { +func (svc *Service) generateModuleSpec(ctx context.Context, res resource.Resource) (*module.ExpandedResource, error) { modSpec := module.ExpandedResource{ Resource: res, Dependencies: map[string]module.ResolvedDependency{}, } for key, resURN := range res.Spec.Dependencies { - d, err := s.GetResource(ctx, resURN) + d, err := svc.GetResource(ctx, resURN) if err != nil { if errors.Is(err, errors.ErrNotFound) { return nil, errors.ErrInvalid. diff --git a/core/core_test.go b/core/core_test.go index b891a661..54628532 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -25,6 +25,6 @@ var ( func TestNew(t *testing.T) { t.Parallel() - s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, &mocks.AsyncWorker{}, deadClock, nil) + s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock, nil) assert.NotNil(t, s) } diff --git a/core/mocks/async_worker.go b/core/mocks/async_worker.go index 2223efda..ef62e849 100644 --- a/core/mocks/async_worker.go +++ b/core/mocks/async_worker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -75,6 +75,11 @@ func (_c *AsyncWorker_Enqueue_Call) Return(_a0 error) *AsyncWorker_Enqueue_Call return _c } +func (_c *AsyncWorker_Enqueue_Call) RunAndReturn(run func(context.Context, ...worker.Job) error) *AsyncWorker_Enqueue_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewAsyncWorker interface { mock.TestingT Cleanup(func()) diff --git a/core/mocks/driver.go b/core/mocks/driver.go index bd8e990d..eda11a2c 100644 --- a/core/mocks/driver.go +++ b/core/mocks/driver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -31,6 +31,10 @@ func (_m *ModuleDriver) Output(ctx context.Context, res module.ExpandedResource) ret := _m.Called(ctx, res) var r0 json.RawMessage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) (json.RawMessage, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) json.RawMessage); ok { r0 = rf(ctx, res) } else { @@ -39,7 +43,6 @@ func (_m *ModuleDriver) Output(ctx context.Context, res module.ExpandedResource) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource) error); ok { r1 = rf(ctx, res) } else { @@ -73,20 +76,28 @@ func (_c *ModuleDriver_Output_Call) Return(_a0 json.RawMessage, _a1 error) *Modu return _c } +func (_c *ModuleDriver_Output_Call) RunAndReturn(run func(context.Context, module.ExpandedResource) (json.RawMessage, error)) *ModuleDriver_Output_Call { + _c.Call.Return(run) + return _c +} + // Plan provides a mock function with given fields: ctx, res, act -func (_m *ModuleDriver) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (_m *ModuleDriver) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { ret := _m.Called(ctx, res, act) - var r0 *module.Plan - if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) *module.Plan); ok { + var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) (*resource.Resource, error)); ok { + return rf(ctx, res, act) + } + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) *resource.Resource); ok { r0 = rf(ctx, res, act) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*module.Plan) + r0 = ret.Get(0).(*resource.Resource) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource, module.ActionRequest) error); ok { r1 = rf(ctx, res, act) } else { @@ -116,16 +127,25 @@ func (_c *ModuleDriver_Plan_Call) Run(run func(ctx context.Context, res module.E return _c } -func (_c *ModuleDriver_Plan_Call) Return(_a0 *module.Plan, _a1 error) *ModuleDriver_Plan_Call { +func (_c *ModuleDriver_Plan_Call) Return(_a0 *resource.Resource, _a1 error) *ModuleDriver_Plan_Call { _c.Call.Return(_a0, _a1) return _c } +func (_c *ModuleDriver_Plan_Call) RunAndReturn(run func(context.Context, module.ExpandedResource, module.ActionRequest) (*resource.Resource, error)) *ModuleDriver_Plan_Call { + _c.Call.Return(run) + return _c +} + // Sync provides a mock function with given fields: ctx, res func (_m *ModuleDriver) Sync(ctx context.Context, res module.ExpandedResource) (*resource.State, error) { ret := _m.Called(ctx, res) var r0 *resource.State + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) (*resource.State, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) *resource.State); ok { r0 = rf(ctx, res) } else { @@ -134,7 +154,6 @@ func (_m *ModuleDriver) Sync(ctx context.Context, res module.ExpandedResource) ( } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource) error); ok { r1 = rf(ctx, res) } else { @@ -168,6 +187,11 @@ func (_c *ModuleDriver_Sync_Call) Return(_a0 *resource.State, _a1 error) *Module return _c } +func (_c *ModuleDriver_Sync_Call) RunAndReturn(run func(context.Context, module.ExpandedResource) (*resource.State, error)) *ModuleDriver_Sync_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewModuleDriver interface { mock.TestingT Cleanup(func()) diff --git a/core/mocks/loggable_module.go b/core/mocks/loggable_module.go index 888df706..ecd2f59f 100644 --- a/core/mocks/loggable_module.go +++ b/core/mocks/loggable_module.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -31,6 +31,10 @@ func (_m *LoggableModule) Log(ctx context.Context, res module.ExpandedResource, ret := _m.Called(ctx, res, filter) var r0 <-chan module.LogChunk + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, map[string]string) (<-chan module.LogChunk, error)); ok { + return rf(ctx, res, filter) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, map[string]string) <-chan module.LogChunk); ok { r0 = rf(ctx, res, filter) } else { @@ -39,7 +43,6 @@ func (_m *LoggableModule) Log(ctx context.Context, res module.ExpandedResource, } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource, map[string]string) error); ok { r1 = rf(ctx, res, filter) } else { @@ -74,11 +77,20 @@ func (_c *LoggableModule_Log_Call) Return(_a0 <-chan module.LogChunk, _a1 error) return _c } +func (_c *LoggableModule_Log_Call) RunAndReturn(run func(context.Context, module.ExpandedResource, map[string]string) (<-chan module.LogChunk, error)) *LoggableModule_Log_Call { + _c.Call.Return(run) + return _c +} + // Output provides a mock function with given fields: ctx, res func (_m *LoggableModule) Output(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error) { ret := _m.Called(ctx, res) var r0 json.RawMessage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) (json.RawMessage, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) json.RawMessage); ok { r0 = rf(ctx, res) } else { @@ -87,7 +99,6 @@ func (_m *LoggableModule) Output(ctx context.Context, res module.ExpandedResourc } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource) error); ok { r1 = rf(ctx, res) } else { @@ -121,20 +132,28 @@ func (_c *LoggableModule_Output_Call) Return(_a0 json.RawMessage, _a1 error) *Lo return _c } +func (_c *LoggableModule_Output_Call) RunAndReturn(run func(context.Context, module.ExpandedResource) (json.RawMessage, error)) *LoggableModule_Output_Call { + _c.Call.Return(run) + return _c +} + // Plan provides a mock function with given fields: ctx, res, act -func (_m *LoggableModule) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (_m *LoggableModule) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { ret := _m.Called(ctx, res, act) - var r0 *module.Plan - if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) *module.Plan); ok { + var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) (*resource.Resource, error)); ok { + return rf(ctx, res, act) + } + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) *resource.Resource); ok { r0 = rf(ctx, res, act) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*module.Plan) + r0 = ret.Get(0).(*resource.Resource) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource, module.ActionRequest) error); ok { r1 = rf(ctx, res, act) } else { @@ -164,16 +183,25 @@ func (_c *LoggableModule_Plan_Call) Run(run func(ctx context.Context, res module return _c } -func (_c *LoggableModule_Plan_Call) Return(_a0 *module.Plan, _a1 error) *LoggableModule_Plan_Call { +func (_c *LoggableModule_Plan_Call) Return(_a0 *resource.Resource, _a1 error) *LoggableModule_Plan_Call { _c.Call.Return(_a0, _a1) return _c } +func (_c *LoggableModule_Plan_Call) RunAndReturn(run func(context.Context, module.ExpandedResource, module.ActionRequest) (*resource.Resource, error)) *LoggableModule_Plan_Call { + _c.Call.Return(run) + return _c +} + // Sync provides a mock function with given fields: ctx, res func (_m *LoggableModule) Sync(ctx context.Context, res module.ExpandedResource) (*resource.State, error) { ret := _m.Called(ctx, res) var r0 *resource.State + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) (*resource.State, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) *resource.State); ok { r0 = rf(ctx, res) } else { @@ -182,7 +210,6 @@ func (_m *LoggableModule) Sync(ctx context.Context, res module.ExpandedResource) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource) error); ok { r1 = rf(ctx, res) } else { @@ -216,6 +243,11 @@ func (_c *LoggableModule_Sync_Call) Return(_a0 *resource.State, _a1 error) *Logg return _c } +func (_c *LoggableModule_Sync_Call) RunAndReturn(run func(context.Context, module.ExpandedResource) (*resource.State, error)) *LoggableModule_Sync_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewLoggableModule interface { mock.TestingT Cleanup(func()) diff --git a/core/mocks/module_registry.go b/core/mocks/module_registry.go index 5dd6d1fc..1b523afe 100644 --- a/core/mocks/module_registry.go +++ b/core/mocks/module_registry.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -27,6 +27,11 @@ func (_m *ModuleRegistry) GetDriver(ctx context.Context, mod module.Module) (mod ret := _m.Called(ctx, mod) var r0 module.Driver + var r1 module.Descriptor + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, module.Module) (module.Driver, module.Descriptor, error)); ok { + return rf(ctx, mod) + } if rf, ok := ret.Get(0).(func(context.Context, module.Module) module.Driver); ok { r0 = rf(ctx, mod) } else { @@ -35,14 +40,12 @@ func (_m *ModuleRegistry) GetDriver(ctx context.Context, mod module.Module) (mod } } - var r1 module.Descriptor if rf, ok := ret.Get(1).(func(context.Context, module.Module) module.Descriptor); ok { r1 = rf(ctx, mod) } else { r1 = ret.Get(1).(module.Descriptor) } - var r2 error if rf, ok := ret.Get(2).(func(context.Context, module.Module) error); ok { r2 = rf(ctx, mod) } else { @@ -76,6 +79,11 @@ func (_c *ModuleRegistry_GetDriver_Call) Return(_a0 module.Driver, _a1 module.De return _c } +func (_c *ModuleRegistry_GetDriver_Call) RunAndReturn(run func(context.Context, module.Module) (module.Driver, module.Descriptor, error)) *ModuleRegistry_GetDriver_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewModuleRegistry interface { mock.TestingT Cleanup(func()) diff --git a/core/mocks/module_service.go b/core/mocks/module_service.go index 03a6a29c..38f48081 100644 --- a/core/mocks/module_service.go +++ b/core/mocks/module_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -32,6 +32,10 @@ func (_m *ModuleService) GetOutput(ctx context.Context, res module.ExpandedResou ret := _m.Called(ctx, res) var r0 json.RawMessage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) (json.RawMessage, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) json.RawMessage); ok { r0 = rf(ctx, res) } else { @@ -40,7 +44,6 @@ func (_m *ModuleService) GetOutput(ctx context.Context, res module.ExpandedResou } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource) error); ok { r1 = rf(ctx, res) } else { @@ -74,20 +77,28 @@ func (_c *ModuleService_GetOutput_Call) Return(_a0 json.RawMessage, _a1 error) * return _c } +func (_c *ModuleService_GetOutput_Call) RunAndReturn(run func(context.Context, module.ExpandedResource) (json.RawMessage, error)) *ModuleService_GetOutput_Call { + _c.Call.Return(run) + return _c +} + // PlanAction provides a mock function with given fields: ctx, res, act -func (_m *ModuleService) PlanAction(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (_m *ModuleService) PlanAction(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { ret := _m.Called(ctx, res, act) - var r0 *module.Plan - if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) *module.Plan); ok { + var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) (*resource.Resource, error)); ok { + return rf(ctx, res, act) + } + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, module.ActionRequest) *resource.Resource); ok { r0 = rf(ctx, res, act) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*module.Plan) + r0 = ret.Get(0).(*resource.Resource) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource, module.ActionRequest) error); ok { r1 = rf(ctx, res, act) } else { @@ -117,16 +128,25 @@ func (_c *ModuleService_PlanAction_Call) Run(run func(ctx context.Context, res m return _c } -func (_c *ModuleService_PlanAction_Call) Return(_a0 *module.Plan, _a1 error) *ModuleService_PlanAction_Call { +func (_c *ModuleService_PlanAction_Call) Return(_a0 *resource.Resource, _a1 error) *ModuleService_PlanAction_Call { _c.Call.Return(_a0, _a1) return _c } +func (_c *ModuleService_PlanAction_Call) RunAndReturn(run func(context.Context, module.ExpandedResource, module.ActionRequest) (*resource.Resource, error)) *ModuleService_PlanAction_Call { + _c.Call.Return(run) + return _c +} + // StreamLogs provides a mock function with given fields: ctx, res, filter func (_m *ModuleService) StreamLogs(ctx context.Context, res module.ExpandedResource, filter map[string]string) (<-chan module.LogChunk, error) { ret := _m.Called(ctx, res, filter) var r0 <-chan module.LogChunk + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, map[string]string) (<-chan module.LogChunk, error)); ok { + return rf(ctx, res, filter) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource, map[string]string) <-chan module.LogChunk); ok { r0 = rf(ctx, res, filter) } else { @@ -135,7 +155,6 @@ func (_m *ModuleService) StreamLogs(ctx context.Context, res module.ExpandedReso } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource, map[string]string) error); ok { r1 = rf(ctx, res, filter) } else { @@ -170,11 +189,20 @@ func (_c *ModuleService_StreamLogs_Call) Return(_a0 <-chan module.LogChunk, _a1 return _c } +func (_c *ModuleService_StreamLogs_Call) RunAndReturn(run func(context.Context, module.ExpandedResource, map[string]string) (<-chan module.LogChunk, error)) *ModuleService_StreamLogs_Call { + _c.Call.Return(run) + return _c +} + // SyncState provides a mock function with given fields: ctx, res func (_m *ModuleService) SyncState(ctx context.Context, res module.ExpandedResource) (*resource.State, error) { ret := _m.Called(ctx, res) var r0 *resource.State + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) (*resource.State, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, module.ExpandedResource) *resource.State); ok { r0 = rf(ctx, res) } else { @@ -183,7 +211,6 @@ func (_m *ModuleService) SyncState(ctx context.Context, res module.ExpandedResou } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.ExpandedResource) error); ok { r1 = rf(ctx, res) } else { @@ -217,6 +244,11 @@ func (_c *ModuleService_SyncState_Call) Return(_a0 *resource.State, _a1 error) * return _c } +func (_c *ModuleService_SyncState_Call) RunAndReturn(run func(context.Context, module.ExpandedResource) (*resource.State, error)) *ModuleService_SyncState_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewModuleService interface { mock.TestingT Cleanup(func()) diff --git a/core/mocks/module_store.go b/core/mocks/module_store.go index 95fe96c9..ea3f05c6 100644 --- a/core/mocks/module_store.go +++ b/core/mocks/module_store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -60,6 +60,11 @@ func (_c *ModuleStore_CreateModule_Call) Return(_a0 error) *ModuleStore_CreateMo return _c } +func (_c *ModuleStore_CreateModule_Call) RunAndReturn(run func(context.Context, module.Module) error) *ModuleStore_CreateModule_Call { + _c.Call.Return(run) + return _c +} + // DeleteModule provides a mock function with given fields: ctx, urn func (_m *ModuleStore) DeleteModule(ctx context.Context, urn string) error { ret := _m.Called(ctx, urn) @@ -98,11 +103,20 @@ func (_c *ModuleStore_DeleteModule_Call) Return(_a0 error) *ModuleStore_DeleteMo return _c } +func (_c *ModuleStore_DeleteModule_Call) RunAndReturn(run func(context.Context, string) error) *ModuleStore_DeleteModule_Call { + _c.Call.Return(run) + return _c +} + // GetModule provides a mock function with given fields: ctx, urn func (_m *ModuleStore) GetModule(ctx context.Context, urn string) (*module.Module, error) { ret := _m.Called(ctx, urn) var r0 *module.Module + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*module.Module, error)); ok { + return rf(ctx, urn) + } if rf, ok := ret.Get(0).(func(context.Context, string) *module.Module); ok { r0 = rf(ctx, urn) } else { @@ -111,7 +125,6 @@ func (_m *ModuleStore) GetModule(ctx context.Context, urn string) (*module.Modul } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, urn) } else { @@ -145,11 +158,20 @@ func (_c *ModuleStore_GetModule_Call) Return(_a0 *module.Module, _a1 error) *Mod return _c } +func (_c *ModuleStore_GetModule_Call) RunAndReturn(run func(context.Context, string) (*module.Module, error)) *ModuleStore_GetModule_Call { + _c.Call.Return(run) + return _c +} + // ListModules provides a mock function with given fields: ctx, project func (_m *ModuleStore) ListModules(ctx context.Context, project string) ([]module.Module, error) { ret := _m.Called(ctx, project) var r0 []module.Module + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]module.Module, error)); ok { + return rf(ctx, project) + } if rf, ok := ret.Get(0).(func(context.Context, string) []module.Module); ok { r0 = rf(ctx, project) } else { @@ -158,7 +180,6 @@ func (_m *ModuleStore) ListModules(ctx context.Context, project string) ([]modul } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, project) } else { @@ -192,6 +213,11 @@ func (_c *ModuleStore_ListModules_Call) Return(_a0 []module.Module, _a1 error) * return _c } +func (_c *ModuleStore_ListModules_Call) RunAndReturn(run func(context.Context, string) ([]module.Module, error)) *ModuleStore_ListModules_Call { + _c.Call.Return(run) + return _c +} + // UpdateModule provides a mock function with given fields: ctx, m func (_m *ModuleStore) UpdateModule(ctx context.Context, m module.Module) error { ret := _m.Called(ctx, m) @@ -230,6 +256,11 @@ func (_c *ModuleStore_UpdateModule_Call) Return(_a0 error) *ModuleStore_UpdateMo return _c } +func (_c *ModuleStore_UpdateModule_Call) RunAndReturn(run func(context.Context, module.Module) error) *ModuleStore_UpdateModule_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewModuleStore interface { mock.TestingT Cleanup(func()) diff --git a/core/mocks/resource_store.go b/core/mocks/resource_store.go index 79ac941e..f55aefd6 100644 --- a/core/mocks/resource_store.go +++ b/core/mocks/resource_store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -75,6 +75,11 @@ func (_c *ResourceStore_Create_Call) Return(_a0 error) *ResourceStore_Create_Cal return _c } +func (_c *ResourceStore_Create_Call) RunAndReturn(run func(context.Context, resource.Resource, ...resource.MutationHook) error) *ResourceStore_Create_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: ctx, urn, hooks func (_m *ResourceStore) Delete(ctx context.Context, urn string, hooks ...resource.MutationHook) error { _va := make([]interface{}, len(hooks)) @@ -128,11 +133,20 @@ func (_c *ResourceStore_Delete_Call) Return(_a0 error) *ResourceStore_Delete_Cal return _c } +func (_c *ResourceStore_Delete_Call) RunAndReturn(run func(context.Context, string, ...resource.MutationHook) error) *ResourceStore_Delete_Call { + _c.Call.Return(run) + return _c +} + // GetByURN provides a mock function with given fields: ctx, urn func (_m *ResourceStore) GetByURN(ctx context.Context, urn string) (*resource.Resource, error) { ret := _m.Called(ctx, urn) var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*resource.Resource, error)); ok { + return rf(ctx, urn) + } if rf, ok := ret.Get(0).(func(context.Context, string) *resource.Resource); ok { r0 = rf(ctx, urn) } else { @@ -141,7 +155,6 @@ func (_m *ResourceStore) GetByURN(ctx context.Context, urn string) (*resource.Re } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, urn) } else { @@ -175,11 +188,20 @@ func (_c *ResourceStore_GetByURN_Call) Return(_a0 *resource.Resource, _a1 error) return _c } +func (_c *ResourceStore_GetByURN_Call) RunAndReturn(run func(context.Context, string) (*resource.Resource, error)) *ResourceStore_GetByURN_Call { + _c.Call.Return(run) + return _c +} + // List provides a mock function with given fields: ctx, filter func (_m *ResourceStore) List(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) { ret := _m.Called(ctx, filter) var r0 []resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, resource.Filter) ([]resource.Resource, error)); ok { + return rf(ctx, filter) + } if rf, ok := ret.Get(0).(func(context.Context, resource.Filter) []resource.Resource); ok { r0 = rf(ctx, filter) } else { @@ -188,7 +210,6 @@ func (_m *ResourceStore) List(ctx context.Context, filter resource.Filter) ([]re } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, resource.Filter) error); ok { r1 = rf(ctx, filter) } else { @@ -222,11 +243,20 @@ func (_c *ResourceStore_List_Call) Return(_a0 []resource.Resource, _a1 error) *R return _c } +func (_c *ResourceStore_List_Call) RunAndReturn(run func(context.Context, resource.Filter) ([]resource.Resource, error)) *ResourceStore_List_Call { + _c.Call.Return(run) + return _c +} + // Revisions provides a mock function with given fields: ctx, selector func (_m *ResourceStore) Revisions(ctx context.Context, selector resource.RevisionsSelector) ([]resource.Revision, error) { ret := _m.Called(ctx, selector) var r0 []resource.Revision + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, resource.RevisionsSelector) ([]resource.Revision, error)); ok { + return rf(ctx, selector) + } if rf, ok := ret.Get(0).(func(context.Context, resource.RevisionsSelector) []resource.Revision); ok { r0 = rf(ctx, selector) } else { @@ -235,7 +265,6 @@ func (_m *ResourceStore) Revisions(ctx context.Context, selector resource.Revisi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, resource.RevisionsSelector) error); ok { r1 = rf(ctx, selector) } else { @@ -269,6 +298,54 @@ func (_c *ResourceStore_Revisions_Call) Return(_a0 []resource.Revision, _a1 erro return _c } +func (_c *ResourceStore_Revisions_Call) RunAndReturn(run func(context.Context, resource.RevisionsSelector) ([]resource.Revision, error)) *ResourceStore_Revisions_Call { + _c.Call.Return(run) + return _c +} + +// SyncOne provides a mock function with given fields: ctx, syncFn +func (_m *ResourceStore) SyncOne(ctx context.Context, syncFn resource.SyncFn) error { + ret := _m.Called(ctx, syncFn) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, resource.SyncFn) error); ok { + r0 = rf(ctx, syncFn) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ResourceStore_SyncOne_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncOne' +type ResourceStore_SyncOne_Call struct { + *mock.Call +} + +// SyncOne is a helper method to define mock.On call +// - ctx context.Context +// - syncFn resource.SyncFn +func (_e *ResourceStore_Expecter) SyncOne(ctx interface{}, syncFn interface{}) *ResourceStore_SyncOne_Call { + return &ResourceStore_SyncOne_Call{Call: _e.mock.On("SyncOne", ctx, syncFn)} +} + +func (_c *ResourceStore_SyncOne_Call) Run(run func(ctx context.Context, syncFn resource.SyncFn)) *ResourceStore_SyncOne_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(resource.SyncFn)) + }) + return _c +} + +func (_c *ResourceStore_SyncOne_Call) Return(_a0 error) *ResourceStore_SyncOne_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ResourceStore_SyncOne_Call) RunAndReturn(run func(context.Context, resource.SyncFn) error) *ResourceStore_SyncOne_Call { + _c.Call.Return(run) + return _c +} + // Update provides a mock function with given fields: ctx, r, saveRevision, reason, hooks func (_m *ResourceStore) Update(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) error { _va := make([]interface{}, len(hooks)) @@ -324,6 +401,11 @@ func (_c *ResourceStore_Update_Call) Return(_a0 error) *ResourceStore_Update_Cal return _c } +func (_c *ResourceStore_Update_Call) RunAndReturn(run func(context.Context, resource.Resource, bool, string, ...resource.MutationHook) error) *ResourceStore_Update_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewResourceStore interface { mock.TestingT Cleanup(func()) diff --git a/core/module/driver.go b/core/module/driver.go index 61f1b7f6..cdcd86da 100644 --- a/core/module/driver.go +++ b/core/module/driver.go @@ -6,7 +6,6 @@ package module import ( "context" "encoding/json" - "time" "github.com/goto/entropy/core/resource" ) @@ -17,7 +16,7 @@ type Driver interface { // Plan SHOULD validate the action on the current version of the resource, // return the resource with config/status/state changes (if any) applied. // Plan SHOULD NOT have side effects on anything other than the resource. - Plan(ctx context.Context, res ExpandedResource, act ActionRequest) (*Plan, error) + Plan(ctx context.Context, res ExpandedResource, act ActionRequest) (*resource.Resource, error) // Sync is called repeatedly by Entropy core until the returned state is // a terminal status. Driver implementation is free to execute an action @@ -32,13 +31,6 @@ type Driver interface { Output(ctx context.Context, res ExpandedResource) (json.RawMessage, error) } -// Plan represents the changes to be staged and later synced by module. -type Plan struct { - Reason string - Resource resource.Resource - ScheduleRunAt *time.Time -} - // Loggable extension of driver allows streaming log data for a resource. type Loggable interface { Driver diff --git a/core/module/service.go b/core/module/service.go index e3d07405..d2c5ddef 100644 --- a/core/module/service.go +++ b/core/module/service.go @@ -21,7 +21,7 @@ func NewService(registry Registry, store Store) *Service { } } -func (mr *Service) PlanAction(ctx context.Context, res ExpandedResource, act ActionRequest) (*Plan, error) { +func (mr *Service) PlanAction(ctx context.Context, res ExpandedResource, act ActionRequest) (*resource.Resource, error) { mod, err := mr.discoverModule(ctx, res.Kind, res.Project) if err != nil { return nil, err diff --git a/core/read.go b/core/read.go index 7c1769aa..3818264e 100644 --- a/core/read.go +++ b/core/read.go @@ -8,8 +8,8 @@ import ( "github.com/goto/entropy/pkg/errors" ) -func (s *Service) GetResource(ctx context.Context, urn string) (*resource.Resource, error) { - res, err := s.store.GetByURN(ctx, urn) +func (svc *Service) GetResource(ctx context.Context, urn string) (*resource.Resource, error) { + res, err := svc.store.GetByURN(ctx, urn) if err != nil { if errors.Is(err, errors.ErrNotFound) { return nil, errors.ErrNotFound.WithMsgf("resource with urn '%s' not found", urn) @@ -17,12 +17,12 @@ func (s *Service) GetResource(ctx context.Context, urn string) (*resource.Resour return nil, errors.ErrInternal.WithCausef(err.Error()) } - modSpec, err := s.generateModuleSpec(ctx, *res) + modSpec, err := svc.generateModuleSpec(ctx, *res) if err != nil { return nil, err } - output, err := s.moduleSvc.GetOutput(ctx, *modSpec) + output, err := svc.moduleSvc.GetOutput(ctx, *modSpec) if err != nil { return nil, err } @@ -32,26 +32,26 @@ func (s *Service) GetResource(ctx context.Context, urn string) (*resource.Resour return res, nil } -func (s *Service) ListResources(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) { - resources, err := s.store.List(ctx, filter) +func (svc *Service) ListResources(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) { + resources, err := svc.store.List(ctx, filter) if err != nil { return nil, errors.ErrInternal.WithCausef(err.Error()) } return filter.Apply(resources), nil } -func (s *Service) GetLog(ctx context.Context, urn string, filter map[string]string) (<-chan module.LogChunk, error) { - res, err := s.GetResource(ctx, urn) +func (svc *Service) GetLog(ctx context.Context, urn string, filter map[string]string) (<-chan module.LogChunk, error) { + res, err := svc.GetResource(ctx, urn) if err != nil { return nil, err } - modSpec, err := s.generateModuleSpec(ctx, *res) + modSpec, err := svc.generateModuleSpec(ctx, *res) if err != nil { return nil, err } - logCh, err := s.moduleSvc.StreamLogs(ctx, *modSpec, filter) + logCh, err := svc.moduleSvc.StreamLogs(ctx, *modSpec, filter) if err != nil { if errors.Is(err, errors.ErrUnsupported) { return nil, errors.ErrUnsupported.WithMsgf("log streaming not supported for kind '%s'", res.Kind) @@ -61,8 +61,8 @@ func (s *Service) GetLog(ctx context.Context, urn string, filter map[string]stri return logCh, nil } -func (s *Service) GetRevisions(ctx context.Context, selector resource.RevisionsSelector) ([]resource.Revision, error) { - revs, err := s.store.Revisions(ctx, selector) +func (svc *Service) GetRevisions(ctx context.Context, selector resource.RevisionsSelector) ([]resource.Revision, error) { + revs, err := svc.store.Revisions(ctx, selector) if err != nil { return nil, errors.ErrInternal.WithCausef(err.Error()) } diff --git a/core/read_test.go b/core/read_test.go index 9a501005..d8160e70 100644 --- a/core/read_test.go +++ b/core/read_test.go @@ -32,7 +32,7 @@ func TestService_GetResource(t *testing.T) { GetByURN(mock.Anything, mock.Anything). Return(nil, errors.ErrNotFound). Once() - return core.New(repo, nil, &mocks.AsyncWorker{}, nil, nil) + return core.New(repo, nil, nil, nil) }, urn: "foo:bar:baz", wantErr: errors.ErrNotFound, @@ -52,7 +52,7 @@ func TestService_GetResource(t *testing.T) { Return(nil, nil). Once() - return core.New(repo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(repo, mod, deadClock, nil) }, urn: "foo:bar:baz", want: &sampleResource, @@ -99,7 +99,7 @@ func TestService_ListResources(t *testing.T) { List(mock.Anything, mock.Anything). Return(nil, nil). Once() - return core.New(repo, nil, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(repo, nil, deadClock, nil) }, want: nil, wantErr: nil, @@ -113,7 +113,7 @@ func TestService_ListResources(t *testing.T) { List(mock.Anything, mock.Anything). Return(nil, errStoreFailure). Once() - return core.New(repo, nil, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(repo, nil, deadClock, nil) }, want: nil, wantErr: errors.ErrInternal, @@ -127,7 +127,7 @@ func TestService_ListResources(t *testing.T) { List(mock.Anything, mock.Anything). Return([]resource.Resource{sampleResource}, nil). Once() - return core.New(repo, nil, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(repo, nil, deadClock, nil) }, want: []resource.Resource{sampleResource}, wantErr: nil, diff --git a/core/resource/resource.go b/core/resource/resource.go index 64edcf40..028cf891 100644 --- a/core/resource/resource.go +++ b/core/resource/resource.go @@ -25,8 +25,12 @@ type Store interface { Delete(ctx context.Context, urn string, hooks ...MutationHook) error Revisions(ctx context.Context, selector RevisionsSelector) ([]Revision, error) + + SyncOne(ctx context.Context, syncFn SyncFn) error } +type SyncFn func(ctx context.Context, res Resource) (*Resource, error) + // MutationHook values are passed to mutation operations of resource storage // to handle any transactional requirements. type MutationHook func(ctx context.Context) error diff --git a/core/resource/state.go b/core/resource/state.go index a661fd83..13c0c784 100644 --- a/core/resource/state.go +++ b/core/resource/state.go @@ -2,6 +2,7 @@ package resource import ( "encoding/json" + "time" ) const ( @@ -15,6 +16,7 @@ const ( type State struct { Status string `json:"status"` Output json.RawMessage `json:"output"` + NextSyncAt *time.Time `json:"next_sync_at,omitempty"` ModuleData json.RawMessage `json:"module_data,omitempty"` } diff --git a/core/sync.go b/core/sync.go index 80a9319d..2c5045a6 100644 --- a/core/sync.go +++ b/core/sync.go @@ -2,110 +2,51 @@ package core import ( "context" - "encoding/json" - "fmt" "time" - "github.com/goto/entropy/core/module" + "go.uber.org/zap" + "github.com/goto/entropy/core/resource" "github.com/goto/entropy/pkg/errors" - "github.com/goto/entropy/pkg/worker" -) - -const ( - JobKindSyncResource = "sync_resource" - JobKindScheduledSyncResource = "sched_sync_resource" ) -type syncJobPayload struct { - ResourceURN string `json:"resource_urn"` - UpdatedAt time.Time `json:"updated_at"` -} +// RunSyncer runs the syncer thread that keeps performing resource-sync at +// regular intervals. +func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error { + tick := time.NewTimer(interval) + defer tick.Stop() -func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource, runAt time.Time, jobType string) error { - data := syncJobPayload{ - ResourceURN: res.URN, - UpdatedAt: res.UpdatedAt, - } + for { + select { + case <-ctx.Done(): + return nil - payload, err := json.Marshal(data) - if err != nil { - return err - } + case <-tick.C: + tick.Reset(interval) - job := worker.Job{ - ID: fmt.Sprintf(jobType+"-%s-%d", res.URN, runAt.Unix()), - Kind: jobType, - RunAt: runAt, - Payload: payload, - } - - if err := s.worker.Enqueue(ctx, job); err != nil && !errors.Is(err, worker.ErrJobExists) { - return err - } - return nil -} - -// HandleSyncJob is meant to be invoked by asyncWorker when an enqueued job is -// ready. -// TODO: make this private and move the registration of this handler inside New(). -func (s *Service) HandleSyncJob(ctx context.Context, job worker.Job) ([]byte, error) { - const retryBackoff = 5 * time.Second - - var data syncJobPayload - if err := json.Unmarshal(job.Payload, &data); err != nil { - return nil, err - } - - syncedRes, err := s.syncChange(ctx, data.ResourceURN) - if err != nil { - if errors.Is(err, errors.ErrInternal) { - return nil, &worker.RetryableError{ - Cause: err, - RetryAfter: retryBackoff, + err := svc.store.SyncOne(ctx, svc.handleSync) + if err != nil { + svc.logger.Warn("SyncOne() failed", zap.Error(err)) } } - return nil, err } - - return json.Marshal(map[string]interface{}{ - "status": syncedRes.State.Status, - }) } -func (s *Service) syncChange(ctx context.Context, urn string) (*resource.Resource, error) { - res, err := s.GetResource(ctx, urn) - if err != nil { - return nil, err - } - - modSpec, err := s.generateModuleSpec(ctx, *res) +func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*resource.Resource, error) { + modSpec, err := svc.generateModuleSpec(ctx, res) if err != nil { return nil, err } - oldState := res.State.Clone() - newState, err := s.moduleSvc.SyncState(ctx, *modSpec) + newState, err := svc.moduleSvc.SyncState(ctx, *modSpec) if err != nil { if errors.Is(err, errors.ErrInvalid) { return nil, err } return nil, errors.ErrInternal.WithMsgf("sync() failed").WithCausef(err.Error()) } - res.UpdatedAt = s.clock() + res.UpdatedAt = svc.clock() res.State = *newState - // TODO: clarify on behaviour when resource schedule for deletion reaches error. - shouldDelete := oldState.InDeletion() && newState.IsTerminal() - if shouldDelete { - if err := s.DeleteResource(ctx, urn); err != nil { - return nil, err - } - } else { - if err := s.upsert(ctx, module.Plan{Resource: *res}, false, false, ""); err != nil { - return nil, err - } - } - - return res, nil + return &res, nil } diff --git a/core/write.go b/core/write.go index 0735d776..0d01c584 100644 --- a/core/write.go +++ b/core/write.go @@ -2,13 +2,14 @@ package core import ( "context" + "fmt" "github.com/goto/entropy/core/module" "github.com/goto/entropy/core/resource" "github.com/goto/entropy/pkg/errors" ) -func (s *Service) CreateResource(ctx context.Context, res resource.Resource) (*resource.Resource, error) { +func (svc *Service) CreateResource(ctx context.Context, res resource.Resource) (*resource.Resource, error) { if err := res.Validate(true); err != nil { return nil, err } @@ -20,32 +21,32 @@ func (s *Service) CreateResource(ctx context.Context, res resource.Resource) (*r } res.Spec.Configs = nil - return s.execAction(ctx, res, act) + return svc.execAction(ctx, res, act) } -func (s *Service) UpdateResource(ctx context.Context, urn string, req resource.UpdateRequest) (*resource.Resource, error) { +func (svc *Service) UpdateResource(ctx context.Context, urn string, req resource.UpdateRequest) (*resource.Resource, error) { if len(req.Spec.Dependencies) != 0 { return nil, errors.ErrUnsupported.WithMsgf("updating dependencies is not supported") } else if len(req.Spec.Configs) == 0 { return nil, errors.ErrInvalid.WithMsgf("no config is being updated, nothing to do") } - return s.ApplyAction(ctx, urn, module.ActionRequest{ + return svc.ApplyAction(ctx, urn, module.ActionRequest{ Name: module.UpdateAction, Params: req.Spec.Configs, Labels: req.Labels, }) } -func (s *Service) DeleteResource(ctx context.Context, urn string) error { - _, actionErr := s.ApplyAction(ctx, urn, module.ActionRequest{ +func (svc *Service) DeleteResource(ctx context.Context, urn string) error { + _, actionErr := svc.ApplyAction(ctx, urn, module.ActionRequest{ Name: module.DeleteAction, }) return actionErr } -func (s *Service) ApplyAction(ctx context.Context, urn string, act module.ActionRequest) (*resource.Resource, error) { - res, err := s.GetResource(ctx, urn) +func (svc *Service) ApplyAction(ctx context.Context, urn string, act module.ActionRequest) (*resource.Resource, error) { + res, err := svc.GetResource(ctx, urn) if err != nil { return nil, err } else if !res.State.IsTerminal() { @@ -53,36 +54,37 @@ func (s *Service) ApplyAction(ctx context.Context, urn string, act module.Action WithMsgf("cannot perform '%s' on resource in '%s'", act.Name, res.State.Status) } - return s.execAction(ctx, *res, act) + return svc.execAction(ctx, *res, act) } -func (s *Service) execAction(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) { - planned, err := s.planChange(ctx, res, act) +func (svc *Service) execAction(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) { + planned, err := svc.planChange(ctx, res, act) if err != nil { return nil, err } if isCreate(act.Name) { - planned.Resource.CreatedAt = s.clock() - planned.Resource.UpdatedAt = planned.Resource.CreatedAt + planned.CreatedAt = svc.clock() + planned.UpdatedAt = planned.CreatedAt } else { - planned.Resource.CreatedAt = res.CreatedAt - planned.Resource.UpdatedAt = s.clock() + planned.CreatedAt = res.CreatedAt + planned.UpdatedAt = svc.clock() } - if err := s.upsert(ctx, *planned, isCreate(act.Name), true, planned.Reason); err != nil { + reason := fmt.Sprintf("action:%s", act.Name) + if err := svc.upsert(ctx, *planned, isCreate(act.Name), true, reason); err != nil { return nil, err } - return &planned.Resource, nil + return planned, nil } -func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*module.Plan, error) { - modSpec, err := s.generateModuleSpec(ctx, res) +func (svc *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) { + modSpec, err := svc.generateModuleSpec(ctx, res) if err != nil { return nil, err } - planned, err := s.moduleSvc.PlanAction(ctx, *modSpec, act) + planned, err := svc.moduleSvc.PlanAction(ctx, *modSpec, act) if err != nil { if errors.Is(err, errors.ErrInvalid) { return nil, err @@ -90,43 +92,27 @@ func (s *Service) planChange(ctx context.Context, res resource.Resource, act mod return nil, errors.ErrInternal.WithMsgf("plan() failed").WithCausef(err.Error()) } - planned.Resource.Labels = act.Labels - if err := planned.Resource.Validate(isCreate(act.Name)); err != nil { + planned.Labels = act.Labels + if err := planned.Validate(isCreate(act.Name)); err != nil { return nil, err } return planned, nil } -func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool, saveRevision bool, reason string) error { - var hooks []resource.MutationHook - hooks = append(hooks, func(ctx context.Context) error { - if plan.Resource.State.IsTerminal() { - // no need to enqueue if resource has reached terminal state. - return nil - } - - return s.enqueueSyncJob(ctx, plan.Resource, s.clock(), JobKindSyncResource) - }) - - if plan.ScheduleRunAt != nil { - hooks = append(hooks, func(ctx context.Context) error { - return s.enqueueSyncJob(ctx, plan.Resource, *plan.ScheduleRunAt, JobKindScheduledSyncResource) - }) - } - +func (svc *Service) upsert(ctx context.Context, res resource.Resource, isCreate bool, saveRevision bool, reason string) error { var err error if isCreate { - err = s.store.Create(ctx, plan.Resource, hooks...) + err = svc.store.Create(ctx, res) } else { - err = s.store.Update(ctx, plan.Resource, saveRevision, reason, hooks...) + err = svc.store.Update(ctx, res, saveRevision, reason) } if err != nil { if isCreate && errors.Is(err, errors.ErrConflict) { - return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", plan.Resource.URN) + return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", res.URN) } else if !isCreate && errors.Is(err, errors.ErrNotFound) { - return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", plan.Resource.URN) + return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", res.URN) } return errors.ErrInternal.WithCausef(err.Error()) } diff --git a/core/write_test.go b/core/write_test.go index f6233519..29ddd3fc 100644 --- a/core/write_test.go +++ b/core/write_test.go @@ -37,7 +37,7 @@ func TestService_CreateResource(t *testing.T) { PlanAction(mock.Anything, mock.Anything, mock.Anything). Return(nil, errSample).Once() - return core.New(nil, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(nil, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -59,7 +59,7 @@ func TestService_CreateResource(t *testing.T) { Return(nil, errors.ErrNotFound). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -98,7 +98,7 @@ func TestService_CreateResource(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -136,7 +136,7 @@ func TestService_CreateResource(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -158,12 +158,10 @@ func TestService_CreateResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{ - Resource: resource.Resource{ - Kind: "mock", - Name: "child", - Project: "project", - }, + Return(&resource.Resource{ + Kind: "mock", + Name: "child", + Project: "project", }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -172,7 +170,7 @@ func TestService_CreateResource(t *testing.T) { Return(errSample). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -189,12 +187,10 @@ func TestService_CreateResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{ - Resource: resource.Resource{ - Kind: "mock", - Name: "child", - Project: "project", - }, + Return(&resource.Resource{ + Kind: "mock", + Name: "child", + Project: "project", }, nil).Once() resourceRepo := &mocks.ResourceStore{} @@ -202,7 +198,7 @@ func TestService_CreateResource(t *testing.T) { Create(mock.Anything, mock.Anything, mock.Anything). Return(errors.ErrConflict).Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -219,13 +215,11 @@ func TestService_CreateResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{ - Resource: resource.Resource{ - Kind: "mock", - Name: "child", - Project: "project", - State: resource.State{Status: resource.StatusCompleted}, - }, + Return(&resource.Resource{ + Kind: "mock", + Name: "child", + Project: "project", + State: resource.State{Status: resource.StatusCompleted}, }, nil).Once() mod.EXPECT(). GetOutput(mock.Anything, mock.Anything). @@ -247,8 +241,7 @@ func TestService_CreateResource(t *testing.T) { resourceRepo.EXPECT(). Create(mock.Anything, mock.Anything, mock.Anything). Run(func(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) { - assert.Len(t, hooks, 1) - assert.NoError(t, hooks[0](ctx)) + assert.Len(t, hooks, 0) }). Return(nil). Once() @@ -262,7 +255,7 @@ func TestService_CreateResource(t *testing.T) { }). Return(nil) - return core.New(resourceRepo, mod, mockWorker, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, res: resource.Resource{ Kind: "mock", @@ -335,7 +328,7 @@ func TestService_UpdateResource(t *testing.T) { Return(nil, errors.ErrNotFound). Once() - return core.New(resourceRepo, nil, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, nil, deadClock, nil) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -364,7 +357,7 @@ func TestService_UpdateResource(t *testing.T) { Return(&testResource, nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -381,7 +374,7 @@ func TestService_UpdateResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{Resource: testResource}, nil).Once() + Return(&testResource, nil).Once() mod.EXPECT(). GetOutput(mock.Anything, mock.Anything). Return(nil, nil). @@ -396,8 +389,7 @@ func TestService_UpdateResource(t *testing.T) { resourceRepo.EXPECT(). Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Run(func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) { - assert.Len(t, hooks, 1) - assert.NoError(t, hooks[0](ctx)) + assert.Len(t, hooks, 0) }). Return(testErr) @@ -412,7 +404,7 @@ func TestService_UpdateResource(t *testing.T) { Return(nil). Once() - return core.New(resourceRepo, mod, mockWorker, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -429,18 +421,16 @@ func TestService_UpdateResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{ - Resource: resource.Resource{ - URN: "orn:entropy:mock:project:child", - Kind: "mock", - Name: "child", - Project: "project", - Spec: resource.Spec{ - Configs: []byte(`{"foo": "bar"}`), - }, - State: resource.State{Status: resource.StatusPending}, - CreatedAt: frozenTime, + Return(&resource.Resource{ + URN: "orn:entropy:mock:project:child", + Kind: "mock", + Name: "child", + Project: "project", + Spec: resource.Spec{ + Configs: []byte(`{"foo": "bar"}`), }, + State: resource.State{Status: resource.StatusPending}, + CreatedAt: frozenTime, }, nil).Once() mod.EXPECT(). GetOutput(mock.Anything, mock.Anything). @@ -456,23 +446,11 @@ func TestService_UpdateResource(t *testing.T) { Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). Run(func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) { - assert.Len(t, hooks, 1) - assert.NoError(t, hooks[0](ctx)) + assert.Len(t, hooks, 0) }). Twice() - mockWorker := &mocks.AsyncWorker{} - mockWorker.EXPECT(). - Enqueue(mock.Anything, mock.Anything). - Return(nil). - Run(func(ctx context.Context, jobs ...worker.Job) { - assert.Len(t, jobs, 1) - assert.Equal(t, jobs[0].ID, "sync_resource-orn:entropy:mock:project:child-1650536955") - assert.Equal(t, jobs[0].Kind, "sync_resource") - }). - Once() - - return core.New(resourceRepo, mod, mockWorker, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -535,7 +513,7 @@ func TestService_DeleteResource(t *testing.T) { Return(nil, testErr). Once() - return core.New(resourceRepo, nil, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, nil, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", wantErr: testErr, @@ -547,16 +525,14 @@ func TestService_DeleteResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{ - Resource: resource.Resource{ - URN: "orn:entropy:mock:project:child", - Kind: "mock", - Name: "child", - Project: "project", - State: resource.State{Status: resource.StatusPending}, - CreatedAt: frozenTime, - UpdatedAt: frozenTime, - }, + Return(&resource.Resource{ + URN: "orn:entropy:mock:project:child", + Kind: "mock", + Name: "child", + Project: "project", + State: resource.State{Status: resource.StatusPending}, + CreatedAt: frozenTime, + UpdatedAt: frozenTime, }, nil).Once() mod.EXPECT(). GetOutput(mock.Anything, mock.Anything). @@ -582,7 +558,7 @@ func TestService_DeleteResource(t *testing.T) { Return(testErr). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", wantErr: errors.ErrInternal, @@ -594,16 +570,14 @@ func TestService_DeleteResource(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, mock.Anything). - Return(&module.Plan{ - Resource: resource.Resource{ - URN: "orn:entropy:mock:project:child", - Kind: "mock", - Name: "child", - Project: "project", - State: resource.State{Status: resource.StatusPending}, - CreatedAt: frozenTime, - UpdatedAt: frozenTime, - }, + Return(&resource.Resource{ + URN: "orn:entropy:mock:project:child", + Kind: "mock", + Name: "child", + Project: "project", + State: resource.State{Status: resource.StatusPending}, + CreatedAt: frozenTime, + UpdatedAt: frozenTime, }, nil).Once() mod.EXPECT(). GetOutput(mock.Anything, mock.Anything). @@ -629,7 +603,7 @@ func TestService_DeleteResource(t *testing.T) { Return(nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", wantErr: nil, @@ -679,7 +653,7 @@ func TestService_ApplyAction(t *testing.T) { Return(nil, errors.ErrNotFound). Once() - return core.New(resourceRepo, nil, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, nil, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, @@ -706,7 +680,7 @@ func TestService_ApplyAction(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, @@ -739,7 +713,7 @@ func TestService_ApplyAction(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, @@ -753,14 +727,12 @@ func TestService_ApplyAction(t *testing.T) { mod := &mocks.ModuleService{} mod.EXPECT(). PlanAction(mock.Anything, mock.Anything, sampleAction). - Return(&module.Plan{ - Resource: resource.Resource{ - URN: "orn:entropy:mock:foo:bar", - Kind: "mock", - Project: "foo", - Name: "bar", - State: resource.State{Status: resource.StatusPending}, - }, + Return(&resource.Resource{ + URN: "orn:entropy:mock:foo:bar", + Kind: "mock", + Project: "foo", + Name: "bar", + State: resource.State{Status: resource.StatusPending}, }, nil).Once() mod.EXPECT(). GetOutput(mock.Anything, mock.Anything). @@ -784,7 +756,7 @@ func TestService_ApplyAction(t *testing.T) { Return(nil). Once() - return core.New(resourceRepo, mod, &mocks.AsyncWorker{}, deadClock, nil) + return core.New(resourceRepo, mod, deadClock, nil) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, diff --git a/internal/server/v1/mocks/module_service.go b/internal/server/v1/mocks/module_service.go index a486135c..b42c63aa 100644 --- a/internal/server/v1/mocks/module_service.go +++ b/internal/server/v1/mocks/module_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -29,6 +29,10 @@ func (_m *ModuleService) CreateModule(ctx context.Context, mod module.Module) (* ret := _m.Called(ctx, mod) var r0 *module.Module + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, module.Module) (*module.Module, error)); ok { + return rf(ctx, mod) + } if rf, ok := ret.Get(0).(func(context.Context, module.Module) *module.Module); ok { r0 = rf(ctx, mod) } else { @@ -37,7 +41,6 @@ func (_m *ModuleService) CreateModule(ctx context.Context, mod module.Module) (* } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, module.Module) error); ok { r1 = rf(ctx, mod) } else { @@ -71,6 +74,11 @@ func (_c *ModuleService_CreateModule_Call) Return(_a0 *module.Module, _a1 error) return _c } +func (_c *ModuleService_CreateModule_Call) RunAndReturn(run func(context.Context, module.Module) (*module.Module, error)) *ModuleService_CreateModule_Call { + _c.Call.Return(run) + return _c +} + // DeleteModule provides a mock function with given fields: ctx, urn func (_m *ModuleService) DeleteModule(ctx context.Context, urn string) error { ret := _m.Called(ctx, urn) @@ -109,11 +117,20 @@ func (_c *ModuleService_DeleteModule_Call) Return(_a0 error) *ModuleService_Dele return _c } +func (_c *ModuleService_DeleteModule_Call) RunAndReturn(run func(context.Context, string) error) *ModuleService_DeleteModule_Call { + _c.Call.Return(run) + return _c +} + // GetModule provides a mock function with given fields: ctx, urn func (_m *ModuleService) GetModule(ctx context.Context, urn string) (*module.Module, error) { ret := _m.Called(ctx, urn) var r0 *module.Module + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*module.Module, error)); ok { + return rf(ctx, urn) + } if rf, ok := ret.Get(0).(func(context.Context, string) *module.Module); ok { r0 = rf(ctx, urn) } else { @@ -122,7 +139,6 @@ func (_m *ModuleService) GetModule(ctx context.Context, urn string) (*module.Mod } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, urn) } else { @@ -156,11 +172,20 @@ func (_c *ModuleService_GetModule_Call) Return(_a0 *module.Module, _a1 error) *M return _c } +func (_c *ModuleService_GetModule_Call) RunAndReturn(run func(context.Context, string) (*module.Module, error)) *ModuleService_GetModule_Call { + _c.Call.Return(run) + return _c +} + // ListModules provides a mock function with given fields: ctx, project func (_m *ModuleService) ListModules(ctx context.Context, project string) ([]module.Module, error) { ret := _m.Called(ctx, project) var r0 []module.Module + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]module.Module, error)); ok { + return rf(ctx, project) + } if rf, ok := ret.Get(0).(func(context.Context, string) []module.Module); ok { r0 = rf(ctx, project) } else { @@ -169,7 +194,6 @@ func (_m *ModuleService) ListModules(ctx context.Context, project string) ([]mod } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, project) } else { @@ -203,11 +227,20 @@ func (_c *ModuleService_ListModules_Call) Return(_a0 []module.Module, _a1 error) return _c } +func (_c *ModuleService_ListModules_Call) RunAndReturn(run func(context.Context, string) ([]module.Module, error)) *ModuleService_ListModules_Call { + _c.Call.Return(run) + return _c +} + // UpdateModule provides a mock function with given fields: ctx, urn, newConfigs func (_m *ModuleService) UpdateModule(ctx context.Context, urn string, newConfigs json.RawMessage) (*module.Module, error) { ret := _m.Called(ctx, urn, newConfigs) var r0 *module.Module + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, json.RawMessage) (*module.Module, error)); ok { + return rf(ctx, urn, newConfigs) + } if rf, ok := ret.Get(0).(func(context.Context, string, json.RawMessage) *module.Module); ok { r0 = rf(ctx, urn, newConfigs) } else { @@ -216,7 +249,6 @@ func (_m *ModuleService) UpdateModule(ctx context.Context, urn string, newConfig } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, json.RawMessage) error); ok { r1 = rf(ctx, urn, newConfigs) } else { @@ -251,6 +283,11 @@ func (_c *ModuleService_UpdateModule_Call) Return(_a0 *module.Module, _a1 error) return _c } +func (_c *ModuleService_UpdateModule_Call) RunAndReturn(run func(context.Context, string, json.RawMessage) (*module.Module, error)) *ModuleService_UpdateModule_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewModuleService interface { mock.TestingT Cleanup(func()) diff --git a/internal/server/v1/mocks/resource_service.go b/internal/server/v1/mocks/resource_service.go index 637f48be..a96fb7cd 100644 --- a/internal/server/v1/mocks/resource_service.go +++ b/internal/server/v1/mocks/resource_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -29,6 +29,10 @@ func (_m *ResourceService) ApplyAction(ctx context.Context, urn string, action m ret := _m.Called(ctx, urn, action) var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, module.ActionRequest) (*resource.Resource, error)); ok { + return rf(ctx, urn, action) + } if rf, ok := ret.Get(0).(func(context.Context, string, module.ActionRequest) *resource.Resource); ok { r0 = rf(ctx, urn, action) } else { @@ -37,7 +41,6 @@ func (_m *ResourceService) ApplyAction(ctx context.Context, urn string, action m } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, module.ActionRequest) error); ok { r1 = rf(ctx, urn, action) } else { @@ -72,11 +75,20 @@ func (_c *ResourceService_ApplyAction_Call) Return(_a0 *resource.Resource, _a1 e return _c } +func (_c *ResourceService_ApplyAction_Call) RunAndReturn(run func(context.Context, string, module.ActionRequest) (*resource.Resource, error)) *ResourceService_ApplyAction_Call { + _c.Call.Return(run) + return _c +} + // CreateResource provides a mock function with given fields: ctx, res func (_m *ResourceService) CreateResource(ctx context.Context, res resource.Resource) (*resource.Resource, error) { ret := _m.Called(ctx, res) var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, resource.Resource) (*resource.Resource, error)); ok { + return rf(ctx, res) + } if rf, ok := ret.Get(0).(func(context.Context, resource.Resource) *resource.Resource); ok { r0 = rf(ctx, res) } else { @@ -85,7 +97,6 @@ func (_m *ResourceService) CreateResource(ctx context.Context, res resource.Reso } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, resource.Resource) error); ok { r1 = rf(ctx, res) } else { @@ -119,6 +130,11 @@ func (_c *ResourceService_CreateResource_Call) Return(_a0 *resource.Resource, _a return _c } +func (_c *ResourceService_CreateResource_Call) RunAndReturn(run func(context.Context, resource.Resource) (*resource.Resource, error)) *ResourceService_CreateResource_Call { + _c.Call.Return(run) + return _c +} + // DeleteResource provides a mock function with given fields: ctx, urn func (_m *ResourceService) DeleteResource(ctx context.Context, urn string) error { ret := _m.Called(ctx, urn) @@ -157,11 +173,20 @@ func (_c *ResourceService_DeleteResource_Call) Return(_a0 error) *ResourceServic return _c } +func (_c *ResourceService_DeleteResource_Call) RunAndReturn(run func(context.Context, string) error) *ResourceService_DeleteResource_Call { + _c.Call.Return(run) + return _c +} + // GetLog provides a mock function with given fields: ctx, urn, filter func (_m *ResourceService) GetLog(ctx context.Context, urn string, filter map[string]string) (<-chan module.LogChunk, error) { ret := _m.Called(ctx, urn, filter) var r0 <-chan module.LogChunk + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string) (<-chan module.LogChunk, error)); ok { + return rf(ctx, urn, filter) + } if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string) <-chan module.LogChunk); ok { r0 = rf(ctx, urn, filter) } else { @@ -170,7 +195,6 @@ func (_m *ResourceService) GetLog(ctx context.Context, urn string, filter map[st } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, map[string]string) error); ok { r1 = rf(ctx, urn, filter) } else { @@ -205,11 +229,20 @@ func (_c *ResourceService_GetLog_Call) Return(_a0 <-chan module.LogChunk, _a1 er return _c } +func (_c *ResourceService_GetLog_Call) RunAndReturn(run func(context.Context, string, map[string]string) (<-chan module.LogChunk, error)) *ResourceService_GetLog_Call { + _c.Call.Return(run) + return _c +} + // GetResource provides a mock function with given fields: ctx, urn func (_m *ResourceService) GetResource(ctx context.Context, urn string) (*resource.Resource, error) { ret := _m.Called(ctx, urn) var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*resource.Resource, error)); ok { + return rf(ctx, urn) + } if rf, ok := ret.Get(0).(func(context.Context, string) *resource.Resource); ok { r0 = rf(ctx, urn) } else { @@ -218,7 +251,6 @@ func (_m *ResourceService) GetResource(ctx context.Context, urn string) (*resour } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, urn) } else { @@ -252,11 +284,20 @@ func (_c *ResourceService_GetResource_Call) Return(_a0 *resource.Resource, _a1 e return _c } +func (_c *ResourceService_GetResource_Call) RunAndReturn(run func(context.Context, string) (*resource.Resource, error)) *ResourceService_GetResource_Call { + _c.Call.Return(run) + return _c +} + // GetRevisions provides a mock function with given fields: ctx, selector func (_m *ResourceService) GetRevisions(ctx context.Context, selector resource.RevisionsSelector) ([]resource.Revision, error) { ret := _m.Called(ctx, selector) var r0 []resource.Revision + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, resource.RevisionsSelector) ([]resource.Revision, error)); ok { + return rf(ctx, selector) + } if rf, ok := ret.Get(0).(func(context.Context, resource.RevisionsSelector) []resource.Revision); ok { r0 = rf(ctx, selector) } else { @@ -265,7 +306,6 @@ func (_m *ResourceService) GetRevisions(ctx context.Context, selector resource.R } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, resource.RevisionsSelector) error); ok { r1 = rf(ctx, selector) } else { @@ -299,11 +339,20 @@ func (_c *ResourceService_GetRevisions_Call) Return(_a0 []resource.Revision, _a1 return _c } +func (_c *ResourceService_GetRevisions_Call) RunAndReturn(run func(context.Context, resource.RevisionsSelector) ([]resource.Revision, error)) *ResourceService_GetRevisions_Call { + _c.Call.Return(run) + return _c +} + // ListResources provides a mock function with given fields: ctx, filter func (_m *ResourceService) ListResources(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) { ret := _m.Called(ctx, filter) var r0 []resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, resource.Filter) ([]resource.Resource, error)); ok { + return rf(ctx, filter) + } if rf, ok := ret.Get(0).(func(context.Context, resource.Filter) []resource.Resource); ok { r0 = rf(ctx, filter) } else { @@ -312,7 +361,6 @@ func (_m *ResourceService) ListResources(ctx context.Context, filter resource.Fi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, resource.Filter) error); ok { r1 = rf(ctx, filter) } else { @@ -346,11 +394,20 @@ func (_c *ResourceService_ListResources_Call) Return(_a0 []resource.Resource, _a return _c } +func (_c *ResourceService_ListResources_Call) RunAndReturn(run func(context.Context, resource.Filter) ([]resource.Resource, error)) *ResourceService_ListResources_Call { + _c.Call.Return(run) + return _c +} + // UpdateResource provides a mock function with given fields: ctx, urn, req func (_m *ResourceService) UpdateResource(ctx context.Context, urn string, req resource.UpdateRequest) (*resource.Resource, error) { ret := _m.Called(ctx, urn, req) var r0 *resource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, resource.UpdateRequest) (*resource.Resource, error)); ok { + return rf(ctx, urn, req) + } if rf, ok := ret.Get(0).(func(context.Context, string, resource.UpdateRequest) *resource.Resource); ok { r0 = rf(ctx, urn, req) } else { @@ -359,7 +416,6 @@ func (_m *ResourceService) UpdateResource(ctx context.Context, urn string, req r } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, resource.UpdateRequest) error); ok { r1 = rf(ctx, urn, req) } else { @@ -394,6 +450,11 @@ func (_c *ResourceService_UpdateResource_Call) Return(_a0 *resource.Resource, _a return _c } +func (_c *ResourceService_UpdateResource_Call) RunAndReturn(run func(context.Context, string, resource.UpdateRequest) (*resource.Resource, error)) *ResourceService_UpdateResource_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewResourceService interface { mock.TestingT Cleanup(func()) diff --git a/internal/store/postgres/postgres.go b/internal/store/postgres/postgres.go index 0110071b..6dd4a3f1 100644 --- a/internal/store/postgres/postgres.go +++ b/internal/store/postgres/postgres.go @@ -3,8 +3,11 @@ package postgres import ( "context" _ "embed" + "time" "github.com/jmoiron/sqlx" + + "github.com/goto/entropy/pkg/errors" ) const ( @@ -25,7 +28,9 @@ const ( var schema string type Store struct { - db *sqlx.DB + db *sqlx.DB + extendInterval time.Duration + refreshInterval time.Duration } func (st *Store) Migrate(ctx context.Context) error { @@ -35,11 +40,20 @@ func (st *Store) Migrate(ctx context.Context) error { func (st *Store) Close() error { return st.db.Close() } -// Open returns store instance backed by PostgreSQL. -func Open(conStr string) (*Store, error) { +// Open returns store instance backed by PostgresQL. +func Open(conStr string, refreshInterval, extendInterval time.Duration) (*Store, error) { db, err := sqlx.Open("postgres", conStr) if err != nil { return nil, err } - return &Store{db: db}, nil + + if refreshInterval >= extendInterval { + return nil, errors.New("refreshInterval must be lower than extendInterval") + } + + return &Store{ + db: db, + extendInterval: extendInterval, + refreshInterval: refreshInterval, + }, nil } diff --git a/internal/store/postgres/resource_store.go b/internal/store/postgres/resource_store.go index a2aa6b8d..72bbfac6 100644 --- a/internal/store/postgres/resource_store.go +++ b/internal/store/postgres/resource_store.go @@ -3,6 +3,7 @@ package postgres import ( "context" "database/sql" + "time" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" @@ -219,6 +220,89 @@ func (st *Store) Delete(ctx context.Context, urn string, hooks ...resource.Mutat return withinTx(ctx, st.db, false, deleteFn) } +func (st *Store) SyncOne(ctx context.Context, syncFn resource.SyncFn) error { + urn, err := st.fetchResourceForSync(ctx) + if err != nil { + return err + } else if urn == "" { + // No resource available for sync. + return nil + } + + cur, err := st.GetByURN(ctx, urn) + if err != nil { + return err + } + + synced, err := st.handleDequeued(ctx, *cur, syncFn) + if err != nil { + return err + } + + return st.Update(ctx, *synced, false, "sync") +} + +func (st *Store) handleDequeued(baseCtx context.Context, res resource.Resource, fn resource.SyncFn) (*resource.Resource, error) { + runCtx, cancel := context.WithCancel(baseCtx) + defer cancel() + + // Run heartbeat to keep the resource being picked up by some other syncer + // thread. If heartbeat exits, runCtx will be cancelled and fn should exit. + go st.runHeartbeat(runCtx, cancel, res.URN) + + return fn(runCtx, res) +} + +func (st *Store) fetchResourceForSync(ctx context.Context) (string, error) { + builder := sq. + Select("urn"). + From(tableResources). + Where(sq.Expr("state_next_sync <= current_timestamp")) + + query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql() + if err != nil { + return "", err + } + + var urn string + if err := st.db.QueryRowxContext(ctx, query, args...).Scan(&urn); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return "", nil + } + return "", err + } + return urn, nil +} + +func (st *Store) runHeartbeat(ctx context.Context, cancel context.CancelFunc, id string) { + defer cancel() + + tick := time.NewTicker(st.refreshInterval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-tick.C: + if err := st.extendWaitTime(ctx, st.db, id); err != nil { + return + } + } + } +} + +func (st *Store) extendWaitTime(ctx context.Context, r sq.BaseRunner, urn string) error { + extendTo := sq.Expr("current_timestamp + (? ||' seconds')::interval ", st.extendInterval.Seconds()) + extendQuery := sq.Update(tableResources). + Set("state_next_sync", extendTo). + Where(sq.Eq{"urn": urn}) + + _, err := extendQuery.PlaceholderFormat(sq.Dollar).RunWith(r).ExecContext(ctx) + return err +} + func insertResourceRecord(ctx context.Context, runner sq.BaseRunner, r resource.Resource) (int64, error) { q := sq.Insert(tableResources). Columns("urn", "kind", "project", "name", "created_at", "updated_at", diff --git a/internal/store/postgres/revision_model.go b/internal/store/postgres/revision_model.go index fd222576..abefad2c 100644 --- a/internal/store/postgres/revision_model.go +++ b/internal/store/postgres/revision_model.go @@ -13,14 +13,14 @@ import ( type revisionModel struct { ID int64 `db:"id"` - URN string `db:"urn"` Reason string `db:"reason"` CreatedAt time.Time `db:"created_at"` + ResourceID int64 `db:"resource_id"` SpecConfigs []byte `db:"spec_configs"` } func readRevisionRecord(ctx context.Context, r sqlx.QueryerContext, id int64, into *revisionModel) error { - cols := []string{"id", "urn", "reason", "created_at", "spec_configs"} + cols := []string{"id", "resource_id", "reason", "created_at", "spec_configs"} builder := sq.Select(cols...).From(tableRevisions).Where(sq.Eq{"id": id}) query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql() diff --git a/internal/store/postgres/revision_store.go b/internal/store/postgres/revision_store.go index 116e9d1d..8d3de50e 100644 --- a/internal/store/postgres/revision_store.go +++ b/internal/store/postgres/revision_store.go @@ -12,103 +12,83 @@ import ( ) func (st *Store) Revisions(ctx context.Context, selector resource.RevisionsSelector) ([]resource.Revision, error) { - q := sq.Select("id"). - From(tableRevisions). - Where(sq.Eq{"urn": selector.URN}) - - rows, err := q.PlaceholderFormat(sq.Dollar).RunWith(st.db).QueryContext(ctx) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, err - } - defer rows.Close() - var revs []resource.Revision - for rows.Next() { - var id int64 - if err := rows.Scan(&id); err != nil { - return nil, err - } - - r, err := st.getRevisionByID(ctx, id) + txFn := func(ctx context.Context, tx *sqlx.Tx) error { + resourceID, err := translateURNToID(ctx, tx, selector.URN) if err != nil { - return nil, err - } - revs = append(revs, *r) - } - - return revs, rows.Err() -} - -func (st *Store) getRevisionByID(ctx context.Context, id int64) (*resource.Revision, error) { - var rec revisionModel - var tags []string - deps := map[string]string{} - - readRevisionParts := func(ctx context.Context, tx *sqlx.Tx) error { - if err := readRevisionRecord(ctx, tx, id, &rec); err != nil { return err } - if err := readRevisionTags(ctx, tx, rec.ID, &tags); err != nil { + deps := map[string]string{} + if err := readResourceDeps(ctx, tx, resourceID, deps); err != nil { return err } - resourceID, err := translateURNToID(ctx, tx, rec.URN) + builder := sq.Select("*"). + From(tableRevisions). + Where(sq.Eq{"resource_id": resourceID}). + OrderBy("created_at DESC") + + q, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql() if err != nil { return err } - if err := readResourceDeps(ctx, tx, resourceID, deps); err != nil { + rows, err := tx.QueryxContext(ctx, q, args...) + if err != nil { return err } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var rm revisionModel + if err := rows.StructScan(&rm); err != nil { + return err + } + + var tags []string + if err := readRevisionTags(ctx, tx, rm.ID, &tags); err != nil { + return err + } + + revs = append(revs, resource.Revision{ + ID: rm.ID, + URN: selector.URN, + Reason: rm.Reason, + Labels: tagsToLabelMap(tags), + CreatedAt: rm.CreatedAt, + Spec: resource.Spec{ + Configs: rm.SpecConfigs, + Dependencies: deps, + }, + }) + } return nil } - if txErr := withinTx(ctx, st.db, true, readRevisionParts); txErr != nil { - return nil, txErr + if err := withinTx(ctx, st.db, true, txFn); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err } - - return &resource.Revision{ - ID: rec.ID, - URN: rec.URN, - Reason: rec.Reason, - Labels: tagsToLabelMap(tags), - CreatedAt: rec.CreatedAt, - Spec: resource.Spec{ - Configs: rec.SpecConfigs, - Dependencies: deps, - }, - }, nil + return revs, nil } func insertRevision(ctx context.Context, tx *sqlx.Tx, rev resource.Revision) error { - revisionID, err := insertRevisionRecord(ctx, tx, rev) - if err != nil { - return err - } - - if err := setRevisionTags(ctx, tx, revisionID, rev.Labels); err != nil { - return err - } - return nil -} - -func insertRevisionRecord(ctx context.Context, runner sq.BaseRunner, r resource.Revision) (int64, error) { q := sq.Insert(tableRevisions). Columns("urn", "reason", "spec_configs"). - Values(r.URN, r.Reason, r.Spec.Configs). + Values(rev.URN, rev.Reason, rev.Spec.Configs). Suffix(`RETURNING "id"`). PlaceholderFormat(sq.Dollar) - var id int64 - if err := q.RunWith(runner).QueryRowContext(ctx).Scan(&id); err != nil { - return 0, err + var revisionID int64 + if err := q.RunWith(tx).QueryRowContext(ctx).Scan(&revisionID); err != nil { + return err } - return id, nil + + return setRevisionTags(ctx, tx, revisionID, rev.Labels) } func setRevisionTags(ctx context.Context, runner sq.BaseRunner, id int64, labels map[string]string) error { diff --git a/internal/store/postgres/schema.sql b/internal/store/postgres/schema.sql index 6f7c6f06..5f7d020d 100644 --- a/internal/store/postgres/schema.sql +++ b/internal/store/postgres/schema.sql @@ -1,3 +1,14 @@ +CREATE TABLE IF NOT EXISTS modules +( + urn TEXT NOT NULL PRIMARY KEY, + name TEXT NOT NULL, + project TEXT NOT NULL, + configs jsonb NOT NULL, + created_at timestamp with time zone NOT NULL DEFAULT current_timestamp, + updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp +); +CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project); + CREATE TABLE IF NOT EXISTS resources ( id BIGSERIAL NOT NULL PRIMARY KEY, @@ -10,26 +21,28 @@ CREATE TABLE IF NOT EXISTS resources spec_configs bytea NOT NULL, state_status TEXT NOT NULL, state_output bytea NOT NULL, - state_module_data bytea NOT NULL + state_module_data bytea NOT NULL, + state_next_sync timestamp ); - CREATE INDEX IF NOT EXISTS idx_resources_kind ON resources (kind); -CREATE INDEX IF NOT EXISTS idx_resources_name ON resources (name); CREATE INDEX IF NOT EXISTS idx_resources_project ON resources (project); CREATE INDEX IF NOT EXISTS idx_resources_state_status ON resources (state_status); +CREATE INDEX IF NOT EXISTS idx_resources_next_sync ON resources (state_next_sync); CREATE TABLE IF NOT EXISTS resource_dependencies ( resource_id BIGINT NOT NULL REFERENCES resources (id), dependency_key TEXT NOT NULL, depends_on BIGINT NOT NULL REFERENCES resources (id), + UNIQUE (resource_id, dependency_key) ); CREATE TABLE IF NOT EXISTS resource_tags ( - resource_id BIGINT NOT NULL REFERENCES resources (id), tag TEXT NOT NULL, + resource_id BIGINT NOT NULL REFERENCES resources (id), + UNIQUE (resource_id, tag) ); CREATE INDEX IF NOT EXISTS idx_resource_tags_resource_id ON resource_tags (resource_id); @@ -37,33 +50,21 @@ CREATE INDEX IF NOT EXISTS idx_resource_tags_tag ON resource_tags (tag); CREATE TABLE IF NOT EXISTS revisions ( - id BIGSERIAL NOT NULL PRIMARY KEY, - urn TEXT NOT NULL, - spec_configs bytea NOT NULL, - created_at timestamp NOT NULL DEFAULT current_timestamp + id BIGSERIAL NOT NULL PRIMARY KEY, + reason TEXT NOT NULL DEFAULT '', + created_at timestamp NOT NULL DEFAULT current_timestamp, + resource_id BIGINT NOT NULL REFERENCES resources (id), + spec_configs bytea NOT NULL ); - -CREATE INDEX IF NOT EXISTS idx_revisions_urn ON revisions (urn); +CREATE INDEX IF NOT EXISTS idx_revisions_resource_id ON revisions (resource_id); CREATE INDEX IF NOT EXISTS idx_revisions_created_at ON revisions (created_at); CREATE TABLE IF NOT EXISTS revision_tags ( - revision_id BIGINT NOT NULL REFERENCES revisions (id), tag TEXT NOT NULL, + revision_id BIGINT NOT NULL REFERENCES revisions (id), + UNIQUE (revision_id, tag) ); CREATE INDEX IF NOT EXISTS idx_revision_tags_revision_id ON revision_tags (revision_id); -CREATE INDEX IF NOT EXISTS idx_revision_tags_tag ON revision_tags (tag); - --- -CREATE TABLE IF NOT EXISTS modules ( - urn TEXT NOT NULL PRIMARY KEY, - name TEXT NOT NULL, - project TEXT NOT NULL, - configs jsonb NOT NULL, - created_at timestamp with time zone NOT NULL DEFAULT current_timestamp, - updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp -); - -CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project); -ALTER TABLE revisions ADD COLUMN IF NOT EXISTS reason TEXT DEFAULT '' NOT NULL; +CREATE INDEX IF NOT EXISTS idx_revision_tags_tag ON revision_tags (tag); \ No newline at end of file diff --git a/modules/firehose/config.go b/modules/firehose/config.go index a1a70f11..2f42f4cd 100644 --- a/modules/firehose/config.go +++ b/modules/firehose/config.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/goto/entropy/core/resource" "github.com/goto/entropy/pkg/errors" @@ -27,6 +28,8 @@ var ( ) type Config struct { + Stopped bool `json:"stopped"` + StopTime *time.Time `json:"stop_time,omitempty"` Telegraf map[string]any `json:"telegraf,omitempty"` Replicas int `json:"replicas"` Namespace string `json:"namespace,omitempty"` diff --git a/modules/firehose/driver.go b/modules/firehose/driver.go index 44718560..d74e3da9 100644 --- a/modules/firehose/driver.go +++ b/modules/firehose/driver.go @@ -3,6 +3,8 @@ package firehose import ( "context" "encoding/json" + "strings" + "time" "github.com/goto/entropy/core/module" "github.com/goto/entropy/modules/kubernetes" @@ -18,6 +20,12 @@ const ( stepKafkaReset = "consumer_reset" ) +const ( + chartRepo = "https://goto.github.io/charts/" + chartName = "firehose" + imageRepo = "gotocompany/firehose" +) + var defaultDriverConf = driverConf{ Namespace: "firehose", Telegraf: map[string]any{ @@ -31,6 +39,7 @@ var defaultDriverConf = driverConf{ } type firehoseDriver struct { + timeNow func() time.Time conf driverConf kubeDeploy kubeDeployFn kubeGetPod kubeGetPodFn @@ -67,12 +76,6 @@ type transientData struct { } func (*firehoseDriver) getHelmRelease(conf Config) *helm.ReleaseConfig { - const ( - chartRepo = "https://odpf.github.io/charts/" - chartName = "firehose" - imageRepo = "odpf/firehose" - ) - rc := helm.DefaultReleaseConfig() rc.Name = conf.DeploymentID rc.Repository = chartRepo @@ -95,6 +98,29 @@ func (*firehoseDriver) getHelmRelease(conf Config) *helm.ReleaseConfig { return rc } +func mergeChartValues(cur, newVal *chartValues) (*chartValues, error) { + if newVal == nil { + return cur, nil + } + + merged := chartValues{ + ImageTag: cur.ImageTag, + ChartVersion: cur.ChartVersion, + ImagePullPolicy: cur.ImagePullPolicy, + } + + newTag := strings.TrimSpace(newVal.ImageTag) + if newTag != "" { + if strings.Contains(newTag, ":") && !strings.HasPrefix(newTag, imageRepo) { + return nil, errors.ErrInvalid. + WithMsgf("unknown image repo: '%s', must start with '%s'", newTag, imageRepo) + } + merged.ImageTag = strings.TrimPrefix(newTag, imageRepo) + } + + return &merged, nil +} + func readOutputData(exr module.ExpandedResource) (*Output, error) { var curOut Output if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil { diff --git a/modules/firehose/driver_output_test.go b/modules/firehose/driver_output_test.go index ddcd0b19..68209f46 100644 --- a/modules/firehose/driver_output_test.go +++ b/modules/firehose/driver_output_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -123,7 +124,8 @@ func TestFirehoseDriver_Output(t *testing.T) { for _, tt := range table { t.Run(tt.title, func(t *testing.T) { fd := &firehoseDriver{ - conf: defaultDriverConf, + conf: defaultDriverConf, + timeNow: func() time.Time { return frozenTime }, } if tt.kubeGetPod != nil { fd.kubeGetPod = tt.kubeGetPod(t) diff --git a/modules/firehose/driver_plan.go b/modules/firehose/driver_plan.go index 0d93b8a2..e5329b89 100644 --- a/modules/firehose/driver_plan.go +++ b/modules/firehose/driver_plan.go @@ -3,7 +3,6 @@ package firehose import ( "context" "encoding/json" - "fmt" "github.com/goto/entropy/core/module" "github.com/goto/entropy/core/resource" @@ -11,7 +10,7 @@ import ( "github.com/goto/entropy/pkg/kafka" ) -func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { switch act.Name { case module.CreateAction: return fd.planCreate(exr, act) @@ -24,7 +23,7 @@ func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, a } } -func (fd *firehoseDriver) planChange(exr module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (fd *firehoseDriver) planChange(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { curConf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs) if err != nil { return nil, err @@ -38,9 +37,14 @@ func (fd *firehoseDriver) planChange(exr module.ExpandedResource, act module.Act return nil, err } + chartVals, err := mergeChartValues(curConf.ChartValues, newConf.ChartValues) + if err != nil { + return nil, err + } + // restore configs that are not user-controlled. newConf.DeploymentID = curConf.DeploymentID - newConf.ChartValues = curConf.ChartValues + newConf.ChartValues = chartVals newConf.Namespace = curConf.Namespace newConf.Telegraf = curConf.Telegraf @@ -67,9 +71,12 @@ func (fd *firehoseDriver) planChange(exr module.ExpandedResource, act module.Act case UpgradeAction: // upgrade the chart values to the latest project-level config. + // Note: upgrade/downgrade will happen based on module-level configs. curConf.ChartValues = &fd.conf.ChartValues } + immediately := fd.timeNow() + exr.Resource.Spec.Configs = mustJSON(curConf) exr.Resource.State = resource.State{ Status: resource.StatusPending, @@ -77,24 +84,29 @@ func (fd *firehoseDriver) planChange(exr module.ExpandedResource, act module.Act ModuleData: mustJSON(transientData{ PendingSteps: enqueueSteps, }), + NextSyncAt: &immediately, } - return &module.Plan{ - Reason: fmt.Sprintf("firehose_%s", act.Name), - Resource: exr.Resource, - }, nil + return &exr.Resource, nil } -func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { conf, err := readConfig(exr.Resource, act.Params) if err != nil { return nil, err } + chartVals, err := mergeChartValues(&fd.conf.ChartValues, conf.ChartValues) + if err != nil { + return nil, err + } + // set project defaults. conf.Telegraf = fd.conf.Telegraf conf.Namespace = fd.conf.Namespace - conf.ChartValues = &fd.conf.ChartValues + conf.ChartValues = chartVals + + immediately := fd.timeNow() exr.Resource.Spec.Configs = mustJSON(conf) exr.Resource.State = resource.State{ @@ -103,37 +115,35 @@ func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.Act Namespace: conf.Namespace, ReleaseName: conf.DeploymentID, }), + NextSyncAt: &immediately, ModuleData: mustJSON(transientData{ PendingSteps: []string{stepReleaseCreate}, }), } - return &module.Plan{ - Reason: "firehose_create", - Resource: exr.Resource, - }, nil + return &exr.Resource, nil } -func (*firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { resetValue, err := kafka.ParseResetParams(act.Params) if err != nil { return nil, err } + immediately := fd.timeNow() + exr.Resource.State = resource.State{ - Status: resource.StatusPending, - Output: exr.Resource.State.Output, + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + NextSyncAt: &immediately, ModuleData: mustJSON(transientData{ ResetOffsetTo: resetValue, PendingSteps: []string{ - stepReleaseStop, + stepReleaseStop, // stop the firehose stepKafkaReset, // reset the consumer group offset value. stepReleaseUpdate, // restart the deployment. }, }), } - return &module.Plan{ - Reason: "firehose_reset", - Resource: exr.Resource, - }, nil + return &exr.Resource, nil } diff --git a/modules/firehose/driver_plan_test.go b/modules/firehose/driver_plan_test.go index 338810f1..40343927 100644 --- a/modules/firehose/driver_plan_test.go +++ b/modules/firehose/driver_plan_test.go @@ -3,6 +3,7 @@ package firehose import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12,6 +13,8 @@ import ( "github.com/goto/entropy/pkg/errors" ) +var frozenTime = time.Unix(1679668743, 0) + func TestFirehoseDriver_Plan(t *testing.T) { t.Parallel() @@ -19,7 +22,7 @@ func TestFirehoseDriver_Plan(t *testing.T) { title string exr module.ExpandedResource act module.ActionRequest - want *module.Plan + want *resource.Resource wantErr error }{ // create action tests @@ -63,46 +66,45 @@ func TestFirehoseDriver_Plan(t *testing.T) { }, }), }, - want: &module.Plan{ - Resource: resource.Resource{ - URN: "urn:goto:entropy:ABCDEFGHIJKLMNOPQRSTUVWXYZ:abcdefghijklmnopqrstuvwxyz", - Kind: "firehose", - Name: "abcdefghijklmnopqrstuvwxyz", - Project: "ABCDEFGHIJKLMNOPQRSTUVWXYZ", - Spec: resource.Spec{ - Configs: mustJSON(map[string]any{ - "replicas": 1, - "namespace": "firehose", - "deployment_id": "firehose-ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-9bf099", - "telegraf": map[string]any{ - "enabled": false, - }, - "chart_values": map[string]string{ - "chart_version": "0.1.3", - "image_pull_policy": "IfNotPresent", - "image_tag": "latest", - }, - "env_variables": map[string]string{ - "SINK_TYPE": "LOG", - "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", - "SOURCE_KAFKA_CONSUMER_GROUP_ID": "firehose-ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-9bf099-0001", - "SOURCE_KAFKA_BROKERS": "localhost:9092", - "SOURCE_KAFKA_TOPIC": "foo-log", - }, - }), - }, - State: resource.State{ - Status: resource.StatusPending, - Output: mustJSON(Output{ - Namespace: "firehose", - ReleaseName: "firehose-ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-9bf099", - }), - ModuleData: mustJSON(transientData{ - PendingSteps: []string{stepReleaseCreate}, - }), - }, + want: &resource.Resource{ + URN: "urn:goto:entropy:ABCDEFGHIJKLMNOPQRSTUVWXYZ:abcdefghijklmnopqrstuvwxyz", + Kind: "firehose", + Name: "abcdefghijklmnopqrstuvwxyz", + Project: "ABCDEFGHIJKLMNOPQRSTUVWXYZ", + Spec: resource.Spec{ + Configs: mustJSON(map[string]any{ + "stopped": false, + "replicas": 1, + "namespace": "firehose", + "deployment_id": "firehose-ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-9bf099", + "telegraf": map[string]any{ + "enabled": false, + }, + "chart_values": map[string]string{ + "chart_version": "0.1.3", + "image_pull_policy": "IfNotPresent", + "image_tag": "latest", + }, + "env_variables": map[string]string{ + "SINK_TYPE": "LOG", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "firehose-ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-9bf099-0001", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log", + }, + }), + }, + State: resource.State{ + Status: resource.StatusPending, + Output: mustJSON(Output{ + Namespace: "firehose", + ReleaseName: "firehose-ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-9bf099", + }), + ModuleData: mustJSON(transientData{ + PendingSteps: []string{stepReleaseCreate}, + }), + NextSyncAt: &frozenTime, }, - Reason: "firehose_create", }, wantErr: nil, }, @@ -129,46 +131,45 @@ func TestFirehoseDriver_Plan(t *testing.T) { }, }), }, - want: &module.Plan{ - Resource: resource.Resource{ - URN: "urn:goto:entropy:foo:fh1", - Kind: "firehose", - Name: "fh1", - Project: "foo", - Spec: resource.Spec{ - Configs: mustJSON(map[string]any{ - "replicas": 1, - "namespace": "firehose", - "deployment_id": "firehose-foo-fh1", - "telegraf": map[string]any{ - "enabled": false, - }, - "chart_values": map[string]string{ - "chart_version": "0.1.3", - "image_pull_policy": "IfNotPresent", - "image_tag": "latest", - }, - "env_variables": map[string]string{ - "SINK_TYPE": "LOG", - "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", - "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", - "SOURCE_KAFKA_BROKERS": "localhost:9092", - "SOURCE_KAFKA_TOPIC": "foo-log", - }, - }), - }, - State: resource.State{ - Status: resource.StatusPending, - Output: mustJSON(Output{ - Namespace: "firehose", - ReleaseName: "firehose-foo-fh1", - }), - ModuleData: mustJSON(transientData{ - PendingSteps: []string{stepReleaseCreate}, - }), - }, + want: &resource.Resource{ + URN: "urn:goto:entropy:foo:fh1", + Kind: "firehose", + Name: "fh1", + Project: "foo", + Spec: resource.Spec{ + Configs: mustJSON(map[string]any{ + "stopped": false, + "replicas": 1, + "namespace": "firehose", + "deployment_id": "firehose-foo-fh1", + "telegraf": map[string]any{ + "enabled": false, + }, + "chart_values": map[string]string{ + "chart_version": "0.1.3", + "image_pull_policy": "IfNotPresent", + "image_tag": "latest", + }, + "env_variables": map[string]string{ + "SINK_TYPE": "LOG", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log", + }, + }), + }, + State: resource.State{ + Status: resource.StatusPending, + Output: mustJSON(Output{ + Namespace: "firehose", + ReleaseName: "firehose-foo-fh1", + }), + ModuleData: mustJSON(transientData{ + PendingSteps: []string{stepReleaseCreate}, + }), + NextSyncAt: &frozenTime, }, - Reason: "firehose_create", }, wantErr: nil, }, @@ -216,37 +217,36 @@ func TestFirehoseDriver_Plan(t *testing.T) { }, }), }, - want: &module.Plan{ - Resource: resource.Resource{ - URN: "urn:goto:entropy:foo:fh1", - Kind: "firehose", - Name: "fh1", - Project: "foo", - Spec: resource.Spec{ - Configs: mustJSON(map[string]any{ - "replicas": 10, - "deployment_id": "firehose-deployment-x", - "env_variables": map[string]string{ - "SINK_TYPE": "HTTP", - "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", - "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", - "SOURCE_KAFKA_BROKERS": "localhost:9092", - "SOURCE_KAFKA_TOPIC": "foo-log", - }, - }), - }, - State: resource.State{ - Status: resource.StatusPending, - Output: mustJSON(Output{ - Namespace: "foo", - ReleaseName: "bar", - }), - ModuleData: mustJSON(transientData{ - PendingSteps: []string{stepReleaseUpdate}, - }), - }, + want: &resource.Resource{ + URN: "urn:goto:entropy:foo:fh1", + Kind: "firehose", + Name: "fh1", + Project: "foo", + Spec: resource.Spec{ + Configs: mustJSON(map[string]any{ + "stopped": false, + "replicas": 10, + "deployment_id": "firehose-deployment-x", + "env_variables": map[string]string{ + "SINK_TYPE": "HTTP", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log", + }, + }), + }, + State: resource.State{ + Status: resource.StatusPending, + Output: mustJSON(Output{ + Namespace: "foo", + ReleaseName: "bar", + }), + ModuleData: mustJSON(transientData{ + PendingSteps: []string{stepReleaseUpdate}, + }), + NextSyncAt: &frozenTime, }, - Reason: "firehose_update", }, wantErr: nil, }, @@ -326,41 +326,39 @@ func TestFirehoseDriver_Plan(t *testing.T) { "to": "latest", }), }, - want: &module.Plan{ - Reason: "firehose_reset", - Resource: resource.Resource{ - URN: "urn:goto:entropy:foo:fh1", - Kind: "firehose", - Name: "fh1", - Project: "foo", - Spec: resource.Spec{ - Configs: mustJSON(map[string]any{ - "replicas": 1, - "deployment_id": "firehose-deployment-x", - "env_variables": map[string]string{ - "SINK_TYPE": "LOG", - "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", - "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", - "SOURCE_KAFKA_BROKERS": "localhost:9092", - "SOURCE_KAFKA_TOPIC": "foo-log", - }, - }), - }, - State: resource.State{ - Status: resource.StatusPending, - Output: mustJSON(Output{ - Namespace: "foo", - ReleaseName: "bar", - }), - ModuleData: mustJSON(transientData{ - ResetOffsetTo: "latest", - PendingSteps: []string{ - stepReleaseStop, - stepKafkaReset, - stepReleaseUpdate, - }, - }), - }, + want: &resource.Resource{ + URN: "urn:goto:entropy:foo:fh1", + Kind: "firehose", + Name: "fh1", + Project: "foo", + Spec: resource.Spec{ + Configs: mustJSON(map[string]any{ + "replicas": 1, + "deployment_id": "firehose-deployment-x", + "env_variables": map[string]string{ + "SINK_TYPE": "LOG", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log", + }, + }), + }, + State: resource.State{ + Status: resource.StatusPending, + Output: mustJSON(Output{ + Namespace: "foo", + ReleaseName: "bar", + }), + ModuleData: mustJSON(transientData{ + ResetOffsetTo: "latest", + PendingSteps: []string{ + stepReleaseStop, + stepKafkaReset, + stepReleaseUpdate, + }, + }), + NextSyncAt: &frozenTime, }, }, }, @@ -376,6 +374,7 @@ func TestFirehoseDriver_Plan(t *testing.T) { Project: "foo", Spec: resource.Spec{ Configs: mustJSON(map[string]any{ + "stopped": false, "replicas": 1, "deployment_id": "firehose-deployment-x", "chart_values": map[string]string{ @@ -404,42 +403,41 @@ func TestFirehoseDriver_Plan(t *testing.T) { act: module.ActionRequest{ Name: UpgradeAction, }, - want: &module.Plan{ - Resource: resource.Resource{ - URN: "urn:goto:entropy:foo:fh1", - Kind: "firehose", - Name: "fh1", - Project: "foo", - Spec: resource.Spec{ - Configs: mustJSON(map[string]any{ - "replicas": 1, - "deployment_id": "firehose-deployment-x", - "chart_values": map[string]string{ - "chart_version": "0.1.3", - "image_pull_policy": "IfNotPresent", - "image_tag": "latest", - }, - "env_variables": map[string]string{ - "SINK_TYPE": "LOG", - "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", - "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", - "SOURCE_KAFKA_BROKERS": "localhost:9092", - "SOURCE_KAFKA_TOPIC": "foo-log", - }, - }), - }, - State: resource.State{ - Status: resource.StatusPending, - Output: mustJSON(Output{ - Namespace: "foo", - ReleaseName: "bar", - }), - ModuleData: mustJSON(transientData{ - PendingSteps: []string{stepReleaseUpdate}, - }), - }, + want: &resource.Resource{ + URN: "urn:goto:entropy:foo:fh1", + Kind: "firehose", + Name: "fh1", + Project: "foo", + Spec: resource.Spec{ + Configs: mustJSON(map[string]any{ + "stopped": false, + "replicas": 1, + "deployment_id": "firehose-deployment-x", + "chart_values": map[string]string{ + "chart_version": "0.1.3", + "image_pull_policy": "IfNotPresent", + "image_tag": "latest", + }, + "env_variables": map[string]string{ + "SINK_TYPE": "LOG", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log", + }, + }), + }, + State: resource.State{ + Status: resource.StatusPending, + Output: mustJSON(Output{ + Namespace: "foo", + ReleaseName: "bar", + }), + ModuleData: mustJSON(transientData{ + PendingSteps: []string{stepReleaseUpdate}, + }), + NextSyncAt: &frozenTime, }, - Reason: "firehose_upgrade", }, wantErr: nil, }, @@ -491,7 +489,8 @@ func TestFirehoseDriver_Plan(t *testing.T) { for _, tt := range table { t.Run(tt.title, func(t *testing.T) { dr := &firehoseDriver{ - conf: defaultDriverConf, + conf: defaultDriverConf, + timeNow: func() time.Time { return frozenTime }, } got, err := dr.Plan(context.Background(), tt.exr, tt.act) diff --git a/modules/firehose/driver_sync.go b/modules/firehose/driver_sync.go index 22f586e5..56878bdb 100644 --- a/modules/firehose/driver_sync.go +++ b/modules/firehose/driver_sync.go @@ -31,6 +31,11 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource) return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error()) } + finalState := resource.State{ + Status: resource.StatusPending, + Output: exr.Resource.State.Output, + } + // pickup the next pending step if available. if len(modData.PendingSteps) > 0 { pendingStep := modData.PendingSteps[0] @@ -41,7 +46,7 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource) // we want to stop the current deployment. we do this by setting // replicas to 0. But this value will not be persisted to DB since // config changes during Sync() are not saved. - if pendingStep == stepReleaseStop { + if pendingStep == stepReleaseStop || conf.Stopped { conf.Replicas = 0 } @@ -58,20 +63,33 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource) default: return nil, errors.ErrInternal.WithMsgf("unknown step: '%s'", pendingStep) } - } - finalOut, err := fd.refreshOutput(ctx, *conf, *out, kubeOut) - if err != nil { - return nil, err - } + // we have more pending states, so enqueue resource for another sync + // as soon as possible. + immediately := fd.timeNow() + finalState.NextSyncAt = &immediately + finalState.ModuleData = mustJSON(modData) + } else { + // even if the resource is in completed state, we check this time to + // see if the firehose is expected to be stopped by this time. + if conf.StopTime != nil { + if conf.StopTime.Before(fd.timeNow()) { + conf.Replicas = 0 + if err := fd.releaseSync(ctx, false, *conf, kubeOut); err != nil { + return nil, err + } + finalState.NextSyncAt = nil + } else { + finalState.NextSyncAt = conf.StopTime + } + } - finalState := resource.State{ - Status: resource.StatusPending, - Output: finalOut, - ModuleData: mustJSON(modData), - } + finalOut, err := fd.refreshOutput(ctx, *conf, *out, kubeOut) + if err != nil { + return nil, err + } + finalState.Output = finalOut - if len(modData.PendingSteps) == 0 { finalState.Status = resource.StatusCompleted finalState.ModuleData = nil } diff --git a/modules/firehose/driver_sync_test.go b/modules/firehose/driver_sync_test.go index 112000b6..08f6dd09 100644 --- a/modules/firehose/driver_sync_test.go +++ b/modules/firehose/driver_sync_test.go @@ -3,6 +3,7 @@ package firehose import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -181,16 +182,12 @@ func TestFirehoseDriver_Sync(t *testing.T) { } }, want: &resource.State{ - Status: resource.StatusCompleted, - Output: mustJSON(Output{ - Pods: []kube.Pod{ - { - Name: "foo-1", - Containers: []string{"firehose"}, - }, - }, + Status: resource.StatusPending, + Output: mustJSON(Output{}), + ModuleData: mustJSON(transientData{ + PendingSteps: []string{}, }), - ModuleData: nil, + NextSyncAt: &frozenTime, }, }, { @@ -224,16 +221,12 @@ func TestFirehoseDriver_Sync(t *testing.T) { } }, want: &resource.State{ - Status: resource.StatusCompleted, - Output: mustJSON(Output{ - Pods: []kube.Pod{ - { - Name: "foo-1", - Containers: []string{"firehose"}, - }, - }, + Status: resource.StatusPending, + Output: mustJSON(Output{}), + ModuleData: mustJSON(transientData{ + PendingSteps: []string{}, }), - ModuleData: nil, + NextSyncAt: &frozenTime, }, }, } @@ -241,7 +234,8 @@ func TestFirehoseDriver_Sync(t *testing.T) { for _, tt := range table { t.Run(tt.title, func(t *testing.T) { fd := &firehoseDriver{ - conf: defaultDriverConf, + conf: defaultDriverConf, + timeNow: func() time.Time { return frozenTime }, } if tt.kubeGetPod != nil { diff --git a/modules/firehose/module.go b/modules/firehose/module.go index 2465e0ec..542b4baf 100644 --- a/modules/firehose/module.go +++ b/modules/firehose/module.go @@ -70,7 +70,8 @@ var Module = module.Descriptor{ } return &firehoseDriver{ - conf: conf, + conf: conf, + timeNow: func() time.Time { return time.Now() }, kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error { helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf}) diff --git a/modules/kubernetes/driver.go b/modules/kubernetes/driver.go index 346d2717..0e5717ce 100644 --- a/modules/kubernetes/driver.go +++ b/modules/kubernetes/driver.go @@ -14,7 +14,9 @@ import ( type kubeDriver struct{} -func (m *kubeDriver) Plan(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (m *kubeDriver) Plan(ctx context.Context, res module.ExpandedResource, + act module.ActionRequest, +) (*resource.Resource, error) { res.Resource.Spec = resource.Spec{ Configs: act.Params, Dependencies: nil, @@ -30,7 +32,7 @@ func (m *kubeDriver) Plan(ctx context.Context, res module.ExpandedResource, act Output: output, } - return &module.Plan{Resource: res.Resource, Reason: "kubernetes cluster details updated"}, nil + return &res.Resource, nil } func (*kubeDriver) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) { diff --git a/pkg/worker/mocks/job_queue.go b/pkg/worker/mocks/job_queue.go index 950e4628..577c270c 100644 --- a/pkg/worker/mocks/job_queue.go +++ b/pkg/worker/mocks/job_queue.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.23.1. DO NOT EDIT. package mocks @@ -61,6 +61,11 @@ func (_c *JobQueue_Dequeue_Call) Return(_a0 error) *JobQueue_Dequeue_Call { return _c } +func (_c *JobQueue_Dequeue_Call) RunAndReturn(run func(context.Context, []string, worker.DequeueFn) error) *JobQueue_Dequeue_Call { + _c.Call.Return(run) + return _c +} + // Enqueue provides a mock function with given fields: ctx, jobs func (_m *JobQueue) Enqueue(ctx context.Context, jobs ...worker.Job) error { _va := make([]interface{}, len(jobs)) @@ -113,6 +118,11 @@ func (_c *JobQueue_Enqueue_Call) Return(_a0 error) *JobQueue_Enqueue_Call { return _c } +func (_c *JobQueue_Enqueue_Call) RunAndReturn(run func(context.Context, ...worker.Job) error) *JobQueue_Enqueue_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewJobQueue interface { mock.TestingT Cleanup(func())