diff --git a/buildengine/build_kotlin.go b/buildengine/build_kotlin.go index ba50299e57..d2abd506b6 100644 --- a/buildengine/build_kotlin.go +++ b/buildengine/build_kotlin.go @@ -93,7 +93,7 @@ func SetPOMProperties(ctx context.Context, baseDir string) error { } func prepareFTLRoot(module Module) error { - buildDir := filepath.Join(module.Dir, "target") + buildDir := module.AbsDeployDir() if err := os.MkdirAll(buildDir, 0700); err != nil { return err } diff --git a/buildengine/deploy.go b/buildengine/deploy.go new file mode 100644 index 0000000000..71e9a62550 --- /dev/null +++ b/buildengine/deploy.go @@ -0,0 +1,229 @@ +package buildengine + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "connectrpc.com/connect" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/common/moduleconfig" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/sha256" + "github.com/TBD54566975/ftl/internal/slices" +) + +type deploymentArtefact struct { + *ftlv1.DeploymentArtefact + localPath string +} + +// Deploy a module to the FTL controller with the given number of replicas. Optionally wait for the deployment to become ready. +func Deploy(ctx context.Context, module Module, replicas int32, waitForDeployOnline bool, client ftlv1connect.ControllerServiceClient) error { + logger := log.FromContext(ctx).Scope(module.Module) + ctx = log.ContextWithLogger(ctx, logger) + logger.Infof("Deploying module") + + deployDir := module.AbsDeployDir() + files, err := findFiles(deployDir, module.Deploy) + if err != nil { + logger.Errorf(err, "failed to find files in %s", deployDir) + return err + } + + filesByHash, err := hashFiles(deployDir, files) + if err != nil { + return err + } + + gadResp, err := client.GetArtefactDiffs(ctx, connect.NewRequest(&ftlv1.GetArtefactDiffsRequest{ClientDigests: maps.Keys(filesByHash)})) + if err != nil { + return err + } + + moduleSchema, err := loadProtoSchema(deployDir, module.ModuleConfig, replicas) + if err != nil { + return fmt.Errorf("failed to load protobuf schema from %q: %w", module.ModuleConfig.Schema, err) + } + + logger.Debugf("Uploading %d/%d files", len(gadResp.Msg.MissingDigests), len(files)) + for _, missing := range gadResp.Msg.MissingDigests { + file := filesByHash[missing] + content, err := os.ReadFile(file.localPath) + if err != nil { + return err + } + logger.Tracef("Uploading %s", relToCWD(file.localPath)) + resp, err := client.UploadArtefact(ctx, connect.NewRequest(&ftlv1.UploadArtefactRequest{ + Content: content, + })) + if err != nil { + return err + } + logger.Debugf("Uploaded %s as %s:%s", relToCWD(file.localPath), sha256.FromBytes(resp.Msg.Digest), file.Path) + } + + resp, err := client.CreateDeployment(ctx, connect.NewRequest(&ftlv1.CreateDeploymentRequest{ + Schema: moduleSchema, + Artefacts: slices.Map(maps.Values(filesByHash), func(a deploymentArtefact) *ftlv1.DeploymentArtefact { + return a.DeploymentArtefact + }), + })) + if err != nil { + return err + } + + _, err = client.ReplaceDeploy(ctx, connect.NewRequest(&ftlv1.ReplaceDeployRequest{DeploymentName: resp.Msg.GetDeploymentName(), MinReplicas: replicas})) + if err != nil { + return err + } + + if waitForDeployOnline { + logger.Debugf("Waiting for deployment %s to become ready", resp.Msg.DeploymentName) + err = checkReadiness(ctx, client, resp.Msg.DeploymentName, replicas) + if err != nil { + return err + } + } + + return nil +} + +func loadProtoSchema(deployDir string, config moduleconfig.ModuleConfig, replicas int32) (*schemapb.Module, error) { + schema := filepath.Join(deployDir, config.Schema) + content, err := os.ReadFile(schema) + if err != nil { + return nil, err + } + module := &schemapb.Module{} + err = proto.Unmarshal(content, module) + if err != nil { + return nil, err + } + module.Runtime = &schemapb.ModuleRuntime{ + CreateTime: timestamppb.Now(), + Language: config.Language, + MinReplicas: replicas, + } + return module, nil +} + +func findFiles(base string, files []string) ([]string, error) { + var out []string + for _, file := range files { + file = filepath.Join(base, file) + info, err := os.Stat(file) + if err != nil { + return nil, err + } + if info.IsDir() { + dirFiles, err := findFilesInDir(file) + if err != nil { + return nil, err + } + out = append(out, dirFiles...) + } else { + out = append(out, file) + } + } + return out, nil +} + +func findFilesInDir(dir string) ([]string, error) { + var out []string + return out, filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + out = append(out, path) + return nil + }) +} + +func hashFiles(base string, files []string) (filesByHash map[string]deploymentArtefact, err error) { + filesByHash = map[string]deploymentArtefact{} + for _, file := range files { + r, err := os.Open(file) + if err != nil { + return nil, err + } + defer r.Close() //nolint:gosec + hash, err := sha256.SumReader(r) + if err != nil { + return nil, err + } + info, err := r.Stat() + if err != nil { + return nil, err + } + isExecutable := info.Mode()&0111 != 0 + path, err := filepath.Rel(base, file) + if err != nil { + return nil, err + } + filesByHash[hash.String()] = deploymentArtefact{ + DeploymentArtefact: &ftlv1.DeploymentArtefact{ + Digest: hash.String(), + Path: path, + Executable: isExecutable, + }, + localPath: file, + } + } + return filesByHash, nil +} + +func relToCWD(path string) string { + cwd, err := os.Getwd() + if err != nil { + panic(err) + } + rel, err := filepath.Rel(cwd, path) + if err != nil { + return path + } + return rel +} + +func checkReadiness(ctx context.Context, client ftlv1connect.ControllerServiceClient, deploymentName string, replicas int32) error { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{ + AllDeployments: true, + })) + if err != nil { + return err + } + + var found bool + for _, deployment := range status.Msg.Deployments { + if deployment.Key == deploymentName { + found = true + if deployment.Replicas >= replicas { + return nil + } + } + } + if !found { + return fmt.Errorf("deployment %s not found: %v", deploymentName, status.Msg.Deployments) + } + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/buildengine/engine.go b/buildengine/engine.go index 14d5e1791a..ce1c141bae 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -22,6 +22,7 @@ import ( // Engine for building a set of modules. type Engine struct { + client ftlv1connect.ControllerServiceClient modules map[string]Module controllerSchema *xsync.MapOf[string, *schema.Module] cancel func() @@ -37,6 +38,7 @@ type Engine struct { func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs ...string) (*Engine, error) { ctx = rpc.ContextWithClient(ctx, client) e := &Engine{ + client: client, modules: map[string]Module{}, controllerSchema: xsync.NewMapOf[string, *schema.Module](), } @@ -57,17 +59,17 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs if client == nil { return e, nil } - schemaSync := e.startSchemaSync(ctx, client) + schemaSync := e.startSchemaSync(ctx) go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, schemaSync) return e, nil } // Sync module schema changes from the FTL controller, as well as from manual // updates, and merge them into a single schema map. -func (e *Engine) startSchemaSync(ctx context.Context, client ftlv1connect.ControllerServiceClient) func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error { +func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error { logger := log.FromContext(ctx) // Blocking schema sync from the controller. - psch, err := client.GetSchema(ctx, connect.NewRequest(&ftlv1.GetSchemaRequest{})) + psch, err := e.client.GetSchema(ctx, connect.NewRequest(&ftlv1.GetSchemaRequest{})) if err == nil { sch, err := schema.FromProto(psch.Msg.Schema) if err == nil { @@ -149,6 +151,28 @@ func (e *Engine) Import(ctx context.Context, schema *schema.Module) { // Build attempts to build the specified modules, or all local modules if none are provided. func (e *Engine) Build(ctx context.Context, modules ...string) error { + return e.buildWithCallback(ctx, nil, modules...) +} + +// Deploy attempts to build and deploy the specified modules, or all local modules if none are provided. +func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline bool, modules ...string) error { + if len(modules) == 0 { + modules = maps.Keys(e.modules) + } + + expectedBuilds := make(map[string]struct{}, len(modules)) + for _, name := range modules { + expectedBuilds[name] = struct{}{} + } + + return e.buildWithCallback(ctx, func(ctx context.Context, module Module) error { + return Deploy(ctx, module, replicas, waitForDeployOnline, e.client) + }, modules...) +} + +type buildCallback func(ctx context.Context, module Module) error + +func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, modules ...string) error { mustBuild := map[string]bool{} if len(modules) == 0 { modules = maps.Keys(e.modules) @@ -174,6 +198,7 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error { built := map[string]*schema.Module{ "builtin": schema.Builtins(), } + topology := TopologicalSort(graph) for _, group := range topology { // Collect schemas to be inserted into "built" map for subsequent groups. @@ -183,7 +208,11 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error { for _, name := range group { wg.Go(func() error { if mustBuild[name] { - return e.build(ctx, name, built, schemas) + err := e.build(ctx, name, built, schemas) + if err == nil && callback != nil { + return callback(ctx, e.modules[name]) + } + return err } return e.mustSchema(ctx, name, built, schemas) }) diff --git a/buildengine/engine_test.go b/buildengine/engine_test.go index 17a7251b07..16052f6dd4 100644 --- a/buildengine/engine_test.go +++ b/buildengine/engine_test.go @@ -52,6 +52,6 @@ func TestEngine(t *testing.T) { graph, err := engine.Graph() assert.NoError(t, err) assert.Equal(t, expected, graph) - err = engine.Build(ctx, "alpha") + err = engine.Build(ctx) assert.NoError(t, err) } diff --git a/cmd/ftl/cmd_deploy.go b/cmd/ftl/cmd_deploy.go index 8ad725a7d8..2c06f3ae95 100644 --- a/cmd/ftl/cmd_deploy.go +++ b/cmd/ftl/cmd_deploy.go @@ -2,266 +2,23 @@ package main import ( "context" - "fmt" - "os" - "path/filepath" - "strings" - "time" - "connectrpc.com/connect" - "golang.org/x/exp/maps" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/timestamppb" - - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" - schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" - "github.com/TBD54566975/ftl/common/moduleconfig" - "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/sha256" - "github.com/TBD54566975/ftl/internal/slices" + "github.com/TBD54566975/ftl/buildengine" + "github.com/TBD54566975/ftl/internal/rpc" ) type deployCmd struct { - Replicas int32 `short:"n" help:"Number of replicas to deploy." default:"1"` - ModuleDir string `arg:"" help:"Directory containing ftl.toml" type:"existingdir" default:"."` - Wait bool `help:"Only complete the deploy command when sufficient runners have been provisioned, i.e. the deployment is online and reachable." default:"false"` + Replicas int32 `short:"n" help:"Number of replicas to deploy." default:"1"` + Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` + NoWait bool `help:"Do not wait for deployment to complete." default:"false"` } -func (d *deployCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceClient) error { - logger := log.FromContext(ctx) - - // Load the TOML file. - config, err := moduleconfig.LoadModuleConfig(d.ModuleDir) - if err != nil { - return err - } - - if len(config.Deploy) == 0 { - return fmt.Errorf("no deploy paths defined in config") - } - - build := buildCmd{Dirs: []string{d.ModuleDir}} - err = build.Run(ctx) - if err != nil { - return err - } - - logger.Infof("Preparing module '%s' for deployment", config.Module) - - deployDir := filepath.Join(d.ModuleDir, config.DeployDir) - files, err := findFiles(deployDir, config.Deploy) - if err != nil { - return err - } - - filesByHash, err := hashFiles(deployDir, files) - if err != nil { - return err - } - - gadResp, err := client.GetArtefactDiffs(ctx, connect.NewRequest(&ftlv1.GetArtefactDiffsRequest{ClientDigests: maps.Keys(filesByHash)})) - if err != nil { - return err - } - - module, err := d.loadProtoSchema(deployDir, config) - if err != nil { - return fmt.Errorf("failed to load protobuf schema from %q: %w", config.Schema, err) - } - - logger.Debugf("Uploading %d/%d files", len(gadResp.Msg.MissingDigests), len(files)) - for _, missing := range gadResp.Msg.MissingDigests { - file := filesByHash[missing] - content, err := os.ReadFile(file.localPath) - if err != nil { - return err - } - logger.Tracef("Uploading %s", relToCWD(file.localPath)) - resp, err := client.UploadArtefact(ctx, connect.NewRequest(&ftlv1.UploadArtefactRequest{ - Content: content, - })) - if err != nil { - return err - } - logger.Debugf("Uploaded %s as %s:%s", relToCWD(file.localPath), sha256.FromBytes(resp.Msg.Digest), file.Path) - } - - resp, err := client.CreateDeployment(ctx, connect.NewRequest(&ftlv1.CreateDeploymentRequest{ - Schema: module, - Artefacts: slices.Map(maps.Values(filesByHash), func(a deploymentArtefact) *ftlv1.DeploymentArtefact { - return a.DeploymentArtefact - }), - })) - if err != nil { - return err - } - - _, err = client.ReplaceDeploy(ctx, connect.NewRequest(&ftlv1.ReplaceDeployRequest{DeploymentName: resp.Msg.GetDeploymentName(), MinReplicas: d.Replicas})) +func (d *deployCmd) Run(ctx context.Context) error { + client := rpc.ClientFromContext[ftlv1connect.ControllerServiceClient](ctx) + engine, err := buildengine.New(ctx, client, d.Dirs...) if err != nil { return err } - - if d.Wait { - logger.Infof("Waiting for deployment %s to become ready", resp.Msg.DeploymentName) - err = d.checkReadiness(ctx, client, resp.Msg.DeploymentName) - if err != nil { - return err - } - } - - logger.Infof("Successfully created deployment %s", resp.Msg.DeploymentName) - return nil -} - -func (d *deployCmd) checkReadiness(ctx context.Context, client ftlv1connect.ControllerServiceClient, deploymentName string) error { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{ - AllDeployments: true, - })) - if err != nil { - return err - } - - var found bool - for _, deployment := range status.Msg.Deployments { - if deployment.Key == deploymentName { - found = true - if deployment.Replicas >= d.Replicas { - return nil - } - } - } - if !found { - return fmt.Errorf("deployment %s not found: %v", deploymentName, status.Msg.Deployments) - } - case <-ctx.Done(): - return ctx.Err() - } - } -} -func (d *deployCmd) loadProtoSchema(deployDir string, config moduleconfig.ModuleConfig) (*schemapb.Module, error) { - schema := filepath.Join(deployDir, config.Schema) - content, err := os.ReadFile(schema) - if err != nil { - return nil, err - } - module := &schemapb.Module{} - err = proto.Unmarshal(content, module) - if err != nil { - return nil, err - } - module.Runtime = &schemapb.ModuleRuntime{ - CreateTime: timestamppb.Now(), - Language: config.Language, - MinReplicas: d.Replicas, - } - return module, nil -} - -type deploymentArtefact struct { - *ftlv1.DeploymentArtefact - localPath string -} - -func hashFiles(base string, files []string) (filesByHash map[string]deploymentArtefact, err error) { - filesByHash = map[string]deploymentArtefact{} - for _, file := range files { - r, err := os.Open(file) - if err != nil { - return nil, err - } - defer r.Close() //nolint:gosec - hash, err := sha256.SumReader(r) - if err != nil { - return nil, err - } - info, err := r.Stat() - if err != nil { - return nil, err - } - isExecutable := info.Mode()&0111 != 0 - path, err := filepath.Rel(base, file) - if err != nil { - return nil, err - } - filesByHash[hash.String()] = deploymentArtefact{ - DeploymentArtefact: &ftlv1.DeploymentArtefact{ - Digest: hash.String(), - Path: path, - Executable: isExecutable, - }, - localPath: file, - } - } - return filesByHash, nil -} - -func longestCommonPathPrefix(paths []string) string { - if len(paths) == 0 { - return "" - } - parts := strings.Split(filepath.Dir(paths[0]), "/") - for _, path := range paths[1:] { - parts2 := strings.Split(path, "/") - for i := range parts { - if i >= len(parts2) || parts[i] != parts2[i] { - parts = parts[:i] - break - } - } - } - return strings.Join(parts, "/") -} - -func relToCWD(path string) string { - cwd, err := os.Getwd() - if err != nil { - panic(err) - } - rel, err := filepath.Rel(cwd, path) - if err != nil { - return path - } - return rel -} - -func findFiles(base string, files []string) ([]string, error) { - var out []string - for _, file := range files { - file = filepath.Join(base, file) - info, err := os.Stat(file) - if err != nil { - return nil, err - } - if info.IsDir() { - dirFiles, err := findFilesInDir(file) - if err != nil { - return nil, err - } - out = append(out, dirFiles...) - } else { - out = append(out, file) - } - } - return out, nil -} - -func findFilesInDir(dir string) ([]string, error) { - var out []string - return out, filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if info.IsDir() { - return nil - } - out = append(out, path) - return nil - }) + return engine.Deploy(ctx, d.Replicas, !d.NoWait) } diff --git a/cmd/ftl/cmd_dev.go b/cmd/ftl/cmd_dev.go index 946bf6d53a..e104e1af91 100644 --- a/cmd/ftl/cmd_dev.go +++ b/cmd/ftl/cmd_dev.go @@ -158,11 +158,10 @@ func (d *devCmd) Run(ctx context.Context) error { logger.Warnf("Detected change in %s%s, rebuilding...", changeType, path) } deploy := deployCmd{ - Replicas: 1, - ModuleDir: dir, - Wait: d.ExitAfterDeploy, + Replicas: 1, + Dirs: []string{dir}, } - err = deploy.Run(ctx, client) + err = deploy.Run(ctx) if err != nil { logger.Errorf(err, "Error deploying module %s. Will retry", dir) failedModules[dir] = true diff --git a/cmd/ftl/ftl_test.go b/cmd/ftl/ftl_test.go deleted file mode 100644 index ebd41f4d53..0000000000 --- a/cmd/ftl/ftl_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -import ( - "testing" - - "github.com/alecthomas/assert/v2" -) - -func TestLongestCommonPrefix(t *testing.T) { - foo := []string{"foo/waz/boo", "foo/waz/bar", "foo/waz/baz"} - assert.Equal(t, "foo/waz", longestCommonPathPrefix(foo)) -} diff --git a/common/moduleconfig/moduleconfig.go b/common/moduleconfig/moduleconfig.go index e311e5ea20..20457e1abb 100644 --- a/common/moduleconfig/moduleconfig.go +++ b/common/moduleconfig/moduleconfig.go @@ -54,6 +54,11 @@ func LoadModuleConfig(dir string) (ModuleConfig, error) { return config, nil } +// AbsDeployDir returns the absolute path to the deploy directory. +func (c ModuleConfig) AbsDeployDir() string { + return filepath.Join(c.Dir, c.DeployDir) +} + func setConfigDefaults(moduleDir string, config *ModuleConfig) error { if config.Realm == "" { config.Realm = "home" diff --git a/integration/integration_test.go b/integration/integration_test.go index 34df7c85c8..5598f0d6cd 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -49,7 +49,7 @@ func TestLifecycle(t *testing.T) { run("ftl", rd.initOpts...), }}, {name: fmt.Sprintf("Deploy%s", rd.testSuffix), assertions: assertions{ - run("ftl", "deploy", "--wait", rd.moduleRoot), + run("ftl", "deploy", rd.moduleRoot), deploymentExists(rd.moduleName), }}, {name: fmt.Sprintf("Call%s", rd.testSuffix), assertions: assertions{ @@ -68,7 +68,7 @@ func TestHttpIngress(t *testing.T) { {name: fmt.Sprintf("HttpIngress%s", rd.testSuffix), assertions: assertions{ run("ftl", rd.initOpts...), scaffoldTestData(runtime, "httpingress", rd.modulePath), - run("ftl", "deploy", "--wait", rd.moduleRoot), + run("ftl", "deploy", rd.moduleRoot), httpCall(rd, http.MethodGet, "/users/123/posts/456", jsonData(t, obj{}), func(t testing.TB, resp *httpResponse) { assert.Equal(t, 200, resp.status) assert.Equal(t, []string{"Header from FTL"}, resp.headers["Get"]) @@ -180,7 +180,7 @@ func TestDatabase(t *testing.T) { setUpModuleDB(dbName), run("ftl", rd.initOpts...), scaffoldTestData(runtime, "database", rd.modulePath), - run("ftl", "deploy", "--wait", rd.moduleRoot), + run("ftl", "deploy", rd.moduleRoot), call(rd.moduleName, "insert", obj{"data": requestData}, func(t testing.TB, resp obj) {}), validateModuleDB(dbName, requestData), }}, @@ -197,10 +197,10 @@ func TestExternalCalls(t *testing.T) { name: fmt.Sprintf("Call%sFrom%s", strcase.ToUpperCamel(callee), strcase.ToUpperCamel(runtime)), assertions: assertions{ run("ftl", calleeRd.initOpts...), - run("ftl", "deploy", "--wait", calleeRd.moduleRoot), + run("ftl", "deploy", calleeRd.moduleRoot), run("ftl", rd.initOpts...), scaffoldTestData(runtime, "externalcalls", rd.modulePath), - run("ftl", "deploy", "--wait", rd.moduleRoot), + run("ftl", "deploy", rd.moduleRoot), call(rd.moduleName, "call", obj{"name": "Alice"}, func(t testing.TB, resp obj) { message, ok := resp["message"].(string) assert.True(t, ok, "message is not a string")