diff --git a/system/src/ledger/ledger_manager.cpp b/system/src/ledger/ledger_manager.cpp index d21bb15638..ce259901b8 100644 --- a/system/src/ledger/ledger_manager.cpp +++ b/system/src/ledger/ledger_manager.cpp @@ -70,6 +70,9 @@ const unsigned MAX_LEDGER_COUNT = 20; const unsigned MIN_SYNC_DELAY = 5000; const unsigned MAX_SYNC_DELAY = 30000; +const unsigned MIN_RETRY_DELAY = 30000; +const unsigned MAX_RETRY_DELAY = 5 * 60000; + const auto REQUEST_URI = "L"; const auto REQUEST_METHOD = COAP_METHOD_POST; @@ -221,10 +224,13 @@ LedgerManager::LedgerManager() : curCtx_(nullptr), msg_(nullptr), nextSyncTime_(0), + retryTime_(0), + retryDelay_(0), bytesInBuf_(0), state_(State::NEW), pendingState_(0), - reqId_(COAP_INVALID_REQUEST_ID) { + reqId_(COAP_INVALID_REQUEST_ID), + resubscribe_(false) { } LedgerManager::~LedgerManager() { @@ -235,7 +241,7 @@ LedgerManager::~LedgerManager() { } int LedgerManager::init() { - if (state_ != State::NEW || spark_cloud_flag_connected()) { // Must be disconnected from the cloud + if (state_ != State::NEW || spark_cloud_flag_connected()) { // Device must not be connected to the cloud return SYSTEM_ERROR_INVALID_STATE; } // TODO: Allow seeking in ledger and CoAP message streams so that an intermediate buffer is not @@ -452,6 +458,14 @@ int LedgerManager::removeAllData() { void LedgerManager::run() { std::lock_guard lock(mutex_); + if (state_ == State::FAILED) { + auto now = hal_timer_millis(nullptr); + if (now >= retryTime_) { + LOG(INFO, "Retrying synchronization"); + startSync(); + } + return; + } int r = processTasks(); if (r < 0) { LOG(ERROR, "Failed to process ledger task: %d", r); @@ -469,7 +483,7 @@ int LedgerManager::processTasks() { CHECK(sendGetInfoRequest()); return 0; } - if (pendingState_ & PendingState::SUBSCRIBE) { + if ((pendingState_ & PendingState::SUBSCRIBE) || resubscribe_) { LOG(INFO, "Subscribing to ledger updates"); CHECK(sendSubscribeRequest()); return 0; @@ -521,29 +535,7 @@ int LedgerManager::notifyConnected() { return SYSTEM_ERROR_INVALID_STATE; } LOG(TRACE, "Connected"); - for (auto& ctx: contexts_) { - switch (ctx->syncDir) { - case LEDGER_SYNC_DIRECTION_DEVICE_TO_CLOUD: { - if (ctx->syncPending) { - setPendingState(ctx.get(), PendingState::SYNC_TO_CLOUD); - updateSyncTime(ctx.get()); - } - break; - } - case LEDGER_SYNC_DIRECTION_CLOUD_TO_DEVICE: { - // TODO: Cache ledger subscriptions in the session data - setPendingState(ctx.get(), PendingState::SUBSCRIBE); - break; - } - case LEDGER_SYNC_DIRECTION_UNKNOWN: { - setPendingState(ctx.get(), PendingState::GET_INFO); - break; - } - default: - break; - } - } - state_ = State::READY; + startSync(); return 0; } @@ -557,6 +549,7 @@ void LedgerManager::notifyDisconnected(int /* error */) { } LOG(TRACE, "Disconnected"); reset(); + retryDelay_ = 0; state_ = State::OFFLINE; } @@ -877,6 +870,7 @@ int LedgerManager::receiveSubscribeResponse(coap_message* msg, int result) { return SYSTEM_ERROR_LEDGER_INVALID_RESPONSE; } } + resubscribe_ = false; state_ = State::READY; return 0; } @@ -1013,7 +1007,7 @@ int LedgerManager::receiveGetInfoResponse(coap_message* msg, int result) { CHECK(ledger->updateInfo(info)); if (ctx->syncDir == LEDGER_SYNC_DIRECTION_CLOUD_TO_DEVICE) { ctx->resetDeviceToCloudState(); - // TODO: Unsubscribe from deleted ledgers + d.resubscribe = true; } ctx->updateFromLedgerInfo(info); } @@ -1023,6 +1017,10 @@ int LedgerManager::receiveGetInfoResponse(coap_message* msg, int result) { setPendingState(ctx.get(), PendingState::SUBSCRIBE); } } + if (d.resubscribe) { + // Make sure to clear the subscriptions on the server if no ledgers left to subscribe to + resubscribe_ = true; + } if (d.localInfoIsInvalid) { return SYSTEM_ERROR_LEDGER_INCONSISTENT_STATE; } @@ -1353,13 +1351,30 @@ void LedgerManager::updateSyncTime(LedgerSyncContext* ctx) { } } -void LedgerManager::handleError(int error) { - if (error < 0 && state_ >= State::READY) { - LOG(ERROR, "Ledger error: %d", error); - reset(); - state_ = State::FAILED; - // TODO: Try recovering after a delay rather than next time the device connects to the cloud +void LedgerManager::startSync() { + assert(state_ == State::OFFLINE || state_ == State::FAILED); + for (auto& ctx: contexts_) { + switch (ctx->syncDir) { + case LEDGER_SYNC_DIRECTION_DEVICE_TO_CLOUD: { + if (ctx->syncPending) { + setPendingState(ctx.get(), PendingState::SYNC_TO_CLOUD); + updateSyncTime(ctx.get()); + } + break; + } + case LEDGER_SYNC_DIRECTION_CLOUD_TO_DEVICE: { + setPendingState(ctx.get(), PendingState::SUBSCRIBE); + break; + } + case LEDGER_SYNC_DIRECTION_UNKNOWN: { + setPendingState(ctx.get(), PendingState::GET_INFO); + break; + } + default: + break; + } } + state_ = State::READY; } void LedgerManager::reset() { @@ -1393,6 +1408,16 @@ void LedgerManager::reset() { curCtx_ = nullptr; } +void LedgerManager::handleError(int error) { + if (error < 0 && state_ >= State::READY) { + retryDelay_ = std::clamp(retryDelay_ * 2, MIN_RETRY_DELAY, MAX_RETRY_DELAY); + LOG(ERROR, "Synchronization failed: %d; retrying in %us", error, retryDelay_ / 1000); + reset(); + retryTime_ = hal_timer_millis(nullptr) + retryDelay_; + state_ = State::FAILED; + } +} + LedgerManager::LedgerSyncContexts::ConstIterator LedgerManager::findContext(const char* name, bool& found) const { found = false; auto it = std::lower_bound(contexts_.begin(), contexts_.end(), name, [&found](const auto& ctx, const char* name) { diff --git a/system/src/ledger/ledger_manager.h b/system/src/ledger/ledger_manager.h index c23acd8805..3586668986 100644 --- a/system/src/ledger/ledger_manager.h +++ b/system/src/ledger/ledger_manager.h @@ -76,8 +76,8 @@ class LedgerManager { private: enum class State { NEW, // Manager is not initialized - FAILED, // Synchronization failed OFFLINE, // Device is offline + FAILED, // Synchronization failed READY, // Ready to run a task SYNC_TO_CLOUD, // Synchronizing a device-to-cloud ledger SYNC_FROM_CLOUD, // Synchronizing a cloud-to-device ledger @@ -99,11 +99,14 @@ class LedgerManager { std::unique_ptr buf_; // Intermediate buffer used for piping ledger data LedgerSyncContext* curCtx_; // Context of the ledger being synchronized coap_message* msg_; // CoAP request or response that is being sent or received - uint64_t nextSyncTime_; // Nearest time a device-to-cloud ledger needs to be synchronized + uint64_t nextSyncTime_; // Time when the next device-to-cloud ledger needs to be synchronized (ticks) + uint64_t retryTime_; // Time when synchronization can be retried (ticks) + unsigned retryDelay_; // Delay before retrying synchronization size_t bytesInBuf_; // Number of bytes stored in the intermediate buffer - State state_; // Current state - int pendingState_; // Pending state flags + State state_; // Current manager state + int pendingState_; // Pending ledger state flags int reqId_; // ID of the ongoing CoAP request + bool resubscribe_; // Whether ledger subcriptions need to be updated on the server mutable StaticRecursiveMutex mutex_; // Manager lock @@ -138,10 +141,11 @@ class LedgerManager { void clearPendingState(LedgerSyncContext* ctx, int state); void updateSyncTime(LedgerSyncContext* ctx); - void handleError(int error); - + void startSync(); void reset(); + void handleError(int error); + LedgerSyncContexts::ConstIterator findContext(const char* name, bool& found) const; static int connectionCallback(int error, int status, void* arg);