Skip to content

Commit

Permalink
Low: mainloop: make it possible to specify server's priority in mainloop
Browse files Browse the repository at this point in the history
  • Loading branch information
jnpkrn committed Jun 6, 2019
1 parent 3af11a8 commit 353417f
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 3 deletions.
24 changes: 24 additions & 0 deletions include/crm/common/mainloop.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ struct ipc_client_callbacks {
qb_ipcs_service_t *mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
struct qb_ipcs_service_handlers *callbacks);

/*!
* \brief Start server-side API end-point, hooked into the internal event loop
*
* \param[in] name name of the IPC end-point ("address" for the client)
* \param[in] type selects libqb's IPC back-end (or use #QB_IPC_NATIVE)
* \param[in] callbacks defines libqb's IPC service-level handlers
* \param[in] priority priority relative to other events handled in the
* abstract handling loop, use #QB_LOOP_MED when unsure
*
* \return libqb's opaque handle to the created service abstraction
*
* \note For portability concerns, do not use this function if you keep
* \p priority as #QB_LOOP_MED, stick with #mainloop_add_ipc_server
* (with exactly such semantics) instead (once you link with this new
* symbol employed, you can't downgrade the library freely anymore).
*
* \note The intended effect will only get fully reflected when run-time
* linked to patched libqb: https://github.com/ClusterLabs/libqb/pull/352
*/
qb_ipcs_service_t *mainloop_add_ipc_server_with_prio(const char *name,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *callbacks,
enum qb_loop_priority prio);

void mainloop_del_ipc_server(qb_ipcs_service_t * server);

mainloop_io_t *mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
Expand Down
82 changes: 79 additions & 3 deletions lib/common/mainloop.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,65 @@ gio_poll_destroy(gpointer data)
}
}

/*!
* \internal
* \brief Convert libqb's poll priority into GLib's one
*
* \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
*
* \return best matching GLib's priority
*/
static gint
conv_prio_libqb2glib(enum qb_loop_priority prio)
{
gint ret = G_PRIORITY_DEFAULT;
switch (prio) {
case QB_LOOP_LOW:
ret = G_PRIORITY_LOW;
break;
case QB_LOOP_HIGH:
ret = G_PRIORITY_HIGH;
break;
default:
crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
prio);
/* fall-through */
case QB_LOOP_MED:
break;
}
return ret;
}

/*!
* \internal
* \brief Convert libqb's poll priority to rate limiting spec
*
* \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
*
* \return best matching rate limiting spec
*/
static enum qb_ipcs_rate_limit
conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
{
/* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
switch (prio) {
case QB_LOOP_LOW:
ret = QB_IPCS_RATE_SLOW;
break;
case QB_LOOP_HIGH:
ret = QB_IPCS_RATE_FAST;
break;
default:
crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
prio);
/* fall-through */
case QB_LOOP_MED:
break;
}
return ret;
}

static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
Expand Down Expand Up @@ -502,8 +561,8 @@ gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
adaptor->p = p;
adaptor->is_used++;
adaptor->source =
g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
gio_poll_destroy);
g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
gio_read_socket, adaptor, gio_poll_destroy);

/* Now that mainloop now holds a reference to channel,
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
Expand Down Expand Up @@ -587,7 +646,15 @@ pick_ipc_type(enum qb_ipc_type requested)

qb_ipcs_service_t *
mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
struct qb_ipcs_service_handlers * callbacks)
struct qb_ipcs_service_handlers *callbacks)
{
return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
}

qb_ipcs_service_t *
mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
struct qb_ipcs_service_handlers *callbacks,
enum qb_loop_priority prio)
{
int rc = 0;
qb_ipcs_service_t *server = NULL;
Expand All @@ -599,6 +666,15 @@ mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
crm_client_init();
server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);

if (server == NULL) {
crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
return NULL;
}

if (prio != QB_LOOP_MED) {
qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
}

#ifdef HAVE_IPCS_GET_BUFFER_SIZE
/* All clients should use at least ipc_buffer_max as their buffer size */
qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
Expand Down

0 comments on commit 353417f

Please sign in to comment.