Skip to content

Commit

Permalink
chore: remove deployments and modules table (#3697)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Dec 10, 2024
1 parent 3dc4fcc commit cfe35a3
Show file tree
Hide file tree
Showing 18 changed files with 121 additions and 428 deletions.
4 changes: 2 additions & 2 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func verbSchemaString(sch *schema.Schema, verb *schema.Verb) (string, error) {
}

func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pbconsole.GetModulesRequest]) (*connect.Response[pbconsole.GetModulesResponse], error) {
deployments, err := c.dal.GetDeploymentsWithMinReplicas(ctx)
deployments, err := c.dal.GetActiveDeployments()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -371,7 +371,7 @@ func (c *ConsoleService) filterDeployments(unfilteredDeployments []dalmodel.Depl
}

func (c *ConsoleService) sendStreamModulesResp(ctx context.Context, stream *connect.ServerStream[pbconsole.StreamModulesResponse]) error {
unfilteredDeployments, err := c.dal.GetDeploymentsWithMinReplicas(ctx)
unfilteredDeployments, err := c.dal.GetActiveDeployments()
if err != nil {
return fmt.Errorf("failed to get deployments: %w", err)
}
Expand Down
10 changes: 3 additions & 7 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func New(

pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState)
svc.pubSub = pubSub
svc.dal = dal.New(ctx, conn, svc.storage, svc.controllerState)
svc.dal = dal.New(svc.storage, svc.controllerState)

svc.deploymentLogsSink = newDeploymentLogsSink(ctx)

Expand Down Expand Up @@ -1010,16 +1010,12 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
return nil, fmt.Errorf("invalid module schema: %w", err)
}

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Base.Language, module)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
}
dkey := model.NewDeploymentKey(module.Name)
err = s.controllerState.Publish(ctx, &state.DeploymentCreatedEvent{
Module: module.Name,
Key: dkey,
CreatedAt: time.Now(),
Schema: ms,
Schema: module,
Artefacts: artefacts,
Language: ms.Runtime.Base.Language,
})
Expand Down
149 changes: 26 additions & 123 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
xmaps "golang.org/x/exp/maps"

aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts"
dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/state"
Expand All @@ -25,32 +24,18 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
)

func New(ctx context.Context, conn libdal.Connection, registry aregistry.Service, state state.ControllerState) *DAL {
func New(registry aregistry.Service, state state.ControllerState) *DAL {
var d *DAL
db := dalsql.New(conn)
d = &DAL{
db: db,
registry: registry,
state: state,
Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL {
return &DAL{
Handle: h,
db: dalsql.New(h.Connection),
registry: registry,
DeploymentChanges: d.DeploymentChanges,
state: state,
}
}),
registry: registry,
state: state,
DeploymentChanges: inprocesspubsub.New[DeploymentNotification](),
}

return d
}

type DAL struct {
*libdal.Handle[DAL]
db dalsql.Querier

pubsub *pubsub.Service
registry aregistry.Service
state state.ControllerState
Expand All @@ -74,74 +59,16 @@ func (d *DAL) GetActiveDeployments() ([]dalmodel.Deployment, error) {
}), nil
}

func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error) {
_, err = d.db.UpsertModule(ctx, language, name)
return libdal.TranslatePGError(err)
}

// CreateDeployment (possibly) creates a new deployment and associates
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module) (key model.DeploymentKey, err error) {

// Start the parent transaction
tx, err := d.Begin(ctx)
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not start transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

// TODO(aat): "schema" containing language?
_, err = tx.db.UpsertModule(ctx, language, moduleSchema.Name)
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to upsert module: %w", libdal.TranslatePGError(err))
}

