Skip to content

Commit

Permalink
Added address-watch feature to the router core.
Browse files Browse the repository at this point in the history
  • Loading branch information
ted-ross committed Mar 4, 2022
1 parent 8eb5ae5 commit c847b2f
Show file tree
Hide file tree
Showing 11 changed files with 502 additions and 15 deletions.
65 changes: 65 additions & 0 deletions include/qpid/dispatch/router_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ qd_dispatch_t *qdr_core_dispatch(qdr_core_t *core);
void qdr_process_tick(qdr_core_t *core);


/**
* Return true iff the test hooks option is enabled for this process.
*
* @param core Pointer to the core object returned by qd_core()
* @return true iff test hooks are enabled
*/
bool qdr_core_test_hooks_enabled(const qdr_core_t *core);


/**
******************************************************************************
* Route table maintenance functions (Router Control)
Expand Down Expand Up @@ -163,6 +172,62 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr,
bool exclude_inprocess, bool control);


/**
******************************************************************************
* Address watch functions
******************************************************************************
*/

typedef uint32_t qdr_watch_handle_t;

/**
* Handler for updates on watched addresses. This function shall be invoked on an IO thread.
*
* Note: This function will be invoked when a watched address has a change in reachability.
* It is possible that the function may be called when no change occurs, particularly when an
* address is removed from the core address table.
*
* @param context The opaque context supplied in the call to qdr_core_watch_address
* @param local_consumers Number of consuming (outgoing) links for this address on this router
* @param in_proc_consumers Number of in-process consumers for this address on this router
* @param remote_consumers Number of remote routers with consumers for this address
* @param local_producers Number of producing (incoming) links for this address on this router
*/
typedef void (*qdr_address_watch_update_t)(void *context,
uint32_t local_consumers,
uint32_t in_proc_consumers,
uint32_t remote_consumers,
uint32_t local_producers);

/**
* qdr_core_watch_address
*
* Subscribe to watch for changes in the reachability for an address. It is safe to invoke this
* function from an IO thread.
*
* @param core Pointer to the core module
* @param address The address to be watched
* @param aclass Address class character
* @param on_watch The handler function
* @param context The opaque context sent to the handler on all invocations
* @return Watch handle to be used when canceling the watch
*/
qdr_watch_handle_t qdr_core_watch_address(qdr_core_t *core,
const char *address,
char aclass,
qdr_address_watch_update_t on_watch,
void *context);

/**
* qdr_core_unwatch_address
*
* Cancel an address watch subscription. It is safe to invoke this function from an IO thread.
*
* @param core Pointer to the core module
* @param handle Watch handle returned by qdr_core_watch_address
*/
void qdr_core_unwatch_address(qdr_core_t *core, qdr_watch_handle_t handle);

