From d2e8e1e5874180a7616d39ab2922455a8205c8df Mon Sep 17 00:00:00 2001 From: Denise Li Date: Tue, 28 May 2024 22:38:06 -0400 Subject: [PATCH] feat: add database-backed config resolver + provider for production (#1570) Fixes https://github.com/TBD54566975/ftl/issues/1548 This doesn't hook anything up yet - just adds the provider and resolver that we'll use in prod Changes: * Adds `DBConfig[Resolver|Provider]` to the `configuration` package, where the resolver does very little, and the provider executes most of the actual database SQL queries. For DB-backed configs, all the data needed to retrieve a config can be found in the `ref` without a separate key, so the URLs are all simple `db://` stubs that exist only to satisfy the existing provider and resolver interfaces, which expect URLs to be passed as keys. * Refactors the 3 Err types, `isNotFound`, and `translatePGError` out of the `dal` package into a separate `dalerrors` * Adds a public getter for the `db` field in `dal.DAL` so that we can connect to the same DB in the database config resolver/provider --- backend/controller/dal/dal_test.go | 91 +++++++++++++++++++ backend/controller/sql/models.go | 8 ++ backend/controller/sql/querier.go | 4 + backend/controller/sql/queries.sql | 25 ++++- backend/controller/sql/queries.sql.go | 69 ++++++++++++++ backend/controller/sql/schema/001_init.sql | 12 ++- common/configuration/db_config_provider.go | 52 +++++++++++ .../configuration/db_config_provider_test.go | 51 +++++++++++ common/configuration/db_config_resolver.go | 64 +++++++++++++ .../configuration/db_config_resolver_test.go | 25 +++++ .../projectconfig_resolver_test.go | 31 +++++++ 11 files changed, 429 insertions(+), 3 deletions(-) create mode 100644 common/configuration/db_config_provider.go create mode 100644 common/configuration/db_config_provider_test.go create mode 100644 common/configuration/db_config_resolver.go create mode 100644 common/configuration/db_config_resolver_test.go diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index d9658d99e4..7de4d2e6e2 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -12,6 +12,7 @@ import ( "github.com/alecthomas/types/optional" "golang.org/x/sync/errgroup" + "github.com/TBD54566975/ftl/backend/controller/sql" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" @@ -400,3 +401,93 @@ func assertEventsEqual(t *testing.T, expected, actual []Event) { t.Helper() assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) } + +func TestModuleConfiguration(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + dal, err := New(ctx, conn) + assert.NoError(t, err) + assert.NotZero(t, dal) + + tests := []struct { + TestName string + ModuleSet optional.Option[string] + ModuleGet optional.Option[string] + PresetGlobal bool + }{ + { + "SetModuleGetModule", + optional.Some("echo"), + optional.Some("echo"), + false, + }, + { + "SetGlobalGetGlobal", + optional.None[string](), + optional.None[string](), + false, + }, + { + "SetGlobalGetModule", + optional.None[string](), + optional.Some("echo"), + false, + }, + { + "SetModuleOverridesGlobal", + optional.Some("echo"), + optional.Some("echo"), + true, + }, + } + + b := []byte(`"asdf"`) + for _, test := range tests { + t.Run(test.TestName, func(t *testing.T) { + if test.PresetGlobal { + err := dal.db.SetModuleConfiguration(ctx, optional.None[string](), "configname", []byte(`"qwerty"`)) + assert.NoError(t, err) + } + err := dal.db.SetModuleConfiguration(ctx, test.ModuleSet, "configname", b) + assert.NoError(t, err) + gotBytes, err := dal.db.GetModuleConfiguration(ctx, test.ModuleGet, "configname") + assert.NoError(t, err) + assert.Equal(t, b, gotBytes) + err = dal.db.UnsetModuleConfiguration(ctx, test.ModuleGet, "configname") + assert.NoError(t, err) + }) + } + + t.Run("List", func(t *testing.T) { + sortedList := []sql.ModuleConfiguration{ + { + Module: optional.Some("echo"), + Name: "a", + }, + { + Module: optional.Some("echo"), + Name: "b", + }, + { + Module: optional.None[string](), + Name: "a", + }, + } + + // Insert entries in a separate order from how they should be returned to + // test sorting logic in the SQL query + err := dal.db.SetModuleConfiguration(ctx, sortedList[1].Module, sortedList[1].Name, []byte(`""`)) + assert.NoError(t, err) + err = dal.db.SetModuleConfiguration(ctx, sortedList[2].Module, sortedList[2].Name, []byte(`""`)) + assert.NoError(t, err) + err = dal.db.SetModuleConfiguration(ctx, sortedList[0].Module, sortedList[0].Name, []byte(`""`)) + assert.NoError(t, err) + + gotList, err := dal.db.ListModuleConfiguration(ctx) + assert.NoError(t, err) + for i := range sortedList { + assert.Equal(t, sortedList[i].Module, gotList[i].Module) + assert.Equal(t, sortedList[i].Name, gotList[i].Name) + } + }) +} diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index b9aa73c754..b9f960a272 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -427,6 +427,14 @@ type Module struct { Name string } +type ModuleConfiguration struct { + ID int64 + CreatedAt time.Time + Module optional.Option[string] + Name string + Value []byte +} + type Request struct { ID int64 Origin Origin diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 2de1a0ef89..b8647b90e4 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -59,6 +59,7 @@ type Querier interface { GetIdleRunners(ctx context.Context, labels []byte, limit int64) ([]Runner, error) // Get the runner endpoints corresponding to the given ingress route. GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) + GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) // Retrieve routing information for a runner. @@ -76,6 +77,7 @@ type Querier interface { // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) + ListModuleConfiguration(ctx context.Context) ([]ModuleConfiguration, error) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) NewLease(ctx context.Context, key leases.Key, ttl time.Duration) (uuid.UUID, error) ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) @@ -84,6 +86,7 @@ type Querier interface { // Find an idle runner and reserve it for the given deployment. ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error + SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) // Start a new FSM transition, populating the destination state and async call ID. // @@ -91,6 +94,7 @@ type Querier interface { StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) + UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) // Upsert a runner and return the deployment ID that it is assigned to, if any. diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 29009c527b..6f7fc73c44 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -603,7 +603,6 @@ WHERE fsm = @fsm::schema_ref AND key = @key::TEXT RETURNING true; - -- name: FailFSMInstance :one UPDATE fsm_instances SET @@ -612,4 +611,26 @@ SET status = 'failed'::fsm_status WHERE fsm = @fsm::schema_ref AND key = @key::TEXT -RETURNING true; \ No newline at end of file +RETURNING true; + +-- name: GetModuleConfiguration :one +SELECT value +FROM module_configuration +WHERE + (module IS NULL OR module = @module) + AND name = @name +ORDER BY module NULLS LAST +LIMIT 1; + +-- name: ListModuleConfiguration :many +SELECT * +FROM module_configuration +ORDER BY module, name; + +-- name: SetModuleConfiguration :exec +INSERT INTO module_configuration (module, name, value) +VALUES ($1, $2, $3); + +-- name: UnsetModuleConfiguration :exec +DELETE FROM module_configuration +WHERE module = @module AND name = @name; diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index c6409379c9..1343228d47 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -1106,6 +1106,23 @@ func (q *Queries) GetIngressRoutes(ctx context.Context, method string) ([]GetIng return items, nil } +const getModuleConfiguration = `-- name: GetModuleConfiguration :one +SELECT value +FROM module_configuration +WHERE + (module IS NULL OR module = $1) + AND name = $2 +ORDER BY module NULLS LAST +LIMIT 1 +` + +func (q *Queries) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) { + row := q.db.QueryRow(ctx, getModuleConfiguration, module, name) + var value []byte + err := row.Scan(&value) + return value, err +} + const getModulesByID = `-- name: GetModulesByID :many SELECT id, language, name FROM modules @@ -1652,6 +1669,38 @@ func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) ( return count, err } +const listModuleConfiguration = `-- name: ListModuleConfiguration :many +SELECT id, created_at, module, name, value +FROM module_configuration +ORDER BY module, name +` + +func (q *Queries) ListModuleConfiguration(ctx context.Context) ([]ModuleConfiguration, error) { + rows, err := q.db.Query(ctx, listModuleConfiguration) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ModuleConfiguration + for rows.Next() { + var i ModuleConfiguration + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.Module, + &i.Name, + &i.Value, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const loadAsyncCall = `-- name: LoadAsyncCall :one SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff FROM async_calls @@ -1788,6 +1837,16 @@ func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, key model.De return err } +const setModuleConfiguration = `-- name: SetModuleConfiguration :exec +INSERT INTO module_configuration (module, name, value) +VALUES ($1, $2, $3) +` + +func (q *Queries) SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error { + _, err := q.db.Exec(ctx, setModuleConfiguration, module, name, value) + return err +} + const startCronJobs = `-- name: StartCronJobs :many WITH updates AS ( UPDATE cron_jobs @@ -1942,6 +2001,16 @@ func (q *Queries) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key return column_1, err } +const unsetModuleConfiguration = `-- name: UnsetModuleConfiguration :exec +DELETE FROM module_configuration +WHERE module = $1 AND name = $2 +` + +func (q *Queries) UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error { + _, err := q.db.Exec(ctx, unsetModuleConfiguration, module, name) + return err +} + const upsertController = `-- name: UpsertController :one INSERT INTO controller (key, endpoint) VALUES ($1, $2) diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index da3cb66ba2..6203dce488 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -448,4 +448,14 @@ CREATE TABLE fsm_instances ( CREATE UNIQUE INDEX idx_fsm_instances_fsm_key ON fsm_instances(fsm, key); CREATE INDEX idx_fsm_instances_status ON fsm_instances(status); --- migrate:down \ No newline at end of file +CREATE TABLE module_configuration +( + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + module TEXT, -- If NULL, configuration is global. + name TEXT NOT NULL, + value JSONB NOT NULL, + UNIQUE (module, name) +); + +-- migrate:down diff --git a/common/configuration/db_config_provider.go b/common/configuration/db_config_provider.go new file mode 100644 index 0000000000..83ceefaf0d --- /dev/null +++ b/common/configuration/db_config_provider.go @@ -0,0 +1,52 @@ +package configuration + +import ( + "context" + "net/url" + + "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/alecthomas/types/optional" +) + +// DBConfigProvider is a configuration provider that stores configuration in its key. +type DBConfigProvider struct { + dal DBConfigProviderDAL +} + +type DBConfigProviderDAL interface { + GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) + SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error + UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error +} + +var _ MutableProvider[Configuration] = DBConfigProvider{} + +func NewDBConfigProvider(dal DBConfigProviderDAL) DBConfigProvider { + return DBConfigProvider{ + dal: dal, + } +} + +func (DBConfigProvider) Role() Configuration { return Configuration{} } +func (DBConfigProvider) Key() string { return "db" } +func (DBConfigProvider) Writer() bool { return true } + +func (d DBConfigProvider) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) { + value, err := d.dal.GetModuleConfiguration(ctx, ref.Module, ref.Name) + if err != nil { + return nil, dal.ErrNotFound + } + return value, nil +} + +func (d DBConfigProvider) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) { + err := d.dal.SetModuleConfiguration(ctx, ref.Module, ref.Name, value) + if err != nil { + return nil, err + } + return &url.URL{Scheme: "db"}, nil +} + +func (d DBConfigProvider) Delete(ctx context.Context, ref Ref) error { + return d.dal.UnsetModuleConfiguration(ctx, ref.Module, ref.Name) +} diff --git a/common/configuration/db_config_provider_test.go b/common/configuration/db_config_provider_test.go new file mode 100644 index 0000000000..3920399d12 --- /dev/null +++ b/common/configuration/db_config_provider_test.go @@ -0,0 +1,51 @@ +package configuration + +import ( + "context" + "net/url" + "testing" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" +) + +var b = []byte(`""`) + +type mockDBConfigProviderDAL struct{} + +func (mockDBConfigProviderDAL) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) { + return b, nil +} + +func (mockDBConfigProviderDAL) SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error { + return nil +} + +func (mockDBConfigProviderDAL) UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error { + return nil +} + +func TestDBConfigProvider(t *testing.T) { + ctx := context.Background() + provider := NewDBConfigProvider(mockDBConfigProviderDAL{}) + + gotBytes, err := provider.Load(ctx, Ref{ + Module: optional.Some("module"), + Name: "configname", + }, &url.URL{Scheme: "db"}) + assert.NoError(t, err) + assert.Equal(t, b, gotBytes) + + gotURL, err := provider.Store(ctx, Ref{ + Module: optional.Some("module"), + Name: "configname", + }, b) + assert.NoError(t, err) + assert.Equal(t, &url.URL{Scheme: "db"}, gotURL) + + err = provider.Delete(ctx, Ref{ + Module: optional.Some("module"), + Name: "configname", + }) + assert.NoError(t, err) +} diff --git a/common/configuration/db_config_resolver.go b/common/configuration/db_config_resolver.go new file mode 100644 index 0000000000..7f04d00b12 --- /dev/null +++ b/common/configuration/db_config_resolver.go @@ -0,0 +1,64 @@ +package configuration + +import ( + "context" + "net/url" + + "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/internal/slices" +) + +// DBConfigResolver loads values a project's configuration from the given database. +type DBConfigResolver struct { + dal DBConfigResolverDAL +} + +type DBConfigResolverDAL interface { + ListModuleConfiguration(ctx context.Context) ([]sql.ModuleConfiguration, error) +} + +// DBConfigResolver should only be used for config, not secrets +var _ Resolver[Configuration] = DBConfigResolver{} + +func NewDBConfigResolver(db DBConfigResolverDAL) DBConfigResolver { + return DBConfigResolver{dal: db} +} + +func (d DBConfigResolver) Role() Configuration { return Configuration{} } + +func (d DBConfigResolver) Get(ctx context.Context, ref Ref) (*url.URL, error) { + return urlPtr(), nil +} + +func (d DBConfigResolver) List(ctx context.Context) ([]Entry, error) { + configs, err := d.dal.ListModuleConfiguration(ctx) + if err != nil { + return nil, err + } + return slices.Map(configs, func(c sql.ModuleConfiguration) Entry { + return Entry{ + Ref: Ref{ + Module: c.Module, + Name: c.Name, + }, + Accessor: urlPtr(), + } + }), nil +} + +func (d DBConfigResolver) Set(ctx context.Context, ref Ref, key *url.URL) error { + // Writing to the DB is performed by DBConfigProvider, so this function is a NOOP + return nil +} + +func (d DBConfigResolver) Unset(ctx context.Context, ref Ref) error { + // Writing to the DB is performed by DBConfigProvider, so this function is a NOOP + return nil +} + +func urlPtr() *url.URL { + // The URLs for Database-provided configs are not actually used because all the + // information needed to load each config is contained in the Ref, so we pass + // around an empty "db://" to satisfy the expectations of the Resolver interface. + return &url.URL{Scheme: "db"} +} diff --git a/common/configuration/db_config_resolver_test.go b/common/configuration/db_config_resolver_test.go new file mode 100644 index 0000000000..5204d4f9a3 --- /dev/null +++ b/common/configuration/db_config_resolver_test.go @@ -0,0 +1,25 @@ +package configuration + +import ( + "context" + "testing" + + "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/alecthomas/assert/v2" +) + +type mockDBConfigResolverDAL struct{} + +func (mockDBConfigResolverDAL) ListModuleConfiguration(ctx context.Context) ([]sql.ModuleConfiguration, error) { + return []sql.ModuleConfiguration{}, nil +} + +func TestDBConfigResolverList(t *testing.T) { + ctx := context.Background() + resolver := NewDBConfigResolver(mockDBConfigResolverDAL{}) + expected := []Entry{} + + entries, err := resolver.List(ctx) + assert.Equal(t, entries, expected) + assert.NoError(t, err) +} diff --git a/common/configuration/projectconfig_resolver_test.go b/common/configuration/projectconfig_resolver_test.go index ceda91717b..f8128b9c0c 100644 --- a/common/configuration/projectconfig_resolver_test.go +++ b/common/configuration/projectconfig_resolver_test.go @@ -40,6 +40,37 @@ func TestSet(t *testing.T) { }) } +func TestGetGlobal(t *testing.T) { + config := filepath.Join(t.TempDir(), "ftl-project.toml") + existing, err := os.ReadFile("testdata/ftl-project.toml") + assert.NoError(t, err) + err = os.WriteFile(config, existing, 0600) + assert.NoError(t, err) + + t.Run("ExistingModule", func(t *testing.T) { + setAndAssert(t, "echo", []string{config}) + }) + ctx := log.ContextWithNewDefaultLogger(context.Background()) + + cf, err := New(ctx, + ProjectConfigResolver[Configuration]{Config: []string{config}}, + []Provider[Configuration]{ + EnvarProvider[Configuration]{}, + InlineProvider[Configuration]{Inline: true}, // Writer + }) + assert.NoError(t, err) + + var got *url.URL + want := URL("inline://qwertyqwerty") + err = cf.Set(ctx, Ref{Module: optional.None[string](), Name: "default"}, want) + assert.NoError(t, err) + err = cf.Get(ctx, Ref{Module: optional.Some[string]("somemodule"), Name: "default"}, &got) + assert.NoError(t, err) + + // Get should return `want` even though it was only set globally + assert.Equal(t, want, got) +} + func setAndAssert(t *testing.T, module string, config []string) { t.Helper()