diff --git a/include/qpid/dispatch/general_work.h b/include/qpid/dispatch/general_work.h new file mode 100644 index 000000000..c45d693a1 --- /dev/null +++ b/include/qpid/dispatch/general_work.h @@ -0,0 +1,108 @@ +#ifndef __general_work_h__ +#define __general_work_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + + +// +// General Work +// +// The following API can be used to post work to be executed on a background thread. Work handlers are run serially in +// the order they are posted (no two work items will run at the same time). The background thread is non-Proactor and +// can run at the same time as Proactor threads (I/O and qd_timers) as well as router core and other system threads. +// + +typedef struct qd_general_work_t qd_general_work_t; + +/** + * General work handler + * + * The signature of the function that is run on the background thread + * + * If the discard parameter to the handler is true the router is in the process of shutting down and cleaning up any + * outstanding general work items. At this point all threads have been shutdown and the handler must avoid scheduling + * any further work and should simply release any resources held by the args parameter. + * + * @param context the context parameter passed to qd_general_work() constructor + * @param args a pointer to memory holding the arguments set via qd_general_work_args() + * @param discard True if the router is shutting down and the handler should discard the work. + */ +typedef void (*qd_general_work_handler_t) (void *context, void *args, bool discard); + +/** + * Create a new general work request + * + * @param context supplied by caller, passed to handler + * @param handler the function to run + * @param args_size the amount of memory needed for handler arguments + * @return a pointer to an initialized qd_general_work_t instance (never null) + */ +qd_general_work_t *qd_general_work(void *context, qd_general_work_handler_t handler, size_t args_size); + +/** + * Access the work item's memory for handler arguments + * + * Use this function to initialize the handler's input parameters before posting the work item. It is expected that the + * caller will cast the return value to a pointer to the appropriate structure that holds the handler's parameters. This + * pointer will be passed in the args parameter to the handler function. + * + * The returned pointer must not be accessed after the work item has been scheduled (on return from + * qd_post_general_work()) + * + * @param work the general work instance + * @return address of the start of argument memory. The amount of memory returned will be the value of the args_size + * parameter passed to qd_general_work() + */ +void *qd_general_work_args(qd_general_work_t *work); + +/** + * Schedule the work item to run on the general work thread. + * + * The caller must not reference work on return from this call. + * + * @param work the work item to schedule + */ +void qd_post_general_work(qd_general_work_t *work); + +/** + * Start the general work thread + */ +void qd_general_work_start(void); + +/** + * Stop the general work thread. + * + * Blocks caller until thread has stopped. Work callbacks will cease being invoked on return to the caller. + * + */ +void qd_general_work_stop(void); + +/** + * Free all resources associated with general work + * + * During this call any pending work items that have been submitted after qd_general_work_stop() has been called will be + * invoked with the discard flag true. + * + */ +void qd_general_work_finalize(void); + +#endif // __general_work_h__ diff --git a/include/qpid/dispatch/threading.h b/include/qpid/dispatch/threading.h index 4ec08f3c3..59a14c0f5 100644 --- a/include/qpid/dispatch/threading.h +++ b/include/qpid/dispatch/threading.h @@ -67,6 +67,7 @@ typedef enum { SYS_THREAD_PROACTOR, SYS_THREAD_VFLOW, SYS_THREAD_LWS_HTTP, + SYS_THREAD_GENERAL_WORK, // add new thread roles here and update _thread_names in threading.c SYS_THREAD_ROLE_COUNT } sys_thread_role_t; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 79608521b..b5b1eb930 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -148,6 +148,7 @@ set(qpid_dispatch_SOURCES qd_asan_interface.c protocols.c connection_counters.c + general_work.c ) set(qpid_dispatch_INCLUDES diff --git a/src/dispatch.c b/src/dispatch.c index 9f81ce1e4..e6ba3c1a9 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -37,6 +37,7 @@ #include "qpid/dispatch/discriminator.h" #include "qpid/dispatch/server.h" #include "qpid/dispatch/static_assert.h" +#include "qpid/dispatch/general_work.h" #include #include @@ -139,6 +140,8 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir, bool test_hooks) if (qd_error_code()) { qd_dispatch_free(qd); return 0; } qd_message_initialize(); if (qd_error_code()) { qd_dispatch_free(qd); return 0; } + qd_general_work_start(); + return qd; } @@ -384,6 +387,9 @@ void qd_dispatch_free(qd_dispatch_t *qd) { if (!qd) return; + // Stop the general work thread to prevent further callbacks + qd_general_work_stop(); + /* Stop HTTP threads immediately */ qd_http_server_free(qd_server_http(qd->server)); diff --git a/src/general_work.c b/src/general_work.c new file mode 100644 index 000000000..b29d1cec4 --- /dev/null +++ b/src/general_work.c @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#include "qpid/dispatch/general_work.h" +#include "qpid/dispatch/ctools.h" +#include "qpid/dispatch/threading.h" +#include "qpid/dispatch/alloc_pool.h" + +#include + +// The maximum size allowed for handler arguments. This value can be increased should handlers need more memory for +// arguments. +#define QD_GENERAL_WORK_SIZE 160 + +struct qd_general_work_t { + DEQ_LINKS(qd_general_work_t); + void *context; + qd_general_work_handler_t handler; + uint8_t overlay[QD_GENERAL_WORK_SIZE]; +}; + +ALLOC_DECLARE(qd_general_work_t); +ALLOC_DEFINE(qd_general_work_t); +DEQ_DECLARE(qd_general_work_t, qd_general_work_list_t); + +static sys_mutex_t lock; +static sys_cond_t condition; +sys_thread_t *thread; + +static qd_general_work_list_t work_list_LH = DEQ_EMPTY; // must hold lock +static bool need_wake_LH; // must hold lock +static bool running_LH; // must hold lock + + +static void *general_work_thread(void *context); + + +void qd_general_work_start(void) +{ + sys_mutex_init(&lock); + sys_cond_init(&condition); + sys_mutex_lock(&lock); + running_LH = true; + sys_mutex_unlock(&lock); + thread = sys_thread(SYS_THREAD_GENERAL_WORK, general_work_thread, 0); +} + + +void qd_general_work_stop(void) +{ + // signal the background thread to stop by sending a work request with no handler + + qd_general_work_t *work = qd_general_work(0, 0, 0); + qd_post_general_work(work); + sys_thread_join(thread); +} + +void qd_general_work_finalize(void) +{ + // discard any left over general work items, allowing them to clean up any + // resources held by the work item + + sys_mutex_lock(&lock); + assert(running_LH == false); // need to call qd_general_work_stop first + qd_general_work_t *work = DEQ_HEAD(work_list_LH); + while (!!work) { + DEQ_REMOVE_HEAD(work_list_LH); + sys_mutex_unlock(&lock); + work->handler(work->context,(void *) work->overlay, true); // discard == true + free_qd_general_work_t(work); + sys_mutex_lock(&lock); + work = DEQ_HEAD(work_list_LH); + } + sys_mutex_unlock(&lock); + + sys_thread_free(thread); + sys_cond_free(&condition); + sys_mutex_free(&lock); +} + + +qd_general_work_t *qd_general_work(void *context, qd_general_work_handler_t handler, size_t args_size) +{ + assert(args_size <= QD_GENERAL_WORK_SIZE); // you need to increase QD_GENERAL_WORK_SIZE + qd_general_work_t *work = new_qd_general_work_t(); + ZERO(work); + work->context = context; + work->handler = handler; + return work; +} + + +void *qd_general_work_args(qd_general_work_t *work) +{ + assert(work); + return (void *) work->overlay; +} + + +void qd_post_general_work(qd_general_work_t *work) +{ + bool need_wake; + + DEQ_ITEM_INIT(work); + + sys_mutex_lock(&lock); + assert(running_LH); // post general work after thread stopped! + DEQ_INSERT_TAIL(work_list_LH, work); + need_wake = need_wake_LH; + if (need_wake) { + need_wake_LH = false; + } + sys_mutex_unlock(&lock); + + if (need_wake) { + sys_cond_signal(&condition); + } +} + + +/** + * Thread main loop + */ +static void *general_work_thread(void *context) +{ + qd_general_work_t *work = 0; + + while (true) { + + // Process one at a time, allowing other threads to run each time we take the lock + sys_mutex_lock(&lock); + work = DEQ_HEAD(work_list_LH); + while (!work) { + need_wake_LH = true; + sys_cond_wait(&condition, &lock); + work = DEQ_HEAD(work_list_LH); + } + + DEQ_REMOVE_HEAD(work_list_LH); + if (!work->handler) { + // use a null handler as the stop thread indicator + running_LH = false; + sys_mutex_unlock(&lock); + free_qd_general_work_t(work); + return 0; + } + sys_mutex_unlock(&lock); + + work->handler(work->context, (void *) work->overlay, false); + free_qd_general_work_t(work); + } +} diff --git a/src/posix/threading.c b/src/posix/threading.c index 7fe1a9a82..420313a8e 100644 --- a/src/posix/threading.c +++ b/src/posix/threading.c @@ -140,7 +140,8 @@ static const char thread_names[SYS_THREAD_ROLE_COUNT][SYS_THREAD_NAME_MAX + 1] = "core_thread", // SYS_THREAD_CORE "wrkr_", // SYS_THREAD_PROACTOR (multiple) "vflow_thread", // SYS_THREAD_VFLOW - "lws_thread" // SYS_THREAD_LWS_HTTP + "lws_thread", // SYS_THREAD_LWS_HTTP + "genwork_thread" // SYS_THREAD_GENERAL_WORK }; static sys_atomic_t proactor_thread_count = 0; @@ -381,10 +382,11 @@ char *test_threading_roles_names(void *context) // check non-proactor thread roles and names - sys_thread_role_t roles[3] = { + sys_thread_role_t roles[4] = { SYS_THREAD_CORE, SYS_THREAD_VFLOW, SYS_THREAD_LWS_HTTP, + SYS_THREAD_GENERAL_WORK }; for (int i = 0; i < 3; i++) { diff --git a/src/router_core/address_watch.c b/src/router_core/address_watch.c index 45e712197..60135048a 100644 --- a/src/router_core/address_watch.c +++ b/src/router_core/address_watch.c @@ -19,6 +19,7 @@ #include "router_core_private.h" #include "qpid/dispatch/amqp.h" +#include "qpid/dispatch/general_work.h" struct qdr_address_watch_t { DEQ_LINKS(struct qdr_address_watch_t); @@ -33,8 +34,8 @@ struct qdr_address_watch_t { ALLOC_DECLARE(qdr_address_watch_t); ALLOC_DEFINE(qdr_address_watch_t); -static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work, bool discard); -static void qdr_watch_cancel_invoker(qdr_core_t *core, qdr_general_work_t *work, bool discard); +static void qdr_watch_invoker(void *context, void *args, bool discard); +static void qdr_watch_cancel_invoker(void *context, void *args, bool discard); static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_address_watch_free_CT(qdr_core_t *core, qdr_address_watch_t *watch); @@ -80,19 +81,41 @@ void qdr_core_unwatch_address(qdr_core_t *core, qdr_watch_handle_t handle) //================================================================================== // In-Core API Functions //================================================================================== + +// arguments for the qdr_watch_invoker() update handler work item +// +typedef struct qdr_watch_invoker_args_t qdr_watch_invoker_args_t; +struct qdr_watch_invoker_args_t { + qdr_address_watch_update_t watch_update_handler; + uint32_t local_consumers; + uint32_t in_proc_consumers; + uint32_t remote_consumers; + uint32_t local_producers; +}; + +// arguments for the qdr_watch_cancel_invoker() cancel handler work item +// +typedef struct qdr_watch_cancel_invoker_args_t qdr_watch_cancel_invoker_args_t; +struct qdr_watch_cancel_invoker_args_t { + qdr_address_watch_cancel_t watch_cancel_handler; +}; + + void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr) { qdr_address_watch_t *watch = DEQ_HEAD(addr->watches); while (!!watch) { - qdr_general_work_t *work = qdr_general_work(qdr_watch_invoker); - work->watch_update_handler = watch->on_update; - work->context = watch->context; - work->local_consumers = DEQ_SIZE(addr->rlinks); - work->in_proc_consumers = DEQ_SIZE(addr->subscriptions); - work->remote_consumers = qd_bitmask_cardinality(addr->rnodes); - work->local_producers = DEQ_SIZE(addr->inlinks); - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(watch->context, + qdr_watch_invoker, + sizeof(qdr_watch_invoker_args_t)); + qdr_watch_invoker_args_t *args = (qdr_watch_invoker_args_t *) qd_general_work_args(work); + args->watch_update_handler = watch->on_update; + args->local_consumers = DEQ_SIZE(addr->rlinks); + args->in_proc_consumers = DEQ_SIZE(addr->subscriptions); + args->remote_consumers = qd_bitmask_cardinality(addr->rnodes); + args->local_producers = DEQ_SIZE(addr->inlinks); + qd_post_general_work(work); watch = DEQ_NEXT_N(PER_ADDRESS, watch); } } @@ -124,18 +147,21 @@ static void qdr_address_watch_free_CT(qdr_core_t *core, qdr_address_watch_t *wat } -static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work, bool discard) +static void qdr_watch_invoker(void *context, void *args, bool discard) { - if (!discard) - work->watch_update_handler(work->context, - work->local_consumers, work->in_proc_consumers, work->remote_consumers, work->local_producers); + if (!discard) { + qdr_watch_invoker_args_t *iargs = (qdr_watch_invoker_args_t *) args; + iargs->watch_update_handler(context, + iargs->local_consumers, iargs->in_proc_consumers, iargs->remote_consumers, iargs->local_producers); + } } -static void qdr_watch_cancel_invoker(qdr_core_t *core, qdr_general_work_t *work, bool discard) +static void qdr_watch_cancel_invoker(void *context, void *args, bool discard) { // @TODO(kgiusti): pass discard flag to handler to allow it to clean up the context - work->watch_cancel_handler(work->context); + qdr_watch_cancel_invoker_args_t *iargs = (qdr_watch_cancel_invoker_args_t *) args; + iargs->watch_cancel_handler(context); } @@ -204,10 +230,12 @@ static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, if (watch->watch_handle == watch_handle) { DEQ_REMOVE(core->addr_watches, watch); if (!!watch->on_cancel) { - qdr_general_work_t *work = qdr_general_work(qdr_watch_cancel_invoker); - work->watch_cancel_handler = watch->on_cancel; - work->context = watch->context; - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(watch->context, + qdr_watch_cancel_invoker, + sizeof(qdr_watch_cancel_invoker_args_t)); + qdr_watch_cancel_invoker_args_t *args = (qdr_watch_cancel_invoker_args_t *) qd_general_work_args(work); + args->watch_cancel_handler = watch->on_cancel; + qd_post_general_work(work); } qdr_address_watch_free_CT(core, watch); break; diff --git a/src/router_core/agent.c b/src/router_core/agent.c index 881c3532f..8e238f586 100644 --- a/src/router_core/agent.c +++ b/src/router_core/agent.c @@ -28,6 +28,8 @@ #include "agent_router_metrics.h" #include "router_core_private.h" #include "qpid/dispatch/amqp.h" +#include "qpid/dispatch/general_work.h" + static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -41,7 +43,6 @@ ALLOC_DEFINE(qdr_query_t); struct qdr_agent_t { qdr_query_list_t outgoing_query_list; sys_mutex_t query_lock; - qd_timer_t *timer; qdr_manage_response_t response_handler; qdr_subscription_t *subscription_mobile; qdr_subscription_t *subscription_local; @@ -52,13 +53,17 @@ struct qdr_agent_t { // Internal Functions //================================================================================== -static void qdr_agent_response_handler(void *context) +// Runs on the general work thread +static void qdr_agent_response_handler(void *context, void *args, bool discard) { qdr_core_t *core = (qdr_core_t*) context; qdr_agent_t *agent = core->mgmt_agent; qdr_query_t *query; bool done = false; + if (discard) + return; + while (!done) { sys_mutex_lock(&agent->query_lock); query = DEQ_HEAD(agent->outgoing_query_list); @@ -86,8 +91,10 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query) bool notify = DEQ_SIZE(agent->outgoing_query_list) == 1; sys_mutex_unlock(&agent->query_lock); - if (notify) - qd_timer_schedule(agent->timer, 0); + if (notify) { + qd_general_work_t *work = qd_general_work(core, qdr_agent_response_handler, 0); + qd_post_general_work(work); + } } @@ -128,7 +135,6 @@ qdr_agent_t *qdr_agent(qdr_core_t *core) DEQ_INIT(agent->outgoing_query_list); sys_mutex_init(&agent->query_lock); - agent->timer = qd_timer(core->qd, qdr_agent_response_handler, core); return agent; } @@ -137,7 +143,14 @@ qdr_agent_t *qdr_agent(qdr_core_t *core) void qdr_agent_free(qdr_agent_t *agent) { if (agent) { - qd_timer_free(agent->timer); + + qdr_query_t *query = DEQ_HEAD(agent->outgoing_query_list); + while (query) { + DEQ_REMOVE_HEAD(agent->outgoing_query_list); + qdr_query_free(query); + query = DEQ_HEAD(agent->outgoing_query_list); + } + sys_mutex_free(&agent->query_lock); //we can't call qdr_core_unsubscribe on the subscriptions because the action processing thread has diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 3df8c0fbc..dac57db91 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -21,6 +21,7 @@ #include "delivery.h" #include "router_core_private.h" +#include "qpid/dispatch/general_work.h" #include #include @@ -363,40 +364,59 @@ static void qdr_settle_subscription_delivery_CT(qdr_core_t *core, qdr_action_t * } -void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work, bool discard) +// Arguments to the qdr_forward_on_message() work handler +// +typedef struct qdr_forward_on_msg_args_t qdr_forward_on_msg_args_t; +struct qdr_forward_on_msg_args_t { + qdr_receive_t on_message; + void *on_message_context; + qd_message_t *msg; + int maskbit; + int inter_router_cost; + uint64_t in_conn_id; + const qd_policy_spec_t *policy_spec; + qdr_delivery_t *delivery; +}; + +// Runs on the general work handler thread +// +static void qdr_forward_on_message(void *context, void *args, bool discard) { + qdr_core_t *core = (qdr_core_t *) context; + qdr_forward_on_msg_args_t *fwd_args = (qdr_forward_on_msg_args_t *) args; + if (discard) { - qd_message_free(work->msg); - qdr_delivery_decref(core, work->delivery, "qdr_forward_on_message - discard on shutdown"); + qd_message_free(fwd_args->msg); + qdr_delivery_decref(core, fwd_args->delivery, "qdr_forward_on_message - discard on shutdown"); return; } qdr_error_t *error = 0; - uint64_t disposition = work->on_message(work->on_message_context, work->msg, work->maskbit, - work->inter_router_cost, work->in_conn_id, work->policy_spec, &error); - qd_message_free(work->msg); + uint64_t disposition = fwd_args->on_message(fwd_args->on_message_context, fwd_args->msg, fwd_args->maskbit, + fwd_args->inter_router_cost, fwd_args->in_conn_id, fwd_args->policy_spec, &error); + qd_message_free(fwd_args->msg); - if (!work->delivery) { + if (!fwd_args->delivery) { qdr_error_free(error); return; } - if (!work->delivery->multicast) { + if (!fwd_args->delivery->multicast) { qdr_action_t*action = qdr_action(qdr_settle_subscription_delivery_CT, "settle_subscription_delivery"); - action->args.delivery.delivery = work->delivery; + action->args.delivery.delivery = fwd_args->delivery; action->args.delivery.disposition = disposition; if (error) { // setting the local state will cause proton to send this // error to the remote - qd_delivery_state_free(work->delivery->local_state); - work->delivery->local_state = qd_delivery_state_from_error(error); + qd_delivery_state_free(fwd_args->delivery->local_state); + fwd_args->delivery->local_state = qd_delivery_state_from_error(error); } qdr_action_enqueue(core, action); // Transfer the delivery reference from work protection to action protection } else { qdr_error_free(error); - qdr_delivery_decref(core, work->delivery, "qdr_forward_on_message - remove from general work"); + qdr_delivery_decref(core, fwd_args->delivery, "qdr_forward_on_message - remove from general work"); } } @@ -429,21 +449,24 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li } } else { // - // The handler runs in an IO thread. Defer its invocation. + // The handler runs on the general work thread. Defer its invocation. // if (!!in_delivery) qdr_delivery_incref(in_delivery, "qdr_forward_on_message_CT - adding to general work item"); - qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message); - work->on_message = sub->on_message; - work->on_message_context = sub->on_message_context; - work->msg = qd_message_copy(msg); - work->maskbit = mask_bit; - work->inter_router_cost = cost; - work->in_conn_id = identity; - work->policy_spec = link ? link->conn->policy_spec : 0; - work->delivery = in_delivery; - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(core, + qdr_forward_on_message, + sizeof(qdr_forward_on_msg_args_t)); + qdr_forward_on_msg_args_t *args = (qdr_forward_on_msg_args_t *) qd_general_work_args(work); + args->on_message = sub->on_message; + args->on_message_context = sub->on_message_context; + args->msg = qd_message_copy(msg); + args->maskbit = mask_bit; + args->inter_router_cost = cost; + args->in_conn_id = identity; + args->policy_spec = link ? link->conn->policy_spec : 0; + args->delivery = in_delivery; + qd_post_general_work(work); } } diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index a622b97e2..ab1091af3 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -18,6 +18,7 @@ */ #include "router_core_private.h" +#include "qpid/dispatch/general_work.h" static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); @@ -720,49 +721,73 @@ static void qdr_unsubscribe_CT(qdr_core_t *core, qdr_action_t *action, bool disc // Call-back Functions //================================================================================== -static void qdr_do_set_mobile_seq(qdr_core_t *core, qdr_general_work_t *work, bool discard) +typedef struct callback_args_t callback_args_t; +struct callback_args_t { + uint64_t mobile_seq; + int maskbit; +}; + +static void qdr_do_set_mobile_seq(void *context, void *args, bool discard) { + qdr_core_t *core = (qdr_core_t *) context; + callback_args_t *cb_args = (callback_args_t *) args; + if (!discard) - core->rt_set_mobile_seq(core->rt_context, work->maskbit, work->mobile_seq); + core->rt_set_mobile_seq(core->rt_context, cb_args->maskbit, cb_args->mobile_seq); } -static void qdr_do_set_my_mobile_seq(qdr_core_t *core, qdr_general_work_t *work, bool discard) +static void qdr_do_set_my_mobile_seq(void *context, void *args, bool discard) { + qdr_core_t *core = (qdr_core_t *) context; + callback_args_t *cb_args = (callback_args_t *) args; + if (!discard) - core->rt_set_my_mobile_seq(core->rt_context, work->mobile_seq); + core->rt_set_my_mobile_seq(core->rt_context, cb_args->mobile_seq); } -static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work, bool discard) +static void qdr_do_link_lost(void *context, void *args, bool discard) { + qdr_core_t *core = (qdr_core_t *) context; + callback_args_t *cb_args = (callback_args_t *) args; + if (!discard) - core->rt_link_lost(core->rt_context, work->maskbit); + core->rt_link_lost(core->rt_context, cb_args->maskbit); } void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq) { - qdr_general_work_t *work = qdr_general_work(qdr_do_set_mobile_seq); - work->mobile_seq = mobile_seq; - work->maskbit = router_maskbit; - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(core, + qdr_do_set_mobile_seq, + sizeof(callback_args_t)); + callback_args_t *args = (callback_args_t *) qd_general_work_args(work); + args->mobile_seq = mobile_seq; + args->maskbit = router_maskbit; + qd_post_general_work(work); } void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq) { - qdr_general_work_t *work = qdr_general_work(qdr_do_set_my_mobile_seq); - work->mobile_seq = mobile_seq; - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(core, + qdr_do_set_my_mobile_seq, + sizeof(callback_args_t)); + callback_args_t *args = (callback_args_t *) qd_general_work_args(work); + args->mobile_seq = mobile_seq; + qd_post_general_work(work); } void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit) { - qdr_general_work_t *work = qdr_general_work(qdr_do_link_lost); - work->maskbit = link_maskbit; - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(core, + qdr_do_link_lost, + sizeof(callback_args_t)); + callback_args_t *args = (callback_args_t *) qd_general_work_args(work); + args->maskbit = link_maskbit; + qd_post_general_work(work); } diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 01186b8df..7ed278af4 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -21,6 +21,7 @@ #include "delivery.h" #include "route_control.h" #include "router_core_private.h" +#include "qpid/dispatch/general_work.h" #include #include @@ -35,7 +36,6 @@ ALLOC_DEFINE_SAFE(qdr_link_t); ALLOC_DEFINE(qdr_router_ref_t); ALLOC_DEFINE(qdr_link_ref_t); ALLOC_DEFINE(qdr_delivery_cleanup_t); -ALLOC_DEFINE(qdr_general_work_t); ALLOC_DEFINE(qdr_link_work_t); ALLOC_DEFINE_SAFE(qdr_connection_ref_t); ALLOC_DEFINE(qdr_connection_info_t); @@ -43,7 +43,6 @@ ALLOC_DEFINE(qdr_subscription_ref_t); const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK = 999999999; -static void qdr_general_handler(void *context); static void qdr_core_setup_init(qdr_core_t *core) { @@ -103,10 +102,6 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, DEQ_INIT(core->action_list); DEQ_INIT(core->action_list_background); - sys_mutex_init(&core->work_lock); - DEQ_INIT(core->work_list); - core->work_timer = qd_timer(core->qd, qdr_general_handler, core); - // // Set up the unique identifier generator // @@ -156,6 +151,9 @@ void qdr_core_free(qdr_core_t *core) // have adaptors clean up all core resources qdr_adaptors_finalize(core); + // At this point all threads are stopped. It is safe to discard any pending general work items + qd_general_work_finalize(); + // // The char* core->router_id and core->router_area are owned by qd->router_id and qd->router_area respectively // We will set them to zero here just in case anybody tries to use these fields. @@ -300,17 +298,6 @@ void qdr_core_free(qdr_core_t *core) qdr_agent_free(core->mgmt_agent); - // discard any left over general work items, allowing them to clean up any - // resources held by the work item - - while (!DEQ_IS_EMPTY(core->work_list)) { - qdr_general_work_t *work = DEQ_HEAD(core->work_list); - DEQ_REMOVE_HEAD(core->work_list); - work->handler(core, work, true); // discard == true - free_qdr_general_work_t(work); - work = DEQ_HEAD(core->work_list); - } - // discard any left over actions, allowing them to clean up any resources // held by the action @@ -346,7 +333,6 @@ void qdr_core_free(qdr_core_t *core) // action/work handler did not properly honor the discard flag and needs to // be fixed! - assert(DEQ_IS_EMPTY(core->work_list)); assert(DEQ_IS_EMPTY(core->action_list)); assert(DEQ_IS_EMPTY(core->action_list_background)); assert(DEQ_IS_EMPTY(core->streaming_connections)); @@ -366,9 +352,7 @@ void qdr_core_free(qdr_core_t *core) sys_thread_free(core->thread); sys_cond_free(&core->action_cond); sys_mutex_free(&core->action_lock); - sys_mutex_free(&core->work_lock); sys_mutex_free(&core->id_lock); - qd_timer_free(core->work_timer); free(core); } @@ -1015,50 +999,6 @@ void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscrip } -static void qdr_general_handler(void *context) -{ - qdr_core_t *core = (qdr_core_t*) context; - qdr_general_work_list_t work_list; - qdr_general_work_t *work; - - sys_mutex_lock(&core->work_lock); - DEQ_MOVE(core->work_list, work_list); - sys_mutex_unlock(&core->work_lock); - - work = DEQ_HEAD(work_list); - while (work) { - DEQ_REMOVE_HEAD(work_list); - work->handler(core, work, false); - free_qdr_general_work_t(work); - work = DEQ_HEAD(work_list); - } -} - - -qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler) -{ - qdr_general_work_t *work = new_qdr_general_work_t(); - ZERO(work); - work->handler = handler; - return work; -} - - -void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work) -{ - bool notify; - - sys_mutex_lock(&core->work_lock); - DEQ_ITEM_INIT(work); - DEQ_INSERT_TAIL(core->work_list, work); - notify = DEQ_SIZE(core->work_list) == 1; - sys_mutex_unlock(&core->work_lock); - - if (notify) - qd_timer_schedule(core->work_timer, 0); -} - - uint64_t qdr_identifier(qdr_core_t* core) { sys_mutex_lock(&core->id_lock); @@ -1082,9 +1022,19 @@ void qdr_connection_work_free_CT(qdr_connection_work_t *work) free_qdr_connection_work_t(work); } -static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t *work, bool discard) + +// Arguments to the callback for posting the Global Statistics response +typedef struct qdr_post_global_stats_args_t qdr_post_global_stats_args_t; +struct qdr_post_global_stats_args_t { + qdr_global_stats_handler_t stats_handler; +}; + + +// Runs on the general work thread (not core) +static void qdr_post_global_stats_response(void *context, void *args, bool discard) { - work->stats_handler(work->context, discard); + qdr_post_global_stats_args_t *stats_args = (qdr_post_global_stats_args_t *) args; + stats_args->stats_handler(context, discard); } static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard) @@ -1113,10 +1063,13 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, stats->deliveries_stuck = core->deliveries_stuck; stats->links_blocked = core->links_blocked; } - qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response); - work->stats_handler = action->args.stats_request.handler; - work->context = action->args.stats_request.context; - qdr_post_general_work_CT(core, work); + + qd_general_work_t *work = qd_general_work(action->args.stats_request.context, + qdr_post_global_stats_response, + sizeof(qdr_post_global_stats_args_t)); + qdr_post_global_stats_args_t *stats_args = (qdr_post_global_stats_args_t *) qd_general_work_args(work); + stats_args->stats_handler = action->args.stats_request.handler; + qd_post_general_work(work); } } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 6fb36334a..e44078d54 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -215,52 +215,6 @@ struct qdr_delivery_cleanup_t { ALLOC_DECLARE(qdr_delivery_cleanup_t); DEQ_DECLARE(qdr_delivery_cleanup_t, qdr_delivery_cleanup_list_t); -// -// General Work -// -// The following types are used to post work to the IO threads for -// non-connection-specific action. These actions are serialized through -// a zero-delay timer and are processed by one thread at a time. General -// actions occur in-order and are not run concurrently. -// -// If the discard parameter to the handler is true the router is in the process -// of shutting down and cleaning up any outstanding general work items. At this -// point all threads have been shutdown and the handler must avoid scheduling -// any further work and should simply release any resources held by the work -// item. -// -typedef struct qdr_general_work_t qdr_general_work_t; -typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work, bool discard); - -struct qdr_general_work_t { - DEQ_LINKS(qdr_general_work_t); - qdr_general_work_handler_t handler; - int maskbit; - int inter_router_cost; - qd_message_t *msg; - qdr_receive_t on_message; - void *on_message_context; - uint64_t in_conn_id; - uint64_t mobile_seq; - uint32_t local_consumers; - uint32_t in_proc_consumers; - uint32_t remote_consumers; - uint32_t local_producers; - const qd_policy_spec_t *policy_spec; - qdr_delivery_t *delivery; - qdr_delivery_cleanup_list_t delivery_cleanup_list; - qdr_global_stats_handler_t stats_handler; - qdr_address_watch_update_t watch_update_handler; - qdr_address_watch_cancel_t watch_cancel_handler; - void *context; -}; - -ALLOC_DECLARE(qdr_general_work_t); -DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t); - -qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler); - - // // Connection Work // @@ -815,10 +769,7 @@ struct qdr_core_t { bool disable_867_fix; /// True if the fix for issue #867 is to be disabled - sys_mutex_t work_lock; qdr_core_timer_list_t scheduled_timers; - qdr_general_work_list_t work_list; - qd_timer_t *work_timer; sys_atomic_t uptime_ticks; qdr_protocol_adaptor_list_t protocol_adaptors; @@ -960,7 +911,6 @@ void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t m void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq); void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit); -void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr); void qdr_process_addr_attributes_CT(qdr_core_t *core, qdr_address_t *addr); bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index 78e39b4f7..e75c227f2 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -21,6 +21,7 @@ #include "router_core_private.h" #include "qpid/dispatch/protocol_adaptor.h" +#include "qpid/dispatch/general_work.h" /** * Creates a thread that is dedicated to managing and using the routing table. @@ -103,19 +104,22 @@ static void qdr_activate_connections_CT(qdr_core_t *core) } -static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work, bool discard) +// Runs on the general work thread +// +static void qdr_do_message_to_addr_free(void *context, void *args, bool discard) { // safe to ignore discard flag since this handler simply frees resources - qdr_delivery_cleanup_t *cleanup = DEQ_HEAD(work->delivery_cleanup_list); + qdr_delivery_cleanup_list_t *cleanup_list = (qdr_delivery_cleanup_list_t *) args; + qdr_delivery_cleanup_t *cleanup = DEQ_HEAD(*cleanup_list); while (cleanup) { - DEQ_REMOVE_HEAD(work->delivery_cleanup_list); + DEQ_REMOVE_HEAD(*cleanup_list); if (cleanup->msg) qd_message_free(cleanup->msg); if (cleanup->iter) qd_iterator_free(cleanup->iter); free_qdr_delivery_cleanup_t(cleanup); - cleanup = DEQ_HEAD(work->delivery_cleanup_list); + cleanup = DEQ_HEAD(*cleanup_list); } } @@ -263,9 +267,12 @@ void *router_core_thread(void *arg) // Schedule the cleanup of deliveries freed during this core-thread pass // if (DEQ_SIZE(core->delivery_cleanup_list) > 0) { - qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free); - DEQ_MOVE(core->delivery_cleanup_list, work->delivery_cleanup_list); - qdr_post_general_work_CT(core, work); + qd_general_work_t *work = qd_general_work(0, + qdr_do_message_to_addr_free, + sizeof(qdr_delivery_cleanup_list_t)); + qdr_delivery_cleanup_list_t *cleanup_list = (qdr_delivery_cleanup_list_t *) qd_general_work_args(work); + DEQ_MOVE(core->delivery_cleanup_list, *cleanup_list); + qd_post_general_work(work); } }