diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 6c6c575890..9dfba78e1e 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/binary" "errors" "fmt" "io" @@ -684,31 +685,70 @@ nextModule: func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.ModuleContextResponse]) error { name := req.Msg.Module - // TODO migrate to a polling implementation that only emits responses when the configuration changes - cm := cf.ConfigFromContext(ctx) sm := cf.SecretsFromContext(ctx) - configs, err := cm.MapForModule(ctx, name) - if err != nil { - return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err)) - } - secrets, err := sm.MapForModule(ctx, name) - if err != nil { - return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err)) - } - databases, err := modulecontext.DatabasesFromSecrets(ctx, name, secrets) - if err != nil { - return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err)) - } + lastChecksum := int64(0) + + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(30 * time.Second): + } + + configs, err := cm.MapForModule(ctx, name) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err)) + } + secrets, err := sm.MapForModule(ctx, name) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err)) + } + databases, err := modulecontext.DatabasesFromSecrets(ctx, name, secrets) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err)) + } + + checksum := configurationMapChecksum(configs) + checksum = (checksum * 115163) + configurationMapChecksum(secrets) + checksum = (checksum * 454213) + configurationDatabaseChecksum(databases) - response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto() + if checksum != lastChecksum { + response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto() + + if err := resp.Send(response); err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err)) + } - if err := resp.Send(response); err != nil { - return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err)) + lastChecksum = checksum + } } +} - return nil +// configurationMapChecksum computes a checksum on the map that is order invariant. +// +// This operation is used to detect configuration change. +func configurationMapChecksum(m map[string][]byte) int64 { + sum := int64(0) + for k, v := range m { + data := sha256.Sum(append([]byte(k), v...)) + sum += int64(binary.BigEndian.Uint64(data[0:8])) + } + return sum +} + +// configurationMapChecksum computes a checksum on the database map that is order invariant. +// +// This operation is used to detect configuration change. +func configurationDatabaseChecksum(m map[string]modulecontext.Database) int64 { + sum := int64(0) + for k, v := range m { + // currently, only the DSN is treated as mutable configuration + data := sha256.Sum(append([]byte(k), []byte(v.DSN)...)) + sum += int64(binary.BigEndian.Uint64(data[0:8])) + } + return sum } // AcquireLease acquires a lease on behalf of a module.