Skip to content

Commit

Permalink
MQTT-Client-Framework 0.3.1
Browse files Browse the repository at this point in the history
>Release date: 2015-10-08

[NEW] comment out tvOS until Cocoapods supports it
[NEW] inbound throttling closes #54
  • Loading branch information
Christoph Krey committed Oct 8, 2015
1 parent fca4f17 commit 5e5f230
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 15 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
MQTT-Client-Framework iOS Release Notes
=======================================

## MQTT-Client-Framework 0.3.1
>Release date: 2015-10-08
[NEW] comment out tvOS until Cocoapods supports it
[NEW] inbound throttling closes #54

## MQTT-Client-Framework 0.3.0
>Release date: 2015-10-03
Expand Down
10 changes: 6 additions & 4 deletions MQTTClient.podspec
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
Pod::Spec.new do |s|
s.name = "MQTTClient"
s.version = "0.3.0"
s.version = "0.3.1"
s.summary = "iOS, OSX and tvOS native ObjectiveC MQTT Framework"
s.homepage = "https://github.com/ckrey/MQTT-Client-Framework"
s.license = { :type => "MIT", :file => "LICENSE" }
s.author = { "Christoph Krey" => "[email protected]" }
s.source = { :git => "https://github.com/ckrey/MQTT-Client-Framework.git", :tag => "0.3.0" }
s.source = { :git => "https://github.com/ckrey/MQTT-Client-Framework.git", :tag => "0.3.1" }

s.source_files = "MQTTClient/MQTTClient", "MQTTClient/MQTTClient/**/*.{h,m}"
s.requires_arc = true

s.platform = :ios, "6.1", :osx, "10.10", :tvos, "9.0"
# s.platform = :ios, "6.1", :osx, "10.10", :tvos, "9.0"
s.platform = :ios, "6.1", :osx, "10.10"

