Skip to content

Commit

Permalink
refactoring, interfaces, removal of singletons
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Mor <[email protected]>
  • Loading branch information
amimimor committed Oct 26, 2023
1 parent 41bef96 commit 4d8b903
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 29 deletions.
16 changes: 8 additions & 8 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

type snsSqs struct {
topicLocker *TopicsLockManager
topicsLocker TopicsLocker
// key is the sanitized topic name
topicArns map[string]string
// key is the topic name, value holds the ARN of the queue and its url.
Expand Down Expand Up @@ -169,7 +169,7 @@ func (s *snsSqs) Init(ctx context.Context, metadata pubsub.Metadata) error {
}
// subscription manager responsible for managing the lifecycle of subscriptions.
s.subscriptionManager = NewSubscriptionMgmt(s.logger)
s.topicLocker = GetLockManager()
s.topicsLocker = NewLockManager()

s.topicArns = make(map[string]string)
s.queues = make(map[string]*sqsQueueInfo)
Expand Down Expand Up @@ -337,12 +337,12 @@ func (s *snsSqs) getOrCreateQueue(ctx context.Context, queueName string) (*sqsQu
)

if cachedQueueInfo, ok := s.queues[queueName]; ok {
s.logger.Debugf("Found queue arn for %s: %s", queueName, cachedQueueInfo.arn)
s.logger.Debugf("Found queue ARN for %s: %s", queueName, cachedQueueInfo.arn)

return cachedQueueInfo, nil
}
// creating queues is idempotent, the names serve as unique keys among a given region.
s.logger.Debugf("No SQS queue arn found for %s\nCreating SQS queue", queueName)
s.logger.Debugf("No SQS queue ARN found for %s\nCreating SQS queue", queueName)

sanitizedName := nameToAWSSanitizedName(queueName, s.metadata.Fifo)

Expand Down Expand Up @@ -425,7 +425,7 @@ func (s *snsSqs) getOrCreateSnsSqsSubscription(ctx context.Context, queueArn, to
return cachedSubscriptionArn, nil
}

s.logger.Debugf("No subscription arn found of queue arn:%s to topic arn: %s\nCreating subscription", queueArn, topicArn)
s.logger.Debugf("No subscription ARN found of queue arn:%s to topic arn: %s\nCreating subscription", queueArn, topicArn)

if !s.metadata.DisableEntityManagement {
subscriptionArn, err = s.createSnsSqsSubscription(ctx, queueArn, topicArn)
Expand All @@ -437,7 +437,7 @@ func (s *snsSqs) getOrCreateSnsSqsSubscription(ctx context.Context, queueArn, to
} else {
subscriptionArn, err = s.getSnsSqsSubscriptionArn(ctx, topicArn)
if err != nil {
s.logger.Errorf("error fetching info for topic arn %s: %w", topicArn, err)
s.logger.Errorf("error fetching info for topic ARN %s: %w", topicArn, err)

return "", err
}
Expand Down Expand Up @@ -758,8 +758,8 @@ func (s *snsSqs) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
return errors.New("component is closed")
}

s.topicLocker.Lock(req.Topic)
defer s.topicLocker.Unlock(req.Topic)
s.topicsLocker.Lock(req.Topic)
defer s.topicsLocker.Unlock(req.Topic)

// subscribers declare a topic ARN and declare a SQS queue to use
// these should be idempotent - queues should not be created if they exist.
Expand Down
14 changes: 5 additions & 9 deletions pubsub/aws/snssqs/subscription_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ const (
Unsubscribe
)

var (
subscriptionMgmtInst *SubscriptionManager
initOnce sync.Once
)
var initOnce sync.Once

type SubscriptionTopicHandler struct {
topic string
Expand Down Expand Up @@ -50,7 +47,7 @@ type SubscriptionManagement interface {
Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk func(context.Context, *sqsQueueInfo, *sqsQueueInfo))
Subscribe(topicHandler *SubscriptionTopicHandler)
Close()
GetSubscriptionTopicHandler(string) (*SubscriptionTopicHandler, bool)
GetSubscriptionTopicHandler(topic string) (*SubscriptionTopicHandler, bool)
}

func NewSubscriptionMgmt(log logger.Logger) SubscriptionManagement {
Expand All @@ -71,7 +68,6 @@ func createQueueConsumerCbk(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk

func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk func(context.Context, *sqsQueueInfo, *sqsQueueInfo)) {
initOnce.Do(func() {
sm.logger.Debug("Initializing subscription manager")
queueConsumerCbk := createQueueConsumerCbk(queueInfo, dlqInfo, cbk)
go sm.queueConsumerController(queueConsumerCbk)
sm.logger.Debug("Subscription manager initialized")
Expand All @@ -86,7 +82,6 @@ func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueIn
// it is also responsible for managing the lifecycle of the subscription handlers.
func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(context.Context)) {
ctx := context.Background()
sm.logger.Debugf("%+v", sm)

for {
select {
Expand All @@ -98,6 +93,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
// although we have a lock here, the topicsHandlers map is thread safe and can be accessed concurrently so other subscribers that are already consuming messages
// can get the handler for the topic while we're still updating the map without blocking them
current := sm.topicsHandlers.Size()

switch changeEvent.action {
case Subscribe:
sm.topicsHandlers.Store(topic, changeEvent.handler)
Expand All @@ -107,7 +103,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
// create a new context for sqs consumption with a cancel func to be used when we unsubscribe from all topics
subCtx, sm.consumeCancelFunc = context.WithCancel(ctx)
// start sqs consumption
sm.logger.Info("Starting sqs consumption")
sm.logger.Info("Starting SQS consumption")
go queueConsumerCbk(subCtx)
}
case Unsubscribe:
Expand All @@ -116,7 +112,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
afterDelete := sm.topicsHandlers.Size()
// if before we've removed this subscription we had one (last) subscription, this signals us to stop sqs consumption
if current == 1 && afterDelete == 0 {
sm.logger.Info("Last subscription removed. no more handlers are mapped to topics. stopping sqs consumption")
sm.logger.Info("Last subscription removed. no more handlers are mapped to topics. stopping SQS consumption")
sm.consumeCancelFunc()
}
}
Expand Down
20 changes: 8 additions & 12 deletions pubsub/aws/snssqs/topics_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@ import (
"github.com/puzpuzpuz/xsync/v3"
)

var (
lockManager *TopicsLockManager
lockManagerOnce sync.Once
)

// TopicsLockManager is a singleton for fine-grained locking, to prevent the component r/w operations
// from locking the entire component out when performing operations on different topics.
type TopicsLockManager struct {
xLockMap *xsync.MapOf[string, *sync.Mutex]
}

func GetLockManager() *TopicsLockManager {
lockManagerOnce.Do(func() {
lockManager = &TopicsLockManager{xLockMap: xsync.NewMapOf[string, *sync.Mutex]()}
})
type TopicsLocker interface {
Lock(topic string) *sync.Mutex
Unlock(topic string)
}

return lockManager
func NewLockManager() *TopicsLockManager {
return &TopicsLockManager{xLockMap: xsync.NewMapOf[string, *sync.Mutex]()}
}

func (lm *TopicsLockManager) Lock(key string) *sync.Mutex {
Expand All @@ -38,11 +34,11 @@ func (lm *TopicsLockManager) Lock(key string) *sync.Mutex {

func (lm *TopicsLockManager) Unlock(key string) {
lm.xLockMap.Compute(key, func(oldValue *sync.Mutex, exists bool) (newValue *sync.Mutex, delete bool) {
// if exists then the mutex must be locked, and we unlock it
// if exists then the mutex must be already locked, and we unlock it
if exists {
oldValue.Unlock()
}

// we return to comply with the Compute signature, but not using the returned values
return oldValue, false
})
}

0 comments on commit 4d8b903

Please sign in to comment.