Skip to content

Commit

Permalink
comments and debug msgs
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
  • Loading branch information
amimimor committed Oct 12, 2023
1 parent e50d952 commit ada263b
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions pubsub/aws/snssqs/subscription_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
select {
case changeEvent := <-sm.topicsChangeCh:
topic := changeEvent.handler.topic
sm.logger.Debugf("subscription change event received with action: %s, on topic: %s", changeEvent.action, topic)
sm.logger.Debugf("subscription change event received with action: %v, on topic: %s", changeEvent.action, topic)
// topic change events are serialized so that no interleaving can occur
sm.lock.Lock()
// although we have a lock here, the topicsHandlers map is thread safe and can be accessed concurrently so other subscribers that are already consuming messages
Expand All @@ -109,19 +109,17 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con

if changeEvent.action == Subscribe {
sm.topicsHandlers.Store(topic, changeEvent.handler)
sm.logger.Debug("subscribing to topic: ", topic)

// if before we've added the subscription there were no subscriptions, this subscribe signals us to start consuming from sqs
if current == 0 {
var subCtx context.Context
// create a new context for sqs consumption with a cancel func to be used when we unsubscribe from all topics
subCtx, sm.consumeCancelFunc = context.WithCancel(ctx)
// start sqs consumption
sm.logger.Debug("starting sqs consumption for the first time")
sm.logger.Info("starting sqs consumption")
go queueConsumerCbk(subCtx)
}
} else if changeEvent.action == Unsubscribe {
sm.logger.Debug("unsubscribing from topic: ", topic)
sm.topicsHandlers.Delete(topic)
// for idempotency, we check the size of the map after the delete operation, as we might have already deleted the subscription
afterDelete := sm.topicsHandlers.Size()
Expand All @@ -139,7 +137,8 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
}

func (sm *SubscriptionManager) Subscribe(topicHandler *SubscriptionTopicHandler) {
sm.logger.Debug("subscribing to topic", topicHandler.topic)
sm.logger.Debug("subscribing to topic: ", topicHandler.topic)

sm.wg.Add(1)
go func() {
defer sm.wg.Done()
Expand All @@ -148,11 +147,14 @@ func (sm *SubscriptionManager) Subscribe(topicHandler *SubscriptionTopicHandler)
}

func (sm *SubscriptionManager) createSubscribeListener(topicHandler *SubscriptionTopicHandler) {
topic := topicHandler.topic
sm.logger.Debug("creating subscribe listener for topic ", topic)
sm.logger.Debug("creating a subscribe listener for topic: ", topicHandler.topic)

sm.topicsChangeCh <- changeSubscriptionTopicHandler{Subscribe, topicHandler}
closeCh := make(chan struct{})
go sm.createUnsubscribeListener(topicHandler.ctx, topic, closeCh)
// the unsubscriber is expected to be terminated by the dapr runtime as it cancels the context upon unsubscribe
go sm.createUnsubscribeListener(topicHandler.ctx, topicHandler.topic, closeCh)
// if the SubscriptinoManager is being closed and somehow the dapr runtime did not call unsubscribe, we close the control
// channel here to terminate the unsubscriber and return
defer close(closeCh)
<-sm.closeCh
}
Expand Down

0 comments on commit ada263b

Please sign in to comment.