Skip to content

Commit

Permalink
Merge branch 'main' into brad/console-verb-shortcut
Browse files Browse the repository at this point in the history
* main:
  fix: race with concurrent deployments (#3553)
  chore: normalise Go binary builds (#3552)
  fix: move redpanda console off common port (#3551)
  fix: remove new subscriptions usage in console (#3550)
  feat: replace subscriptions in schema with verb subscribe metadata (#3528)
  fix: reduce the graph chaos (#3548)
  • Loading branch information
bradleydwyer committed Nov 27, 2024
2 parents 9d93ffc + e6d5b01 commit 711ff86
Show file tree
Hide file tree
Showing 74 changed files with 2,190 additions and 2,813 deletions.
56 changes: 16 additions & 40 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ build-without-frontend +tools: build-protos build-zips
for tool in $@; do
path="cmd/$tool"
test "$tool" = "ftl" && path="frontend/cli"
if [ "${FTL_DEBUG:-}" = "true" ]; then
go build -o "{{RELEASE}}/$tool" -tags release -gcflags=all="-N -l" -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./$path"
else
mk "{{RELEASE}}/$tool" : !(build|integration|infrastructure|node_modules|Procfile*|Dockerfile*) -- go build -o "{{RELEASE}}/$tool" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./$path"
fi
just build-go-binary "./$path" "$tool"
done

# Build all backend binaries
Expand All @@ -102,47 +98,24 @@ build-jvm *args:
mvn -f jvm-runtime/ftl-runtime install {{args}}

# Builds all language plugins
build-language-plugins: build-go build-python build-java build-kotlin

# Build ftl-language-go
build-go: build-zips build-protos
#!/bin/bash
shopt -s extglob

if [ "${FTL_DEBUG:-}" = "true" ]; then
go build -o "{{RELEASE}}/ftl-language-go" -tags release -gcflags=all="-N -l" -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./go-runtime/cmd/ftl-language-go"
else
mk "{{RELEASE}}/ftl-language-go" : !(build|integration) -- go build -o "{{RELEASE}}/ftl-language-go" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./go-runtime/cmd/ftl-language-go"
fi

# Build ftl-language-python
build-python: build-zips build-protos
#!/bin/bash
shopt -s extglob

if [ "${FTL_DEBUG:-}" = "true" ]; then
go build -o "{{RELEASE}}/ftl-language-python" -tags release -gcflags=all="-N -l" -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./python-runtime/cmd/ftl-language-python"
else
mk "{{RELEASE}}/ftl-language-python" : !(build|integration) -- go build -o "{{RELEASE}}/ftl-language-python" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./python-runtime/cmd/ftl-language-python"
fi

build-kotlin *args: build-zips build-protos
build-language-plugins:
@just build-go-binary ./go-runtime/cmd/ftl-language-go
@just build-go-binary ./python-runtime/cmd/ftl-language-python
@just build-go-binary ./jvm-runtime/cmd/ftl-language-java
@just build-go-binary ./jvm-runtime/cmd/ftl-language-kotlin

# Build a Go binary with the correct flags and place it in the release dir
build-go-binary dir binary="": build-zips build-protos build-frontend
#!/bin/bash
set -euo pipefail
shopt -s extglob
if [ "${FTL_DEBUG:-}" = "true" ]; then
go build -o "{{RELEASE}}/ftl-language-kotlin" -tags release -gcflags=all="-N -l" -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./go-runtime/cmd/ftl-language-kotlin"
else
mk "{{RELEASE}}/ftl-language-kotlin" : !(build|integration) -- go build -o "{{RELEASE}}/ftl-language-kotlin" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./jvm-runtime/cmd/ftl-language-kotlin"
fi

build-java *args: build-zips build-protos
#!/bin/bash
shopt -s extglob
binary="${2:-$(basename "$1")}"

if [ "${FTL_DEBUG:-}" = "true" ]; then
go build -o "{{RELEASE}}/ftl-language-java" -tags release -gcflags=all="-N -l" -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./jvm-runtime/cmd/ftl-language-java"
go build -o "{{RELEASE}}/${binary}" -tags release -gcflags=all="-N -l" -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "$1"
else
mk "{{RELEASE}}/ftl-language-java" : !(build|integration) -- go build -o "{{RELEASE}}/ftl-language-java" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./jvm-runtime/cmd/ftl-language-java"
mk "{{RELEASE}}/${binary}" : !(build|integration|infrastructure|node_modules|Procfile*|Dockerfile*) -- go build -o "{{RELEASE}}/${binary}" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "$1"
fi

export DATABASE_URL := "postgres://postgres:secret@localhost:15432/ftl?sslmode=disable"
Expand Down Expand Up @@ -303,9 +276,11 @@ debug *args:
dlv_pid=$!
wait "$dlv_pid"

# Bring up localstack
localstack:
docker compose up localstack -d --wait

# Bring down localstack
localstack-stop:
docker compose down localstack

Expand All @@ -321,5 +296,6 @@ build-docker name:
-t ftl0/ftl-{{name}}:latest \
-f Dockerfile.{{name}} .

# Run a Just command in the Helm charts directory
chart *args:
@cd charts && just {{args}}
31 changes: 9 additions & 22 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
case *schema.Config:
configs = append(configs, configFromDecl(decl, deployment.Module, nilMap))

case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.Topic, *schema.Subscription:
case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.Topic:
}
}

Expand Down Expand Up @@ -179,7 +179,6 @@ func moduleFromDecls(decls []schema.Decl, sch *schema.Schema, module string, ref
var topics []*pbconsole.Topic
var typealiases []*pbconsole.TypeAlias
var secrets []*pbconsole.Secret
var subscriptions []*pbconsole.Subscription
var verbs []*pbconsole.Verb

for _, d := range decls {
Expand All @@ -202,9 +201,6 @@ func moduleFromDecls(decls []schema.Decl, sch *schema.Schema, module string, ref
case *schema.Secret:
secrets = append(secrets, secretFromDecl(decl, module, refMap))

case *schema.Subscription:
subscriptions = append(subscriptions, subscriptionFromDecl(decl, module, refMap))

case *schema.TypeAlias:
typealiases = append(typealiases, typealiasFromDecl(decl, module, refMap))

Expand All @@ -218,15 +214,14 @@ func moduleFromDecls(decls []schema.Decl, sch *schema.Schema, module string, ref
}

return &pbconsole.Module{
Configs: configs,
Data: data,
Databases: databases,
Enums: enums,
Topics: topics,
Typealiases: typealiases,
Secrets: secrets,
Subscriptions: subscriptions,
Verbs: verbs,
Configs: configs,
Data: data,
Databases: databases,
Enums: enums,
Topics: topics,
Typealiases: typealiases,
Secrets: secrets,
Verbs: verbs,
}, nil
}

Expand Down Expand Up @@ -288,14 +283,6 @@ func secretFromDecl(decl *schema.Secret, module string, refMap map[schema.RefKey
}
}

func subscriptionFromDecl(decl *schema.Subscription, module string, refMap map[schema.RefKey]map[schema.RefKey]bool) *pbconsole.Subscription {
return &pbconsole.Subscription{
//nolint:forcetypeassert
Subscription: decl.ToProto().(*schemapb.Subscription),
References: getReferencesFromMap(refMap, module, decl.Name),
}
}

func verbFromDecl(decl *schema.Verb, sch *schema.Schema, module string, refMap map[schema.RefKey]map[schema.RefKey]bool) (*pbconsole.Verb, error) {
//nolint:forcetypeassert
v := decl.ToProto().(*schemapb.Verb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type Event struct {
// Used to test encryption of topic_events and async_calls tables

type Topic = ftl.TopicHandle[Event]
type Subscription = ftl.SubscriptionHandle[Topic, ConsumeClient, Event]

//ftl:verb
func Publish(ctx context.Context, e Event, topic Topic) error {
Expand All @@ -43,6 +42,7 @@ func Publish(ctx context.Context, e Event, topic Topic) error {
}

//ftl:verb
//ftl:subscribe topic from=beginning
func Consume(ctx context.Context, e Event) error {
fmt.Printf("Received event: %s\n", e.Name)
if e.Name != "AliceInWonderland" {
Expand Down
10 changes: 5 additions & 5 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestPubSub(t *testing.T) {
WHERE
state = 'success'
AND origin = '%s'
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "testTopicSubscription"}}.String()),
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consume"}}.String()),
events),
)
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestRetry(t *testing.T) {
AND verb = 'subscriber.consumeButFailAndRetry'
AND catching = false
AND origin = '%s'
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndRetry"}}.String()),
1+retriesPerCall),

// check that there is one failed attempt to catch (we purposely fail the first one)
Expand All @@ -124,7 +124,7 @@ func TestRetry(t *testing.T) {
AND error LIKE '%%catching error%%'
AND catching = true
AND origin = '%s'
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndRetry"}}.String()),
1),

// check that there is one successful attempt to catch (we succeed the second one as long as we receive the correct error in the request)
Expand All @@ -138,7 +138,7 @@ func TestRetry(t *testing.T) {
AND error IS NULL
AND catching = true
AND origin = '%s'
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndRetry"}}.String()),
1),

// check that there was one successful attempt to catchAny
Expand All @@ -152,7 +152,7 @@ func TestRetry(t *testing.T) {
AND error IS NULL
AND catching = true
AND origin = '%s'
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription2"}}.String()),
`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndCatchAny"}}.String()),
1),
)
}
Expand Down
46 changes: 9 additions & 37 deletions backend/controller/pubsub/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,28 +270,19 @@ func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err e
func (d *DAL) CreateSubscriptions(ctx context.Context, key model.DeploymentKey, module *schema.Module) error {
logger := log.FromContext(ctx)

for _, decl := range module.Decls {
s, ok := decl.(*schema.Subscription)
for verb := range slices.FilterVariants[*schema.Verb](module.Decls) {
subscriber, ok := slices.FindVariant[*schema.MetadataSubscriber](verb.Metadata)
if !ok {
continue
}
if !hasSubscribers(s, module.Decls) {
// Ignore subscriptions without subscribers
// This ensures that controllers don't endlessly try to progress subscriptions without subscribers
// https://github.com/TBD54566975/ftl/issues/1685
//
// It does mean that a subscription will reset to the topic's head if all subscribers are removed and then later re-added
logger.Debugf("Skipping upsert of subscription %s for %s due to lack of subscribers", s.Name, key)
continue
}
subscriptionKey := model.NewSubscriptionKey(module.Name, s.Name)
subscriptionKey := model.NewSubscriptionKey(module.Name, verb.Name)
result, err := d.db.UpsertSubscription(ctx, dalsql.UpsertSubscriptionParams{
Key: subscriptionKey,
Module: module.Name,
Deployment: key,
TopicModule: s.Topic.Module,
TopicName: s.Topic.Name,
Name: s.Name,
TopicModule: subscriber.Topic.Module,
TopicName: subscriber.Topic.Name,
Name: verb.Name,
})
if err != nil {
return fmt.Errorf("could not insert subscription: %w", libdal.TranslatePGError(err))
Expand All @@ -305,25 +296,6 @@ func (d *DAL) CreateSubscriptions(ctx context.Context, key model.DeploymentKey,
return nil
}

func hasSubscribers(subscription *schema.Subscription, decls []schema.Decl) bool {
for _, d := range decls {
verb, ok := d.(*schema.Verb)
if !ok {
continue
}
for _, md := range verb.Metadata {
subscriber, ok := md.(*schema.MetadataSubscriber)
if !ok {
continue
}
if subscriber.Name == subscription.Name {
return true
}
}
}
return false
}

func (d *DAL) CreateSubscribers(ctx context.Context, key model.DeploymentKey, module *schema.Module) error {
logger := log.FromContext(ctx)
for _, decl := range module.Decls {
Expand All @@ -332,7 +304,7 @@ func (d *DAL) CreateSubscribers(ctx context.Context, key model.DeploymentKey, mo
continue
}
for _, md := range v.Metadata {
s, ok := md.(*schema.MetadataSubscriber)
_, ok := md.(*schema.MetadataSubscriber)
if !ok {
continue
}
Expand All @@ -348,11 +320,11 @@ func (d *DAL) CreateSubscribers(ctx context.Context, key model.DeploymentKey, mo
return fmt.Errorf("could not parse retry parameters for %q: %w", v.Name, err)
}
}
subscriberKey := model.NewSubscriberKey(module.Name, s.Name, v.Name)
subscriberKey := model.NewSubscriberKey(module.Name, v.Name, v.Name)
err = d.db.InsertSubscriber(ctx, dalsql.InsertSubscriberParams{
Key: subscriberKey,
Module: module.Name,
SubscriptionName: s.Name,
SubscriptionName: v.Name,
Deployment: key,
Sink: sinkRef,
RetryAttempts: int32(retryParams.Count),
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/pubsub/testdata/go/slow/slow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

type Topic = ftl.TopicHandle[Event]
type SlowSubscription = ftl.SubscriptionHandle[Topic, ConsumeClient, Event]

type Event struct {
Duration int
Expand All @@ -32,6 +31,7 @@ func Publish(ctx context.Context, req PublishRequest, topic Topic) error {
}

//ftl:verb
//ftl:subscribe topic from=beginning
func Consume(ctx context.Context, event Event) error {
for i := range event.Duration {
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,24 @@ import (
//"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)

type TestTopicSubscription = ftl.SubscriptionHandle[publisher.TestTopic, ConsumeClient, publisher.PubSubEvent]
type DoomedSubscription = ftl.SubscriptionHandle[publisher.Topic2, ConsumeButFailAndRetryClient, publisher.PubSubEvent]
type DoomedSubscription2 = ftl.SubscriptionHandle[publisher.Topic2, ConsumeButFailAndCatchAnyClient, publisher.PubSubEvent]

var catchCount atomic.Value[int]

//ftl:verb
//ftl:subscribe publisher.testTopic from=beginning
func Consume(ctx context.Context, req publisher.PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time)
return nil
}

//ftl:verb
//ftl:subscribe publisher.topic2 from=beginning
//ftl:retry 2 1s 1s catch catch
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
//ftl:subscribe publisher.topic2 from=beginning
//ftl:retry 1 1s 1s catch catchAny
func ConsumeButFailAndCatchAny(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import io.quarkus.logging.Log;
import xyz.block.ftl.Export;
import xyz.block.ftl.FromOffset;
import xyz.block.ftl.Subscription;
import xyz.block.ftl.SubscriptionOptions;
import xyz.block.ftl.Topic;
import xyz.block.ftl.TopicDefinition;
import xyz.block.ftl.Verb;
Expand Down Expand Up @@ -49,7 +51,8 @@ void publishOneToTopic2(Topic2 topic2) throws Exception {
topic2.publish(new PubSubEvent().setTime(t));
}

@Subscription(topicClass = LocalTopic.class, name = "localSubscription")
@Subscription(topicClass = LocalTopic.class)
@SubscriptionOptions(from = FromOffset.LATEST)
public void local(TestTopic testTopic, PubSubEvent event) {
testTopic.publish(event);
}
Expand Down
Loading

0 comments on commit 711ff86

Please sign in to comment.