Skip to content

Commit

Permalink
Merge pull request #1628 from atsign-foundation/monitor-stability
Browse files Browse the repository at this point in the history
fix: stable monitor connections in c sshnpd
  • Loading branch information
XavierChanth authored Jan 8, 2025
2 parents 536f114 + 5512058 commit 209aa2e
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 38 deletions.
2 changes: 1 addition & 1 deletion packages/c/cmake/atsdk.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ if(NOT atsdk_FOUND)
FetchContent_Declare(
atsdk
GIT_REPOSITORY https://github.com/atsign-foundation/at_c.git
GIT_TAG 9b9cb315a8af08a19e9679c9dad98572536f2ee4
GIT_TAG b261c6d56ebc952a2b1d7ab0b136753173c7a42a
)
FetchContent_MakeAvailable(atsdk)
install(TARGETS atclient atchops atlogger)
Expand Down
5 changes: 5 additions & 0 deletions packages/c/sshnpd/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.6

- fix: stabilize monitor connection
- automatic failover / reconnect after ~40 seconds of down time

## 0.2.5

- fix: uptake some fixes in monitor
Expand Down
2 changes: 1 addition & 1 deletion packages/c/sshnpd/include/sshnpd/handler_commons.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#ifndef HANDLER_COMMONS_H
#define HANDLER_COMMONS_H
#include "sshnpd/params.h"
#include <atclient/cjson.h>
#include <atclient/monitor.h>
#include <atcommons/json.h>
#include <pthread.h>

#define BYTES(x) (sizeof(unsigned char) * x)
Expand Down
2 changes: 1 addition & 1 deletion packages/c/sshnpd/include/sshnpd/run_srv_process.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef RUN_SRV_H
#define RUN_SRV_H

#include <atclient/cjson.h>
#include <atcommons/json.h>
#include <stdbool.h>
#include <stdint.h>

Expand Down
2 changes: 1 addition & 1 deletion packages/c/sshnpd/include/sshnpd/version.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#ifndef SSHNPD_VERSION_H
#define SSHNPD_VERSION_H
#define SSHNPD_VERSION "0.2.5"
#define SSHNPD_VERSION "0.2.6"
#endif
2 changes: 1 addition & 1 deletion packages/c/sshnpd/src/handle_npt_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <atchops/base64.h>
#include <atchops/iv.h>
#include <atchops/rsa_key.h>
#include <atclient/cjson.h>
#include <atcommons/json.h>
#include <atclient/monitor.h>
#include <atclient/notify.h>
#include <atclient/string_utils.h>
Expand Down
2 changes: 1 addition & 1 deletion packages/c/sshnpd/src/handle_ssh_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#include <atchops/base64.h>
#include <atchops/iv.h>
#include <atchops/rsa_key.h>
#include <atclient/cjson.h>
#include <atclient/monitor.h>
#include <atclient/notify.h>
#include <atclient/string_utils.h>
#include <atcommons/json.h>
#include <atlogger/atlogger.h>
#include <errno.h>
#include <pthread.h>
Expand Down
2 changes: 1 addition & 1 deletion packages/c/sshnpd/src/handler_commons.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "sshnpd/sshnpd.h"
#include <atchops/constants.h>
#include <atchops/rsa_key.h>
#include <atclient/cjson.h>
#include <atcommons/json.h>
#include <atlogger/atlogger.h>
#include <sshnpd/handler_commons.h>
#include <stdlib.h>
Expand Down
87 changes: 57 additions & 30 deletions packages/c/sshnpd/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
#include <atclient/atkey.h>
#include <atclient/atkeys.h>
#include <atclient/atkeys_file.h>
#include <atclient/cjson.h>
#include <atclient/connection.h>
#include <atclient/connection_hooks.h>
#include <atclient/monitor.h>
#include <atclient/notify.h>
#include <atclient/string_utils.h>
#include <atcommons/json.h>
#include <atlogger/atlogger.h>
#include <errno.h>
#include <libgen.h>
Expand All @@ -38,6 +38,10 @@
#define FILENAME_BUFFER_SIZE 500
#define LOGGER_TAG "sshnpd"

