Skip to content

Commit

Permalink
Changes based on some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Torsph committed Aug 22, 2016
1 parent caeccf7 commit 0d18560
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 62 deletions.
18 changes: 17 additions & 1 deletion test/mock/mockMQTTClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ function mockMQTTClient( wrapper, options ) {
this.subscribeQosValues.length = 0;
this.publishQosValues = new Array;
this.publishQosValues.length = 0;
this.triggerError = null;

// Reinit the record list
this.reInitCommandCalled = function() {
Expand Down Expand Up @@ -68,22 +69,37 @@ function mockMQTTClient( wrapper, options ) {
callback = callback || '';
this.commandCalled['subscribe'] += 1;

var granted = [];
if ( Object.prototype.toString.call(topic) === '[object Array]' ) {
topic.forEach( function( item, index, array ) {
var grantedTopic = {topic: item, qos: 0}
that.subscriptions.push( item );
if (!isUndefined( options.qos )) {
that.subscribeQosValues.push( options.qos );
grantedTopic.qos = options.qos;
}

if (that.triggerError) {
grantedTopic.qos = 128;
}

granted.push(grantedTopic);
});
}
else {
var grantedTopic = {topic: topic, qos: 0}
this.subscriptions.push( topic );
if (!isUndefined( options.qos )) {
that.subscribeQosValues.push( options.qos );
grantedTopic.qos = options.qos;
}
if (this.triggerError) {
grantedTopic.qos = 128;
}
granted.push(grantedTopic);
}
if(callback !== '') {
callback(null); // call callback
callback(null, granted); // call callback
}
};

Expand Down
43 changes: 35 additions & 8 deletions test/thing-unit-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe( "thing shadow class unit tests", function() {
});
});

