Skip to content

Commit

Permalink
fix: ensure that a schema-only change triggers a deploy (#857)
Browse files Browse the repository at this point in the history
Previously, changes to the schema without corresponding artefact changes
would not trigger a deploy. This could include things like ingress
routes, etc.

Fixes #789
  • Loading branch information
alecthomas authored Jan 31, 2024
1 parent bab131b commit 605ef90
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 29 deletions.
47 changes: 31 additions & 16 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ type IngressRoutingEntry struct {
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, schema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentName, err error) {
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentName, err error) {
logger := log.FromContext(ctx)

// Start the transaction
Expand All @@ -419,37 +419,32 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, schema *sch

defer tx.CommitOrRollback(ctx, &err)

// Check if the deployment already exists and if so, return it.
existing, err := tx.GetDeploymentsWithArtefacts(ctx,
sha256esToBytes(slices.Map(artefacts, func(in DeploymentArtefact) sha256.SHA256 { return in.Digest })),
int64(len(artefacts)),
)
existingDeployment, err := d.checkForExistingDeployments(ctx, tx, moduleSchema, artefacts)
if err != nil {
return "", fmt.Errorf("%s: %w", "couldn't check for existing deployment", err)
}
if len(existing) > 0 {
logger.Debugf("Returning existing deployment %s", existing[0].DeploymentName)
return existing[0].DeploymentName, nil
return "", err
} else if existingDeployment != "" {
logger.Debugf("Returning existing deployment %s", existingDeployment)
return existingDeployment, nil
}

artefactsByDigest := maps.FromSlice(artefacts, func(in DeploymentArtefact) (sha256.SHA256, DeploymentArtefact) {
return in.Digest, in
})

schemaBytes, err := proto.Marshal(schema.ToProto())
schemaBytes, err := proto.Marshal(moduleSchema.ToProto())
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to marshal schema", err)
}

// TODO(aat): "schema" containing language?
_, err = tx.UpsertModule(ctx, language, schema.Name)
_, err = tx.UpsertModule(ctx, language, moduleSchema.Name)
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to upsert module", translatePGError(err))
}

deploymentName := model.NewDeploymentName(schema.Name)
deploymentName := model.NewDeploymentName(moduleSchema.Name)
// Create the deployment
err = tx.CreateDeployment(ctx, deploymentName, schema.Name, schemaBytes)
err = tx.CreateDeployment(ctx, deploymentName, moduleSchema.Name, schemaBytes)
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to create deployment", translatePGError(err))
}
Expand Down Expand Up @@ -483,7 +478,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, schema *sch
Name: deploymentName,
Method: ingressRoute.Method,
Path: ingressRoute.Path,
Module: schema.Name,
Module: moduleSchema.Name,
Verb: ingressRoute.Verb,
})
if err != nil {
Expand Down Expand Up @@ -1018,6 +1013,26 @@ func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) {
}), nil
}

// Check if a deployment exists that exactly matches the given artefacts and schema.
func (*DAL) checkForExistingDeployments(ctx context.Context, tx *sql.Tx, moduleSchema *schema.Module, artefacts []DeploymentArtefact) (model.DeploymentName, error) {
schemaBytes, err := schema.ModuleToBytes(moduleSchema)
if err != nil {
return "", fmt.Errorf("failed to marshal schema: %w", err)
}
existing, err := tx.GetDeploymentsWithArtefacts(ctx,
sha256esToBytes(slices.Map(artefacts, func(in DeploymentArtefact) sha256.SHA256 { return in.Digest })),
schemaBytes,
int64(len(artefacts)),
)
if err != nil {
return "", fmt.Errorf("%s: %w", "couldn't check for existing deployment", err)
}
if len(existing) > 0 {
return existing[0].DeploymentName, nil
}
return "", nil
}

func sha256esToBytes(digests []sha256.SHA256) [][]byte {
return slices.Map(digests, func(digest sha256.SHA256) []byte { return digest[:] })
}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ WHERE d.name = $1;

-- name: GetDeploymentsWithArtefacts :many
-- Get all deployments that have artefacts matching the given digests.
SELECT d.id, d.created_at, d.name as deployment_name, m.name AS module_name
SELECT d.id, d.created_at, d.name as deployment_name, d.schema, m.name AS module_name
FROM deployments d
INNER JOIN modules m ON d.module_id = m.id
WHERE EXISTS (SELECT 1
FROM deployment_artefacts da
INNER JOIN artefacts a ON da.artefact_id = a.id
WHERE a.digest = ANY (@digests::bytea[])
AND da.deployment_id = d.id
AND d.schema = @schema::BYTEA
HAVING COUNT(*) = @count::BIGINT -- Number of unique digests provided
);

Expand Down
9 changes: 5 additions & 4 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions backend/schema/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func ModuleFromBytes(b []byte) (*Module, error) {
return ModuleFromProto(s)
}

func ModuleToBytes(m *Module) ([]byte, error) {
return proto.Marshal(m.ToProto())
}

func moduleListToSchema(s []*schemapb.Module) ([]*Module, error) {
var out []*Module
for _, n := range s {
Expand Down
6 changes: 2 additions & 4 deletions examples/time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ type TimeResponse struct {
// Time returns the current time.
//
//ftl:verb
//ftl:ingress GET /time
//ftl:ingress GET /timef
func Time(ctx context.Context, req TimeRequest) (TimeResponse, error) {
return TimeResponse{
Time: time.Now(),
}, nil
return TimeResponse{Time: time.Now()}, nil
}

0 comments on commit 605ef90

Please sign in to comment.