Skip to content

Commit

Permalink
Fixes skupperproject#1398, skupperproject#1405 - bugs involving delet…
Browse files Browse the repository at this point in the history
…ion of tcpConnector/Listeners

TCP adaptor creates special connection instances that are used for
managing the service address subscriptions. A user must not
inadvertantly delete these connections or the router will
malfunction. This patch prevents these connections from being
force-closed via the management interface.

This patch also fixes a bug where a stale route entry for the service
address was not cleaned up after the tcpConnector is deleted.

Closes skupperproject#1398
Closes skupperproject#1405
  • Loading branch information
kgiusti committed Feb 10, 2024
1 parent 2ffd30b commit a2561cf
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 33 deletions.
34 changes: 27 additions & 7 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,38 @@ int qd_raw_connection_write_buffers(pn_raw_connection_t *pn_raw_conn, qd_adaptor
return num_buffers_written;
}

char *qd_raw_conn_get_address(pn_raw_connection_t *pn_raw_conn)

size_t qd_raw_conn_get_address_buf(pn_raw_connection_t *pn_raw_conn, char *buf, size_t buflen)
{
assert(pn_raw_conn);
assert(buflen);

buf[0] = '\0';

const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(pn_raw_conn);
char buffer[1024];
int len = pn_netaddr_str(netaddr, buffer, 1024);
if (len <= 1024) {
return strdup(buffer);
} else {
return strndup(buffer, 1024);
if (!netaddr)
return 0;

int len = pn_netaddr_str(netaddr, buf, buflen);
if (len < 0)
return 0;
if (len >= buflen) { // truncated
len = buflen - 1;
buf[len] = '\0';
}

return (size_t) len;
}


char *qd_raw_conn_get_address(pn_raw_connection_t *pn_raw_conn)
{
char result[1024];
qd_raw_conn_get_address_buf(pn_raw_conn, result, sizeof(result));
return strdup(result);
}


int qd_raw_connection_drain_write_buffers(pn_raw_connection_t *pn_raw_conn)
{
pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
Expand Down
7 changes: 7 additions & 0 deletions src/adaptors/adaptor_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ int qd_raw_connection_write_buffers(pn_raw_connection_t *pn_raw_conn, qd_adaptor
*/
char *qd_raw_conn_get_address(pn_raw_connection_t *pn_raw_conn);

/**
* Get the raw connections remote address.
* Like qd_raw_conn_get_address(), but address buffer is supplied by caller.
* @return number of bytes written, zero if no address available (buf is set to the null string).
*/
size_t qd_raw_conn_get_address_buf(pn_raw_connection_t *pn_raw_conn, char *buf, size_t buflen);

/**
* Drains write buffers held by proton raw connection.
* @param raw_conn - The pn_raw_connection_t to which the write buffers were granted.
Expand Down
45 changes: 31 additions & 14 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,21 @@ static pn_data_t *TL_conn_properties(void)
}


static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming)
static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming, const char *host)
{
qdr_connection_t *conn;

//
// The qdr_connection_info() function makes its own copy of the passed in tcp_conn_properties.
// So, we need to call pn_data_free(tcp_conn_properties)
// So, we need to call pn_data_free(properties)
//
pn_data_t *properties = TL_conn_properties();
qdr_connection_info_t *info = qdr_connection_info(false, // is_encrypted,
false, // is_authenticated,
true, // opened,
"", // sasl_mechanisms,
incoming ? QD_INCOMING : QD_OUTGOING, // dir,
"tcplite", // host,
host,
"", // ssl_proto,
"", // ssl_cipher,
"", // user,
Expand Down Expand Up @@ -266,8 +266,9 @@ static void TL_setup_connector(tcplite_connector_t *cr)
// Set up a core connection to handle all of the links and deliveries for this connector
//
cr->conn_id = qd_server_allocate_connection_id(tcplite_context->server);
cr->core_conn = TL_open_core_connection(cr->conn_id, false);
cr->core_conn = TL_open_core_connection(cr->conn_id, false, "egress-dispatch");
qdr_connection_set_context(cr->core_conn, cr);
cr->connections_opened = 1; // for legacy compatibility: it counted the egress-dispatch conn

//
// Attach an out-link to represent our desire to receive connection streams for the address
Expand Down Expand Up @@ -759,11 +760,13 @@ static void link_setup_LSIDE_IO(tcplite_connection_t *conn)
tcplite_listener_t *li = (tcplite_listener_t*) conn->common.parent;
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_t *source = qdr_terminus(0);
char host[64]; // for numeric remote client IP:port address

qdr_terminus_set_address(target, li->adaptor_config->address);
qdr_terminus_set_dynamic(source);

conn->core_conn = TL_open_core_connection(conn->conn_id, true);

qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host));
conn->core_conn = TL_open_core_connection(conn->conn_id, true, host);
qdr_connection_set_context(conn->core_conn, conn);

conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id);
Expand All @@ -779,10 +782,12 @@ static void link_setup_CSIDE_IO(tcplite_connection_t *conn, qdr_delivery_t *deli
{
ASSERT_RAW_IO;
qdr_terminus_t *target = qdr_terminus(0);
char host[64]; // for numeric remote server IP:port address

qdr_terminus_set_address(target, conn->reply_to);

conn->core_conn = TL_open_core_connection(conn->conn_id, false);
qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host));
conn->core_conn = TL_open_core_connection(conn->conn_id, false, host);
qdr_connection_set_context(conn->core_conn, conn);

conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.cside.in", 0, false, 0, &conn->inbound_link_id);
Expand Down Expand Up @@ -1781,12 +1786,15 @@ void qd_dispatch_delete_tcp_listener_lite(qd_dispatch_t *qd, tcplite_listener_t
if (li) {
li->closing = true;

if (!tcplite_context->adaptor_finalizing) {
if (!!li->adaptor_listener) {
qd_adaptor_listener_close(li->adaptor_listener);
li->adaptor_listener = 0;
}
} else {
// deactivate the listener to prevent new connections from being accepted
// on the proactor thread
//
if (!!li->adaptor_listener) {
qd_adaptor_listener_close(li->adaptor_listener);
li->adaptor_listener = 0;
}

if (tcplite_context->adaptor_finalizing) {
tcplite_connection_t *conn = DEQ_HEAD(li->connections);
if (!!conn) {
while (conn) {
Expand Down Expand Up @@ -1875,6 +1883,15 @@ void qd_dispatch_delete_tcp_connector_lite(qd_dispatch_t *qd, tcplite_connector_
if (cr) {
cr->closing = true;

// Explicitly drop the out-link so that we notify any link event monitors and stop new deliveries from being
// forwarded to this connector
//
if (!!cr->out_link) {
qdr_link_set_context(cr->out_link, 0);
qdr_link_detach(cr->out_link, QD_LOST, 0);
cr->out_link = 0;
}

if (!tcplite_context->adaptor_finalizing) {
qdr_connection_closed(cr->core_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
Expand Down
2 changes: 0 additions & 2 deletions src/adaptors/tcp_lite/tcp_lite.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ struct tcplite_listener_t {
sys_mutex_t lock;
qd_adaptor_config_t *adaptor_config;
qd_tls_domain_t *tls_domain;
uint64_t link_id;
qdr_link_t *in_link;
qd_adaptor_listener_t *adaptor_listener;
tcplite_connection_list_t connections;
qdpo_config_t *protocol_observer_config;
Expand Down
39 changes: 29 additions & 10 deletions tests/system_tests_tcp_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1661,13 +1661,14 @@ class TcpAdaptorManagementTest(TestCase):
Test Creation and deletion of TCP management entities
"""
@classmethod
def setUpClass(cls):
def setUpClass(cls, encap='legacy', test_name='TCPMgmtTest'):
super(TcpAdaptorManagementTest, cls).setUpClass()

if DISABLE_SELECTOR_TESTS:
return

cls.test_name = 'TCPMgmtTest'
cls.test_name = test_name
cls.encapsulation = encap

# create edge and interior routers. The listener/connector will be on
# the edge router. It is expected that the edge will create proxy
Expand Down Expand Up @@ -1739,12 +1740,14 @@ def test_01_mgmt(self):
name=listener_name,
attributes={'address': van_address,
'port': self.tcp_listener_port,
'host': '127.0.0.1'})
'host': '127.0.0.1',
'encapsulation': self.encapsulation})
mgmt.create(type=TCP_CONNECTOR_TYPE,
name=connector_name,
attributes={'address': van_address,
'port': self.tcp_server_port,
'host': '127.0.0.1'})
'host': '127.0.0.1',
'encapsulation': self.encapsulation})

# verify the entities have been created and tcp traffic works
self.assertEqual(1, len(mgmt.query(type=TCP_LISTENER_TYPE).results))
Expand Down Expand Up @@ -1810,12 +1813,14 @@ def test_02_mgmt_recreate(self):
name=listener_name,
attributes={'address': van_address,
'port': self.tcp_listener_port,
'host': '127.0.0.1'})
'host': '127.0.0.1',
'encapsulation': self.encapsulation})
mgmt.create(type=TCP_CONNECTOR_TYPE,
name=connector_name,
attributes={'address': van_address,
'port': self.tcp_server_port,
'host': '127.0.0.1'})
'host': '127.0.0.1',
'encapsulation': self.encapsulation})

# wait until the listener has initialized

Expand Down Expand Up @@ -1891,14 +1896,18 @@ def _wait_for_close():
# Verify updated statistics.

l_stats = mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name)
self.assertEqual(10, l_stats['bytesIn'])
self.assertEqual(4, l_stats['bytesOut'])
if self.encapsulation == 'legacy':
# deprecated for tcp-lite
self.assertEqual(10, l_stats['bytesIn'])
self.assertEqual(4, l_stats['bytesOut'])
self.assertEqual(1, l_stats['connectionsOpened'])
self.assertEqual(1, l_stats['connectionsClosed'])

c_stats = mgmt.read(type=TCP_CONNECTOR_TYPE, name=connector_name)
self.assertEqual(4, c_stats['bytesIn'])
self.assertEqual(10, c_stats['bytesOut'])
if self.encapsulation == 'legacy':
# deprecated for tcp-lite
self.assertEqual(4, c_stats['bytesIn'])
self.assertEqual(10, c_stats['bytesOut'])
self.assertEqual(2, c_stats['connectionsOpened'])
self.assertEqual(1, c_stats['connectionsClosed'])

Expand Down Expand Up @@ -1934,6 +1943,16 @@ def _retry_until_fail():
retry(_retry_until_fail, delay=0.25)


class TcpAdaptorManagementLiteTest(TcpAdaptorManagementTest):
"""
Test Creation and deletion of TCP management entities
"""
@classmethod
def setUpClass(cls):
super(TcpAdaptorManagementLiteTest, cls).setUpClass(encap='lite',
test_name='TCPMgmtLiteTest')


class TcpAdaptorListenerConnectTest(TestCase):
"""
Test client connecting to TcpListeners in various scenarios
Expand Down

0 comments on commit a2561cf

Please sign in to comment.