Skip to content

Commit

Permalink
Merge pull request #36 from ilcato/callback-synchronization
Browse files Browse the repository at this point in the history
Implementation of the Shadow Thing mechanism on the device side
  • Loading branch information
ubidefeo authored Mar 19, 2019
2 parents dfc1877 + 5a2cf5c commit 25be97a
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 12 deletions.
92 changes: 88 additions & 4 deletions src/ArduinoIoTCloud.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include "CloudSerial.h"
#include "ArduinoIoTCloud.h"

#ifdef ARDUINO_ARCH_SAMD
#include <RTCZero.h>
RTCZero rtc;
#endif

const static int keySlot = 0;
const static int compressedCertSlot = 10;
const static int serialNumberAndAuthorityKeyIdentifierSlot = 11;
Expand All @@ -37,11 +42,21 @@ static unsigned long getTime() {
return time;
}

static unsigned long getTimestamp() {
#ifdef ARDUINO_ARCH_SAMD
return rtc.getEpoch();
#else
#warning "No RTC available on this architecture - ArduinoIoTCloud will not keep track of local change timestamps ."
return 0;
#endif
}

ArduinoIoTCloudClass::ArduinoIoTCloudClass() :
_thing_id (""),
_bearSslClient(NULL),
_mqttClient (NULL),
connection (NULL)
connection (NULL),
_lastSyncRequestTickTime(0)
{
}

Expand All @@ -64,6 +79,9 @@ int ArduinoIoTCloudClass::begin(ConnectionManager *c, String brokerAddress, uint
Client &connectionClient = c->getClient();
_brokerAddress = brokerAddress;
_brokerPort = brokerPort;
#ifdef ARDUINO_ARCH_SAMD
rtc.begin();
#endif
return begin(connectionClient, _brokerAddress, _brokerPort);
}

Expand Down Expand Up @@ -148,6 +166,8 @@ void ArduinoIoTCloudClass::mqttClientBegin()
else {
_dataTopicIn = "/a/t/" + _thing_id + "/e/i";
_dataTopicOut = "/a/t/" + _thing_id + "/e/o";
_shadowTopicIn = "/a/t/" + _thing_id + "/shadow/i";
_shadowTopicOut = "/a/t/" + _thing_id + "/shadow/o";
}

// use onMessage as callback for received mqtt messages
Expand All @@ -166,6 +186,10 @@ int ArduinoIoTCloudClass::connect()
}
_mqttClient->subscribe(_stdinTopic);
_mqttClient->subscribe(_dataTopicIn);
_mqttClient->subscribe(_shadowTopicIn);

_syncStatus = ArduinoIoTSynchronizationStatus::SYNC_STATUS_WAIT_FOR_CLOUD_VALUES;
_lastSyncRequestTickTime = 0;

return 1;
}
Expand All @@ -182,14 +206,18 @@ void ArduinoIoTCloudClass::poll()
update();
}

void ArduinoIoTCloudClass::update()
void ArduinoIoTCloudClass::update(CallbackFunc onSyncCompleteCallback)
{
// If user call update() without parameters use the default ones
update(MAX_RETRIES, RECONNECTION_TIMEOUT);
update(MAX_RETRIES, RECONNECTION_TIMEOUT, onSyncCompleteCallback);
}

