diff --git a/src/adaptors/adaptor_common.c b/src/adaptors/adaptor_common.c index 9510b9251..3671e6f10 100644 --- a/src/adaptors/adaptor_common.c +++ b/src/adaptors/adaptor_common.c @@ -245,18 +245,38 @@ int qd_raw_connection_write_buffers(pn_raw_connection_t *pn_raw_conn, qd_adaptor return num_buffers_written; } -char *qd_raw_conn_get_address(pn_raw_connection_t *pn_raw_conn) + +size_t qd_raw_conn_get_address_buf(pn_raw_connection_t *pn_raw_conn, char *buf, size_t buflen) { + assert(pn_raw_conn); + assert(buflen); + + buf[0] = '\0'; + const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(pn_raw_conn); - char buffer[1024]; - int len = pn_netaddr_str(netaddr, buffer, 1024); - if (len <= 1024) { - return strdup(buffer); - } else { - return strndup(buffer, 1024); + if (!netaddr) + return 0; + + int len = pn_netaddr_str(netaddr, buf, buflen); + if (len < 0) + return 0; + if (len >= buflen) { // truncated + len = buflen - 1; + buf[len] = '\0'; } + + return (size_t) len; +} + + +char *qd_raw_conn_get_address(pn_raw_connection_t *pn_raw_conn) +{ + char result[1024]; + qd_raw_conn_get_address_buf(pn_raw_conn, result, sizeof(result)); + return strdup(result); } + int qd_raw_connection_drain_write_buffers(pn_raw_connection_t *pn_raw_conn) { pn_raw_buffer_t buffs[RAW_BUFFER_BATCH]; diff --git a/src/adaptors/adaptor_common.h b/src/adaptors/adaptor_common.h index db172db75..911cb312e 100644 --- a/src/adaptors/adaptor_common.h +++ b/src/adaptors/adaptor_common.h @@ -95,6 +95,13 @@ int qd_raw_connection_write_buffers(pn_raw_connection_t *pn_raw_conn, qd_adaptor */ char *qd_raw_conn_get_address(pn_raw_connection_t *pn_raw_conn); +/** + * Get the raw connections remote address. + * Like qd_raw_conn_get_address(), but address buffer is supplied by caller. + * @return number of bytes written, zero if no address available (buf is set to the null string). + */ +size_t qd_raw_conn_get_address_buf(pn_raw_connection_t *pn_raw_conn, char *buf, size_t buflen); + /** * Drains write buffers held by proton raw connection. * @param raw_conn - The pn_raw_connection_t to which the write buffers were granted. diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 294aaabc4..2fa344323 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -180,13 +180,13 @@ static pn_data_t *TL_conn_properties(void) } -static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming) +static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming, const char *host) { qdr_connection_t *conn; - + // // The qdr_connection_info() function makes its own copy of the passed in tcp_conn_properties. - // So, we need to call pn_data_free(tcp_conn_properties) + // So, we need to call pn_data_free(properties) // pn_data_t *properties = TL_conn_properties(); qdr_connection_info_t *info = qdr_connection_info(false, // is_encrypted, @@ -194,7 +194,7 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming true, // opened, "", // sasl_mechanisms, incoming ? QD_INCOMING : QD_OUTGOING, // dir, - "tcplite", // host, + host, "", // ssl_proto, "", // ssl_cipher, "", // user, @@ -266,8 +266,9 @@ static void TL_setup_connector(tcplite_connector_t *cr) // Set up a core connection to handle all of the links and deliveries for this connector // cr->conn_id = qd_server_allocate_connection_id(tcplite_context->server); - cr->core_conn = TL_open_core_connection(cr->conn_id, false); + cr->core_conn = TL_open_core_connection(cr->conn_id, false, "egress-dispatch"); qdr_connection_set_context(cr->core_conn, cr); + cr->connections_opened = 1; // for legacy compatibility: it counted the egress-dispatch conn // // Attach an out-link to represent our desire to receive connection streams for the address @@ -759,11 +760,13 @@ static void link_setup_LSIDE_IO(tcplite_connection_t *conn) tcplite_listener_t *li = (tcplite_listener_t*) conn->common.parent; qdr_terminus_t *target = qdr_terminus(0); qdr_terminus_t *source = qdr_terminus(0); + char host[64]; // for numeric remote client IP:port address qdr_terminus_set_address(target, li->adaptor_config->address); qdr_terminus_set_dynamic(source); - - conn->core_conn = TL_open_core_connection(conn->conn_id, true); + + qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host)); + conn->core_conn = TL_open_core_connection(conn->conn_id, true, host); qdr_connection_set_context(conn->core_conn, conn); conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id); @@ -779,10 +782,12 @@ static void link_setup_CSIDE_IO(tcplite_connection_t *conn, qdr_delivery_t *deli { ASSERT_RAW_IO; qdr_terminus_t *target = qdr_terminus(0); + char host[64]; // for numeric remote server IP:port address qdr_terminus_set_address(target, conn->reply_to); - conn->core_conn = TL_open_core_connection(conn->conn_id, false); + qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host)); + conn->core_conn = TL_open_core_connection(conn->conn_id, false, host); qdr_connection_set_context(conn->core_conn, conn); conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.cside.in", 0, false, 0, &conn->inbound_link_id); @@ -1781,12 +1786,15 @@ void qd_dispatch_delete_tcp_listener_lite(qd_dispatch_t *qd, tcplite_listener_t if (li) { li->closing = true; - if (!tcplite_context->adaptor_finalizing) { - if (!!li->adaptor_listener) { - qd_adaptor_listener_close(li->adaptor_listener); - li->adaptor_listener = 0; - } - } else { + // deactivate the listener to prevent new connections from being accepted + // on the proactor thread + // + if (!!li->adaptor_listener) { + qd_adaptor_listener_close(li->adaptor_listener); + li->adaptor_listener = 0; + } + + if (tcplite_context->adaptor_finalizing) { tcplite_connection_t *conn = DEQ_HEAD(li->connections); if (!!conn) { while (conn) { @@ -1875,6 +1883,15 @@ void qd_dispatch_delete_tcp_connector_lite(qd_dispatch_t *qd, tcplite_connector_ if (cr) { cr->closing = true; + // Explicitly drop the out-link so that we notify any link event monitors and stop new deliveries from being + // forwarded to this connector + // + if (!!cr->out_link) { + qdr_link_set_context(cr->out_link, 0); + qdr_link_detach(cr->out_link, QD_LOST, 0); + cr->out_link = 0; + } + if (!tcplite_context->adaptor_finalizing) { qdr_connection_closed(cr->core_conn); qd_connection_counter_dec(QD_PROTOCOL_TCP); diff --git a/src/adaptors/tcp_lite/tcp_lite.h b/src/adaptors/tcp_lite/tcp_lite.h index f69c125c8..a9b006ca5 100644 --- a/src/adaptors/tcp_lite/tcp_lite.h +++ b/src/adaptors/tcp_lite/tcp_lite.h @@ -59,8 +59,6 @@ struct tcplite_listener_t { sys_mutex_t lock; qd_adaptor_config_t *adaptor_config; qd_tls_domain_t *tls_domain; - uint64_t link_id; - qdr_link_t *in_link; qd_adaptor_listener_t *adaptor_listener; tcplite_connection_list_t connections; qdpo_config_t *protocol_observer_config; diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 9b1583699..31c259e64 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -1661,13 +1661,14 @@ class TcpAdaptorManagementTest(TestCase): Test Creation and deletion of TCP management entities """ @classmethod - def setUpClass(cls): + def setUpClass(cls, encap='legacy', test_name='TCPMgmtTest'): super(TcpAdaptorManagementTest, cls).setUpClass() if DISABLE_SELECTOR_TESTS: return - cls.test_name = 'TCPMgmtTest' + cls.test_name = test_name + cls.encapsulation = encap # create edge and interior routers. The listener/connector will be on # the edge router. It is expected that the edge will create proxy @@ -1739,12 +1740,14 @@ def test_01_mgmt(self): name=listener_name, attributes={'address': van_address, 'port': self.tcp_listener_port, - 'host': '127.0.0.1'}) + 'host': '127.0.0.1', + 'encapsulation': self.encapsulation}) mgmt.create(type=TCP_CONNECTOR_TYPE, name=connector_name, attributes={'address': van_address, 'port': self.tcp_server_port, - 'host': '127.0.0.1'}) + 'host': '127.0.0.1', + 'encapsulation': self.encapsulation}) # verify the entities have been created and tcp traffic works self.assertEqual(1, len(mgmt.query(type=TCP_LISTENER_TYPE).results)) @@ -1810,12 +1813,14 @@ def test_02_mgmt_recreate(self): name=listener_name, attributes={'address': van_address, 'port': self.tcp_listener_port, - 'host': '127.0.0.1'}) + 'host': '127.0.0.1', + 'encapsulation': self.encapsulation}) mgmt.create(type=TCP_CONNECTOR_TYPE, name=connector_name, attributes={'address': van_address, 'port': self.tcp_server_port, - 'host': '127.0.0.1'}) + 'host': '127.0.0.1', + 'encapsulation': self.encapsulation}) # wait until the listener has initialized @@ -1891,14 +1896,18 @@ def _wait_for_close(): # Verify updated statistics. l_stats = mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) - self.assertEqual(10, l_stats['bytesIn']) - self.assertEqual(4, l_stats['bytesOut']) + if self.encapsulation == 'legacy': + # deprecated for tcp-lite + self.assertEqual(10, l_stats['bytesIn']) + self.assertEqual(4, l_stats['bytesOut']) self.assertEqual(1, l_stats['connectionsOpened']) self.assertEqual(1, l_stats['connectionsClosed']) c_stats = mgmt.read(type=TCP_CONNECTOR_TYPE, name=connector_name) - self.assertEqual(4, c_stats['bytesIn']) - self.assertEqual(10, c_stats['bytesOut']) + if self.encapsulation == 'legacy': + # deprecated for tcp-lite + self.assertEqual(4, c_stats['bytesIn']) + self.assertEqual(10, c_stats['bytesOut']) self.assertEqual(2, c_stats['connectionsOpened']) self.assertEqual(1, c_stats['connectionsClosed']) @@ -1934,6 +1943,16 @@ def _retry_until_fail(): retry(_retry_until_fail, delay=0.25) +class TcpAdaptorManagementLiteTest(TcpAdaptorManagementTest): + """ + Test Creation and deletion of TCP management entities + """ + @classmethod + def setUpClass(cls): + super(TcpAdaptorManagementLiteTest, cls).setUpClass(encap='lite', + test_name='TCPMgmtLiteTest') + + class TcpAdaptorListenerConnectTest(TestCase): """ Test client connecting to TcpListeners in various scenarios