diff --git a/backend/controller/pubsub/integration_test.go b/backend/controller/pubsub/integration_test.go index 2ac2140fdf..bd6893ac17 100644 --- a/backend/controller/pubsub/integration_test.go +++ b/backend/controller/pubsub/integration_test.go @@ -106,3 +106,17 @@ func TestRetry(t *testing.T) { 1+retriesPerCall), ) } + +func TestExternalPublishRuntimeCheck(t *testing.T) { + in.Run(t, "", + in.CopyModule("publisher"), + in.CopyModule("subscriber"), + in.Deploy("publisher"), + in.Deploy("subscriber"), + + in.ExpectError( + in.Call("subscriber", "publishToExternalModule", in.Obj{}, func(t testing.TB, resp in.Obj) {}), + "can not publish to another module's topic", + ), + ) +} diff --git a/backend/controller/pubsub/testdata/go/subscriber/subscriber.go b/backend/controller/pubsub/testdata/go/subscriber/subscriber.go index 70a93211c0..6d425d6cd1 100644 --- a/backend/controller/pubsub/testdata/go/subscriber/subscriber.go +++ b/backend/controller/pubsub/testdata/go/subscriber/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "ftl/publisher" + "time" "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. ) @@ -25,3 +26,10 @@ var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription") func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error { return fmt.Errorf("always error: event %v", req.Time) } + +//ftl:verb +func PublishToExternalModule(ctx context.Context) error { + // Get around compile-time checks + var topic = publisher.Test_topic + return topic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()}) +} diff --git a/go-runtime/ftl/ftltest/fake.go b/go-runtime/ftl/ftltest/fake.go index 0759a40325..e3ebe1d5d0 100644 --- a/go-runtime/ftl/ftltest/fake.go +++ b/go-runtime/ftl/ftltest/fake.go @@ -7,30 +7,72 @@ import ( "reflect" "strings" + "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/common/configuration" "github.com/TBD54566975/ftl/go-runtime/internal" + "github.com/alecthomas/types/optional" ) +// pubSubEvent is a sum type for all events that can be published to the pubsub system. +// not to be confused with an event that gets published to a topic +// +//sumtype:decl +type pubSubEvent interface { + // cronJobEvent is a marker to ensure that all events implement the interface. + pubSubEvent() +} + +// publishEvent holds an event to be published to a topic +type publishEvent struct { + topic *schema.Ref + content any +} + +func (publishEvent) pubSubEvent() {} + +// subscriptionDidConsumeEvent indicates that a call to a subscriber has completed +type subscriptionDidConsumeEvent struct { + subscription string + err error +} + +func (subscriptionDidConsumeEvent) pubSubEvent() {} + +type subscription struct { + name string + topic *schema.Ref + cursor optional.Option[int] + isExecuting bool + errors map[int]error +} + +type subscriber func(context.Context, any) error + type fakeFTL struct { - fsm *fakeFSMManager + fsm *fakeFSMManager + mockMaps map[uintptr]mapImpl allowMapCalls bool configValues map[string][]byte secretValues map[string][]byte + pubSub *fakePubSub } // mapImpl is a function that takes an object and returns an object of a potentially different // type but is not constrained by input/output type like ftl.Map. type mapImpl func(context.Context) (any, error) -func newFakeFTL() *fakeFTL { - return &fakeFTL{ +func newFakeFTL(ctx context.Context) *fakeFTL { + fake := &fakeFTL{ fsm: newFakeFSMManager(), mockMaps: map[uintptr]mapImpl{}, allowMapCalls: false, configValues: map[string][]byte{}, secretValues: map[string][]byte{}, + pubSub: newFakePubSub(ctx), } + + return fake } var _ internal.FTL = &fakeFTL{} @@ -73,10 +115,6 @@ func (f *fakeFTL) FSMSend(ctx context.Context, fsm string, instance string, even return f.fsm.SendEvent(ctx, fsm, instance, event) } -func (f *fakeFTL) PublishEvent(ctx context.Context, topic string, event any) error { - panic("not implemented") -} - // addMapMock saves a new mock of ftl.Map to the internal map in fakeFTL. // // mockMap provides the whole mock implemention, so it gets called in place of both `fn` @@ -121,3 +159,7 @@ func actuallyCallMap(ctx context.Context, impl mapImpl) any { } return out } + +func (f *fakeFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error { + return f.pubSub.publishEvent(topic, event) +} diff --git a/go-runtime/ftl/ftltest/ftltest.go b/go-runtime/ftl/ftltest/ftltest.go index 3a6761017d..423069b774 100644 --- a/go-runtime/ftl/ftltest/ftltest.go +++ b/go-runtime/ftl/ftltest/ftltest.go @@ -41,7 +41,7 @@ func Context(options ...Option) context.Context { } ctx := log.ContextWithNewDefaultLogger(context.Background()) - ctx = internal.WithContext(ctx, newFakeFTL()) + ctx = internal.WithContext(ctx, newFakeFTL(ctx)) name := reflection.Module() for _, option := range options { @@ -371,3 +371,61 @@ func getDSNFromSecret(ftl internal.FTL, module, name string) (string, error) { } return dsn, nil } + +// WithSubscriber adds a subscriber during a test +// +// By default, all subscribers are disabled in unit tests, and must be manually enabled by calling WithSubscriber(…). +// This allows easy isolation for each unit test. +// +// WithSubscriber(…) can also be used to make an ad-hoc subscriber for your test by defining a new function as the sink. +// +// To be used when setting up a context for a test: +// +// ctx := ftltest.Context( +// ftltest.WithSubscriber(paymentTopic, ProcessPayment), +// // ... other options +// ) +func WithSubscriber[E any](subscription ftl.SubscriptionHandle[E], sink ftl.Sink[E]) Option { + return func(ctx context.Context, state *OptionsState) error { + fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert + addSubscriber(fftl.pubSub, subscription, sink) + return nil + } +} + +// EventsForTopic returns all published events for a topic +func EventsForTopic[E any](ctx context.Context, topic ftl.TopicHandle[E]) []E { + fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert + return eventsForTopic(ctx, fftl.pubSub, topic) +} + +type SubscriptionResult[E any] struct { + Event E + Error ftl.Option[error] +} + +// ResultsForSubscription returns all consumed events for a subscription, with any resulting errors +func ResultsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []SubscriptionResult[E] { + fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert + return resultsForSubscription(ctx, fftl.pubSub, subscription) +} + +// ErrorsForSubscription returns all errors encountered while consuming events for a subscription +func ErrorsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []error { + errs := []error{} + for _, result := range ResultsForSubscription(ctx, subscription) { + if err, ok := result.Error.Get(); ok { + errs = append(errs, err) + } + } + return errs +} + +// WaitForSubscriptionsToComplete waits until all subscriptions have consumed all events +// +// Subscriptions with no manually activated subscribers are ignored. +// Make sure you have called WithSubscriber(…) for all subscriptions you want to wait for. +func WaitForSubscriptionsToComplete(ctx context.Context) { + fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert + fftl.pubSub.waitForSubscriptionsToComplete(ctx) +} diff --git a/go-runtime/ftl/ftltest/ftltest_integration_test.go b/go-runtime/ftl/ftltest/ftltest_integration_test.go index 0918c1a514..3b7df64ab3 100644 --- a/go-runtime/ftl/ftltest/ftltest_integration_test.go +++ b/go-runtime/ftl/ftltest/ftltest_integration_test.go @@ -14,8 +14,12 @@ func TestModuleUnitTests(t *testing.T) { in.CopyModule("time"), in.CopyModule("wrapped"), in.CopyModule("verbtypes"), - in.Build("time", "wrapped", "verbtypes"), + in.CopyModule("pubsub"), + in.CopyModule("subscriber"), + in.Build("time", "wrapped", "verbtypes", "pubsub", "subscriber"), in.ExecModuleTest("wrapped"), in.ExecModuleTest("verbtypes"), + in.ExecModuleTest("pubsub"), + in.ExecModuleTest("subscriber"), ) } diff --git a/go-runtime/ftl/ftltest/pubsub.go b/go-runtime/ftl/ftltest/pubsub.go new file mode 100644 index 0000000000..817708ad3b --- /dev/null +++ b/go-runtime/ftl/ftltest/pubsub.go @@ -0,0 +1,276 @@ +package ftltest + +import ( + "context" + "fmt" + "math/rand" + "strings" + "sync" + "time" + + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/go-runtime/ftl" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" +) + +type fakePubSub struct { + // all pubsub events are processed through globalTopic + globalTopic *pubsub.Topic[pubSubEvent] + // publishWaitGroup can be used to wait for all events to be published + publishWaitGroup sync.WaitGroup + + // pubSubLock required to access [topics, subscriptions, subscribers] + pubSubLock sync.Mutex + topics map[schema.RefKey][]any + subscriptions map[string]*subscription + subscribers map[string][]subscriber +} + +func newFakePubSub(ctx context.Context) *fakePubSub { + f := &fakePubSub{ + globalTopic: pubsub.New[pubSubEvent](), + topics: map[schema.RefKey][]any{}, + subscriptions: map[string]*subscription{}, + subscribers: map[string][]subscriber{}, + } + f.watchPubSub(ctx) + return f +} + +func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error { + f.publishWaitGroup.Add(1) + return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event}) +} + +// addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added +func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl.Sink[E]) { + f.pubSubLock.Lock() + defer f.pubSubLock.Unlock() + + if _, ok := f.subscriptions[sub.Name]; !ok { + f.subscriptions[sub.Name] = &subscription{ + name: sub.Name, + topic: sub.Topic, + errors: map[int]error{}, + } + } + + f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error { + if event, ok := event.(E); ok { + return sink(ctx, event) + } + return fmt.Errorf("unexpected event type %T for subscription %s", event, sub.Name) + }) +} + +// eventsForTopic returns all events published to a topic +func eventsForTopic[E any](ctx context.Context, f *fakePubSub, topic ftl.TopicHandle[E]) []E { + // Make sure all published events make it into our pubsub state + f.publishWaitGroup.Wait() + + f.pubSubLock.Lock() + defer f.pubSubLock.Unlock() + + logger := log.FromContext(ctx).Scope("pubsub") + var events = []E{} + raw, ok := f.topics[topic.Ref.ToRefKey()] + if !ok { + return events + } + for _, e := range raw { + if event, ok := e.(E); ok { + events = append(events, event) + } else { + logger.Warnf("unexpected event type %T for topic %s", e, topic.Ref) + } + } + return events +} + +// resultsForSubscription returns all consumed events and whether an error was returned +func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ftl.SubscriptionHandle[E]) []SubscriptionResult[E] { + f.pubSubLock.Lock() + defer f.pubSubLock.Unlock() + + logger := log.FromContext(ctx).Scope("pubsub") + results := []SubscriptionResult[E]{} + + subscription, ok := f.subscriptions[handle.Name] + if !ok { + return results + } + topic, ok := f.topics[handle.Topic.ToRefKey()] + if !ok { + return results + } + + count := subscription.cursor.Default(-1) + if !subscription.isExecuting { + count++ + } + for i := range count { + e := topic[i] + if event, ok := e.(E); ok { + result := SubscriptionResult[E]{ + Event: event, + } + if err, ok := subscription.errors[i]; ok { + result.Error = ftl.Some(err) + } + results = append(results, result) + } else { + logger.Warnf("unexpected event type %T for subscription %s", e, handle.Name) + } + + } + return results +} + +func (f *fakePubSub) watchPubSub(ctx context.Context) { + events := make(chan pubSubEvent, 128) + f.globalTopic.Subscribe(events) + go func() { + defer f.globalTopic.Unsubscribe(events) + for { + select { + case e := <-events: + f.handlePubSubEvent(ctx, e) + case <-ctx.Done(): + return + } + } + }() +} + +func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { + f.pubSubLock.Lock() + defer f.pubSubLock.Unlock() + + logger := log.FromContext(ctx).Scope("pubsub") + + switch event := e.(type) { + case publishEvent: + logger.Debugf("publishing to %s: %v", event.topic.Name, event.content) + if _, ok := f.topics[event.topic.ToRefKey()]; !ok { + f.topics[event.topic.ToRefKey()] = []any{event.content} + } else { + f.topics[event.topic.ToRefKey()] = append(f.topics[event.topic.ToRefKey()], event.content) + } + f.publishWaitGroup.Done() + case subscriptionDidConsumeEvent: + sub, ok := f.subscriptions[event.subscription] + if !ok { + panic(fmt.Sprintf("subscription %q not found", event.subscription)) + } + if event.err != nil { + sub.errors[sub.cursor.MustGet()] = event.err + } + sub.isExecuting = false + } + + for _, sub := range f.subscriptions { + if sub.isExecuting { + // already executing + continue + } + topicEvents, ok := f.topics[sub.topic.ToRefKey()] + if !ok { + // no events publshed yet + continue + } + var cursor = sub.cursor.Default(-1) + if len(topicEvents) <= cursor+1 { + // no new events + continue + } + subscribers, ok := f.subscribers[sub.name] + if !ok || len(subscribers) == 0 { + // no subscribers + continue + } + chosenSubscriber := subscribers[rand.Intn(len(subscribers))] //nolint:gosec + + sub.cursor = optional.Some(cursor + 1) + sub.isExecuting = true + + go func(sub string, chosenSubscriber subscriber, event any) { + err := chosenSubscriber(ctx, event) + f.globalTopic.Publish(subscriptionDidConsumeEvent{subscription: sub, err: err}) + }(sub.name, chosenSubscriber, topicEvents[sub.cursor.MustGet()]) + } +} + +// waitForSubscriptionsToComplete waits for all subscriptions to consume all events. +// subscriptions with no subscribers are ignored. +// logs what which subscriptions are blocking every 2s. +func (f *fakePubSub) waitForSubscriptionsToComplete(ctx context.Context) { + startTime := time.Now() + nextLogTime := startTime.Add(2 * time.Second) + + // Make sure all published events make it into our pubsub state + f.publishWaitGroup.Wait() + + for { + shouldPrint := time.Now().After(nextLogTime) + if f.checkSubscriptionsAreComplete(ctx, shouldPrint, startTime) { + return + } + if shouldPrint { + nextLogTime = time.Now().Add(2 * time.Second) + } + time.Sleep(200 * time.Millisecond) + } +} + +func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPrint bool, startTime time.Time) bool { + f.pubSubLock.Lock() + defer f.pubSubLock.Unlock() + + type remainingState struct { + name string + isExecuting bool + pendingEvents int + } + remaining := []remainingState{} + for _, sub := range f.subscriptions { + topicEvents, ok := f.topics[sub.topic.ToRefKey()] + if !ok { + // no events publshed yet + continue + } + var cursor = sub.cursor.Default(-1) + if !sub.isExecuting && len(topicEvents) <= cursor+1 { + // all events have been consumed + continue + } + subscribers, ok := f.subscribers[sub.name] + if !ok || len(subscribers) == 0 { + // no subscribers + continue + } + remaining = append(remaining, remainingState{ + name: sub.name, + isExecuting: sub.isExecuting, + pendingEvents: len(topicEvents) - cursor - 1, + }) + } + if len(remaining) == 0 { + // not waiting on any more subscriptions + return true + } + if shouldPrint { + // print out what we are waiting on + logger := log.FromContext(ctx).Scope("pubsub") + logger.Debugf("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string { + var suffix string + if r.isExecuting { + suffix = ", 1 executing" + } + return fmt.Sprintf(" %s: %d events pending%s", r.name, r.pendingEvents, suffix) + }), "\n")) + } + return false +} diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/ftl.toml b/go-runtime/ftl/ftltest/testdata/go/pubsub/ftl.toml new file mode 100644 index 0000000000..4e493b26ce --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/ftl.toml @@ -0,0 +1,2 @@ +module = "pubsub" +language = "go" diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/go.mod b/go-runtime/ftl/ftltest/testdata/go/pubsub/go.mod new file mode 100644 index 0000000000..5ddd398d16 --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/go.mod @@ -0,0 +1,64 @@ +module ftl/pubsub + +go 1.22.2 + +toolchain go1.22.3 + +require ( + github.com/TBD54566975/ftl v1.1.5 + github.com/alecthomas/assert/v2 v2.10.0 + github.com/alecthomas/atomic v0.1.0-alpha2 +) + +require ( + connectrpc.com/connect v1.16.1 // indirect + connectrpc.com/grpcreflect v1.2.0 // indirect + connectrpc.com/otelconnect v0.7.0 // indirect + github.com/BurntSushi/toml v1.4.0 // indirect + github.com/TBD54566975/scaffolder v1.0.0 // indirect + github.com/alecthomas/concurrency v0.0.2 // indirect + github.com/alecthomas/participle/v2 v2.1.1 // indirect + github.com/alecthomas/repr v0.4.0 // indirect + github.com/alecthomas/types v0.16.0 // indirect + github.com/alessio/shellescape v1.4.2 // indirect + github.com/amacneil/dbmate/v2 v2.16.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.27.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 // indirect + github.com/aws/smithy-go v1.20.2 // indirect + github.com/danieljoos/wincred v1.2.0 // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hexops/gotextdiff v1.0.3 // indirect + github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jpillora/backoff v1.0.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/lib/pq v1.10.9 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect + github.com/swaggest/jsonschema-go v0.3.70 // indirect + github.com/swaggest/refl v1.3.0 // indirect + github.com/zalando/go-keyring v0.2.4 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect + golang.org/x/mod v0.18.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect +) + +replace github.com/TBD54566975/ftl => ./../../../../../.. diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/go.sum b/go-runtime/ftl/ftltest/testdata/go/pubsub/go.sum new file mode 100644 index 0000000000..fa8c86490c --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/go.sum @@ -0,0 +1,192 @@ +connectrpc.com/connect v1.16.1 h1:rOdrK/RTI/7TVnn3JsVxt3n028MlTRwmK5Q4heSpjis= +connectrpc.com/connect v1.16.1/go.mod h1:XpZAduBQUySsb4/KO5JffORVkDI4B6/EYPi7N8xpNZw= +connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= +connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= +connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= +github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= +github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= +github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= +github.com/alecthomas/atomic v0.1.0-alpha2/go.mod h1:zD6QGEyw49HIq19caJDc2NMXAy8rNi9ROrxtMXATfyI= +github.com/alecthomas/concurrency v0.0.2 h1:Q3kGPtLbleMbH9lHX5OBFvJygfyFw29bXZKBg+IEVuo= +github.com/alecthomas/concurrency v0.0.2/go.mod h1:GmuQb/iHX7mbNtPlC/WDzEFxDMB0HYFer2Qda9QTs7w= +github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8= +github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/alecthomas/types v0.16.0 h1:o9+JSwCRB6DDaWDeR/Mg7v/zh3R+MlknM6DrnDyY7U0= +github.com/alecthomas/types v0.16.0/go.mod h1:Tswm0qQpjpVq8rn70OquRsUtFxbQKub/8TMyYYGI0+k= +github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0= +github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= +github.com/amacneil/dbmate/v2 v2.16.0 h1:uIah9qFOxA9cIyS1TuPMUgn0KVnwRRSx0MhKu7sFpcI= +github.com/amacneil/dbmate/v2 v2.16.0/go.mod h1:TlxMqFDBxsLVertnF2cAxZxPyt9UjlCrt04iy1UxRXc= +github.com/aws/aws-sdk-go-v2 v1.27.0 h1:7bZWKoXhzI+mMR/HjdMx8ZCC5+6fY0lS5tr0bbgiLlo= +github.com/aws/aws-sdk-go-v2 v1.27.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.16 h1:knpCuH7laFVGYTNd99Ns5t+8PuRjDn4HnnZK48csipM= +github.com/aws/aws-sdk-go-v2/config v1.27.16/go.mod h1:vutqgRhDUktwSge3hrC3nkuirzkJ4E/mLj5GvI0BQas= +github.com/aws/aws-sdk-go-v2/credentials v1.17.16 h1:7d2QxY83uYl0l58ceyiSpxg9bSbStqBC6BeEeHEchwo= +github.com/aws/aws-sdk-go-v2/credentials v1.17.16/go.mod h1:Ae6li/6Yc6eMzysRL2BXlPYvnrLLBg3D11/AmOjw50k= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 h1:dQLK4TjtnlRGb0czOht2CevZ5l6RSyRWAnKeGd7VAFE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3/go.mod h1:TL79f2P6+8Q7dTsILpiVST+AL9lkF6PPGI167Ny0Cjw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 h1:lf/8VTF2cM+N4SLzaYJERKEWAXq8MOMpZfU6wEPWsPk= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7/go.mod h1:4SjkU7QiqK2M9oozyMzfZ/23LmUY+h3oFqhdeP5OMiI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 h1:4OYVp0705xu8yjdyoWix0r9wPIRXnIzzOoUpQVHIJ/g= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7/go.mod h1:vd7ESTEvI76T2Na050gODNmNU7+OyKrIKroYTu4ABiI= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 h1:Wx0rlZoEJR7JwlSZcHnEa7CNjrSIyVxMFWGAaXy4fJY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9/go.mod h1:aVMHdE0aHO3v+f/iw01fmXV/5DbfQ3Bi9nN7nd9bE9Y= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 h1:NSWsFzdHN41mJ5I/DOFzxgkKSYNHQADHn7Mu+lU/AKw= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1/go.mod h1:5mMk0DgUgaHlcqtN65fNyZI0ZDX3i9Cw+nwq75HKB3U= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 h1:aD7AGQhvPuAxlSUfo0CWU7s6FpkbyykMhGYMvlqTjVs= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.9/go.mod h1:c1qtZUWtygI6ZdvKppzCSXsDOq5I4luJPZ0Ud3juFCA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 h1:Pav5q3cA260Zqez42T9UhIlsd9QeypszRPwC9LdSSsQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3/go.mod h1:9lmoVDVLz/yUZwLaQ676TK02fhCu4+PgRSmMaKR1ozk= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 h1:69tpbPED7jKPyzMcrwSvhWcJ9bPnZsZs18NT40JwM0g= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.10/go.mod h1:0Aqn1MnEuitqfsCNyKsdKLhDUOr4txD/g19EfiUqgws= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/bool64/dev v0.2.34 h1:P9n315P8LdpxusnYQ0X7MP1CZXwBK5ae5RZrd+GdSZE= +github.com/bool64/dev v0.2.34/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= +github.com/bool64/shared v0.1.5/go.mod h1:081yz68YC9jeFB3+Bbmno2RFWvGKv1lPKkMP6MHJlPs= +github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= +github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= +github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= +github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= +github.com/swaggest/assertjson v1.9.0/go.mod h1:b+ZKX2VRiUjxfUIal0HDN85W0nHPAYUbYH5WkkSsFsU= +github.com/swaggest/jsonschema-go v0.3.70 h1:8Vx5nm5t/6DBFw2+WC0/Vp1ZVe9/4mpuA0tuAe0wwCI= +github.com/swaggest/jsonschema-go v0.3.70/go.mod h1:7N43/CwdaWgPUDfYV70K7Qm79tRqe/al7gLSt9YeGIE= +github.com/swaggest/refl v1.3.0 h1:PEUWIku+ZznYfsoyheF97ypSduvMApYyGkYF3nabS0I= +github.com/swaggest/refl v1.3.0/go.mod h1:3Ujvbmh1pfSbDYjC6JGG7nMgPvpG0ehQL4iNonnLNbg= +github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= +github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= +github.com/zalando/go-keyring v0.2.4 h1:wi2xxTqdiwMKbM6TWwi+uJCG/Tum2UV0jqaQhCa9/68= +github.com/zalando/go-keyring v0.2.4/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= +github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w= +github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= +go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= +go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI= +go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= +golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.50.9 h1:hIWf1uz55lorXQhfoEoezdUHjxzuO6ceshET/yWjSjk= +modernc.org/libc v1.50.9/go.mod h1:15P6ublJ9FJR8YQCGy8DeQ2Uwur7iW9Hserr/T3OFZE= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/sqlite v1.30.0 h1:8YhPUs/HTnlEgErn/jSYQTwHN/ex8CjHHjg+K9iG7LM= +modernc.org/sqlite v1.30.0/go.mod h1:cgkTARJ9ugeXSNaLBPK3CqbOe7Ec7ZhWPoMFGldEYEw= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go new file mode 100644 index 0000000000..7438c01f81 --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go @@ -0,0 +1,30 @@ +package pubsub + +import ( + "context" + "fmt" + "time" + + "github.com/TBD54566975/ftl/go-runtime/ftl" + // Import the FTL SDK. +) + +//ftl:export +var topic = ftl.Topic[Event]("topic") +var subscription = ftl.Subscription(topic, "subscription") + +//ftl:data +type Event struct { + Value string +} + +//ftl:verb +func PublishToTopicOne(ctx context.Context, event Event) error { + return topic.Publish(ctx, event) +} + +//ftl:subscribe subscription +func ErrorsAfterASecond(ctx context.Context, event Event) error { + time.Sleep(1 * time.Second) + return fmt.Errorf("SubscriberThatFails always fails") +} diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go new file mode 100644 index 0000000000..9778d03156 --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go @@ -0,0 +1,55 @@ +package pubsub + +import ( + "context" + "strconv" + "testing" + + "github.com/alecthomas/atomic" + + "github.com/TBD54566975/ftl/go-runtime/ftl" + "github.com/TBD54566975/ftl/go-runtime/ftl/ftltest" + "github.com/alecthomas/assert/v2" +) + +func TestSubscriberReturningErrors(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription, ErrorsAfterASecond), + ) + count := 5 + for i := 0; i < count; i++ { + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)})) + } + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription))) + assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, topic))) +} + +func TestMultipleMultipleFakeSubscribers(t *testing.T) { + // Test that multiple adhoc subscribers can be added for a subscription + count := 5 + var counter atomic.Value[int] + + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error { + ftl.LoggerFromContext(ctx).Infof("Fake Subscriber A") + current := counter.Load() + counter.Store(current + 1) + return nil + }), + ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error { + ftl.LoggerFromContext(ctx).Infof("Fake Subscriber B") + current := counter.Load() + counter.Store(current + 1) + return nil + }), + ) + for i := 0; i < count; i++ { + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)})) + } + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription))) + assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, topic))) + assert.Equal(t, count, counter.Load()) +} diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/ftl.toml b/go-runtime/ftl/ftltest/testdata/go/subscriber/ftl.toml new file mode 100644 index 0000000000..5625d0edbf --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/ftl.toml @@ -0,0 +1,2 @@ +module = "subscriber" +language = "go" diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/go.mod b/go-runtime/ftl/ftltest/testdata/go/subscriber/go.mod new file mode 100644 index 0000000000..f1b8126809 --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/go.mod @@ -0,0 +1,63 @@ +module ftl/subscriber + +go 1.22.2 + +toolchain go1.22.3 + +require ( + github.com/TBD54566975/ftl v1.1.5 + github.com/alecthomas/assert/v2 v2.10.0 +) + +require ( + connectrpc.com/connect v1.16.1 // indirect + connectrpc.com/grpcreflect v1.2.0 // indirect + connectrpc.com/otelconnect v0.7.0 // indirect + github.com/BurntSushi/toml v1.4.0 // indirect + github.com/TBD54566975/scaffolder v1.0.0 // indirect + github.com/alecthomas/concurrency v0.0.2 // indirect + github.com/alecthomas/participle/v2 v2.1.1 // indirect + github.com/alecthomas/repr v0.4.0 // indirect + github.com/alecthomas/types v0.16.0 // indirect + github.com/alessio/shellescape v1.4.2 // indirect + github.com/amacneil/dbmate/v2 v2.16.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.27.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 // indirect + github.com/aws/smithy-go v1.20.2 // indirect + github.com/danieljoos/wincred v1.2.0 // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hexops/gotextdiff v1.0.3 // indirect + github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jpillora/backoff v1.0.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/lib/pq v1.10.9 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect + github.com/swaggest/jsonschema-go v0.3.70 // indirect + github.com/swaggest/refl v1.3.0 // indirect + github.com/zalando/go-keyring v0.2.4 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect + golang.org/x/mod v0.18.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect +) + +replace github.com/TBD54566975/ftl => ./../../../../../.. diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/go.sum b/go-runtime/ftl/ftltest/testdata/go/subscriber/go.sum new file mode 100644 index 0000000000..be1709c712 --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/go.sum @@ -0,0 +1,190 @@ +connectrpc.com/connect v1.16.1 h1:rOdrK/RTI/7TVnn3JsVxt3n028MlTRwmK5Q4heSpjis= +connectrpc.com/connect v1.16.1/go.mod h1:XpZAduBQUySsb4/KO5JffORVkDI4B6/EYPi7N8xpNZw= +connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= +connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= +connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= +github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= +github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= +github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/concurrency v0.0.2 h1:Q3kGPtLbleMbH9lHX5OBFvJygfyFw29bXZKBg+IEVuo= +github.com/alecthomas/concurrency v0.0.2/go.mod h1:GmuQb/iHX7mbNtPlC/WDzEFxDMB0HYFer2Qda9QTs7w= +github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8= +github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/alecthomas/types v0.16.0 h1:o9+JSwCRB6DDaWDeR/Mg7v/zh3R+MlknM6DrnDyY7U0= +github.com/alecthomas/types v0.16.0/go.mod h1:Tswm0qQpjpVq8rn70OquRsUtFxbQKub/8TMyYYGI0+k= +github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0= +github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= +github.com/amacneil/dbmate/v2 v2.16.0 h1:uIah9qFOxA9cIyS1TuPMUgn0KVnwRRSx0MhKu7sFpcI= +github.com/amacneil/dbmate/v2 v2.16.0/go.mod h1:TlxMqFDBxsLVertnF2cAxZxPyt9UjlCrt04iy1UxRXc= +github.com/aws/aws-sdk-go-v2 v1.27.0 h1:7bZWKoXhzI+mMR/HjdMx8ZCC5+6fY0lS5tr0bbgiLlo= +github.com/aws/aws-sdk-go-v2 v1.27.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.16 h1:knpCuH7laFVGYTNd99Ns5t+8PuRjDn4HnnZK48csipM= +github.com/aws/aws-sdk-go-v2/config v1.27.16/go.mod h1:vutqgRhDUktwSge3hrC3nkuirzkJ4E/mLj5GvI0BQas= +github.com/aws/aws-sdk-go-v2/credentials v1.17.16 h1:7d2QxY83uYl0l58ceyiSpxg9bSbStqBC6BeEeHEchwo= +github.com/aws/aws-sdk-go-v2/credentials v1.17.16/go.mod h1:Ae6li/6Yc6eMzysRL2BXlPYvnrLLBg3D11/AmOjw50k= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 h1:dQLK4TjtnlRGb0czOht2CevZ5l6RSyRWAnKeGd7VAFE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3/go.mod h1:TL79f2P6+8Q7dTsILpiVST+AL9lkF6PPGI167Ny0Cjw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 h1:lf/8VTF2cM+N4SLzaYJERKEWAXq8MOMpZfU6wEPWsPk= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7/go.mod h1:4SjkU7QiqK2M9oozyMzfZ/23LmUY+h3oFqhdeP5OMiI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 h1:4OYVp0705xu8yjdyoWix0r9wPIRXnIzzOoUpQVHIJ/g= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7/go.mod h1:vd7ESTEvI76T2Na050gODNmNU7+OyKrIKroYTu4ABiI= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 h1:Wx0rlZoEJR7JwlSZcHnEa7CNjrSIyVxMFWGAaXy4fJY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9/go.mod h1:aVMHdE0aHO3v+f/iw01fmXV/5DbfQ3Bi9nN7nd9bE9Y= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 h1:NSWsFzdHN41mJ5I/DOFzxgkKSYNHQADHn7Mu+lU/AKw= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1/go.mod h1:5mMk0DgUgaHlcqtN65fNyZI0ZDX3i9Cw+nwq75HKB3U= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 h1:aD7AGQhvPuAxlSUfo0CWU7s6FpkbyykMhGYMvlqTjVs= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.9/go.mod h1:c1qtZUWtygI6ZdvKppzCSXsDOq5I4luJPZ0Ud3juFCA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 h1:Pav5q3cA260Zqez42T9UhIlsd9QeypszRPwC9LdSSsQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3/go.mod h1:9lmoVDVLz/yUZwLaQ676TK02fhCu4+PgRSmMaKR1ozk= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 h1:69tpbPED7jKPyzMcrwSvhWcJ9bPnZsZs18NT40JwM0g= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.10/go.mod h1:0Aqn1MnEuitqfsCNyKsdKLhDUOr4txD/g19EfiUqgws= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/bool64/dev v0.2.34 h1:P9n315P8LdpxusnYQ0X7MP1CZXwBK5ae5RZrd+GdSZE= +github.com/bool64/dev v0.2.34/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= +github.com/bool64/shared v0.1.5/go.mod h1:081yz68YC9jeFB3+Bbmno2RFWvGKv1lPKkMP6MHJlPs= +github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= +github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= +github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= +github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= +github.com/swaggest/assertjson v1.9.0/go.mod h1:b+ZKX2VRiUjxfUIal0HDN85W0nHPAYUbYH5WkkSsFsU= +github.com/swaggest/jsonschema-go v0.3.70 h1:8Vx5nm5t/6DBFw2+WC0/Vp1ZVe9/4mpuA0tuAe0wwCI= +github.com/swaggest/jsonschema-go v0.3.70/go.mod h1:7N43/CwdaWgPUDfYV70K7Qm79tRqe/al7gLSt9YeGIE= +github.com/swaggest/refl v1.3.0 h1:PEUWIku+ZznYfsoyheF97ypSduvMApYyGkYF3nabS0I= +github.com/swaggest/refl v1.3.0/go.mod h1:3Ujvbmh1pfSbDYjC6JGG7nMgPvpG0ehQL4iNonnLNbg= +github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= +github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= +github.com/zalando/go-keyring v0.2.4 h1:wi2xxTqdiwMKbM6TWwi+uJCG/Tum2UV0jqaQhCa9/68= +github.com/zalando/go-keyring v0.2.4/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= +github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w= +github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= +go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= +go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI= +go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= +golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.50.9 h1:hIWf1uz55lorXQhfoEoezdUHjxzuO6ceshET/yWjSjk= +modernc.org/libc v1.50.9/go.mod h1:15P6ublJ9FJR8YQCGy8DeQ2Uwur7iW9Hserr/T3OFZE= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/sqlite v1.30.0 h1:8YhPUs/HTnlEgErn/jSYQTwHN/ex8CjHHjg+K9iG7LM= +modernc.org/sqlite v1.30.0/go.mod h1:cgkTARJ9ugeXSNaLBPK3CqbOe7Ec7ZhWPoMFGldEYEw= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go new file mode 100644 index 0000000000..9a7c5fa22e --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go @@ -0,0 +1,9 @@ +package subscriber + +import ( + "ftl/pubsub" + + "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. +) + +var _ = ftl.Subscription(pubsub.Topic, "subscription") diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go new file mode 100644 index 0000000000..c03c24bfa0 --- /dev/null +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go @@ -0,0 +1,18 @@ +package subscriber + +import ( + "ftl/pubsub" + "testing" + + "github.com/TBD54566975/ftl/go-runtime/ftl/ftltest" + "github.com/alecthomas/assert/v2" +) + +func TestPublishToExternalModule(t *testing.T) { + ctx := ftltest.Context() + assert.NoError(t, pubsub.Topic.Publish(ctx, pubsub.Event{Value: "external"})) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic))) + + // Make sure we correctly made the right ref for the external module. + assert.Equal(t, "pubsub", pubsub.Topic.Ref.Module) +} diff --git a/go-runtime/ftl/pubsub.go b/go-runtime/ftl/pubsub.go index a1b59aad46..eab1f8e2a4 100644 --- a/go-runtime/ftl/pubsub.go +++ b/go-runtime/ftl/pubsub.go @@ -3,6 +3,8 @@ package ftl import ( "context" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/go-runtime/ftl/reflection" "github.com/TBD54566975/ftl/go-runtime/internal" ) @@ -10,25 +12,29 @@ import ( // // Topics publish events, and subscriptions can listen to them. func Topic[E any](name string) TopicHandle[E] { - return TopicHandle[E]{name: name} + return TopicHandle[E]{Ref: &schema.Ref{ + Name: name, + Module: reflection.Module(), + }} } type TopicHandle[E any] struct { - name string + Ref *schema.Ref } // Publish publishes an event to a topic func (t TopicHandle[E]) Publish(ctx context.Context, event E) error { - return internal.FromContext(ctx).PublishEvent(ctx, t.name, event) + return internal.FromContext(ctx).PublishEvent(ctx, t.Ref, event) +} + +type SubscriptionHandle[E any] struct { + Topic *schema.Ref + Name string } // Subscription declares a subscription to a topic // // Sinks can consume events from the subscription by including a "ftl:subscibe " directive func Subscription[E any](topic TopicHandle[E], name string) SubscriptionHandle[E] { - return SubscriptionHandle[E]{name: name} -} - -type SubscriptionHandle[E any] struct { - name string + return SubscriptionHandle[E]{Name: name, Topic: topic.Ref} } diff --git a/go-runtime/internal/api.go b/go-runtime/internal/api.go index fc75482001..edef7077cb 100644 --- a/go-runtime/internal/api.go +++ b/go-runtime/internal/api.go @@ -2,6 +2,8 @@ package internal import ( "context" + + "github.com/TBD54566975/ftl/backend/schema" ) // FTL is the interface that the FTL runtime provides to user code. @@ -16,7 +18,7 @@ type FTL interface { FSMSend(ctx context.Context, fsm, instance string, data any) error // PublishEvent sends an event to a pubsub topic. - PublishEvent(ctx context.Context, topic string, event any) error + PublishEvent(ctx context.Context, topic *schema.Ref, event any) error // CallMap calls Get on an instance of an ftl.Map. // diff --git a/go-runtime/internal/impl.go b/go-runtime/internal/impl.go index 6dffdb9cd6..48463df063 100644 --- a/go-runtime/internal/impl.go +++ b/go-runtime/internal/impl.go @@ -69,14 +69,17 @@ func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any) return nil } -func (r *RealFTL) PublishEvent(ctx context.Context, topic string, event any) error { +func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error { + if topic.Module != reflection.Module() { + return fmt.Errorf("can not publish to another module's topic: %s", topic) + } client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx) body, err := encoding.Marshal(event) if err != nil { return fmt.Errorf("failed to marshal event: %w", err) } _, err = client.PublishEvent(ctx, connect.NewRequest(&ftlv1.PublishEventRequest{ - Topic: &schemapb.Ref{Module: reflection.Module(), Name: topic}, + Topic: topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert Body: body, })) if err != nil {