From 015f416a232695873650b7095d699140ed819996 Mon Sep 17 00:00:00 2001 From: Brennan Stehling Date: Fri, 8 Jul 2022 15:32:56 -0700 Subject: [PATCH] replaces semaphore with queue in AWSMQTTSession --- AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m | 80 +++++++++++++----------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m index fccec15e9ce..7a1f455ab2c 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m @@ -63,7 +63,7 @@ - (UInt16)nextMsgId; @property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message @property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried. -@property (strong,nonatomic) dispatch_semaphore_t drainSenderQueueSemaphore; +@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue; @end @@ -96,7 +96,7 @@ - (id)initWithClientId:(NSString*)theClientId if (self = [super init]) { clientId = theClientId; - _drainSenderQueueSemaphore = dispatch_semaphore_create(1); + _drainSenderSerialQueue = dispatch_queue_create("com.amazon.aws.iot.drain-sender-queue", DISPATCH_QUEUE_SERIAL); keepAliveInterval = theKeepAliveInterval; connectMessage = msg; _publishRetryThrottle = publishRetryThrottle; @@ -345,22 +345,13 @@ - (void)encoder:(AWSMQTTEncoder*)sender handleEvent:(AWSMQTTEncoderEvent) eventC break; case AWSMQTTSessionStatusConnecting: break; - case AWSMQTTSessionStatusConnected: - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - - if ([self.queue count] > 0) { - AWSDDLogDebug(@"Sending message from session queue" ); - AWSMQTTMessage *msg = [self.queue objectAtIndex:0]; - [self.queue removeObjectAtIndex:0]; - [encoder encodeMessage:msg]; - } - - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_signal(self.drainSenderQueueSemaphore); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finshed draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); + case AWSMQTTSessionStatusConnected: { + dispatch_assert_queue_not(self.drainSenderSerialQueue); + dispatch_sync(self.drainSenderSerialQueue, ^{ + [self queueNextMessage]; + }); break; + } case AWSMQTTSessionStatusError: break; } @@ -684,17 +675,10 @@ - (void)send:(AWSMQTTMessage*)msg { [encoder encodeMessage:msg]; } else { - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - - AWSDDLogDebug(@"<<%@>>: MQTTSession.send added msg to queue to send later", [NSThread currentThread]); - [self.queue addObject:msg]; - - - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_signal(self.drainSenderQueueSemaphore); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); + dispatch_assert_queue_not(self.drainSenderSerialQueue); + dispatch_sync(self.drainSenderSerialQueue, ^{ + [self queueMessage:msg]; + }); } } @@ -711,23 +695,45 @@ - (BOOL)isReadyToPublish { return encoder && [encoder status] == AWSMQTTEncoderStatusReady; } --(void) drainSenderQueue { - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); +- (void)drainSenderQueue { + dispatch_assert_queue_not(self.drainSenderSerialQueue); + dispatch_sync(self.drainSenderSerialQueue, ^{ + [self drainAllMessages]; + }); +} + +# pragma mark - private/serial functions - + +- (void)queueNextMessage { + dispatch_assert_queue(self.drainSenderSerialQueue); + + if ([self.queue count] > 0) { + AWSDDLogDebug(@"Sending message from session queue"); + AWSMQTTMessage *msg = [self.queue objectAtIndex:0]; + [self.queue removeObjectAtIndex:0]; + [encoder encodeMessage:msg]; + } +} + +- (void)queueMessage:(AWSMQTTMessage*)msg { + dispatch_assert_queue(self.drainSenderSerialQueue); + + [self.queue addObject:msg]; +} + +- (void)drainAllMessages { + dispatch_assert_queue(self.drainSenderSerialQueue); int count = 0; - while ([self.queue count] > 0 && count < _publishRetryThrottle && [self isReadyToPublish]) { + while (self.queue.count > 0 && count < _publishRetryThrottle && self.isReadyToPublish) { AWSDDLogDebug(@"Sending message from session queue" ); AWSMQTTMessage *msg = [self.queue objectAtIndex:0]; [self.queue removeObjectAtIndex:0]; [encoder encodeMessage:msg]; count = count + 1; } - - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_signal(self.drainSenderQueueSemaphore); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); } + + @end