void ArduinoIoTCloudClass::update(int const reconnectionMaxRetries, int const reconnectionTimeoutMs)
void ArduinoIoTCloudClass::update(int const reconnectionMaxRetries, int const reconnectionTimeoutMs, CallbackFunc onSyncCompleteCallback)
{
unsigned long const timestamp = getTimestamp();
//check if a property is changed
if(timestamp != 0) Thing.updateTimestampOnChangedProperties(timestamp);

connectionCheck();
if(iotStatus != IOT_STATUS_CLOUD_CONNECTED){
return;
Expand All @@ -198,6 +226,26 @@ void ArduinoIoTCloudClass::update(int const reconnectionMaxRetries, int const re
// MTTQClient connected!, poll() used to retrieve data from MQTT broker
_mqttClient->poll();

switch (_syncStatus) {
case ArduinoIoTSynchronizationStatus::SYNC_STATUS_SYNCHRONIZED:
sendPropertiesToCloud();
break;
case ArduinoIoTSynchronizationStatus::SYNC_STATUS_WAIT_FOR_CLOUD_VALUES:
if (millis() - _lastSyncRequestTickTime > TIMEOUT_FOR_LASTVALUES_SYNC) {
requestLastValue();
_lastSyncRequestTickTime = millis();
}
break;
case ArduinoIoTSynchronizationStatus::SYNC_STATUS_VALUES_PROCESSED:
if(onSyncCompleteCallback != NULL)
(*onSyncCompleteCallback)();
_syncStatus = ArduinoIoTSynchronizationStatus::SYNC_STATUS_SYNCHRONIZED;
break;
}
}

void ArduinoIoTCloudClass::sendPropertiesToCloud()
{
uint8_t data[MQTT_TRANSMIT_BUFFER_SIZE];
int const length = Thing.encode(data, sizeof(data));
if (length > 0) {
Expand Down Expand Up @@ -254,6 +302,23 @@ int ArduinoIoTCloudClass::writeStdout(const byte data[], int length)
return 1;
}

int ArduinoIoTCloudClass::writeShadowOut(const byte data[], int length)
{
if (!_mqttClient->beginMessage(_shadowTopicOut, length, false, 0)) {
return 0;
}

if (!_mqttClient->write(data, length)) {
return 0;
}

if (!_mqttClient->endMessage()) {
return 0;
}

return 1;
}

void ArduinoIoTCloudClass::onMessage(int length)
{
ArduinoCloud.handleMessage(length);
Expand All @@ -276,6 +341,20 @@ void ArduinoIoTCloudClass::handleMessage(int length)
if (_dataTopicIn == topic) {
Thing.decode((uint8_t*)bytes, length);
}
if ((_shadowTopicIn == topic) && _syncStatus == ArduinoIoTSynchronizationStatus::SYNC_STATUS_WAIT_FOR_CLOUD_VALUES) {
Thing.decode((uint8_t*)bytes, length, true);
sendPropertiesToCloud();
_syncStatus = ArduinoIoTSynchronizationStatus::SYNC_STATUS_VALUES_PROCESSED;
}
}

void ArduinoIoTCloudClass::requestLastValue()
{
// Send the getLastValues CBOR message to the cloud
// [{0: "r:m", 3: "getLastValues"}] = 81 A2 00 63 72 3A 6D 03 6D 67 65 74 4C 61 73 74 56 61 6C 75 65 73
// Use http://cbor.me to easily generate CBOR encoding
const uint8_t CBOR_REQUEST_LAST_VALUE_MSG[] = { 0x81, 0xA2, 0x00, 0x63, 0x72, 0x3A, 0x6D, 0x03, 0x6D, 0x67, 0x65, 0x74, 0x4C, 0x61, 0x73, 0x74, 0x56, 0x61, 0x6C, 0x75, 0x65, 0x73 };
writeShadowOut(CBOR_REQUEST_LAST_VALUE_MSG, sizeof(CBOR_REQUEST_LAST_VALUE_MSG));
}

void ArduinoIoTCloudClass::connectionCheck()
Expand Down Expand Up @@ -332,6 +411,11 @@ void ArduinoIoTCloudClass::connectionCheck()
CloudSerial.begin(9600);
CloudSerial.println("Hello from Cloud Serial!");
}
#ifdef ARDUINO_ARCH_SAMD
unsigned long const epoch = getTime();
if (epoch!=0)
rtc.setEpoch(epoch);
#endif
break;
}
}
Expand Down
35 changes: 27 additions & 8 deletions src/ArduinoIoTCloud.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ typedef struct {
int timeout;
} mqttConnectionOptions;

typedef void (*CallbackFunc)(void);

extern ConnectionManager *ArduinoIoTPreferredConnection;

