From 711084ffa95a021cdf13187f0f5bee4af3b2abf2 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 12 Sep 2023 15:18:58 -0400 Subject: [PATCH] Fixes #1207: add counters for total active service connections --- include/qpid/dispatch/connection_counters.h | 51 +++++ include/qpid/dispatch/protocol_adaptor.h | 4 +- include/qpid/dispatch/protocols.h | 37 +++ .../skupper_router/management/skrouter.json | 4 + src/CMakeLists.txt | 2 + src/adaptors/http1/http1_client.c | 3 +- src/adaptors/http1/http1_server.c | 3 +- src/adaptors/http2/http2_adaptor.c | 6 +- src/adaptors/reference_adaptor.c | 3 +- src/adaptors/tcp/tcp_adaptor.c | 6 +- src/adaptors/tcp_lite/tcp_lite.c | 3 +- src/connection_counters.c | 23 ++ src/protocols.c | 32 +++ src/router_core/agent_router.c | 15 +- src/router_core/agent_router.h | 2 +- src/router_core/connections.c | 13 +- src/router_core/router_core_private.h | 1 + src/router_node.c | 4 +- tests/system_test.py | 3 + tests/system_tests_default_distribution.py | 2 +- tests/system_tests_edge_router.py | 2 +- tests/system_tests_http1_adaptor.py | 149 +++++++++--- tests/system_tests_http2.py | 101 ++++++++- tests/system_tests_skstat.py | 33 +-- tests/system_tests_tcp_adaptor.py | 213 +++++++++++++----- tools/skstat | 9 +- 26 files changed, 611 insertions(+), 113 deletions(-) create mode 100644 include/qpid/dispatch/connection_counters.h create mode 100644 include/qpid/dispatch/protocols.h create mode 100644 src/connection_counters.c create mode 100644 src/protocols.c diff --git a/include/qpid/dispatch/connection_counters.h b/include/qpid/dispatch/connection_counters.h new file mode 100644 index 000000000..cfcfbb58f --- /dev/null +++ b/include/qpid/dispatch/connection_counters.h @@ -0,0 +1,51 @@ +#ifndef __connection_counters_h__ +#define __connection_counters_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/**@file + * System-wide counters for tracking open connections per protocol type. + */ + +#include "qpid/dispatch/protocols.h" +#include "qpid/dispatch/atomic.h" + +extern atomic_uint_fast64_t qd_connection_counters[QD_PROTOCOL_TOTAL]; + +static inline void qd_connection_counter_inc(qd_protocol_t proto) +{ + assert(proto < QD_PROTOCOL_TOTAL); + atomic_fetch_add_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed); +} + +static inline void qd_connection_counter_dec(qd_protocol_t proto) +{ + assert(proto < QD_PROTOCOL_TOTAL); + uint64_t old = atomic_fetch_sub_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed); + (void) old; + assert(old != 0); // underflow! +} + +static inline uint64_t qd_connection_count(qd_protocol_t proto) +{ + assert(proto < QD_PROTOCOL_TOTAL); + return (uint64_t) atomic_load_explicit(&qd_connection_counters[proto], memory_order_relaxed); +} + +#endif diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index c6871e6e9..e22a61b97 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -23,6 +23,7 @@ #include "qpid/dispatch/policy_spec.h" #include "qpid/dispatch/router_core.h" #include "qpid/dispatch/io_module.h" +#include "qpid/dispatch/protocols.h" typedef struct qdr_protocol_adaptor_t qdr_protocol_adaptor_t; typedef struct qdr_connection_t qdr_connection_t; @@ -872,7 +873,8 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted, bool ssl, const char *version, bool streaming_links, - bool connection_trunking); + bool connection_trunking, + qd_protocol_t protocol); void qdr_connection_info_set_group_correlator(qdr_connection_info_t *info, const char *correlator); diff --git a/include/qpid/dispatch/protocols.h b/include/qpid/dispatch/protocols.h new file mode 100644 index 000000000..874c70079 --- /dev/null +++ b/include/qpid/dispatch/protocols.h @@ -0,0 +1,37 @@ +#ifndef __protocols_h__ +#define __protocols_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/enum.h" + +// Network protocols supported by the router +// +typedef enum { + QD_PROTOCOL_TCP, + QD_PROTOCOL_AMQP, + QD_PROTOCOL_HTTP1, + QD_PROTOCOL_HTTP2, + QD_PROTOCOL_TOTAL // must be last +} qd_protocol_t; + +// Defines qd_protocol_name(qd_protocol_t) +// +ENUM_DECLARE(qd_protocol); +#endif diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index fbedd1038..764b0d817 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -594,6 +594,10 @@ "graph": true, "description": "The current amount of memory in use by the router process in bytes. This includes memory provisioned for stack, data, and code (VmSize). This value is set to Null if the platform does not provide access to the process memory size." }, + "connectionCounters": { + "type": "map", + "description": "A map keyed by network protocol name with a value of the count of active connections using that protocol at this router node." + }, "dataConnectionCount": { "description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers", "type": "string", diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3b27523c6..fc7a3f323 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -140,6 +140,8 @@ set(qpid_dispatch_SOURCES trace_mask.c python_utils.c qd_asan_interface.c + protocols.c + connection_counters.c ) set(qpid_dispatch_INCLUDES diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 92f67a457..042c7eeab 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -357,7 +357,8 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn) false, //bool ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_HTTP1); hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core, qdr_http1_adaptor->adaptor, diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index a52b7f76b..d7574f145 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -207,7 +207,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *co false, //bool ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_HTTP1); hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core, qdr_http1_adaptor->adaptor, diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index 3e53b9c5c..942d4add8 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -2774,7 +2774,8 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_ false, //bool ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_HTTP2); qdr_connection_t *conn = qdr_connection_opened(http2_adaptor->core, http2_adaptor->adaptor, @@ -3102,7 +3103,8 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto false, //bool ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_HTTP2); qdr_connection_t *conn = qdr_connection_opened(http2_adaptor->core, http2_adaptor->adaptor, diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 53641d3f3..4df4ac30e 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -421,7 +421,8 @@ static void on_startup(void *context) false, //bool ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_TCP); adaptor->conn = qdr_connection_opened(adaptor->core, // core adaptor->adaptor, // protocol_adaptor diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 1a22b3894..38460546d 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -908,7 +908,8 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) false, // ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_TCP); pn_data_free(tcp_conn_properties); qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, @@ -1337,7 +1338,8 @@ static void qdr_tcp_create_server_side_connection(qdr_tcp_connection_t* tc) false, // bool ssl, "", // peer router version, false, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_TCP); pn_data_free(tcp_conn_properties); qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 9c1d39bd1..b9fb311ff 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -176,7 +176,8 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming false, // ssl, "", // peer router version, true, // streaming links - false); // connection trunking + false, // connection trunking + QD_PROTOCOL_TCP); pn_data_free(properties); conn = qdr_connection_opened(tcplite_context->core, diff --git a/src/connection_counters.c b/src/connection_counters.c new file mode 100644 index 000000000..5c9007092 --- /dev/null +++ b/src/connection_counters.c @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/connection_counters.h" + +atomic_uint_fast64_t qd_connection_counters[QD_PROTOCOL_TOTAL]; + diff --git a/src/protocols.c b/src/protocols.c new file mode 100644 index 000000000..c2c021dfd --- /dev/null +++ b/src/protocols.c @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/protocols.h" + +#include + +// names must match the order of the enum entries in qd_protocol_t +// +static const char *_names[QD_PROTOCOL_TOTAL] = { + "tcp", "amqp", "http/1", "http/2" +}; + +// defines function qd_protocol_name(qd_protocol_t) +// +ENUM_DEFINE(qd_protocol, _names); diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index 596f106e5..88d486516 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -18,8 +18,9 @@ */ #include "agent_router.h" - #include "config.h" +#include "qpid/dispatch/protocols.h" +#include "qpid/dispatch/connection_counters.h" #include @@ -56,6 +57,7 @@ #define QDR_ROUTER_UPTIME_SECONDS 29 #define QDR_ROUTER_MEMORY_USAGE 30 #define QDR_ROUTER_WORKER_THREADS 31 +#define QDR_ROUTER_CONNECTION_COUNTERS 32 const char *qdr_router_columns[] = {"name", @@ -90,6 +92,7 @@ const char *qdr_router_columns[] = "uptimeSeconds", "memoryUsage", "workerThreads", + "connectionCounters", 0}; @@ -247,6 +250,16 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co qd_compose_insert_null(body); } break; + case QDR_ROUTER_CONNECTION_COUNTERS: { + qd_compose_start_map(body); + for (qd_protocol_t proto = 0; proto < QD_PROTOCOL_TOTAL; ++proto) { + qd_compose_insert_string(body, qd_protocol_name(proto)); + qd_compose_insert_ulong(body, qd_connection_count(proto)); + } + qd_compose_end_map(body); + break; + } + default: qd_compose_insert_null(body); break; diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h index 73e5097bf..4af7181ee 100644 --- a/src/router_core/agent_router.h +++ b/src/router_core/agent_router.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -#define QDR_ROUTER_COLUMN_COUNT 32 +#define QDR_ROUTER_COLUMN_COUNT 33 extern const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1]; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 52a87487a..d46cb86b9 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -26,6 +26,7 @@ #include "qpid/dispatch/discriminator.h" #include "qpid/dispatch/router_core.h" #include "qpid/dispatch/static_assert.h" +#include "qpid/dispatch/connection_counters.h" #include #include @@ -123,6 +124,10 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, context_binder(conn, bind_token); } + // count "user" connections (not infrastructure) + if (role == QDR_ROLE_NORMAL) + qd_connection_counter_inc(connection_info->protocol); + set_safe_ptr_qdr_connection_t(conn, &action->args.connection.conn); action->args.connection.connection_label = qdr_field(label); action->args.connection.container_id = qdr_field(remote_container_id); @@ -151,6 +156,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, void qdr_connection_closed(qdr_connection_t *conn) { + if (conn->role == QDR_ROLE_NORMAL) + qd_connection_counter_dec(conn->connection_info->protocol); + qdr_action_t *action = qdr_action(qdr_connection_closed_CT, "connection_closed"); set_safe_ptr_qdr_connection_t(conn, &action->args.connection.conn); qdr_action_enqueue(conn->core, action); @@ -184,13 +192,15 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted, bool ssl, const char *version, bool streaming_links, - bool connection_trunking) + bool connection_trunking, + qd_protocol_t protocol) { qdr_connection_info_t *connection_info = new_qdr_connection_info_t(); ZERO(connection_info); connection_info->is_encrypted = is_encrypted; connection_info->is_authenticated = is_authenticated; connection_info->opened = opened; + connection_info->protocol = protocol; if (container) connection_info->container = strdup(container); @@ -1824,6 +1834,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "[C%" PRIu64 "] Connection Closed", conn->identity); DEQ_REMOVE(core->open_connections, conn); + qdr_connection_free(conn); } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index d672f7876..db98f7ad2 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -648,6 +648,7 @@ struct qdr_connection_info_t { bool connection_trunking; // peer supports connection trunking qd_direction_t dir; qdr_connection_role_t role; + qd_protocol_t protocol; pn_data_t *connection_properties; bool ssl; int ssl_ssf; //ssl strength factor diff --git a/src/router_node.c b/src/router_node.c index 57161b342..ef9e0c017 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -1535,7 +1536,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool is_ssl, rversion, streaming_links, - connection_trunking); + connection_trunking, + QD_PROTOCOL_AMQP); qdr_connection_info_set_group_correlator(connection_info, conn->group_correlator); diff --git a/tests/system_test.py b/tests/system_test.py index 576546696..a529c1cde 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -74,11 +74,14 @@ # Optional modules MISSING_MODULES = [] + +# management type identifiers, long form HTTP_LISTENER_TYPE = 'io.skupper.router.httpListener' TCP_LISTENER_TYPE = 'io.skupper.router.tcpListener' HTTP_CONNECTOR_TYPE = 'io.skupper.router.httpConnector' TCP_CONNECTOR_TYPE = 'io.skupper.router.tcpConnector' CONNECTION_TYPE = 'io.skupper.router.connection' +ROUTER_TYPE = 'io.skupper.router.router' try: import qpidtoollibs # pylint: disable=unused-import diff --git a/tests/system_tests_default_distribution.py b/tests/system_tests_default_distribution.py index da1ac5c4f..d18b9d5fe 100644 --- a/tests/system_tests_default_distribution.py +++ b/tests/system_tests_default_distribution.py @@ -74,7 +74,7 @@ def test_anonymous_sender(self): def test_general(self): out = self.run_skstat(['--general'], r'(?s)Router Statistics.*Mode\s*Standalone') - self.assertIn("Connections 1", out) + self.assertIn("Total Connections 1", out) self.assertIn("Nodes 0", out) self.assertIn("Auto Links 0", out) self.assertIn("Router Id QDR", out) diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index f7477b306..019fa0774 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -1275,7 +1275,7 @@ def test_69_interior_skstat_all_routers(self): address=self.routers[0].addresses[0]) self.assertEqual(outs.count("Router Links"), 2) self.assertEqual(outs.count("Router Addresses"), 2) - self.assertEqual(outs.count("Connections"), 12) + self.assertEqual(outs.count("Total Connections"), 2) self.assertEqual(outs.count("Router Statistics"), 2) self.assertEqual(outs.count("Memory Pools"), 2) diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py index f0b199a06..8342dd674 100644 --- a/tests/system_tests_http1_adaptor.py +++ b/tests/system_tests_http1_adaptor.py @@ -31,6 +31,7 @@ from ssl import SSLContext, PROTOCOL_TLS_CLIENT, PROTOCOL_TLS_SERVER from ssl import CERT_REQUIRED, SSLSocket from time import sleep, time +from typing import Mapping from email.parser import BytesParser from proton import Message @@ -39,6 +40,8 @@ from system_test import retry_exception, curl_available, run_curl, retry from system_test import nginx_available, get_digest, NginxServer, Process from system_test import openssl_available, is_pattern_present +from system_test import HTTP_LISTENER_TYPE, HTTP_CONNECTOR_TYPE +from system_test import CONNECTION_TYPE, ROUTER_TYPE from http1_tests import http1_ping, TestServer, RequestHandler10 from http1_tests import RequestMsg, ResponseMsg, ResponseValidator from http1_tests import ThreadedTestClient, Http1OneRouterTestBase @@ -120,10 +123,6 @@ class Http1AdaptorManagementTest(TestCase): def setUpClass(cls): super(Http1AdaptorManagementTest, cls).setUpClass() - cls.LISTENER_TYPE = 'io.skupper.router.httpListener' - cls.CONNECTOR_TYPE = 'io.skupper.router.httpConnector' - cls.CONNECTION_TYPE = 'io.skupper.router.connection' - cls.interior_edge_port = cls.tester.get_port() cls.interior_mgmt_port = cls.tester.get_port() cls.edge_mgmt_port = cls.tester.get_port() @@ -166,16 +165,16 @@ def test_01_create_delete(self): adaptor properly notifies the interior of the subscribers/producers. """ e_mgmt = self.e_router.management - self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) - self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + self.assertEqual(0, len(e_mgmt.query(type=HTTP_LISTENER_TYPE).results)) + self.assertEqual(0, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) - e_mgmt.create(type=self.CONNECTOR_TYPE, + e_mgmt.create(type=HTTP_CONNECTOR_TYPE, name="ServerConnector", attributes={'address': 'closest/http1Service', 'port': self.http_server_port, 'protocolVersion': 'HTTP1'}) - e_mgmt.create(type=self.LISTENER_TYPE, + e_mgmt.create(type=HTTP_LISTENER_TYPE, name="ClientListener", attributes={'address': 'closest/http1Service', 'port': self.http_listener_port, @@ -183,8 +182,8 @@ def test_01_create_delete(self): # verify the entities have been created and http traffic works - self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) - self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=HTTP_LISTENER_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) http1_ping(sport=self.http_server_port, cport=self.http_listener_port) @@ -195,15 +194,15 @@ def test_01_create_delete(self): # delete the connector and listener; wait for the associated connection # to be removed # - e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector") - self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) - e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener") - self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) + e_mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + self.assertEqual(0, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) + e_mgmt.delete(type=HTTP_LISTENER_TYPE, name="ClientListener") + self.assertEqual(0, len(e_mgmt.query(type=HTTP_LISTENER_TYPE).results)) # will hit test timeout on failure: while True: hconns = 0 - obj = e_mgmt.query(type=self.CONNECTION_TYPE, + obj = e_mgmt.query(type=CONNECTION_TYPE, attribute_names=["protocol"]) for item in obj.get_dicts(): if "http/1.x" in item["protocol"]: @@ -240,45 +239,45 @@ def test_01_create_delete(self): # # re-create the connector and listener; verify it works # - e_mgmt.create(type=self.CONNECTOR_TYPE, + e_mgmt.create(type=HTTP_CONNECTOR_TYPE, name="ServerConnector", attributes={'address': 'closest/http1Service', 'port': self.http_server_port, 'protocolVersion': 'HTTP1'}) - e_mgmt.create(type=self.LISTENER_TYPE, + e_mgmt.create(type=HTTP_LISTENER_TYPE, name="ClientListener", attributes={'address': 'closest/http1Service', 'port': self.http_listener_port, 'protocolVersion': 'HTTP1'}) - self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) - self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=HTTP_LISTENER_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) http1_ping(sport=self.http_server_port, cport=self.http_listener_port) self.i_router.wait_address("closest/http1Service", subscribers=1) - e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector") - self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) - e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener") - self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) + e_mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + self.assertEqual(0, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) + e_mgmt.delete(type=HTTP_LISTENER_TYPE, name="ClientListener") + self.assertEqual(0, len(e_mgmt.query(type=HTTP_LISTENER_TYPE).results)) def test_01_delete_active_connector(self): """Delete an HTTP1 connector that is currently connected to a server. Verify the connection is dropped. """ e_mgmt = self.e_router.management - self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + self.assertEqual(0, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) - e_mgmt.create(type=self.CONNECTOR_TYPE, + e_mgmt.create(type=HTTP_CONNECTOR_TYPE, name="ServerConnector", attributes={'address': 'closest/http1Service', 'port': self.http_server_port, 'protocolVersion': 'HTTP1'}) # verify the connector has been created and attach a dummy server - self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: @@ -295,8 +294,8 @@ def test_01_delete_active_connector(self): self.i_router.wait_address("closest/http1Service", subscribers=1) # delete the connector - e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector") - self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + e_mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + self.assertEqual(0, len(e_mgmt.query(type=HTTP_CONNECTOR_TYPE).results)) # expect socket to close while True: @@ -3205,5 +3204,99 @@ def test_01_client_backpressure(self): server.close() +class Http1AdaptorConnCounter(TestCase): + """ + Validate the HTTP1 service connection counter + """ + @classmethod + def setUpClass(cls): + super(Http1AdaptorConnCounter, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', + 'id': 'HTTP1ConnCounter'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('HTTP1ConnCounter', config) + + def _get_conn_counters(self) -> Mapping[str, int]: + attributes = ["connectionCounters"] + rc = self.router.management.query(type=ROUTER_TYPE, + attribute_names=attributes) + self.assertIsNotNone(rc, "unexpected query failure") + self.assertEqual(1, len(rc.get_dicts()), "expected one attribute!") + counters = rc.get_dicts()[0].get("connectionCounters") + self.assertIsNotNone(counters, "expected a counter map to be returned") + return counters + + def test_01_check_counter(self): + """ Create and destroy HTTP1 network connections, verify the connection + counter is correct. + """ + mgmt = self.router.management + + # verify the counters start at zero (not including amqp) + counters = self._get_conn_counters() + for proto in ["tcp", "http/1", "http/2"]: + counter = counters.get(proto) + self.assertIsNotNone(counter, f"Missing expected protocol counter {proto}!") + self.assertEqual(0, counter, "counters must be zero on startup") + + # Bring up a server and client, check the counter + + connector_port = self.get_port() + listener_port = self.get_port() + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("", connector_port)) + listener.setblocking(True) + listener.listen(10) + + mgmt.create(type=HTTP_LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/http1Service', + 'port': listener_port, + 'protocolVersion': 'HTTP1'}) + mgmt.create(type=HTTP_CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http1Service', + 'port': connector_port, + 'protocolVersion': 'HTTP1'}) + + server, _ = listener.accept() + try: + wait_http_listeners_up(self.router.addresses[0], + l_filter={'name': 'ClientListener'}) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + retry_exception(lambda cs=client, lp=listener_port: + cs.connect(("localhost", lp)), + delay=0.25, + exception=ConnectionRefusedError) + + # verify that there are now two HTTP/1 connections: + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http/1") == 2), + "Expected 2 active HTTP connections") + finally: + server.close() + + mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + mgmt.delete(type=HTTP_LISTENER_TYPE, name="ClientListener") + + # expect http/1 counter to return to zero + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http/1") == 0), + "Expected no active HTTP connections") + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py index 5236b897a..e154eded1 100644 --- a/tests/system_tests_http2.py +++ b/tests/system_tests_http2.py @@ -23,12 +23,14 @@ import unittest from subprocess import PIPE from time import sleep +from typing import Mapping import system_test from http1_tests import wait_http_listeners_up, HttpAdaptorListenerConnectTestBase, wait_tcp_listeners_up from system_test import TestCase, Qdrouterd, QdManager, Process, retry_assertion from system_test import curl_available, nginx_available, TIMEOUT, Http2Server -from system_test import get_digest +from system_test import get_digest, retry, retry_exception +from system_test import HTTP_LISTENER_TYPE, HTTP_CONNECTOR_TYPE, ROUTER_TYPE h2hyper_installed = True try: @@ -1194,3 +1196,100 @@ def test_05_listener_edge_edge(self): Test tcpListener socket lifecycle edge to edge """ self._test_listener_socket_lifecycle(self.EdgeA, self.EdgeB, "test_05_listener_edge_edge") + + +class Http2AdaptorConnCounter(TestCase): + """ + Validate the HTTP/2 service connection counter + """ + @classmethod + def setUpClass(cls): + super(Http2AdaptorConnCounter, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', + 'id': 'HTTP2ConnCounter'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('HTTP1ConnCounter', config) + + def _get_conn_counters(self) -> Mapping[str, int]: + attributes = ["connectionCounters"] + rc = self.router.management.query(type=ROUTER_TYPE, + attribute_names=attributes) + self.assertIsNotNone(rc, "unexpected query failure") + self.assertEqual(1, len(rc.get_dicts()), "expected one attribute!") + counters = rc.get_dicts()[0].get("connectionCounters") + self.assertIsNotNone(counters, "expected a counter map to be returned") + return counters + + def test_01_check_counter(self): + """ Create and destroy HTTP/2 network connections, verify the connection + counter is correct. + """ + mgmt = self.router.management + + # verify the counter starts at zero + + counters = self._get_conn_counters() + for proto in ["tcp", "http/1", "http/2"]: + counter = counters.get(proto) + self.assertIsNotNone(counter, f"Missing expected protocol counter {proto}!") + self.assertEqual(0, counter, "counters must be zero on startup") + + # Bring up a server and client, check the counter + + connector_port = self.get_port() + listener_port = self.get_port() + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("", connector_port)) + listener.setblocking(True) + listener.listen(10) + + mgmt.create(type=HTTP_LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/http2Service', + 'port': listener_port, + 'protocolVersion': 'HTTP2'}) + mgmt.create(type=HTTP_CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http2Service', + 'port': connector_port, + 'protocolVersion': 'HTTP2'}) + wait_http_listeners_up(self.router.addresses[0], + l_filter={'name': 'ClientListener'}) + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http/2") == 1), + "Expected 1 HTTP/2 dispatch connection") + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + retry_exception(lambda cs=client, lp=listener_port: + cs.connect(("localhost", lp)), + delay=0.25, + exception=ConnectionRefusedError) + server, _ = listener.accept() + try: + # Note: not sure why only two not three, but it checks out + # against skstat -c: + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http/2") == 2), + "Expected active conn count failed!") + finally: + server.close() + + mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + mgmt.delete(type=HTTP_LISTENER_TYPE, name="ClientListener") + + # expect counter to return to zero + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http/2") == 0), + "Expected no active HTTP/2 connections") diff --git a/tests/system_tests_skstat.py b/tests/system_tests_skstat.py index 55ffcead3..c01770a65 100644 --- a/tests/system_tests_skstat.py +++ b/tests/system_tests_skstat.py @@ -93,12 +93,13 @@ def test_general_csv(self): out = self.run_skstat(['--general', '--csv'], regex=r'(?s)Router Statistics.*Mode","Standalone') - self.assertIn('"Connections","1"', out) + self.assertIn('"Total Connections","1"', out) self.assertIn('"Worker Threads","1"', out) self.assertIn('"Nodes","0"', out) self.assertIn('"Auto Links","0"', out) self.assertIn('"Router Id","QDR.A"', out) self.assertIn('"Mode","standalone"', out) + self.assertIn('"AMQP Service Connections","1"', out) self.assertEqual(out.count("QDR.A"), 2) def test_connections(self): @@ -693,13 +694,14 @@ def test_links_all_routers_csv(self): def _test_all_entities(self, command): out = self.run_skstat(command) - self.assertTrue(out.count('UTC') == 1) - self.assertTrue(out.count('Router Links') == 1) - self.assertTrue(out.count('Router Addresses') == 1) - self.assertTrue(out.count('Connections') == 6) - self.assertTrue(out.count('AutoLinks') == 2) - self.assertTrue(out.count('Router Statistics') == 1) - self.assertTrue(out.count('Memory Pools') == 1) + self.assertEqual(1, out.count('UTC')) + self.assertEqual(1, out.count('Router Links')) + self.assertEqual(1, out.count('Router Addresses')) + self.assertEqual(1, out.count('Total Connections')) + self.assertEqual(2, out.count('AutoLinks')) + self.assertEqual(1, out.count('Router Statistics')) + self.assertEqual(1, out.count('Memory Pools')) + self.assertEqual(1, out.count('AMQP Service Connections')) def test_all_entities(self): self._test_all_entities(['--all-entities']) @@ -710,13 +712,14 @@ def test_all_entities_csv(self): def _test_all_entities_all_routers(self, command): out = self.run_skstat(command) - self.assertTrue(out.count('UTC') == 1) - self.assertTrue(out.count('Router Links') == 2) - self.assertTrue(out.count('Router Addresses') == 2) - self.assertTrue(out.count('Connections') == 12) - self.assertTrue(out.count('AutoLinks') == 4) - self.assertTrue(out.count('Router Statistics') == 2) - self.assertTrue(out.count('Memory Pools') == 2) + self.assertEqual(1, out.count('UTC')) + self.assertEqual(2, out.count('Router Links')) + self.assertEqual(2, out.count('Router Addresses')) + self.assertEqual(2, out.count('Total Connections')) + self.assertEqual(4, out.count('AutoLinks')) + self.assertEqual(2, out.count('Router Statistics')) + self.assertEqual(2, out.count('Memory Pools')) + self.assertEqual(2, out.count('AMQP Service Connections')) def test_all_entities_all_routers(self): self._test_all_entities_all_routers(['--all-entities', '--all-routers']) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 1d4209e0f..da5bff489 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -26,7 +26,7 @@ import traceback from subprocess import PIPE from subprocess import STDOUT -from typing import List, Optional +from typing import List, Optional, Mapping from proton import Message from proton.handlers import MessagingHandler @@ -43,7 +43,11 @@ from system_test import unittest from system_test import retry from system_test import CONNECTION_TYPE +from system_test import ROUTER_TYPE +from system_test import TCP_CONNECTOR_TYPE +from system_test import TCP_LISTENER_TYPE from system_test import retry_assertion +from system_test import retry_exception from system_tests_ssl import RouterTestSslBase from http1_tests import wait_tcp_listeners_up @@ -84,7 +88,6 @@ class TCP_echo_server: CLIENT_PRIVATE_KEY_PASSWORD = 'client-password' CA_CERT = RouterTestSslBase.ssl_file('ca-certificate.pem') - # This code takes a wild guess how long an echo server must stall receiving # input data before it fills the adaptor's flow control window in the host # router on all the various CI systems out there. @@ -681,12 +684,11 @@ def router(name, mode, connection, extra=None, ssl=False, encapsulation="legacy" cls.logger.log("TCP_TEST waiting for all tcpListeners to activate...") - LISTENER_TYPE = 'io.skupper.router.tcpListener' for rtr in cls.routers: mgmt = rtr.management listeners_ready = False while not listeners_ready: - listeners = mgmt.query(type=LISTENER_TYPE, + listeners = mgmt.query(type=TCP_LISTENER_TYPE, attribute_names=["operStatus", "name", "address"]).get_dicts() listeners_ready = True @@ -1550,33 +1552,30 @@ def test_01_mgmt(self): Create and delete TCP connectors and listeners. Ensure that the service address is properly removed on the interior router. """ - LISTENER_TYPE = 'io.skupper.router.tcpListener' - CONNECTOR_TYPE = 'io.skupper.router.tcpConnector' - mgmt = self.e_router.management van_address = self.test_name + "/test_01_mgmt" # When starting out, there should be no tcpListeners or tcpConnectors. - self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) - self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) + self.assertEqual(0, len(mgmt.query(type=TCP_LISTENER_TYPE).results)) + self.assertEqual(0, len(mgmt.query(type=TCP_CONNECTOR_TYPE).results)) connector_name = "ServerConnector" listener_name = "ClientListener" - mgmt.create(type=LISTENER_TYPE, + mgmt.create(type=TCP_LISTENER_TYPE, name=listener_name, attributes={'address': van_address, 'port': self.tcp_listener_port, 'host': '127.0.0.1'}) - mgmt.create(type=CONNECTOR_TYPE, + mgmt.create(type=TCP_CONNECTOR_TYPE, name=connector_name, attributes={'address': van_address, 'port': self.tcp_server_port, 'host': '127.0.0.1'}) # verify the entities have been created and tcp traffic works - self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results)) - self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results)) + self.assertEqual(1, len(mgmt.query(type=TCP_LISTENER_TYPE).results)) + self.assertEqual(1, len(mgmt.query(type=TCP_CONNECTOR_TYPE).results)) # now verify that the interior router sees the service address # and two proxy links are created: one outgoing for the connector and @@ -1592,12 +1591,12 @@ def test_01_mgmt(self): time.sleep(0.25) # Delete the connector and listener - out = mgmt.delete(type=CONNECTOR_TYPE, name=connector_name) # pylint: disable=assignment-from-no-return + out = mgmt.delete(type=TCP_CONNECTOR_TYPE, name=connector_name) # pylint: disable=assignment-from-no-return self.assertIsNone(out) - self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) - out = mgmt.delete(type=LISTENER_TYPE, name=listener_name) # pylint: disable=assignment-from-no-return + self.assertEqual(0, len(mgmt.query(type=TCP_CONNECTOR_TYPE).results)) + out = mgmt.delete(type=TCP_LISTENER_TYPE, name=listener_name) # pylint: disable=assignment-from-no-return self.assertIsNone(out) - self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) + self.assertEqual(0, len(mgmt.query(type=TCP_LISTENER_TYPE).results)) # verify the service address and proxy links are no longer active on # the interior router @@ -1622,27 +1621,24 @@ def test_02_mgmt_recreate(self): """ Verify that deleting then re-creating listeners and connectors works """ - LISTENER_TYPE = 'io.skupper.router.tcpListener' - CONNECTOR_TYPE = 'io.skupper.router.tcpConnector' - mgmt = self.e_router.management van_address = self.test_name + "/test_02_mgmt_recreate" # When starting out, there should be no tcpListeners or tcpConnectors. - self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) - self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) + self.assertEqual(0, len(mgmt.query(type=TCP_LISTENER_TYPE).results)) + self.assertEqual(0, len(mgmt.query(type=TCP_CONNECTOR_TYPE).results)) connector_name = "ServerConnector" listener_name = "ClientListener" for i in range(2): - mgmt.create(type=LISTENER_TYPE, + mgmt.create(type=TCP_LISTENER_TYPE, name=listener_name, attributes={'address': van_address, 'port': self.tcp_listener_port, 'host': '127.0.0.1'}) - mgmt.create(type=CONNECTOR_TYPE, + mgmt.create(type=TCP_CONNECTOR_TYPE, name=connector_name, attributes={'address': van_address, 'port': self.tcp_server_port, @@ -1651,7 +1647,7 @@ def test_02_mgmt_recreate(self): # wait until the listener has initialized def _wait_for_listener_up(): - li = mgmt.read(type=LISTENER_TYPE, name=listener_name) + li = mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) if li['operStatus'] == 'up': return True return False @@ -1659,13 +1655,13 @@ def _wait_for_listener_up(): # verify initial statistics - l_stats = mgmt.read(type=LISTENER_TYPE, name=listener_name) + l_stats = mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) self.assertEqual(0, l_stats['bytesIn']) self.assertEqual(0, l_stats['bytesOut']) self.assertEqual(0, l_stats['connectionsOpened']) self.assertEqual(0, l_stats['connectionsClosed']) - c_stats = mgmt.read(type=CONNECTOR_TYPE, name=connector_name) + c_stats = mgmt.read(type=TCP_CONNECTOR_TYPE, name=connector_name) self.assertEqual(0, c_stats['bytesIn']) self.assertEqual(0, c_stats['bytesOut']) self.assertEqual(1, c_stats['connectionsOpened']) # dispatcher @@ -1680,7 +1676,7 @@ def _wait_for_listener_up(): server.listen(1) self.assertTrue(retry(lambda: - mgmt.read(type=LISTENER_TYPE, + mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name)['operStatus'] == 'up')) @@ -1710,10 +1706,10 @@ def _wait_for_listener_up(): # Wait until the test clients have closed def _wait_for_close(): - if 0 == mgmt.read(type=LISTENER_TYPE, + if 0 == mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name)['connectionsClosed']: return False - if 0 == mgmt.read(type=CONNECTOR_TYPE, + if 0 == mgmt.read(type=TCP_CONNECTOR_TYPE, name=connector_name)['connectionsClosed']: return False return True @@ -1721,13 +1717,13 @@ def _wait_for_close(): # Verify updated statistics. - l_stats = mgmt.read(type=LISTENER_TYPE, name=listener_name) + l_stats = mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) self.assertEqual(10, l_stats['bytesIn']) self.assertEqual(4, l_stats['bytesOut']) self.assertEqual(1, l_stats['connectionsOpened']) self.assertEqual(1, l_stats['connectionsClosed']) - c_stats = mgmt.read(type=CONNECTOR_TYPE, name=connector_name) + c_stats = mgmt.read(type=TCP_CONNECTOR_TYPE, name=connector_name) self.assertEqual(4, c_stats['bytesIn']) self.assertEqual(10, c_stats['bytesOut']) self.assertEqual(2, c_stats['connectionsOpened']) @@ -1735,11 +1731,11 @@ def _wait_for_close(): # splendid! Not delete all the things - mgmt.delete(type=LISTENER_TYPE, name=listener_name) - self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) + mgmt.delete(type=TCP_LISTENER_TYPE, name=listener_name) + self.assertEqual(0, len(mgmt.query(type=TCP_LISTENER_TYPE).results)) - mgmt.delete(type=CONNECTOR_TYPE, name=connector_name) - self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) + mgmt.delete(type=TCP_CONNECTOR_TYPE, name=connector_name) + self.assertEqual(0, len(mgmt.query(type=TCP_CONNECTOR_TYPE).results)) # attempting to connect should fail once the listener socket has # been closed @@ -1758,9 +1754,6 @@ class TcpAdaptorListenerConnectTest(TestCase): """ Test client connecting to TcpListeners in various scenarios """ - LISTENER_TYPE = 'io.skupper.router.tcpListener' - CONNECTOR_TYPE = 'io.skupper.router.tcpConnector' - @classmethod def setUpClass(cls): super(TcpAdaptorListenerConnectTest, cls).setUpClass() @@ -1830,13 +1823,13 @@ def test_01_no_service(self): connector_port = self.tester.get_port() a_mgmt = self.INTA.management - a_mgmt.create(type=self.LISTENER_TYPE, + a_mgmt.create(type=TCP_LISTENER_TYPE, name="ClientListener01", attributes={'address': van_address, 'port': listener_port}) b_mgmt = self.INTB.management - b_mgmt.create(type=self.CONNECTOR_TYPE, + b_mgmt.create(type=TCP_CONNECTOR_TYPE, name="ServerConnector01", attributes={'address': van_address, 'host': '127.0.0.1', @@ -1850,7 +1843,7 @@ def test_01_no_service(self): # connect to. while True: - listener = a_mgmt.read(type=self.LISTENER_TYPE, + listener = a_mgmt.read(type=TCP_LISTENER_TYPE, name='ClientListener01') if listener['operStatus'] != 'up': time.sleep(0.1) @@ -1883,8 +1876,8 @@ def test_01_no_service(self): # Yay we did not hang! Test passed break - a_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener01") - b_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector01") + a_mgmt.delete(type=TCP_LISTENER_TYPE, name="ClientListener01") + b_mgmt.delete(type=TCP_CONNECTOR_TYPE, name="ServerConnector01") self.INTA.wait_address_unsubscribed(van_address) @@ -1903,7 +1896,7 @@ def _test_listener_socket_lifecycle(self, connector_port = self.tester.get_port() l_mgmt = l_router.management - l_mgmt.create(type=self.LISTENER_TYPE, + l_mgmt.create(type=TCP_LISTENER_TYPE, name=listener_name, attributes={'address': van_address, 'port': listener_port}) @@ -1911,7 +1904,7 @@ def _test_listener_socket_lifecycle(self, # since there is no connector present, the operational state must be # down and connection attempts must be refused - listener = l_mgmt.read(type=self.LISTENER_TYPE, name=listener_name) + listener = l_mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) self.assertEqual('down', listener['operStatus']) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_conn: @@ -1929,7 +1922,7 @@ def _test_listener_socket_lifecycle(self, server.bind(("", connector_port)) server.listen(1) - c_mgmt.create(type=self.CONNECTOR_TYPE, + c_mgmt.create(type=TCP_CONNECTOR_TYPE, name=connector_name, attributes={'address': van_address, 'host': '127.0.0.1', @@ -1945,7 +1938,7 @@ def _test_listener_socket_lifecycle(self, # expect the listener socket to come up while True: - listener = l_mgmt.read(type=self.LISTENER_TYPE, name=listener_name) + listener = l_mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) if listener['operStatus'] != 'up': time.sleep(0.1) continue @@ -1971,11 +1964,11 @@ def _test_listener_socket_lifecycle(self, # Teardown the connector, expect listener admin state to go down # and connections be refused - c_mgmt.delete(type=self.CONNECTOR_TYPE, name=connector_name) + c_mgmt.delete(type=TCP_CONNECTOR_TYPE, name=connector_name) l_router.wait_address_unsubscribed(van_address) while True: - listener = l_mgmt.read(type=self.LISTENER_TYPE, name=listener_name) + listener = l_mgmt.read(type=TCP_LISTENER_TYPE, name=listener_name) if listener['operStatus'] != 'down': time.sleep(0.1) continue @@ -1992,7 +1985,7 @@ def _test_listener_socket_lifecycle(self, # Test successful break - l_mgmt.delete(type=self.LISTENER_TYPE, name=listener_name) + l_mgmt.delete(type=TCP_LISTENER_TYPE, name=listener_name) def test_02_listener_interior(self): """ @@ -2069,7 +2062,7 @@ def test_delete_tcp_connection(self): client_conn.connect(('127.0.0.1', self.good_listener_port)) qd_manager = QdManager(self.address) conn_id = None - results = qd_manager.query("io.skupper.router.connection") + results = qd_manager.query(CONNECTION_TYPE) for result in results: conn_direction = result['dir'] # Find the id of the tcp connection we want to delete. @@ -2081,7 +2074,7 @@ def test_delete_tcp_connection(self): self.assertIsNotNone(conn_id, "Expected connection id to be not None") def check_connection_deleted(): - outs = qd_manager.query("io.skupper.router.connection") + outs = qd_manager.query(CONNECTION_TYPE) is_conn_present = False for out in outs: if out['identity'] == conn_id: @@ -2182,5 +2175,119 @@ def run(self): Container(self).run() +class TcpAdaptorConnCounter(TestCase): + """ + Validate the TCP service connection counter + """ + @classmethod + def setUpClass(cls): + super(TcpAdaptorConnCounter, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', + 'id': 'TCPConnCounter'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('TCPConnCounter', config) + + def _get_conn_counters(self) -> Mapping[str, int]: + attributes = ["connectionCounters"] + rc = self.router.management.query(type=ROUTER_TYPE, + attribute_names=attributes) + self.assertIsNotNone(rc, "unexpected query failure") + self.assertEqual(1, len(rc.get_dicts()), "expected one attribute!") + counters = rc.get_dicts()[0].get("connectionCounters") + self.assertIsNotNone(counters, "expected a counter map to be returned") + return counters + + def _run_test(self, encaps, idle_ct, active_ct): + # idle_ct: expected connection count when tcp configured, but prior to + # connections active + # activ_ct: expected connection count when 1 client and 1 server + # connected + mgmt = self.router.management + + # verify the counters start at zero (not including amqp) + + counters = self._get_conn_counters() + for proto in ["tcp", "http/1", "http/2"]: + counter = counters.get(proto) + self.assertIsNotNone(counter, f"Missing expected protocol counter {proto}!") + self.assertEqual(0, counter, "counters must be zero on startup") + + # Bring up a server and client, check the counter + + connector_port = self.get_port() + listener_port = self.get_port() + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("", connector_port)) + listener.setblocking(True) + listener.listen(10) + + mgmt.create(type=TCP_LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/tcpService', + 'port': listener_port, + 'encapsulation': encaps}) + mgmt.create(type=TCP_CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/tcpService', + 'host': "localhost", + 'port': connector_port, + 'encapsulation': encaps}) + + wait_tcp_listeners_up(self.router.addresses[0], + l_filter={'name': 'ClientListener'}) + + # expect that simply configuring the tcp listener/connector will + # instantiate the "dispatcher" connection: + + errmsg = "Expected idle count failed!" + errmsg += "\nIf you fixed the phantom tcp-lite connection counter" + errmsg += " please update this test with the new counter values!" + self.assertTrue(retry(lambda: + self._get_conn_counters().get("tcp") == idle_ct), + errmsg) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + retry_exception(lambda cs=client, lp=listener_port: + cs.connect(("localhost", lp)), + delay=0.25, + exception=ConnectionRefusedError) + server, _ = listener.accept() + try: + self.assertTrue(retry(lambda: + self._get_conn_counters().get("tcp") == + active_ct), + f"Expected {active_ct} got {self._get_conn_counters()}!") + finally: + server.shutdown(socket.SHUT_RDWR) + server.close() + client.shutdown(socket.SHUT_RDWR) # context manager calls close + + mgmt.delete(type=TCP_CONNECTOR_TYPE, name="ServerConnector") + mgmt.delete(type=TCP_LISTENER_TYPE, name="ClientListener") + + # expect tcp counter to return to zero once config is cleaned up + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("tcp") == 0), + f"Expected 0 connections, got {self._get_conn_counters()}") + + def test_01_check_counter(self): + """ Create and destroy TCP network connections, verify the connection + counter is correct. + """ + for encaps, idle_ct, active_ct in [('legacy', 1, 3), ('lite', 2, 4)]: + self._run_test(encaps, idle_ct, active_ct) + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tools/skstat b/tools/skstat index 6d78dcc19..61e87d32d 100755 --- a/tools/skstat +++ b/tools/skstat @@ -334,7 +334,7 @@ class BusManager: rows.append(('Links', PlainNum(router.linkCount))) rows.append(('Nodes', PlainNum(router.nodeCount))) rows.append(('Addresses', PlainNum(router.addrCount))) - rows.append(('Connections', PlainNum(router.connectionCount))) + rows.append(('Total Connections', PlainNum(router.connectionCount))) # Overall delivery related counts. # These overall statistics were introduced in 1.1 version. @@ -364,6 +364,13 @@ class BusManager: except: pass + # per-protocol user connection counts + try: + for proto, count in router.connectionCounters.items(): + rows.append((f"{proto.upper()} Service Connections", PlainNum(count))) + except: + pass + title = "Router Statistics" dispRows = rows disp.formattedTable(title, heads, dispRows)