Skip to content

Commit

Permalink
wip. fixing failed tests
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
  • Loading branch information
amimimor committed Oct 12, 2023
1 parent a4e07e3 commit e50d952
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
4 changes: 2 additions & 2 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (string, st
}

// creating queues is idempotent, the names serve as unique keys among a given region.
s.logger.Debugf("no SNS topic ARN found for topic: %s\ncreating SNS with (sanitized) topic: %s", topic, sanitizedTopic)
s.logger.Debugf("no SNS topic ARN found for topic: %s. creating SNS with (sanitized) topic: %s", topic, sanitizedTopic)

if !s.metadata.DisableEntityManagement {
topicArn, err = s.createTopic(ctx, sanitizedTopic)
Expand Down Expand Up @@ -364,7 +364,7 @@ func (s *snsSqs) getOrCreateQueue(ctx context.Context, queueName string) (*sqsQu
}

s.queues.Store(queueName, queueInfo)
s.logger.Debugf("Created SQS queue: %s: with arn: %s", queueName, queueInfo.arn)
s.logger.Debugf("created SQS queue: %s: with arn: %s", queueName, queueInfo.arn)

return queueInfo, nil
}
Expand Down
24 changes: 13 additions & 11 deletions pubsub/aws/snssqs/subscription_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,21 @@ func (sm *SubscriptionManager) Init(queueInfo *sqsQueueInfo, dlqInfo *sqsQueueIn
})
}

// queueConsumerController is responsible for managing the subscription lifecycle.
// it is the only place where the topicsHandlers map is updated.
// it is running in a separate goroutine and is responsible for starting and stopping sqs consumption.
// its lifecycle is managed by the subscription manager and it has its own context with its child contexts used for sqs consumption and aborting of the consumption.
// queueConsumerController is responsible for managing the subscription lifecycle
// and the only place where the topicsHandlers map is updated.
// it is running in a separate goroutine and is responsible for starting and stopping sqs consumption
// where its lifecycle is managed by the subscription manager,
// and it has its own context with its child contexts used for sqs consumption and aborting of the consumption.
// it is also responsible for managing the lifecycle of the subscription handlers.
func (sm *SubscriptionManager) queueConsumerController(queueConsumerBck func(context.Context)) {
baseContext := context.Background()
func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(context.Context)) {
ctx := context.Background()

for {
select {
case changeEvent := <-sm.topicsChangeCh:
topic := changeEvent.handler.topic
sm.logger.Debugf("subscription change event received with action: %s, on topic: %s", changeEvent.action, topic)
// topology change events are serialized so that no interleaving can occur
// topic change events are serialized so that no interleaving can occur
sm.lock.Lock()
// although we have a lock here, the topicsHandlers map is thread safe and can be accessed concurrently so other subscribers that are already consuming messages
// can get the handler for the topic while we're still updating the map without blocking them
Expand All @@ -112,12 +113,12 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerBck func(con

// if before we've added the subscription there were no subscriptions, this subscribe signals us to start consuming from sqs
if current == 0 {
var subctx context.Context
var subCtx context.Context
// create a new context for sqs consumption with a cancel func to be used when we unsubscribe from all topics
subctx, sm.consumeCancelFunc = context.WithCancel(baseContext)
subCtx, sm.consumeCancelFunc = context.WithCancel(ctx)
// start sqs consumption
sm.logger.Debug("starting sqs consumption for the first time")
go queueConsumerBck(subctx)
go queueConsumerCbk(subCtx)
}
} else if changeEvent.action == Unsubscribe {
sm.logger.Debug("unsubscribing from topic: ", topic)
Expand All @@ -132,7 +133,6 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerBck func(con
}
sm.lock.Unlock()
case <-sm.closeCh:
sm.consumeCancelFunc()
return
}
}
Expand Down Expand Up @@ -164,7 +164,9 @@ func (sm *SubscriptionManager) createUnsubscribeListener(ctx context.Context, to
for {
select {
case <-ctx.Done():
return
case <-closeCh:
return
}
}
}
Expand Down

0 comments on commit e50d952

Please sign in to comment.