Skip to content

Commit

Permalink
ISSUE-1604: Work in progress - do not merge
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Aug 29, 2024
1 parent e50c2d5 commit a5cd339
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 194 deletions.
108 changes: 108 additions & 0 deletions include/qpid/dispatch/general_work.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#ifndef __general_work_h__
#define __general_work_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <stddef.h>
#include <stdbool.h>


//
// General Work
//
// The following API can be used to post work to be executed on a background thread. Work handlers are run serially in
// the order they are posted (no two work items will run at the same time). The background thread is non-Proactor and
// can run at the same time as Proactor threads (I/O and qd_timers) as well as router core and other system threads.
//

typedef struct qd_general_work_t qd_general_work_t;

/**
* General work handler
*
* The signature of the function that is run on the background thread
*
* If the discard parameter to the handler is true the router is in the process of shutting down and cleaning up any
* outstanding general work items. At this point all threads have been shutdown and the handler must avoid scheduling
* any further work and should simply release any resources held by the args parameter.
*
* @param context the context parameter passed to qd_general_work() constructor
* @param args a pointer to memory holding the arguments set via qd_general_work_args()
* @param discard True if the router is shutting down and the handler should discard the work.
*/
typedef void (*qd_general_work_handler_t) (void *context, void *args, bool discard);

/**
* Create a new general work request
*
* @param context supplied by caller, passed to handler
* @param handler the function to run
* @param args_size the amount of memory needed for handler arguments
* @return a pointer to an initialized qd_general_work_t instance (never null)
*/
qd_general_work_t *qd_general_work(void *context, qd_general_work_handler_t handler, size_t args_size);

/**
* Access the work item's memory for handler arguments
*
* Use this function to initialize the handler's input parameters before posting the work item. It is expected that the
* caller will cast the return value to a pointer to the appropriate structure that holds the handler's parameters. This
* pointer will be passed in the args parameter to the handler function.
*
* The returned pointer must not be accessed after the work item has been scheduled (on return from
* qd_post_general_work())
*
* @param work the general work instance
* @return address of the start of argument memory. The amount of memory returned will be the value of the args_size
* parameter passed to qd_general_work()
*/
void *qd_general_work_args(qd_general_work_t *work);

/**
* Schedule the work item to run on the general work thread.
*
* The caller must not reference work on return from this call.
*
* @param work the work item to schedule
*/
void qd_post_general_work(qd_general_work_t *work);

/**
* Start the general work thread
*/
void qd_general_work_start(void);

/**
* Stop the general work thread.
*
* Blocks caller until thread has stopped. Work callbacks will cease being invoked on return to the caller.
*
*/
void qd_general_work_stop(void);

/**
* Free all resources associated with general work
*
* During this call any pending work items that have been submitted after qd_general_work_stop() has been called will be
* invoked with the discard flag true.
*
*/
void qd_general_work_finalize(void);

#endif // __general_work_h__
1 change: 1 addition & 0 deletions include/qpid/dispatch/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ typedef enum {
SYS_THREAD_PROACTOR,
SYS_THREAD_VFLOW,
SYS_THREAD_LWS_HTTP,
SYS_THREAD_GENERAL_WORK,
// add new thread roles here and update _thread_names in threading.c
SYS_THREAD_ROLE_COUNT
} sys_thread_role_t;
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ set(qpid_dispatch_SOURCES
qd_asan_interface.c
protocols.c
connection_counters.c
general_work.c
)

set(qpid_dispatch_INCLUDES
Expand Down
6 changes: 6 additions & 0 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "qpid/dispatch/discriminator.h"
#include "qpid/dispatch/server.h"
#include "qpid/dispatch/static_assert.h"
#include "qpid/dispatch/general_work.h"

#include <dlfcn.h>
#include <inttypes.h>
Expand Down Expand Up @@ -139,6 +140,8 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir, bool test_hooks)
if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
qd_message_initialize();
if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
qd_general_work_start();

return qd;
}

