Skip to content

Commit

Permalink
refactor: simplify the common case of ranging over a channel w/ cance…
Browse files Browse the repository at this point in the history
…llation

Simplifies this:

  for {
    select {
      case <-ctx.Done():
        return
      case msg := <-events:
        // DO STUFF
    }
  }

To this:

  for msg := range channels.IterContext(ctx, events) {
    // DO STUFF
  }
  • Loading branch information
alecthomas committed Dec 19, 2024
1 parent 883e350 commit 883adac
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 381 deletions.
17 changes: 6 additions & 11 deletions backend/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/block/ftl/common/schema"
frontend "github.com/block/ftl/frontend/console"
"github.com/block/ftl/internal/buildengine"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/rpc"
Expand Down Expand Up @@ -345,24 +346,18 @@ func getReferencesFromMap(refMap map[schema.RefKey]map[schema.RefKey]bool, modul
}

func (s *service) StreamModules(ctx context.Context, req *connect.Request[consolepb.StreamModulesRequest], stream *connect.ServerStream[consolepb.StreamModulesResponse]) error {

err := s.sendStreamModulesResp(ctx, stream)
if err != nil {
return err
}

for {
select {
case <-ctx.Done():
return nil

case <-s.schemaEventSource.Events():
err = s.sendStreamModulesResp(ctx, stream)
if err != nil {
return err
}
for range channels.IterContext(ctx, s.schemaEventSource.Events()) {
err = s.sendStreamModulesResp(ctx, stream)
if err != nil {
return err
}
}
return nil
}

// filterDeployments removes any duplicate modules by selecting the deployment with the
Expand Down
109 changes: 51 additions & 58 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/sha256"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/deploymentcontext"
"github.com/block/ftl/internal/log"
ftlmaps "github.com/block/ftl/internal/maps"
Expand Down Expand Up @@ -1146,68 +1147,60 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
logger.Tracef("Seeded %d deployments", initialCount)

for {
select {
case <-ctx.Done():
return nil

case notification := <-updates:
switch event := notification.(type) {
case *state.DeploymentCreatedEvent:
err := sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
ModuleName: event.Module,
DeploymentKey: proto.String(event.Key.String()),
Schema: event.Schema.ToProto(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_ADDED,
})
if err != nil {
return err
}
case *state.DeploymentDeactivatedEvent:
view, err := s.controllerState.View(ctx)
if err != nil {
return fmt.Errorf("failed to get controller state: %w", err)
}
dep, err := view.GetDeployment(event.Key)
if err != nil {
logger.Errorf(err, "Deployment not found: %s", event.Key)
continue
}
err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
ModuleName: dep.Module,
DeploymentKey: proto.String(event.Key.String()),
Schema: dep.Schema.ToProto(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_REMOVED,
ModuleRemoved: event.ModuleRemoved,
})
if err != nil {
return err
}
case *state.DeploymentSchemaUpdatedEvent:
view, err := s.controllerState.View(ctx)
if err != nil {
return fmt.Errorf("failed to get controller state: %w", err)
}
dep, err := view.GetDeployment(event.Key)
if err != nil {
logger.Errorf(err, "Deployment not found: %s", event.Key)
continue
}
err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
ModuleName: dep.Module,
DeploymentKey: proto.String(event.Key.String()),
Schema: event.Schema.ToProto(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_CHANGED,
})
if err != nil {
return err
}
default:
for notification := range channels.IterContext(ctx, updates) {
switch event := notification.(type) {
case *state.DeploymentCreatedEvent:
err := sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
ModuleName: event.Module,
DeploymentKey: proto.String(event.Key.String()),
Schema: event.Schema.ToProto(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_ADDED,
})
if err != nil {
return err
}
case *state.DeploymentDeactivatedEvent:
view, err := s.controllerState.View(ctx)
if err != nil {
return fmt.Errorf("failed to get controller state: %w", err)
}
dep, err := view.GetDeployment(event.Key)
if err != nil {
logger.Errorf(err, "Deployment not found: %s", event.Key)
continue
}

err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
ModuleName: dep.Module,
DeploymentKey: proto.String(event.Key.String()),
Schema: dep.Schema.ToProto(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_REMOVED,
ModuleRemoved: event.ModuleRemoved,
})
if err != nil {
return err
}
case *state.DeploymentSchemaUpdatedEvent:
view, err := s.controllerState.View(ctx)
if err != nil {
return fmt.Errorf("failed to get controller state: %w", err)
}
dep, err := view.GetDeployment(event.Key)
if err != nil {
logger.Errorf(err, "Deployment not found: %s", event.Key)
continue
}
err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
ModuleName: dep.Module,
DeploymentKey: proto.String(event.Key.String()),
Schema: event.Schema.ToProto(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_CHANGED,
})
if err != nil {
return err
}
}
}
return nil
}

