Skip to content

Commit

Permalink
Allow serializing SM state
Browse files Browse the repository at this point in the history
It is useful in some applications to get a snapshot of the stream
management state for storage outside the process, in order to recover
from crashes and other things.  The get_sm_state already present is not
suitable for this because it returns a live object tied in to the
current context and such, and containing much unneeded internal-api
data.

Introduce xmpp_sm_state_set_callback which is called every time the
state changes with a serialized state, and a dual xmpp_sm_state_restore
which takes in this serialized state and sets up a new sm_state based on
that.

The serialization is considered opaque from the PoV of the API, but is
based on CBOR to facilitate easy debugging.
  • Loading branch information
singpolyma committed Jul 16, 2024
1 parent 5142eea commit 2e3757a
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 3 deletions.
6 changes: 6 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ STATIC_TESTS = \
tests/test_jid \
tests/test_ctx \
tests/test_send_queue \
tests/test_serialize_sm \
tests/test_string \
tests/test_resolver

Expand Down Expand Up @@ -285,6 +286,11 @@ tests_test_send_queue_CFLAGS = -I$(top_srcdir)/src
tests_test_send_queue_LDADD = $(STROPHE_LIBS)
tests_test_send_queue_LDFLAGS = -static

tests_test_serialize_sm_SOURCES = tests/test_serialize_sm.c tests/test.c tests/test.h
tests_test_serialize_sm_CFLAGS = -I$(top_srcdir)/src
tests_test_serialize_sm_LDADD = $(STROPHE_LIBS)
tests_test_serialize_sm_LDFLAGS = -static

tests_test_snprintf_SOURCES = tests/test_snprintf.c
tests_test_snprintf_CFLAGS = -I$(top_srcdir)/src

Expand Down
3 changes: 3 additions & 0 deletions src/auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,7 @@ static void _sm_enable(xmpp_conn_t *conn)
send_stanza(conn, enable, XMPP_QUEUE_SM_STROPHE);
conn->sm_state->sm_sent_nr = 0;
conn->sm_state->sm_enabled = 1;
trigger_sm_callback(conn);
}

static int
Expand Down Expand Up @@ -1486,6 +1487,8 @@ static int _handle_sm(xmpp_conn_t *const conn,
name = NULL;
}

trigger_sm_callback(conn);

