Skip to content

Commit

Permalink
Fixes skupperproject#1456 - Implemented the administrative ACCESS_POI…
Browse files Browse the repository at this point in the history
…NT and LINK

This also separates the closure-of-all-connections from the freeing of the server module so that connections can be closed at an earlier point of the shutdown sequence.

This also adds accepted-connections to the listener reference counts for a more orderly freeing of resources.
  • Loading branch information
ted-ross authored and kgiusti committed Jun 20, 2024
1 parent d823211 commit 0112c8d
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 133 deletions.
11 changes: 8 additions & 3 deletions include/qpid/dispatch/vanflow.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef enum vflow_record_type {
VFLOW_RECORD_PROCESS_GROUP = 0x0c, // A grouping of PROCESS
VFLOW_RECORD_HOST = 0x0d, // Host (or Kubernetes Node) on which a process runs
VFLOW_RECORD_LOG = 0x0e, // A notable router log event such as an error or warning
VFLOW_RECORD_ACCESS_POINT = 0x0f, // An access point for inter-router connections
} vflow_record_type_t;

// clang-format off
Expand Down Expand Up @@ -121,13 +122,17 @@ typedef enum vflow_attribute {
VFLOW_ATTRIBUTE_LOG_TEXT = 49, // String
VFLOW_ATTRIBUTE_SOURCE_FILE = 50, // String
VFLOW_ATTRIBUTE_SOURCE_LINE = 51, // uint

VFLOW_ATTRIBUTE_LINK_COUNT = 52, // uint/counter
VFLOW_ATTRIBUTE_OPER_STATUS = 53, // String
VFLOW_ATTRIBUTE_ROLE = 54, // String
} vflow_attribute_t;
// clang-format on

#define VALID_REF_ATTRS 0x00006000000000e6
#define VALID_UINT_ATTRS 0x00099ffa07800119
#define VALID_COUNTER_ATTRS 0x0000035000800000
#define VALID_STRING_ATTRS 0x00060005787ffe00
#define VALID_UINT_ATTRS 0x00199ffa07800119
#define VALID_COUNTER_ATTRS 0x0010035000800000
#define VALID_STRING_ATTRS 0x00660005787ffe00
#define VALID_TRACE_ATTRS 0x0000000080000000

typedef enum vflow_log_severity {
Expand Down
41 changes: 23 additions & 18 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "private.h"
#include "qd_connector.h"
#include "qd_connection.h"
#include "qd_listener.h"
#include "container.h"
#include "node_type.h"

Expand Down Expand Up @@ -2317,22 +2318,22 @@ static void qd_amqp_adaptor_init(qdr_core_t *core, void **adaptor_context)
amqp_adaptor.router = qd->router;
amqp_adaptor.container = qd_container(qd->router, &router_node);
amqp_adaptor.adaptor = qdr_protocol_adaptor(core,
"amqp",
(void*) amqp_adaptor.router,
CORE_connection_activate,
CORE_link_first_attach,
CORE_link_second_attach,
CORE_link_detach,
CORE_link_flow,
CORE_link_offer,
CORE_link_drained,
CORE_link_drain,
CORE_link_push,
CORE_link_deliver,
CORE_link_get_credit,
CORE_delivery_update,
CORE_close_connection,
CORE_conn_trace);
"amqp",
(void*) amqp_adaptor.router,
CORE_connection_activate,
CORE_link_first_attach,
CORE_link_second_attach,
CORE_link_detach,
CORE_link_flow,
CORE_link_offer,
CORE_link_drained,
CORE_link_drain,
CORE_link_push,
CORE_link_deliver,
CORE_link_get_credit,
CORE_delivery_update,
CORE_close_connection,
CORE_conn_trace);