func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.DeploymentKey) *log.Logger {
Expand Down
68 changes: 31 additions & 37 deletions backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"

"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
)
Expand Down Expand Up @@ -41,46 +41,40 @@ func (d *deploymentLogsSink) Log(entry log.Entry) error {
}

func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *timeline.Client) {
for {
select {
case entry := <-d.logQueue:
var deployment model.DeploymentKey
depStr, ok := entry.Attributes["deployment"]
if !ok {
continue
}

dep, err := model.ParseDeploymentKey(depStr)
if err != nil {
continue
}
deployment = dep
for entry := range channels.IterContext(ctx, d.logQueue) {
var deployment model.DeploymentKey
depStr, ok := entry.Attributes["deployment"]
if !ok {
continue
}

var request optional.Option[model.RequestKey]
if reqStr, ok := entry.Attributes["request"]; ok {
req, err := model.ParseRequestKey(reqStr)
if err == nil {
request = optional.Some(req)
}
}
dep, err := model.ParseDeploymentKey(depStr)
if err != nil {
continue
}
deployment = dep

var errorStr optional.Option[string]
if entry.Error != nil {
errorStr = optional.Some(entry.Error.Error())
var request optional.Option[model.RequestKey]
if reqStr, ok := entry.Attributes["request"]; ok {
req, err := model.ParseRequestKey(reqStr)
if err == nil {
request = optional.Some(req)
}
}

timelineClient.Publish(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Level: int32(entry.Level.Severity()),
Attributes: entry.Attributes,
Message: entry.Message,
Error: errorStr,
})
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
var errorStr optional.Option[string]
if entry.Error != nil {
errorStr = optional.Some(entry.Error.Error())
}

timelineClient.Publish(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Level: int32(entry.Level.Severity()),
Attributes: entry.Attributes,
Message: entry.Message,
Error: errorStr,
})
}
}
19 changes: 7 additions & 12 deletions backend/ingress/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/alecthomas/atomic"

"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/schema/schemaeventsource"
)
Expand All @@ -19,19 +20,13 @@ func syncView(ctx context.Context, schemaEventSource schemaeventsource.EventSour
})
logger.Debugf("Starting routing sync from schema")
go func() {
for {
select {
case <-ctx.Done():
return

case event := <-schemaEventSource.Events():
if event, ok := event.(schemaeventsource.EventRemove); ok && !event.Deleted {
logger.Debugf("Not removing ingress for %s as it is not the current deployment", event.Deployment)
continue
}
state := extractIngressRoutingEntries(event.Schema())
out.Store(state)
for event := range channels.IterContext(ctx, schemaEventSource.Events()) {
if event, ok := event.(schemaeventsource.EventRemove); ok && !event.Deleted {
logger.Debugf("Not removing ingress for %s as it is not the current deployment", event.Deployment)
continue
}
state := extractIngressRoutingEntries(event.Schema())
out.Store(state)
}
}()
return out
Expand Down
14 changes: 5 additions & 9 deletions backend/provisioner/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/block/ftl/backend/runner"
"github.com/block/ftl/common/plugin"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/dev"
"github.com/block/ftl/internal/localdebug"
"github.com/block/ftl/internal/log"
Expand Down Expand Up @@ -108,15 +109,10 @@ type devModeRunner struct {

func (l *localScaling) Start(ctx context.Context) error {
go func() {
for {
select {
case <-ctx.Done():
return
case devEndpoints := <-l.devModeEndpointsUpdates:
l.lock.Lock()
l.updateDevModeEndpoint(ctx, devEndpoints)
l.lock.Unlock()
}
for devEndpoints := range channels.IterContext(ctx, l.devModeEndpointsUpdates) {
l.lock.Lock()
l.updateDevModeEndpoint(ctx, devEndpoints)
l.lock.Unlock()
}
}()
return nil
Expand Down
Loading

0 comments on commit 883adac

Please sign in to comment.