Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor out separate dal and sql packages for cronjobs #1860

Merged
merged 17 commits into from
Jul 8, 2024
Merged
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/shared/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/{cronjobs}/sql/queries.sql backend/controller/{cronjobs}/shared/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
23 changes: 19 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"golang.org/x/exp/maps"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
cronjobsdal "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
Expand Down Expand Up @@ -118,7 +120,13 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, dal, config, runnerScaling)
// Bring up the DB connection and DAL.
deniseli marked this conversation as resolved.
Show resolved Hide resolved
conn, err := pgxpool.New(ctx, config.DSN)
if err != nil {
return fmt.Errorf("failed to bring up DB connection: %w", err)
}

svc, err := New(ctx, conn, config, runnerScaling)
if err != nil {
return err
}
Expand All @@ -128,7 +136,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
sm := cf.SecretsFromContext(ctx)

admin := admin.NewAdminService(cm, sm)
console := NewConsoleService(dal)
console := NewConsoleService(svc.dal)

ingressHandler := http.Handler(svc)
if len(config.AllowOrigins) > 0 {
Expand Down Expand Up @@ -170,6 +178,7 @@ type ControllerListListener interface {
}

type Service struct {
pool *pgxpool.Pool
dal *dal.DAL
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink
Expand All @@ -193,7 +202,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -207,8 +216,14 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, pool)
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}

svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
pool: pool,
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand All @@ -220,7 +235,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, cronjobsdal.New(pool), svc.tasks, svc.callWithRequest)
deniseli marked this conversation as resolved.
Show resolved Hide resolved
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

Expand Down
9 changes: 5 additions & 4 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
"github.com/jpillora/backoff"
"github.com/serialx/hashring"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (updatedHashRingEvent) cronJobEvent() {}

type hashRingState struct {
hashRing *hashring.HashRing
controllers []dal.Controller
controllers []parentdal.Controller
idx int
}

Expand Down Expand Up @@ -408,7 +409,7 @@ func (s *Service) nextAttemptForJob(job model.CronJob, state *state, allowsNow b
}

// UpdatedControllerList synchronises the hash ring with the active controllers.
func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
func (s *Service) UpdatedControllerList(ctx context.Context, controllers []parentdal.Controller) {
logger := log.FromContext(ctx).Scope("cron")
controllerIdx := -1
for idx, controller := range controllers {
Expand Down Expand Up @@ -436,7 +437,7 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C
}
}

hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
hashRing := hashring.New(slices.Map(controllers, func(c parentdal.Controller) string { return c.Key.String() }))
s.hashRingState.Store(&hashRingState{
hashRing: hashRing,
controllers: controllers,
Expand Down
8 changes: 5 additions & 3 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"testing"
"time"

db "github.com/TBD54566975/ftl/backend/controller/dal"
db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -24,7 +25,8 @@ func TestServiceWithRealDal(t *testing.T) {
t.Cleanup(cancel)

conn := sqltest.OpenForTesting(ctx, t)
dal, err := db.New(ctx, conn)
dal := db.New(conn)
parentDAL, err := parentdb.New(ctx, conn)
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
Expand All @@ -36,7 +38,7 @@ func TestServiceWithRealDal(t *testing.T) {
time.Sleep(2*time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond)
}

testServiceWithDal(ctx, t, dal, clk)
testServiceWithDal(ctx, t, dal, parentDAL, clk)
}

func TestCron(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
xslices "golang.org/x/exp/slices"

db "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -35,8 +36,11 @@ func TestServiceWithMockDal(t *testing.T) {
lock: sync.Mutex{},
attemptCountMap: map[string]int{},
}
conn := sqltest.OpenForTesting(ctx, t)
parentDAL, err := db.New(ctx, conn)
assert.NoError(t, err)

testServiceWithDal(ctx, t, mockDal, clk)
testServiceWithDal(ctx, t, mockDal, parentDAL, clk)
}

func TestHashRing(t *testing.T) {
Expand Down
33 changes: 17 additions & 16 deletions backend/controller/cronjobs/cronjobs_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"

db "github.com/TBD54566975/ftl/backend/controller/dal"
cronjobsdb "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
Expand All @@ -23,9 +24,8 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
)

type ExtendedDAL interface {
DAL
CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []db.DeploymentArtefact, ingressRoutes []db.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
type ParentDAL interface {
CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []parentdb.DeploymentArtefact, ingressRoutes []parentdb.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error)
ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error)
}

