Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding job module #74

Merged
merged 19 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions cli/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package cli
import (
"context"

"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/goto/entropy/pkg/logger"
"github.com/spf13/cobra"
)

func cmdMigrate() *cobra.Command {
Expand All @@ -24,18 +22,18 @@ func cmdMigrate() *cobra.Command {
return err
}

zapLog, err := logger.New(&cfg.Log)
err = logger.Setup(&cfg.Log)
if err != nil {
return err
}

return runMigrations(cmd.Context(), zapLog, cfg)
return runMigrations(cmd.Context(), cfg)
})

return cmd
}

func runMigrations(ctx context.Context, zapLog *zap.Logger, cfg Config) error {
store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer)
func runMigrations(ctx context.Context, cfg Config) error {
store := setupStorage(cfg.PGConnStr, cfg.Syncer)
return store.Migrate(ctx)
}
27 changes: 15 additions & 12 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cli
import (
"time"

"github.com/goto/entropy/modules/job"

"github.com/newrelic/go-agent/v3/newrelic"
"github.com/spf13/cobra"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,54 +40,55 @@ func cmdServe() *cobra.Command {
return err
}

zapLog, err := logger.New(&cfg.Log)
err = logger.Setup(&cfg.Log)
if err != nil {
return err
}

telemetry.Init(cmd.Context(), cfg.Telemetry, zapLog)
telemetry.Init(cmd.Context(), cfg.Telemetry)
nrApp, err := newrelic.NewApplication(
newrelic.ConfigAppName(cfg.Telemetry.ServiceName),
newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey),
)

store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer)
moduleService := module.NewService(setupRegistry(zapLog), store)
resourceService := core.New(store, moduleService, time.Now, zapLog)
store := setupStorage(cfg.PGConnStr, cfg.Syncer)
moduleService := module.NewService(setupRegistry(), store)
resourceService := core.New(store, moduleService, time.Now)

if migrate {
if migrateErr := runMigrations(cmd.Context(), zapLog, cfg); migrateErr != nil {
if migrateErr := runMigrations(cmd.Context(), cfg); migrateErr != nil {
return migrateErr
}
}

if spawnWorker {
go func() {
if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil {
zapLog.Error("syncer exited with error", zap.Error(err))
zap.L().Error("syncer exited with error", zap.Error(err))
}
}()
}

return entropyserver.Serve(cmd.Context(),
cfg.Service.httpAddr(), cfg.Service.grpcAddr(),
nrApp, zapLog, resourceService, moduleService,
nrApp, resourceService, moduleService,
)
})

return cmd
}

