From 4c56deb1b8c846ab9520812ec7dde9c38454c623 Mon Sep 17 00:00:00 2001 From: Brennan Stehling Date: Fri, 8 Jul 2022 14:23:28 -0700 Subject: [PATCH 1/6] moves work which mutates state into private functions which are only run on a serial queue * eliminates Dispatch Semaphore --- AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m | 82 +++++++++++++----------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m index a18b7941426..a675bc36591 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m @@ -22,18 +22,18 @@ @interface AWSMQTTEncoder () { NSInteger byteIndex; } -@property (nonatomic, strong) dispatch_semaphore_t encodeSemaphore; +@property (nonatomic, strong) dispatch_queue_t encodeQueue; @end @implementation AWSMQTTEncoder - (id)initWithStream:(NSOutputStream*)aStream - { +{ _status = AWSMQTTEncoderStatusInitializing; stream = aStream; [stream setDelegate:self]; - _encodeSemaphore = dispatch_semaphore_create(1); + _encodeQueue = dispatch_queue_create("com.amazon.aws.iot.encoder-queue", DISPATCH_QUEUE_SERIAL); return self; } @@ -48,7 +48,7 @@ - (void)close { AWSDDLogDebug(@"closing encoder stream."); [stream close]; [stream setDelegate:nil]; - stream = nil; + stream = nil; } //This is executed in the runLoop. @@ -70,25 +70,9 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode { [_delegate encoder:self handleEvent:AWSMQTTEncoderEventReady]; } else if (_status == AWSMQTTEncoderStatusSending) { - UInt8* ptr; - NSInteger n, length; - - ptr = (UInt8*) [buffer bytes] + byteIndex; - // Number of bytes pending for transfer - length = [buffer length] - byteIndex; - n = [stream write:ptr maxLength:length]; - if (n == -1) { - _status = AWSMQTTEncoderStatusError; - [_delegate encoder:self handleEvent:AWSMQTTEncoderEventErrorOccurred]; - } - else if (n < length) { - byteIndex += n; - } - else { - buffer = NULL; - byteIndex = 0; - _status = AWSMQTTEncoderStatusReady; - } + dispatch_sync(self.encodeQueue, ^{ + [self writeBytes]; + }); } break; case NSStreamEventErrorOccurred: @@ -105,24 +89,27 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode { } - (void)encodeMessage:(AWSMQTTMessage*)msg { - //Adding a mutex to prevent buffer from being modified by multiple threads - AWSDDLogVerbose(@"***** waiting on encodeSemaphore *****"); - dispatch_semaphore_wait(self.encodeSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"***** passed encodeSempahore. *****"); + dispatch_sync(self.encodeQueue, ^{ + [self encodeWhenReady:msg]; + }); +} + +# pragma mark - Private/Serial Functions - + +- (void)encodeWhenReady:(AWSMQTTMessage*)msg { UInt8 header; NSInteger n, length; - + if (_status != AWSMQTTEncoderStatusReady) { AWSDDLogInfo(@"Encoder not ready"); - dispatch_semaphore_signal(self.encodeSemaphore); return; } - + assert (buffer == NULL); assert (byteIndex == 0); - + buffer = [[NSMutableData alloc] init]; - + // encode fixed header header = [msg type] << 4; if ([msg isDuplicate]) { @@ -133,7 +120,7 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg { header |= 0x01; } [buffer appendBytes:&header length:1]; - + // encode remaining length length = [[msg data] length]; do { @@ -145,12 +132,12 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg { [buffer appendBytes:&digit length:1]; } while (length > 0); - + // encode message data if ([msg data] != NULL) { [buffer appendData:[msg data]]; } - + n = [stream write:[buffer bytes] maxLength:[buffer length]]; if (n == -1) { _status = AWSMQTTEncoderStatusError; @@ -164,9 +151,28 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg { buffer = NULL; // XXX [delegate encoder:self handleEvent:MQTTEncoderEventReady]; } - AWSDDLogVerbose(@"***** signaling encodeSemaphore *****"); - dispatch_semaphore_signal(self.encodeSemaphore); - AWSDDLogVerbose(@"<<%@>>: Encoder finished writing message", [NSThread currentThread]); +} + +- (void)writeBytes { + UInt8* ptr; + NSInteger n, length; + + ptr = (UInt8*) [buffer bytes] + byteIndex; + // Number of bytes pending for transfer + length = [buffer length] - byteIndex; + n = [stream write:ptr maxLength:length]; + if (n == -1) { + _status = AWSMQTTEncoderStatusError; + [_delegate encoder:self handleEvent:AWSMQTTEncoderEventErrorOccurred]; + } + else if (n < length) { + byteIndex += n; + } + else { + buffer = NULL; + byteIndex = 0; + _status = AWSMQTTEncoderStatusReady; + } } @end From d8da20f3170611b9de56e0bc0761ce29932cfe88 Mon Sep 17 00:00:00 2001 From: Brennan Stehling Date: Fri, 8 Jul 2022 14:29:22 -0700 Subject: [PATCH 2/6] adds assertions to ensure work is on the right queue --- AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m index a675bc36591..ddaa3b6e444 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m @@ -70,6 +70,7 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode { [_delegate encoder:self handleEvent:AWSMQTTEncoderEventReady]; } else if (_status == AWSMQTTEncoderStatusSending) { + dispatch_assert_queue_not(self.encodeQueue); dispatch_sync(self.encodeQueue, ^{ [self writeBytes]; }); @@ -89,14 +90,16 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode { } - (void)encodeMessage:(AWSMQTTMessage*)msg { + dispatch_assert_queue_not(self.encodeQueue); dispatch_sync(self.encodeQueue, ^{ [self encodeWhenReady:msg]; }); } -# pragma mark - Private/Serial Functions - +# pragma mark - private/serial functions - - (void)encodeWhenReady:(AWSMQTTMessage*)msg { + dispatch_assert_queue(self.encodeQueue); UInt8 header; NSInteger n, length; @@ -154,6 +157,7 @@ - (void)encodeWhenReady:(AWSMQTTMessage*)msg { } - (void)writeBytes { + dispatch_assert_queue(self.encodeQueue); UInt8* ptr; NSInteger n, length; From 4488d5979ee1f319facb0b6522bc9a6d4bfa767b Mon Sep 17 00:00:00 2001 From: Brennan Stehling Date: Fri, 8 Jul 2022 14:29:22 -0700 Subject: [PATCH 3/6] adds assertions to ensure work is on the right queue From 46885a01b27a12aca5790789b3c9accaec32d9c0 Mon Sep 17 00:00:00 2001 From: Brennan Stehling Date: Fri, 8 Jul 2022 15:32:56 -0700 Subject: [PATCH 4/6] replaces semaphore with queue in AWSMQTTSession --- AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m | 80 +++++++++++++----------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m index fccec15e9ce..7a1f455ab2c 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m @@ -63,7 +63,7 @@ - (UInt16)nextMsgId; @property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message @property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried. -@property (strong,nonatomic) dispatch_semaphore_t drainSenderQueueSemaphore; +@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue; @end @@ -96,7 +96,7 @@ - (id)initWithClientId:(NSString*)theClientId if (self = [super init]) { clientId = theClientId; - _drainSenderQueueSemaphore = dispatch_semaphore_create(1); + _drainSenderSerialQueue = dispatch_queue_create("com.amazon.aws.iot.drain-sender-queue", DISPATCH_QUEUE_SERIAL); keepAliveInterval = theKeepAliveInterval; connectMessage = msg; _publishRetryThrottle = publishRetryThrottle; @@ -345,22 +345,13 @@ - (void)encoder:(AWSMQTTEncoder*)sender handleEvent:(AWSMQTTEncoderEvent) eventC break; case AWSMQTTSessionStatusConnecting: break; - case AWSMQTTSessionStatusConnected: - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - - if ([self.queue count] > 0) { - AWSDDLogDebug(@"Sending message from session queue" ); - AWSMQTTMessage *msg = [self.queue objectAtIndex:0]; - [self.queue removeObjectAtIndex:0]; - [encoder encodeMessage:msg]; - } - - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_signal(self.drainSenderQueueSemaphore); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finshed draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); + case AWSMQTTSessionStatusConnected: { + dispatch_assert_queue_not(self.drainSenderSerialQueue); + dispatch_sync(self.drainSenderSerialQueue, ^{ + [self queueNextMessage]; + }); break; + } case AWSMQTTSessionStatusError: break; } @@ -684,17 +675,10 @@ - (void)send:(AWSMQTTMessage*)msg { [encoder encodeMessage:msg]; } else { - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - - AWSDDLogDebug(@"<<%@>>: MQTTSession.send added msg to queue to send later", [NSThread currentThread]); - [self.queue addObject:msg]; - - - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_signal(self.drainSenderQueueSemaphore); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); + dispatch_assert_queue_not(self.drainSenderSerialQueue); + dispatch_sync(self.drainSenderSerialQueue, ^{ + [self queueMessage:msg]; + }); } } @@ -711,23 +695,45 @@ - (BOOL)isReadyToPublish { return encoder && [encoder status] == AWSMQTTEncoderStatusReady; } --(void) drainSenderQueue { - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); +- (void)drainSenderQueue { + dispatch_assert_queue_not(self.drainSenderSerialQueue); + dispatch_sync(self.drainSenderSerialQueue, ^{ + [self drainAllMessages]; + }); +} + +# pragma mark - private/serial functions - + +- (void)queueNextMessage { + dispatch_assert_queue(self.drainSenderSerialQueue); + + if ([self.queue count] > 0) { + AWSDDLogDebug(@"Sending message from session queue"); + AWSMQTTMessage *msg = [self.queue objectAtIndex:0]; + [self.queue removeObjectAtIndex:0]; + [encoder encodeMessage:msg]; + } +} + +- (void)queueMessage:(AWSMQTTMessage*)msg { + dispatch_assert_queue(self.drainSenderSerialQueue); + + [self.queue addObject:msg]; +} + +- (void)drainAllMessages { + dispatch_assert_queue(self.drainSenderSerialQueue); int count = 0; - while ([self.queue count] > 0 && count < _publishRetryThrottle && [self isReadyToPublish]) { + while (self.queue.count > 0 && count < _publishRetryThrottle && self.isReadyToPublish) { AWSDDLogDebug(@"Sending message from session queue" ); AWSMQTTMessage *msg = [self.queue objectAtIndex:0]; [self.queue removeObjectAtIndex:0]; [encoder encodeMessage:msg]; count = count + 1; } - - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); - dispatch_semaphore_signal(self.drainSenderQueueSemaphore); - AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]); } + + @end From db75ef4ef685890b61e71677a29a3d209e659cc4 Mon Sep 17 00:00:00 2001 From: Brennan Stehling Date: Fri, 8 Jul 2022 15:33:31 -0700 Subject: [PATCH 5/6] replaces semaphore with queue in AWSIoTMQTTClient --- AWSIoT/Internal/AWSIoTMQTTClient.m | 36 ++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/AWSIoT/Internal/AWSIoTMQTTClient.m b/AWSIoT/Internal/AWSIoTMQTTClient.m index 72dcd74caae..3c7e5b66fd0 100644 --- a/AWSIoT/Internal/AWSIoTMQTTClient.m +++ b/AWSIoT/Internal/AWSIoTMQTTClient.m @@ -80,6 +80,7 @@ @interface AWSIoTMQTTClient() Date: Wed, 13 Jul 2022 13:50:08 -0700 Subject: [PATCH 6/6] updates CHANGELOG.md --- CHANGELOG.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33c4c54e728..e111fede57f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,15 +7,16 @@ ### Bug Fixes - **AWSIoT** - - fix(iot): Adds support for handling certificate with certificateId (See [PR #4219](https://github.com/aws-amplify/aws-sdk-ios/pull/4219)) + - Adds support for handling certificate with certificateId (See [PR #4219](https://github.com/aws-amplify/aws-sdk-ios/pull/4219)) + - Eliminates Dispatch Semaphore in MQTT internal code [PR #4211](https://github.com/aws-amplify/aws-sdk-ios/pull/4211) ## 2.27.12 ### Bug Fixes - **AWSMobileClient** - - fix(awsmobileClient): AWSMobileclient will refresh the token before making user attribute calls (See [PR #4215](https://github.com/aws-amplify/aws-sdk-ios/pull/4215)) - - fix(AWSMobileClient): Change logic to handle weak reference of token operations (See [PR #4205](https://github.com/aws-amplify/aws-sdk-ios/pull/4205)) +- fix(AWSMobileClient): AWSMobileclient will refresh the token before making user attribute calls (See [PR #4215](https://github.com/aws-amplify/aws-sdk-ios/pull/4215)) +- fix(AWSMobileClient): Change logic to handle weak reference of token operations (See [PR #4205](https://github.com/aws-amplify/aws-sdk-ios/pull/4205)) ## 2.27.11