// upsert topics
for _, decl := range moduleSchema.Decls {
t, ok := decl.(*schema.Topic)
if !ok {
continue
}
err := tx.db.UpsertTopic(ctx, dalsql.UpsertTopicParams{
Topic: model.NewTopicKey(moduleSchema.Name, t.Name),
Module: moduleSchema.Name,
Name: t.Name,
EventType: t.Event.String(),
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert topic: %w", libdal.TranslatePGError(err))
}
}

deploymentKey := model.NewDeploymentKey(moduleSchema.Name)

// Create the deployment
err = tx.db.CreateDeployment(ctx, moduleSchema.Name, moduleSchema, deploymentKey)
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to create deployment: %w", libdal.TranslatePGError(err))
}

return deploymentKey, nil
}

// SetDeploymentReplicas activates the given deployment.
func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) {
// Start the transaction
tx, err := d.Begin(ctx)
if err != nil {
return libdal.TranslatePGError(err)
}
defer tx.CommitOrRollback(ctx, &err)

view := d.state.View()
deployment, err := view.GetDeployment(key)
if err != nil {
return fmt.Errorf("could not get deployment: %w", err)
}

err = tx.db.SetDeploymentDesiredReplicas(ctx, key, int32(minReplicas))
err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: key, Replicas: minReplicas})
if err != nil {
return libdal.TranslatePGError(err)
}
Expand Down Expand Up @@ -171,13 +98,6 @@ var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")
//
// returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active.
func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error) {
// Start the transaction
tx, err := d.Begin(ctx)
if err != nil {
return fmt.Errorf("replace deployment failed to begin transaction for %v: %w", newDeploymentKey, libdal.TranslatePGError(err))
}

defer tx.CommitOrRollback(ctx, &err)
view := d.state.View()
newDeployment, err := view.GetDeployment(newDeploymentKey)
if err != nil {
Expand All @@ -191,12 +111,19 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl

// If there's an existing deployment, set its desired replicas to 0
var replacedDeploymentKey optional.Option[model.DeploymentKey]
oldDeployment, err := tx.db.GetExistingDeploymentForModule(ctx, newDeployment.Module)
if err == nil {
// TODO: remove all this, it needs to be event driven
var oldDeployment *state.Deployment
for _, dep := range view.ActiveDeployments() {
if dep.Module == newDeployment.Module {
oldDeployment = dep
break
}
}
if oldDeployment != nil {
if oldDeployment.Key.String() == newDeploymentKey.String() {
return fmt.Errorf("replace deployment failed: deployment already exists from %v to %v: %w", oldDeployment.Key, newDeploymentKey, ErrReplaceDeploymentAlreadyActive)
}
err = tx.db.SetDeploymentDesiredReplicas(ctx, newDeploymentKey, int32(minReplicas))
err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas})
if err != nil {
return fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, libdal.TranslatePGError(err))
}
Expand All @@ -205,12 +132,9 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
return libdal.TranslatePGError(err)
}
replacedDeploymentKey = optional.Some(oldDeployment.Key)
} else if !libdal.IsNotFound(err) {
// any error other than not found
return fmt.Errorf("replace deployment failed to get existing deployment for %v: %w", newDeploymentKey, libdal.TranslatePGError(err))
} else {
// Set the desired replicas for the new deployment
err = tx.db.SetDeploymentDesiredReplicas(ctx, newDeploymentKey, int32(minReplicas))
err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas})
if err != nil {
return fmt.Errorf("replace deployment failed to set replicas for %v: %w", newDeploymentKey, libdal.TranslatePGError(err))
}
Expand Down Expand Up @@ -257,51 +181,30 @@ func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
//
// Note that this is racey as the deployment can be updated by another process. This will go away once we ditch the DB.
func (d *DAL) UpdateModuleSchema(ctx context.Context, deployment model.DeploymentKey, module *schema.Module) error {
err := d.db.UpdateDeploymentSchema(ctx, module, deployment)
err := d.state.Publish(ctx, &state.DeploymentSchemaUpdatedEvent{
Key: deployment,
Schema: module,
})
if err != nil {
return fmt.Errorf("failed to update deployment schema: %w", err)
}
return nil
}

