From 04ce16031183c9c79f25f90a36566414219373d7 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 20 Dec 2024 11:09:42 +1100 Subject: [PATCH] clean up logs again --- backend/runner/pubsub/consumer.go | 27 +++++++++++++-------------- backend/runner/pubsub/publisher.go | 8 ++++---- internal/log/logger.go | 10 ---------- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index b98b91280..b8f748529 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -98,8 +98,7 @@ func (c *consumer) kafkaTopicID() string { func (c *consumer) Begin(ctx context.Context) error { // set up config - logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name) - logger.Debugf("Subscribing to %s", c.kafkaTopicID()) + log.FromContext(ctx).Debugf("Starting subscription for %v", c.verb.Name) go c.watchErrors(ctx) go c.subscribe(ctx) @@ -107,14 +106,14 @@ func (c *consumer) Begin(ctx context.Context) error { } func (c *consumer) watchErrors(ctx context.Context) { - logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name) + logger := log.FromContext(ctx) for err := range channels.IterContext(ctx, c.group.Errors()) { - logger.Errorf(err, "Consumer group error") + logger.Errorf(err, "Consumer group error for %v", c.verb.Name) } } func (c *consumer) subscribe(ctx context.Context) { - logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name) + logger := log.FromContext(ctx) // Iterate over consumer sessions. // // `Consume` should be called inside an infinite loop, when a server-side rebalance happens, @@ -129,19 +128,19 @@ func (c *consumer) subscribe(ctx context.Context) { err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c) if err != nil { - logger.Errorf(err, "Session failed for %s", c.verb.Name) + logger.Errorf(err, "Consumer group session failed for %s", c.verb.Name) } else { - logger.Debugf("Ending session") + logger.Debugf("Ending consumer group session for %s", c.verb.Name) } } } // Setup is run at the beginning of a new session, before ConsumeClaim. func (c *consumer) Setup(session sarama.ConsumerGroupSession) error { - logger := log.FromContext(session.Context()).AppendScope("sub:" + c.verb.Name) + logger := log.FromContext(session.Context()) partitions := session.Claims()[c.kafkaTopicID()] - logger.Debugf("Starting session with partitions [%v]", strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ",")) + logger.Debugf("Starting session for %v with partitions [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ",")) return nil } @@ -156,14 +155,14 @@ func (c *consumer) Cleanup(session sarama.ConsumerGroupSession) error { // is closed, the Handler must finish its processing loop and exit. func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { ctx := session.Context() - logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name) + logger := log.FromContext(ctx) for msg := range channels.IterContext(ctx, claim.Messages()) { if msg == nil { // Channel closed, rebalance or shutdown needed return nil } - logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset) + logger.Debugf("Consuming message from %v[%v:%v]", c.verb.Name, msg.Partition, msg.Offset) remainingRetries := c.retryParams.Count backoff := c.retryParams.MinBackoff for { @@ -175,18 +174,18 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram case <-ctx.Done(): // Do not commit the message if we did not succeed and the context is done. // No need to retry message either. - logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset) return nil default: } if remainingRetries == 0 { - logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset) if !c.publishToDeadLetterTopic(ctx, msg, err) { return nil } break } - logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds())) + logger.Errorf(err, "Failed to consume message from %v[%v,%v] and will retry in %vs", c.verb.Name, msg.Partition, msg.Offset, int(backoff.Seconds())) time.Sleep(backoff) remainingRetries-- backoff *= 2 diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index cbaf59493..d1e91c4f8 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey } func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error { - logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name) + logger := log.FromContext(ctx) requestKey, err := rpc.RequestKeyFromContext(ctx) if err != nil { return fmt.Errorf("failed to get request key: %w", err) @@ -78,12 +78,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller }) if err != nil { timelineEvent.Error = optional.Some(err.Error()) - logger.Errorf(err, "Failed to publish message") - return fmt.Errorf("failed to publish message: %w", err) + logger.Errorf(err, "Failed to publish message to %s", p.topic.Name) + return fmt.Errorf("failed to publish message to %s: %w", p.topic.Name, err) } timelineEvent.Partition = int(partition) timelineEvent.Offset = int(offset) p.timelineClient.Publish(ctx, timelineEvent) - logger.Debugf("Published to partition %v with offset %v", partition, offset) + logger.Debugf("Published to %v[%v:%v]", p.topic.Name, partition, offset) return nil } diff --git a/internal/log/logger.go b/internal/log/logger.go index 18429338b..1bae8896a 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -48,16 +48,6 @@ func (l Logger) Scope(scope string) *Logger { return l.Attrs(map[string]string{scopeKey: scope}) } -func (l Logger) AppendScope(scope string) *Logger { - s, ok := l.attributes[scopeKey] - if ok { - s = s + ":" + scope - } else { - s = scope - } - return l.Attrs(map[string]string{scopeKey: s}) -} - func (l Logger) Module(module string) *Logger { return l.Attrs(map[string]string{moduleKey: module}) }