Skip to content
This repository has been archived by the owner on May 28, 2024. It is now read-only.

Commit

Permalink
Improves concurrency with MQTT in AWSIoT (aws-amplify#4211)
Browse files Browse the repository at this point in the history
* eliminates Dispatch Semaphore
* moves work which mutates state into private functions which are only run on a serial queue
* adds assertions to ensure work is on the right queue
* replaces semaphore with queue in AWSMQTTEncoder, AWSMQTTSession and AWSIoTMQTTClient
  • Loading branch information
brennanMKE authored and samkudr committed Sep 26, 2022
1 parent 6838bdc commit 0746568
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 90 deletions.
36 changes: 24 additions & 12 deletions AWSIoT/Internal/AWSIoTMQTTClient.m
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ @interface AWSIoTMQTTClient() <AWSSRWebSocketDelegate, NSStreamDelegate, AWSMQTT
@property (atomic, assign) BOOL runLoopShouldContinue;

@property (strong,atomic) dispatch_semaphore_t timerSemaphore;
@property (strong,atomic) dispatch_queue_t timerQueue;

@end

Expand Down Expand Up @@ -107,6 +108,7 @@ - (instancetype)init {
_userDidIssueConnect = NO;
_userDidIssueDisconnect = NO;
_timerSemaphore = dispatch_semaphore_create(1);
_timerQueue = dispatch_queue_create("com.amazon.aws.iot.timer-queue", DISPATCH_QUEUE_SERIAL);
_streamsThread = nil;
}
return self;
Expand Down Expand Up @@ -721,18 +723,10 @@ - (void)initiateReconnectTimer: (id) sender
//Set the timeout to 1800 seconds, which is 1.5x of the max keep-alive 1200 seconds.
//The unit of measure for the dispatch_time function is nano seconds.

dispatch_semaphore_wait(_timerSemaphore, dispatch_time(DISPATCH_TIME_NOW, 1800 * NSEC_PER_SEC));
BOOL isConnectingOrConnected = self.mqttStatus == AWSIoTMQTTStatusConnected || self.mqttStatus == AWSIoTMQTTStatusConnecting;
if (!self.reconnectTimer && !isConnectingOrConnected) {
self.reconnectTimer = [NSTimer timerWithTimeInterval:self.currentReconnectTime
target:self
selector: @selector(reconnectToSession)
userInfo:nil
repeats:NO];
[[NSRunLoop currentRunLoop] addTimer:self.reconnectTimer forMode:NSRunLoopCommonModes];
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
}
dispatch_semaphore_signal(_timerSemaphore);
dispatch_assert_queue_not(self.timerQueue);
dispatch_async(self.timerQueue, ^{
[self scheduleReconnection];
});
}

- (void)openStreams:(id)sender
Expand Down Expand Up @@ -1315,4 +1309,22 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didReceivePong:(NSData *)pongPaylo
AWSDDLogVerbose(@"Websocket received pong");
}


# pragma mark - private/serial functions -

- (void)scheduleReconnection {
dispatch_assert_queue(self.timerQueue);

BOOL isConnectingOrConnected = self.mqttStatus == AWSIoTMQTTStatusConnected || self.mqttStatus == AWSIoTMQTTStatusConnecting;
if (!self.reconnectTimer && !isConnectingOrConnected) {
self.reconnectTimer = [NSTimer timerWithTimeInterval:self.currentReconnectTime
target:self
selector: @selector(reconnectToSession)
userInfo:nil
repeats:NO];
[[NSRunLoop currentRunLoop] addTimer:self.reconnectTimer forMode:NSRunLoopCommonModes];
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
}
}

@end
86 changes: 48 additions & 38 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ @interface AWSMQTTEncoder () {
NSInteger byteIndex;
}

@property (nonatomic, strong) dispatch_semaphore_t encodeSemaphore;
@property (nonatomic, strong) dispatch_queue_t encodeQueue;

@end

@implementation AWSMQTTEncoder

