Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves concurrency with MQTT in AWSIoT #4211

Merged
merged 6 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions AWSIoT/Internal/AWSIoTMQTTClient.m
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ @interface AWSIoTMQTTClient() <AWSSRWebSocketDelegate, NSStreamDelegate, AWSMQTT
@property (atomic, assign) BOOL runLoopShouldContinue;

@property (strong,atomic) dispatch_semaphore_t timerSemaphore;
@property (strong,atomic) dispatch_queue_t timerQueue;

@end

Expand Down Expand Up @@ -107,6 +108,7 @@ - (instancetype)init {
_userDidIssueConnect = NO;
_userDidIssueDisconnect = NO;
_timerSemaphore = dispatch_semaphore_create(1);
_timerQueue = dispatch_queue_create("com.amazon.aws.iot.timer-queue", DISPATCH_QUEUE_SERIAL);
_streamsThread = nil;
}
return self;
Expand Down Expand Up @@ -721,18 +723,10 @@ - (void)initiateReconnectTimer: (id) sender
//Set the timeout to 1800 seconds, which is 1.5x of the max keep-alive 1200 seconds.
//The unit of measure for the dispatch_time function is nano seconds.

dispatch_semaphore_wait(_timerSemaphore, dispatch_time(DISPATCH_TIME_NOW, 1800 * NSEC_PER_SEC));
BOOL isConnectingOrConnected = self.mqttStatus == AWSIoTMQTTStatusConnected || self.mqttStatus == AWSIoTMQTTStatusConnecting;
if (!self.reconnectTimer && !isConnectingOrConnected) {
self.reconnectTimer = [NSTimer timerWithTimeInterval:self.currentReconnectTime
target:self
selector: @selector(reconnectToSession)
userInfo:nil
repeats:NO];
[[NSRunLoop currentRunLoop] addTimer:self.reconnectTimer forMode:NSRunLoopCommonModes];
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
}
dispatch_semaphore_signal(_timerSemaphore);
dispatch_assert_queue_not(self.timerQueue);
dispatch_async(self.timerQueue, ^{
[self scheduleReconnection];
});
}

- (void)openStreams:(id)sender
Expand Down Expand Up @@ -1315,4 +1309,22 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didReceivePong:(NSData *)pongPaylo
AWSDDLogVerbose(@"Websocket received pong");
}


# pragma mark - private/serial functions -

- (void)scheduleReconnection {
dispatch_assert_queue(self.timerQueue);

BOOL isConnectingOrConnected = self.mqttStatus == AWSIoTMQTTStatusConnected || self.mqttStatus == AWSIoTMQTTStatusConnecting;
if (!self.reconnectTimer && !isConnectingOrConnected) {
self.reconnectTimer = [NSTimer timerWithTimeInterval:self.currentReconnectTime
target:self
selector: @selector(reconnectToSession)
userInfo:nil
repeats:NO];
[[NSRunLoop currentRunLoop] addTimer:self.reconnectTimer forMode:NSRunLoopCommonModes];
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
}
}

@end
86 changes: 48 additions & 38 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ @interface AWSMQTTEncoder () {
NSInteger byteIndex;
}

@property (nonatomic, strong) dispatch_semaphore_t encodeSemaphore;
@property (nonatomic, strong) dispatch_queue_t encodeQueue;

@end

@implementation AWSMQTTEncoder

