Skip to content

Commit

Permalink
fix: close consumer group (#3815)
Browse files Browse the repository at this point in the history
closes #3472
- Rebalance after old deployment is killed is faster
- Added an integration test that verifies slow consumers are not
affected by rebalances (a call interrupted by a rebalance fails and is
retried after the rebalance completes)
  • Loading branch information
matt2e authored Dec 18, 2024
1 parent 3951fa2 commit c24e6fe
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 16 deletions.
1 change: 1 addition & 0 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (c *consumer) subscribe(ctx context.Context) {
for {
select {
case <-ctx.Done():
c.group.Close()
return
default:
}
Expand Down
85 changes: 85 additions & 0 deletions backend/runner/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
package pubsub

import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"connectrpc.com/connect"
"github.com/IBM/sarama"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/exec"
in "github.com/block/ftl/internal/integration"
"github.com/block/ftl/internal/model"
)
Expand Down Expand Up @@ -84,6 +88,68 @@ func TestExternalPublishRuntimeCheck(t *testing.T) {
)
}

// TestConsumerGroupMembership tests that when a runner ends, the consumer group is properly exited.
//
// The scenario deploys a subscriber whose first version handles events slowly, publishes
// 100 events, then redeploys a modified subscriber. It checks that the consumer group has
// two members while both deployments are alive, that it is back to one member shortly after
// the old deployment disappears from `ftl ps`, and that all 100 events are consumed successfully.
func TestConsumerGroupMembership(t *testing.T) {
	// Recorded by the "wait for old deployment" action and read by the later timing assertion.
	var deploymentKilledTime time.Time
	in.Run(t,
		in.WithLanguages("go"),
		in.WithPubSub(),
		in.CopyModule("publisher"),
		in.CopyModule("subscriber"),
		in.Deploy("publisher"),
		in.Deploy("subscriber"),

		// consumer group must now have a member for each partition
		checkGroupMembership("subscriber", "consumeSlow", 1),

		// publish events that will take a long time to process on the first subscriber deployment
		// to test that rebalancing doesn't cause consumption to fail and skip events
		in.Repeat(100, in.Call("publisher", "publishSlow", in.Obj{}, func(t testing.TB, resp in.Obj) {})),

		// Upgrade deployment
		func(t testing.TB, ic in.TestContext) {
			in.Infof("Modifying code")
			path := filepath.Join(ic.WorkingDir(), "subscriber", "subscriber.go")

			// Rewrite the version marker in the subscriber source so the redeployed
			// module takes the fast code path.
			src, err := os.ReadFile(path)
			assert.NoError(t, err)
			output := strings.ReplaceAll(string(src), "This deployment is TheFirstDeployment", "This deployment is TheSecondDeployment")
			assert.NoError(t, os.WriteFile(path, []byte(output), 0644))
		},
		in.Deploy("subscriber"),

		// Currently old deployment runs for a little bit longer.
		// During this time we expect the consumer group to have 2 members (old deployment and new deployment).
		// This will probably change when we have proper draining of the old deployment.
		checkGroupMembership("subscriber", "consumeSlow", 2),
		func(t testing.TB, ic in.TestContext) {
			in.Infof("Waiting for old deployment to be killed")
			start := time.Now()
			for {
				assert.True(t, time.Since(start) < 15*time.Second, "timed out waiting for old subscriber deployment to be killed")
				ps, err := exec.Capture(ic.Context, ".", "ftl", "ps")
				assert.NoError(t, err)
				if strings.Count(string(ps), "dpl-subscriber-") == 1 {
					// original deployment has ended
					deploymentKilledTime = time.Now()
					return
				}
				// Pause briefly between polls instead of re-running `ftl ps` in a tight loop.
				time.Sleep(100 * time.Millisecond)
			}
		},
		// Once old deployment has ended, the consumer group should only have 1 member per partition (the new deployment)
		// This should happen fairly quickly. If it takes a while it could be because the previous deployment did not close
		// the group properly.
		checkGroupMembership("subscriber", "consumeSlow", 1),
		func(t testing.TB, ic in.TestContext) {
			assert.True(t, time.Since(deploymentKilledTime) < 3*time.Second, "make sure old deployment was removed from consumer group fast enough")
		},

		// confirm that each message was consumed successfully
		checkConsumed("subscriber", "consumeSlow", true, 100, optional.None[string]()),
	)
}

func publishToTestAndLocalTopics(calls int) in.Action {
// do this in parallel because we want to test race conditions
return func(t testing.TB, ic in.TestContext) {
Expand Down Expand Up @@ -161,3 +227,22 @@ func checkConsumed(module, verb string, success bool, count int, needle optional
}
}
}
// checkGroupMembership returns an action that asserts the consumer group for the given
// module and subscription has exactly expectedCount members.
//
// The group ID is "<module>.<subscription>". The check is a single snapshot taken through a
// cluster admin connected to in.RedPandaBrokers; it does not retry.
func checkGroupMembership(module, subscription string, expectedCount int) in.Action {
	return func(t testing.TB, ic in.TestContext) {
		consumerGroup := module + "." + subscription
		in.Infof("Checking group membership for %v", consumerGroup)

		client, err := sarama.NewClient(in.RedPandaBrokers, sarama.NewConfig())
		assert.NoError(t, err)
		defer client.Close()

		clusterAdmin, err := sarama.NewClusterAdminFromClient(client)
		assert.NoError(t, err)
		defer clusterAdmin.Close()

		groups, err := clusterAdmin.DescribeConsumerGroups([]string{consumerGroup})
		assert.NoError(t, err)
		// assert.Equal takes (expected, actual); keep that order so failure output labels the values correctly.
		assert.Equal(t, 1, len(groups))
		assert.Equal(t, expectedCount, len(groups[0].Members), "expected consumer group %v to have %v members", consumerGroup, expectedCount)
	}
}
21 changes: 17 additions & 4 deletions backend/runner/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
// Import the FTL SDK.
)

