Skip to content

Commit

Permalink
Fixes #1207: add counters for total active service connections (#1208)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti authored Oct 6, 2023
1 parent 3572e83 commit 11562ee
Show file tree
Hide file tree
Showing 24 changed files with 610 additions and 30 deletions.
43 changes: 43 additions & 0 deletions include/qpid/dispatch/connection_counters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef __connection_counters_h__
#define __connection_counters_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**@file
* System-wide counters for tracking open connections per protocol type.
*/

#include "qpid/dispatch/protocols.h"

#include <stdint.h>

// Increment the connection counter for the 'proto' protocol
//
void qd_connection_counter_inc(qd_protocol_t proto);

// Decrement the connection counter for the 'proto' protocol
//
void qd_connection_counter_dec(qd_protocol_t proto);

// Fetch the current value of the connection counter for 'proto'
//
uint64_t qd_connection_count(qd_protocol_t proto);


#endif
37 changes: 37 additions & 0 deletions include/qpid/dispatch/protocols.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef __protocols_h__
#define __protocols_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "qpid/dispatch/enum.h"

// Network protocols supported by the router
//
typedef enum {
QD_PROTOCOL_TCP,
QD_PROTOCOL_AMQP,
QD_PROTOCOL_HTTP1,
QD_PROTOCOL_HTTP2,
QD_PROTOCOL_TOTAL // must be last
} qd_protocol_t;

// Defines qd_protocol_name(qd_protocol_t)
//
ENUM_DECLARE(qd_protocol);
#endif
4 changes: 4 additions & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@
"graph": true,
"description": "The current amount of system memory in use by the router process in bytes (RSS: Resident Set Size). This is the portion of the process memory space that currently resides in RAM. This value is set to Null if the platform does not provide access to the process resident memory size."
},
"connectionCounters": {
"type": "map",
"description": "A map keyed by network protocol name with a value of the count of active service connections using that protocol at this router node. Currently defined key values are: amqp, http1, http2, and tcp"
},
"dataConnectionCount": {
"description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers",
"type": "string",
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ set(qpid_dispatch_SOURCES
trace_mask.c
python_utils.c
qd_asan_interface.c
protocols.c
connection_counters.c
)

set(qpid_dispatch_INCLUDES
Expand Down
3 changes: 3 additions & 0 deletions src/adaptors/http1/http1_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "http1_private.h"

#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/connection_counters.h"

#include <proton/listener.h>
#include <proton/netaddr.h>
Expand Down Expand Up @@ -375,6 +376,7 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
0, // bind context
0); // bind token
qdr_connection_set_context(hconn->qdr_conn, hconn);
qd_connection_counter_inc(QD_PROTOCOL_HTTP1);
hconn->oper_status = QD_CONN_OPER_UP;

qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP connection to client created",
Expand Down Expand Up @@ -744,6 +746,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
if (hconn->qdr_conn) {
qdr_connection_set_context(hconn->qdr_conn, 0);
qdr_connection_closed(hconn->qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_HTTP1);
hconn->qdr_conn = 0;
}

Expand Down
3 changes: 3 additions & 0 deletions src/adaptors/http1/http1_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "adaptors/adaptor_tls.h"
#include "http1_private.h"
#include "qpid/dispatch/connection_counters.h"

#include <proton/proactor.h>

Expand Down Expand Up @@ -224,6 +225,7 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *co
info,
0, // bind context
0); // bind token
qd_connection_counter_inc(QD_PROTOCOL_HTTP1);

// wait for the raw connection to come up before creating the in and out links

Expand Down Expand Up @@ -2020,6 +2022,7 @@ void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
hconn->oper_status = QD_CONN_OPER_DOWN;
_teardown_server_links(hconn);
qdr_connection_closed(qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_HTTP1);
qdr_http1_close_connection(hconn, 0);

// it is expected that this callback is the final callback before returning
Expand Down
5 changes: 5 additions & 0 deletions src/adaptors/http2/http2_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "qpid/dispatch/connection_manager.h"
#include "qpid/dispatch/dispatch.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/connection_counters.h"

#include <proton/condition.h>
#include <proton/listener.h>
Expand Down Expand Up @@ -2795,6 +2796,7 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_
ingress_http_conn->qdr_conn = conn;
qdr_connection_set_context(conn, ingress_http_conn);
ingress_http_conn->connection_established = true;
qd_connection_counter_inc(QD_PROTOCOL_HTTP2);
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] qdr_http_connection_ingress_accept, qdr_connection_t object created ",
ingress_http_conn->conn_id);
Expand Down Expand Up @@ -2880,6 +2882,7 @@ static void close_connections(qdr_http2_connection_t* conn)
qdr_connection_set_context(conn->qdr_conn, 0);
if (conn->qdr_conn) {
qdr_connection_closed(conn->qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_HTTP2);
conn->qdr_conn = 0;
}
qdr_action_t *action = qdr_action(qdr_del_http2_connection_CT, "delete_http2_connection");
Expand Down Expand Up @@ -2970,6 +2973,7 @@ static void egress_conn_timer_handler(void *context)
//
sys_mutex_unlock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
qdr_connection_closed(conn->qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_HTTP2);
free_qdr_http2_connection(conn, false);
return;
}
Expand Down Expand Up @@ -3123,6 +3127,7 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto
connector->ctx = conn;