*adaptor_context = (void *) &amqp_adaptor;
}
Expand Down Expand Up @@ -2366,8 +2367,12 @@ static void qd_amqp_adaptor_final(void *adaptor_context)
if (ctx->policy_settings)
qd_policy_settings_free(ctx->policy_settings);
if (ctx->connector) {
ctx->connector->qd_conn = 0;
qd_connector_decref(ctx->connector);
qd_connector_release_connection(ctx->connector);
ctx->connector = 0;
}
if (ctx->listener) {
qd_listener_release_connection(ctx->listener, ctx);
ctx->listener = 0;
}
sys_atomic_destroy(&ctx->wake_core);
sys_atomic_destroy(&ctx->wake_cutthrough_inbound);
Expand Down
24 changes: 22 additions & 2 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static void log_config(qd_server_config_t *c, const char *what, bool create)
QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_connection_manager_t *cm = qd->connection_manager;
qd_listener_t *li = qd_server_listener(qd->server);
qd_listener_t *li = qd_listener(qd->server);
if (!li || qd_server_config_load(qd, &li->config, entity, true, 0) != QD_ERROR_NONE) {
qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message());
qd_listener_decref(li);
Expand All @@ -166,6 +166,17 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en
} else {
li->config.failover_list = 0;
}

//
// Set up the vanflow record for this listener (ACCESS_POINT).
// Do this only for inter-router links: not metrics/healthz/websockets listeners
//
if (!li->config.http) {
li->vflow_record = vflow_start_record(VFLOW_RECORD_ACCESS_POINT, 0);
vflow_set_string(li->vflow_record, VFLOW_ATTRIBUTE_ROLE, li->config.role);
vflow_set_uint64(li->vflow_record, VFLOW_ATTRIBUTE_LINK_COUNT, 0);
}

DEQ_ITEM_INIT(li);
DEQ_INSERT_TAIL(cm->listeners, li);
log_config(&li->config, "Listener", true);
Expand Down Expand Up @@ -414,8 +425,17 @@ QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_
item->host_port = malloc(hplen);
snprintf(item->host_port, hplen, "%s:%s", item->host , item->port);

DEQ_INSERT_TAIL(ct->conn_info_list, item);
//
// Set up the vanflow record for this connector (LINK)
//
ct->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0);
vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_ROLE, ct->config.role);
vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme);
vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host);
vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port);

DEQ_INSERT_TAIL(ct->conn_info_list, item);
return ct;
}

Expand Down
44 changes: 20 additions & 24 deletions src/adaptors/amqp/qd_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "qpid/dispatch/proton_utils.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/timer.h"
#include "qpid/dispatch/vanflow.h"

#include <proton/proactor.h>
#include <proton/sasl.h>
Expand Down Expand Up @@ -445,10 +446,8 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t *
* Does not allocate any managed objects and therefore
* does not take ENTITY_CACHE lock.
*/
qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config_t *config, qd_connection_t *ctx, qd_connector_t *connector)
void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener)
{
assert(ctx);
ZERO(ctx);
ctx->pn_conn = pn_connection();
assert(ctx->pn_conn);
sys_mutex_init(&ctx->deferred_call_lock);
Expand All @@ -466,29 +465,23 @@ qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config
DEQ_INIT(ctx->deferred_calls);
DEQ_INIT(ctx->free_link_session_list);

// note: setup connector or listener before decorating the connection since
// decoration involves accessing the connection's parent.

if (!!connector) {
ctx->connector = connector;
strncpy(ctx->group_correlator, connector->group_correlator, QD_DISCRIMINATOR_SIZE);
assert(!listener);
qd_connector_add_connection(connector, ctx);
} else if (!!listener) {
qd_listener_add_connection(listener, ctx);
}

decorate_connection(ctx, config);

sys_mutex_lock(&amqp_adaptor.lock);
DEQ_INSERT_TAIL(amqp_adaptor.conn_list, ctx);
sys_mutex_unlock(&amqp_adaptor.lock);

return ctx;
}

