From eee76118f2a557a79bda0214ea5c0974a7cd40c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Wed, 29 Aug 2018 15:49:58 +0200 Subject: [PATCH] Low: mainloop: make it possible to specify server's priority in mainloop --- include/crm/common/mainloop.h | 24 ++++++++++ lib/common/mainloop.c | 82 +++++++++++++++++++++++++++++++++-- 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/include/crm/common/mainloop.h b/include/crm/common/mainloop.h index eab31ac5d62..e00da483480 100644 --- a/include/crm/common/mainloop.h +++ b/include/crm/common/mainloop.h @@ -67,6 +67,30 @@ struct ipc_client_callbacks { qb_ipcs_service_t *mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks); +/*! + * \brief Start server-side API end-point, hooked into the internal event loop + * + * \param[in] name name of the IPC end-point ("address" for the client) + * \param[in] type selects libqb's IPC back-end (or use #QB_IPC_NATIVE) + * \param[in] callbacks defines libqb's IPC service-level handlers + * \param[in] priority priority relative to other events handled in the + * abstract handling loop, use #QB_LOOP_MED when unsure + * + * \return libqb's opaque handle to the created service abstraction + * + * \note For portability concerns, do not use this function if you keep + * \p priority as #QB_LOOP_MED, stick with #mainloop_add_ipc_server + * (with exactly such semantics) instead (once you link with this new + * symbol employed, you can't downgrade the library freely anymore). + * + * \note The intended effect will only get fully reflected when run-time + * linked to patched libqb: https://github.com/ClusterLabs/libqb/pull/352 + */ +qb_ipcs_service_t *mainloop_add_ipc_server_with_prio(const char *name, + enum qb_ipc_type type, + struct qb_ipcs_service_handlers *callbacks, + enum qb_loop_priority prio); + void mainloop_del_ipc_server(qb_ipcs_service_t * server); mainloop_io_t *mainloop_add_ipc_client(const char *name, int priority, size_t max_size, diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 60726cb5852..9bdd02648a5 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -456,6 +456,65 @@ gio_poll_destroy(gpointer data) } } +/*! + * \internal + * \brief Convert libqb's poll priority into GLib's one + * + * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback) + * + * \return best matching GLib's priority + */ +static gint +conv_prio_libqb2glib(enum qb_loop_priority prio) +{ + gint ret = G_PRIORITY_DEFAULT; + switch (prio) { + case QB_LOOP_LOW: + ret = G_PRIORITY_LOW; + break; + case QB_LOOP_HIGH: + ret = G_PRIORITY_HIGH; + break; + default: + crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED", + prio); + /* fall-through */ + case QB_LOOP_MED: + break; + } + return ret; +} + +/*! + * \internal + * \brief Convert libqb's poll priority to rate limiting spec + * + * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback) + * + * \return best matching rate limiting spec + */ +static enum qb_ipcs_rate_limit +conv_libqb_prio2ratelimit(enum qb_loop_priority prio) +{ + /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */ + enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL; + switch (prio) { + case QB_LOOP_LOW: + ret = QB_IPCS_RATE_SLOW; + break; + case QB_LOOP_HIGH: + ret = QB_IPCS_RATE_FAST; + break; + default: + crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED", + prio); + /* fall-through */ + case QB_LOOP_MED: + break; + } + return ret; +} + static int32_t gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn, int32_t add) @@ -502,8 +561,8 @@ gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts, adaptor->p = p; adaptor->is_used++; adaptor->source = - g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, - gio_poll_destroy); + g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts, + gio_read_socket, adaptor, gio_poll_destroy); /* Now that mainloop now holds a reference to channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). @@ -587,7 +646,15 @@ pick_ipc_type(enum qb_ipc_type requested) qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, - struct qb_ipcs_service_handlers * callbacks) + struct qb_ipcs_service_handlers *callbacks) +{ + return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED); +} + +qb_ipcs_service_t * +mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, + struct qb_ipcs_service_handlers *callbacks, + enum qb_loop_priority prio) { int rc = 0; qb_ipcs_service_t *server = NULL; @@ -599,6 +666,15 @@ mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, crm_client_init(); server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks); + if (server == NULL) { + crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); + return NULL; + } + + if (prio != QB_LOOP_MED) { + qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio)); + } + #ifdef HAVE_IPCS_GET_BUFFER_SIZE /* All clients should use at least ipc_buffer_max as their buffer size */ qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());