Skip to content

Commit

Permalink
feat: use buildengine for deploy cmd (#1002)
Browse files Browse the repository at this point in the history
Not fully tested yet, but starting to flesh out the subscription flow
for build -> deploy

```sh
ftl deploy ../ftl-examples/online-boutique/backend/services examples/go

info:time: Building module
info:ad: Building module
info:currency: Building module
info:cart: Building module
info:cart: Deploying module
info:ad: Deploying module
info:time: Deploying module
info:currency: Deploying module
info:shipping: Building module
info:productcatalog: Building module
info:payment: Building module
info:echo: Building module
info:shipping: Deploying module
info:payment: Deploying module
info:echo: Deploying module
info:productcatalog: Deploying module
info:recommendation: Building module
info:checkout: Building module
info:recommendation: Deploying module
info:checkout: Deploying module
```

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
wesbillman and github-actions[bot] authored Mar 5, 2024
1 parent 8dc0d59 commit c995fa4
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 279 deletions.
2 changes: 1 addition & 1 deletion buildengine/build_kotlin.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func SetPOMProperties(ctx context.Context, baseDir string) error {
}

func prepareFTLRoot(module Module) error {
buildDir := filepath.Join(module.Dir, "target")
buildDir := module.AbsDeployDir()
if err := os.MkdirAll(buildDir, 0700); err != nil {
return err
}
Expand Down
229 changes: 229 additions & 0 deletions buildengine/deploy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package buildengine

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"connectrpc.com/connect"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/common/moduleconfig"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/sha256"
"github.com/TBD54566975/ftl/internal/slices"
)

type deploymentArtefact struct {
*ftlv1.DeploymentArtefact
localPath string
}

// Deploy a module to the FTL controller with the given number of replicas. Optionally wait for the deployment to become ready.
func Deploy(ctx context.Context, module Module, replicas int32, waitForDeployOnline bool, client ftlv1connect.ControllerServiceClient) error {
logger := log.FromContext(ctx).Scope(module.Module)
ctx = log.ContextWithLogger(ctx, logger)
logger.Infof("Deploying module")

deployDir := module.AbsDeployDir()
files, err := findFiles(deployDir, module.Deploy)
if err != nil {
logger.Errorf(err, "failed to find files in %s", deployDir)
return err
}

filesByHash, err := hashFiles(deployDir, files)
if err != nil {
return err
}

gadResp, err := client.GetArtefactDiffs(ctx, connect.NewRequest(&ftlv1.GetArtefactDiffsRequest{ClientDigests: maps.Keys(filesByHash)}))
if err != nil {
return err
}

moduleSchema, err := loadProtoSchema(deployDir, module.ModuleConfig, replicas)
if err != nil {
return fmt.Errorf("failed to load protobuf schema from %q: %w", module.ModuleConfig.Schema, err)
}

logger.Debugf("Uploading %d/%d files", len(gadResp.Msg.MissingDigests), len(files))
for _, missing := range gadResp.Msg.MissingDigests {
file := filesByHash[missing]
content, err := os.ReadFile(file.localPath)
if err != nil {
return err
}
logger.Tracef("Uploading %s", relToCWD(file.localPath))
resp, err := client.UploadArtefact(ctx, connect.NewRequest(&ftlv1.UploadArtefactRequest{
Content: content,
}))
if err != nil {
return err
}
logger.Debugf("Uploaded %s as %s:%s", relToCWD(file.localPath), sha256.FromBytes(resp.Msg.Digest), file.Path)
}

resp, err := client.CreateDeployment(ctx, connect.NewRequest(&ftlv1.CreateDeploymentRequest{
Schema: moduleSchema,
Artefacts: slices.Map(maps.Values(filesByHash), func(a deploymentArtefact) *ftlv1.DeploymentArtefact {
return a.DeploymentArtefact
}),
}))
if err != nil {
return err
}

_, err = client.ReplaceDeploy(ctx, connect.NewRequest(&ftlv1.ReplaceDeployRequest{DeploymentName: resp.Msg.GetDeploymentName(), MinReplicas: replicas}))
if err != nil {
return err
}

if waitForDeployOnline {
logger.Debugf("Waiting for deployment %s to become ready", resp.Msg.DeploymentName)
err = checkReadiness(ctx, client, resp.Msg.DeploymentName, replicas)
if err != nil {
return err
}
}

return nil
}

func loadProtoSchema(deployDir string, config moduleconfig.ModuleConfig, replicas int32) (*schemapb.Module, error) {
schema := filepath.Join(deployDir, config.Schema)
content, err := os.ReadFile(schema)
if err != nil {
return nil, err
}
module := &schemapb.Module{}
err = proto.Unmarshal(content, module)
if err != nil {
return nil, err
}
module.Runtime = &schemapb.ModuleRuntime{
CreateTime: timestamppb.Now(),
Language: config.Language,
MinReplicas: replicas,
}
return module, nil
}

func findFiles(base string, files []string) ([]string, error) {
var out []string
for _, file := range files {
file = filepath.Join(base, file)
info, err := os.Stat(file)
if err != nil {
return nil, err
}
if info.IsDir() {
dirFiles, err := findFilesInDir(file)
if err != nil {
return nil, err
}
out = append(out, dirFiles...)
} else {
out = append(out, file)
}
}
return out, nil
}

func findFilesInDir(dir string) ([]string, error) {
var out []string
return out, filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
out = append(out, path)
return nil
})
}