// PubSubEvent is the event payload carried by the topic handles declared in this module.
type PubSubEvent struct {
// Time is filled in by the publisher (PublishSlow uses the time of the publish call).
Time time.Time
// Haystack is free-form text carried with the event.
// NOTE(review): presumably the text tests search for a "needle" in — confirm against callers.
Haystack string
}

// PartitionMapper is the partition mapper type used by this module's topic handles.
type PartitionMapper struct{}

// Compile-time check that PartitionMapper satisfies ftl.TopicPartitionMap[PubSubEvent].
var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{}
Expand All @@ -21,10 +26,8 @@ type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type LocalTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type PubSubEvent struct {
Time time.Time
Haystack string
}
// SlowTopic is the topic that PublishSlow publishes PubSubEvent values to.
//
//ftl:export
type SlowTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

//ftl:verb
func PublishTen(ctx context.Context, topic TestTopic) error {
Expand Down Expand Up @@ -87,3 +90,13 @@ func Local(ctx context.Context, event PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time)
return nil
}

// PublishSlow publishes a single event, stamped with the current time, to SlowTopic.
// It returns whatever error the publish call reports.
//
//ftl:verb
func PublishSlow(ctx context.Context, topic SlowTopic) error {
	now := time.Now()
	ftl.LoggerFromContext(ctx).Infof("Publishing to slowTopic: %v", now)

	event := PubSubEvent{Time: now}
	return topic.Publish(ctx, event)
}
26 changes: 16 additions & 10 deletions backend/runner/pubsub/testdata/go/publisher/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions backend/runner/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package subscriber
import (
"context"
"fmt"
"strings"
"time"

"ftl/publisher"
Expand Down Expand Up @@ -38,3 +39,16 @@ func PublishToExternalModule(ctx context.Context) error {
externalTopic := ftl.TopicHandle[publisher.PubSubEvent, ftl.SinglePartitionMap[publisher.PubSubEvent]]{Ref: reflection.Ref{Module: "publisher", Name: "testTopic"}.ToSchema()}
return externalTopic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
}

// ConsumeSlow handles events from publisher.slowTopic, starting from the beginning of the topic.
//
// Its speed depends on the version marker string below: while the marker names the first
// deployment, each event takes 5 seconds to handle; otherwise the handler returns immediately.
// The integration test rewrites the marker literal in this file to simulate an upgraded
// deployment, so the literal must be kept verbatim.
//
//ftl:verb
//ftl:subscribe publisher.slowTopic from=beginning
func ConsumeSlow(ctx context.Context, req publisher.PubSubEvent) error {
	versionDescription := "This deployment is TheFirstDeployment"
	logger := ftl.LoggerFromContext(ctx)

	if !strings.Contains(versionDescription, "TheFirstDeployment") {
		logger.Infof("ConsumeSlow second deployment (immediate): %v", req.Time)
		return nil
	}

	logger.Infof("ConsumeSlow first deployment (will sleep 5s): %v", req.Time)
	time.Sleep(5 * time.Second)
	return nil
}
5 changes: 5 additions & 0 deletions backend/runner/pubsub/testdata/go/subscriber/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (

const dumpPath = "/tmp/ftl-kube-report"

var redPandaBrokers = []string{"127.0.0.1:19092"}
// RedPandaBrokers lists the broker addresses that the integration harness and tests pass to sarama.NewClient.
var RedPandaBrokers = []string{"127.0.0.1:19092"}

func (i TestContext) integrationTestTimeout() time.Duration {
timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s")
Expand Down Expand Up @@ -415,7 +415,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
err = exec.CommandWithEnv(ctx, log.Debug, rootDir, envars, "docker", "compose", "-f", "internal/dev/docker-compose.redpanda.yml", "-p", "ftl", "up", "-d", "--wait").RunBuffered(ctx)
assert.NoError(t, err)

client, err := sarama.NewClient(redPandaBrokers, sarama.NewConfig())
client, err := sarama.NewClient(RedPandaBrokers, sarama.NewConfig())
assert.NoError(t, err)
defer client.Close()

Expand Down

0 comments on commit c24e6fe

Please sign in to comment.