Expand Down Expand Up @@ -384,6 +387,9 @@ void qd_dispatch_free(qd_dispatch_t *qd)
{
if (!qd) return;

// Stop the general work thread to prevent further callbacks
qd_general_work_stop();

/* Stop HTTP threads immediately */
qd_http_server_free(qd_server_http(qd->server));

Expand Down
169 changes: 169 additions & 0 deletions src/general_work.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


#include "qpid/dispatch/general_work.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/threading.h"
#include "qpid/dispatch/alloc_pool.h"

#include <stdint.h>

// The maximum size allowed for handler arguments. This value can be increased should handlers need more memory for
// arguments.
#define QD_GENERAL_WORK_SIZE 160

struct qd_general_work_t {
DEQ_LINKS(qd_general_work_t);
void *context;
qd_general_work_handler_t handler;
uint8_t overlay[QD_GENERAL_WORK_SIZE];
};

ALLOC_DECLARE(qd_general_work_t);
ALLOC_DEFINE(qd_general_work_t);
DEQ_DECLARE(qd_general_work_t, qd_general_work_list_t);

static sys_mutex_t lock;
static sys_cond_t condition;
sys_thread_t *thread;

static qd_general_work_list_t work_list_LH = DEQ_EMPTY; // must hold lock
static bool need_wake_LH; // must hold lock
static bool running_LH; // must hold lock


static void *general_work_thread(void *context);


void qd_general_work_start(void)
{
sys_mutex_init(&lock);
sys_cond_init(&condition);
sys_mutex_lock(&lock);
running_LH = true;
sys_mutex_unlock(&lock);
thread = sys_thread(SYS_THREAD_GENERAL_WORK, general_work_thread, 0);
}


void qd_general_work_stop(void)
{
// signal the background thread to stop by sending a work request with no handler

qd_general_work_t *work = qd_general_work(0, 0, 0);
qd_post_general_work(work);
sys_thread_join(thread);
}

void qd_general_work_finalize(void)
{
// discard any left over general work items, allowing them to clean up any
// resources held by the work item

sys_mutex_lock(&lock);
assert(running_LH == false); // need to call qd_general_work_stop first
qd_general_work_t *work = DEQ_HEAD(work_list_LH);
while (!!work) {
DEQ_REMOVE_HEAD(work_list_LH);
sys_mutex_unlock(&lock);
work->handler(work->context,(void *) work->overlay, true); // discard == true
free_qd_general_work_t(work);
sys_mutex_lock(&lock);
work = DEQ_HEAD(work_list_LH);
}
sys_mutex_unlock(&lock);

sys_thread_free(thread);
sys_cond_free(&condition);
sys_mutex_free(&lock);
}


qd_general_work_t *qd_general_work(void *context, qd_general_work_handler_t handler, size_t args_size)
{
assert(args_size <= QD_GENERAL_WORK_SIZE); // you need to increase QD_GENERAL_WORK_SIZE
qd_general_work_t *work = new_qd_general_work_t();
ZERO(work);
work->context = context;
work->handler = handler;
return work;
}


void *qd_general_work_args(qd_general_work_t *work)
{
assert(work);
return (void *) work->overlay;
}


void qd_post_general_work(qd_general_work_t *work)
{
bool need_wake;

DEQ_ITEM_INIT(work);

sys_mutex_lock(&lock);
assert(running_LH); // post general work after thread stopped!
DEQ_INSERT_TAIL(work_list_LH, work);
need_wake = need_wake_LH;
if (need_wake) {
need_wake_LH = false;
}
sys_mutex_unlock(&lock);

if (need_wake) {
sys_cond_signal(&condition);
}
}


/**
* Thread main loop
*/
static void *general_work_thread(void *context)
{
qd_general_work_t *work = 0;

while (true) {

// Process one at a time, allowing other threads to run each time we take the lock
sys_mutex_lock(&lock);
work = DEQ_HEAD(work_list_LH);
while (!work) {
need_wake_LH = true;
sys_cond_wait(&condition, &lock);
work = DEQ_HEAD(work_list_LH);
}

DEQ_REMOVE_HEAD(work_list_LH);
if (!work->handler) {
// use a null handler as the stop thread indicator
running_LH = false;
sys_mutex_unlock(&lock);
free_qd_general_work_t(work);
return 0;
}
sys_mutex_unlock(&lock);

work->handler(work->context, (void *) work->overlay, false);
free_qd_general_work_t(work);
}
}
6 changes: 4 additions & 2 deletions src/posix/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ static const char thread_names[SYS_THREAD_ROLE_COUNT][SYS_THREAD_NAME_MAX + 1] =
"core_thread", // SYS_THREAD_CORE
"wrkr_", // SYS_THREAD_PROACTOR (multiple)
"vflow_thread", // SYS_THREAD_VFLOW
"lws_thread" // SYS_THREAD_LWS_HTTP
"lws_thread", // SYS_THREAD_LWS_HTTP
"genwork_thread" // SYS_THREAD_GENERAL_WORK
};

static sys_atomic_t proactor_thread_count = 0;
Expand Down Expand Up @@ -381,10 +382,11 @@ char *test_threading_roles_names(void *context)

// check non-proactor thread roles and names

sys_thread_role_t roles[3] = {
sys_thread_role_t roles[4] = {
SYS_THREAD_CORE,
SYS_THREAD_VFLOW,
SYS_THREAD_LWS_HTTP,
SYS_THREAD_GENERAL_WORK
};

for (int i = 0; i < 3; i++) {
Expand Down
Loading

0 comments on commit a5cd339

Please sign in to comment.