/**
******************************************************************************
* Error functions
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ set(qpid_dispatch_SOURCES
adaptors/http1/http1_server.c
adaptors/http1/http1_request_info.c
adaptors/tcp_adaptor.c
adaptors/test_adaptor.c
alloc.c
alloc_pool.c
aprintf.c
Expand Down Expand Up @@ -84,6 +85,7 @@ set(qpid_dispatch_SOURCES
protocol_log.c
router.c
router_core.c
router_core/address_watch.c
router_core/agent.c
router_core/agent_address.c
router_core/agent_config_address.c
Expand Down
68 changes: 68 additions & 0 deletions src/adaptors/test_adaptor.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "qpid/dispatch/alloc_pool.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/router_core.h"

#include <inttypes.h>
#include <stdio.h>

static const char *address_1 = "addr_watch/test_address/1";
static const char *address_2 = "addr_watch/test_address/2";

static qdr_watch_handle_t handle1;
static qdr_watch_handle_t handle2;

static qdr_core_t *core_ptr = 0;
static qd_log_source_t *log_source = 0;

static void on_watch(void *context,
uint32_t local_consumers,
uint32_t in_proc_consumers,
uint32_t remote_consumers,
uint32_t local_producers)
{
qd_log(log_source, QD_LOG_INFO, "on_watch (%ld): loc: %"PRIu32" rem: %"PRIu32" prod: %"PRIu32"",
(long) context, local_consumers, remote_consumers, local_producers);
}


static void qdr_test_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
core_ptr = core;
if (qdr_core_test_hooks_enabled(core)) {
log_source = qd_log_source("ADDRESS_WATCH");
handle1 = qdr_core_watch_address(core, address_1, QD_ITER_HASH_PREFIX_MOBILE, on_watch, (void*) 1);
handle2 = qdr_core_watch_address(core, address_2, QD_ITER_HASH_PREFIX_MOBILE, on_watch, (void*) 2);
}
}


static void qdr_test_adaptor_final(void *adaptor_context)
{
if (qdr_core_test_hooks_enabled(core_ptr)) {
qdr_core_unwatch_address(core_ptr, handle1);
qdr_core_unwatch_address(core_ptr, handle2);
}
}


QDR_CORE_ADAPTOR_DECLARE("test-adaptor", qdr_test_adaptor_init, qdr_test_adaptor_final)
157 changes: 157 additions & 0 deletions src/router_core/address_watch.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "router_core_private.h"
#include "qpid/dispatch/amqp.h"

struct qdr_address_watch_t {
DEQ_LINKS(struct qdr_address_watch_t);
qdr_watch_handle_t watch_handle;
char *address_hash;
qdr_address_watch_update_t handler;
void *context;
};

ALLOC_DECLARE(qdr_address_watch_t);
ALLOC_DEFINE(qdr_address_watch_t);

static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work);
static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_address_watch_free_CT(qdr_address_watch_t *watch);

//==================================================================================
// Core Interface Functions
//==================================================================================
qdr_watch_handle_t qdr_core_watch_address(qdr_core_t *core,
const char *address,
char aclass,
qdr_address_watch_update_t on_watch,
void *context)
{
static sys_atomic_t next_handle;
qdr_action_t *action = qdr_action(qdr_core_watch_address_CT, "watch_address");

action->args.io.address = qdr_field(address);
action->args.io.address_class = aclass;
action->args.io.watch_handler = on_watch;
action->args.io.context = context;
action->args.io.value32_1 = sys_atomic_inc(&next_handle);

qdr_action_enqueue(core, action);
return action->args.io.value32_1;
}


void qdr_core_unwatch_address(qdr_core_t *core, qdr_watch_handle_t handle)
{
qdr_action_t *action = qdr_action(qdr_core_unwatch_address_CT, "unwatch_address");

action->args.io.value32_1 = handle;
qdr_action_enqueue(core, action);
}


//==================================================================================
// In-Core API Functions
//==================================================================================
void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr)
{
const char *address_hash = (char*) qd_hash_key_by_handle(addr->hash_handle);
qdr_address_watch_t *watch = DEQ_HEAD(core->addr_watches);

while (!!watch) {
if (strcmp(watch->address_hash, address_hash) == 0) {
qdr_general_work_t *work = qdr_general_work(qdr_watch_invoker);
work->watch_handler = watch->handler;
work->context = watch->context;
work->local_consumers = DEQ_SIZE(addr->rlinks);
work->in_proc_consumers = DEQ_SIZE(addr->subscriptions);
work->remote_consumers = qd_bitmask_cardinality(addr->rnodes);
work->local_producers = DEQ_SIZE(addr->inlinks);
qdr_post_general_work_CT(core, work);
}
watch = DEQ_NEXT(watch);
}
}

void qdr_address_watch_shutdown(qdr_core_t *core)
{
qdr_address_watch_t *watch = DEQ_HEAD(core->addr_watches);
while (!!watch) {
DEQ_REMOVE(core->addr_watches, watch);
qdr_address_watch_free_CT(watch);
watch = DEQ_HEAD(core->addr_watches);
}
}


//==================================================================================
// Local Functions
//==================================================================================
static void qdr_address_watch_free_CT(qdr_address_watch_t *watch)
{
free(watch->address_hash);
free_qdr_address_watch_t(watch);
}


static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work)
{
work->watch_handler(work->context,
work->local_consumers, work->in_proc_consumers, work->remote_consumers, work->local_producers);
}


static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (!discard) {
qd_iterator_t *iter = qdr_field_iterator(action->args.io.address);
qd_iterator_annotate_prefix(iter, action->args.io.address_class);
qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);

qdr_address_watch_t *watch = new_qdr_address_watch_t();
ZERO(watch);
watch->watch_handle = action->args.io.value32_1;
watch->address_hash = (char*) qd_iterator_copy(iter);
watch->handler = action->args.io.watch_handler;
watch->context = action->args.io.context;

DEQ_INSERT_TAIL(core->addr_watches, watch);
}
qdr_field_free(action->args.io.address);
}


static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (!discard) {
qdr_watch_handle_t watch_handle = action->args.io.value32_1;

qdr_address_watch_t *watch = DEQ_HEAD(core->addr_watches);
while (!!watch) {
if (watch->watch_handle == watch_handle) {
DEQ_REMOVE(core->addr_watches, watch);
qdr_address_watch_free_CT(watch);
break;
}
watch = DEQ_NEXT(watch);
}
}
}
41 changes: 41 additions & 0 deletions src/router_core/address_watch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef qd_address_watch
#define qd_address_watch 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/router_core.h>

typedef struct qdr_address_watch_t qdr_address_watch_t;
DEQ_DECLARE(qdr_address_watch_t, qdr_address_watch_list_t);

/**
* qdr_trigger_address_watch_CT
*
* This function is invoked after changes have been made to the address that affect
* reachability (i.e. local and remote senders and receivers).
*
* @param core Pointer to the router core state
* @param addr Pointer to the address record that was modified
*/
void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr);

void qdr_address_watch_shutdown(qdr_core_t *core);

#endif
5 changes: 5 additions & 0 deletions src/router_core/modules/mobile_sync/mobile.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,8 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_BECAME_DEST, addr);
else if (qd_bitmask_cardinality(addr->rnodes) == 1 && DEQ_SIZE(addr->rlinks) == 1)
qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_TWO_DEST, addr);

qdr_trigger_address_watch_CT(msync->core, addr);
}
} while (false);

Expand Down Expand Up @@ -667,6 +669,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);

qdr_trigger_address_watch_CT(msync->core, addr);
qdr_check_addr_CT(msync->core, addr);
}
}
Expand Down Expand Up @@ -701,6 +704,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);

qdr_trigger_address_watch_CT(msync->core, addr);
qdr_check_addr_CT(msync->core, addr);
}
addr = next_addr;
Expand Down Expand Up @@ -826,6 +830,7 @@ static void qcm_mobile_sync_on_router_flush_CT(qdrm_mobile_sync_t *msync, qdr_no
else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1)
qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);

qdr_trigger_address_watch_CT(msync->core, addr);
qdr_check_addr_CT(msync->core, addr);
}
addr = next_addr;
Expand Down
Loading

0 comments on commit c847b2f

Please sign in to comment.