Skip to content

Commit

Permalink
replaces semaphore with queue in AWSMQTTSession
Browse files Browse the repository at this point in the history
  • Loading branch information
Brennan Stehling committed Jul 8, 2022
1 parent 57bb6ff commit 015f416
Showing 1 changed file with 43 additions and 37 deletions.
80 changes: 43 additions & 37 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ - (UInt16)nextMsgId;

@property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message
@property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried.
@property (strong,nonatomic) dispatch_semaphore_t drainSenderQueueSemaphore;
@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue;

@end

Expand Down Expand Up @@ -96,7 +96,7 @@ - (id)initWithClientId:(NSString*)theClientId

if (self = [super init]) {
clientId = theClientId;
_drainSenderQueueSemaphore = dispatch_semaphore_create(1);
_drainSenderSerialQueue = dispatch_queue_create("com.amazon.aws.iot.drain-sender-queue", DISPATCH_QUEUE_SERIAL);
keepAliveInterval = theKeepAliveInterval;
connectMessage = msg;
_publishRetryThrottle = publishRetryThrottle;
Expand Down Expand Up @@ -345,22 +345,13 @@ - (void)encoder:(AWSMQTTEncoder*)sender handleEvent:(AWSMQTTEncoderEvent) eventC
break;
case AWSMQTTSessionStatusConnecting:
break;
case AWSMQTTSessionStatusConnected:
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);

if ([self.queue count] > 0) {
AWSDDLogDebug(@"Sending message from session queue" );
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
}

AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finshed draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
case AWSMQTTSessionStatusConnected: {
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self queueNextMessage];
});
break;
}
case AWSMQTTSessionStatusError:
break;
}
Expand Down Expand Up @@ -684,17 +675,10 @@ - (void)send:(AWSMQTTMessage*)msg {
[encoder encodeMessage:msg];
}
else {
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);

AWSDDLogDebug(@"<<%@>>: MQTTSession.send added msg to queue to send later", [NSThread currentThread]);
[self.queue addObject:msg];


AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self queueMessage:msg];
});
}
}

Expand All @@ -711,23 +695,45 @@ - (BOOL)isReadyToPublish {
return encoder && [encoder status] == AWSMQTTEncoderStatusReady;
}

-(void) drainSenderQueue {
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ waiting on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_wait(self.drainSenderQueueSemaphore, DISPATCH_TIME_FOREVER);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ passed drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
- (void)drainSenderQueue {
dispatch_assert_queue_not(self.drainSenderSerialQueue);
dispatch_sync(self.drainSenderSerialQueue, ^{
[self drainAllMessages];
});
}

# pragma mark - private/serial functions -

- (void)queueNextMessage {
dispatch_assert_queue(self.drainSenderSerialQueue);

if ([self.queue count] > 0) {
AWSDDLogDebug(@"Sending message from session queue");
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
}
}

- (void)queueMessage:(AWSMQTTMessage*)msg {
dispatch_assert_queue(self.drainSenderSerialQueue);

[self.queue addObject:msg];
}

- (void)drainAllMessages {
dispatch_assert_queue(self.drainSenderSerialQueue);

int count = 0;
while ([self.queue count] > 0 && count < _publishRetryThrottle && [self isReadyToPublish]) {
while (self.queue.count > 0 && count < _publishRetryThrottle && self.isReadyToPublish) {
AWSDDLogDebug(@"Sending message from session queue" );
AWSMQTTMessage *msg = [self.queue objectAtIndex:0];
[self.queue removeObjectAtIndex:0];
[encoder encodeMessage:msg];
count = count + 1;
}

AWSDDLogVerbose(@"%s [Line %d], Thread:%@ signaling on drainSenderQueueSemaphore", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
dispatch_semaphore_signal(self.drainSenderQueueSemaphore);
AWSDDLogVerbose(@"%s [Line %d], Thread:%@ finished draining messages", __PRETTY_FUNCTION__, __LINE__, [NSThread currentThread]);
}


@end

0 comments on commit 015f416

Please sign in to comment.