- (id)initWithStream:(NSOutputStream*)aStream
{
{
_status = AWSMQTTEncoderStatusInitializing;
stream = aStream;
[stream setDelegate:self];
_encodeSemaphore = dispatch_semaphore_create(1);
_encodeQueue = dispatch_queue_create("com.amazon.aws.iot.encoder-queue", DISPATCH_QUEUE_SERIAL);
return self;
}

Expand All @@ -48,7 +48,7 @@ - (void)close {
AWSDDLogDebug(@"closing encoder stream.");
[stream close];
[stream setDelegate:nil];
stream = nil;
stream = nil;
}

//This is executed in the runLoop.
Expand All @@ -70,25 +70,10 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
[_delegate encoder:self handleEvent:AWSMQTTEncoderEventReady];
}
else if (_status == AWSMQTTEncoderStatusSending) {
UInt8* ptr;
NSInteger n, length;

ptr = (UInt8*) [buffer bytes] + byteIndex;
// Number of bytes pending for transfer
length = [buffer length] - byteIndex;
n = [stream write:ptr maxLength:length];
if (n == -1) {
_status = AWSMQTTEncoderStatusError;
[_delegate encoder:self handleEvent:AWSMQTTEncoderEventErrorOccurred];
}
else if (n < length) {
byteIndex += n;
}
else {
buffer = NULL;
byteIndex = 0;
_status = AWSMQTTEncoderStatusReady;
}
dispatch_assert_queue_not(self.encodeQueue);
dispatch_sync(self.encodeQueue, ^{
[self writeBytes];
});
}
break;
case NSStreamEventErrorOccurred:
Expand All @@ -105,24 +90,29 @@ - (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
}

- (void)encodeMessage:(AWSMQTTMessage*)msg {
//Adding a mutex to prevent buffer from being modified by multiple threads
AWSDDLogVerbose(@"***** waiting on encodeSemaphore *****");
dispatch_semaphore_wait(self.encodeSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"***** passed encodeSempahore. *****");
dispatch_assert_queue_not(self.encodeQueue);
dispatch_sync(self.encodeQueue, ^{
[self encodeWhenReady:msg];
});
}

# pragma mark - private/serial functions -

- (void)encodeWhenReady:(AWSMQTTMessage*)msg {
dispatch_assert_queue(self.encodeQueue);
UInt8 header;
NSInteger n, length;

if (_status != AWSMQTTEncoderStatusReady) {
AWSDDLogInfo(@"Encoder not ready");
dispatch_semaphore_signal(self.encodeSemaphore);
return;
}

assert (buffer == NULL);
assert (byteIndex == 0);

buffer = [[NSMutableData alloc] init];

// encode fixed header
header = [msg type] << 4;
if ([msg isDuplicate]) {
Expand All @@ -133,7 +123,7 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg {
header |= 0x01;
}
[buffer appendBytes:&header length:1];

// encode remaining length
length = [[msg data] length];
do {
Expand All @@ -145,12 +135,12 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg {
[buffer appendBytes:&digit length:1];
}
while (length > 0);

// encode message data
if ([msg data] != NULL) {
[buffer appendData:[msg data]];
}

n = [stream write:[buffer bytes] maxLength:[buffer length]];
if (n == -1) {
_status = AWSMQTTEncoderStatusError;
Expand All @@ -164,9 +154,29 @@ - (void)encodeMessage:(AWSMQTTMessage*)msg {
buffer = NULL;
// XXX [delegate encoder:self handleEvent:MQTTEncoderEventReady];
}
AWSDDLogVerbose(@"***** signaling encodeSemaphore *****");
dispatch_semaphore_signal(self.encodeSemaphore);
AWSDDLogVerbose(@"<<%@>>: Encoder finished writing message", [NSThread currentThread]);
}

- (void)writeBytes {
dispatch_assert_queue(self.encodeQueue);
UInt8* ptr;
NSInteger n, length;

ptr = (UInt8*) [buffer bytes] + byteIndex;
// Number of bytes pending for transfer
length = [buffer length] - byteIndex;
n = [stream write:ptr maxLength:length];
if (n == -1) {
_status = AWSMQTTEncoderStatusError;
[_delegate encoder:self handleEvent:AWSMQTTEncoderEventErrorOccurred];
}
else if (n < length) {
byteIndex += n;
}
else {
buffer = NULL;
byteIndex = 0;
_status = AWSMQTTEncoderStatusReady;
}
}

@end
80 changes: 43 additions & 37 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ - (UInt16)nextMsgId;

@property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message
@property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried.
@property (strong,nonatomic) dispatch_semaphore_t drainSenderQueueSemaphore;
@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue;

@end

Expand Down Expand Up @@ -96,7 +96,7 @@ - (id)initWithClientId:(NSString*)theClientId

if (self = [super init]) {
clientId = theClientId;
_drainSenderQueueSemaphore = dispatch_semaphore_create(1);
_drainSenderSerialQueue = dispatch_queue_create("com.amazon.aws.iot.drain-sender-queue", DISPATCH_QUEUE_SERIAL);
keepAliveInterval = theKeepAliveInterval;
connectMessage = msg;
_publishRetryThrottle = publishRetryThrottle;
Expand Down Expand Up @@ -345,22 +345,13 @@ - (void)encoder:(AWSMQTTEncoder*)sender handleEvent:(AWSMQTTEncoderEvent) eventC
break;
case AWSMQTTSessionStatusConnecting:
break;
case AWSMQTTSessionStatusConnected:
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);

if ([self.queue count] > 0) {
AWSDDLogDebug(@"Sending message from session queue" );
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
}

AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finshed draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
case AWSMQTTSessionStatusConnected: {
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self queueNextMessage];
});
break;
}
case AWSMQTTSessionStatusError:
break;
}
Expand Down Expand Up @@ -684,17 +675,10 @@ - (void)send:(AWSMQTTMessage*)msg {
[encoder encodeMessage:msg];
}
else {
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);

