diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 8f9884afac..859fb3a91b 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -642,7 +642,12 @@ nextModule: // GetModuleContext retrieves config, secrets and DSNs for a module. func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest]) (*connect.Response[ftlv1.ModuleContextResponse], error) { - return nil, fmt.Errorf("not implemented") + // get module schema + schemas, err := s.dal.GetActiveDeploymentSchemas(ctx) + if err != nil { + return nil, err + } + return moduleContextToProto(ctx, req.Msg.Module, schemas) } func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { diff --git a/backend/controller/module_context.go b/backend/controller/module_context.go new file mode 100644 index 0000000000..1c41c9df9b --- /dev/null +++ b/backend/controller/module_context.go @@ -0,0 +1,97 @@ +package controller + +import ( + "context" + "fmt" + "os" + "strings" + + "connectrpc.com/connect" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + cf "github.com/TBD54566975/ftl/common/configuration" + "github.com/TBD54566975/ftl/internal/slices" +) + +func moduleContextToProto(ctx context.Context, name string, schemas []*schema.Module) (*connect.Response[ftlv1.ModuleContextResponse], error) { + schemas = slices.Filter(schemas, func(s *schema.Module) bool { + return s.Name == name + }) + if len(schemas) == 0 { + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no schema found for module %q", name)) + } else if len(schemas) > 1 { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("multiple schemas found for module %q", name)) + } + + // configs + configManager := cf.ConfigFromContext(ctx) + configMap, err := bytesMapFromConfigManager(ctx, configManager, name) + if err != nil { + return nil, err + } + + // secrets + secretsManager := cf.SecretsFromContext(ctx) + secretsMap, err := bytesMapFromConfigManager(ctx, secretsManager, name) + if err != nil { + return nil, err + } + + // DSNs + dsnProtos := []*ftlv1.ModuleContextResponse_DSN{} + for _, decl := range schemas[0].Decls { + dbDecl, ok := decl.(*schema.Database) + if !ok { + continue + } + key := fmt.Sprintf("FTL_POSTGRES_DSN_%s_%s", strings.ToUpper(name), strings.ToUpper(dbDecl.Name)) + dsn, ok := os.LookupEnv(key) + if !ok { + return nil, fmt.Errorf("missing environment variable %q", key) + } + dsnProtos = append(dsnProtos, &ftlv1.ModuleContextResponse_DSN{ + Name: dbDecl.Name, + Type: ftlv1.ModuleContextResponse_POSTGRES, + Dsn: dsn, + }) + } + + return connect.NewResponse(&ftlv1.ModuleContextResponse{ + Configs: configMap, + Secrets: secretsMap, + Databases: dsnProtos, + }), nil +} + +func bytesMapFromConfigManager[R cf.Role](ctx context.Context, manager *cf.Manager[R], moduleName string) (map[string][]byte, error) { + configList, err := manager.List(ctx) + if err != nil { + return nil, err + } + + // module specific values must override global values + // put module specific values into moduleConfigMap, then merge with configMap + configMap := map[string][]byte{} + moduleConfigMap := map[string][]byte{} + + for _, entry := range configList { + refModule, isModuleSpecific := entry.Module.Get() + if isModuleSpecific && refModule != moduleName { + continue + } + data, err := manager.GetData(ctx, entry.Ref) + if err != nil { + return nil, err + } + if !isModuleSpecific { + configMap[entry.Ref.Name] = data + } else { + moduleConfigMap[entry.Ref.Name] = data + } + } + + for name, data := range moduleConfigMap { + configMap[name] = data + } + return configMap, nil +} diff --git a/backend/controller/module_context_test.go b/backend/controller/module_context_test.go new file mode 100644 index 0000000000..a53a092d6a --- /dev/null +++ b/backend/controller/module_context_test.go @@ -0,0 +1,55 @@ +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/TBD54566975/ftl/backend/schema" + cf "github.com/TBD54566975/ftl/common/configuration" + "github.com/TBD54566975/ftl/internal/log" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" +) + +func TestModuleContextProto(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + + moduleName := "test" + + cp := cf.NewInMemoryProvider[cf.Configuration]() + cr := cf.NewInMemoryResolver[cf.Configuration]() + cm, err := cf.New(ctx, cr, []cf.Provider[cf.Configuration]{cp}) + assert.NoError(t, err) + ctx = cf.ContextWithConfig(ctx, cm) + + sp := cf.NewInMemoryProvider[cf.Secrets]() + sr := cf.NewInMemoryResolver[cf.Secrets]() + sm, err := cf.New(ctx, sr, []cf.Provider[cf.Secrets]{sp}) + assert.NoError(t, err) + ctx = cf.ContextWithSecrets(ctx, sm) + + // Set 50 configs and 50 global configs + // It's hard to tell if module config beats global configs because we are dealing with unordered maps, or because the logic is correct + // Repeating it 50 times hopefully gives us a good chance of catching inconsistencies + for i := range 50 { + key := fmt.Sprintf("key%d", i) + + strValue := "HelloWorld" + globalStrValue := "GlobalHelloWorld" + assert.NoError(t, cm.Set(ctx, cf.Ref{Module: optional.Some(moduleName), Name: key}, strValue)) + assert.NoError(t, cm.Set(ctx, cf.Ref{Module: optional.None[string](), Name: key}, globalStrValue)) + } + + response, err := moduleContextToProto(ctx, moduleName, []*schema.Module{ + { + Name: moduleName, + }, + }) + assert.NoError(t, err) + + for i := range 50 { + key := fmt.Sprintf("key%d", i) + assert.Equal(t, "\"HelloWorld\"", string(response.Msg.Configs[key]), "module configs should beat global configs") + } +} diff --git a/cmd/ftl/main.go b/cmd/ftl/main.go index 697ab3ed05..59cb4991a6 100644 --- a/cmd/ftl/main.go +++ b/cmd/ftl/main.go @@ -8,7 +8,6 @@ import ( "os/signal" "runtime" "strconv" - "strings" "syscall" "github.com/alecthomas/kong" @@ -92,10 +91,19 @@ func main() { kctx.BindTo(sr, (*cf.Resolver[cf.Secrets])(nil)) kctx.BindTo(cr, (*cf.Resolver[cf.Configuration])(nil)) - // Propagate to runner processes. - // TODO: This is a bit of a hack until we get proper configuration - // management through the Controller. - os.Setenv("FTL_CONFIG", strings.Join(projectconfig.ConfigPaths(cli.ConfigFlag), ",")) + // Add config manager to context. + cm, err := cf.NewConfigurationManager(ctx, cr) + if err != nil { + kctx.Fatalf(err.Error()) + } + ctx = cf.ContextWithConfig(ctx, cm) + + // Add secrets manager to context. + sm, err := cf.NewSecretsManager(ctx, sr) + if err != nil { + kctx.Fatalf(err.Error()) + } + ctx = cf.ContextWithSecrets(ctx, sm) // Handle signals. sigch := make(chan os.Signal, 1) diff --git a/common/configuration/in_memory_provider.go b/common/configuration/in_memory_provider.go new file mode 100644 index 0000000000..3c62eff3a6 --- /dev/null +++ b/common/configuration/in_memory_provider.go @@ -0,0 +1,44 @@ +package configuration + +import ( + "context" + "fmt" + "net/url" +) + +// InMemoryProvider is a configuration provider that keeps values in memory +type InMemoryProvider[R Role] struct { + values map[Ref][]byte +} + +var _ MutableProvider[Configuration] = &InMemoryProvider[Configuration]{} + +func NewInMemoryProvider[R Role]() *InMemoryProvider[R] { + return &InMemoryProvider[R]{values: map[Ref][]byte{}} +} + +func (p *InMemoryProvider[R]) Role() R { var r R; return r } +func (p *InMemoryProvider[R]) Key() string { return "grpc" } + +func (p *InMemoryProvider[R]) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) { + if bytes, found := p.values[ref]; found { + return bytes, nil + } + return nil, fmt.Errorf("key %q not found", ref.Name) +} + +func (p *InMemoryProvider[R]) Writer() bool { + return true +} + +// Store a configuration value and return its key. +func (p *InMemoryProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) { + p.values[ref] = value + return &url.URL{Scheme: p.Key()}, nil +} + +// Delete a configuration value. +func (p *InMemoryProvider[R]) Delete(ctx context.Context, ref Ref) error { + delete(p.values, ref) + return nil +} diff --git a/common/configuration/in_memory_resolver.go b/common/configuration/in_memory_resolver.go new file mode 100644 index 0000000000..ee0ca59eb5 --- /dev/null +++ b/common/configuration/in_memory_resolver.go @@ -0,0 +1,45 @@ +package configuration + +import ( + "context" + "fmt" + "net/url" +) + +type InMemoryResolver[R Role] struct { + keyMap map[Ref]*url.URL +} + +var _ Resolver[Configuration] = InMemoryResolver[Configuration]{} +var _ Resolver[Secrets] = InMemoryResolver[Secrets]{} + +func NewInMemoryResolver[R Role]() *InMemoryResolver[R] { + return &InMemoryResolver[R]{keyMap: map[Ref]*url.URL{}} +} + +func (k InMemoryResolver[R]) Role() R { var r R; return r } + +func (k InMemoryResolver[R]) Get(ctx context.Context, ref Ref) (*url.URL, error) { + if key, found := k.keyMap[ref]; found { + return key, nil + } + return nil, fmt.Errorf("key %q not found", ref.Name) +} + +func (k InMemoryResolver[R]) List(ctx context.Context) ([]Entry, error) { + entries := []Entry{} + for ref, url := range k.keyMap { + entries = append(entries, Entry{Ref: ref, Accessor: url}) + } + return entries, nil +} + +func (k InMemoryResolver[R]) Set(ctx context.Context, ref Ref, key *url.URL) error { + k.keyMap[ref] = key + return nil +} + +func (k InMemoryResolver[R]) Unset(ctx context.Context, ref Ref) error { + delete(k.keyMap, ref) + return nil +} diff --git a/common/configuration/manager.go b/common/configuration/manager.go index 5718e54718..5fe6f872c6 100644 --- a/common/configuration/manager.go +++ b/common/configuration/manager.go @@ -64,10 +64,9 @@ func (m *Manager[R]) Mutable() error { return fmt.Errorf("no writeable configuration provider available, specify one of %s", strings.Join(writers, ", ")) } -// Get a configuration value from the active providers. -// -// "value" must be a pointer to a Go type that can be unmarshalled from JSON. -func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error { +// GetData returns a data value for a configuration from the active providers. +// The data can be unmarshalled from JSON. +func (m *Manager[R]) GetData(ctx context.Context, ref Ref) ([]byte, error) { key, err := m.resolver.Get(ctx, ref) // Try again at the global scope if the value is not found in module scope. if ref.Module.Ok() && errors.Is(err, ErrNotFound) { @@ -75,38 +74,56 @@ func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error { gref.Module = optional.None[string]() key, err = m.resolver.Get(ctx, gref) if err != nil { - return err + return nil, err } } else if err != nil { - return err + return nil, err } provider, ok := m.providers[key.Scheme] if !ok { - return fmt.Errorf("no provider for scheme %q", key.Scheme) + return nil, fmt.Errorf("no provider for scheme %q", key.Scheme) } data, err := provider.Load(ctx, ref, key) if err != nil { - return fmt.Errorf("%s: %w", ref, err) + return nil, fmt.Errorf("%s: %w", ref, err) + } + return data, nil +} + +// Get a configuration value from the active providers. +// +// "value" must be a pointer to a Go type that can be unmarshalled from JSON. +func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error { + data, err := m.GetData(ctx, ref) + if err != nil { + return err } return json.Unmarshal(data, value) } -// Set a configuration value in the active writing provider. +// SetData updates the configuration value in the active writing provider as raw bytes. // -// "value" must be a Go type that can be marshalled to JSON. -func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error { +// "data" must be bytes that can be unmarshalled into a Go type. +func (m *Manager[R]) SetData(ctx context.Context, ref Ref, data []byte) error { if err := m.Mutable(); err != nil { return err } - data, err := json.Marshal(value) + key, err := m.writer.Store(ctx, ref, data) if err != nil { return err } - key, err := m.writer.Store(ctx, ref, data) + return m.resolver.Set(ctx, ref, key) +} + +// Set a configuration value in the active writing provider. +// +// "value" must be a Go type that can be marshalled to JSON. +func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error { + data, err := json.Marshal(value) if err != nil { return err } - return m.resolver.Set(ctx, ref, key) + return m.SetData(ctx, ref, data) } // Unset a configuration value in all providers. diff --git a/go-runtime/ftl/database.go b/go-runtime/ftl/database.go index efdef16420..48f750d490 100644 --- a/go-runtime/ftl/database.go +++ b/go-runtime/ftl/database.go @@ -1,25 +1,34 @@ package ftl import ( + "context" "database/sql" "fmt" - "os" - "strings" + + "github.com/TBD54566975/ftl/go-runtime/modulecontext" _ "github.com/jackc/pgx/v5/stdlib" // Register Postgres driver ) -// PostgresDatabase returns a Postgres database connection for the named database. -func PostgresDatabase(name string) *sql.DB { - module := strings.ToUpper(callerModule()) - key := fmt.Sprintf("FTL_POSTGRES_DSN_%s_%s", module, strings.ToUpper(name)) - dsn, ok := os.LookupEnv(key) - if !ok { - panic(fmt.Sprintf("missing DSN environment variable %s", key)) +type Database struct { + Name string +} + +// PostgresDatabase returns a handler for the named database. +func PostgresDatabase(name string) Database { + return Database{ + Name: name, } - db, err := sql.Open("pgx", dsn) +} + +func (d Database) String() string { return fmt.Sprintf("database %q", d.Name) } + +// Get returns the sql db connection for the database. +func (d Database) Get(ctx context.Context) *sql.DB { + provider := modulecontext.DBProviderFromContext(ctx) + db, err := provider.Get(d.Name) if err != nil { - panic(fmt.Sprintf("failed to open database: %s", err)) + panic(err.Error()) } return db } diff --git a/go-runtime/modulecontext/db_provider.go b/go-runtime/modulecontext/db_provider.go new file mode 100644 index 0000000000..a121ee6de1 --- /dev/null +++ b/go-runtime/modulecontext/db_provider.go @@ -0,0 +1,81 @@ +package modulecontext + +import ( + "context" + "database/sql" + "fmt" + "strconv" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + + _ "github.com/jackc/pgx/v5/stdlib" // SQL driver +) + +type DBType int32 + +const ( + DBTypePostgres = DBType(ftlv1.ModuleContextResponse_POSTGRES) +) + +func (x DBType) String() string { + switch x { + case DBTypePostgres: + return "Postgres" + default: + panic(fmt.Sprintf("unknown DB type: %s", strconv.Itoa(int(x)))) + } +} + +type dbEntry struct { + dsn string + dbType DBType + db *sql.DB +} + +// DBProvider takes in DSNs and holds a *sql.DB for each +// this allows us to: +// - pool db connections, rather than initializing anew each time +// - validate DSNs at startup, rather than returning errors or panicking at Database.Get() +type DBProvider struct { + entries map[string]dbEntry +} + +type contextKeyDSNProvider struct{} + +func NewDBProvider() *DBProvider { + return &DBProvider{ + entries: map[string]dbEntry{}, + } +} + +func ContextWithDBProvider(ctx context.Context, provider *DBProvider) context.Context { + return context.WithValue(ctx, contextKeyDSNProvider{}, provider) +} + +func DBProviderFromContext(ctx context.Context) *DBProvider { + m, ok := ctx.Value(contextKeyDSNProvider{}).(*DBProvider) + if !ok { + panic("no db provider in context") + } + return m +} + +func (d *DBProvider) Add(name string, dbType DBType, dsn string) error { + db, err := sql.Open("pgx", dsn) + if err != nil { + return err + } + d.entries[name] = dbEntry{ + dsn: dsn, + db: db, + dbType: dbType, + } + return nil +} + +func (d *DBProvider) Get(name string) (*sql.DB, error) { + if entry, ok := d.entries[name]; ok { + return entry.db, nil + } + return nil, fmt.Errorf("missing DSN for database %s", name) +} diff --git a/go-runtime/modulecontext/db_provider_test.go b/go-runtime/modulecontext/db_provider_test.go new file mode 100644 index 0000000000..66cf14b157 --- /dev/null +++ b/go-runtime/modulecontext/db_provider_test.go @@ -0,0 +1,24 @@ +package modulecontext + +import ( + "context" + "testing" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/alecthomas/assert/v2" +) + +func TestValidDSN(t *testing.T) { + dbProvider := NewDBProvider() + dsn := "postgres://localhost:54320/echo?sslmode=disable&user=postgres&password=secret" + err := dbProvider.Add("test", DBTypePostgres, dsn) + assert.NoError(t, err, "expected no error for valid DSN") + assert.Equal(t, dbProvider.entries["test"].dsn, dsn, "expected DSN to be set and unmodified") +} + +func TestGettingAndSettingFromContext(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + dbProvider := NewDBProvider() + ctx = ContextWithDBProvider(ctx, dbProvider) + assert.Equal(t, dbProvider, DBProviderFromContext(ctx), "expected dbProvider to be set and retrieved correctly") +} diff --git a/go-runtime/modulecontext/modulecontext.go b/go-runtime/modulecontext/modulecontext.go new file mode 100644 index 0000000000..0f9b2f2441 --- /dev/null +++ b/go-runtime/modulecontext/modulecontext.go @@ -0,0 +1,73 @@ +package modulecontext + +import ( + "context" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + cf "github.com/TBD54566975/ftl/common/configuration" + "github.com/alecthomas/types/optional" +) + +// ModuleContext holds the context needed for a module, including configs, secrets and DSNs +type ModuleContext struct { + configManager *cf.Manager[cf.Configuration] + secretsManager *cf.Manager[cf.Secrets] + dbProvider *DBProvider +} + +func NewFromProto(ctx context.Context, moduleName string, response *ftlv1.ModuleContextResponse) (*ModuleContext, error) { + cm, err := newInMemoryConfigManager[cf.Configuration](ctx) + if err != nil { + return nil, err + } + sm, err := newInMemoryConfigManager[cf.Secrets](ctx) + if err != nil { + return nil, err + } + moduleCtx := &ModuleContext{ + configManager: cm, + secretsManager: sm, + dbProvider: NewDBProvider(), + } + + if err := addConfigOrSecrets[cf.Configuration](ctx, *moduleCtx.configManager, response.Configs, moduleName); err != nil { + return nil, err + } + if err := addConfigOrSecrets[cf.Secrets](ctx, *moduleCtx.secretsManager, response.Secrets, moduleName); err != nil { + return nil, err + } + for _, entry := range response.Databases { + if err = moduleCtx.dbProvider.Add(entry.Name, DBType(entry.Type), entry.Dsn); err != nil { + return nil, err + } + } + return moduleCtx, nil +} + +func newInMemoryConfigManager[R cf.Role](ctx context.Context) (*cf.Manager[R], error) { + provider := cf.NewInMemoryProvider[R]() + resolver := cf.NewInMemoryResolver[R]() + manager, err := cf.New(ctx, resolver, []cf.Provider[R]{provider}) + if err != nil { + return nil, err + } + return manager, nil +} + +func addConfigOrSecrets[R cf.Role](ctx context.Context, manager cf.Manager[R], valueMap map[string][]byte, moduleName string) error { + for name, data := range valueMap { + if err := manager.SetData(ctx, cf.Ref{Module: optional.Some(moduleName), Name: name}, data); err != nil { + return err + } + } + return nil +} + +// ApplyToContext sets up the context so that configurations, secrets and DSNs can be retreived +// Each of these components have accessors to get a manager back from the context +func (m *ModuleContext) ApplyToContext(ctx context.Context) context.Context { + ctx = ContextWithDBProvider(ctx, m.dbProvider) + ctx = cf.ContextWithConfig(ctx, m.configManager) + ctx = cf.ContextWithSecrets(ctx, m.secretsManager) + return ctx +} diff --git a/go-runtime/server/server.go b/go-runtime/server/server.go index 34810fb947..b305d97645 100644 --- a/go-runtime/server/server.go +++ b/go-runtime/server/server.go @@ -10,10 +10,10 @@ import ( ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" - cf "github.com/TBD54566975/ftl/common/configuration" "github.com/TBD54566975/ftl/common/plugin" "github.com/TBD54566975/ftl/go-runtime/encoding" "github.com/TBD54566975/ftl/go-runtime/ftl" + "github.com/TBD54566975/ftl/go-runtime/modulecontext" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/maps" "github.com/TBD54566975/ftl/internal/observability" @@ -34,21 +34,17 @@ func NewUserVerbServer(moduleName string, handlers ...Handler) plugin.Constructo verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, uc.FTLEndpoint.String(), log.Error) ctx = rpc.ContextWithClient(ctx, verbServiceClient) - // Add config manager to context. - cr := &cf.ProjectConfigResolver[cf.Configuration]{Config: uc.Config} - cm, err := cf.NewConfigurationManager(ctx, cr) + resp, err := verbServiceClient.GetModuleContext(ctx, connect.NewRequest(&ftlv1.ModuleContextRequest{ + Module: moduleName, + })) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("could not get config: %w", err) } - ctx = cf.ContextWithConfig(ctx, cm) - - // Add secrets manager to context. - sr := &cf.ProjectConfigResolver[cf.Secrets]{Config: uc.Config} - sm, err := cf.NewSecretsManager(ctx, sr) + moduleCtx, err := modulecontext.NewFromProto(ctx, moduleName, resp.Msg) if err != nil { return nil, nil, err } - ctx = cf.ContextWithSecrets(ctx, sm) + ctx = moduleCtx.ApplyToContext(ctx) err = observability.Init(ctx, moduleName, "HEAD", uc.ObservabilityConfig) if err != nil {