Skip to content

Commit

Permalink
Added periodic beacon event for collector discovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
ted-ross committed Mar 23, 2022
1 parent 7429449 commit 87e9ed8
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 7 deletions.
2 changes: 1 addition & 1 deletion python/skupper_router_internal/management/qdrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class QdSchema(schema.Schema):

CONFIGURATION_ENTITY = "configurationEntity"
OPERATIONAL_ENTITY = "operationalEntity"
ILLEGAL_ID_CHARS = "|"
ILLEGAL_ID_CHARS = "|$"

def __init__(self) -> None:
"""Load schema."""
Expand Down
193 changes: 187 additions & 6 deletions src/protocol_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/

#include "qpid/dispatch/protocol_log.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/alloc.h"
#include "qpid/dispatch/io_module.h"
#include "qpid/dispatch/threading.h"
#include "qpid/dispatch/log.h"
#include "qpid/dispatch/compose.h"
#include "qpid/dispatch/amqp.h"
#include "qpid/dispatch/timer.h"
#include "dispatch_private.h"
#include "stdbool.h"
#include <inttypes.h>
Expand Down Expand Up @@ -85,13 +87,19 @@ struct plog_work_t {
char *string_val;
uint64_t int_val;
plog_identity_t ref_val;
void *pointer;
} value;
};

ALLOC_DECLARE(plog_work_t);
ALLOC_DEFINE(plog_work_t);
DEQ_DECLARE(plog_work_t, plog_work_list_t);

static const char *event_address_all = "mc/ske.$all";
static const char *event_address_my_prefix = "mc/ske.";
static char *event_address_my = 0;
static const int beacon_interval_sec = 5;

static sys_mutex_t *lock;
static sys_cond_t *condition;
static sys_thread_t *thread;
Expand All @@ -105,6 +113,12 @@ static uint32_t router_id;
static uint64_t next_identity = 0;
static const char *router_area;
static const char *router_name;
static qdr_watch_handle_t all_address_watch_handle;
static qdr_watch_handle_t my_address_watch_handle;
static bool all_address_usable = false;
static bool my_address_usable = false;
static qd_timer_t *beacon_timer = 0;
static uint64_t next_message_id = 0;