func hashFiles(base string, files []string) (filesByHash map[string]deploymentArtefact, err error) {
filesByHash = map[string]deploymentArtefact{}
for _, file := range files {
r, err := os.Open(file)
if err != nil {
return nil, err
}
defer r.Close() //nolint:gosec
hash, err := sha256.SumReader(r)
if err != nil {
return nil, err
}
info, err := r.Stat()
if err != nil {
return nil, err
}
isExecutable := info.Mode()&0111 != 0
path, err := filepath.Rel(base, file)
if err != nil {
return nil, err
}
filesByHash[hash.String()] = deploymentArtefact{
DeploymentArtefact: &ftlv1.DeploymentArtefact{
Digest: hash.String(),
Path: path,
Executable: isExecutable,
},
localPath: file,
}
}
return filesByHash, nil
}

func relToCWD(path string) string {
cwd, err := os.Getwd()
if err != nil {
panic(err)
}
rel, err := filepath.Rel(cwd, path)
if err != nil {
return path
}
return rel
}

func checkReadiness(ctx context.Context, client ftlv1connect.ControllerServiceClient, deploymentName string, replicas int32) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{
AllDeployments: true,
}))
if err != nil {
return err
}

var found bool
for _, deployment := range status.Msg.Deployments {
if deployment.Key == deploymentName {
found = true
if deployment.Replicas >= replicas {
return nil
}
}
}
if !found {
return fmt.Errorf("deployment %s not found: %v", deploymentName, status.Msg.Deployments)
}
case <-ctx.Done():
return ctx.Err()
}
}
}
37 changes: 33 additions & 4 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

// Engine for building a set of modules.
type Engine struct {
client ftlv1connect.ControllerServiceClient
modules map[string]Module
controllerSchema *xsync.MapOf[string, *schema.Module]
cancel func()
Expand All @@ -37,6 +38,7 @@ type Engine struct {
func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs ...string) (*Engine, error) {
ctx = rpc.ContextWithClient(ctx, client)
e := &Engine{
client: client,
modules: map[string]Module{},
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
}
Expand All @@ -57,17 +59,17 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs
if client == nil {
return e, nil
}
schemaSync := e.startSchemaSync(ctx, client)
schemaSync := e.startSchemaSync(ctx)
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, schemaSync)
return e, nil
}

// Sync module schema changes from the FTL controller, as well as from manual
// updates, and merge them into a single schema map.
func (e *Engine) startSchemaSync(ctx context.Context, client ftlv1connect.ControllerServiceClient) func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
logger := log.FromContext(ctx)
// Blocking schema sync from the controller.
psch, err := client.GetSchema(ctx, connect.NewRequest(&ftlv1.GetSchemaRequest{}))
psch, err := e.client.GetSchema(ctx, connect.NewRequest(&ftlv1.GetSchemaRequest{}))
if err == nil {
sch, err := schema.FromProto(psch.Msg.Schema)
if err == nil {
Expand Down Expand Up @@ -149,6 +151,28 @@ func (e *Engine) Import(ctx context.Context, schema *schema.Module) {

// Build attempts to build the specified modules, or all local modules if none are provided.
func (e *Engine) Build(ctx context.Context, modules ...string) error {
return e.buildWithCallback(ctx, nil, modules...)
}

// Deploy attempts to build and deploy the specified modules, or all local modules if none are provided.
func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline bool, modules ...string) error {
if len(modules) == 0 {
modules = maps.Keys(e.modules)
}

expectedBuilds := make(map[string]struct{}, len(modules))
for _, name := range modules {
expectedBuilds[name] = struct{}{}
}

return e.buildWithCallback(ctx, func(ctx context.Context, module Module) error {
return Deploy(ctx, module, replicas, waitForDeployOnline, e.client)
}, modules...)
}

type buildCallback func(ctx context.Context, module Module) error

func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, modules ...string) error {
mustBuild := map[string]bool{}
if len(modules) == 0 {
modules = maps.Keys(e.modules)
Expand All @@ -174,6 +198,7 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error {
built := map[string]*schema.Module{
"builtin": schema.Builtins(),
}

topology := TopologicalSort(graph)
for _, group := range topology {
// Collect schemas to be inserted into "built" map for subsequent groups.
Expand All @@ -183,7 +208,11 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error {
for _, name := range group {
wg.Go(func() error {
if mustBuild[name] {
return e.build(ctx, name, built, schemas)
err := e.build(ctx, name, built, schemas)
if err == nil && callback != nil {
return callback(ctx, e.modules[name])
}
return err
}
return e.mustSchema(ctx, name, built, schemas)
})
Expand Down
2 changes: 1 addition & 1 deletion buildengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ func TestEngine(t *testing.T) {
graph, err := engine.Graph()
assert.NoError(t, err)
assert.Equal(t, expected, graph)
err = engine.Build(ctx, "alpha")
err = engine.Build(ctx)
assert.NoError(t, err)
}
Loading

0 comments on commit c995fa4

Please sign in to comment.