Expand All @@ -36,9 +36,10 @@ type mockDAL struct {
attemptCountMap map[string]int
}

var _ ExtendedDAL = &mockDAL{}
var _ ParentDAL = &mockDAL{}
var _ DAL = &mockDAL{}

func (d *mockDAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []db.DeploymentArtefact, ingressRoutes []db.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) {
func (d *mockDAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []parentdb.DeploymentArtefact, ingressRoutes []parentdb.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) {
deploymentKey := model.NewDeploymentKey(moduleSchema.Name)
d.jobs = []model.CronJob{}
for _, job := range cronJobs {
Expand Down Expand Up @@ -68,11 +69,11 @@ func (d *mockDAL) indexForJob(job model.CronJob) (int, error) {
return -1, fmt.Errorf("job not found")
}

func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []db.AttemptedCronJob, err error) {
func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []cronjobsdb.AttemptedCronJob, err error) {
d.lock.Lock()
defer d.lock.Unlock()

attemptedJobs = []db.AttemptedCronJob{}
attemptedJobs = []cronjobsdb.AttemptedCronJob{}
now := d.clock.Now()

for _, inputJob := range jobs {
Expand All @@ -85,13 +86,13 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (atte
job.State = model.CronJobStateExecuting
job.StartTime = d.clock.Now()
d.jobs[i] = job
attemptedJobs = append(attemptedJobs, db.AttemptedCronJob{
attemptedJobs = append(attemptedJobs, cronjobsdb.AttemptedCronJob{
CronJob: job,
DidStartExecution: true,
HasMinReplicas: true,
})
} else {
attemptedJobs = append(attemptedJobs, db.AttemptedCronJob{
attemptedJobs = append(attemptedJobs, cronjobsdb.AttemptedCronJob{
CronJob: job,
DidStartExecution: false,
HasMinReplicas: true,
Expand Down Expand Up @@ -200,8 +201,8 @@ func newControllers(ctx context.Context, count int, dal DAL, clockFactory func()
for _, c := range controllers {
s := c.cronJobs
go func() {
s.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) db.Controller {
return db.Controller{
s.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) parentdb.Controller {
return parentdb.Controller{
Key: ctrl.key,
}
}))
Expand All @@ -215,7 +216,7 @@ func newControllers(ctx context.Context, count int, dal DAL, clockFactory func()
}

// should be called when clk is half way between cron job executions (ie on an odd second)
func testServiceWithDal(ctx context.Context, t *testing.T, dal ExtendedDAL, clk clock.Clock) {
func testServiceWithDal(ctx context.Context, t *testing.T, dal DAL, parentDAL ParentDAL, clk clock.Clock) {
t.Helper()

verbCallCount := map[string]int{}
Expand All @@ -224,12 +225,12 @@ func testServiceWithDal(ctx context.Context, t *testing.T, dal ExtendedDAL, clk
moduleName := "initial"
jobsToCreate := newJobs(t, moduleName, "*/2 * * * * * *", clk, 20)

deploymentKey, err := dal.CreateDeployment(ctx, "go", &schema.Module{
deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate)
}, []parentdb.DeploymentArtefact{}, []parentdb.IngressRoutingEntry{}, jobsToCreate)
assert.NoError(t, err)

err = dal.ReplaceDeployment(ctx, deploymentKey, 1)
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

_ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down
102 changes: 102 additions & 0 deletions backend/controller/cronjobs/dal/dal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Package dal provides a data abstraction layer for cron jobs
package dal

import (
"context"
"fmt"
"time"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/TBD54566975/ftl/backend/controller/cronjobs/sql"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
)

// DAL is the data abstraction layer for cron jobs.
type DAL struct {
	// db is the query interface from the cronjobs sql package; every DAL
	// method in this file delegates to it.
	db sql.DBI
}

// New returns a cron job DAL whose queries run against the given
// connection pool.
func New(pool *pgxpool.Pool) *DAL {
	queries := sql.NewDB(pool)
	return &DAL{db: queries}
}

// cronJobFromRow converts a GetCronJobs query row into a model.CronJob.
// The row's module and verb columns are combined into a single schema.Ref;
// every other field is copied across unchanged.
func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob {
	var job model.CronJob
	job.Key = row.Key
	job.DeploymentKey = row.DeploymentKey
	job.Verb = schema.Ref{Module: row.Module, Name: row.Verb}
	job.Schedule = row.Schedule
	job.StartTime = row.StartTime
	job.NextExecution = row.NextExecution
	job.State = row.State
	return job
}

// GetCronJobs returns all cron jobs for deployments with min replicas > 0.
//
// Database errors are passed through dalerrs.TranslatePGError and wrapped
// before being returned.
func (d *DAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) {
	rows, err := d.db.GetCronJobs(ctx)
	if err != nil {
		translated := dalerrs.TranslatePGError(err)
		return nil, fmt.Errorf("failed to get cron jobs: %w", translated)
	}
	jobs := slices.Map(rows, cronJobFromRow)
	return jobs, nil
}

// AttemptedCronJob is the per-job result returned by StartCronJobs: the cron
// job as reported by the database plus flags describing the start attempt.
type AttemptedCronJob struct {
	// DidStartExecution is populated from the query row's Updated column.
	DidStartExecution bool
	// HasMinReplicas is copied from the query row.
	// NOTE(review): presumably whether the job's deployment has min replicas > 0 — confirm against the SQL query.
	HasMinReplicas bool
	// CronJob is embedded, so its fields are promoted onto AttemptedCronJob.
	model.CronJob
}

// StartCronJobs attempts to start the given cron jobs, identified by key.
//
// It returns one AttemptedCronJob per row produced by the query, so the caller
// can refresh its own view of every job whether or not that job's row was
// actually updated. An empty input returns (nil, nil) without touching the
// database.
func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []AttemptedCronJob, err error) {
	if len(jobs) == 0 {
		return nil, nil
	}

	keyOf := func(job model.CronJob) string { return job.Key.String() }
	rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, keyOf))
	if err != nil {
		return nil, fmt.Errorf("failed to start cron jobs: %w", dalerrs.TranslatePGError(err))
	}

	// Sized up front and filled by index; the result is non-nil even when the
	// query returns no rows.
	attemptedJobs = make([]AttemptedCronJob, len(rows))
	for i, row := range rows {
		var job model.CronJob
		job.Key = row.Key
		job.DeploymentKey = row.DeploymentKey
		job.Verb = schema.Ref{Module: row.Module, Name: row.Verb}
		job.Schedule = row.Schedule
		job.StartTime = row.StartTime
		job.NextExecution = row.NextExecution
		job.State = row.State

		attemptedJobs[i] = AttemptedCronJob{
			CronJob:           job,
			DidStartExecution: row.Updated,
			HasMinReplicas:    row.HasMinReplicas,
		}
	}
	return attemptedJobs, nil
}

// EndCronJob sets the status from executing to idle and updates the next
// execution time to next. It can be called on the successful completion of a
// job, or if the job failed to execute (error or timeout).
//
// The job is identified by its key and start time. On failure a zero
// model.CronJob is returned alongside the wrapped error.
func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) {
	row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime)
	if err != nil {
		translated := dalerrs.TranslatePGError(err)
		return model.CronJob{}, fmt.Errorf("failed to end cron job: %w", translated)
	}
	// The returned row is converted to GetCronJobsRow so the shared row
	// mapper can be reused.
	asGetRow := sql.GetCronJobsRow(row)
	return cronJobFromRow(asGetRow), nil
}

// GetStaleCronJobs returns a list of cron jobs that have been executing
// longer than the duration.
//
// Database errors are passed through dalerrs.TranslatePGError and wrapped
// before being returned.
func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) {
	rows, err := d.db.GetStaleCronJobs(ctx, duration)
	if err != nil {
		translated := dalerrs.TranslatePGError(err)
		return nil, fmt.Errorf("failed to get stale cron jobs: %w", translated)
	}
	// Each stale-job row is converted to GetCronJobsRow so the shared row
	// mapper can be reused.
	toJob := func(row sql.GetStaleCronJobsRow) model.CronJob {
		return cronJobFromRow(sql.GetCronJobsRow(row))
	}
	return slices.Map(rows, toJob), nil
}
Loading
Loading