fix: handle pubsub rebalance and closure #3811

Merged · 1 commit · Dec 18, 2024
117 changes: 68 additions & 49 deletions backend/runner/pubsub/consumer.go
@@ -27,6 +27,8 @@ type consumer struct {
verb *schema.Verb
subscriber *schema.MetadataSubscriber
retryParams schema.RetryParams
+group sarama.ConsumerGroup
+cancel context.CancelFunc

verbClient VerbClient
timelineClient *timeline.Client
@@ -40,11 +42,28 @@ func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.Metada
return nil, fmt.Errorf("subscription %s has no Kafka brokers", verb.Name)
}

+config := sarama.NewConfig()
+config.Consumer.Return.Errors = true
+config.Consumer.Offsets.AutoCommit.Enable = true
+switch subscriber.FromOffset {
+case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
+config.Consumer.Offsets.Initial = sarama.OffsetOldest
+case schema.FromOffsetLatest:
+config.Consumer.Offsets.Initial = sarama.OffsetNewest
+}

+groupID := kafkaConsumerGroupID(moduleName, verb)
+group, err := sarama.NewConsumerGroup(verb.Runtime.Subscription.KafkaBrokers, groupID, config)
+if err != nil {
+return nil, fmt.Errorf("failed to create consumer group for subscription %s: %w", verb.Name, err)
+}

c := &consumer{
moduleName: moduleName,
deployment: deployment,
verb: verb,
subscriber: subscriber,
+group: group,

verbClient: verbClient,
timelineClient: timelineClient,
@@ -73,51 +92,33 @@ func (c *consumer) kafkaTopicID() string {

func (c *consumer) Begin(ctx context.Context) error {
// set up config
-logger := log.FromContext(ctx).Scope("subscription:" + c.verb.Name)
+logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
ctx = log.ContextWithLogger(ctx, logger)

-config := sarama.NewConfig()
-config.Consumer.Return.Errors = true
-config.Consumer.Offsets.AutoCommit.Enable = true
+logger.Debugf("Subscribing to %s", c.kafkaTopicID())

-var fromOffsetStr string
-switch c.subscriber.FromOffset {
-case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
-config.Consumer.Offsets.Initial = sarama.OffsetOldest
-fromOffsetStr = "beginning"
-case schema.FromOffsetLatest:
-config.Consumer.Offsets.Initial = sarama.OffsetNewest
-fromOffsetStr = "latest"
-}

-groupID := kafkaConsumerGroupID(c.moduleName, c.verb)
-logger.Debugf("Subscribing to %s from %s", c.kafkaTopicID(), fromOffsetStr)

-group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config)
-if err != nil {
-return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err)
-}
+ctx, cancel := context.WithCancel(ctx)
+c.cancel = cancel

-go c.watchErrors(ctx, group)
-go c.subscribe(ctx, group)
+go c.watchErrors(ctx)
+go c.subscribe(ctx)
return nil
}

-func (c *consumer) watchErrors(ctx context.Context, group sarama.ConsumerGroup) {
+func (c *consumer) watchErrors(ctx context.Context) {
logger := log.FromContext(ctx)
for {
select {
case <-ctx.Done():
return
-case err := <-group.Errors():
+case err := <-c.group.Errors():
logger.Errorf(err, "Consumer group error")
}
}
}

-func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
+func (c *consumer) subscribe(ctx context.Context) {
logger := log.FromContext(ctx)
-defer group.Close()
// Iterate over consumer sessions.
//
// `Consume` should be called inside an infinite loop, when a server-side rebalance happens,
@@ -129,7 +130,7 @@ func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
default:
}

-err := group.Consume(ctx, []string{c.kafkaTopicID()}, c)
+err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c)
if err != nil {
logger.Errorf(err, "Session failed for %s", c.verb.Name)
} else {
@@ -159,31 +160,49 @@ func (c *consumer) Cleanup(session sarama.ConsumerGroupSession) error {
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()
logger := log.FromContext(ctx)
-	for msg := range claim.Messages() {
-		logger.Debugf("Consuming message with partition %v and offset %v)", msg.Partition, msg.Offset)
-		remainingRetries := c.retryParams.Count
-		backoff := c.retryParams.MinBackoff
-		for {
-			err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
-			if err == nil {
-				break
-			}
-			if remainingRetries == 0 {
-				logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
-				break
-			}
-			logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
-			time.Sleep(backoff)
-			remainingRetries--
-			backoff *= 2
-			if backoff > c.retryParams.MaxBackoff {
-				backoff = c.retryParams.MaxBackoff
-			}
-		}
-		session.MarkMessage(msg, "")
-	}
+	for {
+		select {
+		case <-ctx.Done():
+			// Rebalance or shutdown needed
+			return nil
+
+		case msg := <-claim.Messages():
+			if msg == nil {
+				// Channel closed, rebalance or shutdown needed
+				return nil
+			}
+			logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
+			remainingRetries := c.retryParams.Count
+			backoff := c.retryParams.MinBackoff
+			for {
+				err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
+				if err == nil {
+					break
+				}
+				select {
+				case <-ctx.Done():
+					// Do not commit the message if we did not succeed and the context is done.
+					// No need to retry message either.
+					logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
+					return nil
+				default:
+				}
+				if remainingRetries == 0 {
+					logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
+					break
+				}
+				logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
+				time.Sleep(backoff)
+				remainingRetries--
+				backoff *= 2
+				if backoff > c.retryParams.MaxBackoff {
+					backoff = c.retryParams.MaxBackoff
+				}
+			}
+			session.MarkMessage(msg, "")
+		}
+	}
	return nil
}
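A note on the new `msg == nil` guard: when sarama ends a session it closes the claim's message channel, and in Go a receive from a closed channel returns immediately with the element type's zero value, so a `select`-based loop would spin forever without the check. A minimal standalone sketch of that behavior (names are illustrative, not from the PR):

```go
package main

import "fmt"

func main() {
	ch := make(chan *string)
	close(ch)

	// A receive from a closed channel never blocks; it yields the zero
	// value of the element type, which for a pointer type is nil.
	msg, ok := <-ch
	fmt.Println(msg == nil, ok) // true false

	// Inside a select the same receive is always ready, so the consume
	// loop must treat a nil message as "channel closed" and return.
	select {
	case msg := <-ch:
		fmt.Println(msg == nil) // true
	}
}
```

The old `for msg := range claim.Messages()` form exited cleanly on close but could not also react to context cancellation, which is what the rewrite adds.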

func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error {
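For orientation outside the diff, here is a minimal self-contained sketch of the sarama consumer-group pattern this file implements: offset and error config, group construction, an error watcher, and `Consume` called in a loop so the session is recreated after every server-side rebalance. The broker address, topic, group ID, and `exampleHandler` are illustrative stand-ins, not values from the PR:

```go
package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// exampleHandler stands in for the consumer type in this PR; anything passed
// to Consume must implement sarama.ConsumerGroupHandler.
type exampleHandler struct{}

func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (exampleHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			return nil // rebalance or shutdown
		case msg := <-claim.Messages():
			if msg == nil {
				return nil // channel closed: rebalance or shutdown
			}
			// ... process msg.Value here ...
			session.MarkMessage(msg, "")
		}
	}
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // consumer groups need brokers >= 0.10.2
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Mirrors watchErrors: drain the Errors channel so it never blocks.
	go func() {
		for err := range group.Errors() {
			log.Println("consumer group error:", err)
		}
	}()

	// Mirrors subscribe: Consume blocks for one session and returns on a
	// server-side rebalance, so it must be called again to rejoin.
	for {
		if err := group.Consume(ctx, []string{"example-topic"}, exampleHandler{}); err != nil {
			log.Println("session failed:", err)
		}
		if ctx.Err() != nil {
			return
		}
	}
}
```

Moving group construction into `newConsumer` (as the diff above does) means `Begin` can no longer fail at group-creation time, and storing `cancel` on the struct gives the consumer an explicit shutdown path.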
2 changes: 1 addition & 1 deletion backend/runner/pubsub/publisher.go
@@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey
}

func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error {
logger := log.FromContext(ctx).Scope("topic:" + p.topic.Name)
logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name)
requestKey, err := rpc.RequestKeyFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to get request key: %w", err)
2 changes: 1 addition & 1 deletion backend/runner/pubsub/testdata/go/publisher/publisher.go
@@ -82,7 +82,7 @@ func PublishOneToTopic2(ctx context.Context, req PublishOneToTopic2Request, topi
}

//ftl:verb
-//ftl:subscribe testTopic from=latest
+//ftl:subscribe localTopic from=latest
func Local(ctx context.Context, event PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time)
return nil
11 changes: 11 additions & 0 deletions internal/log/logger.go
@@ -47,6 +47,17 @@ func New(level Level, sink Sink) *Logger {
func (l Logger) Scope(scope string) *Logger {
return l.Attrs(map[string]string{scopeKey: scope})
}

+func (l Logger) AppendScope(scope string) *Logger {
+	s, ok := l.attributes[scopeKey]
+	if ok {
+		s = s + ":" + scope
+	} else {
+		s = scope
+	}
+	return l.Attrs(map[string]string{scopeKey: s})
+}

Review comment (author): Setting scope in pubsub was losing the runner scope, which made pubsub logs harder to follow.
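To see what the new helper changes, here is a self-contained sketch of the scope-attribute mechanics, mirroring the `Scope`/`AppendScope` pair above with a stand-in `Logger` (not the FTL implementation):

```go
package main

import "fmt"

const scopeKey = "scope"

// Logger is a minimal stand-in for internal/log's attribute-carrying logger.
type Logger struct{ attributes map[string]string }

// Attrs returns a copy of the logger with the given attributes merged in.
func (l Logger) Attrs(attrs map[string]string) *Logger {
	merged := map[string]string{}
	for k, v := range l.attributes {
		merged[k] = v
	}
	for k, v := range attrs {
		merged[k] = v
	}
	return &Logger{attributes: merged}
}

// Scope overwrites any existing scope attribute.
func (l Logger) Scope(scope string) *Logger {
	return l.Attrs(map[string]string{scopeKey: scope})
}

// AppendScope chains the new scope onto the existing one.
func (l Logger) AppendScope(scope string) *Logger {
	s, ok := l.attributes[scopeKey]
	if ok {
		s = s + ":" + scope
	} else {
		s = scope
	}
	return l.Attrs(map[string]string{scopeKey: s})
}

func main() {
	runner := Logger{attributes: map[string]string{scopeKey: "runner"}}
	fmt.Println(runner.Scope("sub:echo").attributes[scopeKey])       // "sub:echo" (runner scope lost)
	fmt.Println(runner.AppendScope("sub:echo").attributes[scopeKey]) // "runner:sub:echo"
}
```

This is the behavior the author's comment describes: with `Scope`, a pubsub consumer's logger replaced the runner's existing scope; with `AppendScope` the scopes chain, so pubsub log lines stay attributable to their runner.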

func (l Logger) Module(module string) *Logger {
return l.Attrs(map[string]string{moduleKey: module})
}