func setupRegistry(logger *zap.Logger) module.Registry {
func setupRegistry() module.Registry {
supported := []module.Descriptor{
kubernetes.Module,
firehose.Module,
job.Module,
}

registry := &modules.Registry{}
for _, desc := range supported {
if err := registry.Register(desc); err != nil {
logger.Fatal("failed to register module",
zap.L().Fatal("failed to register module",
zap.String("module_kind", desc.Kind),
zap.Error(err),
)
Expand All @@ -94,10 +97,10 @@ func setupRegistry(logger *zap.Logger) module.Registry {
return registry
}

func setupStorage(logger *zap.Logger, pgConStr string, syncCfg syncerConf) *postgres.Store {
func setupStorage(pgConStr string, syncCfg syncerConf) *postgres.Store {
store, err := postgres.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy)
if err != nil {
logger.Fatal("failed to connect to Postgres database",
zap.L().Fatal("failed to connect to Postgres database",
zap.Error(err), zap.String("conn_str", pgConStr))
}
return store
Expand Down
6 changes: 1 addition & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ import (
"encoding/json"
"time"

"go.uber.org/zap"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/pkg/errors"
)

type Service struct {
logger *zap.Logger
mabdh marked this conversation as resolved.
Show resolved Hide resolved
clock func() time.Time
store resource.Store
moduleSvc ModuleService
Expand All @@ -30,7 +27,7 @@ type ModuleService interface {
GetOutput(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error)
}

func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, lg *zap.Logger) *Service {
func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time) *Service {
const (
defaultMaxRetries = 10
defaultSyncBackoff = 5 * time.Second
Expand All @@ -41,7 +38,6 @@ func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time,
}

return &Service{
logger: lg,
clock: clockFn,
store: repo,
syncBackoff: defaultSyncBackoff,
Expand Down
2 changes: 1 addition & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ var (

func TestNew(t *testing.T) {
t.Parallel()
s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock, nil)
s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock)
assert.NotNil(t, s)
}
10 changes: 5 additions & 5 deletions core/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestService_GetResource(t *testing.T) {
GetByURN(mock.Anything, mock.Anything).
Return(nil, errors.ErrNotFound).
Once()
return core.New(repo, nil, nil, nil)
return core.New(repo, nil, nil)
},
urn: "foo:bar:baz",
wantErr: errors.ErrNotFound,
Expand All @@ -52,7 +52,7 @@ func TestService_GetResource(t *testing.T) {
Return(nil, nil).
Once()

return core.New(repo, mod, deadClock, nil)
return core.New(repo, mod, deadClock)
},
urn: "foo:bar:baz",
want: &sampleResource,
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestService_ListResources(t *testing.T) {
List(mock.Anything, mock.Anything).
Return(nil, nil).
Once()
return core.New(repo, nil, deadClock, nil)
return core.New(repo, nil, deadClock)
},
want: nil,
wantErr: nil,
Expand All @@ -113,7 +113,7 @@ func TestService_ListResources(t *testing.T) {
List(mock.Anything, mock.Anything).
Return(nil, errStoreFailure).
Once()
return core.New(repo, nil, deadClock, nil)
return core.New(repo, nil, deadClock)
},
want: nil,
wantErr: errors.ErrInternal,
Expand All @@ -127,7 +127,7 @@ func TestService_ListResources(t *testing.T) {
List(mock.Anything, mock.Anything).
Return([]resource.Resource{sampleResource}, nil).
Once()
return core.New(repo, nil, deadClock, nil)
return core.New(repo, nil, deadClock)
},
want: []resource.Resource{sampleResource},
wantErr: nil,
Expand Down
4 changes: 2 additions & 2 deletions core/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error

err := svc.store.SyncOne(ctx, svc.handleSync)
if err != nil {
svc.logger.Warn("SyncOne() failed", zap.Error(err))
zap.L().Warn("SyncOne() failed", zap.Error(err))
}
}
}
}

func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*resource.Resource, error) {
logEntry := svc.logger.With(
logEntry := zap.L().With(
zap.String("resource_urn", res.URN),
zap.String("resource_status", res.State.Status),
zap.Int("retries", res.State.SyncResult.Retries),
Expand Down
36 changes: 18 additions & 18 deletions core/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestService_CreateResource(t *testing.T) {
PlanAction(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errSample).Once()

return core.New(nil, mod, deadClock, nil)
return core.New(nil, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand All @@ -59,7 +59,7 @@ func TestService_CreateResource(t *testing.T) {
Return(nil, errors.ErrNotFound).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestService_CreateResource(t *testing.T) {
}, nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestService_CreateResource(t *testing.T) {
}, nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestService_CreateResource(t *testing.T) {
Return(errSample).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestService_CreateResource(t *testing.T) {
Create(mock.Anything, mock.Anything, mock.Anything).
Return(errors.ErrConflict).Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestService_CreateResource(t *testing.T) {
}).
Return(nil)

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
res: resource.Resource{
Kind: "mock",
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestService_UpdateResource(t *testing.T) {
Return(nil, errors.ErrNotFound).
Once()

return core.New(resourceRepo, nil, deadClock, nil)
return core.New(resourceRepo, nil, deadClock)
},
urn: "orn:entropy:mock:project:child",
update: resource.UpdateRequest{
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestService_UpdateResource(t *testing.T) {
Return(&testResource, nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:project:child",
update: resource.UpdateRequest{
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestService_UpdateResource(t *testing.T) {
Return(nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:project:child",
update: resource.UpdateRequest{
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestService_UpdateResource(t *testing.T) {
}).
Twice()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:project:child",
update: resource.UpdateRequest{
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestService_DeleteResource(t *testing.T) {
Return(nil, testErr).
Once()

return core.New(resourceRepo, nil, deadClock, nil)
return core.New(resourceRepo, nil, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
wantErr: testErr,
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestService_DeleteResource(t *testing.T) {
Return(testErr).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
wantErr: errors.ErrInternal,
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestService_DeleteResource(t *testing.T) {
Return(nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
wantErr: nil,
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestService_ApplyAction(t *testing.T) {
Return(nil, errors.ErrNotFound).
Once()

return core.New(resourceRepo, nil, deadClock, nil)
return core.New(resourceRepo, nil, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
action: sampleAction,
Expand All @@ -680,7 +680,7 @@ func TestService_ApplyAction(t *testing.T) {
}, nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
action: sampleAction,
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestService_ApplyAction(t *testing.T) {
}, nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
action: sampleAction,
Expand Down Expand Up @@ -756,7 +756,7 @@ func TestService_ApplyAction(t *testing.T) {
Return(nil).
Once()

return core.New(resourceRepo, mod, deadClock, nil)
return core.New(resourceRepo, mod, deadClock)
},
urn: "orn:entropy:mock:foo:bar",
action: sampleAction,
Expand Down
Loading
Loading