Skip to content

Commit

Permalink
Merge pull request #2293 from particle-iot/unsubscribe_system/ch74478
Browse files Browse the repository at this point in the history
Particle.unsubscribe() doesn't preserve system subscriptions
  • Loading branch information
sergeuz authored Apr 1, 2021
2 parents aedf623 + 955289b commit a10eefe
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 93 deletions.
4 changes: 3 additions & 1 deletion system/src/system_cloud.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ extern void (*random_seed_from_cloud_handler)(unsigned int);
namespace
{

using namespace particle;
using namespace particle::system;

#if PLATFORM_THREADING
Expand Down Expand Up @@ -124,7 +125,8 @@ bool spark_subscribe(const char *eventName, EventHandler handler, void* handler_
void spark_unsubscribe(void *reserved)
{
SYSTEM_THREAD_CONTEXT_ASYNC(spark_unsubscribe(reserved));
spark_protocol_remove_event_handlers(sp, NULL);
spark_protocol_remove_event_handlers(sp, NULL); // Clear all subscriptions
registerSystemSubscriptions(); // Re-add system subscriptions
// TODO: Notify the cloud that subscriptions have been cleared
}

Expand Down
192 changes: 100 additions & 92 deletions system/src/system_cloud_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,96 @@
#include <stdio.h>
#include <stdint.h>

using namespace particle;
using namespace particle::system;

namespace particle {

namespace system {
namespace {

constexpr const char CLAIM_EVENTS[] = "spark/device/claim/";
constexpr const char RESET_EVENT[] = "spark/device/reset";
constexpr const char KEY_RESTORE_EVENT[] = "spark/device/key/restore";
constexpr const char DEVICE_UPDATES_EVENT[] = "particle/device/updates/";
constexpr const char FORCED_EVENT[] = "forced";
constexpr const char UPDATES_PENDING_EVENT[] = "pending";

inline bool isSuffix(const char* eventName, const char* prefix, const char* suffix) {
// todo - sanity check parameters?
return !strncmp(eventName+strlen(prefix), suffix, strlen(eventName)-strlen(prefix));
}

inline uint8_t dataToFlag(const char* data) {
return !strncmp(data, "true", strlen(data));
}

/**
* Handler for system cloud events.
*/
void systemEventHandler(const char* name, const char* data)
{
if (particle::startsWith(name, DEVICE_UPDATES_EVENT)) {
const uint8_t flagValue = dataToFlag(data);
if (isSuffix(name, DEVICE_UPDATES_EVENT, FORCED_EVENT)) {
system_set_flag(SYSTEM_FLAG_OTA_UPDATE_FORCED, flagValue, nullptr);
}
else if (isSuffix(name, DEVICE_UPDATES_EVENT, UPDATES_PENDING_EVENT)) {
system_set_flag(SYSTEM_FLAG_OTA_UPDATE_PENDING, flagValue, nullptr);
}
}
else if (!strncmp(name, CLAIM_EVENTS, strlen(CLAIM_EVENTS))) {
LOG(TRACE, "Claim code received by the cloud and cleared locally.");
HAL_Set_Claim_Code(NULL);
}
else if (!strcmp(name, RESET_EVENT)) {
if (data && *data) {
if (!strcmp("safe mode", data)) {
System.enterSafeMode();
} else if (!strcmp("dfu", data)) {
System.dfu(false);
} else if (!strcmp("reboot", data)) {
System.reset();
} else if (!strcmp("network", data)) {
LOG(WARN, "Received a command to reset the network");
// Reset all active network interfaces asynchronously
const auto task = new(std::nothrow) ISRTaskQueue::Task();
if (task) {
task->func = [](ISRTaskQueue::Task* task) {
delete task;
particle::resetNetworkInterfaces();
};
SystemISRTaskQueue.enqueue(task);
}
}
}
}
else if (!strncmp(name, KEY_RESTORE_EVENT, strlen(KEY_RESTORE_EVENT))) {
// Restore PSK to DCT/DCD/FLASH
LOG(INFO,"Restoring Public Server Key and Server Address to flash");
#if HAL_PLATFORM_CLOUD_UDP
bool udp = HAL_Feature_Get(FEATURE_CLOUD_UDP);
#else
bool udp = false;
#endif
unsigned char psk_buf[EXTERNAL_FLASH_SERVER_PUBLIC_KEY_LENGTH]; // 320 (udp) vs 294 (tcp), allocate 320.
unsigned char server_addr_buf[EXTERNAL_FLASH_SERVER_ADDRESS_LENGTH];
memset(&psk_buf, 0xff, sizeof(psk_buf));
memset(&server_addr_buf, 0xff, sizeof(server_addr_buf));
if (udp) {
#if HAL_PLATFORM_CLOUD_UDP
memcpy(&psk_buf, backup_udp_public_server_key, backup_udp_public_server_key_size);
memcpy(&server_addr_buf, backup_udp_public_server_address, backup_udp_public_server_address_size);
#endif // HAL_PLATFORM_CLOUD_UDP
} else {
#if HAL_PLATFORM_CLOUD_TCP
memcpy(&psk_buf, backup_tcp_public_server_key, sizeof(backup_tcp_public_server_key));
memcpy(&server_addr_buf, backup_tcp_public_server_address, sizeof(backup_tcp_public_server_address));
#endif // HAL_PLATFORM_CLOUD_TCP
}
HAL_FLASH_Write_ServerPublicKey(psk_buf, udp);
HAL_FLASH_Write_ServerAddress(server_addr_buf, udp);
}
}

#if HAL_PLATFORM_OTA_PROTOCOL_V3

Expand All @@ -82,7 +169,7 @@ int saveFirmwareChunk(const char* chunkData, size_t chunkSize, size_t chunkOffse

#endif // HAL_PLATFORM_OTA_PROTOCOL_V3

} // namespace system
} // namespace

int sendApplicationDescription() {
LOG(INFO, "Sending application DESCRIBE");
Expand All @@ -95,6 +182,14 @@ int sendApplicationDescription() {
return 0;
}

void registerSystemSubscriptions() {
bool ok = Particle.subscribe("particle", systemEventHandler, MY_DEVICES);
ok = Particle.subscribe("spark", systemEventHandler, MY_DEVICES) && ok;
if (!ok) {
LOG(ERROR, "Particle.subscribe() failed");
}
}

} // namespace particle

using namespace particle::system;
Expand Down Expand Up @@ -342,11 +437,9 @@ void invokeEventHandlerString(uint16_t handlerInfoSize, FilteringEventHandler* h
invokeEventHandlerInternal(handlerInfoSize, handlerInfo, name.c_str(), data.c_str(), reserved);
}

void SystemEvents(const char* name, const char* data);

bool is_system_handler(uint16_t handlerInfoSize, FilteringEventHandler* handlerInfo) {
// for now we hack this to recognize our own system handler
return handlerInfo->handler==SystemEvents;
return handlerInfo->handler == systemEventHandler;
}

void invokeEventHandler(uint16_t handlerInfoSize, FilteringEventHandler* handlerInfo,
Expand Down Expand Up @@ -401,90 +494,6 @@ SparkReturnType::Enum wrapVarTypeInEnum(const char *varKey)
}
}

constexpr const char CLAIM_EVENTS[] = "spark/device/claim/";
constexpr const char RESET_EVENT[] = "spark/device/reset";
constexpr const char KEY_RESTORE_EVENT[] = "spark/device/key/restore";
constexpr const char DEVICE_UPDATES_EVENT[] = "particle/device/updates/";
constexpr const char FORCED_EVENT[] = "forced";
constexpr const char UPDATES_PENDING_EVENT[] = "pending";

inline bool is_suffix(const char* eventName, const char* prefix, const char* suffix) {
// todo - sanity check parameters?
return !strncmp(eventName+strlen(prefix), suffix, strlen(eventName)-strlen(prefix));
}

uint8_t data_to_flag(const char* data) {
return !strncmp(data, "true", strlen(data));
}

/**
* Handler for system events.
*/
void SystemEvents(const char* name, const char* data)
{
if (particle::startsWith(name, DEVICE_UPDATES_EVENT)) {
const uint8_t flagValue = data_to_flag(data);
if (is_suffix(name, DEVICE_UPDATES_EVENT, FORCED_EVENT)) {
system_set_flag(SYSTEM_FLAG_OTA_UPDATE_FORCED, flagValue, nullptr);
}
else if (is_suffix(name, DEVICE_UPDATES_EVENT, UPDATES_PENDING_EVENT)) {
system_set_flag(SYSTEM_FLAG_OTA_UPDATE_PENDING, flagValue, nullptr);
}
}
else if (!strncmp(name, CLAIM_EVENTS, strlen(CLAIM_EVENTS))) {
LOG(TRACE, "Claim code received by the cloud and cleared locally.");
HAL_Set_Claim_Code(NULL);
}
else if (!strcmp(name, RESET_EVENT)) {
if (data && *data) {
if (!strcmp("safe mode", data)) {
System.enterSafeMode();
} else if (!strcmp("dfu", data)) {
System.dfu(false);
} else if (!strcmp("reboot", data)) {
System.reset();
} else if (!strcmp("network", data)) {
LOG(WARN, "Received a command to reset the network");
// Reset all active network interfaces asynchronously
const auto task = new(std::nothrow) ISRTaskQueue::Task();
if (task) {
task->func = [](ISRTaskQueue::Task* task) {
delete task;
particle::resetNetworkInterfaces();
};
SystemISRTaskQueue.enqueue(task);
}
}
}
}
else if (!strncmp(name, KEY_RESTORE_EVENT, strlen(KEY_RESTORE_EVENT))) {
// Restore PSK to DCT/DCD/FLASH
LOG(INFO,"Restoring Public Server Key and Server Address to flash");
#if HAL_PLATFORM_CLOUD_UDP
bool udp = HAL_Feature_Get(FEATURE_CLOUD_UDP);
#else
bool udp = false;
#endif
unsigned char psk_buf[EXTERNAL_FLASH_SERVER_PUBLIC_KEY_LENGTH]; // 320 (udp) vs 294 (tcp), allocate 320.
unsigned char server_addr_buf[EXTERNAL_FLASH_SERVER_ADDRESS_LENGTH];
memset(&psk_buf, 0xff, sizeof(psk_buf));
memset(&server_addr_buf, 0xff, sizeof(server_addr_buf));
if (udp) {
#if HAL_PLATFORM_CLOUD_UDP
memcpy(&psk_buf, backup_udp_public_server_key, backup_udp_public_server_key_size);
memcpy(&server_addr_buf, backup_udp_public_server_address, backup_udp_public_server_address_size);
#endif // HAL_PLATFORM_CLOUD_UDP
} else {
#if HAL_PLATFORM_CLOUD_TCP
memcpy(&psk_buf, backup_tcp_public_server_key, sizeof(backup_tcp_public_server_key));
memcpy(&server_addr_buf, backup_tcp_public_server_address, sizeof(backup_tcp_public_server_address));
#endif // HAL_PLATFORM_CLOUD_TCP
}
HAL_FLASH_Write_ServerPublicKey(psk_buf, udp);
HAL_FLASH_Write_ServerAddress(server_addr_buf, udp);
}
}

#if HAL_PLATFORM_CLOUD_UDP

using particle::protocol::SessionPersistOpaque;
Expand Down Expand Up @@ -1078,13 +1087,12 @@ void Spark_Protocol_Init(void)
spark_protocol_set_connection_property(sp, particle::protocol::Connection::OTA_CHUNK_SIZE, HAL_OTA_ChunkSize(),
nullptr, nullptr);

Particle.subscribe("spark", SystemEvents, MY_DEVICES);
Particle.subscribe("particle", SystemEvents, MY_DEVICES);

CommunicationsHandlers handlers;
handlers.size = sizeof(handlers);
handlers.random_seed_from_cloud = random_seed_from_cloud_handler;
spark_protocol_communications_handlers(sp, &handlers);

registerSystemSubscriptions();
}
}

Expand Down
3 changes: 3 additions & 0 deletions system/src/system_cloud_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ inline bool publishEvent(const char* event, const char* data = nullptr, unsigned
// Sends application DESCRIBE and subscriptions
int sendApplicationDescription();

// Subscribes to system cloud events
void registerSystemSubscriptions();

} // namespace particle

#ifdef __cplusplus
Expand Down

0 comments on commit a10eefe

Please sign in to comment.