#define MONITOR_READ_TIMEOUT_MS 5000
// How often to try to reconnect if the connect appears stale
#define MONITOR_NOOP_TIMEOUT_MS 40000

static struct {
char *str;
enum notification_key key;
Expand All @@ -57,6 +61,7 @@ static int lock_atclient(void);
static int unlock_atclient(int);

static int reconnect_atclient();
static int reconnect_monitor();

static int set_worker_hooks();
static void main_loop();
Expand Down Expand Up @@ -178,6 +183,7 @@ int main(int argc, char **argv) {

// 7.a Initialize the monitor atclient
atclient_init(&monitor_ctx);
atclient_set_read_timeout(&monitor_ctx, MONITOR_READ_TIMEOUT_MS); // 5 seconds for timeout
res = atclient_monitor_pkam_authenticate(&monitor_ctx, params.atsign, &atkeys, NULL);
if (res != 0 || !should_run) {
exit_res = res;
Expand All @@ -187,7 +193,7 @@ int main(int argc, char **argv) {
// 7.b Initialize the worker atclient
atclient_init(&worker);
bool free_ping_response = false;
res = atclient_pkam_authenticate(&worker, params.atsign, &atkeys, NULL);
res = atclient_pkam_authenticate(&worker, params.atsign, &atkeys, NULL, NULL);
if (res != 0 || !should_run) {
exit_res = res;
goto cancel_atclient;
Expand All @@ -205,12 +211,12 @@ int main(int argc, char **argv) {
// atclient_get_public_encryption_key(&atclient, params.manager_list[i], &public_encryption_key);
// TODO: finish caching
}
printf("\n");
if (params.policy == NULL) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_DEBUG, "Policy Manager: NULL");
} else {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_DEBUG, "Policy Manager: %s", params.policy);
}
printf("\n");

if (!should_run) {
exit_res = res;
Expand Down Expand Up @@ -408,62 +414,63 @@ void main_loop() {
permitopen.permitopen_hosts = params.permitopen_hosts;
permitopen.permitopen_ports = params.permitopen_ports;

size_t timeout_counter = 0;

while (should_run) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_DEBUG, "Waiting for next monitor thread message\n");
atclient_monitor_response_init(&message);

int ret;
if (timeout_counter * MONITOR_READ_TIMEOUT_MS > MONITOR_NOOP_TIMEOUT_MS) {
// Do noop & reconnect if needed
ret = reconnect_monitor();
if (ret != 0) {
timeout_counter = MONITOR_NOOP_TIMEOUT_MS / MONITOR_READ_TIMEOUT_MS + 1;
atclient_monitor_response_free(&message);
continue;
} else {
timeout_counter = 0;
}
}

// Read the next monitor message
int ret = atclient_monitor_read(&monitor_ctx, &worker, &message, &monitor_hooks);
ret = atclient_monitor_read(&monitor_ctx, &worker, &message, &monitor_hooks);
if (ret != 0) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Possible bad state: monitor read failed (ret: %d)\n",
ret);
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR,
"Possible bad state: monitor read failed, resetting connection (ret: %d)\n", ret);
timeout_counter = MONITOR_NOOP_TIMEOUT_MS / MONITOR_READ_TIMEOUT_MS + 1;
}

atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_DEBUG, "Received message of type: %d\n", message.type);

switch (message.type) {
case ATCLIENT_MONITOR_MESSAGE_TYPE_EMPTY:
// We got a timeout, nothing to read, nothing to do
timeout_counter++;
break;
case ATCLIENT_MONITOR_ERROR_READ:
if (!atclient_monitor_is_connected(&monitor_ctx)) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR,
"Seems the monitor connection is down, trying to reconnect\n");

