Skip to content

Commit

Permalink
Fixes skupperproject#1207: add counters for total active service conn…
Browse files Browse the repository at this point in the history
…ections
  • Loading branch information
kgiusti committed Sep 25, 2023
1 parent 16f321d commit 711084f
Show file tree
Hide file tree
Showing 26 changed files with 611 additions and 113 deletions.
51 changes: 51 additions & 0 deletions include/qpid/dispatch/connection_counters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#ifndef __connection_counters_h__
#define __connection_counters_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**@file
* System-wide counters for tracking open connections per protocol type.
*/

#include "qpid/dispatch/protocols.h"
#include "qpid/dispatch/atomic.h"

extern atomic_uint_fast64_t qd_connection_counters[QD_PROTOCOL_TOTAL];

static inline void qd_connection_counter_inc(qd_protocol_t proto)
{
assert(proto < QD_PROTOCOL_TOTAL);
atomic_fetch_add_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed);
}

static inline void qd_connection_counter_dec(qd_protocol_t proto)
{
assert(proto < QD_PROTOCOL_TOTAL);
uint64_t old = atomic_fetch_sub_explicit(&qd_connection_counters[proto], 1, memory_order_relaxed);
(void) old;
assert(old != 0); // underflow!
}

static inline uint64_t qd_connection_count(qd_protocol_t proto)
{
assert(proto < QD_PROTOCOL_TOTAL);
return (uint64_t) atomic_load_explicit(&qd_connection_counters[proto], memory_order_relaxed);
}

#endif
4 changes: 3 additions & 1 deletion include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "qpid/dispatch/policy_spec.h"
#include "qpid/dispatch/router_core.h"
#include "qpid/dispatch/io_module.h"
#include "qpid/dispatch/protocols.h"

typedef struct qdr_protocol_adaptor_t qdr_protocol_adaptor_t;
typedef struct qdr_connection_t qdr_connection_t;
Expand Down Expand Up @@ -872,7 +873,8 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
bool ssl,
const char *version,
bool streaming_links,
bool connection_trunking);
bool connection_trunking,
qd_protocol_t protocol);

void qdr_connection_info_set_group_correlator(qdr_connection_info_t *info, const char *correlator);

Expand Down
37 changes: 37 additions & 0 deletions include/qpid/dispatch/protocols.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef __protocols_h__
#define __protocols_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "qpid/dispatch/enum.h"

// Network protocols supported by the router
//
typedef enum {
QD_PROTOCOL_TCP,
QD_PROTOCOL_AMQP,
QD_PROTOCOL_HTTP1,
QD_PROTOCOL_HTTP2,
QD_PROTOCOL_TOTAL // must be last
} qd_protocol_t;

