diff --git a/include/qpid/dispatch/connection_counters.h b/include/qpid/dispatch/connection_counters.h new file mode 100644 index 000000000..eb3f0c1a8 --- /dev/null +++ b/include/qpid/dispatch/connection_counters.h @@ -0,0 +1,43 @@ +#ifndef __connection_counters_h__ +#define __connection_counters_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/**@file + * System-wide counters for tracking open connections per protocol type. 
+ */ + +#include "qpid/dispatch/protocols.h" + +#include <stdint.h> + +// Increment the connection counter for the 'proto' protocol +// 'proto' must be a valid qd_protocol_t value (less than QD_PROTOCOL_TOTAL) +void qd_connection_counter_inc(qd_protocol_t proto); + +// Decrement the connection counter for the 'proto' protocol +// Each call must pair with an earlier qd_connection_counter_inc(proto): underflow trips an assert +void qd_connection_counter_dec(qd_protocol_t proto); + +// Fetch the current value of the connection counter for 'proto' +// The counter is loaded atomically (relaxed memory ordering) +uint64_t qd_connection_count(qd_protocol_t proto); + + +#endif diff --git a/include/qpid/dispatch/protocols.h b/include/qpid/dispatch/protocols.h new file mode 100644 index 000000000..874c70079 --- /dev/null +++ b/include/qpid/dispatch/protocols.h @@ -0,0 +1,37 @@ +#ifndef __protocols_h__ +#define __protocols_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "qpid/dispatch/enum.h" + +// Network protocols supported by the router +// +typedef enum { + QD_PROTOCOL_TCP, + QD_PROTOCOL_AMQP, + QD_PROTOCOL_HTTP1, + QD_PROTOCOL_HTTP2, + QD_PROTOCOL_TOTAL // must be last +} qd_protocol_t; + +// Defines qd_protocol_name(qd_protocol_t) +// +ENUM_DECLARE(qd_protocol); +#endif diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index 044f2e9b4..e5fb9af68 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -599,6 +599,10 @@ "graph": true, "description": "The current amount of system memory in use by the router process in bytes (RSS: Resident Set Size). This is the portion of the process memory space that currently resides in RAM. This value is set to Null if the platform does not provide access to the process resident memory size." }, + "connectionCounters": { + "type": "map", + "description": "A map keyed by network protocol name with a value of the count of active service connections using that protocol at this router node. Currently defined key values are: amqp, http1, http2, and tcp" + }, "dataConnectionCount": { "description": "The number of parallel data connections to carry streaming data between routers. 
Applies only to interior routers", "type": "string", diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3b27523c6..fc7a3f323 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -140,6 +140,8 @@ set(qpid_dispatch_SOURCES trace_mask.c python_utils.c qd_asan_interface.c + protocols.c + connection_counters.c ) set(qpid_dispatch_INCLUDES diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 00fee5102..6ac6b0bbf 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -24,6 +24,7 @@ #include "http1_private.h" #include "qpid/dispatch/protocol_adaptor.h" +#include "qpid/dispatch/connection_counters.h" #include #include @@ -375,6 +376,7 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn) 0, // bind context 0); // bind token qdr_connection_set_context(hconn->qdr_conn, hconn); + qd_connection_counter_inc(QD_PROTOCOL_HTTP1); hconn->oper_status = QD_CONN_OPER_UP; qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP connection to client created", @@ -744,6 +746,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi if (hconn->qdr_conn) { qdr_connection_set_context(hconn->qdr_conn, 0); qdr_connection_closed(hconn->qdr_conn); + qd_connection_counter_dec(QD_PROTOCOL_HTTP1); hconn->qdr_conn = 0; } diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index a52b7f76b..53587a977 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -19,6 +19,7 @@ #include "adaptors/adaptor_tls.h" #include "http1_private.h" +#include "qpid/dispatch/connection_counters.h" #include @@ -224,6 +225,7 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *co info, 0, // bind context 0); // bind token + qd_connection_counter_inc(QD_PROTOCOL_HTTP1); // wait for the raw connection to come up before creating the in and out links @@ -2020,6 +2022,7 @@ void 
qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor, hconn->oper_status = QD_CONN_OPER_DOWN; _teardown_server_links(hconn); qdr_connection_closed(qdr_conn); + qd_connection_counter_dec(QD_PROTOCOL_HTTP1); qdr_http1_close_connection(hconn, 0); // it is expected that this callback is the final callback before returning diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index 3e53b9c5c..267a0104a 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -25,6 +25,7 @@ #include "qpid/dispatch/connection_manager.h" #include "qpid/dispatch/dispatch.h" #include "qpid/dispatch/protocol_adaptor.h" +#include "qpid/dispatch/connection_counters.h" #include #include @@ -2795,6 +2796,7 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_ ingress_http_conn->qdr_conn = conn; qdr_connection_set_context(conn, ingress_http_conn); ingress_http_conn->connection_established = true; + qd_connection_counter_inc(QD_PROTOCOL_HTTP2); qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] qdr_http_connection_ingress_accept, qdr_connection_t object created ", ingress_http_conn->conn_id); @@ -2880,6 +2882,7 @@ static void close_connections(qdr_http2_connection_t* conn) qdr_connection_set_context(conn->qdr_conn, 0); if (conn->qdr_conn) { qdr_connection_closed(conn->qdr_conn); + qd_connection_counter_dec(QD_PROTOCOL_HTTP2); conn->qdr_conn = 0; } qdr_action_t *action = qdr_action(qdr_del_http2_connection_CT, "delete_http2_connection"); @@ -2970,6 +2973,7 @@ static void egress_conn_timer_handler(void *context) // sys_mutex_unlock(qd_server_get_activation_lock(http2_adaptor->core->qd->server)); qdr_connection_closed(conn->qdr_conn); + qd_connection_counter_dec(QD_PROTOCOL_HTTP2); free_qdr_http2_connection(conn, false); return; } @@ -3123,6 +3127,7 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto connector->ctx = conn; qdr_connection_set_context(conn, 
egress_http_conn); + qd_connection_counter_inc(QD_PROTOCOL_HTTP2); create_dummy_link_on_egress_conn(egress_http_conn); return egress_http_conn; } diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 53641d3f3..296bfcbe7 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -23,6 +23,7 @@ #include "qpid/dispatch/message.h" #include "qpid/dispatch/protocol_adaptor.h" #include "qpid/dispatch/timer.h" +#include "qpid/dispatch/connection_counters.h" #include #include @@ -394,6 +395,7 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t static void qdr_ref_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) { + qd_connection_counter_dec(QD_PROTOCOL_TCP); } @@ -438,6 +440,7 @@ static void on_startup(void *context) info, // connection_info 0, // context_binder 0); // bind_token + qd_connection_counter_inc(QD_PROTOCOL_TCP); uint64_t link_id; diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 4d0e2f1ce..c73d3773c 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -27,6 +27,7 @@ #include "qpid/dispatch/alloc_pool.h" #include "qpid/dispatch/amqp.h" #include "qpid/dispatch/ctools.h" +#include "qpid/dispatch/connection_counters.h" #include #include @@ -250,6 +251,7 @@ static void on_activate(void *context) detach_links(conn); qdr_connection_set_context(conn->qdr_conn, 0); qdr_connection_closed(conn->qdr_conn); + qd_connection_counter_dec(QD_PROTOCOL_TCP); conn->qdr_conn = 0; free_qdr_tcp_connection(conn); } @@ -667,6 +669,7 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) if (conn->qdr_conn) { qdr_connection_set_context(conn->qdr_conn, 0); qdr_connection_closed(conn->qdr_conn); + qd_connection_counter_dec(QD_PROTOCOL_TCP); conn->qdr_conn = 0; } @@ -928,6 +931,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) 0); // bind_token tc->qdr_conn = conn; 
qdr_connection_set_context(conn, tc); + qd_connection_counter_inc(QD_PROTOCOL_TCP); qdr_terminus_t *dynamic_source = qdr_terminus(0); qdr_terminus_set_dynamic(dynamic_source); @@ -1357,6 +1361,7 @@ static void qdr_tcp_create_server_side_connection(qdr_tcp_connection_t* tc) 0); // bind_token tc->qdr_conn = conn; qdr_connection_set_context(conn, tc); + qd_connection_counter_inc(QD_PROTOCOL_TCP); qdr_terminus_t *source = qdr_terminus(0); qdr_terminus_set_address(source, tc->config->adaptor_config->address); diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 57e68e5a3..38abd4b2a 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -194,7 +195,7 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming info, // connection_info 0, // context_binder 0); // bind_token - + qd_connection_counter_inc(QD_PROTOCOL_TCP); return conn; } @@ -457,6 +458,7 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay) if (!!conn->common.core_conn) { qdr_connection_closed(conn->common.core_conn); + qd_connection_counter_dec(QD_PROTOCOL_TCP); } if (!!conn->common.vflow) { @@ -1634,6 +1636,7 @@ void qd_dispatch_delete_tcp_listener_lite(qd_dispatch_t *qd, tcplite_listener_t if (!tcplite_context->adaptor_finalizing) { if (!!li->common.core_conn) { qdr_connection_closed(li->common.core_conn); + qd_connection_counter_dec(QD_PROTOCOL_TCP); } if (!!li->adaptor_listener) { @@ -1730,6 +1733,7 @@ void qd_dispatch_delete_tcp_connector_lite(qd_dispatch_t *qd, tcplite_connector_ if (!tcplite_context->adaptor_finalizing) { qdr_connection_closed(cr->common.core_conn); + qd_connection_counter_dec(QD_PROTOCOL_TCP); } else { tcplite_connection_t *conn = DEQ_HEAD(cr->connections); if (!!conn) { diff --git a/src/connection_counters.c b/src/connection_counters.c new file mode 100644 index 000000000..f96d99d13 
--- /dev/null +++ b/src/connection_counters.c @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/connection_counters.h" +#include "qpid/dispatch/atomic.h" + +atomic_uint_fast64_t qd_connection_counters[QD_PROTOCOL_TOTAL]; + +void qd_connection_counter_inc(qd_protocol_t proto) +{ + assert(proto < QD_PROTOCOL_TOTAL); + atomic_fetch_add_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed); +} + +void qd_connection_counter_dec(qd_protocol_t proto) +{ + assert(proto < QD_PROTOCOL_TOTAL); + uint64_t old = atomic_fetch_sub_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed); + (void) old; + assert(old != 0); // underflow! 
+} + +uint64_t qd_connection_count(qd_protocol_t proto) +{ + assert(proto < QD_PROTOCOL_TOTAL); + return (uint64_t) atomic_load_explicit(&qd_connection_counters[proto], memory_order_relaxed); +} + + diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index 9bbaac979..fdb9b4850 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -29,6 +29,7 @@ #include "qpid/dispatch/protocol_adaptor.h" #include "qpid/dispatch/threading.h" #include "qpid/dispatch/timer.h" +#include "qpid/dispatch/connection_counters.h" #include #include @@ -613,6 +614,43 @@ static size_t _write_metric(uint8_t **start, size_t available, const char *name, return rc1 + rc2; } +// Write all the per-protocol connection counters. Return the total octets written (not including null terminator) or +// zero on error. +// +// On successful return (*start) will be advanced to the terminating null byte. +// +static size_t _write_conn_counter_metrics(uint8_t **start, size_t available) +{ + char name_buffer[MAX_METRIC_NAME_LEN + 1]; + const size_t save = available; + + for (int proto = 0; proto < QD_PROTOCOL_TOTAL; ++proto) { + const char *proto_name = qd_protocol_name(proto); + assert(proto_name); + int ct = snprintf(name_buffer, sizeof(name_buffer), "qdr_%s_service_connections", proto_name); + if (ct < 0 || (size_t) ct >= sizeof(name_buffer)) { // overrun! + assert(false); // name_buffer is too small: you need to increase MAX_METRIC_NAME_LEN! + return 0; + } + + // Prometheus restricts metric names to the pattern [a-zA-Z_:][a-zA-Z0-9_:]*. Protocol names may include + // other characters, like 'http/1'. 
Convert any illegal characters to '_' + + for (char *ptr = name_buffer; *ptr; ++ptr) { + if (!isalnum((unsigned char) *ptr) && *ptr != '_' && *ptr != ':') + *ptr = '_'; + } + + size_t rc = _write_metric(start, available, name_buffer, "gauge", qd_connection_count(proto)); + if (rc == 0) { + return 0; // error writing, close the connection + } + available -= rc; + } + + return save - available; +} + // Write all the router global metrics to the output buffer. Return the total octets written (not including null // terminator) or zero on error. // @@ -742,7 +780,8 @@ static size_t _generate_metrics_response(stats_request_state_t *state, uint8_t * { if (_write_global_metrics(state, start, end - *start) == 0 || _write_allocator_metrics(start, end - *start) == 0 - || _write_memory_metrics(start, end - *start) == 0) { + || _write_memory_metrics(start, end - *start) == 0 + || _write_conn_counter_metrics(start, end - *start) == 0) { // error, close the connection return 0; } @@ -776,8 +815,10 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason, // alloc_pool metrics (+ 1 for qdr_alloc_pool_bytes): + (DEQ_SIZE(allocator_metrics) * PER_METRIC_BUF_SIZE * PER_ALLOC_METRIC_COUNT) + PER_METRIC_BUF_SIZE - // qdr_router_vmsize_bytes and qdr_router_rss_bytes + // qdr_router_vmsize_bytes and qdr_router_rss_bytes: + (2 * PER_METRIC_BUF_SIZE) + // connection counters by protocol: + + (QD_PROTOCOL_TOTAL * PER_METRIC_BUF_SIZE) // 1 terminating null + 1; stats->state = new_stats_request_state(buf_size); diff --git a/src/protocols.c b/src/protocols.c new file mode 100644 index 000000000..94a289e39 --- /dev/null +++ b/src/protocols.c @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/protocols.h" + +#include + +// Note: names must match the order of the corresponding enum entries in qd_protocol_t, and any changes here require +// updating the connectionCounters router entity attribute in skrouter.json. +// +static const char *_names[QD_PROTOCOL_TOTAL] = { + "tcp", "amqp", "http1", "http2" +}; + +// defines function qd_protocol_name(qd_protocol_t) +// +ENUM_DEFINE(qd_protocol, _names); diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index 8bdb724c3..7e8e91ba9 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -18,8 +18,9 @@ */ #include "agent_router.h" - #include "config.h" +#include "qpid/dispatch/protocols.h" +#include "qpid/dispatch/connection_counters.h" #include @@ -57,6 +58,7 @@ #define QDR_ROUTER_MEMORY_USAGE 30 #define QDR_ROUTER_WORKER_THREADS 31 #define QDR_ROUTER_RSS_USAGE 32 +#define QDR_ROUTER_CONNECTION_COUNTERS 33 const char *qdr_router_columns[] = {"name", @@ -92,6 +94,7 @@ const char *qdr_router_columns[] = "memoryUsage", "workerThreads", "residentMemoryUsage", + "connectionCounters", 0}; @@ -257,6 +260,16 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co qd_compose_insert_null(body); } break; + case QDR_ROUTER_CONNECTION_COUNTERS: { + qd_compose_start_map(body); + for (qd_protocol_t proto = 0; proto < QD_PROTOCOL_TOTAL; 
++proto) { + qd_compose_insert_string(body, qd_protocol_name(proto)); + qd_compose_insert_ulong(body, qd_connection_count(proto)); + } + qd_compose_end_map(body); + break; + } + default: qd_compose_insert_null(body); break; diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h index 4af7181ee..8b2a9c6c9 100644 --- a/src/router_core/agent_router.h +++ b/src/router_core/agent_router.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -#define QDR_ROUTER_COLUMN_COUNT 33 +#define QDR_ROUTER_COLUMN_COUNT 34 extern const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1]; diff --git a/src/router_node.c b/src/router_node.c index 57161b342..aa9aa4684 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include @@ -1554,6 +1556,10 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool connection_info, bind_connection_context, conn); + if (role == QDR_ROLE_NORMAL) { + // These counters track the number of 'user' connections, not infrastructure connections + qd_connection_counter_inc(QD_PROTOCOL_AMQP); + } if (conn->connector) { char conn_msg[300]; @@ -1739,6 +1745,9 @@ static int AMQP_closed_handler(void *type_context, qd_connection_t *conn, void * qdr_connection_set_context(qdrc, 0); sys_mutex_unlock(qd_server_get_activation_lock(router->qd->server)); + if (qdrc->role == QDR_ROLE_NORMAL) { + qd_connection_counter_dec(QD_PROTOCOL_AMQP); + } qdr_connection_closed(qdrc); qd_connection_set_context(conn, 0); } diff --git a/tests/system_tests_default_distribution.py b/tests/system_tests_default_distribution.py index da1ac5c4f..d18b9d5fe 100644 --- a/tests/system_tests_default_distribution.py +++ b/tests/system_tests_default_distribution.py @@ -74,7 +74,7 @@ def test_anonymous_sender(self): def test_general(self): out = self.run_skstat(['--general'], r'(?s)Router Statistics.*Mode\s*Standalone') - self.assertIn("Connections 1", out) + self.assertIn("Total 
Connections 1", out) self.assertIn("Nodes 0", out) self.assertIn("Auto Links 0", out) self.assertIn("Router Id QDR", out) diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index 8ea9a7129..7c4aee615 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -1302,7 +1302,7 @@ def test_69_interior_skstat_all_routers(self): address=self.routers[0].addresses[0]) self.assertEqual(outs.count("Router Links"), 2) self.assertEqual(outs.count("Router Addresses"), 2) - self.assertEqual(outs.count("Connections"), 12) + self.assertEqual(outs.count("Total Connections"), 2) self.assertEqual(outs.count("Router Statistics"), 2) self.assertEqual(outs.count("Memory Pools"), 2) diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py index af8a21c20..9bd6c8ea0 100644 --- a/tests/system_tests_http.py +++ b/tests/system_tests_http.py @@ -18,6 +18,7 @@ # import errno import os +import re import threading import ssl @@ -219,7 +220,11 @@ def test_http_metrics(self): "qdr_deliveries_delayed_1sec_total", "qdr_deliveries_delayed_10sec_total", "qdr_deliveries_stuck_total", - "qdr_links_blocked_total"] + "qdr_links_blocked_total", + "qdr_tcp_service_connections", + "qdr_amqp_service_connections", + "qdr_http1_service_connections", + "qdr_http2_service_connections"] for stat in r.management.query(type=ALLOCATOR_TYPE).get_dicts(): stat_names.append(stat['typeName']) @@ -229,6 +234,14 @@ def _test(stat_names, port): self.assertEqual(200, resp.getcode()) metrics = [x for x in resp.read().decode('utf-8').splitlines() if not x.startswith("#")] + # Verify that all metric names are valid prometheus names that + # must match the regex [a-zA-Z_:][a-zA-Z0-9_:]* + for metric in metrics: + # remove trailing counter + mname = metric.strip().split()[0] + match = re.fullmatch(r'([a-zA-Z_:])([a-zA-Z0-9_:])*', mname) + self.assertIsNotNone(match, f"Metric {mname} has invalid name syntax") + # Verify that all expected stats are reported 
by the metrics URL for name in stat_names: diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py index dd5a1bfd6..f57a7c110 100644 --- a/tests/system_tests_http1_adaptor.py +++ b/tests/system_tests_http1_adaptor.py @@ -31,6 +31,7 @@ from ssl import SSLContext, PROTOCOL_TLS_CLIENT, PROTOCOL_TLS_SERVER from ssl import CERT_REQUIRED, SSLSocket from time import sleep, time +from typing import Mapping from email.parser import BytesParser from proton import Message @@ -39,7 +40,7 @@ from system_test import retry_exception, curl_available, run_curl, retry from system_test import nginx_available, get_digest, NginxServer, Process from system_test import openssl_available, is_pattern_present -from system_test import HTTP_CONNECTOR_TYPE, HTTP_LISTENER_TYPE +from system_test import HTTP_CONNECTOR_TYPE, HTTP_LISTENER_TYPE, ROUTER_TYPE from system_test import CONNECTION_TYPE, HTTP_REQ_INFO_TYPE, ROUTER_LINK_TYPE from http1_tests import http1_ping, TestServer, RequestHandler10 from http1_tests import RequestMsg, ResponseMsg, ResponseValidator @@ -3203,5 +3204,99 @@ def test_01_client_backpressure(self): server.close() +class Http1AdaptorConnCounter(TestCase): + """ + Validate the HTTP1 service connection counter + """ + @classmethod + def setUpClass(cls): + super(Http1AdaptorConnCounter, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', + 'id': 'HTTP1ConnCounter'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('HTTP1ConnCounter', config) + + def _get_conn_counters(self) -> Mapping[str, int]: + attributes = ["connectionCounters"] + rc = self.router.management.query(type=ROUTER_TYPE, + attribute_names=attributes) + self.assertIsNotNone(rc, "unexpected query failure") + self.assertEqual(1, 
len(rc.get_dicts()), "expected one attribute!") + counters = rc.get_dicts()[0].get("connectionCounters") + self.assertIsNotNone(counters, "expected a counter map to be returned") + return counters + + def test_01_check_counter(self): + """ Create and destroy HTTP1 network connections, verify the connection + counter is correct. + """ + mgmt = self.router.management + + # verify the counters start at zero (not including amqp) + counters = self._get_conn_counters() + for proto in ["tcp", "http1", "http2"]: + counter = counters.get(proto) + self.assertIsNotNone(counter, f"Missing expected protocol counter {proto}!") + self.assertEqual(0, counter, "counters must be zero on startup") + + # Bring up a server and client, check the counter + + connector_port = self.get_port() + listener_port = self.get_port() + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("", connector_port)) + listener.setblocking(True) + listener.listen(10) + + mgmt.create(type=HTTP_LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/http1Service', + 'port': listener_port, + 'protocolVersion': 'HTTP1'}) + mgmt.create(type=HTTP_CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http1Service', + 'port': connector_port, + 'protocolVersion': 'HTTP1'}) + + server, _ = listener.accept() + try: + wait_http_listeners_up(self.router.addresses[0], + l_filter={'name': 'ClientListener'}) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + retry_exception(lambda cs=client, lp=listener_port: + cs.connect(("localhost", lp)), + delay=0.25, + exception=ConnectionRefusedError) + + # verify that there are now two HTTP/1 connections: + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http1") == 2), + "Expected 2 active HTTP connections") + finally: + server.close() + + 
mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + mgmt.delete(type=HTTP_LISTENER_TYPE, name="ClientListener") + + # expect http/1 counter to return to zero + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http1") == 0), + "Expected no active HTTP connections") + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py index ad9bf84bf..bc7de3d8c 100644 --- a/tests/system_tests_http2.py +++ b/tests/system_tests_http2.py @@ -23,6 +23,7 @@ import unittest from subprocess import PIPE from time import sleep +from typing import Mapping import system_test from http1_tests import wait_http_listeners_up, HttpAdaptorListenerConnectTestBase, wait_tcp_listeners_up @@ -30,8 +31,8 @@ from system_test import curl_available, nginx_available, TIMEOUT, Http2Server from system_test import get_digest, TCP_LISTENER_TYPE, TCP_CONNECTOR_TYPE from system_test import HTTP_LISTENER_TYPE, HTTP_CONNECTOR_TYPE -from system_test import CONNECTION_TYPE, HTTP_REQ_INFO_TYPE - +from system_test import CONNECTION_TYPE, HTTP_REQ_INFO_TYPE, ROUTER_TYPE +from system_test import retry, retry_exception h2hyper_installed = True try: @@ -1197,3 +1198,100 @@ def test_05_listener_edge_edge(self): Test tcpListener socket lifecycle edge to edge """ self._test_listener_socket_lifecycle(self.EdgeA, self.EdgeB, "test_05_listener_edge_edge") + + +class Http2AdaptorConnCounter(TestCase): + """ + Validate the HTTP/2 service connection counter + """ + @classmethod + def setUpClass(cls): + super(Http2AdaptorConnCounter, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', + 'id': 'HTTP2ConnCounter'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('HTTP2ConnCounter', config) 
+ + def _get_conn_counters(self) -> Mapping[str, int]: + attributes = ["connectionCounters"] + rc = self.router.management.query(type=ROUTER_TYPE, + attribute_names=attributes) + self.assertIsNotNone(rc, "unexpected query failure") + self.assertEqual(1, len(rc.get_dicts()), "expected one attribute!") + counters = rc.get_dicts()[0].get("connectionCounters") + self.assertIsNotNone(counters, "expected a counter map to be returned") + return counters + + def test_01_check_counter(self): + """ Create and destroy HTTP/2 network connections, verify the connection + counter is correct. + """ + mgmt = self.router.management + + # verify the counter starts at zero + + counters = self._get_conn_counters() + for proto in ["tcp", "http1", "http2"]: + counter = counters.get(proto) + self.assertIsNotNone(counter, f"Missing expected protocol counter {proto}!") + self.assertEqual(0, counter, "counters must be zero on startup") + + # Bring up a server and client, check the counter + + connector_port = self.get_port() + listener_port = self.get_port() + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("", connector_port)) + listener.setblocking(True) + listener.listen(10) + + mgmt.create(type=HTTP_LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/http2Service', + 'port': listener_port, + 'protocolVersion': 'HTTP2'}) + mgmt.create(type=HTTP_CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http2Service', + 'port': connector_port, + 'protocolVersion': 'HTTP2'}) + wait_http_listeners_up(self.router.addresses[0], + l_filter={'name': 'ClientListener'}) + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http2") == 1), + "Expected 1 HTTP/2 dispatch connection") + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + retry_exception(lambda 
cs=client, lp=listener_port: + cs.connect(("localhost", lp)), + delay=0.25, + exception=ConnectionRefusedError) + server, _ = listener.accept() + try: + # Note: not sure why only two not three, but it checks out + # against skstat -c: + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http2") == 2), + "Expected active conn count failed!") + finally: + server.close() + + mgmt.delete(type=HTTP_CONNECTOR_TYPE, name="ServerConnector") + mgmt.delete(type=HTTP_LISTENER_TYPE, name="ClientListener") + + # expect counter to return to zero + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("http2") == 0), + "Expected no active HTTP/2 connections") diff --git a/tests/system_tests_skstat.py b/tests/system_tests_skstat.py index 55ffcead3..c01770a65 100644 --- a/tests/system_tests_skstat.py +++ b/tests/system_tests_skstat.py @@ -93,12 +93,13 @@ def test_general_csv(self): out = self.run_skstat(['--general', '--csv'], regex=r'(?s)Router Statistics.*Mode","Standalone') - self.assertIn('"Connections","1"', out) + self.assertIn('"Total Connections","1"', out) self.assertIn('"Worker Threads","1"', out) self.assertIn('"Nodes","0"', out) self.assertIn('"Auto Links","0"', out) self.assertIn('"Router Id","QDR.A"', out) self.assertIn('"Mode","standalone"', out) + self.assertIn('"AMQP Service Connections","1"', out) self.assertEqual(out.count("QDR.A"), 2) def test_connections(self): @@ -693,13 +694,14 @@ def test_links_all_routers_csv(self): def _test_all_entities(self, command): out = self.run_skstat(command) - self.assertTrue(out.count('UTC') == 1) - self.assertTrue(out.count('Router Links') == 1) - self.assertTrue(out.count('Router Addresses') == 1) - self.assertTrue(out.count('Connections') == 6) - self.assertTrue(out.count('AutoLinks') == 2) - self.assertTrue(out.count('Router Statistics') == 1) - self.assertTrue(out.count('Memory Pools') == 1) + self.assertEqual(1, out.count('UTC')) + self.assertEqual(1, out.count('Router Links')) + self.assertEqual(1, 
out.count('Router Addresses')) + self.assertEqual(1, out.count('Total Connections')) + self.assertEqual(2, out.count('AutoLinks')) + self.assertEqual(1, out.count('Router Statistics')) + self.assertEqual(1, out.count('Memory Pools')) + self.assertEqual(1, out.count('AMQP Service Connections')) def test_all_entities(self): self._test_all_entities(['--all-entities']) @@ -710,13 +712,14 @@ def test_all_entities_csv(self): def _test_all_entities_all_routers(self, command): out = self.run_skstat(command) - self.assertTrue(out.count('UTC') == 1) - self.assertTrue(out.count('Router Links') == 2) - self.assertTrue(out.count('Router Addresses') == 2) - self.assertTrue(out.count('Connections') == 12) - self.assertTrue(out.count('AutoLinks') == 4) - self.assertTrue(out.count('Router Statistics') == 2) - self.assertTrue(out.count('Memory Pools') == 2) + self.assertEqual(1, out.count('UTC')) + self.assertEqual(2, out.count('Router Links')) + self.assertEqual(2, out.count('Router Addresses')) + self.assertEqual(2, out.count('Total Connections')) + self.assertEqual(4, out.count('AutoLinks')) + self.assertEqual(2, out.count('Router Statistics')) + self.assertEqual(2, out.count('Memory Pools')) + self.assertEqual(2, out.count('AMQP Service Connections')) def test_all_entities_all_routers(self): self._test_all_entities_all_routers(['--all-entities', '--all-routers']) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 7e416baf5..dbfd4e23d 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -26,7 +26,7 @@ import traceback from subprocess import PIPE from subprocess import STDOUT -from typing import List, Optional +from typing import List, Optional, Mapping from proton import Message from proton.handlers import MessagingHandler @@ -45,8 +45,9 @@ from system_test import unittest from system_test import retry from system_test import CONNECTION_TYPE, TCP_CONNECTOR_TYPE, TCP_LISTENER_TYPE -from system_test import 
ROUTER_LINK_TYPE +from system_test import ROUTER_LINK_TYPE, ROUTER_TYPE from system_test import retry_assertion +from system_test import retry_exception from system_tests_ssl import RouterTestSslBase from http1_tests import wait_tcp_listeners_up @@ -87,7 +88,6 @@ class TCP_echo_server: CLIENT_PRIVATE_KEY_PASSWORD = 'client-password' CA_CERT = RouterTestSslBase.ssl_file('ca-certificate.pem') - # This code takes a wild guess how long an echo server must stall receiving # input data before it fills the adaptor's flow control window in the host # router on all the various CI systems out there. @@ -2283,5 +2283,119 @@ def run(self): Container(self).run() +class TcpAdaptorConnCounter(TestCase): + """ + Validate the TCP service connection counter + """ + @classmethod + def setUpClass(cls): + super(TcpAdaptorConnCounter, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', + 'id': 'TCPConnCounter'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(config) + cls.router = cls.tester.qdrouterd('TCPConnCounter', config) + + def _get_conn_counters(self) -> Mapping[str, int]: + attributes = ["connectionCounters"] + rc = self.router.management.query(type=ROUTER_TYPE, + attribute_names=attributes) + self.assertIsNotNone(rc, "unexpected query failure") + self.assertEqual(1, len(rc.get_dicts()), "expected one attribute!") + counters = rc.get_dicts()[0].get("connectionCounters") + self.assertIsNotNone(counters, "expected a counter map to be returned") + return counters + + def _run_test(self, encaps, idle_ct, active_ct): + # idle_ct: expected connection count when tcp configured, but prior to + # connections active + # active_ct: expected connection count when 1 client and 1 server + # connected + mgmt = self.router.management + + # verify the counters start at zero (not including amqp) 
+ + counters = self._get_conn_counters() + for proto in ["tcp", "http1", "http2"]: + counter = counters.get(proto) + self.assertIsNotNone(counter, f"Missing expected protocol counter {proto}!") + self.assertEqual(0, counter, "counters must be zero on startup") + + # Bring up a server and client, check the counter + + connector_port = self.get_port() + listener_port = self.get_port() + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("", connector_port)) + listener.setblocking(True) + listener.listen(10) + + mgmt.create(type=TCP_LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/tcpService', + 'port': listener_port, + 'encapsulation': encaps}) + mgmt.create(type=TCP_CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/tcpService', + 'host': "localhost", + 'port': connector_port, + 'encapsulation': encaps}) + + wait_tcp_listeners_up(self.router.addresses[0], + l_filter={'name': 'ClientListener'}) + + # expect that simply configuring the tcp listener/connector will + # instantiate the "dispatcher" connection: + + errmsg = "Expected idle count failed!" + errmsg += "\nIf you fixed the phantom tcp-lite connection counter" + errmsg += " please update this test with the new counter values!" 
+ self.assertTrue(retry(lambda: + self._get_conn_counters().get("tcp") == idle_ct), + errmsg) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + retry_exception(lambda cs=client, lp=listener_port: + cs.connect(("localhost", lp)), + delay=0.25, + exception=ConnectionRefusedError) + server, _ = listener.accept() + try: + self.assertTrue(retry(lambda: + self._get_conn_counters().get("tcp") == + active_ct), + f"Expected {active_ct} got {self._get_conn_counters()}!") + finally: + server.shutdown(socket.SHUT_RDWR) + server.close() + client.shutdown(socket.SHUT_RDWR) # context manager calls close + + mgmt.delete(type=TCP_CONNECTOR_TYPE, name="ServerConnector") + mgmt.delete(type=TCP_LISTENER_TYPE, name="ClientListener") + + # expect tcp counter to return to zero once config is cleaned up + + self.assertTrue(retry(lambda: + self._get_conn_counters().get("tcp") == 0), + f"Expected 0 tcp conns, got {self._get_conn_counters()}") + + def test_01_check_counter(self): + """ Create and destroy TCP network connections, verify the connection + counter is correct. + """ + for encaps, idle_ct, active_ct in [('legacy', 1, 3), ('lite', 2, 4)]: + self._run_test(encaps, idle_ct, active_ct) + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tools/skstat b/tools/skstat index 45eb5e115..1e02d9ff5 100755 --- a/tools/skstat +++ b/tools/skstat @@ -340,7 +340,7 @@ class BusManager: rows.append(('Links', PlainNum(router.linkCount))) rows.append(('Nodes', PlainNum(router.nodeCount))) rows.append(('Addresses', PlainNum(router.addrCount))) - rows.append(('Connections', PlainNum(router.connectionCount))) + rows.append(('Total Connections', PlainNum(router.connectionCount))) # Overall delivery related counts. # These overall statistics were introduced in 1.1 version. 
@@ -370,6 +370,13 @@ class BusManager:
         except:
             pass
 
+        # per-protocol user connection counts (best effort: skipped if connectionCounters cannot be read)
+        try:
+            for proto, count in router.connectionCounters.items():
+                rows.append((f"{proto.upper()} Service Connections", PlainNum(count)))
+        except Exception:
+            pass
+
         title = "Router Statistics"
         dispRows = rows
         disp.formattedTable(title, heads, dispRows)