/* Construct a new qd_connection. Thread safe.
* Allocates a qd_connection_t object and therefore
* takes ENTITY_CACHE lock.
*/
qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config)
{
qd_connection_t *ctx = new_qd_connection_t();
if (!ctx) return NULL;
return qd_server_connection_impl(server, config, ctx, 0);
}

qd_connector_t* qd_connection_connector(const qd_connection_t *c) {
return c->connector;
Expand Down Expand Up @@ -553,18 +546,21 @@ void qd_connection_set_user(qd_connection_t *conn)

static void qd_connection_free(qd_connection_t *qd_conn)
{
// If this is a dispatch connector uncouple it from the
// connection and restart the re-connect timer if necessary
sys_mutex_lock(&amqp_adaptor.lock);
DEQ_REMOVE(amqp_adaptor.conn_list, qd_conn);
sys_mutex_unlock(&amqp_adaptor.lock);

// If this connection is from a connector uncouple it and restart the re-connect timer if necessary

if (qd_conn->connector) {
qd_connector_release_connection(qd_conn->connector, qd_conn);
qd_connector_decref(qd_conn->connector); // drop connection's reference
qd_connector_release_connection(qd_conn->connector);
qd_conn->connector = 0;
}

sys_mutex_lock(&amqp_adaptor.lock);
DEQ_REMOVE(amqp_adaptor.conn_list, qd_conn);
sys_mutex_unlock(&amqp_adaptor.lock);
if (qd_conn->listener) {
qd_listener_release_connection(qd_conn->listener, qd_conn);
qd_conn->listener = 0;
}

// If counted for policy enforcement, notify it has closed
if (qd_conn->policy_counted) {
Expand Down
5 changes: 3 additions & 2 deletions src/adaptors/amqp/qd_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ struct qd_connection_t {

ALLOC_DECLARE_SAFE(qd_connection_t);

qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config);
qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config_t *config, qd_connection_t *ctx, qd_connector_t *connector);
// initialize a newly allocated qd_connection_t
void qd_connection_init(qd_connection_t *qd_conn, qd_server_t *server, qd_server_config_t *config,
qd_connector_t *connector, qd_listener_t *listener);

qd_connector_t* qd_connection_connector(const qd_connection_t *c);

Expand Down
58 changes: 33 additions & 25 deletions src/adaptors/amqp/qd_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "qpid/dispatch/alloc_pool.h"
#include "qpid/dispatch/timer.h"
#include "qpid/dispatch/vanflow.h"

#include <proton/proactor.h>

Expand All @@ -46,22 +47,10 @@ static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_


/* Timer callback to try/retry connection open, connector->lock held */
static void try_open_lh(qd_connector_t *connector, qd_connection_t *connection) TA_REQ(connector->lock)
static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_REQ(connector->lock)
{
// connection until pn_proactor_connect is called below
qd_connection_t *qd_conn = qd_server_connection_impl(connector->server, &connector->config, connection, connector);
if (!qd_conn) { /* Try again later */
qd_log(LOG_SERVER, QD_LOG_CRITICAL, "Allocation failure connecting to %s",
connector->config.host_port);
connector->delay = 10000;
connector->state = CXTR_STATE_CONNECTING;
qd_timer_schedule(connector->timer, connector->delay);
return;
}
qd_connection_init(qd_conn, connector->server, &connector->config, connector, 0);

sys_atomic_inc(&connector->ref_count);

connector->qd_conn = qd_conn;
connector->state = CXTR_STATE_OPEN;
connector->delay = 5000;

Expand All @@ -88,7 +77,8 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *connection)
qd_log(LOG_SERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] Connecting to %s", qd_conn->connection_id, host_port);
/* Note: the transport is configured in the PN_CONNECTION_BOUND event */
pn_proactor_connect(qd_server_proactor(connector->server), qd_conn->pn_conn, host_port);
// at this point the qd_conn may now be scheduled on another thread
// Note well: at this point the qd_conn may now be scheduled on another thread
// Do not access it.
}


Expand All @@ -101,13 +91,7 @@ static void try_open_cb(void *context)
// Allocate connection before taking connector lock to avoid
// CONNECTOR - ENTITY_CACHE lock inversion deadlock window.
qd_connection_t *ctx = new_qd_connection_t();
if (!ctx) {
qd_log(LOG_SERVER, QD_LOG_CRITICAL, "Allocation failure connecting to %s", ct->config.host_port);
ct->delay = 10000;
ct->state = CXTR_STATE_CONNECTING;
qd_timer_schedule(ct->timer, ct->delay);
return;
}
ZERO(ctx);

sys_mutex_lock(&ct->lock);

Expand Down Expand Up @@ -185,6 +169,8 @@ void qd_connector_decref(qd_connector_t* connector)
assert(connector->state == CXTR_STATE_DELETED);
assert(connector->qd_conn == 0);

vflow_end_record(connector->vflow_record);
connector->vflow_record = 0;
qd_server_config_free(&connector->config);
qd_timer_free(connector->timer);
sys_mutex_free(&connector->lock);
Expand Down Expand Up @@ -291,16 +277,35 @@ void qd_connector_remote_opened(qd_connector_t *connector)
sys_mutex_unlock(&connector->lock);
}