// Defines qd_protocol_name(qd_protocol_t)
//
ENUM_DECLARE(qd_protocol);
#endif
4 changes: 4 additions & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@
"graph": true,
"description": "The current amount of memory in use by the router process in bytes. This includes memory provisioned for stack, data, and code (VmSize). This value is set to Null if the platform does not provide access to the process memory size."
},
"connectionCounters": {
"type": "map",
"description": "A map keyed by network protocol name with a value of the count of active connections using that protocol at this router node."
},
"dataConnectionCount": {
"description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers",
"type": "string",
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ set(qpid_dispatch_SOURCES
trace_mask.c
python_utils.c
qd_asan_interface.c
protocols.c
connection_counters.c
)

set(qpid_dispatch_INCLUDES
Expand Down
3 changes: 2 additions & 1 deletion src/adaptors/http1/http1_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
false, //bool ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_HTTP1);

hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
qdr_http1_adaptor->adaptor,
Expand Down
3 changes: 2 additions & 1 deletion src/adaptors/http1/http1_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *co
false, //bool ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_HTTP1);

hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
qdr_http1_adaptor->adaptor,
Expand Down
6 changes: 4 additions & 2 deletions src/adaptors/http2/http2_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2774,7 +2774,8 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_
false, //bool ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_HTTP2);

qdr_connection_t *conn = qdr_connection_opened(http2_adaptor->core,
http2_adaptor->adaptor,
Expand Down Expand Up @@ -3102,7 +3103,8 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto
false, //bool ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_HTTP2);

qdr_connection_t *conn = qdr_connection_opened(http2_adaptor->core,
http2_adaptor->adaptor,
Expand Down
3 changes: 2 additions & 1 deletion src/adaptors/reference_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ static void on_startup(void *context)
false, //bool ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_TCP);

adaptor->conn = qdr_connection_opened(adaptor->core, // core
adaptor->adaptor, // protocol_adaptor
Expand Down
6 changes: 4 additions & 2 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,8 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
false, // ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_TCP);
pn_data_free(tcp_conn_properties);

qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
Expand Down Expand Up @@ -1337,7 +1338,8 @@ static void qdr_tcp_create_server_side_connection(qdr_tcp_connection_t* tc)
false, // bool ssl,
"", // peer router version,
false, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_TCP);
pn_data_free(tcp_conn_properties);

qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
Expand Down
3 changes: 2 additions & 1 deletion src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming
false, // ssl,
"", // peer router version,
true, // streaming links
false); // connection trunking
false, // connection trunking
QD_PROTOCOL_TCP);
pn_data_free(properties);

conn = qdr_connection_opened(tcplite_context->core,
Expand Down
23 changes: 23 additions & 0 deletions src/connection_counters.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "qpid/dispatch/connection_counters.h"

atomic_uint_fast64_t qd_connection_counters[QD_PROTOCOL_TOTAL];

32 changes: 32 additions & 0 deletions src/protocols.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "qpid/dispatch/protocols.h"

#include <stddef.h>

// names must match the order of the enum entries in qd_protocol_t
//
static const char *_names[QD_PROTOCOL_TOTAL] = {
"tcp", "amqp", "http/1", "http/2"
};

// defines function qd_protocol_name(qd_protocol_t)
//
ENUM_DEFINE(qd_protocol, _names);
15 changes: 14 additions & 1 deletion src/router_core/agent_router.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
*/

#include "agent_router.h"

#include "config.h"
#include "qpid/dispatch/protocols.h"
#include "qpid/dispatch/connection_counters.h"

#include <inttypes.h>

Expand Down Expand Up @@ -56,6 +57,7 @@
#define QDR_ROUTER_UPTIME_SECONDS 29
#define QDR_ROUTER_MEMORY_USAGE 30
#define QDR_ROUTER_WORKER_THREADS 31
#define QDR_ROUTER_CONNECTION_COUNTERS 32

const char *qdr_router_columns[] =
{"name",
Expand Down Expand Up @@ -90,6 +92,7 @@ const char *qdr_router_columns[] =
"uptimeSeconds",
"memoryUsage",
"workerThreads",
"connectionCounters",
0};


Expand Down Expand Up @@ -247,6 +250,16 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co
qd_compose_insert_null(body);
} break;

case QDR_ROUTER_CONNECTION_COUNTERS: {
qd_compose_start_map(body);
for (qd_protocol_t proto = 0; proto < QD_PROTOCOL_TOTAL; ++proto) {
qd_compose_insert_string(body, qd_protocol_name(proto));
qd_compose_insert_ulong(body, qd_connection_count(proto));
}
qd_compose_end_map(body);
break;
}

default:
qd_compose_insert_null(body);
break;
Expand Down
2 changes: 1 addition & 1 deletion src/router_core/agent_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include "router_core_private.h"

#define QDR_ROUTER_COLUMN_COUNT 32
#define QDR_ROUTER_COLUMN_COUNT 33

extern const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1];

Expand Down
13 changes: 12 additions & 1 deletion src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "qpid/dispatch/discriminator.h"
#include "qpid/dispatch/router_core.h"
#include "qpid/dispatch/static_assert.h"
#include "qpid/dispatch/connection_counters.h"

#include <inttypes.h>
#include <stdio.h>
Expand Down Expand Up @@ -123,6 +124,10 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
context_binder(conn, bind_token);
}

// count "user" connections (not infrastructure)
if (role == QDR_ROLE_NORMAL)
qd_connection_counter_inc(connection_info->protocol);

set_safe_ptr_qdr_connection_t(conn, &action->args.connection.conn);
action->args.connection.connection_label = qdr_field(label);
action->args.connection.container_id = qdr_field(remote_container_id);
Expand Down Expand Up @@ -151,6 +156,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,

void qdr_connection_closed(qdr_connection_t *conn)
{
if (conn->role == QDR_ROLE_NORMAL)
qd_connection_counter_dec(conn->connection_info->protocol);

qdr_action_t *action = qdr_action(qdr_connection_closed_CT, "connection_closed");
set_safe_ptr_qdr_connection_t(conn, &action->args.connection.conn);
qdr_action_enqueue(conn->core, action);
Expand Down Expand Up @@ -184,13 +192,15 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
bool ssl,
const char *version,
bool streaming_links,
bool connection_trunking)
bool connection_trunking,
qd_protocol_t protocol)
{
qdr_connection_info_t *connection_info = new_qdr_connection_info_t();
ZERO(connection_info);
connection_info->is_encrypted = is_encrypted;
connection_info->is_authenticated = is_authenticated;
connection_info->opened = opened;
connection_info->protocol = protocol;

if (container)
connection_info->container = strdup(container);
Expand Down Expand Up @@ -1824,6 +1834,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "[C%" PRIu64 "] Connection Closed", conn->identity);

DEQ_REMOVE(core->open_connections, conn);

qdr_connection_free(conn);
}

Expand Down
1 change: 1 addition & 0 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ struct qdr_connection_info_t {
bool connection_trunking; // peer supports connection trunking
qd_direction_t dir;
qdr_connection_role_t role;
qd_protocol_t protocol;
pn_data_t *connection_properties;
bool ssl;
int ssl_ssf; //ssl strength factor
Expand Down
4 changes: 3 additions & 1 deletion src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <qpid/dispatch/protocol_adaptor.h>
#include <qpid/dispatch/proton_utils.h>
#include <qpid/dispatch/cutthrough_utils.h>
#include <qpid/dispatch/protocols.h>

#include <proton/sasl.h>

Expand Down Expand Up @@ -1535,7 +1536,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
is_ssl,
rversion,
streaming_links,
connection_trunking);
connection_trunking,
QD_PROTOCOL_AMQP);

qdr_connection_info_set_group_correlator(connection_info, conn->group_correlator);

Expand Down
Loading

0 comments on commit 711084f

Please sign in to comment.