err_sm:
if (!name) {
char *err = "Couldn't convert stanza to text!";
Expand Down
3 changes: 3 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ struct _xmpp_conn_t {
hash_t *id_handlers;
xmpp_handlist_t *handlers;
xmpp_sockopt_callback sockopt_cb;
xmpp_sm_callback sm_callback;
void *sm_callback_ctx;
};

void conn_disconnect(xmpp_conn_t *conn);
Expand Down Expand Up @@ -376,6 +378,7 @@ void handler_add(xmpp_conn_t *conn,
void handler_system_delete_all(xmpp_conn_t *conn);

/* utility functions */
void trigger_sm_callback(xmpp_conn_t *conn);
void reset_sm_state(xmpp_sm_state_t *sm_state);
void disconnect_mem_error(xmpp_conn_t *conn);

Expand Down
271 changes: 269 additions & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,267 @@ xmpp_sm_state_t *xmpp_conn_get_sm_state(xmpp_conn_t *conn)
return ret;
}

void xmpp_sm_state_set_callback(xmpp_conn_t *conn, xmpp_sm_callback cb, void *ctx)
{
conn->sm_callback = cb;
conn->sm_callback_ctx = ctx;
}

int xmpp_sm_state_restore(xmpp_conn_t *conn, const unsigned char *sm_state, size_t sm_state_len)
{
/* We can only set the SM state when we're disconnected */
if (conn->state != XMPP_STATE_DISCONNECTED) {
strophe_error(conn->ctx, "conn",
"SM state can only be set the when we're disconnected");
return XMPP_EINVOP;
}

if (conn->sm_state) {
strophe_error(conn->ctx, "conn", "SM state is already set!");
return XMPP_EINVOP;
}

if (sm_state_len < 5*6) {
strophe_error(conn->ctx, "conn", "Provided sm_state data is too short");
return XMPP_EINVOP;
}

if (memcmp(sm_state, "\x1a\x00\x00\x00\x00", 5) != 0) {
strophe_error(conn->ctx, "conn", "Unknown sm_state version");
return XMPP_EINVOP;
}
sm_state += 5;

conn->sm_state = strophe_alloc(conn->ctx, sizeof(*conn->sm_state));
if (!conn->sm_state) return XMPP_EMEM;

memset(conn->sm_state, 0, sizeof(*conn->sm_state));
conn->sm_state->sm_queue.head = NULL;
conn->sm_state->sm_queue.tail = NULL;
conn->sm_state->ctx = conn->ctx;

conn->sm_state->sm_support = 1;
conn->sm_state->sm_enabled = 1;
conn->sm_state->can_resume = 1;
conn->sm_state->resume = 1;

sm_state++;
conn->sm_state->sm_sent_nr = *sm_state++ << 24;
conn->sm_state->sm_sent_nr |= *sm_state++ << 16;
conn->sm_state->sm_sent_nr |= *sm_state++ << 8;
conn->sm_state->sm_sent_nr |= *sm_state++;

sm_state++;
conn->sm_state->sm_handled_nr = *sm_state++ << 24;
conn->sm_state->sm_handled_nr |= *sm_state++ << 16;
conn->sm_state->sm_handled_nr |= *sm_state++ << 8;
conn->sm_state->sm_handled_nr |= *sm_state++;

sm_state++;
size_t id_len = *sm_state++ << 24;
id_len |= *sm_state++ << 16;
id_len |= *sm_state++ << 8;
id_len |= *sm_state++;
conn->sm_state->id = strophe_alloc(conn->ctx, id_len + 1);
if (!conn->sm_state->id) {
xmpp_free_sm_state(conn->sm_state);
return XMPP_EMEM;
}
memcpy(conn->sm_state->id, sm_state, id_len);
conn->sm_state->id[id_len] = '\0';
sm_state += id_len;

sm_state++;
conn->send_queue_len = *sm_state++ << 24;
conn->send_queue_len |= *sm_state++ << 16;
conn->send_queue_len |= *sm_state++ << 8;
conn->send_queue_len |= *sm_state++;
conn->send_queue_user_len = conn->send_queue_len;
for (int i = 0; i < conn->send_queue_len; i++) {
xmpp_send_queue_t *item = strophe_alloc(conn->ctx, sizeof(*item));
if (!item) {
xmpp_free_sm_state(conn->sm_state);
return XMPP_EMEM;
}
memset(item, 0, sizeof(*item));

sm_state++;
item->len = *sm_state++ << 24;
item->len |= *sm_state++ << 16;
item->len |= *sm_state++ << 8;
item->len |= *sm_state++;
item->data = strophe_alloc(conn->ctx, item->len + 1);
if (!item->data) {
xmpp_free_sm_state(conn->sm_state);
return XMPP_EMEM;
}
memcpy(item->data, sm_state, item->len);
item->data[item->len] = '\0';
sm_state += item->len;

item->written = 0;
item->wip = 0;
item->userdata = NULL;
item->owner = XMPP_QUEUE_USER;

if (!conn->send_queue_tail) {
conn->send_queue_head = item;
conn->send_queue_tail = item;
} else {
conn->send_queue_tail->next = item;
conn->send_queue_tail = item;
}
}

sm_state++;
size_t sm_q_len = *sm_state++ << 24;
sm_q_len |= *sm_state++ << 16;
sm_q_len |= *sm_state++ << 8;
sm_q_len |= *sm_state++;
for (size_t i = 0; i < sm_q_len; i++) {
xmpp_send_queue_t *item = strophe_alloc(conn->ctx, sizeof(*item));
if (!item) {
xmpp_free_sm_state(conn->sm_state);
return XMPP_EMEM;
}
memset(item, 0, sizeof(*item));

sm_state++;
item->sm_h = *sm_state++ << 24;
item->sm_h |= *sm_state++ << 16;
item->sm_h |= *sm_state++ << 8;
item->sm_h |= *sm_state++;
sm_state++;
item->len = *sm_state++ << 24;
item->len |= *sm_state++ << 16;
item->len |= *sm_state++ << 8;
item->len |= *sm_state++;
item->data = strophe_alloc(conn->ctx, item->len + 1);
if (!item->data) {
xmpp_free_sm_state(conn->sm_state);
return XMPP_EMEM;
}
memcpy(item->data, sm_state, item->len);
item->data[item->len] = '\0';
sm_state += item->len;

item->written = 0;
item->wip = 0;
item->userdata = NULL;
item->owner = XMPP_QUEUE_USER;
add_queue_back(&conn->sm_state->sm_queue, item);
}

return XMPP_EOK;
}

size_t xmpp_conn_serialize_sm_state(xmpp_conn_t *conn, unsigned char **buf)
{
if (!conn->sm_state->sm_support || !conn->sm_state->sm_enabled || !conn->sm_state->can_resume) {
*buf = NULL;
return 0;
}

size_t id_len = strlen(conn->sm_state->id);
xmpp_send_queue_t *peek = conn->sm_state->sm_queue.head;
size_t sm_queue_len = 0;
size_t sm_queue_size = 0;
while (peek) {
sm_queue_len++;
sm_queue_size += 10 + peek->len;
peek = peek->next;
}

size_t send_queue_len = 0;
size_t send_queue_size = 0;
peek = conn->send_queue_head;
while (peek) {
send_queue_len++;
send_queue_size += 5 + peek->len;
peek = peek->next;
}

size_t buf_size = 5 + 5 + 5 + 5 + id_len + 5 + send_queue_size + 5 + sm_queue_size;
*buf = strophe_alloc(conn->ctx, buf_size);
unsigned char *next = *buf;

memcpy(next, "\x1a\x00\x00\x00\x00", 5); // Version
next += 5;

*next++ = 0x1a;
*next++ = (conn->sm_state->sm_sent_nr >> 24) & 0xFF;
*next++ = (conn->sm_state->sm_sent_nr >> 16) & 0xFF;
*next++ = (conn->sm_state->sm_sent_nr >> 8) & 0xFF;
*next++ = conn->sm_state->sm_sent_nr & 0xFF;

*next++ = 0x1a;
*next++ = (conn->sm_state->sm_handled_nr >> 24) & 0xFF;
*next++ = (conn->sm_state->sm_handled_nr >> 16) & 0xFF;
*next++ = (conn->sm_state->sm_handled_nr >> 8) & 0xFF;
*next++ = conn->sm_state->sm_handled_nr & 0xFF;

*next++ = 0x7a;
*next++ = (id_len >> 24) & 0xFF;
*next++ = (id_len >> 16) & 0xFF;
*next++ = (id_len >> 8) & 0xFF;
*next++ = id_len & 0xFF;
memcpy(next, conn->sm_state->id, id_len);
next += id_len;

*next++ = 0x9a;
*next++ = (send_queue_len >> 24) & 0xFF;
*next++ = (send_queue_len >> 16) & 0xFF;
*next++ = (send_queue_len >> 8) & 0xFF;
*next++ = send_queue_len & 0xFF;

peek = conn->send_queue_head;
while (peek) {
*next++ = 0x7a;
*next++ = (peek->len >> 24) & 0xFF;
*next++ = (peek->len >> 16) & 0xFF;
*next++ = (peek->len >> 8) & 0xFF;
*next++ = peek->len & 0xFF;
memcpy(next, peek->data, peek->len);
next += peek->len;
peek = peek->next;
}

*next++ = 0xba;
*next++ = (sm_queue_len >> 24) & 0xFF;
*next++ = (sm_queue_len >> 16) & 0xFF;
*next++ = (sm_queue_len >> 8) & 0xFF;
*next++ = sm_queue_len & 0xFF;

peek = conn->sm_state->sm_queue.head;
while (peek) {
*next++ = 0x1a;
*next++ = (peek->sm_h >> 24) & 0xFF;
*next++ = (peek->sm_h >> 16) & 0xFF;
*next++ = (peek->sm_h >> 8) & 0xFF;
*next++ = peek->sm_h & 0xFF;

*next++ = 0x7a;
*next++ = (peek->len >> 24) & 0xFF;
*next++ = (peek->len >> 16) & 0xFF;
*next++ = (peek->len >> 8) & 0xFF;
*next++ = peek->len & 0xFF;
memcpy(next, peek->data, peek->len);
next += peek->len;
peek = peek->next;
}

return buf_size;
}

void trigger_sm_callback(xmpp_conn_t *conn) {
if (!conn || !conn->sm_callback) return;

unsigned char *buf;
size_t size = xmpp_conn_serialize_sm_state(conn, &buf);
conn->sm_callback(conn, conn->sm_callback_ctx, buf, size);
strophe_free(conn->ctx, buf);
}

static void _reset_sm_state_for_reconnect(xmpp_conn_t *conn)
{
xmpp_sm_state_t *s = conn->sm_state;
Expand Down Expand Up @@ -1490,7 +1751,9 @@ char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn,
conn->sm_state->r_sent = 0;
}
/* Finally drop the element */
return _drop_send_queue_element(conn, t);
char *r = _drop_send_queue_element(conn, t);
trigger_sm_callback(conn);
return r;
}