s.ios.deployment_target = "6.1"
s.osx.deployment_target = "10.10"
s.tvos.deployment_target = "9.0"
# s.tvos.deployment_target = "9.0"
end
16 changes: 16 additions & 0 deletions MQTTClient/MQTTClient/MQTTSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,22 @@ typedef NS_ENUM(NSInteger, MQTTSessionEvent) {
retained:(BOOL)retained
mid:(unsigned int)mid;

/** gets called when a new message was received
@param session the MQTTSession reporting the new message
@param data the data received, might be zero length
@param topic the topic the data was published to
@param qos the qos of the message
@param retained indicates if the data retransmitted from server storage
@param mid the Message Identifier of the message if qos = 1 or 2, zero otherwise
@return true if the message was or will be processed, false if the message shall not be ack-ed
*/
- (BOOL)newMessageWithFeedback:(MQTTSession *)session
data:(NSData *)data
onTopic:(NSString *)topic
qos:(MQTTQosLevel)qos
retained:(BOOL)retained
mid:(unsigned int)mid;

/** for mqttio-OBJC backward compatibility
@param session see newMessage for description
@param data see newMessage for description
Expand Down
61 changes: 51 additions & 10 deletions MQTTClient/MQTTClient/MQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -1195,10 +1195,24 @@ - (void)handlePublish:(MQTTMessage*)msg
NSRange range = NSMakeRange(2 + topicLength, [data length] - topicLength - 2);
data = [data subdataWithRange:range];
if ([msg qos] == 0) {
BOOL processed = true;
if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) {
[self.delegate newMessage:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:0];
[self.delegate newMessage:self
data:data
onTopic:topic
qos:msg.qos
retained:msg.retainFlag
mid:0];
}
if(self.messageHandler){
if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) {
processed = [self.delegate newMessageWithFeedback:self
data:data
onTopic:topic
qos:msg.qos
retained:msg.retainFlag
mid:0];
}
if (self.messageHandler) {
self.messageHandler(data, topic);
}
} else {
Expand All @@ -1208,13 +1222,29 @@ - (void)handlePublish:(MQTTMessage*)msg
msg.mid = msgId;
data = [data subdataWithRange:NSMakeRange(2, [data length] - 2)];
if ([msg qos] == 1) {
BOOL processed = true;
if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) {
[self.delegate newMessage:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId];
[self.delegate newMessage:self
data:data
onTopic:topic
qos:msg.qos
retained:msg.retainFlag
mid:msgId];
}
if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) {
processed = [self.delegate newMessageWithFeedback:self
data:data
onTopic:topic
qos:msg.qos
retained:msg.retainFlag
mid:0];
}
if(self.messageHandler){
if (self.messageHandler) {
self.messageHandler(data, topic);
}
[self send:[MQTTMessage pubackMessageWithMessageId:msgId]];
if (processed) {
[self send:[MQTTMessage pubackMessageWithMessageId:msgId]];
}
return;
} else {
if (![self.persistence storeMessageForClientId:self.clientId
Expand Down Expand Up @@ -1325,6 +1355,7 @@ - (void)handlePubrel:(MQTTMessage*)msg
incomingFlag:YES
messageId:messageId];
if (flow) {
BOOL processed = true;
if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) {
[self.delegate newMessage:self
data:flow.data
Expand All @@ -1334,15 +1365,25 @@ - (void)handlePubrel:(MQTTMessage*)msg
mid:[flow.messageId intValue]
];
}
if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) {
processed = [self.delegate newMessageWithFeedback:self
data:flow.data
onTopic:flow.topic
qos:[flow.qosLevel intValue]
retained:[flow.retainedFlag boolValue]
mid:[flow.messageId intValue]
];
}
if(self.messageHandler){
self.messageHandler(flow.data, flow.topic);
}

[self.persistence deleteFlow:flow];
[self.persistence sync];
[self tell];
if (processed) {
[self.persistence deleteFlow:flow];
[self.persistence sync];
[self tell];
[self send:[MQTTMessage pubcompMessageWithMessageId:messageId]];
}
}
[self send:[MQTTMessage pubcompMessageWithMessageId:messageId]];
}
}

Expand Down
189 changes: 188 additions & 1 deletion MQTTClient/MQTTClientTests/MQTTClientTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ @interface MQTTClientTests : XCTestCase <MQTTSessionDelegate>
@property (nonatomic) BOOL timeout;
@property (nonatomic) int type;
@property (nonatomic) BOOL ungraceful;
@property (nonatomic) int received;
@property (nonatomic) int processed;
@property (strong, nonatomic) NSTimer *processingSimulationTimer;

@end

@implementation MQTTClientTests
Expand Down Expand Up @@ -633,6 +637,179 @@ - (void)test_dont_process_after_reject_MQTT_3_1_4_5 {
}
}

#define PROCESSING_NUMBER 20
#define PROCESSING_INTERVAL 0.1
#define PROCESSING_TIMEOUT 30

- (void)test_throttling_incoming_q0 {
for (NSString *broker in BROKERLIST) {
NSLog(@"testing broker %@", broker);
NSDictionary *parameters = BROKERS[broker];
self.session = [[MQTTSession alloc] initWithClientId:nil
userName:parameters[@"user"]
password:parameters[@"pass"]
keepAlive:60
cleanSession:YES
will:NO
willTopic:nil
willMsg:nil
willQoS:0
willRetainFlag:NO
protocolLevel:4
runLoop:[NSRunLoop currentRunLoop]
forMode:NSRunLoopCommonModes];
self.session.persistence.persistent = PERSISTENT;
[self connect:self.session parameters:parameters];
XCTAssert(!self.timeout, @"timeout");
XCTAssertEqual(self.event, MQTTSessionEventConnected, @"MQTTSessionEventConnected %@", self.error);

self.processed = 0;
self.received = 0;

self.processingSimulationTimer = [NSTimer scheduledTimerWithTimeInterval:PROCESSING_INTERVAL
target:self
selector:@selector(processingSimulation:)
userInfo:nil
repeats:true];
[self.session subscribeToTopic:TOPIC atLevel:MQTTQosLevelAtMostOnce];

for (int i = 0; i < PROCESSING_NUMBER; i++) {
NSString *payload = [NSString stringWithFormat:@"Data %d", i];
[self.session publishData:[payload dataUsingEncoding:NSUTF8StringEncoding] onTopic:TOPIC retain:false qos:MQTTQosLevelAtMostOnce];
}

self.timeout = FALSE;
[NSObject cancelPreviousPerformRequestsWithTarget:self];
[self performSelector:@selector(ackTimeout:)
withObject:nil
afterDelay:PROCESSING_TIMEOUT];

while ((self.processed != self.received || self.received != PROCESSING_NUMBER) && !self.timeout) {
NSLog(@"waiting for processing");
[[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:1]];
}

XCTAssert(!self.timeout, @"timeout");

[self shutdown:parameters];
}
}

- (void)test_throttling_incoming_q1 {
for (NSString *broker in BROKERLIST) {
NSLog(@"testing broker %@", broker);
NSDictionary *parameters = BROKERS[broker];
self.session = [[MQTTSession alloc] initWithClientId:nil
userName:parameters[@"user"]
password:parameters[@"pass"]
keepAlive:60
cleanSession:YES
will:NO
willTopic:nil
willMsg:nil
willQoS:0
willRetainFlag:NO
protocolLevel:4
runLoop:[NSRunLoop currentRunLoop]
forMode:NSRunLoopCommonModes];
self.session.persistence.persistent = PERSISTENT;
[self connect:self.session parameters:parameters];
XCTAssert(!self.timeout, @"timeout");
XCTAssertEqual(self.event, MQTTSessionEventConnected, @"MQTTSessionEventConnected %@", self.error);

self.processed = 0;
self.received = 0;

self.processingSimulationTimer = [NSTimer scheduledTimerWithTimeInterval:PROCESSING_INTERVAL
target:self
selector:@selector(processingSimulation:)
userInfo:nil
repeats:true];
[self.session subscribeToTopic:TOPIC atLevel:MQTTQosLevelAtLeastOnce];

for (int i = 0; i < PROCESSING_NUMBER; i++) {
NSString *payload = [NSString stringWithFormat:@"Data %d", i];
[self.session publishData:[payload dataUsingEncoding:NSUTF8StringEncoding] onTopic:TOPIC retain:false qos:MQTTQosLevelAtLeastOnce];
}

self.timeout = FALSE;
[NSObject cancelPreviousPerformRequestsWithTarget:self];
[self performSelector:@selector(ackTimeout:)
withObject:nil
afterDelay:PROCESSING_TIMEOUT];

while ((self.processed != self.received || self.received != PROCESSING_NUMBER) && !self.timeout) {
NSLog(@"waiting for processing");
[[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:1]];
}

XCTAssert(!self.timeout, @"timeout");

[self shutdown:parameters];
}
}

- (void)test_throttling_incoming_q2 {
for (NSString *broker in BROKERLIST) {
NSLog(@"testing broker %@", broker);
NSDictionary *parameters = BROKERS[broker];
self.session = [[MQTTSession alloc] initWithClientId:nil
userName:parameters[@"user"]
password:parameters[@"pass"]
keepAlive:60
cleanSession:YES
will:NO
willTopic:nil
willMsg:nil
willQoS:0
willRetainFlag:NO
protocolLevel:4
runLoop:[NSRunLoop currentRunLoop]
forMode:NSRunLoopCommonModes];
self.session.persistence.persistent = PERSISTENT;
[self connect:self.session parameters:parameters];
XCTAssert(!self.timeout, @"timeout");
XCTAssertEqual(self.event, MQTTSessionEventConnected, @"MQTTSessionEventConnected %@", self.error);

self.processed = 0;
self.received = 0;

self.processingSimulationTimer = [NSTimer scheduledTimerWithTimeInterval:PROCESSING_INTERVAL
target:self
selector:@selector(processingSimulation:)
userInfo:nil
repeats:true];
[self.session subscribeToTopic:TOPIC atLevel:MQTTQosLevelExactlyOnce];

for (int i = 0; i < PROCESSING_NUMBER; i++) {
NSString *payload = [NSString stringWithFormat:@"Data %d", i];
[self.session publishData:[payload dataUsingEncoding:NSUTF8StringEncoding] onTopic:TOPIC retain:false qos:MQTTQosLevelExactlyOnce];
}

self.timeout = FALSE;
[NSObject cancelPreviousPerformRequestsWithTarget:self];
[self performSelector:@selector(ackTimeout:)
withObject:nil
afterDelay:PROCESSING_TIMEOUT];

while ((self.processed != self.received || self.received != PROCESSING_NUMBER) && !self.timeout) {
NSLog(@"waiting for processing");
[[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:1]];
}

XCTAssert(!self.timeout, @"timeout");

[self shutdown:parameters];
}
}

- (void)processingSimulation:(id)userInfo {
NSLog(@"processingSimulation %d/%d", self.processed, self.received);
if (self.received > self.processed) {
self.processed++;
}
}

/*
* Client Certificate
*/
Expand Down Expand Up @@ -921,7 +1098,17 @@ - (void)received:(MQTTSession *)session type:(int)type qos:(MQTTQosLevel)qos ret
}

- (void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
NSLog(@"newMessage:%@ onTopic:%@ qos:%d retained:%d mid:%d", data, topic, qos, retained, mid);
NSLog(@"newMessage(%d):%@ onTopic:%@ qos:%d retained:%d mid:%d", self.received, data, topic, qos, retained, mid);
}

- (BOOL)newMessageWithFeedback:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
NSLog(@"newMessageWithFeedback(%d):%@ onTopic:%@ qos:%d retained:%d mid:%d", self.processed, data, topic, qos, retained, mid);
if (self.processed > self.received - 10) {
self.received++;
return true;
} else {
return false;
}
}

- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error {
Expand Down

0 comments on commit 5e5f230

Please sign in to comment.