enum ArduinoIoTConnectionStatus {
Expand All @@ -51,6 +53,12 @@ enum ArduinoIoTConnectionStatus {
IOT_STATUS_CLOUD_ERROR,
};

enum class ArduinoIoTSynchronizationStatus {
SYNC_STATUS_SYNCHRONIZED,
SYNC_STATUS_WAIT_FOR_CLOUD_VALUES,
SYNC_STATUS_VALUES_PROCESSED
};

class ArduinoIoTCloudClass {

public:
Expand All @@ -63,18 +71,18 @@ class ArduinoIoTCloudClass {
static const int MQTT_TRANSMIT_BUFFER_SIZE = 256;
static const int MAX_RETRIES = 5;
static const int RECONNECTION_TIMEOUT = 2000;

static const int TIMEOUT_FOR_LASTVALUES_SYNC = 10000;

void onGetTime(unsigned long(*callback)(void));

int connect ();
bool disconnect();

void poll() __attribute__((deprecated)); /* Attention: Function is deprecated - use 'update' instead */
void update();
void update(CallbackFunc onSyncCompleteCallback = NULL);

// defined for users who want to specify max reconnections reties and timeout between them
void update(int const reconnectionMaxRetries, int const reconnectionTimeoutMs);
void update(int const reconnectionMaxRetries, int const reconnectionTimeoutMs, CallbackFunc onSyncCompleteCallback = NULL);

int connected();
// Clean up existing Mqtt connection, create a new one and initialize it
Expand All @@ -92,17 +100,16 @@ class ArduinoIoTCloudClass {


template<typename T, typename N=T>
void addPropertyReal(T & property, String name, permissionType permission_type = READWRITE, long seconds = ON_CHANGE, void(*fn)(void) = NULL, N minDelta = N(0)) {
void addPropertyReal(T & property, String name, permissionType permission_type = READWRITE, long seconds = ON_CHANGE, void(*fn)(void) = NULL, void(*synFn)(ArduinoCloudProperty<T> property) = CLOUD_WINS, N minDelta = N(0)) {
Permission permission = Permission::ReadWrite;
if (permission_type == READ ) permission = Permission::Read;
else if(permission_type == WRITE) permission = Permission::Write;
else permission = Permission::ReadWrite;

if(seconds == ON_CHANGE) {
Thing.addPropertyReal(property, name, permission).publishOnChange((T)minDelta, DEFAULT_MIN_TIME_BETWEEN_UPDATES_MILLIS).onUpdate(fn);
}
else {
Thing.addPropertyReal(property, name, permission).publishEvery(seconds).onUpdate(fn);
Thing.addPropertyReal(property, name, permission).publishOnChange((T)minDelta, DEFAULT_MIN_TIME_BETWEEN_UPDATES_MILLIS).onUpdate(fn).onSync(synFn);
} else {
Thing.addPropertyReal(property, name, permission).publishEvery(seconds).onUpdate(fn).onSync(synFn);
}
}

Expand All @@ -120,10 +127,14 @@ class ArduinoIoTCloudClass {
friend class CloudSerialClass;
int writeStdout(const byte data[], int length);
int writeProperties(const byte data[], int length);
int writeShadowOut(const byte data[], int length);

// Used to initialize MQTTClient
void mqttClientBegin();
// Function in charge of perform MQTT reconnection, basing on class parameters(retries,and timeout)
bool mqttReconnect(int const maxRetries, int const timeout);
// Used to retrieve last values from _shadowTopicIn
void requestLastValue();

ArduinoIoTConnectionStatus getIoTStatus() { return iotStatus; }
void setIoTConnectionState(ArduinoIoTConnectionStatus _newState);
Expand All @@ -132,6 +143,10 @@ class ArduinoIoTCloudClass {
ConnectionManager *connection;
static void onMessage(int length);
void handleMessage(int length);
ArduinoIoTSynchronizationStatus _syncStatus = ArduinoIoTSynchronizationStatus::SYNC_STATUS_SYNCHRONIZED;

void sendPropertiesToCloud();


String _id,
_thing_id,
Expand All @@ -140,10 +155,14 @@ class ArduinoIoTCloudClass {
ArduinoCloudThing Thing;
BearSSLClient* _bearSslClient;
MqttClient* _mqttClient;
int _lastSyncRequestTickTime;


// Class attribute to define MTTQ topics 2 for stdIn/out and 2 for data, in order to avoid getting previous pupblished payload
String _stdinTopic;
String _stdoutTopic;
String _shadowTopicOut;
String _shadowTopicIn;
String _dataTopicOut;
String _dataTopicIn;
String _otaTopic;
Expand Down

0 comments on commit 25be97a

Please sign in to comment.