Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor out separate dal and sql packages for cronjobs #1860

Merged
merged 17 commits into from
Jul 8, 2024
Merged
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/{cronjobs}/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
24 changes: 19 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, dal *dal.DAL) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -118,7 +119,13 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, dal, config, runnerScaling)
// Bring up the DB connection and DAL.
deniseli marked this conversation as resolved.
Show resolved Hide resolved
conn, err := pgxpool.New(ctx, config.DSN)
if err != nil {
return fmt.Errorf("failed to bring up DB connection: %w", err)
}

svc, err := New(ctx, conn, config, runnerScaling)
if err != nil {
return err
}
Expand All @@ -128,7 +135,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
sm := cf.SecretsFromContext(ctx)

admin := admin.NewAdminService(cm, sm)
console := NewConsoleService(dal)
console := NewConsoleService(svc.dal)

ingressHandler := http.Handler(svc)
if len(config.AllowOrigins) > 0 {
Expand Down Expand Up @@ -171,6 +178,7 @@ type ControllerListListener interface {
}

type Service struct {
pool *pgxpool.Pool
dal *dal.DAL
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink
Expand All @@ -194,7 +202,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -208,8 +216,14 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, pool)
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}

svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
pool: pool,
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand All @@ -221,7 +235,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, pool, svc.tasks, svc.callWithRequest)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

Expand Down
14 changes: 8 additions & 6 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
"github.com/benbjohnson/clock"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jpillora/backoff"
"github.com/serialx/hashring"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand Down Expand Up @@ -61,7 +63,7 @@ func (updatedHashRingEvent) cronJobEvent() {}

type hashRingState struct {
hashRing *hashring.HashRing
controllers []dal.Controller
controllers []parentdal.Controller
idx int
}

Expand Down Expand Up @@ -94,8 +96,8 @@ type Service struct {
hashRingState atomic.Value[*hashRingState]
}

func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service {
return NewForTesting(ctx, key, requestSource, config, dal, scheduler, call, clock.New())
func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, pool *pgxpool.Pool, scheduler Scheduler, call ExecuteCallFunc) *Service {
return NewForTesting(ctx, key, requestSource, config, dal.New(pool), scheduler, call, clock.New())
}

