Skip to content

Commit

Permalink
fixes #1699
Browse files Browse the repository at this point in the history
`GetModuleContext` will stream updated `ModuleContext` to the runner.
  • Loading branch information
jonathanj-square committed Jun 24, 2024
1 parent 9964a96 commit f48ae05
Showing 1 changed file with 58 additions and 18 deletions.
76 changes: 58 additions & 18 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -684,31 +685,70 @@ nextModule:
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.ModuleContextResponse]) error {
name := req.Msg.Module

// TODO migrate to a polling implementation that only emits responses when the configuration changes

cm := cf.ConfigFromContext(ctx)
sm := cf.SecretsFromContext(ctx)

configs, err := cm.MapForModule(ctx, name)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err))
}
secrets, err := sm.MapForModule(ctx, name)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err))
}
databases, err := modulecontext.DatabasesFromSecrets(ctx, name, secrets)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err))
}
lastChecksum := int64(0)

for {
select {
case <-ctx.Done():
return nil
case <-time.After(30 * time.Second):
}

configs, err := cm.MapForModule(ctx, name)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err))
}
secrets, err := sm.MapForModule(ctx, name)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err))
}
databases, err := modulecontext.DatabasesFromSecrets(ctx, name, secrets)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err))
}

checksum := configurationMapChecksum(configs)
checksum = (checksum * 115163) + configurationMapChecksum(secrets)
checksum = (checksum * 454213) + configurationDatabaseChecksum(databases)

response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto()
if checksum != lastChecksum {
response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto()

if err := resp.Send(response); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err))
}

if err := resp.Send(response); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err))
lastChecksum = checksum
}
}
}

return nil
// configurationMapChecksum computes a checksum on the map that is order invariant.
//
// This operation is used to detect configuration change.
func configurationMapChecksum(m map[string][]byte) int64 {
sum := int64(0)
for k, v := range m {
data := sha256.Sum(append([]byte(k), v...))
sum += int64(binary.BigEndian.Uint64(data[0:8]))
}
return sum
}

// configurationMapChecksum computes a checksum on the database map that is order invariant.
//
// This operation is used to detect configuration change.
func configurationDatabaseChecksum(m map[string]modulecontext.Database) int64 {
sum := int64(0)
for k, v := range m {
// currently, only the DSN is treated as mutable configuration
data := sha256.Sum(append([]byte(k), []byte(v.DSN)...))
sum += int64(binary.BigEndian.Uint64(data[0:8]))
}
return sum
}

// AcquireLease acquires a lease on behalf of a module.
Expand Down

0 comments on commit f48ae05

Please sign in to comment.