/**
* Set the child connection of the connector
*/
void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx)
{
assert(ctx->connector == 0);

sys_atomic_inc(&connector->ref_count);
ctx->connector = connector;
connector->qd_conn = ctx;

strncpy(ctx->group_correlator, connector->group_correlator, QD_DISCRIMINATOR_SIZE);
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "up");
}


/**
* The connection associated with this connector is about to be freed,
* clean up all related state
*/
void qd_connector_release_connection(qd_connector_t *connector, qd_connection_t *qd_conn)
void qd_connector_release_connection(qd_connector_t *connector)
{
sys_mutex_lock(&connector->lock);
assert(connector->qd_conn == qd_conn);
connector->qd_conn = 0; // this connection to be freed

qd_connection_t *ctx = connector->qd_conn;
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
connector->qd_conn = 0;
ctx->connector = 0;

if (connector->state != CXTR_STATE_DELETED) {
// Increment the connection index by so that we can try connecting to the failover url (if any).
bool has_failover = qd_connector_has_failover_info(connector);
Expand All @@ -316,4 +321,7 @@ void qd_connector_release_connection(qd_connector_t *connector, qd_connection_t
qd_timer_schedule(connector->timer, delay);
}
sys_mutex_unlock(&connector->lock);

// drop reference held by connection
qd_connector_decref(connector);
}
11 changes: 10 additions & 1 deletion src/adaptors/amqp/qd_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
typedef struct qd_timer_t qd_timer_t;
typedef struct qd_server_t qd_server_t;
typedef struct qd_connection_t qd_connection_t;
typedef struct vflow_record_t vflow_record_t;

typedef enum {
CXTR_STATE_INIT = 0,
Expand All @@ -58,6 +59,7 @@ typedef struct qd_connector_t {
sys_mutex_t lock;
cxtr_state_t state;
qd_connection_t *qd_conn;
vflow_record_t *vflow_record;

/* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */
qd_failover_item_list_t conn_info_list;
Expand Down Expand Up @@ -94,5 +96,12 @@ bool qd_connector_has_failover_info(const qd_connector_t* ct);
const char *qd_connector_policy_vhost(const qd_connector_t* ct);
void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition);
void qd_connector_remote_opened(qd_connector_t *connector);
void qd_connector_release_connection(qd_connector_t *connector, qd_connection_t *qd_conn);

// associate the new connection with the parent connector
void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx);

// release the child connection
// NOTE WELL: this may free the connector if the connection is holding the last
// reference to it
void qd_connector_release_connection(qd_connector_t *connector);
#endif
Loading

0 comments on commit 0112c8d

Please sign in to comment.