func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error) {
rows, err := d.db.GetDeploymentsWithMinReplicas(ctx)
if err != nil {
if libdal.IsNotFound(err) {
return nil, nil
}
return nil, libdal.TranslatePGError(err)
}
return slices.Map(rows, func(in dalsql.GetDeploymentsWithMinReplicasRow) dalmodel.Deployment {
return dalmodel.Deployment{
Key: in.Deployment.Key,
Module: in.ModuleName,
Language: in.Language,
MinReplicas: int(in.Deployment.MinReplicas),
Schema: in.Deployment.Schema,
CreatedAt: in.Deployment.CreatedAt,
}
}), nil
}

func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error) {
rows, err := d.db.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err))
}
return slices.Map(rows, func(in dalsql.GetActiveDeploymentSchemasRow) *schema.Module { return in.Schema }), nil
view := d.state.View()
rows := view.ActiveDeployments()
return slices.Map(xmaps.Values(rows), func(in *state.Deployment) *schema.Module { return in.Schema }), nil
}

// GetActiveDeploymentSchemasByDeploymentKey returns the schema for all active deployments by deployment key.
//
// model.DeploymentKey is not used directly as a key as it's not a valid map key.
func (d *DAL) GetActiveDeploymentSchemasByDeploymentKey(ctx context.Context) (map[string]*schema.Module, error) {
rows, err := d.db.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err))
}
return maps.FromSlice(rows, func(in dalsql.GetActiveDeploymentSchemasRow) (string, *schema.Module) {
return in.Key.String(), in.Schema
view := d.state.View()
rows := view.ActiveDeployments()
return maps.MapValues[string, *state.Deployment, *schema.Module](rows, func(dep string, in *state.Deployment) *schema.Module {
return in.Schema
}), nil
}

Expand Down
21 changes: 4 additions & 17 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/state"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -26,9 +24,8 @@ func TestDAL(t *testing.T) {
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)
ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint))
conn := sqltest.OpenForTesting(ctx, t)

dal := New(ctx, conn, artefacts.NewForTesting(), state.NewInMemoryState())
dal := New(artefacts.NewForTesting(), state.NewInMemoryState())

deploymentChangesCh := dal.DeploymentChanges.Subscribe(nil)
deploymentChanges := []DeploymentNotification{}
Expand All @@ -40,21 +37,15 @@ func TestDAL(t *testing.T) {
return nil
})

t.Run("UpsertModule", func(t *testing.T) {
err = dal.UpsertModule(ctx, "go", "test")
assert.NoError(t, err)
})

module := &schema.Module{Name: "test"}
var deploymentKey model.DeploymentKey
t.Run("CreateDeployment", func(t *testing.T) {
deploymentKey, err = dal.CreateDeployment(ctx, "go", module)
assert.NoError(t, err)
deploymentKey = model.NewDeploymentKey(module.Name)
err = dal.state.Publish(ctx, &state.DeploymentCreatedEvent{
Key: deploymentKey,
CreatedAt: time.Now(),
Module: module.Name,
Schema: &schemapb.Module{Name: module.Name},
Schema: &schema.Module{Name: module.Name},
})
assert.NoError(t, err)
})
Expand All @@ -67,9 +58,8 @@ func TestDAL(t *testing.T) {

func TestCreateArtefactConflict(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)

dal := New(ctx, conn, artefacts.NewForTesting(), state.NewInMemoryState())
dal := New(artefacts.NewForTesting(), state.NewInMemoryState())

idch := make(chan sha256.SHA256, 2)

Expand All @@ -78,12 +68,9 @@ func TestCreateArtefactConflict(t *testing.T) {
wg.Add(2)
createContent := func() {
defer wg.Done()
tx1, err := dal.Begin(ctx)
assert.NoError(t, err)
digest, err := dal.registry.Upload(ctx, artefacts.Artefact{Content: []byte("content")})
assert.NoError(t, err)
time.Sleep(time.Second * 2)
err = tx1.Commit(ctx)
assert.NoError(t, err)
idch <- digest
}
Expand Down
12 changes: 0 additions & 12 deletions backend/controller/dal/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cfe35a3

Please sign in to comment.