diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 6dc32e4db..a0d9208dc 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -72,6 +72,15 @@ qd_dispatch_t *qdr_core_dispatch(qdr_core_t *core); void qdr_process_tick(qdr_core_t *core); +/** + * Return true iff the test hooks option is enabled for this process. + * + * @param core Pointer to the core object returned by qd_core() + * @return true iff test hooks are enabled + */ +bool qdr_core_test_hooks_enabled(const qdr_core_t *core); + + /** ****************************************************************************** * Route table maintenance functions (Router Control) @@ -163,6 +172,62 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess, bool control); +/** + ****************************************************************************** + * Address watch functions + ****************************************************************************** + */ + +typedef uint32_t qdr_watch_handle_t; + +/** + * Handler for updates on watched addresses. This function shall be invoked on an IO thread. + * + * Note: This function will be invoked when a watched address has a change in reachability. + * It is possible that the function may be called when no change occurs, particularly when an + * address is removed from the core address table. + * + * @param context The opaque context supplied in the call to qdr_core_watch_address + * @param local_consumers Number of consuming (outgoing) links for this address on this router + * @param in_proc_consumers Number of in-process consumers for this address on this router + * @param remote_consumers Number of remote routers with consumers for this address + * @param local_producers Number of producing (incoming) links for this address on this router + */ +typedef void (*qdr_address_watch_update_t)(void *context, + uint32_t local_consumers, + uint32_t in_proc_consumers, + uint32_t remote_consumers, + uint32_t local_producers); + +/** + * qdr_core_watch_address + * + * Subscribe to watch for changes in the reachability for an address. It is safe to invoke this + * function from an IO thread. + * + * @param core Pointer to the core module + * @param address The address to be watched + * @param aclass Address class character + * @param on_watch The handler function + * @param context The opaque context sent to the handler on all invocations + * @return Watch handle to be used when canceling the watch + */ +qdr_watch_handle_t qdr_core_watch_address(qdr_core_t *core, + const char *address, + char aclass, + qdr_address_watch_update_t on_watch, + void *context); + +/** + * qdr_core_unwatch_address + * + * Cancel an address watch subscription. It is safe to invoke this function from an IO thread. + * + * @param core Pointer to the core module + * @param handle Watch handle returned by qdr_core_watch_address + */ +void qdr_core_unwatch_address(qdr_core_t *core, qdr_watch_handle_t handle); + /** ****************************************************************************** * Error functions diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4588be005..95ed4ef6f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -47,6 +47,7 @@ set(qpid_dispatch_SOURCES adaptors/http1/http1_server.c adaptors/http1/http1_request_info.c adaptors/tcp_adaptor.c + adaptors/test_adaptor.c alloc.c alloc_pool.c aprintf.c @@ -84,6 +85,7 @@ set(qpid_dispatch_SOURCES protocol_log.c router.c router_core.c + router_core/address_watch.c router_core/agent.c router_core/agent_address.c router_core/agent_config_address.c diff --git a/src/adaptors/test_adaptor.c b/src/adaptors/test_adaptor.c new file mode 100644 index 000000000..845a9c8e4 --- /dev/null +++ b/src/adaptors/test_adaptor.c @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/alloc_pool.h" +#include "qpid/dispatch/ctools.h" +#include "qpid/dispatch/protocol_adaptor.h" +#include "qpid/dispatch/router_core.h" + +#include +#include + +static const char *address_1 = "addr_watch/test_address/1"; +static const char *address_2 = "addr_watch/test_address/2"; + +static qdr_watch_handle_t handle1; +static qdr_watch_handle_t handle2; + +static qdr_core_t *core_ptr = 0; +static qd_log_source_t *log_source = 0; + +static void on_watch(void *context, + uint32_t local_consumers, + uint32_t in_proc_consumers, + uint32_t remote_consumers, + uint32_t local_producers) +{ + qd_log(log_source, QD_LOG_INFO, "on_watch (%ld): loc: %"PRIu32" rem: %"PRIu32" prod: %"PRIu32"", + (long) context, local_consumers, remote_consumers, local_producers); +} + + +static void qdr_test_adaptor_init(qdr_core_t *core, void **adaptor_context) +{ + core_ptr = core; + if (qdr_core_test_hooks_enabled(core)) { + log_source = qd_log_source("ADDRESS_WATCH"); + handle1 = qdr_core_watch_address(core, address_1, QD_ITER_HASH_PREFIX_MOBILE, on_watch, (void*) 1); + handle2 = qdr_core_watch_address(core, address_2, QD_ITER_HASH_PREFIX_MOBILE, on_watch, (void*) 2); + } +} + + +static void qdr_test_adaptor_final(void *adaptor_context) +{ + if (qdr_core_test_hooks_enabled(core_ptr)) { + qdr_core_unwatch_address(core_ptr, handle1); + qdr_core_unwatch_address(core_ptr, handle2); + } +} + + +QDR_CORE_ADAPTOR_DECLARE("test-adaptor", qdr_test_adaptor_init, qdr_test_adaptor_final) diff --git a/src/router_core/address_watch.c b/src/router_core/address_watch.c new file mode 100644 index 000000000..ea61ca706 --- /dev/null +++ b/src/router_core/address_watch.c @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "router_core_private.h" +#include "qpid/dispatch/amqp.h" + +struct qdr_address_watch_t { + DEQ_LINKS(struct qdr_address_watch_t); + qdr_watch_handle_t watch_handle; + char *address_hash; + qdr_address_watch_update_t handler; + void *context; +}; + +ALLOC_DECLARE(qdr_address_watch_t); +ALLOC_DEFINE(qdr_address_watch_t); + +static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work); +static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_address_watch_free_CT(qdr_address_watch_t *watch); + +//================================================================================== +// Core Interface Functions +//================================================================================== +qdr_watch_handle_t qdr_core_watch_address(qdr_core_t *core, + const char *address, + char aclass, + qdr_address_watch_update_t on_watch, + void *context) +{ + static sys_atomic_t next_handle; + qdr_action_t *action = qdr_action(qdr_core_watch_address_CT, "watch_address"); + + action->args.io.address = qdr_field(address); + action->args.io.address_class = aclass; + action->args.io.watch_handler = on_watch; + action->args.io.context = context; + action->args.io.value32_1 = sys_atomic_inc(&next_handle); + + qdr_action_enqueue(core, action); + return action->args.io.value32_1; +} + + +void qdr_core_unwatch_address(qdr_core_t *core, qdr_watch_handle_t handle) +{ + qdr_action_t *action = qdr_action(qdr_core_unwatch_address_CT, "unwatch_address"); + + action->args.io.value32_1 = handle; + qdr_action_enqueue(core, action); +} + + +//================================================================================== +// In-Core API Functions +//================================================================================== +void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr) +{ + const char *address_hash = (char*) qd_hash_key_by_handle(addr->hash_handle); + qdr_address_watch_t *watch = DEQ_HEAD(core->addr_watches); + + while (!!watch) { + if (strcmp(watch->address_hash, address_hash) == 0) { + qdr_general_work_t *work = qdr_general_work(qdr_watch_invoker); + work->watch_handler = watch->handler; + work->context = watch->context; + work->local_consumers = DEQ_SIZE(addr->rlinks); + work->in_proc_consumers = DEQ_SIZE(addr->subscriptions); + work->remote_consumers = qd_bitmask_cardinality(addr->rnodes); + work->local_producers = DEQ_SIZE(addr->inlinks); + qdr_post_general_work_CT(core, work); + } + watch = DEQ_NEXT(watch); + } +} + +void qdr_address_watch_shutdown(qdr_core_t *core) +{ + qdr_address_watch_t *watch = DEQ_HEAD(core->addr_watches); + while (!!watch) { + DEQ_REMOVE(core->addr_watches, watch); + qdr_address_watch_free_CT(watch); + watch = DEQ_HEAD(core->addr_watches); + } +} + + +//================================================================================== +// Local Functions +//================================================================================== +static void qdr_address_watch_free_CT(qdr_address_watch_t *watch) +{ + free(watch->address_hash); + free_qdr_address_watch_t(watch); +} + + +static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work) +{ + work->watch_handler(work->context, + work->local_consumers, work->in_proc_consumers, work->remote_consumers, work->local_producers); +} + + +static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (!discard) { + qd_iterator_t *iter = qdr_field_iterator(action->args.io.address); + qd_iterator_annotate_prefix(iter, action->args.io.address_class); + qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + + qdr_address_watch_t *watch = new_qdr_address_watch_t(); + ZERO(watch); + watch->watch_handle = action->args.io.value32_1; + watch->address_hash = (char*) qd_iterator_copy(iter); + watch->handler = action->args.io.watch_handler; + watch->context = action->args.io.context; + + DEQ_INSERT_TAIL(core->addr_watches, watch); + } + qdr_field_free(action->args.io.address); +} + + +static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (!discard) { + qdr_watch_handle_t watch_handle = action->args.io.value32_1; + + qdr_address_watch_t *watch = DEQ_HEAD(core->addr_watches); + while (!!watch) { + if (watch->watch_handle == watch_handle) { + DEQ_REMOVE(core->addr_watches, watch); + qdr_address_watch_free_CT(watch); + break; + } + watch = DEQ_NEXT(watch); + } + } +} diff --git a/src/router_core/address_watch.h b/src/router_core/address_watch.h new file mode 100644 index 000000000..307e38548 --- /dev/null +++ b/src/router_core/address_watch.h @@ -0,0 +1,41 @@ +#ifndef qd_address_watch +#define qd_address_watch 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +typedef struct qdr_address_watch_t qdr_address_watch_t; +DEQ_DECLARE(qdr_address_watch_t, qdr_address_watch_list_t); + +/** + * qdr_trigger_address_watch_CT + * + * This function is invoked after changes have been made to the address that affect + * reachability (i.e. local and remote senders and receivers). + * + * @param core Pointer to the router core state + * @param addr Pointer to the address record that was modified + */ +void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr); + +void qdr_address_watch_shutdown(qdr_core_t *core); + +#endif diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c index f1d72ee2e..c70db4c7b 100644 --- a/src/router_core/modules/mobile_sync/mobile.c +++ b/src/router_core/modules/mobile_sync/mobile.c @@ -630,6 +630,8 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_BECAME_DEST, addr); else if (qd_bitmask_cardinality(addr->rnodes) == 1 && DEQ_SIZE(addr->rlinks) == 1) qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_TWO_DEST, addr); + + qdr_trigger_address_watch_CT(msync->core, addr); } } while (false); @@ -667,6 +669,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + qdr_trigger_address_watch_CT(msync->core, addr); qdr_check_addr_CT(msync->core, addr); } } @@ -701,6 +704,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + qdr_trigger_address_watch_CT(msync->core, addr); qdr_check_addr_CT(msync->core, addr); } addr = next_addr; @@ -826,6 +830,7 @@ static void qcm_mobile_sync_on_router_flush_CT(qdrm_mobile_sync_t *msync, qdr_no else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + qdr_trigger_address_watch_CT(msync->core, addr); qdr_check_addr_CT(msync->core, addr); } addr = next_addr; diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 62fa957bc..e3e544ae0 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -388,11 +388,13 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca // qdr_address_t *addr = DEQ_HEAD(core->addrs); while (addr && rnode->ref_count > 0) { - if (qd_bitmask_clear_bit(addr->rnodes, router_maskbit)) + if (qd_bitmask_clear_bit(addr->rnodes, router_maskbit)) { // // If the cleared bit was originally set, decrement the ref count // rnode->ref_count--; + qdr_trigger_address_watch_CT(core, addr); + } addr = DEQ_NEXT(addr); } assert(rnode->ref_count == 0); diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 9c7dc88ee..e75ccc798 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -230,6 +230,11 @@ void qdr_core_free(qdr_core_t *core) // this must happen after qdrc_endpoint_do_cleanup_CT calls qdr_modules_finalize(core); + // + // Remove any left-over address watches + // + qdr_address_watch_shutdown(core); + // discard any left over actions qdr_action_list_t action_list; @@ -329,6 +334,11 @@ void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode) free_qdr_node_t(rnode); } +bool qdr_core_test_hooks_enabled(const qdr_core_t *core) +{ + return core->qd->test_hooks; +} + ALLOC_DECLARE(qdr_field_t); ALLOC_DEFINE(qdr_field_t); @@ -582,13 +592,6 @@ void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) if (config && --config->ref_count == 0) free_address_config(config); - // Remove the address from the list, hash index, and parse tree - DEQ_REMOVE(core->addrs, addr); - if (addr->hash_handle) { - qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle); - qd_hash_handle_free(addr->hash_handle); - } - // Free resources associated with this address DEQ_APPEND(addr->rlinks, addr->inlinks); @@ -601,6 +604,19 @@ void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) lref = DEQ_HEAD(addr->rlinks); } + // + // Trigger an address watch to show the address has no more endpoints. + // + qd_bitmask_clear_all(addr->rnodes); + qdr_trigger_address_watch_CT(core, addr); + + // Remove the address from the list, hash index, and parse tree + DEQ_REMOVE(core->addrs, addr); + if (addr->hash_handle) { + qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle); + qd_hash_handle_free(addr->hash_handle); + } + qd_bitmask_free(addr->rnodes); if (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST) { qd_bitmask_free(addr->closest_remotes); @@ -645,6 +661,8 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li else if (DEQ_SIZE(addr->inlinks) == 2) qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_SOURCE, addr); } + + qdr_trigger_address_watch_CT(core, addr); } @@ -674,6 +692,8 @@ void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_SOURCE, addr); } } + + qdr_trigger_address_watch_CT(core, addr); } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 0f72287aa..530a8db4b 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -56,6 +56,7 @@ ALLOC_DECLARE(qdr_link_t); #include "core_attach_address_lookup.h" #include "core_events.h" #include "core_link_endpoint.h" +#include "address_watch.h" qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment); int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, @@ -146,13 +147,16 @@ struct qdr_action_t { // Arguments for in-process messaging // struct { - qdr_field_t *address; - char address_class; - qd_address_treatment_t treatment; - qdr_subscription_t *subscription; - qd_message_t *message; - bool exclude_inprocess; - bool control; + qdr_field_t *address; + char address_class; + qd_address_treatment_t treatment; + qdr_subscription_t *subscription; + qd_message_t *message; + bool exclude_inprocess; + bool control; + qdr_address_watch_update_t watch_handler; + void *context; + uint32_t value32_1; } io; // @@ -227,10 +231,15 @@ struct qdr_general_work_t { void *on_message_context; uint64_t in_conn_id; uint64_t mobile_seq; + uint32_t local_consumers; + uint32_t in_proc_consumers; + uint32_t remote_consumers; + uint32_t local_producers; const qd_policy_spec_t *policy_spec; qdr_delivery_t *delivery; qdr_delivery_cleanup_list_t delivery_cleanup_list; qdr_global_stats_handler_t stats_handler; + qdr_address_watch_update_t watch_handler; void *context; }; @@ -803,6 +812,7 @@ struct qdr_core_t { qd_hash_t *conn_id_hash; qdr_address_list_t addrs; qd_hash_t *addr_hash; + qdr_address_watch_list_t addr_watches; qd_parse_tree_t *addr_parse_tree; qdr_address_t *hello_addr; qdr_address_t *router_addr_L; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1d55ada62..07e4b4f6a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -160,6 +160,7 @@ foreach(py_test_module system_tests_http1_over_tcp system_tests_tcp_adaptor system_tests_heartbeats + system_tests_address_watch ) string(CONFIGURE "${PYTHON_TEST_COMMAND}" CONFIGURED_PYTHON_TEST_COMMAND) diff --git a/tests/system_tests_address_watch.py b/tests/system_tests_address_watch.py new file mode 100644 index 000000000..6758b97e1 --- /dev/null +++ b/tests/system_tests_address_watch.py @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, unittest, TestTimeout +from proton.handlers import MessagingHandler +from proton.reactor import Container + + +class RouterTest(TestCase): + + inter_router_port = None + + @classmethod + def setUpClass(cls): + """Start a router""" + super(RouterTest, cls).setUpClass() + + def router(name, connection): + + config = [ + ('router', {'mode': 'interior', 'id': name}), + ('listener', {'port': cls.tester.get_port()}), + connection + ] + + config = Qdrouterd.Config(config) + + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=["-T"])) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + + router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port})) + router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port})) + + cls.routers[0].wait_router_connected('B') + cls.routers[1].wait_router_connected('A') + + def test_address_watch(self): + test = AddressWatchTest(self.routers[0], self.routers[1]) + test.run() + self.assertIsNone(test.error) + + +class AddressWatchTest(MessagingHandler): + def __init__(self, host_a, host_b): + super(AddressWatchTest, self).__init__() + self.host_a = host_a + self.host_b = host_b + self.addr = 'addr_watch/test_address/1' + self.conn_a = None + self.conn_b = None + self.error = None + self.sender = None + self.receiver = None + self.n_closed = 0 + + def timeout(self): + self.error = "Timeout Expired" + if self.conn_a: + self.conn_a.close() + if self.conn_b: + self.conn_b.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) + self.conn_a = event.container.connect(self.host_a.addresses[0]) + self.conn_b = event.container.connect(self.host_b.addresses[0]) + self.receiver = event.container.create_receiver(self.conn_a, self.addr) + self.sender = event.container.create_sender(self.conn_b, self.addr) + + def on_sendable(self, event): + if event.sender == self.sender: + self.conn_a.close() + self.conn_b.close() + + def on_connection_closed(self, event): + self.n_closed += 1 + if self.n_closed == 2: + with open(self.host_a.logfile_path, 'r') as router_log: + log_lines = router_log.read().split("\n") + search_lines = [s for s in log_lines if "ADDRESS_WATCH" in s and "on_watch" in s] + matches = [s for s in search_lines if "loc: 1 rem: 0 prod: 0" in s] + if len(matches) == 0: + self.error = "Didn't see local consumer on router A" + with open(self.host_b.logfile_path, 'r') as router_log: + log_lines = router_log.read().split("\n") + search_lines = [s for s in log_lines if "ADDRESS_WATCH" in s and "on_watch" in s] + matches = [s for s in search_lines if "loc: 0 rem: 1 prod: 1" in s] + if len(matches) == 0: + self.error = "Didn't see remote consumer and local producer on router B" + self.timer.cancel() + + def run(self): + Container(self).run() + + +if __name__ == '__main__': + unittest.main(main_module())