Skip to content

Commit

Permalink
feat: add stop flag support (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
spy16 committed Mar 24, 2023
1 parent f66d8bf commit 859c4b1
Show file tree
Hide file tree
Showing 39 changed files with 1,020 additions and 708 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tidy:

install: ## install required dependencies
@echo "> installing dependencies"
go install github.com/vektra/mockery/v2@v2.14.0
go install github.com/vektra/mockery/v2@latest
go install google.golang.org/protobuf/cmd/[email protected]
go get -d google.golang.org/protobuf/[email protected]
go get -d google.golang.org/[email protected]
Expand Down
16 changes: 7 additions & 9 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,25 @@ const configFlag = "config"
// Config contains the application configuration.
type Config struct {
Log logger.LogConfig `mapstructure:"log"`
Worker workerConf `mapstructure:"worker"`
Syncer syncerConf `mapstructure:"syncer"`
Service serveConfig `mapstructure:"service"`
PGConnStr string `mapstructure:"pg_conn_str" default:"postgres://postgres@localhost:5432/entropy?sslmode=disable"`
Telemetry telemetry.Config `mapstructure:"telemetry"`
}

type syncerConf struct {
SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"`
RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"`
ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"`
}

type serveConfig struct {
Host string `mapstructure:"host" default:""`
Port int `mapstructure:"port" default:"8080"`

HTTPAddr string `mapstructure:"http_addr" default:":8081"`
}

type workerConf struct {
QueueName string `mapstructure:"queue_name" default:"entropy_jobs"`
QueueSpec string `mapstructure:"queue_spec" default:"postgres://postgres@localhost:5432/entropy?sslmode=disable"`

Threads int `mapstructure:"threads" default:"1"`
PollInterval time.Duration `mapstructure:"poll_interval" default:"100ms"`
}

func (serveCfg serveConfig) httpAddr() string { return serveCfg.HTTPAddr }

func (serveCfg serveConfig) grpcAddr() string {
Expand Down
2 changes: 1 addition & 1 deletion cli/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ func cmdMigrate() *cobra.Command {
}

func runMigrations(ctx context.Context, zapLog *zap.Logger, cfg Config) error {
store := setupStorage(zapLog, cfg.PGConnStr)
store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer)
return store.Migrate(ctx)
}
62 changes: 12 additions & 50 deletions cli/serve.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cli

import (
"context"
"time"

"github.com/newrelic/go-agent/v3/newrelic"
Expand All @@ -17,8 +16,6 @@ import (
"github.com/goto/entropy/modules/kubernetes"
"github.com/goto/entropy/pkg/logger"
"github.com/goto/entropy/pkg/telemetry"
"github.com/goto/entropy/pkg/worker"
"github.com/goto/entropy/pkg/worker/pgq"
)

func cmdServe() *cobra.Command {
Expand Down Expand Up @@ -52,49 +49,33 @@ func cmdServe() *cobra.Command {
newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey),
)

store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer)
moduleService := module.NewService(setupRegistry(zapLog), store)
resourceService := core.New(store, moduleService, time.Now, zapLog)

if migrate {
if migrateErr := runMigrations(cmd.Context(), zapLog, cfg); migrateErr != nil {
return migrateErr
}
}

asyncWorker := setupWorker(zapLog, cfg.Worker)
if spawnWorker {
go func() {
if runErr := asyncWorker.Run(cmd.Context()); runErr != nil {
zapLog.Error("worker exited with error", zap.Error(err))
if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil {
zapLog.Error("syncer exited with error", zap.Error(err))
}
}()
}

return runServer(cmd.Context(), nrApp, zapLog, cfg, asyncWorker)
return entropyserver.Serve(cmd.Context(),
cfg.Service.httpAddr(), cfg.Service.grpcAddr(),
nrApp, zapLog, resourceService, moduleService,
)
})

return cmd
}

