diff --git a/backend/console/console.go b/backend/console/console.go index f6f09e0791..876d6f1976 100644 --- a/backend/console/console.go +++ b/backend/console/console.go @@ -20,6 +20,7 @@ import ( "github.com/block/ftl/common/schema" frontend "github.com/block/ftl/frontend/console" "github.com/block/ftl/internal/buildengine" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/routing" "github.com/block/ftl/internal/rpc" @@ -345,24 +346,18 @@ func getReferencesFromMap(refMap map[schema.RefKey]map[schema.RefKey]bool, modul } func (s *service) StreamModules(ctx context.Context, req *connect.Request[consolepb.StreamModulesRequest], stream *connect.ServerStream[consolepb.StreamModulesResponse]) error { - err := s.sendStreamModulesResp(ctx, stream) if err != nil { return err } - for { - select { - case <-ctx.Done(): - return nil - - case <-s.schemaEventSource.Events(): - err = s.sendStreamModulesResp(ctx, stream) - if err != nil { - return err - } + for range channels.IterContext(ctx, s.schemaEventSource.Events()) { + err = s.sendStreamModulesResp(ctx, stream) + if err != nil { + return err } } + return nil } // filterDeployments removes any duplicate modules by selecting the deployment with the diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 3a5098670f..2dda64cdff 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -42,6 +42,7 @@ import ( "github.com/block/ftl/common/schema" "github.com/block/ftl/common/sha256" "github.com/block/ftl/common/slices" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/deploymentcontext" "github.com/block/ftl/internal/log" ftlmaps "github.com/block/ftl/internal/maps" @@ -1146,68 +1147,60 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon } logger.Tracef("Seeded %d deployments", initialCount) - for { - select { - case <-ctx.Done(): - return nil - - case notification := <-updates: - switch event := notification.(type) { - case *state.DeploymentCreatedEvent: - err := sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert - ModuleName: event.Module, - DeploymentKey: proto.String(event.Key.String()), - Schema: event.Schema.ToProto(), - ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_ADDED, - }) - if err != nil { - return err - } - case *state.DeploymentDeactivatedEvent: - view, err := s.controllerState.View(ctx) - if err != nil { - return fmt.Errorf("failed to get controller state: %w", err) - } - dep, err := view.GetDeployment(event.Key) - if err != nil { - logger.Errorf(err, "Deployment not found: %s", event.Key) - continue - } - err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert - ModuleName: dep.Module, - DeploymentKey: proto.String(event.Key.String()), - Schema: dep.Schema.ToProto(), - ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_REMOVED, - ModuleRemoved: event.ModuleRemoved, - }) - if err != nil { - return err - } - case *state.DeploymentSchemaUpdatedEvent: - view, err := s.controllerState.View(ctx) - if err != nil { - return fmt.Errorf("failed to get controller state: %w", err) - } - dep, err := view.GetDeployment(event.Key) - if err != nil { - logger.Errorf(err, "Deployment not found: %s", event.Key) - continue - } - err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert - ModuleName: dep.Module, - DeploymentKey: proto.String(event.Key.String()), - Schema: event.Schema.ToProto(), - ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_CHANGED, - }) - if err != nil { - return err - } - default: + for notification := range channels.IterContext(ctx, updates) { + switch event := notification.(type) { + case *state.DeploymentCreatedEvent: + err := sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert + ModuleName: event.Module, + DeploymentKey: proto.String(event.Key.String()), + Schema: event.Schema.ToProto(), + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_ADDED, + }) + if err != nil { + return err + } + case *state.DeploymentDeactivatedEvent: + view, err := s.controllerState.View(ctx) + if err != nil { + return fmt.Errorf("failed to get controller state: %w", err) + } + dep, err := view.GetDeployment(event.Key) + if err != nil { + logger.Errorf(err, "Deployment not found: %s", event.Key) continue } - + err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert + ModuleName: dep.Module, + DeploymentKey: proto.String(event.Key.String()), + Schema: dep.Schema.ToProto(), + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_REMOVED, + ModuleRemoved: event.ModuleRemoved, + }) + if err != nil { + return err + } + case *state.DeploymentSchemaUpdatedEvent: + view, err := s.controllerState.View(ctx) + if err != nil { + return fmt.Errorf("failed to get controller state: %w", err) + } + dep, err := view.GetDeployment(event.Key) + if err != nil { + logger.Errorf(err, "Deployment not found: %s", event.Key) + continue + } + err = sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert + ModuleName: dep.Module, + DeploymentKey: proto.String(event.Key.String()), + Schema: event.Schema.ToProto(), + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_CHANGED, + }) + if err != nil { + return err + } } } + return nil } func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.DeploymentKey) *log.Logger { diff --git a/backend/controller/deployment_logs.go b/backend/controller/deployment_logs.go index 5cc00ced29..fe939191c2 100644 --- a/backend/controller/deployment_logs.go +++ b/backend/controller/deployment_logs.go @@ -3,11 +3,11 @@ package controller import ( "context" "fmt" - "time" "github.com/alecthomas/types/optional" "github.com/block/ftl/backend/timeline" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/model" ) @@ -41,46 +41,40 @@ func (d *deploymentLogsSink) Log(entry log.Entry) error { } func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *timeline.Client) { - for { - select { - case entry := <-d.logQueue: - var deployment model.DeploymentKey - depStr, ok := entry.Attributes["deployment"] - if !ok { - continue - } - - dep, err := model.ParseDeploymentKey(depStr) - if err != nil { - continue - } - deployment = dep + for entry := range channels.IterContext(ctx, d.logQueue) { + var deployment model.DeploymentKey + depStr, ok := entry.Attributes["deployment"] + if !ok { + continue + } - var request optional.Option[model.RequestKey] - if reqStr, ok := entry.Attributes["request"]; ok { - req, err := model.ParseRequestKey(reqStr) - if err == nil { - request = optional.Some(req) - } - } + dep, err := model.ParseDeploymentKey(depStr) + if err != nil { + continue + } + deployment = dep - var errorStr optional.Option[string] - if entry.Error != nil { - errorStr = optional.Some(entry.Error.Error()) + var request optional.Option[model.RequestKey] + if reqStr, ok := entry.Attributes["request"]; ok { + req, err := model.ParseRequestKey(reqStr) + if err == nil { + request = optional.Some(req) } + } - timelineClient.Publish(ctx, &timeline.Log{ - RequestKey: request, - DeploymentKey: deployment, - Time: entry.Time, - Level: int32(entry.Level.Severity()), - Attributes: entry.Attributes, - Message: entry.Message, - Error: errorStr, - }) - case <-ctx.Done(): - return - case <-time.After(1 * time.Second): + var errorStr optional.Option[string] + if entry.Error != nil { + errorStr = optional.Some(entry.Error.Error()) } + + timelineClient.Publish(ctx, &timeline.Log{ + RequestKey: request, + DeploymentKey: deployment, + Time: entry.Time, + Level: int32(entry.Level.Severity()), + Attributes: entry.Attributes, + Message: entry.Message, + Error: errorStr, + }) } } diff --git a/backend/ingress/view.go b/backend/ingress/view.go index 52ccdd2fa4..05d2416714 100644 --- a/backend/ingress/view.go +++ b/backend/ingress/view.go @@ -6,6 +6,7 @@ import ( "github.com/alecthomas/atomic" "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/schema/schemaeventsource" ) @@ -19,19 +20,13 @@ func syncView(ctx context.Context, schemaEventSource schemaeventsource.EventSour }) logger.Debugf("Starting routing sync from schema") go func() { - for { - select { - case <-ctx.Done(): - return - - case event := <-schemaEventSource.Events(): - if event, ok := event.(schemaeventsource.EventRemove); ok && !event.Deleted { - logger.Debugf("Not removing ingress for %s as it is not the current deployment", event.Deployment) - continue - } - state := extractIngressRoutingEntries(event.Schema()) - out.Store(state) + for event := range channels.IterContext(ctx, schemaEventSource.Events()) { + if event, ok := event.(schemaeventsource.EventRemove); ok && !event.Deleted { + logger.Debugf("Not removing ingress for %s as it is not the current deployment", event.Deployment) + continue } + state := extractIngressRoutingEntries(event.Schema()) + out.Store(state) } }() return out diff --git a/backend/provisioner/scaling/localscaling/local_scaling.go b/backend/provisioner/scaling/localscaling/local_scaling.go index 1a49ab0038..4c6087eaf1 100644 --- a/backend/provisioner/scaling/localscaling/local_scaling.go +++ b/backend/provisioner/scaling/localscaling/local_scaling.go @@ -18,6 +18,7 @@ import ( "github.com/block/ftl/backend/runner" "github.com/block/ftl/common/plugin" "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/dev" "github.com/block/ftl/internal/localdebug" "github.com/block/ftl/internal/log" @@ -108,15 +109,10 @@ type devModeRunner struct { func (l *localScaling) Start(ctx context.Context) error { go func() { - for { - select { - case <-ctx.Done(): - return - case devEndpoints := <-l.devModeEndpointsUpdates: - l.lock.Lock() - l.updateDevModeEndpoint(ctx, devEndpoints) - l.lock.Unlock() - } + for devEndpoints := range channels.IterContext(ctx, l.devModeEndpointsUpdates) { + l.lock.Lock() + l.updateDevModeEndpoint(ctx, devEndpoints) + l.lock.Unlock() } }() return nil diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index 51f14a00bd..9c6b0c0349 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -17,6 +17,7 @@ import ( "github.com/block/ftl/backend/timeline" "github.com/block/ftl/common/schema" "github.com/block/ftl/common/slices" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/model" ) @@ -107,13 +108,8 @@ func (c *consumer) Begin(ctx context.Context) error { func (c *consumer) watchErrors(ctx context.Context) { logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - return - case err := <-c.group.Errors(): - logger.Errorf(err, "Consumer group error") - } + for err := range channels.IterContext(ctx, c.group.Errors()) { + logger.Errorf(err, "Consumer group error") } } @@ -162,48 +158,43 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram ctx := session.Context() logger := log.FromContext(ctx) - for { - select { - case <-ctx.Done(): - // Rebalance or shutdown needed + for msg := range channels.IterContext(ctx, claim.Messages()) { + if msg == nil { + // Channel closed, rebalance or shutdown needed return nil - - case msg := <-claim.Messages(): - if msg == nil { - // Channel closed, rebalance or shutdown needed + } + logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset) + remainingRetries := c.retryParams.Count + backoff := c.retryParams.MinBackoff + for { + err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset)) + if err == nil { + break + } + select { + case <-ctx.Done(): + // Do not commit the message if we did not succeed and the context is done. + // No need to retry message either. + logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) return nil + default: + } + if remainingRetries == 0 { + logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + break } - logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset) - remainingRetries := c.retryParams.Count - backoff := c.retryParams.MinBackoff - for { - err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset)) - if err == nil { - break - } - select { - case <-ctx.Done(): - // Do not commit the message if we did not succeed and the context is done. - // No need to retry message either. - logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) - return nil - default: - } - if remainingRetries == 0 { - logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) - break - } - logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds())) - time.Sleep(backoff) - remainingRetries-- - backoff *= 2 - if backoff > c.retryParams.MaxBackoff { - backoff = c.retryParams.MaxBackoff - } + logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds())) + time.Sleep(backoff) + remainingRetries-- + backoff *= 2 + if backoff > c.retryParams.MaxBackoff { + backoff = c.retryParams.MaxBackoff } - session.MarkMessage(msg, "") } + session.MarkMessage(msg, "") } + // Rebalance or shutdown needed + return nil } func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error { diff --git a/backend/timeline/service.go b/backend/timeline/service.go index 7127007c34..5ac29ece82 100644 --- a/backend/timeline/service.go +++ b/backend/timeline/service.go @@ -18,6 +18,7 @@ import ( timelineconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinepbconnect" ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1" "github.com/block/ftl/common/slices" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/rpc" ) @@ -288,27 +289,25 @@ func (s *service) reapCallEvents(ctx context.Context) { } else { interval = *s.config.EventLogRetention / 20 } - for { - select { - case <-ctx.Done(): - return - case <-time.After(interval): - if s.config.EventLogRetention == nil { - logger.Tracef("Event log retention is disabled, will not prune.") - continue - } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range channels.IterContext(ctx, ticker.C) { + if s.config.EventLogRetention == nil { + logger.Tracef("Event log retention is disabled, will not prune.") + continue + } - resp, err := s.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{ - EventType: timelinepb.EventType_EVENT_TYPE_CALL, - AgeSeconds: int64(s.config.EventLogRetention.Seconds()), - })) - if err != nil { - logger.Errorf(err, "Failed to prune call events") - continue - } - if resp.Msg.DeletedCount > 0 { - logger.Debugf("Pruned %d call events older than %s", resp.Msg.DeletedCount, s.config.EventLogRetention) - } + resp, err := s.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{ + EventType: timelinepb.EventType_EVENT_TYPE_CALL, + AgeSeconds: int64(s.config.EventLogRetention.Seconds()), + })) + if err != nil { + logger.Errorf(err, "Failed to prune call events") + continue + } + if resp.Msg.DeletedCount > 0 { + logger.Debugf("Pruned %d call events older than %s", resp.Msg.DeletedCount, s.config.EventLogRetention) } } + return } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 5a8a40bc67..1ba4c0b7fd 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -34,6 +34,7 @@ import ( "github.com/block/ftl/backend/timeline" "github.com/block/ftl/common/schema" "github.com/block/ftl/internal/bind" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/configuration" "github.com/block/ftl/internal/configuration/manager" "github.com/block/ftl/internal/dev" @@ -539,22 +540,21 @@ func waitForControllerOnline(ctx context.Context, startupTimeout time.Duration, ticker := time.NewTicker(time.Millisecond * 50) defer ticker.Stop() - for { - select { - case <-ticker.C: - _, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{})) - if err != nil { - logger.Tracef("Error getting status, retrying...: %v", err) - continue // retry - } + for range channels.IterContext(ctx, ticker.C) { + _, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{})) + if err != nil { + logger.Tracef("Error getting status, retrying...: %v", err) + continue // retry + } - return nil + return nil + } + if ctx.Err() == nil { + return nil + } - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - logger.Errorf(ctx.Err(), "Timeout reached while polling for controller status") - } - return ctx.Err() - } + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + logger.Errorf(ctx.Err(), "Timeout reached while polling for controller status") } + return fmt.Errorf("context cancelled: %w", ctx.Err()) } diff --git a/go-runtime/ftl/ftltest/pubsub.go b/go-runtime/ftl/ftltest/pubsub.go index c50da42cec..dc65a5d3b8 100644 --- a/go-runtime/ftl/ftltest/pubsub.go +++ b/go-runtime/ftl/ftltest/pubsub.go @@ -14,6 +14,7 @@ import ( "github.com/block/ftl/common/schema" "github.com/block/ftl/common/slices" "github.com/block/ftl/go-runtime/ftl" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" ) @@ -95,13 +96,8 @@ func (f *fakePubSub) watchPubSub(ctx context.Context) { f.globalTopic.Subscribe(events) go func() { defer f.globalTopic.Unsubscribe(events) - for { - select { - case e := <-events: - f.handlePubSubEvent(ctx, e) - case <-ctx.Done(): - return - } + for e := range channels.IterContext(ctx, events) { + f.handlePubSubEvent(ctx, e) } }() } diff --git a/go-runtime/goplugin/service.go b/go-runtime/goplugin/service.go index e279bd2adf..4aa9a5a56c 100644 --- a/go-runtime/goplugin/service.go +++ b/go-runtime/goplugin/service.go @@ -23,6 +23,7 @@ import ( goruntime "github.com/block/ftl/go-runtime" "github.com/block/ftl/go-runtime/compile" "github.com/block/ftl/internal" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/exec" "github.com/block/ftl/internal/flock" "github.com/block/ftl/internal/log" @@ -259,31 +260,29 @@ func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRe } // Watch for changes and build as needed - for { - select { - case e := <-events: - var isAutomaticRebuild bool - buildCtx, isAutomaticRebuild = buildContextFromPendingEvents(ctx, buildCtx, events, e, ongoingState) - if isAutomaticRebuild { - err = stream.Send(&langpb.BuildResponse{ - Event: &langpb.BuildResponse_AutoRebuildStarted{ - AutoRebuildStarted: &langpb.AutoRebuildStarted{ - ContextId: buildCtx.ID, - }, + for e := range channels.IterContext(ctx, events) { + var isAutomaticRebuild bool + buildCtx, isAutomaticRebuild = buildContextFromPendingEvents(ctx, buildCtx, events, e, ongoingState) + if isAutomaticRebuild { + err = stream.Send(&langpb.BuildResponse{ + Event: &langpb.BuildResponse_AutoRebuildStarted{ + AutoRebuildStarted: &langpb.AutoRebuildStarted{ + ContextId: buildCtx.ID, }, - }) - if err != nil { - return fmt.Errorf("could not send auto rebuild started event: %w", err) - } - } - if err = buildAndSend(ctx, stream, req.Msg.ProjectRoot, req.Msg.StubsRoot, buildCtx, isAutomaticRebuild, watcher.GetTransaction(buildCtx.Config.Dir), ongoingState); err != nil { - return err + }, + }) + if err != nil { + return fmt.Errorf("could not send auto rebuild started event: %w", err) } - case <-ctx.Done(): - log.FromContext(ctx).Infof("Build call ending - ctx cancelled") - return nil } + if err = buildAndSend(ctx, stream, req.Msg.ProjectRoot, req.Msg.StubsRoot, buildCtx, isAutomaticRebuild, watcher.GetTransaction(buildCtx.Config.Dir), ongoingState); err != nil { + return err + } + } + if ctx.Err() != nil { + log.FromContext(ctx).Infof("Build call ending - ctx cancelled") } + return nil } // BuildContextUpdated is called whenever the build context is update while a Build call with "rebuild_automatically" is active. @@ -327,16 +326,10 @@ func watchFiles(ctx context.Context, watcher *watch.Watcher, buildCtx buildConte } go func() { - for { - select { - case e := <-watchEvents: - if change, ok := e.(watch.WatchEventModuleChanged); ok { - log.FromContext(ctx).Infof("Found file changes: %s", change) - events <- filesUpdatedEvent{changes: change.Changes} - } - - case <-ctx.Done(): - return + for e := range channels.IterContext(ctx, watchEvents) { + if change, ok := e.(watch.WatchEventModuleChanged); ok { + log.FromContext(ctx).Infof("Found file changes: %s", change) + events <- filesUpdatedEvent{changes: change.Changes} } } }() diff --git a/internal/buildengine/deploy.go b/internal/buildengine/deploy.go index b9078cec07..4a7de2d251 100644 --- a/internal/buildengine/deploy.go +++ b/internal/buildengine/deploy.go @@ -17,6 +17,7 @@ import ( schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1" "github.com/block/ftl/common/sha256" "github.com/block/ftl/common/slices" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/moduleconfig" "github.com/block/ftl/internal/projectconfig" @@ -261,34 +262,29 @@ func checkReadiness(ctx context.Context, client DeployClient, deploymentKey stri break } } - for { - select { - case <-ticker.C: - status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{})) - if err != nil { - return err - } + for range channels.IterContext(ctx, ticker.C) { + status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{})) + if err != nil { + return err + } - for _, deployment := range status.Msg.Deployments { - if deployment.Key == deploymentKey { - if deployment.Replicas >= replicas { - if hasVerbs { - // Also verify the routing table is ready - for _, route := range status.Msg.Routes { - if route.Deployment == deploymentKey { - return nil - } + for _, deployment := range status.Msg.Deployments { + if deployment.Key == deploymentKey { + if deployment.Replicas >= replicas { + if hasVerbs { + // Also verify the routing table is ready + for _, route := range status.Msg.Routes { + if route.Deployment == deploymentKey { + return nil } - - } else { - return nil } + + } else { + return nil } } } - - case <-ctx.Done(): - return ctx.Err() } } + return nil } diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index 7a1ab4f226..40b9951db2 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -19,6 +19,7 @@ import ( "github.com/block/ftl/common/schema" "github.com/block/ftl/common/slices" "github.com/block/ftl/internal/buildengine/languageplugin" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/dev" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/moduleconfig" @@ -316,29 +317,17 @@ func (e *Engine) startSchemaSync(ctx context.Context) { logger.Debugf("Schema source is not live, skipping initial sync.") } else { initialSync: - for { - select { - case <-ctx.Done(): - return - - case event := <-e.schemaSource.Events(): - e.processEvent(event) - if !event.More() { - break initialSync - } + for event := range channels.IterContext(ctx, e.schemaSource.Events()) { + e.processEvent(event) + if !event.More() { + break initialSync } } } go func() { - for { - select { - case <-ctx.Done(): - return - - case event := <-e.schemaSource.Events(): - e.processEvent(event) - } + for event := range channels.IterContext(ctx, e.schemaSource.Events()) { + e.processEvent(event) } }() } diff --git a/internal/buildengine/terminal.go b/internal/buildengine/terminal.go index ed9ba419cf..0db293fd3b 100644 --- a/internal/buildengine/terminal.go +++ b/internal/buildengine/terminal.go @@ -5,6 +5,7 @@ import ( "github.com/alecthomas/types/pubsub" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/terminal" ) @@ -14,36 +15,31 @@ func updateTerminalWithEngineEvents(ctx context.Context, topic *pubsub.Topic[Eng go func() { defer topic.Unsubscribe(events) - for { - select { - case event := <-events: - switch event := event.(type) { - case EngineStarted: - case EngineEnded: - - case ModuleAdded: - terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateWaiting) - case ModuleRemoved: - terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateTerminated) - - case ModuleBuildWaiting: - terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateWaiting) - case ModuleBuildStarted: - terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateBuilding) - case ModuleBuildSuccess: - terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateBuilt) - case ModuleBuildFailed: - terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateFailed) - - case ModuleDeployStarted: - terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateDeploying) - case ModuleDeploySuccess: - terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateDeployed) - case ModuleDeployFailed: - terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateFailed) - } - case <-ctx.Done(): - return + for event := range channels.IterContext(ctx, events) { + switch event := event.(type) { + case EngineStarted: + case EngineEnded: + + case ModuleAdded: + terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateWaiting) + case ModuleRemoved: + terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateTerminated) + + case ModuleBuildWaiting: + terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateWaiting) + case ModuleBuildStarted: + terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateBuilding) + case ModuleBuildSuccess: + terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateBuilt) + case ModuleBuildFailed: + terminal.UpdateModuleState(ctx, event.Config.Module, terminal.BuildStateFailed) + + case ModuleDeployStarted: + terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateDeploying) + case ModuleDeploySuccess: + terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateDeployed) + case ModuleDeployFailed: + terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateFailed) } } }() diff --git a/internal/channels/itercontext.go b/internal/channels/itercontext.go new file mode 100644 index 0000000000..7fc9decca6 --- /dev/null +++ b/internal/channels/itercontext.go @@ -0,0 +1,27 @@ +package channels + +import ( + "context" + "iter" +) + +// IterContext returns an iterator that iterates over the channel until the channel is closed or the context cancelled. +// +// Check ctx.Err() != nil to detect if the context was cancelled. +func IterContext[T any](ctx context.Context, ch <-chan T) iter.Seq[T] { + return func(yield func(T) bool) { + for { + select { + case <-ctx.Done(): + return + case v, ok := <-ch: + if !ok { + return + } + if !yield(v) { + return + } + } + } + } +} diff --git a/internal/lsp/lsp.go b/internal/lsp/lsp.go index 72c519258d..68b65edf85 100644 --- a/internal/lsp/lsp.go +++ b/internal/lsp/lsp.go @@ -19,6 +19,7 @@ import ( "github.com/block/ftl/common/builderrors" ftlErrors "github.com/block/ftl/common/errors" "github.com/block/ftl/internal/buildengine" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" ) @@ -78,42 +79,36 @@ func (s *Server) Subscribe(ctx context.Context, topic *pubsub.Topic[buildengine. topic.Subscribe(events) go func() { defer topic.Unsubscribe(events) - for { - select { - case <-ctx.Done(): - return - - case event := <-events: - switch event := event.(type) { - case buildengine.EngineStarted: - s.publishBuildState(buildStateBuilding, nil) - - case buildengine.EngineEnded: - if len(event.ModuleErrors) == 0 { - s.publishBuildState(buildStateSuccess, nil) - continue - } - errs := []error{} - for module, e := range event.ModuleErrors { - errs = append(errs, fmt.Errorf("%s: %w", module, e)) - } - s.publishBuildState(buildStateFailure, errors.Join(errs...)) - - case buildengine.ModuleBuildStarted: - dirURI := "file://" + event.Config.Dir - - s.diagnostics.Range(func(uri protocol.DocumentUri, diagnostics []protocol.Diagnostic) bool { - if strings.HasPrefix(uri, dirURI) { - s.diagnostics.Delete(uri) - s.publishDiagnostics(uri, []protocol.Diagnostic{}) - } - return true - }) - - case buildengine.ModuleBuildSuccess, buildengine.ModuleBuildFailed, buildengine.ModuleAdded, - buildengine.ModuleRemoved, buildengine.ModuleBuildWaiting, buildengine.ModuleDeployStarted, - buildengine.ModuleDeploySuccess, buildengine.ModuleDeployFailed: + for event := range channels.IterContext(ctx, events) { + switch event := event.(type) { + case buildengine.EngineStarted: + s.publishBuildState(buildStateBuilding, nil) + + case buildengine.EngineEnded: + if len(event.ModuleErrors) == 0 { + s.publishBuildState(buildStateSuccess, nil) + continue } + errs := []error{} + for module, e := range event.ModuleErrors { + errs = append(errs, fmt.Errorf("%s: %w", module, e)) + } + s.publishBuildState(buildStateFailure, errors.Join(errs...)) + + case buildengine.ModuleBuildStarted: + dirURI := "file://" + event.Config.Dir + + s.diagnostics.Range(func(uri protocol.DocumentUri, diagnostics []protocol.Diagnostic) bool { + if strings.HasPrefix(uri, dirURI) { + s.diagnostics.Delete(uri) + s.publishDiagnostics(uri, []protocol.Diagnostic{}) + } + return true + }) + + case buildengine.ModuleBuildSuccess, buildengine.ModuleBuildFailed, buildengine.ModuleAdded, + buildengine.ModuleRemoved, buildengine.ModuleBuildWaiting, buildengine.ModuleDeployStarted, + buildengine.ModuleDeploySuccess, buildengine.ModuleDeployFailed: } } }() diff --git a/internal/routing/routing.go b/internal/routing/routing.go index 90f7db8a12..0a83478d01 100644 --- a/internal/routing/routing.go +++ b/internal/routing/routing.go @@ -9,6 +9,7 @@ import ( "github.com/alecthomas/types/pubsub" "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/model" "github.com/block/ftl/internal/schema/schemaeventsource" @@ -36,27 +37,21 @@ func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable } func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSource) { - for { - select { - case <-ctx.Done(): - return - - case <-changes.Events(): - old := r.routes.Load() - routes := extractRoutes(ctx, changes.View()) - for module, rd := range old.moduleToDeployment { - if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] { - r.changeNotification.Publish(module) - } + for range channels.IterContext(ctx, changes.Events()) { + old := r.routes.Load() + routes := extractRoutes(ctx, changes.View()) + for module, rd := range old.moduleToDeployment { + if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] { + r.changeNotification.Publish(module) } - for module, rd := range routes.moduleToDeployment { - // Check for new modules - if old.byDeployment[rd.String()] == nil { - r.changeNotification.Publish(module) - } + } + for module, rd := range routes.moduleToDeployment { + // Check for new modules + if old.byDeployment[rd.String()] == nil { + r.changeNotification.Publish(module) } - r.routes.Store(routes) } + r.routes.Store(routes) } } diff --git a/internal/routing/verb_routing.go b/internal/routing/verb_routing.go index e64c509478..3a95caca58 100644 --- a/internal/routing/verb_routing.go +++ b/internal/routing/verb_routing.go @@ -15,6 +15,7 @@ import ( "github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/block/ftl/backend/timeline" "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/model" "github.com/block/ftl/internal/rpc" @@ -90,13 +91,8 @@ func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelin } routeUpdates := svc.routingTable.Subscribe() go func() { - for { - select { - case <-ctx.Done(): - return - case module := <-routeUpdates: - svc.moduleClients.Delete(module) - } + for module := range channels.IterContext(ctx, routeUpdates) { + svc.moduleClients.Delete(module) } }() return svc diff --git a/internal/schema/schemaeventsource/schemaeventsource_test.go b/internal/schema/schemaeventsource/schemaeventsource_test.go index b7f1853761..b4e6426ce6 100644 --- a/internal/schema/schemaeventsource/schemaeventsource_test.go +++ b/internal/schema/schemaeventsource/schemaeventsource_test.go @@ -17,6 +17,7 @@ import ( ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1" "github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/model" "github.com/block/ftl/internal/rpc" @@ -189,14 +190,9 @@ func (m *mockSchemaService) Ping(context.Context, *connect.Request[ftlv1.PingReq } func (m *mockSchemaService) PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest], resp *connect.ServerStream[ftlv1.PullSchemaResponse]) error { - for { - select { - case <-ctx.Done(): - return nil - case change := <-m.changes: - if err := resp.Send(change); err != nil { - return fmt.Errorf("send change: %w", err) - } + for change := range channels.IterContext(ctx, m.changes) { + if err := resp.Send(change); err != nil { + return fmt.Errorf("send change: %w", err) } } }