From cfe35a367cd6265d541464e0f7500073e016eba2 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 10 Dec 2024 15:00:24 +1100 Subject: [PATCH] chore: remove deployments and modules table (#3697) --- backend/controller/console/console.go | 4 +- backend/controller/controller.go | 10 +- backend/controller/dal/dal.go | 149 +++----------- backend/controller/dal/dal_test.go | 21 +- backend/controller/dal/internal/sql/models.go | 12 -- .../controller/dal/internal/sql/querier.go | 12 -- .../controller/dal/internal/sql/queries.sql | 46 ----- .../dal/internal/sql/queries.sql.go | 184 ------------------ backend/controller/dal/notify.go | 2 +- backend/controller/pubsub/internal/dal/dal.go | 13 ++ .../internal/sql/async_queries.sql | 0 backend/controller/pubsub/service.go | 16 +- .../20241210022430_remove_deployments.sql | 6 + .../schema/20241210030238_remove_module.sql | 5 + .../controller/state/controllerstate_test.go | 4 +- backend/controller/state/deployments.go | 46 ++++- internal/eventstream/eventstream.go | 10 +- sqlc.yaml | 9 +- 18 files changed, 121 insertions(+), 428 deletions(-) rename backend/controller/{dal => pubsub}/internal/sql/async_queries.sql (100%) create mode 100644 backend/controller/sql/schema/20241210022430_remove_deployments.sql create mode 100644 backend/controller/sql/schema/20241210030238_remove_module.sql diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index f649bf9900..3226fde0dc 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -77,7 +77,7 @@ func verbSchemaString(sch *schema.Schema, verb *schema.Verb) (string, error) { } func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pbconsole.GetModulesRequest]) (*connect.Response[pbconsole.GetModulesResponse], error) { - deployments, err := c.dal.GetDeploymentsWithMinReplicas(ctx) + deployments, err := c.dal.GetActiveDeployments() if err != nil { return nil, err } @@ -371,7 +371,7 @@ func (c *ConsoleService) filterDeployments(unfilteredDeployments []dalmodel.Depl } func (c *ConsoleService) sendStreamModulesResp(ctx context.Context, stream *connect.ServerStream[pbconsole.StreamModulesResponse]) error { - unfilteredDeployments, err := c.dal.GetDeploymentsWithMinReplicas(ctx) + unfilteredDeployments, err := c.dal.GetActiveDeployments() if err != nil { return fmt.Errorf("failed to get deployments: %w", err) } diff --git a/backend/controller/controller.go b/backend/controller/controller.go index f50b148cdc..1d07636aee 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -261,7 +261,7 @@ func New( pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState) svc.pubSub = pubSub - svc.dal = dal.New(ctx, conn, svc.storage, svc.controllerState) + svc.dal = dal.New(svc.storage, svc.controllerState) svc.deploymentLogsSink = newDeploymentLogsSink(ctx) @@ -1010,16 +1010,12 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl return nil, fmt.Errorf("invalid module schema: %w", err) } - dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Base.Language, module) - if err != nil { - logger.Errorf(err, "Could not create deployment") - return nil, fmt.Errorf("could not create deployment: %w", err) - } + dkey := model.NewDeploymentKey(module.Name) err = s.controllerState.Publish(ctx, &state.DeploymentCreatedEvent{ Module: module.Name, Key: dkey, CreatedAt: time.Now(), - Schema: ms, + Schema: module, Artefacts: artefacts, Language: ms.Runtime.Base.Language, }) diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 046ef2e933..60e2bd4e79 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -12,7 +12,6 @@ import ( xmaps "golang.org/x/exp/maps" aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts" - dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/state" @@ -25,22 +24,11 @@ import ( "github.com/TBD54566975/ftl/internal/slices" ) -func New(ctx context.Context, conn libdal.Connection, registry aregistry.Service, state state.ControllerState) *DAL { +func New(registry aregistry.Service, state state.ControllerState) *DAL { var d *DAL - db := dalsql.New(conn) d = &DAL{ - db: db, - registry: registry, - state: state, - Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { - return &DAL{ - Handle: h, - db: dalsql.New(h.Connection), - registry: registry, - DeploymentChanges: d.DeploymentChanges, - state: state, - } - }), + registry: registry, + state: state, DeploymentChanges: inprocesspubsub.New[DeploymentNotification](), } @@ -48,9 +36,6 @@ func New(ctx context.Context, conn libdal.Connection, registry aregistry.Service } type DAL struct { - *libdal.Handle[DAL] - db dalsql.Querier - pubsub *pubsub.Service registry aregistry.Service state state.ControllerState @@ -74,66 +59,8 @@ func (d *DAL) GetActiveDeployments() ([]dalmodel.Deployment, error) { }), nil } -func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error) { - _, err = d.db.UpsertModule(ctx, language, name) - return libdal.TranslatePGError(err) -} - -// CreateDeployment (possibly) creates a new deployment and associates -// previously created artefacts with it. -// -// If an existing deployment with identical artefacts exists, it is returned. -func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module) (key model.DeploymentKey, err error) { - - // Start the parent transaction - tx, err := d.Begin(ctx) - if err != nil { - return model.DeploymentKey{}, fmt.Errorf("could not start transaction: %w", err) - } - defer tx.CommitOrRollback(ctx, &err) - - // TODO(aat): "schema" containing language? - _, err = tx.db.UpsertModule(ctx, language, moduleSchema.Name) - if err != nil { - return model.DeploymentKey{}, fmt.Errorf("failed to upsert module: %w", libdal.TranslatePGError(err)) - } - - // upsert topics - for _, decl := range moduleSchema.Decls { - t, ok := decl.(*schema.Topic) - if !ok { - continue - } - err := tx.db.UpsertTopic(ctx, dalsql.UpsertTopicParams{ - Topic: model.NewTopicKey(moduleSchema.Name, t.Name), - Module: moduleSchema.Name, - Name: t.Name, - EventType: t.Event.String(), - }) - if err != nil { - return model.DeploymentKey{}, fmt.Errorf("could not insert topic: %w", libdal.TranslatePGError(err)) - } - } - - deploymentKey := model.NewDeploymentKey(moduleSchema.Name) - - // Create the deployment - err = tx.db.CreateDeployment(ctx, moduleSchema.Name, moduleSchema, deploymentKey) - if err != nil { - return model.DeploymentKey{}, fmt.Errorf("failed to create deployment: %w", libdal.TranslatePGError(err)) - } - - return deploymentKey, nil -} - // SetDeploymentReplicas activates the given deployment. func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) { - // Start the transaction - tx, err := d.Begin(ctx) - if err != nil { - return libdal.TranslatePGError(err) - } - defer tx.CommitOrRollback(ctx, &err) view := d.state.View() deployment, err := view.GetDeployment(key) @@ -141,7 +68,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey return fmt.Errorf("could not get deployment: %w", err) } - err = tx.db.SetDeploymentDesiredReplicas(ctx, key, int32(minReplicas)) + err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: key, Replicas: minReplicas}) if err != nil { return libdal.TranslatePGError(err) } @@ -171,13 +98,6 @@ var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active") // // returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active. func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error) { - // Start the transaction - tx, err := d.Begin(ctx) - if err != nil { - return fmt.Errorf("replace deployment failed to begin transaction for %v: %w", newDeploymentKey, libdal.TranslatePGError(err)) - } - - defer tx.CommitOrRollback(ctx, &err) view := d.state.View() newDeployment, err := view.GetDeployment(newDeploymentKey) if err != nil { @@ -191,12 +111,19 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl // If there's an existing deployment, set its desired replicas to 0 var replacedDeploymentKey optional.Option[model.DeploymentKey] - oldDeployment, err := tx.db.GetExistingDeploymentForModule(ctx, newDeployment.Module) - if err == nil { + // TODO: remove all this, it needs to be event driven + var oldDeployment *state.Deployment + for _, dep := range view.ActiveDeployments() { + if dep.Module == newDeployment.Module { + oldDeployment = dep + break + } + } + if oldDeployment != nil { if oldDeployment.Key.String() == newDeploymentKey.String() { return fmt.Errorf("replace deployment failed: deployment already exists from %v to %v: %w", oldDeployment.Key, newDeploymentKey, ErrReplaceDeploymentAlreadyActive) } - err = tx.db.SetDeploymentDesiredReplicas(ctx, newDeploymentKey, int32(minReplicas)) + err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas}) if err != nil { return fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, libdal.TranslatePGError(err)) } @@ -205,12 +132,9 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl return libdal.TranslatePGError(err) } replacedDeploymentKey = optional.Some(oldDeployment.Key) - } else if !libdal.IsNotFound(err) { - // any error other than not found - return fmt.Errorf("replace deployment failed to get existing deployment for %v: %w", newDeploymentKey, libdal.TranslatePGError(err)) } else { // Set the desired replicas for the new deployment - err = tx.db.SetDeploymentDesiredReplicas(ctx, newDeploymentKey, int32(minReplicas)) + err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas}) if err != nil { return fmt.Errorf("replace deployment failed to set replicas for %v: %w", newDeploymentKey, libdal.TranslatePGError(err)) } @@ -257,51 +181,30 @@ func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) { // // Note that this is racey as the deployment can be updated by another process. This will go away once we ditch the DB. func (d *DAL) UpdateModuleSchema(ctx context.Context, deployment model.DeploymentKey, module *schema.Module) error { - err := d.db.UpdateDeploymentSchema(ctx, module, deployment) + err := d.state.Publish(ctx, &state.DeploymentSchemaUpdatedEvent{ + Key: deployment, + Schema: module, + }) if err != nil { return fmt.Errorf("failed to update deployment schema: %w", err) } return nil } -func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error) { - rows, err := d.db.GetDeploymentsWithMinReplicas(ctx) - if err != nil { - if libdal.IsNotFound(err) { - return nil, nil - } - return nil, libdal.TranslatePGError(err) - } - return slices.Map(rows, func(in dalsql.GetDeploymentsWithMinReplicasRow) dalmodel.Deployment { - return dalmodel.Deployment{ - Key: in.Deployment.Key, - Module: in.ModuleName, - Language: in.Language, - MinReplicas: int(in.Deployment.MinReplicas), - Schema: in.Deployment.Schema, - CreatedAt: in.Deployment.CreatedAt, - } - }), nil -} - func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error) { - rows, err := d.db.GetActiveDeploymentSchemas(ctx) - if err != nil { - return nil, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err)) - } - return slices.Map(rows, func(in dalsql.GetActiveDeploymentSchemasRow) *schema.Module { return in.Schema }), nil + view := d.state.View() + rows := view.ActiveDeployments() + return slices.Map(xmaps.Values(rows), func(in *state.Deployment) *schema.Module { return in.Schema }), nil } // GetActiveDeploymentSchemasByDeploymentKey returns the schema for all active deployments by deployment key. // // model.DeploymentKey is not used directly as a key as it's not a valid map key. func (d *DAL) GetActiveDeploymentSchemasByDeploymentKey(ctx context.Context) (map[string]*schema.Module, error) { - rows, err := d.db.GetActiveDeploymentSchemas(ctx) - if err != nil { - return nil, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err)) - } - return maps.FromSlice(rows, func(in dalsql.GetActiveDeploymentSchemasRow) (string, *schema.Module) { - return in.Key.String(), in.Schema + view := d.state.View() + rows := view.ActiveDeployments() + return maps.MapValues[string, *state.Deployment, *schema.Module](rows, func(dep string, in *state.Deployment) *schema.Module { + return in.Schema }), nil } diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index b0902da4f3..66c5b98330 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -11,9 +11,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl/backend/controller/artefacts" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/controller/state" - schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -26,9 +24,8 @@ func TestDAL(t *testing.T) { timelineEndpoint, err := url.Parse("http://localhost:8080") assert.NoError(t, err) ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint)) - conn := sqltest.OpenForTesting(ctx, t) - dal := New(ctx, conn, artefacts.NewForTesting(), state.NewInMemoryState()) + dal := New(artefacts.NewForTesting(), state.NewInMemoryState()) deploymentChangesCh := dal.DeploymentChanges.Subscribe(nil) deploymentChanges := []DeploymentNotification{} @@ -40,21 +37,15 @@ func TestDAL(t *testing.T) { return nil }) - t.Run("UpsertModule", func(t *testing.T) { - err = dal.UpsertModule(ctx, "go", "test") - assert.NoError(t, err) - }) - module := &schema.Module{Name: "test"} var deploymentKey model.DeploymentKey t.Run("CreateDeployment", func(t *testing.T) { - deploymentKey, err = dal.CreateDeployment(ctx, "go", module) - assert.NoError(t, err) + deploymentKey = model.NewDeploymentKey(module.Name) err = dal.state.Publish(ctx, &state.DeploymentCreatedEvent{ Key: deploymentKey, CreatedAt: time.Now(), Module: module.Name, - Schema: &schemapb.Module{Name: module.Name}, + Schema: &schema.Module{Name: module.Name}, }) assert.NoError(t, err) }) @@ -67,9 +58,8 @@ func TestDAL(t *testing.T) { func TestCreateArtefactConflict(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) - conn := sqltest.OpenForTesting(ctx, t) - dal := New(ctx, conn, artefacts.NewForTesting(), state.NewInMemoryState()) + dal := New(artefacts.NewForTesting(), state.NewInMemoryState()) idch := make(chan sha256.SHA256, 2) @@ -78,12 +68,9 @@ func TestCreateArtefactConflict(t *testing.T) { wg.Add(2) createContent := func() { defer wg.Done() - tx1, err := dal.Begin(ctx) - assert.NoError(t, err) digest, err := dal.registry.Upload(ctx, artefacts.Artefact{Content: []byte("content")}) assert.NoError(t, err) time.Sleep(time.Second * 2) - err = tx1.Commit(ctx) assert.NoError(t, err) idch <- digest } diff --git a/backend/controller/dal/internal/sql/models.go b/backend/controller/dal/internal/sql/models.go index 7ed21a843e..bcbe0b6e25 100644 --- a/backend/controller/dal/internal/sql/models.go +++ b/backend/controller/dal/internal/sql/models.go @@ -11,7 +11,6 @@ import ( "time" "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/schema" "github.com/alecthomas/types/optional" "github.com/sqlc-dev/pqtype" ) @@ -58,17 +57,6 @@ func (ns NullTopicSubscriptionState) Value() (driver.Value, error) { return string(ns.TopicSubscriptionState), nil } -type Deployment struct { - ID int64 - CreatedAt time.Time - ModuleID int64 - Key model.DeploymentKey - Schema *schema.Module - Labels json.RawMessage - MinReplicas int32 - LastActivatedAt time.Time -} - type Topic struct { ID int64 Key model.TopicKey diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index b9c550a1c3..2ccff303d4 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -9,22 +9,16 @@ import ( "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/schema" "github.com/alecthomas/types/optional" ) type Querier interface { BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error CompleteEventForSubscription(ctx context.Context, name string, module string) error - CreateDeployment(ctx context.Context, moduleName string, schema *schema.Module, key model.DeploymentKey) error DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) - GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) - GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error) - GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) - GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) @@ -35,13 +29,7 @@ type Querier interface { GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error - SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error - // Note that this can result in a race condition if the deployment is being updated by another process. This will go - // away once we ditch the DB. - // - UpdateDeploymentSchema(ctx context.Context, schema *schema.Module, key model.DeploymentKey) error - UpsertModule(ctx context.Context, language string, name string) (int64, error) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error } diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index 41f78512bc..e69de29bb2 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -1,46 +0,0 @@ --- name: UpsertModule :one -INSERT INTO modules (language, name) -VALUES ($1, $2) -ON CONFLICT (name) DO UPDATE SET language = $1 -RETURNING id; - --- name: CreateDeployment :exec -INSERT INTO deployments (module_id, "schema", "key") -VALUES ((SELECT id FROM modules WHERE name = @module_name::TEXT LIMIT 1), @schema::module_schema_pb, @key::deployment_key); - --- Note that this can result in a race condition if the deployment is being updated by another process. This will go --- away once we ditch the DB. --- --- name: UpdateDeploymentSchema :exec -UPDATE deployments -SET schema = @schema::module_schema_pb -WHERE key = @key::deployment_key -RETURNING 1; - --- name: GetDeploymentsWithMinReplicas :many -SELECT sqlc.embed(d), m.name AS module_name, m.language -FROM deployments d - INNER JOIN modules m on d.module_id = m.id -WHERE min_replicas > 0 -ORDER BY d.created_at,d.key; - --- name: GetActiveDeploymentSchemas :many -SELECT key, schema FROM deployments WHERE min_replicas > 0; - --- name: GetSchemaForDeployment :one -SELECT schema FROM deployments WHERE key = sqlc.arg('key')::deployment_key; - --- name: SetDeploymentDesiredReplicas :exec -UPDATE deployments -SET min_replicas = $2, last_activated_at = CASE WHEN min_replicas = 0 THEN (NOW() AT TIME ZONE 'utc') ELSE last_activated_at END -WHERE key = sqlc.arg('key')::deployment_key -RETURNING 1; - --- name: GetExistingDeploymentForModule :one -SELECT * -FROM deployments d - INNER JOIN modules m on d.module_id = m.id -WHERE m.name = $1 - AND min_replicas > 0 -LIMIT 1; - diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index bda15014dd..0c3f092ffe 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -8,7 +8,6 @@ package sql import ( "context" "encoding/json" - "time" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/internal/model" @@ -46,16 +45,6 @@ func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, return err } -const createDeployment = `-- name: CreateDeployment :exec -INSERT INTO deployments (module_id, "schema", "key") -VALUES ((SELECT id FROM modules WHERE name = $1::TEXT LIMIT 1), $2::module_schema_pb, $3::deployment_key) -` - -func (q *Queries) CreateDeployment(ctx context.Context, moduleName string, schema *schema.Module, key model.DeploymentKey) error { - _, err := q.db.ExecContext(ctx, createDeployment, moduleName, schema, key) - return err -} - const deleteSubscribers = `-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_key = $1::deployment_key @@ -114,128 +103,6 @@ func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.Depl return items, nil } -const getActiveDeploymentSchemas = `-- name: GetActiveDeploymentSchemas :many -SELECT key, schema FROM deployments WHERE min_replicas > 0 -` - -type GetActiveDeploymentSchemasRow struct { - Key model.DeploymentKey - Schema *schema.Module -} - -func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) { - rows, err := q.db.QueryContext(ctx, getActiveDeploymentSchemas) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetActiveDeploymentSchemasRow - for rows.Next() { - var i GetActiveDeploymentSchemasRow - if err := rows.Scan(&i.Key, &i.Schema); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getDeploymentsWithMinReplicas = `-- name: GetDeploymentsWithMinReplicas :many -SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, d.last_activated_at, m.name AS module_name, m.language -FROM deployments d - INNER JOIN modules m on d.module_id = m.id -WHERE min_replicas > 0 -ORDER BY d.created_at,d.key -` - -type GetDeploymentsWithMinReplicasRow struct { - Deployment Deployment - ModuleName string - Language string -} - -func (q *Queries) GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentsWithMinReplicas) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetDeploymentsWithMinReplicasRow - for rows.Next() { - var i GetDeploymentsWithMinReplicasRow - if err := rows.Scan( - &i.Deployment.ID, - &i.Deployment.CreatedAt, - &i.Deployment.ModuleID, - &i.Deployment.Key, - &i.Deployment.Schema, - &i.Deployment.Labels, - &i.Deployment.MinReplicas, - &i.Deployment.LastActivatedAt, - &i.ModuleName, - &i.Language, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getExistingDeploymentForModule = `-- name: GetExistingDeploymentForModule :one -SELECT d.id, created_at, module_id, key, schema, labels, min_replicas, last_activated_at, m.id, language, name -FROM deployments d - INNER JOIN modules m on d.module_id = m.id -WHERE m.name = $1 - AND min_replicas > 0 -LIMIT 1 -` - -type GetExistingDeploymentForModuleRow struct { - ID int64 - CreatedAt time.Time - ModuleID int64 - Key model.DeploymentKey - Schema *schema.Module - Labels json.RawMessage - MinReplicas int32 - LastActivatedAt time.Time - ID_2 int64 - Language string - Name string -} - -func (q *Queries) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) { - row := q.db.QueryRowContext(ctx, getExistingDeploymentForModule, name) - var i GetExistingDeploymentForModuleRow - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.ModuleID, - &i.Key, - &i.Schema, - &i.Labels, - &i.MinReplicas, - &i.LastActivatedAt, - &i.ID_2, - &i.Language, - &i.Name, - ) - return i, err -} - const getNextEventForSubscription = `-- name: GetNextEventForSubscription :one WITH cursor AS ( SELECT @@ -322,17 +189,6 @@ func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.Subscriptio return i, err } -const getSchemaForDeployment = `-- name: GetSchemaForDeployment :one -SELECT schema FROM deployments WHERE key = $1::deployment_key -` - -func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) { - row := q.db.QueryRowContext(ctx, getSchemaForDeployment, key) - var schema *schema.Module - err := row.Scan(&schema) - return schema, err -} - const getSubscription = `-- name: GetSubscription :one SELECT id, key, created_at, topic_id, name, cursor, state, deployment_key, module_name FROM topic_subscriptions @@ -563,18 +419,6 @@ func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForT return err } -const setDeploymentDesiredReplicas = `-- name: SetDeploymentDesiredReplicas :exec -UPDATE deployments -SET min_replicas = $2, last_activated_at = CASE WHEN min_replicas = 0 THEN (NOW() AT TIME ZONE 'utc') ELSE last_activated_at END -WHERE key = $1::deployment_key -RETURNING 1 -` - -func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error { - _, err := q.db.ExecContext(ctx, setDeploymentDesiredReplicas, key, minReplicas) - return err -} - const setSubscriptionCursor = `-- name: SetSubscriptionCursor :exec WITH event AS ( SELECT id, created_at, key, topic_id, payload @@ -591,34 +435,6 @@ func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.Subsc return err } -const updateDeploymentSchema = `-- name: UpdateDeploymentSchema :exec -UPDATE deployments -SET schema = $1::module_schema_pb -WHERE key = $2::deployment_key -RETURNING 1 -` - -// Note that this can result in a race condition if the deployment is being updated by another process. This will go -// away once we ditch the DB. -func (q *Queries) UpdateDeploymentSchema(ctx context.Context, schema *schema.Module, key model.DeploymentKey) error { - _, err := q.db.ExecContext(ctx, updateDeploymentSchema, schema, key) - return err -} - -const upsertModule = `-- name: UpsertModule :one -INSERT INTO modules (language, name) -VALUES ($1, $2) -ON CONFLICT (name) DO UPDATE SET language = $1 -RETURNING id -` - -func (q *Queries) UpsertModule(ctx context.Context, language string, name string) (int64, error) { - row := q.db.QueryRowContext(ctx, upsertModule, language, name) - var id int64 - err := row.Scan(&id) - return id, err -} - const upsertSubscription = `-- name: UpsertSubscription :one INSERT INTO topic_subscriptions ( key, diff --git a/backend/controller/dal/notify.go b/backend/controller/dal/notify.go index 28488989f6..9087e918ce 100644 --- a/backend/controller/dal/notify.go +++ b/backend/controller/dal/notify.go @@ -46,7 +46,7 @@ func (d *DAL) PollDeployments(ctx context.Context) { delay := time.Millisecond * 500 currentDeployments := make(map[string]deploymentState) - deployments, err := d.GetDeploymentsWithMinReplicas(ctx) + deployments, err := d.GetActiveDeployments() if err != nil { if errors.Is(ctx.Err(), context.Canceled) { logger.Tracef("Polling stopped: %v", ctx.Err()) diff --git a/backend/controller/pubsub/internal/dal/dal.go b/backend/controller/pubsub/internal/dal/dal.go index 37362c89dd..378eadc894 100644 --- a/backend/controller/pubsub/internal/dal/dal.go +++ b/backend/controller/pubsub/internal/dal/dal.go @@ -352,3 +352,16 @@ func (d *DAL) RemoveSubscriptionsAndSubscribers(ctx context.Context, key model.D return nil } + +func (d *DAL) UpsertTopic(ctx context.Context, topic model.TopicKey, eventType string) error { + err := d.db.UpsertTopic(ctx, dalsql.UpsertTopicParams{ + Topic: topic, + Module: topic.Payload.Module, + Name: topic.Payload.Name, + EventType: eventType, + }) + if err != nil { + return fmt.Errorf("could not upsert topic: %w", libdal.TranslatePGError(err)) + } + return nil +} diff --git a/backend/controller/dal/internal/sql/async_queries.sql b/backend/controller/pubsub/internal/sql/async_queries.sql similarity index 100% rename from backend/controller/dal/internal/sql/async_queries.sql rename to backend/controller/pubsub/internal/sql/async_queries.sql diff --git a/backend/controller/pubsub/service.go b/backend/controller/pubsub/service.go index 463932cf10..d838ed7242 100644 --- a/backend/controller/pubsub/service.go +++ b/backend/controller/pubsub/service.go @@ -505,7 +505,8 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal. } func (s *Service) watchEventStream(ctx context.Context) { - sub := s.controllerState.Subscribe(ctx) + sub := s.controllerState.Updates().Subscribe(nil) + defer s.controllerState.Updates().Unsubscribe(sub) logger := log.FromContext(ctx).Scope("pubsub") for { select { @@ -513,6 +514,19 @@ func (s *Service) watchEventStream(ctx context.Context) { return case event := <-sub: switch e := event.(type) { + case *state.DeploymentCreatedEvent: + + // upsert topics + for _, decl := range e.Schema.Decls { + if topic, ok := decl.(*schema.Topic); ok { + tk := model.NewTopicKey(e.Module, topic.GetName()) + err := s.dal.UpsertTopic(ctx, tk, topic.Event.String()) + if err != nil { + logger.Errorf(err, "Failed to upsert topic %s", topic) + } + } + + } case *state.DeploymentActivatedEvent: view := s.controllerState.View() deployment, err := view.GetDeployment(e.Key) diff --git a/backend/controller/sql/schema/20241210022430_remove_deployments.sql b/backend/controller/sql/schema/20241210022430_remove_deployments.sql new file mode 100644 index 0000000000..82b4cd34e2 --- /dev/null +++ b/backend/controller/sql/schema/20241210022430_remove_deployments.sql @@ -0,0 +1,6 @@ +-- migrate:up +DROP TABLE deployments; + + +-- migrate:down + diff --git a/backend/controller/sql/schema/20241210030238_remove_module.sql b/backend/controller/sql/schema/20241210030238_remove_module.sql new file mode 100644 index 0000000000..8b3165c52f --- /dev/null +++ b/backend/controller/sql/schema/20241210030238_remove_module.sql @@ -0,0 +1,5 @@ +-- migrate:up +DROP TABLE modules; + +-- migrate:down + diff --git a/backend/controller/state/controllerstate_test.go b/backend/controller/state/controllerstate_test.go index 7446528138..49b79db42b 100644 --- a/backend/controller/state/controllerstate_test.go +++ b/backend/controller/state/controllerstate_test.go @@ -8,9 +8,9 @@ import ( "github.com/alecthomas/assert/v2" "github.com/TBD54566975/ftl/backend/controller/state" - schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/schema" ) func TestRunnerState(t *testing.T) { @@ -72,7 +72,7 @@ func TestDeploymentState(t *testing.T) { err := cs.Publish(ctx, &state.DeploymentCreatedEvent{ Key: deploymentKey, CreatedAt: create, - Schema: &schemapb.Module{Name: "test"}, + Schema: &schema.Module{Name: "test"}, }) assert.NoError(t, err) view = cs.View() diff --git a/backend/controller/state/deployments.go b/backend/controller/state/deployments.go index 456e57cbdf..f97109033f 100644 --- a/backend/controller/state/deployments.go +++ b/backend/controller/state/deployments.go @@ -6,7 +6,6 @@ import ( "github.com/alecthomas/types/optional" - schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" ) @@ -41,12 +40,14 @@ func (r *State) ActiveDeployments() map[string]*Deployment { var _ ControllerEvent = (*DeploymentCreatedEvent)(nil) var _ ControllerEvent = (*DeploymentActivatedEvent)(nil) var _ ControllerEvent = (*DeploymentDeactivatedEvent)(nil) +var _ ControllerEvent = (*DeploymentSchemaUpdatedEvent)(nil) +var _ ControllerEvent = (*DeploymentReplicasUpdatedEvent)(nil) type DeploymentCreatedEvent struct { Key model.DeploymentKey CreatedAt time.Time Module string - Schema *schemapb.Module + Schema *schema.Module Artefacts []*DeploymentArtefact Language string } @@ -55,14 +56,10 @@ func (r *DeploymentCreatedEvent) Handle(t State) (State, error) { if existing := t.deployments[r.Key.String()]; existing != nil { return t, nil } - proto, err := schema.ModuleFromProto(r.Schema) - if err != nil { - return t, fmt.Errorf("failed to parse schema: %w", err) - } n := Deployment{ Key: r.Key, CreatedAt: r.CreatedAt, - Schema: proto, + Schema: r.Schema, Module: r.Module, Artefacts: map[string]*DeploymentArtefact{}, } @@ -77,6 +74,41 @@ func (r *DeploymentCreatedEvent) Handle(t State) (State, error) { return t, nil } +type DeploymentSchemaUpdatedEvent struct { + Key model.DeploymentKey + Schema *schema.Module +} + +func (r *DeploymentSchemaUpdatedEvent) Handle(t State) (State, error) { + existing, ok := t.deployments[r.Key.String()] + if !ok { + return t, fmt.Errorf("deployment %s not found", r.Key) + } + existing.Schema = r.Schema + return t, nil +} + +type DeploymentReplicasUpdatedEvent struct { + Key model.DeploymentKey + Replicas int +} + +func (r *DeploymentReplicasUpdatedEvent) Handle(t State) (State, error) { + existing, ok := t.deployments[r.Key.String()] + if !ok { + return t, fmt.Errorf("deployment %s not found", r.Key) + } + if existing.Schema.Runtime == nil { + existing.Schema.Runtime = &schema.ModuleRuntime{} + } + if existing.Schema.Runtime.Scaling == nil { + existing.Schema.Runtime.Scaling = &schema.ModuleRuntimeScaling{} + } + existing.Schema.Runtime.Scaling.MinReplicas = int32(r.Replicas) + existing.MinReplicas = r.Replicas + return t, nil +} + type DeploymentActivatedEvent struct { Key model.DeploymentKey ActivatedAt time.Time diff --git a/internal/eventstream/eventstream.go b/internal/eventstream/eventstream.go index 0ebcb4e6fc..a50810e29d 100644 --- a/internal/eventstream/eventstream.go +++ b/internal/eventstream/eventstream.go @@ -17,7 +17,7 @@ type EventStream[View any, E Event[View]] interface { View() View - Subscribe(ctx context.Context) <-chan E + Updates() *pubsub.Topic[E] } // StreamView is a view of an event stream that can be subscribed to, without modifying the stream. @@ -52,7 +52,7 @@ func (i *inMemoryEventStream[T, E]) Publish(ctx context.Context, e E) error { defer i.lock.Unlock() logger := log.FromContext(ctx) - logger.Debugf("Publishing event %v", e) + logger.Debugf("Publishing event %T%v", e, e) newView, err := e.Handle(reflect.DeepCopy(i.view)) if err != nil { return fmt.Errorf("failed to handle event: %w", err) @@ -66,8 +66,6 @@ func (i *inMemoryEventStream[T, E]) View() T { return i.view } -func (i *inMemoryEventStream[T, E]) Subscribe(ctx context.Context) <-chan E { - ret := i.topic.Subscribe(nil) - - return ret +func (i *inMemoryEventStream[T, E]) Updates() *pubsub.Topic[E] { + return i.topic } diff --git a/sqlc.yaml b/sqlc.yaml index 027b1a94f9..92e47c9d33 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -3,7 +3,6 @@ sql: - &daldir engine: "postgresql" queries: - - backend/controller/dal/internal/sql/queries.sql - backend/controller/pubsub/internal/sql/queries.sql schema: "backend/controller/sql/schema" database: @@ -131,16 +130,10 @@ sql: - sqlc/db-prepare # - postgresql-query-too-costly # - postgresql-no-seq-scan - - <<: *daldir - queries: "internal/configuration/dal/internal/sql/queries.sql" - gen: - go: - <<: *gengo - out: "internal/configuration/dal/internal/sql" - <<: *daldir queries: - backend/controller/pubsub/internal/sql/queries.sql - - backend/controller/dal/internal/sql/async_queries.sql + - backend/controller/pubsub/internal/sql/async_queries.sql gen: go: <<: *gengo