int ret = atclient_monitor_pkam_authenticate(&monitor_ctx, params.atsign, &atkeys, NULL);
if (ret != 0) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR,
"Monitor connection failed to reconnect, trying again in 1 second...\n");
sleep(1);
break;
}

ret = atclient_monitor_start(&monitor_ctx, regex);
if (ret != 0) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Monitor verb failed to restart.\n");
break;
}

atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_INFO, "Reconnected the monitor connection.\n");
}
timeout_counter = MONITOR_NOOP_TIMEOUT_MS / MONITOR_READ_TIMEOUT_MS + 1;
break;
case ATCLIENT_MONITOR_MESSAGE_TYPE_DATA_RESPONSE:
timeout_counter = 0;
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_DEBUG, "Received a data response: %s\n", message.data_response);
break;
case ATCLIENT_MONITOR_MESSAGE_TYPE_ERROR_RESPONSE:
timeout_counter = 0;
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Received an error response: %s\n",
message.error_response);
break;
case ATCLIENT_MONITOR_MESSAGE_TYPE_NONE:
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Received a NONE notification type\n");
break;
case ATCLIENT_MONITOR_ERROR_PARSE_NOTIFICATION:
timeout_counter = MONITOR_NOOP_TIMEOUT_MS + 1;
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Failed to parse the notification\n");
break;
case ATCLIENT_MONITOR_ERROR_DECRYPT_NOTIFICATION:
timeout_counter = MONITOR_NOOP_TIMEOUT_MS + 1;
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Failed to decrypt the notification\n");
break;
case ATCLIENT_MONITOR_MESSAGE_TYPE_NOTIFICATION: {
timeout_counter = 0;
bool is_init = atclient_atnotification_is_decrypted_value_initialized(&message.notification);
bool has_key = atclient_atnotification_is_key_initialized(&message.notification);
if (is_init) {
Expand Down Expand Up @@ -522,7 +529,6 @@ void main_loop() {
// DO NOT USE permitopen, use npa_permitopen
}

// TODO: maybe multithread these handlers
switch (notification_key) {
case NK_SSHPUBLICKEY:
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_DEBUG, "Executing handle_sshpublickey\n");
Expand Down Expand Up @@ -604,7 +610,7 @@ static int reconnect_atclient() {

if (!atclient_is_connected(&worker)) {
atlogger_log(TAG, ATLOGGER_LOGGING_LEVEL_INFO, "Worker client is not connected, attempting to reconnect:\n");
ret = atclient_pkam_authenticate(&worker, params.atsign, &atkeys, NULL);
ret = atclient_pkam_authenticate(&worker, params.atsign, &atkeys, NULL, NULL);

if (ret != 0) {
atlogger_log(TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Failed to reconnect to the atServer.\n");
Expand All @@ -622,3 +628,24 @@ static int reconnect_atclient() {
exit:
return ret;
}

static int reconnect_monitor() {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Seems the monitor connection is down, trying to reconnect\n");

int ret = atclient_monitor_pkam_authenticate(&monitor_ctx, params.atsign, &atkeys, NULL);
if (ret != 0) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR,
"Monitor connection failed to reconnect, trying again in 1 second...\n");
sleep(1);
return ret;
}

ret = atclient_monitor_start(&monitor_ctx, regex);
if (ret != 0) {
atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_ERROR, "Monitor verb failed to restart.\n");
return ret;
}

atlogger_log(LOGGER_TAG, ATLOGGER_LOGGING_LEVEL_INFO, "Reconnected the monitor connection.\n");
return 0;
}
2 changes: 1 addition & 1 deletion packages/c/sshnpd/src/run_srv_process.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "srv/params.h"
#include "srv/srv.h"
#include <atclient/cjson.h>
#include <atcommons/json.h>
#include <atclient/string_utils.h>
#include <atlogger/atlogger.h>
#include <errno.h>
Expand Down

0 comments on commit 209aa2e

Please sign in to comment.