func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap.Logger, cfg Config, asyncWorker *worker.Worker) error {
ctx, cancel := context.WithCancel(baseCtx)
defer cancel()

store := setupStorage(zapLog, cfg.PGConnStr)
moduleService := module.NewService(setupRegistry(zapLog), store)
resourceService := core.New(store, moduleService, asyncWorker, time.Now, zapLog)

if err := asyncWorker.Register(core.JobKindSyncResource, resourceService.HandleSyncJob); err != nil {
return err
}

if err := asyncWorker.Register(core.JobKindScheduledSyncResource, resourceService.HandleSyncJob); err != nil {
return err
}

return entropyserver.Serve(ctx,
cfg.Service.httpAddr(), cfg.Service.grpcAddr(),
nrApp, zapLog, resourceService, moduleService,
)
}

func setupRegistry(logger *zap.Logger) module.Registry {
supported := []module.Descriptor{
kubernetes.Module,
Expand All @@ -113,27 +94,8 @@ func setupRegistry(logger *zap.Logger) module.Registry {
return registry
}

func setupWorker(logger *zap.Logger, conf workerConf) *worker.Worker {
pgQueue, err := pgq.Open(conf.QueueSpec, conf.QueueName)
if err != nil {
logger.Fatal("failed to init postgres job-queue", zap.Error(err))
}

opts := []worker.Option{
worker.WithLogger(logger.Named("worker")),
worker.WithRunConfig(conf.Threads, conf.PollInterval),
}

asyncWorker, err := worker.New(pgQueue, opts...)
if err != nil {
logger.Fatal("failed to init worker instance", zap.Error(err))
}

return asyncWorker
}

func setupStorage(logger *zap.Logger, pgConStr string) *postgres.Store {
store, err := postgres.Open(pgConStr)
func setupStorage(logger *zap.Logger, pgConStr string, syncCfg syncerConf) *postgres.Store {
store, err := postgres.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy)
if err != nil {
logger.Fatal("failed to connect to Postgres database",
zap.Error(err), zap.String("conn_str", pgConStr))
Expand Down
9 changes: 4 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Service struct {
}

type ModuleService interface {
PlanAction(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error)
PlanAction(ctx context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error)
SyncState(ctx context.Context, res module.ExpandedResource) (*resource.State, error)
StreamLogs(ctx context.Context, res module.ExpandedResource, filter map[string]string) (<-chan module.LogChunk, error)
GetOutput(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error)
Expand All @@ -35,7 +35,7 @@ type AsyncWorker interface {
Enqueue(ctx context.Context, jobs ...worker.Job) error
}

func New(repo resource.Store, moduleSvc ModuleService, asyncWorker AsyncWorker, clockFn func() time.Time, lg *zap.Logger) *Service {
func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, lg *zap.Logger) *Service {
if clockFn == nil {
clockFn = time.Now
}
Expand All @@ -44,19 +44,18 @@ func New(repo resource.Store, moduleSvc ModuleService, asyncWorker AsyncWorker,
logger: lg,
clock: clockFn,
store: repo,
worker: asyncWorker,
moduleSvc: moduleSvc,
}
}

func (s *Service) generateModuleSpec(ctx context.Context, res resource.Resource) (*module.ExpandedResource, error) {
func (svc *Service) generateModuleSpec(ctx context.Context, res resource.Resource) (*module.ExpandedResource, error) {
modSpec := module.ExpandedResource{
Resource: res,
Dependencies: map[string]module.ResolvedDependency{},
}

for key, resURN := range res.Spec.Dependencies {
d, err := s.GetResource(ctx, resURN)
d, err := svc.GetResource(ctx, resURN)
if err != nil {
if errors.Is(err, errors.ErrNotFound) {
return nil, errors.ErrInvalid.
Expand Down
2 changes: 1 addition & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ var (

func TestNew(t *testing.T) {
t.Parallel()
s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, &mocks.AsyncWorker{}, deadClock, nil)
s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock, nil)
assert.NotNil(t, s)
}
7 changes: 6 additions & 1 deletion core/mocks/async_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 33 additions & 9 deletions core/mocks/driver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 859c4b1

Please sign in to comment.