Skip to content

Commit

Permalink
Retry synchronization after a delay
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeuz committed Nov 15, 2023
1 parent e98287c commit 2d47743
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 39 deletions.
91 changes: 58 additions & 33 deletions system/src/ledger/ledger_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const unsigned MAX_LEDGER_COUNT = 20;
const unsigned MIN_SYNC_DELAY = 5000;
const unsigned MAX_SYNC_DELAY = 30000;

// Lower and upper bounds of the delay before a failed synchronization is retried, in milliseconds.
// handleError() doubles the current delay on each failure and clamps it to this range
const unsigned MIN_RETRY_DELAY = 30000;
const unsigned MAX_RETRY_DELAY = 5 * 60000;

const auto REQUEST_URI = "L";
const auto REQUEST_METHOD = COAP_METHOD_POST;

Expand Down Expand Up @@ -221,10 +224,13 @@ LedgerManager::LedgerManager() :
curCtx_(nullptr),
msg_(nullptr),
nextSyncTime_(0),
retryTime_(0),
retryDelay_(0),
bytesInBuf_(0),
state_(State::NEW),
pendingState_(0),
reqId_(COAP_INVALID_REQUEST_ID) {
reqId_(COAP_INVALID_REQUEST_ID),
resubscribe_(false) {
}

LedgerManager::~LedgerManager() {
Expand All @@ -235,7 +241,7 @@ LedgerManager::~LedgerManager() {
}

int LedgerManager::init() {
if (state_ != State::NEW || spark_cloud_flag_connected()) { // Must be disconnected from the cloud
if (state_ != State::NEW || spark_cloud_flag_connected()) { // Device must not be connected to the cloud
return SYSTEM_ERROR_INVALID_STATE;
}
// TODO: Allow seeking in ledger and CoAP message streams so that an intermediate buffer is not
Expand Down Expand Up @@ -452,6 +458,14 @@ int LedgerManager::removeAllData() {

void LedgerManager::run() {
std::lock_guard lock(mutex_);
if (state_ == State::FAILED) {
auto now = hal_timer_millis(nullptr);
if (now >= retryTime_) {
LOG(INFO, "Retrying synchronization");
startSync();
}
return;
}
int r = processTasks();
if (r < 0) {
LOG(ERROR, "Failed to process ledger task: %d", r);
Expand All @@ -469,7 +483,7 @@ int LedgerManager::processTasks() {
CHECK(sendGetInfoRequest());
return 0;
}
if (pendingState_ & PendingState::SUBSCRIBE) {
if ((pendingState_ & PendingState::SUBSCRIBE) || resubscribe_) {
LOG(INFO, "Subscribing to ledger updates");
CHECK(sendSubscribeRequest());
return 0;
Expand Down Expand Up @@ -521,29 +535,7 @@ int LedgerManager::notifyConnected() {
return SYSTEM_ERROR_INVALID_STATE;
}
LOG(TRACE, "Connected");
for (auto& ctx: contexts_) {
switch (ctx->syncDir) {
case LEDGER_SYNC_DIRECTION_DEVICE_TO_CLOUD: {
if (ctx->syncPending) {
setPendingState(ctx.get(), PendingState::SYNC_TO_CLOUD);
updateSyncTime(ctx.get());
}
break;
}
case LEDGER_SYNC_DIRECTION_CLOUD_TO_DEVICE: {
// TODO: Cache ledger subscriptions in the session data
setPendingState(ctx.get(), PendingState::SUBSCRIBE);
break;
}
case LEDGER_SYNC_DIRECTION_UNKNOWN: {
setPendingState(ctx.get(), PendingState::GET_INFO);
break;
}
default:
break;
}
}
state_ = State::READY;
startSync();
return 0;
}

Expand All @@ -557,6 +549,7 @@ void LedgerManager::notifyDisconnected(int /* error */) {
}
LOG(TRACE, "Disconnected");
reset();
retryDelay_ = 0;
state_ = State::OFFLINE;
}

Expand Down Expand Up @@ -877,6 +870,7 @@ int LedgerManager::receiveSubscribeResponse(coap_message* msg, int result) {
return SYSTEM_ERROR_LEDGER_INVALID_RESPONSE;
}
}
resubscribe_ = false;
state_ = State::READY;
return 0;
}
Expand Down Expand Up @@ -1013,7 +1007,7 @@ int LedgerManager::receiveGetInfoResponse(coap_message* msg, int result) {
CHECK(ledger->updateInfo(info));
if (ctx->syncDir == LEDGER_SYNC_DIRECTION_CLOUD_TO_DEVICE) {
ctx->resetDeviceToCloudState();
// TODO: Unsubscribe from deleted ledgers
d.resubscribe = true;
}
ctx->updateFromLedgerInfo(info);
}
Expand All @@ -1023,6 +1017,10 @@ int LedgerManager::receiveGetInfoResponse(coap_message* msg, int result) {
setPendingState(ctx.get(), PendingState::SUBSCRIBE);
}
}
if (d.resubscribe) {
// Make sure to clear the subscriptions on the server if no ledgers left to subscribe to
resubscribe_ = true;
}
if (d.localInfoIsInvalid) {
return SYSTEM_ERROR_LEDGER_INCONSISTENT_STATE;
}
Expand Down Expand Up @@ -1353,13 +1351,30 @@ void LedgerManager::updateSyncTime(LedgerSyncContext* ctx) {
}
}

void LedgerManager::handleError(int error) {
if (error < 0 && state_ >= State::READY) {
LOG(ERROR, "Ledger error: %d", error);
reset();
state_ = State::FAILED;
// TODO: Try recovering after a delay rather than next time the device connects to the cloud
// Schedules the initial synchronization tasks for every known ledger and moves the manager
// to the READY state. Expected to be called only from the OFFLINE or FAILED state.
void LedgerManager::startSync() {
    assert(state_ == State::OFFLINE || state_ == State::FAILED);
    for (auto& entry: contexts_) {
        const auto ledgerCtx = entry.get();
        const auto dir = ledgerCtx->syncDir;
        if (dir == LEDGER_SYNC_DIRECTION_DEVICE_TO_CLOUD) {
            // Device-to-cloud ledgers are only scheduled if they have unsynchronized changes
            if (ledgerCtx->syncPending) {
                setPendingState(ledgerCtx, PendingState::SYNC_TO_CLOUD);
                updateSyncTime(ledgerCtx);
            }
        } else if (dir == LEDGER_SYNC_DIRECTION_CLOUD_TO_DEVICE) {
            // Cloud-to-device ledgers need to be subscribed to
            setPendingState(ledgerCtx, PendingState::SUBSCRIBE);
        } else if (dir == LEDGER_SYNC_DIRECTION_UNKNOWN) {
            // The sync direction is not known yet: request the ledger info first
            setPendingState(ledgerCtx, PendingState::GET_INFO);
        }
        // Any other sync direction requires no action here
    }
    state_ = State::READY;
}

void LedgerManager::reset() {
Expand Down Expand Up @@ -1393,6 +1408,16 @@ void LedgerManager::reset() {
curCtx_ = nullptr;
}

void LedgerManager::handleError(int error) {
if (error < 0 && state_ >= State::READY) {
retryDelay_ = std::clamp(retryDelay_ * 2, MIN_RETRY_DELAY, MAX_RETRY_DELAY);
LOG(ERROR, "Synchronization failed: %d; retrying in %us", error, retryDelay_ / 1000);
reset();
retryTime_ = hal_timer_millis(nullptr) + retryDelay_;
state_ = State::FAILED;
}
}

LedgerManager::LedgerSyncContexts::ConstIterator LedgerManager::findContext(const char* name, bool& found) const {
found = false;
auto it = std::lower_bound(contexts_.begin(), contexts_.end(), name, [&found](const auto& ctx, const char* name) {
Expand Down
16 changes: 10 additions & 6 deletions system/src/ledger/ledger_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class LedgerManager {
private:
enum class State {
NEW, // Manager is not initialized
FAILED, // Synchronization failed
OFFLINE, // Device is offline
FAILED, // Synchronization failed
READY, // Ready to run a task
SYNC_TO_CLOUD, // Synchronizing a device-to-cloud ledger
SYNC_FROM_CLOUD, // Synchronizing a cloud-to-device ledger
Expand All @@ -99,11 +99,14 @@ class LedgerManager {
std::unique_ptr<char[]> buf_; // Intermediate buffer used for piping ledger data
LedgerSyncContext* curCtx_; // Context of the ledger being synchronized
coap_message* msg_; // CoAP request or response that is being sent or received
uint64_t nextSyncTime_; // Nearest time a device-to-cloud ledger needs to be synchronized
uint64_t nextSyncTime_; // Time when the next device-to-cloud ledger needs to be synchronized (ticks)
uint64_t retryTime_; // Time when synchronization can be retried (ticks)
unsigned retryDelay_; // Delay before retrying synchronization
size_t bytesInBuf_; // Number of bytes stored in the intermediate buffer
State state_; // Current state
int pendingState_; // Pending state flags
State state_; // Current manager state
int pendingState_; // Pending ledger state flags
int reqId_; // ID of the ongoing CoAP request
bool resubscribe_; // Whether ledger subscriptions need to be updated on the server

mutable StaticRecursiveMutex mutex_; // Manager lock

Expand Down Expand Up @@ -138,10 +141,11 @@ class LedgerManager {
void clearPendingState(LedgerSyncContext* ctx, int state);
void updateSyncTime(LedgerSyncContext* ctx);

void handleError(int error);

void startSync();
void reset();

void handleError(int error);

LedgerSyncContexts::ConstIterator findContext(const char* name, bool& found) const;

static int connectionCallback(int error, int status, void* arg);
Expand Down

0 comments on commit 2d47743

Please sign in to comment.