Skip to content

Commit

Permalink
use message sequence number as unique key
Browse files Browse the repository at this point in the history
Signed-off-by: Joni Collinge <[email protected]>
  • Loading branch information
jjcollinge committed Jul 20, 2022
1 parent 91ce3d3 commit 517b6e0
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions internal/component/azure/servicebus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package servicebus

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -32,7 +33,7 @@ type HandlerFunc func(ctx context.Context, msg *azservicebus.ReceivedMessage) er
type Subscription struct {
entity string
mu sync.RWMutex
activeMessages map[string]*azservicebus.ReceivedMessage
activeMessages map[int64]*azservicebus.ReceivedMessage
activeMessagesChan chan struct{}
receiver *azservicebus.Receiver
timeout time.Duration
Expand All @@ -57,7 +58,7 @@ func NewSubscription(
ctx, cancel := context.WithCancel(parentCtx)
s := &Subscription{
entity: entity,
activeMessages: make(map[string]*azservicebus.ReceivedMessage),
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
activeMessagesChan: make(chan struct{}, maxActiveMessages),
timeout: time.Duration(timeoutInSec) * time.Second,
logger: logger,
Expand Down Expand Up @@ -175,7 +176,11 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
s.logger.Debugf("Received message: %s; current active message usage: %d/%d", msg.MessageID, len(s.activeMessagesChan), cap(s.activeMessagesChan))
// s.logger.Debugf("Message body: %s", string(msg.Body))

s.addActiveMessage(msg)
if err = s.addActiveMessage(msg); err != nil {
// If we cannot add the message then it is invalid and we cannot process it.
s.logger.Errorf("Error adding message: %s", err.Error())
s.AbandonMessage(ctx, msg)
}

s.logger.Debugf("Processing received message: %s", msg.MessageID)
s.handleAsync(s.ctx, msg, handler)
Expand All @@ -197,6 +202,15 @@ func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv
go func() {
var consumeToken bool
var err error
var messageKey int64

// We check for the existence of the sequence number again just in case
// it has been unset by some other goroutine and we cannot process the message.
if msg.SequenceNumber == nil {
s.logger.Errorf("Message has no sequence number: %s", msg.MessageID)
s.AbandonMessage(ctx, msg)
}
messageKey = *msg.SequenceNumber

// If handleChan is non-nil, we have a limit on how many handler we can process
limitConcurrentHandlers := cap(s.handleChan) > 0
Expand All @@ -217,7 +231,7 @@ func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv
}

// Remove the message from the map of active ones
s.removeActiveMessage(msg.MessageID)
s.removeActiveMessage(messageKey)

// Remove an entry from activeMessageChan to allow processing more messages
<-s.activeMessagesChan
Expand Down Expand Up @@ -315,16 +329,20 @@ func (s *Subscription) CompleteMessage(ctx context.Context, m *azservicebus.Rece
}
}

func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) {
func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) error {
s.logger.Debugf("Adding message %s to active messages on %s", m.MessageID, s.entity)
if m.SequenceNumber == nil {
return fmt.Errorf("message sequence number is nil")
}
s.mu.Lock()
s.activeMessages[m.MessageID] = m
s.activeMessages[*m.SequenceNumber] = m
s.mu.Unlock()
return nil
}

func (s *Subscription) removeActiveMessage(messageID string) {
s.logger.Debugf("Removing message %s from active messages on %s", messageID, s.entity)
func (s *Subscription) removeActiveMessage(messageKey int64) {
s.logger.Debugf("Removing message %s from active messages on %s", messageKey, s.entity)
s.mu.Lock()
delete(s.activeMessages, messageID)
delete(s.activeMessages, messageKey)
s.mu.Unlock()
}

0 comments on commit 517b6e0

Please sign in to comment.