Skip to content

Commit

Permalink
Add and log message ID for all messages (#84)
Browse files Browse the repository at this point in the history
* Add and log message ID for all messages

To help with service bus message delivery diagnostics

re AB#9792
  • Loading branch information
j-hartley authored Aug 16, 2024
1 parent f281e36 commit 5d43373
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
2 changes: 1 addition & 1 deletion azbus/azadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *AZAdminClient) Open() (*azadmin.Client, error) {
return nil, fmt.Errorf("failed to create admin client: config must provide either an account name or a connection string")
}

c.log.Infof("Get new Admin client using ConnectionString")
c.log.Debugf("Get new Admin client using ConnectionString")
admin, err := azadmin.NewClientFromConnectionString(
c.ConnectionString,
nil,
Expand Down
8 changes: 4 additions & 4 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti

// the context wont have a trace span on it yet, so stick with the reciever logger instance

r.log.Debugf("Processing message %d", count)
r.log.Debugf("Processing message %d id %s", count, msg.MessageID)
disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler)
r.dispose(ctx, disp, err, msg)

Expand All @@ -179,16 +179,16 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti
log := r.log.FromContext(ctx)
defer log.Close()

log.Debugf("Processing message %d took %s", count, duration)
log.Debugf("Processing message %d id %s took %s", count, msg.MessageID, duration)

// This is safe because maxDuration is only defined if RenewMessageLock is false.
if !r.Cfg.RenewMessageLock && duration >= maxDuration {
log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration)
log.Infof("WARNING: processing msg %d id %s duration %v took more than %v seconds", count, msg.MessageID, duration, maxDuration)
log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES")
log.Infof("WARNING: both can be found in the helm chart for each service.")
}
if errors.Is(err, ErrPeekLockTimeout) {
log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err)
log.Infof("WARNING: processing msg %d id %s duration %s returned error: %v", count, msg.MessageID, duration, err)
log.Infof("WARNING: please enable SERVICEBUS_RENEW_LOCK which can be found in the helm chart")
}
}
Expand Down
20 changes: 14 additions & 6 deletions azbus/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/google/uuid"
otlog "github.com/opentracing/opentracing-go/log"

"github.com/datatrails/go-datatrails-common/tracing"
Expand Down Expand Up @@ -111,9 +112,6 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error {

span, ctx := tracing.StartSpanFromContext(ctx, "Sender.Send")
defer span.Finish()
span.LogFields(
otlog.String("sender", s.Cfg.TopicOrQueueName),
)

// Get the logging context after we create the span as that may have created a new
// trace and stashed the traceid in the metadata.
Expand All @@ -127,8 +125,18 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error {
return err
}
}

// We set and log a message ID so we can trace the message through the bus
id := uuid.New().String()
message.MessageID = &id

span.LogFields(
otlog.String("sender", s.Cfg.TopicOrQueueName),
otlog.String("message id", id),
)

size := int64(len(message.Body))
log.Debugf("%s: Msg Sized %d limit %d", s, size, s.maxMessageSizeInBytes)
log.Debugf("%s: Msg id %s Sized %d limit %d", s, id, size, s.maxMessageSizeInBytes)
if size > s.maxMessageSizeInBytes {
log.Debugf("Msg Sized %d > limit %d :%v", size, s.maxMessageSizeInBytes, ErrMessageOversized)
return fmt.Errorf("%s: Msg Sized %d > limit %d :%w", s, size, s.maxMessageSizeInBytes, ErrMessageOversized)
Expand All @@ -139,10 +147,10 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error {

err = s.sender.SendMessage(ctx, message, nil)
if err != nil {
azerr := fmt.Errorf("Send failed in %s: %w", time.Since(now), NewAzbusError(err))
azerr := fmt.Errorf("Send message id %s failed in %s: %w", id, time.Since(now), NewAzbusError(err))
log.Infof("%s", azerr)
return azerr
}
log.Debugf("Sending message took %s", time.Since(now))
log.Debugf("Sending message id %s took %s", id, time.Since(now))
return nil
}

0 comments on commit 5d43373

Please sign in to comment.