describe( "register a thing shadow name should trigger callback", function() {
describe( "register a thing shadow name", function() {
//
// Verify that the thing shadow invokes the register callback when subscription to all
// topics are finished. The callback is invoked based on the callback from the mqtt library.
Expand All @@ -83,31 +83,58 @@ describe( "thing shadow class unit tests", function() {
region: 'us-east-1'
};

it("when ignoreDeltas is true and persistentSubscribe is true", function() {
it("should trigger error when a subscription fails", function () {

// Need local stub to trigger error, so just copy-paste the code
var localMockMqtt;
var fakeConnect = function (options) {
mockMQTTClientObject = new mockMQTTClient(); // return the mocking object
mockMQTTClientObject.reInitCommandCalled();
mockMQTTClientObject.resetPublishedMessage();
mockMQTTClientObject.triggerError = true;
return mockMQTTClientObject;
};

mqttSave.restore();
mqttSave = sinon.stub(device, 'DeviceClient', fakeConnect);

var thingShadows = thingShadow(thingShadowsConfig);
thingShadows.register('testShadow1', { ignoreDeltas: true, persistentSubscribe: true }, function (err, granted) {
assert.notEqual(err, null);
for (var k = 0, grantedLen = granted.length; k < grantedLen; k++) {
//
// 128 is 0x80 - Failure from the MQTT lib.
//
assert.equal(granted[k].qos, 128);
}
});
});

it("should trigger callback when ignoreDeltas is true and persistentSubscribe is true", function() {
var thingShadows = thingShadow( thingShadowsConfig );
var fakeCallback = sinon.spy();
thingShadows.register( 'testShadow1', {ignoreDeltas:true, persistentSubscribe:true}, fakeCallback);

assert(fakeCallback.calledOnce);
});

it("when ignoreDeltas is false and persistentSubscribe is false", function() {
it("should trigger callback when ignoreDeltas is false and persistentSubscribe is false", function() {
var thingShadows = thingShadow( thingShadowsConfig );
var fakeCallback = sinon.spy();
thingShadows.register( 'testShadow1', {ignoreDeltas:false, persistentSubscribe:false}, fakeCallback);

assert(fakeCallback.calledOnce);
});

it("when ignoreDeltas is true and persistentSubscribe is false", function() {
it("should trigger callback when ignoreDeltas is true and persistentSubscribe is false", function() {
var thingShadows = thingShadow( thingShadowsConfig );
var fakeCallback = sinon.spy();
thingShadows.register( 'testShadow1', {ignoreDeltas:true, persistentSubscribe:false}, fakeCallback);

assert(fakeCallback.calledOnce);
});

it("when ignoreDeltas is false and persistentSubscribe is true", function() {
it("should trigger callback when ignoreDeltas is false and persistentSubscribe is true", function() {
var thingShadows = thingShadow( thingShadowsConfig );
var fakeCallback = sinon.spy();
thingShadows.register( 'testShadow1', {ignoreDeltas:false, persistentSubscribe:true}, fakeCallback);
Expand Down Expand Up @@ -258,10 +285,10 @@ describe( "thing shadow class unit tests", function() {
} );
// Register a thing, using default delta settings
thingShadows.register('testShadow1');
assert.equal(mockMQTTClientObject.commandCalled['subscribe'], 2); // Called twice, one for delta, one for GUD
assert.equal(mockMQTTClientObject.commandCalled['subscribe'], 1); // Called twice, one for delta, one for GUD
mockMQTTClientObject.reInitCommandCalled();
thingShadows.unregister('testShadow1');
assert.equal(mockMQTTClientObject.commandCalled['unsubscribe'], 2);
assert.equal(mockMQTTClientObject.commandCalled['unsubscribe'], 1);
});
});

Expand Down Expand Up @@ -291,7 +318,7 @@ describe( "thing shadow class unit tests", function() {
assert.equal(mockMQTTClientObject.commandCalled['subscribe'], 1); // Called once, for GUD
mockMQTTClientObject.reInitCommandCalled();
thingShadows.unregister('testShadow1');
assert.equal(mockMQTTClientObject.commandCalled['unsubscribe'], 2); // Called twice, unsub from ALL
assert.equal(mockMQTTClientObject.commandCalled['unsubscribe'], 1); // Called twice, unsub from ALL
});
});

Expand Down
137 changes: 84 additions & 53 deletions thing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,19 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
//
// Private function to subscribe and unsubscribe from topics.
//
this._handleSubscriptions = function(thingName, operations,
statii, devFunction, callback) {
this._handleSubscriptions = function(thingName, topicObjects, devFunction, callback) {
var topics = [];

//
// Build an array of topic names.
//
for (var i = 0, k = 0, opsLen = operations.length; i < opsLen; i++) {
for (var j = 0, statLen = statii.length; j < statLen; j++) {
topics[k++] = buildThingShadowTopic(thingName,
operations[i],
statii[j]);
for (var i = 0, topicsLen = topicObjects.length; i < topicsLen; i++) {
for (var j = 0, opsLen = topicObjects[i].operations.length; j < opsLen; j++) {
for (var k = 0, statLen = topicObjects[i].statii.length; k < statLen; k++) {
topics.push(buildThingShadowTopic(thingName,
topicObjects[i].operations[j],
topicObjects[i].statii[k]));
}
}
}

Expand All @@ -152,15 +153,40 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
//
// Subscribe/unsubscribe from the topics and perform callback when complete.
//
if (!isUndefined(callback)) {
device[devFunction](topics, {
qos: thingShadows[thingName].qos
}, callback);
} else {
device[devFunction](topics, {
qos: thingShadows[thingName].qos
});
}
device[devFunction](topics, {
qos: thingShadows[thingName].qos
}, function (err, granted) {
thingShadows[thingName].pending = false;

if (!isUndefined(callback)) {
if (err) {
callback(err, granted);
return;
}
//
// Check to see if we got all topic subscriptions granted.
//
var hasError = false;
for (var k = 0, grantedLen = granted.length; k < grantedLen; k++) {
//
// 128 is 0x80 - Failure from the MQTT lib.
//
if(granted[k].qos === 128) {
hasError = true;
}
}

if (hasError) {
callback(new Error('Some topics was not granted. [' + topics + '] != [' + granted + ']'), granted);
return;
}

//
// Forward granted array if the client want to inspect it
//
callback(null, granted);
}
});
};

//
Expand Down Expand Up @@ -264,8 +290,10 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// persistently subscribed to this thing shadow.
//
if (thingShadows[thingName].persistentSubscribe === false) {
this._handleSubscriptions(thingName, [operation], ['accepted', 'rejected'],
'unsubscribe');
this._handleSubscriptions(thingName, {
operations: [operation],
statii: ['accepted', 'rejected'],
}, 'unsubscribe');
}

//
Expand Down Expand Up @@ -369,8 +397,10 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// we are persistently subscribing to this thing shadow.
//
if (thingShadows[thingName].persistentSubscribe === false) {
that._handleSubscriptions(thingName, [operation], ['accepted', 'rejected'],
'unsubscribe');
that._handleSubscriptions(thingName, {
operations: [operation],
statii: ['accepted', 'rejected'],
}, 'unsubscribe');
}
//
// Mark this operation as complete.
Expand All @@ -394,8 +424,10 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// since we are already subscribed to all applicable sub-topics.
//
if (thingShadows[thingName].persistentSubscribe === false) {
this._handleSubscriptions(thingName, [operation], ['accepted', 'rejected'],
'subscribe',
this._handleSubscriptions(thingName, {
operations: [operation],
statii: ['accepted', 'rejected'],
},'subscribe',
function(err) {
//
// If 'stateObject' is defined, publish it to the publish topic for this
Expand Down Expand Up @@ -470,26 +502,23 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// property will be added after the first accepted update from AWS IoT.
//
var ignoreDeltas = false;
var topicsObject = [];
thingShadows[thingName] = {
timeouts: {},
persistentSubscribe: true,
debug: false,
discardStale: true,
enableVersioning: true,
qos: 0,
pending: false,
pendingDeltaSubscribe: true,
pendingPersistentSubscribe: true
pending: true
};

if (!isUndefined(options)) {
if (!isUndefined(options.ignoreDeltas)) {
ignoreDeltas = options.ignoreDeltas;
thingShadows[thingName].pendingDeltaSubscribe = !ignoreDeltas;
}
if (!isUndefined(options.persistentSubscribe)) {
thingShadows[thingName].persistentSubscribe = options.persistentSubscribe;
thingShadows[thingName].pendingPersistentSubscribe = options.persistentSubscribe;
}
if (!isUndefined(options.debug)) {
thingShadows[thingName].debug = options.debug;
Expand All @@ -508,15 +537,10 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// Always listen for deltas unless requested otherwise.
//
if (ignoreDeltas === false) {
this._handleSubscriptions(thingName, ['update'], ['delta'],
'subscribe', function () {
thingShadows[thingName].pendingDeltaSubscribe = false;
if (!thingShadows[thingName].pendingDeltaSubscribe && !thingShadows[thingName].pendingPersistentSubscribe) {
if (!isUndefined(callback)) {
callback();
}
}
});
topicsObject.push({
operations: ['update'],
statii: ['delta'],
});
}
//
// If we are persistently subscribing, we subscribe to everything we could ever
Expand All @@ -525,22 +549,20 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// which the application will need to filter out.
//
if (thingShadows[thingName].persistentSubscribe === true) {
this._handleSubscriptions(thingName, ['update', 'get', 'delete'], ['accepted', 'rejected'],
'subscribe', function () {
thingShadows[thingName].pendingPersistentSubscribe = false;
if (!thingShadows[thingName].pendingDeltaSubscribe && !thingShadows[thingName].pendingPersistentSubscribe) {
if (!isUndefined(callback)) {
callback();
}
}
});
topicsObject.push({
operations: ['update', 'get', 'delete'],
statii: ['accepted', 'rejected'],
});
}

if (ignoreDeltas === true && thingShadows[thingName].persistentSubscribe === false) {
if (!isUndefined(callback)) {
callback();
}
if (topicsObject.length > 0) {
this._handleSubscriptions(thingName, topicsObject, 'subscribe', callback);
} else {
if (!isUndefined(callback)) {
callback(null);
}
}

} else {
if (deviceOptions.debug === true) {
console.error('thing already registered: ', thingName);
Expand All @@ -550,6 +572,8 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {

this.unregister = function(thingName) {
if (thingShadows.hasOwnProperty(thingName)) {
var topicsObject = [];

//
// If an operation is outstanding, it will have a timeout set; when it
// expires any accept/reject sub-topic subscriptions for the thing will be
Expand All @@ -558,17 +582,24 @@ function ThingShadowsClient(deviceOptions, thingShadowOptions) {
// The only sub-topic we need to unsubscribe from is the delta sub-topic,
// which is always active.
//
this._handleSubscriptions(thingName, ['update'], ['delta'],
'unsubscribe');
topicsObject.push({
operations: ['update'],
statii: ['delta'],
});

//
// If we are persistently subscribing, we subscribe to everything we could ever
// possibly be interested in; this means that when it's time to unregister
// interest in a thing, we need to unsubscribe from all of these topics.
//
if (thingShadows[thingName].persistentSubscribe === true) {
this._handleSubscriptions(thingName, ['update', 'get', 'delete'], ['accepted', 'rejected'],
'unsubscribe');
topicsObject.push({
operations: ['update', 'get', 'delete'],
statii: ['accepted', 'rejected'],
});
}

this._handleSubscriptions(thingName, topicsObject, 'unsubscribe');
//
// Delete any pending timeouts
//
Expand Down

0 comments on commit 0d18560

Please sign in to comment.