/**
Expand Down Expand Up @@ -697,6 +711,74 @@ static void _plog_flush(void)
}


static void _plog_send_beacon_TH(plog_work_t *work, bool discard)
{
if (!discard) {
qdr_core_t *core = (qdr_core_t*) work->value.pointer;

//
// Compose the message content starting with the properties
//
qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
qd_compose_start_list(field);
qd_compose_insert_long(field, next_message_id++);
qd_compose_insert_null(field); // user-id
qd_compose_insert_string(field, event_address_all); // to
qd_compose_insert_string(field, "ROUTER"); // subject
qd_compose_end_list(field);

//
// Append the body section to the content
//
field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
qd_compose_start_map(field);
qd_compose_insert_int(field, PLOG_ATTRIBUTE_IDENTITY);
qd_compose_start_list(field);
qd_compose_insert_int(field, local_router->identity.site_id);
qd_compose_insert_int(field, local_router->identity.router_id);
qd_compose_insert_int(field, local_router->identity.record_id);
qd_compose_end_list(field);
qd_compose_end_map(field);

//
// Create a message for the content
//
qd_message_t *beacon = qd_message();
qd_message_compose_2(beacon, field, true);

//
// Annotate the message so it will be properly multicast
//
qdr_new_message_annotate(core, beacon);

//
// Send the message to all of the bound receivers
//
qdr_send_to2(core, beacon, event_address_all, true, false);

//
// Free up used resources
//
qd_compose_free(field);
qd_message_free(beacon);
}
}


static void _plog_send_beacon(qdr_core_t *core)
{
plog_work_t *work = _plog_work(_plog_send_beacon_TH);
work->value.pointer = core;
_plog_post_work(work);
if (!!beacon_timer) {
qd_timer_schedule(beacon_timer, beacon_interval_sec * 1000);
}
}


//=====================================================================================
// Module Thread
//=====================================================================================
/**
* @brief Main function for the plog thread. This thread runs for the entire
* lifecycle of the router.
Expand All @@ -712,8 +794,6 @@ static void *_plog_thread(void *unused)

qd_log(log, QD_LOG_INFO, "Protocol logging started");

_plog_create_router_record();

while (running) {
//
// Use the lock only to protect the condition variable and the work lists
Expand Down Expand Up @@ -777,6 +857,66 @@ static void *_plog_thread(void *unused)
}


//=====================================================================================
// API Callbacks
//=====================================================================================
static void _plog_on_all_address_watch(void *context,
uint32_t local_consumers,
uint32_t in_proc_consumers,
uint32_t remote_consumers,
uint32_t local_producers)
{
bool now_usable = local_consumers > 0 || remote_consumers > 0;

if (now_usable && !all_address_usable) {
//
// Start sending beacon messages to the the all_address.
//
all_address_usable = true;
_plog_send_beacon((qdr_core_t*) context);
} else if (!now_usable && all_address_usable) {
//
// Stop sending beacons. Nobody is listening.
//
all_address_usable = false;
if (!!beacon_timer) {
qd_timer_cancel(beacon_timer);
}
}
}


static void _plog_on_my_address_watch(void *context,
uint32_t local_consumers,
uint32_t in_proc_consumers,
uint32_t remote_consumers,
uint32_t local_producers)
{
bool now_usable = local_consumers > 0 || remote_consumers > 0;

if (now_usable && !my_address_usable) {
//
// Start using the my_address
//
my_address_usable = true;
} else if (!now_usable && my_address_usable) {
//
// Stop using the my_address
//
my_address_usable = false;
}
}


static void _plog_on_beacon(void *context)
{
_plog_send_beacon((qdr_core_t*) context);
}


//=====================================================================================
// Public Functions
//=====================================================================================
plog_record_t *plog_start_record(plog_record_type_t record_type, plog_record_t *parent)
{
plog_record_t *record = new_plog_record_t();
Expand Down Expand Up @@ -937,11 +1077,14 @@ void plog_set_trace(plog_record_t *record, qd_message_t *msg)
}


//=====================================================================================
// IO Module Callbacks
//=====================================================================================
/**
* @brief Module initializer
*
* @param core Pointer to the core object
* @param adaptor_context (out) Unused context
* @param adaptor_context (out) Context set for use in finalizer
*/
static void _plog_init(qdr_core_t *core, void **adaptor_context)
{
Expand All @@ -954,20 +1097,58 @@ static void _plog_init(qdr_core_t *core, void **adaptor_context)
lock = sys_mutex();
condition = sys_cond();
thread = sys_thread(_plog_thread, 0);
*adaptor_context = 0;
*adaptor_context = core;

_plog_create_router_record();

event_address_my = (char*) malloc(71);
strcpy(event_address_my, event_address_my_prefix);
_plog_strncat_id(event_address_my, 70, &local_router->identity);

all_address_watch_handle = qdr_core_watch_address(core, event_address_all, 'M', _plog_on_all_address_watch, core);
my_address_watch_handle = qdr_core_watch_address(core, event_address_my, 'M', _plog_on_my_address_watch, core);

beacon_timer = qd_timer(qdr_core_dispatch(core), _plog_on_beacon, core);
}


/**
* @brief Module finalizer
*
* @param adaptor_context Unused
* @param adaptor_context Contains the core module pointer
*/
static void _plog_final(void *adaptor_context)
{
_plog_post_work(_plog_work(0)); // Signal for the thread to exit
qdr_core_t *core = (qdr_core_t*) adaptor_context;

qd_timer_free(beacon_timer);
beacon_timer = 0;

//
// Cancel the address watches
//
qdr_core_unwatch_address(core, all_address_watch_handle);
qdr_core_unwatch_address(core, my_address_watch_handle);

//
// Free the allocated my-address
//
free(event_address_my);

//
// Signal the thread to exit by posting a NULL work pointer
//
_plog_post_work(_plog_work(0));

//
// Join and free the thread
//
sys_thread_join(thread);
sys_thread_free(thread);

//
// Free the condition and lock variables
//
sys_cond_free(condition);
sys_mutex_free(lock);
}
Expand Down

0 comments on commit 87e9ed8

Please sign in to comment.