/* timed handler for cleanup if normal disconnect procedure takes too long */
Expand Down Expand Up @@ -1684,6 +1947,7 @@ static void _handle_stream_end(char *name, void *userdata)
strophe_debug(conn->ctx, "xmpp", "RECV: </stream:stream>");
/* the session has been terminated properly, i.e. it can't be resumed */
conn->sm_state->can_resume = 0;
trigger_sm_callback(conn);
conn_disconnect_clean(conn);
}

Expand Down Expand Up @@ -1759,6 +2023,7 @@ static void _conn_sm_handle_stanza(xmpp_conn_t *const conn,
conn->sm_state->r_sent = 0;
}
}
trigger_sm_callback(conn);
}

static unsigned short _conn_default_port(xmpp_conn_t *conn,
Expand Down Expand Up @@ -2045,8 +2310,10 @@ static int _send_raw(xmpp_conn_t *conn,
strophe_debug_verbose(1, conn->ctx, "conn", "Q_ADD: %p", item);
if (!(owner & XMPP_QUEUE_SM) && conn->sm_state->sm_enabled &&
!conn->sm_state->r_sent) {
send_raw(conn, req_ack, strlen(req_ack), XMPP_QUEUE_SM_STROPHE, item);
conn->sm_state->r_sent = 1;
send_raw(conn, req_ack, strlen(req_ack), XMPP_QUEUE_SM_STROPHE, item);
} else {
trigger_sm_callback(conn);
}
return XMPP_EOK;
}
1 change: 1 addition & 0 deletions src/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ void xmpp_run_once(xmpp_ctx_t *ctx, unsigned long timeout)
/* if we've sent everything update the tail */
if (!sq)
conn->send_queue_tail = NULL;
trigger_sm_callback(conn);
}
intf->flush(intf);

Expand Down
6 changes: 5 additions & 1 deletion strophe.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define __LIBSTROPHE_STROPHE_H__

#include <stddef.h> /* size_t */
#include <stdint.h>

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -414,8 +415,11 @@ typedef enum {
char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn,
xmpp_queue_element_t which);

xmpp_sm_state_t *xmpp_conn_get_sm_state(xmpp_conn_t *conn);
typedef void (*xmpp_sm_callback)(xmpp_conn_t *conn, void *ctx, const unsigned char *sm_state, size_t sm_state_len);
int xmpp_conn_set_sm_state(xmpp_conn_t *conn, xmpp_sm_state_t *sm_state);
xmpp_sm_state_t *xmpp_conn_get_sm_state(xmpp_conn_t *conn);
void xmpp_sm_state_set_callback(xmpp_conn_t *conn, xmpp_sm_callback cb, void *ctx);
int xmpp_sm_state_restore(xmpp_conn_t *conn, const unsigned char *sm_state, size_t sm_state_len);

void xmpp_free_sm_state(xmpp_sm_state_t *sm_state);

Expand Down

0 comments on commit 2e3757a

Please sign in to comment.