Skip to content

Commit

Permalink
clean up logs again
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 20, 2024
1 parent f513914 commit 04ce160
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 28 deletions.
27 changes: 13 additions & 14 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,22 @@ func (c *consumer) kafkaTopicID() string {

func (c *consumer) Begin(ctx context.Context) error {
// set up config
logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
logger.Debugf("Subscribing to %s", c.kafkaTopicID())
log.FromContext(ctx).Debugf("Starting subscription for %v", c.verb.Name)

go c.watchErrors(ctx)
go c.subscribe(ctx)
return nil
}

func (c *consumer) watchErrors(ctx context.Context) {
logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
logger := log.FromContext(ctx)
for err := range channels.IterContext(ctx, c.group.Errors()) {
logger.Errorf(err, "Consumer group error")
logger.Errorf(err, "Consumer group error for %v", c.verb.Name)
}
}

func (c *consumer) subscribe(ctx context.Context) {
logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
logger := log.FromContext(ctx)
// Iterate over consumer sessions.
//
// `Consume` should be called inside an infinite loop, when a server-side rebalance happens,
Expand All @@ -129,19 +128,19 @@ func (c *consumer) subscribe(ctx context.Context) {

err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c)
if err != nil {
logger.Errorf(err, "Session failed for %s", c.verb.Name)
logger.Errorf(err, "Consumer group session failed for %s", c.verb.Name)
} else {
logger.Debugf("Ending session")
logger.Debugf("Ending consumer group session for %s", c.verb.Name)
}
}
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
logger := log.FromContext(session.Context()).AppendScope("sub:" + c.verb.Name)
logger := log.FromContext(session.Context())

partitions := session.Claims()[c.kafkaTopicID()]
logger.Debugf("Starting session with partitions [%v]", strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))
logger.Debugf("Starting session for %v with partitions [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))

return nil
}
Expand All @@ -156,14 +155,14 @@ func (c *consumer) Cleanup(session sarama.ConsumerGroupSession) error {
// is closed, the Handler must finish its processing loop and exit.
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()
logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
logger := log.FromContext(ctx)

for msg := range channels.IterContext(ctx, claim.Messages()) {
if msg == nil {
// Channel closed, rebalance or shutdown needed
return nil
}
logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
logger.Debugf("Consuming message from %v[%v:%v]", c.verb.Name, msg.Partition, msg.Offset)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
Expand All @@ -175,18 +174,18 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
case <-ctx.Done():
// Do not commit the message if we did not succeed and the context is done.
// No need to retry message either.
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset)
return nil
default:
}
if remainingRetries == 0 {
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset)
if !c.publishToDeadLetterTopic(ctx, msg, err) {
return nil
}
break
}
logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
logger.Errorf(err, "Failed to consume message from %v[%v,%v] and will retry in %vs", c.verb.Name, msg.Partition, msg.Offset, int(backoff.Seconds()))
time.Sleep(backoff)
remainingRetries--
backoff *= 2
Expand Down
8 changes: 4 additions & 4 deletions backend/runner/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey
}

func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error {
logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name)
logger := log.FromContext(ctx)
requestKey, err := rpc.RequestKeyFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to get request key: %w", err)
Expand All @@ -78,12 +78,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
})
if err != nil {
timelineEvent.Error = optional.Some(err.Error())
logger.Errorf(err, "Failed to publish message")
return fmt.Errorf("failed to publish message: %w", err)
logger.Errorf(err, "Failed to publish message to %s", p.topic.Name)
return fmt.Errorf("failed to publish message to %s: %w", p.topic.Name, err)
}
timelineEvent.Partition = int(partition)
timelineEvent.Offset = int(offset)
p.timelineClient.Publish(ctx, timelineEvent)
logger.Debugf("Published to partition %v with offset %v", partition, offset)
logger.Debugf("Published to %v[%v:%v]", p.topic.Name, partition, offset)
return nil
}
10 changes: 0 additions & 10 deletions internal/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ func (l Logger) Scope(scope string) *Logger {
return l.Attrs(map[string]string{scopeKey: scope})
}

func (l Logger) AppendScope(scope string) *Logger {
s, ok := l.attributes[scopeKey]
if ok {
s = s + ":" + scope
} else {
s = scope
}
return l.Attrs(map[string]string{scopeKey: s})
}

func (l Logger) Module(module string) *Logger {
return l.Attrs(map[string]string{moduleKey: module})
}
Expand Down

0 comments on commit 04ce160

Please sign in to comment.