diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go
index ef1da90d83..37fdd7849b 100644
--- a/pubsub/aws/snssqs/snssqs.go
+++ b/pubsub/aws/snssqs/snssqs.go
@@ -42,7 +42,7 @@ import (
 )
 
 type snsSqs struct {
-	topicLocker *TopicsLockManager
+	topicsLocker TopicsLocker
 	// key is the sanitized topic name
 	topicArns map[string]string
 	// key is the topic name, value holds the ARN of the queue and its url.
@@ -169,7 +169,7 @@ func (s *snsSqs) Init(ctx context.Context, metadata pubsub.Metadata) error {
 	}
 	// subscription manager responsible for managing the lifecycle of subscriptions.
 	s.subscriptionManager = NewSubscriptionMgmt(s.logger)
-	s.topicLocker = GetLockManager()
+	s.topicsLocker = NewLockManager()
 
 	s.topicArns = make(map[string]string)
 	s.queues = make(map[string]*sqsQueueInfo)
@@ -337,12 +337,12 @@ func (s *snsSqs) getOrCreateQueue(ctx context.Context, queueName string) (*sqsQu
 	)
 
 	if cachedQueueInfo, ok := s.queues[queueName]; ok {
-		s.logger.Debugf("Found queue arn for %s: %s", queueName, cachedQueueInfo.arn)
+		s.logger.Debugf("Found queue ARN for %s: %s", queueName, cachedQueueInfo.arn)
 
 		return cachedQueueInfo, nil
 	}
 
 	// creating queues is idempotent, the names serve as unique keys among a given region.
-	s.logger.Debugf("No SQS queue arn found for %s\nCreating SQS queue", queueName)
+	s.logger.Debugf("No SQS queue ARN found for %s\nCreating SQS queue", queueName)
 
 	sanitizedName := nameToAWSSanitizedName(queueName, s.metadata.Fifo)
@@ -425,7 +425,7 @@ func (s *snsSqs) getOrCreateSnsSqsSubscription(ctx context.Context, queueArn, to
 		return cachedSubscriptionArn, nil
 	}
 
-	s.logger.Debugf("No subscription arn found of queue arn:%s to topic arn: %s\nCreating subscription", queueArn, topicArn)
+	s.logger.Debugf("No subscription ARN found for queue ARN %s to topic ARN %s\nCreating subscription", queueArn, topicArn)
 
 	if !s.metadata.DisableEntityManagement {
 		subscriptionArn, err = s.createSnsSqsSubscription(ctx, queueArn, topicArn)
@@ -437,7 +437,7 @@ func (s *snsSqs) getOrCreateSnsSqsSubscription(ctx context.Context, queueArn, to
 	} else {
 		subscriptionArn, err = s.getSnsSqsSubscriptionArn(ctx, topicArn)
 		if err != nil {
-			s.logger.Errorf("error fetching info for topic arn %s: %w", topicArn, err)
+			s.logger.Errorf("error fetching info for topic ARN %s: %w", topicArn, err)
 
 			return "", err
 		}
@@ -758,8 +758,8 @@ func (s *snsSqs) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
 		return errors.New("component is closed")
 	}
 
-	s.topicLocker.Lock(req.Topic)
-	defer s.topicLocker.Unlock(req.Topic)
+	s.topicsLocker.Lock(req.Topic)
+	defer s.topicsLocker.Unlock(req.Topic)
 
 	// subscribers declare a topic ARN and declare a SQS queue to use
 	// these should be idempotent - queues should not be created if they exist.
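One thing this change buys beyond the rename: `snsSqs` now depends on the `TopicsLocker` interface rather than the concrete singleton, so a test can inject a fake locker and assert the Lock/Unlock pairing around `Subscribe`. A minimal, hypothetical sketch of such a test double (only `TopicsLocker` itself comes from this patch; every other name below is illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// TopicsLocker mirrors the interface introduced in topics_locker.go below.
type TopicsLocker interface {
	Lock(topic string) *sync.Mutex
	Unlock(topic string)
}

// fakeLocker is a test double, not part of the component: it records the
// call sequence instead of really locking.
type fakeLocker struct {
	mu    sync.Mutex
	calls []string
}

func (f *fakeLocker) Lock(topic string) *sync.Mutex {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.calls = append(f.calls, "lock:"+topic)
	return nil // Subscribe ignores the returned mutex
}

func (f *fakeLocker) Unlock(topic string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.calls = append(f.calls, "unlock:"+topic)
}

// subscribe mimics the Lock / defer Unlock pattern from snsSqs.Subscribe.
func subscribe(l TopicsLocker, topic string) {
	l.Lock(topic)
	defer l.Unlock(topic)
	// ... idempotently create topic, queue and subscription ...
}

func main() {
	f := &fakeLocker{}
	subscribe(f, "orders")
	fmt.Println(f.calls) // [lock:orders unlock:orders]
}
```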
diff --git a/pubsub/aws/snssqs/subscription_mgmt.go b/pubsub/aws/snssqs/subscription_mgmt.go
index c768cde122..843147e441 100644
--- a/pubsub/aws/snssqs/subscription_mgmt.go
+++ b/pubsub/aws/snssqs/subscription_mgmt.go
@@ -19,10 +19,7 @@ const (
 	Unsubscribe
 )
 
-var (
-	subscriptionMgmtInst *SubscriptionManager
-	initOnce             sync.Once
-)
+var initOnce sync.Once
 
 type SubscriptionTopicHandler struct {
 	topic string
@@ -50,7 +47,7 @@ type SubscriptionManagement interface {
 	Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk func(context.Context, *sqsQueueInfo, *sqsQueueInfo))
 	Subscribe(topicHandler *SubscriptionTopicHandler)
 	Close()
-	GetSubscriptionTopicHandler(string) (*SubscriptionTopicHandler, bool)
+	GetSubscriptionTopicHandler(topic string) (*SubscriptionTopicHandler, bool)
 }
 
 func NewSubscriptionMgmt(log logger.Logger) SubscriptionManagement {
@@ -71,7 +68,6 @@ func createQueueConsumerCbk(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk
 
 func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueInfo, cbk func(context.Context, *sqsQueueInfo, *sqsQueueInfo)) {
 	initOnce.Do(func() {
-		sm.logger.Debug("Initializing subscription manager")
 		queueConsumerCbk := createQueueConsumerCbk(queueInfo, dlqInfo, cbk)
 		go sm.queueConsumerController(queueConsumerCbk)
 		sm.logger.Debug("Subscription manager initialized")
@@ -86,7 +82,6 @@ func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueIn
 // it is also responsible for managing the lifecycle of the subscription handlers.
 func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(context.Context)) {
 	ctx := context.Background()
-	sm.logger.Debugf("%+v", sm)
 
 	for {
 		select {
@@ -98,6 +93,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
 			// although we have a lock here, the topicsHandlers map is thread safe and can be accessed concurrently so other subscribers that are already consuming messages
 			// can get the handler for the topic while we're still updating the map without blocking them
 			current := sm.topicsHandlers.Size()
+
 			switch changeEvent.action {
 			case Subscribe:
 				sm.topicsHandlers.Store(topic, changeEvent.handler)
@@ -107,7 +103,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
 				// create a new context for sqs consumption with a cancel func to be used when we unsubscribe from all topics
 				subCtx, sm.consumeCancelFunc = context.WithCancel(ctx)
 				// start sqs consumption
-				sm.logger.Info("Starting sqs consumption")
+				sm.logger.Info("Starting SQS consumption")
 				go queueConsumerCbk(subCtx)
 			}
 		case Unsubscribe:
@@ -116,7 +112,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
 			sm.topicsHandlers.Delete(topic)
 			afterDelete := sm.topicsHandlers.Size()
 			// if before we've removed this subscription we had one (last) subscription, this signals us to stop sqs consumption
 			if current == 1 && afterDelete == 0 {
-				sm.logger.Info("Last subscription removed. no more handlers are mapped to topics. stopping sqs consumption")
+				sm.logger.Info("Last subscription removed. No more handlers are mapped to topics. Stopping SQS consumption")
 				sm.consumeCancelFunc()
 			}
 		}
diff --git a/pubsub/aws/snssqs/topics_locker.go b/pubsub/aws/snssqs/topics_locker.go
index aa8f7d3e68..bbb934c026 100644
--- a/pubsub/aws/snssqs/topics_locker.go
+++ b/pubsub/aws/snssqs/topics_locker.go
@@ -6,23 +6,19 @@ import (
 	"github.com/puzpuzpuz/xsync/v3"
 )
 
-var (
-	lockManager     *TopicsLockManager
-	lockManagerOnce sync.Once
-)
-
-// TopicsLockManager is a singleton for fine-grained locking, to prevent the component r/w operations
+// TopicsLockManager provides fine-grained locking, to prevent the component r/w operations
 // from locking the entire component out when performing operations on different topics.
 type TopicsLockManager struct {
 	xLockMap *xsync.MapOf[string, *sync.Mutex]
 }
 
-func GetLockManager() *TopicsLockManager {
-	lockManagerOnce.Do(func() {
-		lockManager = &TopicsLockManager{xLockMap: xsync.NewMapOf[string, *sync.Mutex]()}
-	})
+type TopicsLocker interface {
+	Lock(topic string) *sync.Mutex
+	Unlock(topic string)
+}
 
-	return lockManager
+func NewLockManager() *TopicsLockManager {
+	return &TopicsLockManager{xLockMap: xsync.NewMapOf[string, *sync.Mutex]()}
 }
 
 func (lm *TopicsLockManager) Lock(key string) *sync.Mutex {
@@ -38,11 +34,11 @@
 
 func (lm *TopicsLockManager) Unlock(key string) {
 	lm.xLockMap.Compute(key, func(oldValue *sync.Mutex, exists bool) (newValue *sync.Mutex, delete bool) {
-		// if exists then the mutex must be locked, and we unlock it
+		// if exists then the mutex must already be locked, and we unlock it
 		if exists {
 			oldValue.Unlock()
 		}
-
+		// we return to comply with the Compute signature, but the returned values are not used
 		return oldValue, false
 	})
 }