- (id)initWithStream:(NSOutputStream*)aStream
{
{
_status = AWSMQTTEncoderStatusInitializing;
stream = aStream;
[stream setDelegate:self];
_encodeSemaphore = dispatch_semaphore_create(1);
_encodeQueue = dispatch_queue_create("com.amazon.aws.iot.encoder-queue", DISPATCH_QUEUE_SERIAL);
return self;
}

Expand All @@ -48,7 +48,7 @@ - (void)close {
AWSDDLogDebug(@"closing encoder stream.");
[stream close];
[stream setDelegate:nil];
stream = nil;
stream = nil;
}

//This is executed in the runLoop.
Expand All @@ -70,25 +70,10 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
[_delegate encoder:self handleEvent:AWSMQTTEncoderEventReady];
}
else if (_status == AWSMQTTEncoderStatusSending) {
UInt8* ptr;
NSInteger n, length;

ptr = (UInt8*) [buffer bytes] + byteIndex;
// Number of bytes pending for transfer
length = [buffer length] - byteIndex;
n = [stream write:ptr maxLength:length];
if (n == -1) {
_status = AWSMQTTEncoderStatusError;
[_delegate encoder:self handleEvent:AWSMQTTEncoderEventErrorOccurred];
}
else if (n < length) {
byteIndex += n;
}
else {
buffer = NULL;
byteIndex = 0;
_status = AWSMQTTEncoderStatusReady;
}
dispatch_assert_queue_not(self.encodeQueue);
dispatch_sync(self.encodeQueue, ^{
[self writeBytes];
});
}
break;
case NSStreamEventErrorOccurred:
Expand All @@ -105,24 +90,29 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
}

- (void)encodeMessage:(AWSMQTTMessage*)msg {
//Adding a mutex to prevent buffer from being modified by multiple threads
AWSDDLogVerbose(@"***** waiting on encodeSemaphore *****");
dispatch_semaphore_wait(self.encodeSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"***** passed encodeSempahore. *****");
dispatch_assert_queue_not(self.encodeQueue);
dispatch_sync(self.encodeQueue, ^{
[self encodeWhenReady:msg];
});
}

# pragma mark - private/serial functions -

- (void)encodeWhenReady:(AWSMQTTMessage*)msg {
dispatch_assert_queue(self.encodeQueue);
UInt8 header;
NSInteger n, length;

if (_status != AWSMQTTEncoderStatusReady) {
AWSDDLogInfo(@"Encoder not ready");
dispatch_semaphore_signal(self.encodeSemaphore);
return;
}

assert (buffer == NULL);
assert (byteIndex == 0);

buffer = [[NSMutableData alloc] init];

// encode fixed header
header = [msg type] << 4;
if ([msg isDuplicate]) {
Expand All @@ -133,7 +123,7 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg {
header |= 0x01;
}
[buffer appendBytes:&header length:1];

// encode remaining length
length = [[msg data] length];
do {
Expand All @@ -145,12 +135,12 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg {
[buffer appendBytes:&digit length:1];
}
while (length > 0);

// encode message data
if ([msg data] != NULL) {
[buffer appendData:[msg data]];
}

n = [stream write:[buffer bytes] maxLength:[buffer length]];
if (n == -1) {
_status = AWSMQTTEncoderStatusError;
Expand All @@ -164,9 +154,29 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg {
buffer = NULL;
// XXX [delegate encoder:self handleEvent:MQTTEncoderEventReady];
}
AWSDDLogVerbose(@"***** signaling encodeSemaphore *****");
dispatch_semaphore_signal(self.encodeSemaphore);
AWSDDLogVerbose(@"<<%@>>: Encoder finished writing message", [NSThread currentThread]);
}

- (void)writeBytes {
dispatch_assert_queue(self.encodeQueue);
UInt8* ptr;
NSInteger n, length;

ptr = (UInt8*) [buffer bytes] + byteIndex;
// Number of bytes pending for transfer
length = [buffer length] - byteIndex;
n = [stream write:ptr maxLength:length];
if (n == -1) {
_status = AWSMQTTEncoderStatusError;
[_delegate encoder:self handleEvent:AWSMQTTEncoderEventErrorOccurred];
}
else if (n < length) {
byteIndex += n;
}
else {
buffer = NULL;
byteIndex = 0;
_status = AWSMQTTEncoderStatusReady;
}
}

@end
80 changes: 43 additions & 37 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ - (UInt16)nextMsgId;

@property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message
@property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried.
@property (strong,nonatomic) dispatch_semaphore_t drainSenderQueueSemaphore;
@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue;

@end

Expand Down Expand Up @@ -96,7 +96,7 @@ - (id)initWithClientId:(NSString*)theClientId

if (self = [super init]) {
clientId = theClientId;
_drainSenderQueueSemaphore = dispatch_semaphore_create(1);
_drainSenderSerialQueue = dispatch_queue_create("com.amazon.aws.iot.drain-sender-queue", DISPATCH_QUEUE_SERIAL);
keepAliveInterval = theKeepAliveInterval;
connectMessage = msg;
_publishRetryThrottle = publishRetryThrottle;
Expand Down Expand Up @@ -345,22 +345,13 @@ - (void)encoder:(AWSMQTTEncoder*)sender handleEvent:(AWSMQTTEncoderEvent) eventC
break;
case AWSMQTTSessionStatusConnecting:
break;
case AWSMQTTSessionStatusConnected:
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);

if ([self.queue count] > 0) {
AWSDDLogDebug(@"Sending message from session queue" );
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
}

AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finshed draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
case AWSMQTTSessionStatusConnected: {
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self queueNextMessage];
});
break;
}
case AWSMQTTSessionStatusError:
break;
}
Expand Down Expand Up @@ -684,17 +675,10 @@ - (void)send:(AWSMQTTMessage*)msg {
[encoder encodeMessage:msg];
}
else {
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);

