Skip to content

Commit

Permalink
chore: move schema service implementation in controller to a separate…
Browse files Browse the repository at this point in the history
… file (#3913)
  • Loading branch information
jvmakine authored Jan 7, 2025
1 parent 2652c38 commit bc79df5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 50 deletions.
50 changes: 0 additions & 50 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/block/ftl/backend/runner/pubsub"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/sha256"
"github.com/block/ftl/common/slices"
Expand Down Expand Up @@ -325,55 +324,6 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return connect.NewResponse(resp), nil
}

func (s *Service) GetSchema(ctx context.Context, c *connect.Request[ftlv1.GetSchemaRequest]) (*connect.Response[ftlv1.GetSchemaResponse], error) {
view, err := s.controllerState.View(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get controller state: %w", err)
}
schemas := view.GetActiveDeploymentSchemas()
modules := []*schemapb.Module{
schema.Builtins().ToProto(),
}
modules = append(modules, slices.Map(schemas, func(d *schema.Module) *schemapb.Module { return d.ToProto() })...)
return connect.NewResponse(&ftlv1.GetSchemaResponse{Schema: &schemapb.Schema{Modules: modules}}), nil
}

func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest], stream *connect.ServerStream[ftlv1.PullSchemaResponse]) error {
return s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error {
return stream.Send(response)
})
}

func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Request[ftlv1.UpdateDeploymentRuntimeRequest]) (*connect.Response[ftlv1.UpdateDeploymentRuntimeResponse], error) {
deployment, err := model.ParseDeploymentKey(req.Msg.Deployment)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}
view, err := s.controllerState.View(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get controller state: %w", err)
}
dep, err := view.GetDeployment(deployment)
if err != nil {
return nil, fmt.Errorf("could not get schema: %w", err)
}
module := dep.Schema
if module.Runtime == nil {
module.Runtime = &schema.ModuleRuntime{}
}
event := schema.ModuleRuntimeEventFromProto(req.Msg.Event)
module.Runtime.ApplyEvent(event)
err = s.controllerState.Publish(ctx, &state.DeploymentSchemaUpdatedEvent{
Key: deployment,
Schema: module,
})
if err != nil {
return nil, fmt.Errorf("could not update schema for module %s: %w", module.Name, err)
}

return connect.NewResponse(&ftlv1.UpdateDeploymentRuntimeResponse{}), nil
}

func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.UpdateDeployRequest]) (response *connect.Response[ftlv1.UpdateDeployResponse], err error) {
deploymentKey, err := model.ParseDeploymentKey(req.Msg.DeploymentKey)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions backend/controller/schemaservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package controller

import (
"context"
"fmt"

"connectrpc.com/connect"

"github.com/block/ftl/backend/controller/state"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/model"
)

func (s *Service) GetSchema(ctx context.Context, c *connect.Request[ftlv1.GetSchemaRequest]) (*connect.Response[ftlv1.GetSchemaResponse], error) {
view, err := s.controllerState.View(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get controller state: %w", err)
}
schemas := view.GetActiveDeploymentSchemas()
modules := []*schemapb.Module{
schema.Builtins().ToProto(),
}
modules = append(modules, slices.Map(schemas, func(d *schema.Module) *schemapb.Module { return d.ToProto() })...)
return connect.NewResponse(&ftlv1.GetSchemaResponse{Schema: &schemapb.Schema{Modules: modules}}), nil
}

func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest], stream *connect.ServerStream[ftlv1.PullSchemaResponse]) error {
return s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error {
return stream.Send(response)
})
}

func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Request[ftlv1.UpdateDeploymentRuntimeRequest]) (*connect.Response[ftlv1.UpdateDeploymentRuntimeResponse], error) {
deployment, err := model.ParseDeploymentKey(req.Msg.Deployment)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}
view, err := s.controllerState.View(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get controller state: %w", err)
}
dep, err := view.GetDeployment(deployment)
if err != nil {
return nil, fmt.Errorf("could not get schema: %w", err)
}
module := dep.Schema
if module.Runtime == nil {
module.Runtime = &schema.ModuleRuntime{}
}
event := schema.ModuleRuntimeEventFromProto(req.Msg.Event)
module.Runtime.ApplyEvent(event)
err = s.controllerState.Publish(ctx, &state.DeploymentSchemaUpdatedEvent{
Key: deployment,
Schema: module,
})
if err != nil {
return nil, fmt.Errorf("could not update schema for module %s: %w", module.Name, err)
}

return connect.NewResponse(&ftlv1.UpdateDeploymentRuntimeResponse{}), nil
}

0 comments on commit bc79df5

Please sign in to comment.