qdr_connection_set_context(conn, egress_http_conn);
qd_connection_counter_inc(QD_PROTOCOL_HTTP2);
create_dummy_link_on_egress_conn(egress_http_conn);
return egress_http_conn;
}
Expand Down
3 changes: 3 additions & 0 deletions src/adaptors/reference_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "qpid/dispatch/message.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/timer.h"
#include "qpid/dispatch/connection_counters.h"

#include <inttypes.h>
#include <stdio.h>
Expand Down Expand Up @@ -394,6 +395,7 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t

static void qdr_ref_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}


Expand Down Expand Up @@ -438,6 +440,7 @@ static void on_startup(void *context)
info, // connection_info
0, // context_binder
0); // bind_token
qd_connection_counter_inc(QD_PROTOCOL_TCP);

uint64_t link_id;

Expand Down
5 changes: 5 additions & 0 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "qpid/dispatch/alloc_pool.h"
#include "qpid/dispatch/amqp.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/connection_counters.h"

#include <proton/codec.h>
#include <proton/condition.h>
Expand Down Expand Up @@ -250,6 +251,7 @@ static void on_activate(void *context)
detach_links(conn);
qdr_connection_set_context(conn->qdr_conn, 0);
qdr_connection_closed(conn->qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
conn->qdr_conn = 0;
free_qdr_tcp_connection(conn);
}
Expand Down Expand Up @@ -667,6 +669,7 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
if (conn->qdr_conn) {
qdr_connection_set_context(conn->qdr_conn, 0);
qdr_connection_closed(conn->qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
conn->qdr_conn = 0;
}

Expand Down Expand Up @@ -928,6 +931,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
0); // bind_token
tc->qdr_conn = conn;
qdr_connection_set_context(conn, tc);
qd_connection_counter_inc(QD_PROTOCOL_TCP);

qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
Expand Down Expand Up @@ -1357,6 +1361,7 @@ static void qdr_tcp_create_server_side_connection(qdr_tcp_connection_t* tc)
0); // bind_token
tc->qdr_conn = conn;
qdr_connection_set_context(conn, tc);
qd_connection_counter_inc(QD_PROTOCOL_TCP);

qdr_terminus_t *source = qdr_terminus(0);
qdr_terminus_set_address(source, tc->config->adaptor_config->address);
Expand Down
6 changes: 5 additions & 1 deletion src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/cutthrough_utils.h>
#include <qpid/dispatch/platform.h>
#include <qpid/dispatch/connection_counters.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>
#include <proton/listener.h>
Expand Down Expand Up @@ -194,7 +195,7 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming
info, // connection_info
0, // context_binder
0); // bind_token

qd_connection_counter_inc(QD_PROTOCOL_TCP);
return conn;
}

Expand Down Expand Up @@ -457,6 +458,7 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay)

if (!!conn->common.core_conn) {
qdr_connection_closed(conn->common.core_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}

if (!!conn->common.vflow) {
Expand Down Expand Up @@ -1634,6 +1636,7 @@ void qd_dispatch_delete_tcp_listener_lite(qd_dispatch_t *qd, tcplite_listener_t
if (!tcplite_context->adaptor_finalizing) {
if (!!li->common.core_conn) {
qdr_connection_closed(li->common.core_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}

if (!!li->adaptor_listener) {
Expand Down Expand Up @@ -1730,6 +1733,7 @@ void qd_dispatch_delete_tcp_connector_lite(qd_dispatch_t *qd, tcplite_connector_

if (!tcplite_context->adaptor_finalizing) {
qdr_connection_closed(cr->common.core_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
} else {
tcplite_connection_t *conn = DEQ_HEAD(cr->connections);
if (!!conn) {
Expand Down
45 changes: 45 additions & 0 deletions src/connection_counters.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "qpid/dispatch/connection_counters.h"
#include "qpid/dispatch/atomic.h"

atomic_uint_fast64_t qd_connection_counters[QD_PROTOCOL_TOTAL];

void qd_connection_counter_inc(qd_protocol_t proto)
{
assert(proto < QD_PROTOCOL_TOTAL);
atomic_fetch_add_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed);
}

void qd_connection_counter_dec(qd_protocol_t proto)
{
assert(proto < QD_PROTOCOL_TOTAL);
uint64_t old = atomic_fetch_sub_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed);
(void) old;
assert(old != 0); // underflow!
}

uint64_t qd_connection_count(qd_protocol_t proto)
{
assert(proto < QD_PROTOCOL_TOTAL);
return (uint64_t) atomic_load_explicit(&qd_connection_counters[proto], memory_order_relaxed);
}


Loading

0 comments on commit 11562ee

Please sign in to comment.