func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service {
Expand Down Expand Up @@ -408,7 +410,7 @@ func (s *Service) nextAttemptForJob(job model.CronJob, state *state, allowsNow b
}

// UpdatedControllerList synchronises the hash ring with the active controllers.
func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
func (s *Service) UpdatedControllerList(ctx context.Context, controllers []parentdal.Controller) {
logger := log.FromContext(ctx).Scope("cron")
controllerIdx := -1
for idx, controller := range controllers {
Expand Down Expand Up @@ -436,7 +438,7 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C
}
}

hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
hashRing := hashring.New(slices.Map(controllers, func(c parentdal.Controller) string { return c.Key.String() }))
s.hashRingState.Store(&hashRingState{
hashRing: hashRing,
controllers: controllers,
Expand Down
8 changes: 5 additions & 3 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"testing"
"time"

db "github.com/TBD54566975/ftl/backend/controller/dal"
db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -24,7 +25,8 @@ func TestServiceWithRealDal(t *testing.T) {
t.Cleanup(cancel)

conn := sqltest.OpenForTesting(ctx, t)
dal, err := db.New(ctx, conn)
dal := db.New(conn)
parentDAL, err := parentdb.New(ctx, conn)
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
Expand All @@ -36,7 +38,7 @@ func TestServiceWithRealDal(t *testing.T) {
time.Sleep(2*time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond)
}

testServiceWithDal(ctx, t, dal, clk)
testServiceWithDal(ctx, t, dal, parentDAL, clk)
}

func TestCron(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
xslices "golang.org/x/exp/slices"

db "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -35,8 +36,11 @@ func TestServiceWithMockDal(t *testing.T) {
lock: sync.Mutex{},
attemptCountMap: map[string]int{},
}
conn := sqltest.OpenForTesting(ctx, t)
parentDAL, err := db.New(ctx, conn)
assert.NoError(t, err)

testServiceWithDal(ctx, t, mockDal, clk)
testServiceWithDal(ctx, t, mockDal, parentDAL, clk)
}

func TestHashRing(t *testing.T) {
Expand Down
33 changes: 17 additions & 16 deletions backend/controller/cronjobs/cronjobs_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"

db "github.com/TBD54566975/ftl/backend/controller/dal"
cronjobsdb "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
Expand All @@ -23,9 +24,8 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
)

type ExtendedDAL interface {
DAL
CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []db.DeploymentArtefact, ingressRoutes []db.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
type ParentDAL interface {
CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []parentdb.DeploymentArtefact, ingressRoutes []parentdb.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
}

Expand All @@ -36,9 +36,10 @@ type mockDAL struct {
attemptCountMap map[string]int
}

var _ ExtendedDAL = &mockDAL{}
var _ ParentDAL = &mockDAL{}
var _ DAL = &mockDAL{}

func (d *mockDAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []db.DeploymentArtefact, ingressRoutes []db.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) {
func (d *mockDAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []parentdb.DeploymentArtefact, ingressRoutes []parentdb.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) {
deploymentKey := model.NewDeploymentKey(moduleSchema.Name)
d.jobs = []model.CronJob{}
for _, job := range cronJobs {
Expand Down Expand Up @@ -68,11 +69,11 @@ func (d *mockDAL) indexForJob(job model.CronJob) (int, error) {
return -1, fmt.Errorf("job not found")
}

func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []db.AttemptedCronJob, err error) {
func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []cronjobsdb.AttemptedCronJob, err error) {
d.lock.Lock()
defer d.lock.Unlock()

attemptedJobs = []db.AttemptedCronJob{}
attemptedJobs = []cronjobsdb.AttemptedCronJob{}
now := d.clock.Now()

for _, inputJob := range jobs {
Expand All @@ -85,13 +86,13 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (atte
job.State = model.CronJobStateExecuting
job.StartTime = d.clock.Now()
d.jobs[i] = job
attemptedJobs = append(attemptedJobs, db.AttemptedCronJob{
attemptedJobs = append(attemptedJobs, cronjobsdb.AttemptedCronJob{
CronJob: job,
DidStartExecution: true,
HasMinReplicas: true,
})
} else {
attemptedJobs = append(attemptedJobs, db.AttemptedCronJob{
attemptedJobs = append(attemptedJobs, cronjobsdb.AttemptedCronJob{
CronJob: job,
DidStartExecution: false,
HasMinReplicas: true,
Expand Down Expand Up @@ -200,8 +201,8 @@ func newControllers(ctx context.Context, count int, dal DAL, clockFactory func()
for _, c := range controllers {
s := c.cronJobs
go func() {
s.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) db.Controller {
return db.Controller{
s.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) parentdb.Controller {
return parentdb.Controller{
Key: ctrl.key,
}
}))
Expand All @@ -215,7 +216,7 @@ func newControllers(ctx context.Context, count int, dal DAL, clockFactory func()
}

// should be called when clk is half way between cron job executions (ie on an odd second)
func testServiceWithDal(ctx context.Context, t *testing.T, dal ExtendedDAL, clk clock.Clock) {
func testServiceWithDal(ctx context.Context, t *testing.T, dal DAL, parentDAL ParentDAL, clk clock.Clock) {
t.Helper()

verbCallCount := map[string]int{}
Expand All @@ -224,12 +225,12 @@ func testServiceWithDal(ctx context.Context, t *testing.T, dal ExtendedDAL, clk
moduleName := "initial"
jobsToCreate := newJobs(t, moduleName, "*/2 * * * * * *", clk, 20)

deploymentKey, err := dal.CreateDeployment(ctx, "go", &schema.Module{
deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate)
}, []parentdb.DeploymentArtefact{}, []parentdb.IngressRoutingEntry{}, jobsToCreate)
assert.NoError(t, err)

err = dal.ReplaceDeployment(ctx, deploymentKey, 1)
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

_ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down
Loading
Loading