From ef8430693535ea051f5f925e16050c030248b767 Mon Sep 17 00:00:00 2001 From: John Hartley <41830007+j-hartley@users.noreply.github.com> Date: Fri, 16 Aug 2024 11:37:30 +0100 Subject: [PATCH 1/2] Add and log message ID for all messages To help with service bus message delivery diagnostics re AB#9792 --- azbus/azadmin.go | 2 +- azbus/receiver.go | 8 ++++---- azbus/sender.go | 20 ++++++++++++++------ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/azbus/azadmin.go b/azbus/azadmin.go index d7a5522..6d1f696 100644 --- a/azbus/azadmin.go +++ b/azbus/azadmin.go @@ -45,7 +45,7 @@ func (c *AZAdminClient) Open() (*azadmin.Client, error) { return nil, fmt.Errorf("failed to create admin client: config must provide either an account name or a connection string") } - c.log.Infof("Get new Admin client using ConnectionString") + c.log.Debugf("Get new Admin client using ConnectionString") admin, err := azadmin.NewClientFromConnectionString( c.ConnectionString, nil, diff --git a/azbus/receiver.go b/azbus/receiver.go index efd3224..4679784 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -169,7 +169,7 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti // the context wont have a trace span on it yet, so stick with the reciever logger instance - r.log.Debugf("Processing message %d", count) + r.log.Debugf("Processing message %d id %s", count, msg.MessageID) disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler) r.dispose(ctx, disp, err, msg) @@ -179,16 +179,16 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti log := r.log.FromContext(ctx) defer log.Close() - log.Debugf("Processing message %d took %s", count, duration) + log.Debugf("Processing message %d id %s took %s", count, msg.MessageID, duration) // This is safe because maxDuration is only defined if RenewMessageLock is false. if !r.Cfg.RenewMessageLock && duration >= maxDuration { - log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration) + log.Infof("WARNING: processing msg %d id %s duration %v took more than %v seconds", count, msg.MessageID, duration, maxDuration) log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES") log.Infof("WARNING: both can be found in the helm chart for each service.") } if errors.Is(err, ErrPeekLockTimeout) { - log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err) + log.Infof("WARNING: processing msg %d id %s duration %s returned error: %v", count, msg.MessageID, duration, err) log.Infof("WARNING: please enable SERVICEBUS_RENEW_LOCK which can be found in the helm chart") } } diff --git a/azbus/sender.go b/azbus/sender.go index 95d512d..8eb73c1 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -7,6 +7,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/google/uuid" otlog "github.com/opentracing/opentracing-go/log" "github.com/datatrails/go-datatrails-common/tracing" @@ -111,9 +112,6 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error { span, ctx := tracing.StartSpanFromContext(ctx, "Sender.Send") defer span.Finish() - span.LogFields( - otlog.String("sender", s.Cfg.TopicOrQueueName), - ) // Get the logging context after we create the span as that may have created a new // trace and stashed the traceid in the metadata. @@ -127,8 +125,18 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error { return err } } + + // We set and log a message ID so we can trace the message through the bus + id := uuid.New().String() + message.MessageID = &id + + span.LogFields( + otlog.String("sender", s.Cfg.TopicOrQueueName), + otlog.String("message id", id), + ) + size := int64(len(message.Body)) - log.Debugf("%s: Msg Sized %d limit %d", s, size, s.maxMessageSizeInBytes) + log.Debugf("%s: Msg id %s Sized %d limit %d", s, id, size, s.maxMessageSizeInBytes) if size > s.maxMessageSizeInBytes { log.Debugf("Msg Sized %d > limit %d :%v", size, s.maxMessageSizeInBytes, ErrMessageOversized) return fmt.Errorf("%s: Msg Sized %d > limit %d :%w", s, size, s.maxMessageSizeInBytes, ErrMessageOversized) @@ -139,10 +147,10 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error { err = s.sender.SendMessage(ctx, message, nil) if err != nil { - azerr := fmt.Errorf("Send failed in %s: %w", time.Since(now), NewAzbusError(err)) + azerr := fmt.Errorf("Send message %s failed in %s: %w", id, time.Since(now), NewAzbusError(err)) log.Infof("%s", azerr) return azerr } - log.Debugf("Sending message took %s", time.Since(now)) + log.Debugf("Sending message id %s took %s", id, time.Since(now)) return nil } From 109c0d937a6955926ce36caf62bbc133c7bc3bf8 Mon Sep 17 00:00:00 2001 From: John Hartley <41830007+j-hartley@users.noreply.github.com> Date: Fri, 16 Aug 2024 12:08:03 +0100 Subject: [PATCH 2/2] fixup! Add and log message ID for all messages --- azbus/sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azbus/sender.go b/azbus/sender.go index 8eb73c1..cbaea5d 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -147,7 +147,7 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error { err = s.sender.SendMessage(ctx, message, nil) if err != nil { - azerr := fmt.Errorf("Send message %s failed in %s: %w", id, time.Since(now), NewAzbusError(err)) + azerr := fmt.Errorf("Send message id %s failed in %s: %w", id, time.Since(now), NewAzbusError(err)) log.Infof("%s", azerr) return azerr }