AWSDDLogDebug(@"<<%@>>: MQTTSession.send added msg to queue to send later", [NSThread currentThread]);
[self.queue addObject:msg];


AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self queueMessage:msg];
});
}
}

Expand All @@ -711,23 +695,45 @@ - (BOOL)isReadyToPublish {
return encoder && [encoder status] == AWSMQTTEncoderStatusReady;
}

-(void) drainSenderQueue {
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
- (void)drainSenderQueue {
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self drainAllMessages];
});
}

# pragma mark - private/serial functions -

- (void)queueNextMessage {
dispatch_assert_queue(self.drainSenderSerialQueue);

if ([self.queue count] > 0) {
AWSDDLogDebug(@"Sending message from session queue");
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
}
}

- (void)queueMessage:(AWSMQTTMessage*)msg {
dispatch_assert_queue(self.drainSenderSerialQueue);

[self.queue addObject:msg];
}

- (void)drainAllMessages {
dispatch_assert_queue(self.drainSenderSerialQueue);

int count = 0;
while ([self.queue count] > 0 && count < _publishRetryThrottle && [self isReadyToPublish]) {
while (self.queue.count > 0 && count < _publishRetryThrottle && self.isReadyToPublish) {
AWSDDLogDebug(@"Sending message from session queue" );
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
count = count + 1;
}

AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
}


@end

7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
### Bug Fixes

- **AWSIoT**
- fix(iot): Adds support for handling certificate with certificateId (See [PR #4219](https://github.com/aws-amplify/aws-sdk-ios/pull/4219))
- Adds support for handling certificate with certificateId (See [PR #4219](https://github.com/aws-amplify/aws-sdk-ios/pull/4219))
- Eliminates Dispatch Semaphore in MQTT internal code [PR #4211](https://github.com/aws-amplify/aws-sdk-ios/pull/4211)

## 2.27.12

### Bug Fixes

- **AWSMobileClient**
- fix(awsmobileClient): AWSMobileclient will refresh the token before making user attribute calls (See [PR #4215](https://github.com/aws-amplify/aws-sdk-ios/pull/4215))
- fix(AWSMobileClient): Change logic to handle weak reference of token operations (See [PR #4205](https://github.com/aws-amplify/aws-sdk-ios/pull/4205))
- fix(AWSMobileClient): AWSMobileclient will refresh the token before making user attribute calls (See [PR #4215](https://github.com/aws-amplify/aws-sdk-ios/pull/4215))
- fix(AWSMobileClient): Change logic to handle weak reference of token operations (See [PR #4205](https://github.com/aws-amplify/aws-sdk-ios/pull/4205))

## 2.27.11

Expand Down