AWSDDLogDebug(@"<<%@>>: MQTTSession.send added msg to queue to send later", [NSThread currentThread]);
[self.queue addObject:msg];


AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self queueMessage:msg];
});
}
}

Expand All @@ -711,23 +695,45 @@ - (BOOL)isReadyToPublish {
return encoder && [encoder status] == AWSMQTTEncoderStatusReady;
}

-(void) drainSenderQueue {
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
- (void)drainSenderQueue {
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self drainAllMessages];
});
}

# pragma mark - private/serial functions -

- (void)queueNextMessage {
dispatch_assert_queue(self.drainSenderSerialQueue);

if ([self.queue count] > 0) {
AWSDDLogDebug(@"Sending message from session queue");
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
}
}

- (void)queueMessage:(AWSMQTTMessage*)msg {
dispatch_assert_queue(self.drainSenderSerialQueue);

[self.queue addObject:msg];
}

- (void)drainAllMessages {
dispatch_assert_queue(self.drainSenderSerialQueue);

int count = 0;
while ([self.queue count] > 0 && count < _publishRetryThrottle && [self isReadyToPublish]) {
while (self.queue.count > 0 && count < _publishRetryThrottle && self.isReadyToPublish) {
AWSDDLogDebug(@"Sending message from session queue" );
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
count = count + 1;
}

AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
}


@end

7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
### Bug Fixes

- **AWSIoT**
- fix(iot): Adds support for handling certificate with certificateId (See [PR #4219](https://github.com/aws-amplify/aws-sdk-ios/pull/4219))
- Adds support for handling certificate with certificateId (See [PR #4219](https://github.com/aws-amplify/aws-sdk-ios/pull/4219))
- Eliminates Dispatch Semaphore in MQTT internal code [PR #4211](https://github.com/aws-amplify/aws-sdk-ios/pull/4211)

## 2.27.12

### Bug Fixes

- **AWSMobileClient**
- fix(awsmobileClient): AWSMobileclient will refresh the token before making user attribute calls (See [PR #4215](https://github.com/aws-amplify/aws-sdk-ios/pull/4215))
- fix(AWSMobileClient): Change logic to handle weak reference of token operations (See [PR #4205](https://github.com/aws-amplify/aws-sdk-ios/pull/4205))
- fix(AWSMobileClient): AWSMobileclient will refresh the token before making user attribute calls (See [PR #4215](https://github.com/aws-amplify/aws-sdk-ios/pull/4215))
- fix(AWSMobileClient): Change logic to handle weak reference of token operations (See [PR #4205](https://github.com/aws-amplify/aws-sdk-ios/pull/4205))

## 2.27.11

Expand Down

0 comments on commit 0746568

Please sign in to comment.