diff --git a/include/qpid/dispatch/vanflow.h b/include/qpid/dispatch/vanflow.h index 3e2937c8c..3bc5199ec 100644 --- a/include/qpid/dispatch/vanflow.h +++ b/include/qpid/dispatch/vanflow.h @@ -51,6 +51,7 @@ typedef enum vflow_record_type { VFLOW_RECORD_PROCESS_GROUP = 0x0c, // A grouping of PROCESS VFLOW_RECORD_HOST = 0x0d, // Host (or Kubernetes Node) on which a process runs VFLOW_RECORD_LOG = 0x0e, // A notable router log event such as an error or warning + VFLOW_RECORD_ACCESS_POINT = 0x0f, // An access point for inter-router connections } vflow_record_type_t; // clang-format off @@ -121,13 +122,17 @@ typedef enum vflow_attribute { VFLOW_ATTRIBUTE_LOG_TEXT = 49, // String VFLOW_ATTRIBUTE_SOURCE_FILE = 50, // String VFLOW_ATTRIBUTE_SOURCE_LINE = 51, // uint + + VFLOW_ATTRIBUTE_LINK_COUNT = 52, // uint/counter + VFLOW_ATTRIBUTE_OPER_STATUS = 53, // String + VFLOW_ATTRIBUTE_ROLE = 54, // String } vflow_attribute_t; // clang-format on #define VALID_REF_ATTRS 0x00006000000000e6 -#define VALID_UINT_ATTRS 0x00099ffa07800119 -#define VALID_COUNTER_ATTRS 0x0000035000800000 -#define VALID_STRING_ATTRS 0x00060005787ffe00 +#define VALID_UINT_ATTRS 0x00199ffa07800119 +#define VALID_COUNTER_ATTRS 0x0010035000800000 +#define VALID_STRING_ATTRS 0x00660005787ffe00 #define VALID_TRACE_ATTRS 0x0000000080000000 typedef enum vflow_log_severity { diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index ff14e14d2..cb02454bc 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -20,6 +20,7 @@ #include "private.h" #include "qd_connector.h" #include "qd_connection.h" +#include "qd_listener.h" #include "container.h" #include "node_type.h" @@ -2366,8 +2367,12 @@ static void qd_amqp_adaptor_final(void *adaptor_context) if (ctx->policy_settings) qd_policy_settings_free(ctx->policy_settings); if (ctx->connector) { - ctx->connector->qd_conn = 0; - qd_connector_decref(ctx->connector); + qd_connector_remove_connection(ctx->connector); + ctx->connector = 0; + } + if (ctx->listener) { + qd_listener_remove_connection(ctx->listener, ctx); + ctx->listener = 0; } sys_atomic_destroy(&ctx->wake_core); sys_atomic_destroy(&ctx->wake_cutthrough_inbound); diff --git a/src/adaptors/amqp/connection_manager.c b/src/adaptors/amqp/connection_manager.c index 0f9744a9f..e7422b72d 100644 --- a/src/adaptors/amqp/connection_manager.c +++ b/src/adaptors/amqp/connection_manager.c @@ -30,6 +30,7 @@ #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/failoverlist.h" #include "qpid/dispatch/threading.h" +#include "qpid/dispatch/vanflow.h" #include @@ -147,7 +148,7 @@ static void log_config(qd_server_config_t *c, const char *what, bool create) QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity) { qd_connection_manager_t *cm = qd->connection_manager; - qd_listener_t *li = qd_server_listener(qd->server); + qd_listener_t *li = qd_listener(qd->server); if (!li || qd_server_config_load(qd, &li->config, entity, true, 0) != QD_ERROR_NONE) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message()); qd_listener_decref(li); @@ -166,6 +167,19 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en } else { li->config.failover_list = 0; } + + // + // Set up the vanflow record for this listener (ACCESS_POINT). + // Do this only for router-to-routers links: not mgmt/metrics/healthz/websockets listeners + // + if (strcmp(li->config.role, "inter-router") == 0 || + strcmp(li->config.role, "edge") == 0 || + strcmp(li->config.role, "inter-edge") == 0) { + li->vflow_record = vflow_start_record(VFLOW_RECORD_ACCESS_POINT, 0); + vflow_set_string(li->vflow_record, VFLOW_ATTRIBUTE_ROLE, li->config.role); + vflow_set_uint64(li->vflow_record, VFLOW_ATTRIBUTE_LINK_COUNT, 0); + } + DEQ_ITEM_INIT(li); DEQ_INSERT_TAIL(cm->listeners, li); log_config(&li->config, "Listener", true); @@ -414,8 +428,17 @@ QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_ item->host_port = malloc(hplen); snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); - DEQ_INSERT_TAIL(ct->conn_info_list, item); + // + // Set up the vanflow record for this connector (LINK) + // + ct->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); + vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_ROLE, ct->config.role); + vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); + vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); + vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host); + vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port); + DEQ_INSERT_TAIL(ct->conn_info_list, item); return ct; } diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index a0a9c9ae0..e8a02521b 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -27,6 +27,7 @@ #include "qpid/dispatch/proton_utils.h" #include "qpid/dispatch/protocol_adaptor.h" #include "qpid/dispatch/timer.h" +#include "qpid/dispatch/vanflow.h" #include #include @@ -445,10 +446,8 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * * Does not allocate any managed objects and therefore * does not take ENTITY_CACHE lock. */ -qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config_t *config, qd_connection_t *ctx, qd_connector_t *connector) +void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener) { - assert(ctx); - ZERO(ctx); ctx->pn_conn = pn_connection(); assert(ctx->pn_conn); sys_mutex_init(&ctx->deferred_call_lock); @@ -466,29 +465,23 @@ qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config DEQ_INIT(ctx->deferred_calls); DEQ_INIT(ctx->free_link_session_list); + // note: setup connector or listener before decorating the connection since + // decoration involves accessing the connection's parent. + if (!!connector) { - ctx->connector = connector; - strncpy(ctx->group_correlator, connector->group_correlator, QD_DISCRIMINATOR_SIZE); + assert(!listener); + qd_connector_add_connection(connector, ctx); + } else if (!!listener) { + qd_listener_add_connection(listener, ctx); } + decorate_connection(ctx, config); sys_mutex_lock(&amqp_adaptor.lock); DEQ_INSERT_TAIL(amqp_adaptor.conn_list, ctx); sys_mutex_unlock(&amqp_adaptor.lock); - - return ctx; } -/* Construct a new qd_connection. Thread safe. - * Allocates a qd_connection_t object and therefore - * takes ENTITY_CACHE lock. - */ -qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config) -{ - qd_connection_t *ctx = new_qd_connection_t(); - if (!ctx) return NULL; - return qd_server_connection_impl(server, config, ctx, 0); -} qd_connector_t* qd_connection_connector(const qd_connection_t *c) { return c->connector; @@ -553,18 +546,21 @@ void qd_connection_set_user(qd_connection_t *conn) static void qd_connection_free(qd_connection_t *qd_conn) { - // If this is a dispatch connector uncouple it from the - // connection and restart the re-connect timer if necessary + sys_mutex_lock(&amqp_adaptor.lock); + DEQ_REMOVE(amqp_adaptor.conn_list, qd_conn); + sys_mutex_unlock(&amqp_adaptor.lock); + + // If this connection is from a connector uncouple it and restart the re-connect timer if necessary if (qd_conn->connector) { - qd_connector_release_connection(qd_conn->connector, qd_conn); - qd_connector_decref(qd_conn->connector); // drop connection's reference + qd_connector_remove_connection(qd_conn->connector); qd_conn->connector = 0; } - sys_mutex_lock(&amqp_adaptor.lock); - DEQ_REMOVE(amqp_adaptor.conn_list, qd_conn); - sys_mutex_unlock(&amqp_adaptor.lock); + if (qd_conn->listener) { + qd_listener_remove_connection(qd_conn->listener, qd_conn); + qd_conn->listener = 0; + } // If counted for policy enforcement, notify it has closed if (qd_conn->policy_counted) { diff --git a/src/adaptors/amqp/qd_connection.h b/src/adaptors/amqp/qd_connection.h index 668c8922a..3a371534e 100644 --- a/src/adaptors/amqp/qd_connection.h +++ b/src/adaptors/amqp/qd_connection.h @@ -115,8 +115,9 @@ struct qd_connection_t { ALLOC_DECLARE_SAFE(qd_connection_t); -qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config); -qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config_t *config, qd_connection_t *ctx, qd_connector_t *connector); +// initialize a newly allocated qd_connection_t +void qd_connection_init(qd_connection_t *qd_conn, qd_server_t *server, qd_server_config_t *config, + qd_connector_t *connector, qd_listener_t *listener); qd_connector_t* qd_connection_connector(const qd_connection_t *c); diff --git a/src/adaptors/amqp/qd_connector.c b/src/adaptors/amqp/qd_connector.c index da738b58c..5c01e2013 100644 --- a/src/adaptors/amqp/qd_connector.c +++ b/src/adaptors/amqp/qd_connector.c @@ -23,6 +23,7 @@ #include "qpid/dispatch/alloc_pool.h" #include "qpid/dispatch/timer.h" +#include "qpid/dispatch/vanflow.h" #include @@ -46,22 +47,10 @@ static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_ /* Timer callback to try/retry connection open, connector->lock held */ -static void try_open_lh(qd_connector_t *connector, qd_connection_t *connection) TA_REQ(connector->lock) +static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_REQ(connector->lock) { - // connection until pn_proactor_connect is called below - qd_connection_t *qd_conn = qd_server_connection_impl(connector->server, &connector->config, connection, connector); - if (!qd_conn) { /* Try again later */ - qd_log(LOG_SERVER, QD_LOG_CRITICAL, "Allocation failure connecting to %s", - connector->config.host_port); - connector->delay = 10000; - connector->state = CXTR_STATE_CONNECTING; - qd_timer_schedule(connector->timer, connector->delay); - return; - } + qd_connection_init(qd_conn, connector->server, &connector->config, connector, 0); - sys_atomic_inc(&connector->ref_count); - - connector->qd_conn = qd_conn; connector->state = CXTR_STATE_OPEN; connector->delay = 5000; @@ -101,13 +90,7 @@ static void try_open_cb(void *context) // Allocate connection before taking connector lock to avoid // CONNECTOR - ENTITY_CACHE lock inversion deadlock window. qd_connection_t *ctx = new_qd_connection_t(); - if (!ctx) { - qd_log(LOG_SERVER, QD_LOG_CRITICAL, "Allocation failure connecting to %s", ct->config.host_port); - ct->delay = 10000; - ct->state = CXTR_STATE_CONNECTING; - qd_timer_schedule(ct->timer, ct->delay); - return; - } + ZERO(ctx); sys_mutex_lock(&ct->lock); @@ -185,6 +168,8 @@ void qd_connector_decref(qd_connector_t* connector) assert(connector->state == CXTR_STATE_DELETED); assert(connector->qd_conn == 0); + vflow_end_record(connector->vflow_record); + connector->vflow_record = 0; qd_server_config_free(&connector->config); qd_timer_free(connector->timer); sys_mutex_free(&connector->lock); @@ -291,16 +276,35 @@ void qd_connector_remote_opened(qd_connector_t *connector) sys_mutex_unlock(&connector->lock); } +/** + * Set the child connection of the connector + */ +void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx) +{ + assert(ctx->connector == 0); + + sys_atomic_inc(&connector->ref_count); + ctx->connector = connector; + connector->qd_conn = ctx; + + strncpy(ctx->group_correlator, connector->group_correlator, QD_DISCRIMINATOR_SIZE); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "up"); +} + /** * The connection associated with this connector is about to be freed, * clean up all related state */ -void qd_connector_release_connection(qd_connector_t *connector, qd_connection_t *qd_conn) +void qd_connector_remove_connection(qd_connector_t *connector) { sys_mutex_lock(&connector->lock); - assert(connector->qd_conn == qd_conn); - connector->qd_conn = 0; // this connection to be freed + + qd_connection_t *ctx = connector->qd_conn; + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); + connector->qd_conn = 0; + ctx->connector = 0; + if (connector->state != CXTR_STATE_DELETED) { // Increment the connection index by so that we can try connecting to the failover url (if any). bool has_failover = qd_connector_has_failover_info(connector); @@ -316,4 +320,7 @@ void qd_connector_release_connection(qd_connector_t *connector, qd_connection_t qd_timer_schedule(connector->timer, delay); } sys_mutex_unlock(&connector->lock); + + // Drop reference held by connection. + qd_connector_decref(connector); } diff --git a/src/adaptors/amqp/qd_connector.h b/src/adaptors/amqp/qd_connector.h index 3ab17f2a5..5b6ff2a5e 100644 --- a/src/adaptors/amqp/qd_connector.h +++ b/src/adaptors/amqp/qd_connector.h @@ -32,6 +32,7 @@ typedef struct qd_timer_t qd_timer_t; typedef struct qd_server_t qd_server_t; typedef struct qd_connection_t qd_connection_t; +typedef struct vflow_record_t vflow_record_t; typedef enum { CXTR_STATE_INIT = 0, @@ -58,6 +59,7 @@ typedef struct qd_connector_t { sys_mutex_t lock; cxtr_state_t state; qd_connection_t *qd_conn; + vflow_record_t *vflow_record; /* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */ qd_failover_item_list_t conn_info_list; @@ -94,5 +96,12 @@ bool qd_connector_has_failover_info(const qd_connector_t* ct); const char *qd_connector_policy_vhost(const qd_connector_t* ct); void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition); void qd_connector_remote_opened(qd_connector_t *connector); -void qd_connector_release_connection(qd_connector_t *connector, qd_connection_t *qd_conn); + +// add a new connection to the parent connector +void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx); + +// remove the child connection +// NOTE WELL: this may free the connector if the connection is holding the last +// reference to it +void qd_connector_remove_connection(qd_connector_t *connector); #endif diff --git a/src/adaptors/amqp/qd_listener.c b/src/adaptors/amqp/qd_listener.c index 39422f6de..be891f82c 100644 --- a/src/adaptors/amqp/qd_listener.c +++ b/src/adaptors/amqp/qd_listener.c @@ -23,6 +23,7 @@ #include "qpid/dispatch/server.h" #include "qpid/dispatch/log.h" +#include "qpid/dispatch/vanflow.h" #include #include @@ -39,17 +40,16 @@ static void on_accept(pn_event_t *e, qd_listener_t *listener) { assert(pn_event_type(e) == PN_LISTENER_ACCEPT); pn_listener_t *pn_listener = pn_event_listener(e); - qd_connection_t *ctx = qd_server_connection(listener->server, &listener->config); - if (!ctx) { - qd_log(LOG_SERVER, QD_LOG_CRITICAL, "Allocation failure during accept to %s", - listener->config.host_port); - return; - } - ctx->listener = listener; + + qd_connection_t *ctx = new_qd_connection_t(); + ZERO(ctx); + qd_connection_init(ctx, listener->server, &listener->config, 0, listener); qd_log(LOG_SERVER, QD_LOG_DEBUG, "[C%" PRIu64 "]: Accepting incoming connection to '%s'", ctx->connection_id, ctx->listener->config.host_port); /* Asynchronous accept, configure the transport on PN_CONNECTION_BOUND */ pn_listener_accept(pn_listener, ctx->pn_conn); + // Note well: at this point the connection may now be scheduled on another thread. + // Do not access it. } @@ -112,12 +112,13 @@ static void handle_listener(pn_event_t *e, qd_server_t *qd_server, void *context } } -qd_listener_t *qd_server_listener(qd_server_t *server) +qd_listener_t *qd_listener(qd_server_t *server) { qd_listener_t *li = new_qd_listener_t(); if (!li) return 0; ZERO(li); sys_atomic_init(&li->ref_count, 1); + sys_atomic_init(&li->connection_count, 0); li->server = server; li->http = NULL; li->type.context = li; @@ -164,6 +165,10 @@ bool qd_listener_listen(qd_listener_t *li) void qd_listener_decref(qd_listener_t* li) { if (li && sys_atomic_dec(&li->ref_count) == 1) { + if (!!li->vflow_record) { + vflow_end_record(li->vflow_record); + li->vflow_record = 0; + } qd_server_config_free(&li->config); free_qd_listener_t(li); } @@ -179,3 +184,24 @@ const qd_server_config_t *qd_listener_config(const qd_listener_t *li) return &li->config; } + +void qd_listener_add_connection(qd_listener_t *li, qd_connection_t *ctx) +{ + sys_atomic_inc(&li->ref_count); + ctx->listener = li; + if (!!li->vflow_record) { + uint32_t count = sys_atomic_inc(&li->connection_count) + 1; + vflow_set_uint64(li->vflow_record, VFLOW_ATTRIBUTE_LINK_COUNT, count); + } +} + +void qd_listener_remove_connection(qd_listener_t *li, qd_connection_t *ctx) +{ + assert(ctx->listener == li); + if (!!li->vflow_record) { + uint32_t count = sys_atomic_dec(&li->connection_count) - 1; + vflow_set_uint64(li->vflow_record, VFLOW_ATTRIBUTE_LINK_COUNT, count); + } + ctx->listener = 0; + qd_listener_decref(li); +} diff --git a/src/adaptors/amqp/qd_listener.h b/src/adaptors/amqp/qd_listener.h index 8c500d356..991ea2111 100644 --- a/src/adaptors/amqp/qd_listener.h +++ b/src/adaptors/amqp/qd_listener.h @@ -29,6 +29,8 @@ typedef struct qd_lws_listener_t qd_lws_listener_t; typedef struct qd_server_t qd_server_t; typedef struct pn_listener_t pn_listener_t; +typedef struct vflow_record_t vflow_record_t; +typedef struct qd_connection_t qd_connection_t; /** * Listener objects represent the desire to accept incoming AMQP transport connections. @@ -36,14 +38,16 @@ typedef struct pn_listener_t pn_listener_t; typedef struct qd_listener_t qd_listener_t; struct qd_listener_t { /* May be referenced by connection_manager and pn_listener_t */ - qd_handler_context_t type; - sys_atomic_t ref_count; + qd_handler_context_t type; + sys_atomic_t ref_count; + sys_atomic_t connection_count; qd_server_t *server; qd_server_config_t config; pn_listener_t *pn_listener; qd_lws_listener_t *http; DEQ_LINKS(qd_listener_t); bool exit_on_error; + vflow_record_t *vflow_record; }; DEQ_DECLARE(qd_listener_t, qd_listener_list_t); @@ -53,9 +57,17 @@ DEQ_DECLARE(qd_listener_t, qd_listener_list_t); * Listen for incoming connections, return true if listening succeeded. */ bool qd_listener_listen(qd_listener_t *l); -qd_listener_t *qd_server_listener(qd_server_t *server); +qd_listener_t *qd_listener(qd_server_t *server); void qd_listener_decref(qd_listener_t* ct); qd_lws_listener_t *qd_listener_http(const qd_listener_t *li); const qd_server_config_t *qd_listener_config(const qd_listener_t *li); +// add a new connection with the parent listener +void qd_listener_add_connection(qd_listener_t *li, qd_connection_t *ctx); + +// remove the connection with its parent listener +// NOTE WELL: may free the listener if this connection is holding the last reference +// to it +void qd_listener_remove_connection(qd_listener_t *li, qd_connection_t *qd_conn); + #endif diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index d97fb49b8..62bcd9eb6 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -1022,13 +1022,13 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, if (hl == NULL || !hl->listener->config.websockets) { return unexpected_close(c->wsi, "cannot-upgrade"); } - c->qd_conn = qd_server_connection(hs->server, &hl->listener->config); - if (c->qd_conn == NULL) { - return unexpected_close(c->wsi, "out-of-memory"); - } + + c->qd_conn = new_qd_connection_t(); + ZERO(c->qd_conn); + qd_connection_init(c->qd_conn, hs->server, &hl->listener->config, 0, hl->listener); c->qd_conn->context = c; c->qd_conn->wake = connection_wake; - c->qd_conn->listener = hl->listener; + lws_get_peer_simple(wsi, c->qd_conn->rhost, sizeof(c->qd_conn->rhost)); int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL); if (err) { diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 70792dd2c..a622b97e2 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -243,7 +243,6 @@ void qdr_route_table_setup_CT(qdr_core_t *core) core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width()); core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width()); core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width()); - core->vflow_links_by_mask_bit = NEW_PTR_ARRAY(vflow_record_t, qd_bitmask_width()); core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width()); DEQ_INIT(core->unallocated_group_members); core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width()); @@ -253,7 +252,6 @@ void qdr_route_table_setup_CT(qdr_core_t *core) core->control_links_by_mask_bit[idx] = 0; core->data_links_by_mask_bit[idx].count = 0; core->rnode_conns_by_mask_bit[idx] = 0; - core->vflow_links_by_mask_bit[idx] = 0; for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { core->data_links_by_mask_bit[idx].links[priority] = 0; } @@ -458,28 +456,6 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard // qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; - // - // Allocate a vFlow record for the link only if there is not already one in place. - // In the (misconfigured) case where there are multiple inter-router connections between - // a pair of routers, multiple calls to qdr_set_link_CT can occur without interleaved calls - // to qdr_remove_link_CT. See skupper-router issue #980. - // - if (core->vflow_links_by_mask_bit[router_maskbit] == 0) { - core->vflow_links_by_mask_bit[router_maskbit] = vflow_start_record(VFLOW_RECORD_LINK, 0); - } - - const char *rname = (const char*) qd_hash_key_by_handle(rnode->owning_addr->hash_handle); - vflow_set_string(core->vflow_links_by_mask_bit[router_maskbit], VFLOW_ATTRIBUTE_NAME, &rname[1]); - vflow_set_string(core->vflow_links_by_mask_bit[router_maskbit], VFLOW_ATTRIBUTE_MODE, "interior"); - - qdr_link_t *link = core->control_links_by_mask_bit[conn_maskbit]; - qdr_connection_t *conn = link->conn; - - if (!!conn) { - vflow_set_string(core->vflow_links_by_mask_bit[router_maskbit], VFLOW_ATTRIBUTE_DIRECTION, - conn->incoming ? "incoming" : "outgoing"); - } - // // Add the peer_link reference to the router record. // @@ -506,12 +482,6 @@ static void qdr_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool disc return; } - // - // Close out the protocol log record for this link. - // - vflow_end_record(core->vflow_links_by_mask_bit[router_maskbit]); - core->vflow_links_by_mask_bit[router_maskbit] = 0; - qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; rnode->conn_mask_bit = -1; } @@ -592,13 +562,6 @@ static void qdr_set_cost_CT(qdr_core_t *core, qdr_action_t *action, bool discard return; } - // - // Set the link cost in the protocol log record. - // - if (!!core->vflow_links_by_mask_bit[router_maskbit]) { - vflow_set_uint64(core->vflow_links_by_mask_bit[router_maskbit], VFLOW_ATTRIBUTE_LINK_COST, (uint64_t) cost); - } - qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; rnode->cost = cost; qdr_route_table_update_cost_CT(core, rnode); diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 10e533ce2..01186b8df 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -356,7 +356,6 @@ void qdr_core_free(qdr_core_t *core) if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit); if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask); if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit); - if (core->vflow_links_by_mask_bit) free(core->vflow_links_by_mask_bit); if (core->group_correlator_by_maskbit) { for (int idx = 0; idx < qd_bitmask_width(); idx++) { free(core->group_correlator_by_maskbit[idx]); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 9231747e1..6fb36334a 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -878,7 +878,6 @@ struct qdr_core_t { qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit qdr_link_t **control_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit - vflow_record_t **vflow_links_by_mask_bit; ///< indexed by qdr_node_t->mask_bit qdr_priority_sheaf_t *data_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit) char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit diff --git a/src/vanflow.c b/src/vanflow.c index 5792aa56e..4b8d25003 100644 --- a/src/vanflow.c +++ b/src/vanflow.c @@ -685,6 +685,7 @@ static const char *_vflow_record_type_name(const vflow_record_t *record) case VFLOW_RECORD_PROCESS_GROUP : return "PROCESS_GROUP"; case VFLOW_RECORD_HOST : return "HOST"; case VFLOW_RECORD_LOG : return "LOG"; + case VFLOW_RECORD_ACCESS_POINT : return "ACCESS_POINT"; } return "UNKNOWN"; } @@ -745,6 +746,9 @@ static const char *_vflow_attribute_name(const vflow_attribute_data_t *data) case VFLOW_ATTRIBUTE_LOG_TEXT : return "logText"; case VFLOW_ATTRIBUTE_SOURCE_FILE : return "sourceFile"; case VFLOW_ATTRIBUTE_SOURCE_LINE : return "sourceLine"; + case VFLOW_ATTRIBUTE_LINK_COUNT : return "linkCount"; + case VFLOW_ATTRIBUTE_OPER_STATUS : return "operStatus"; + case VFLOW_ATTRIBUTE_ROLE : return "role"; } return "UNKNOWN"; } diff --git a/tests/cpp/cpp_system/test_listener_startup.cpp b/tests/cpp/cpp_system/test_listener_startup.cpp index 7bb543880..e45508598 100644 --- a/tests/cpp/cpp_system/test_listener_startup.cpp +++ b/tests/cpp/cpp_system/test_listener_startup.cpp @@ -44,7 +44,7 @@ void check_amqp_listener_startup_log_message(qd_server_config_t config, std::str CaptureCStream css(stderr); qdr.initialize("./minimal_trace.conf"); - qd_listener_t *li = qd_server_listener(qdr.qd->server); + qd_listener_t *li = qd_listener(qdr.qd->server); li->config = config; CHECK(qd_listener_listen(li)); @@ -71,7 +71,7 @@ void check_http_listener_startup_log_message(qd_server_config_t config, std::str CaptureCStream css(stderr); qdr.initialize("./minimal_trace.conf"); - qd_listener_t *li = qd_server_listener(qdr.qd->server); + qd_listener_t *li = qd_listener(qdr.qd->server); li->config = config; const bool http_supported = qd_server_http(qdr.qd->server) != nullptr; diff --git a/tests/system_tests_vflow.py b/tests/system_tests_vflow.py index 3e261580f..4efaf69b0 100644 --- a/tests/system_tests_vflow.py +++ b/tests/system_tests_vflow.py @@ -17,10 +17,15 @@ # under the License. # -from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, unittest, TestTimeout +from http1_tests import wait_tcp_listeners_up +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, unittest +from system_test import TestTimeout, retry, Logger +from vanflow_snooper import VFlowSnooperThread from proton.handlers import MessagingHandler from proton.reactor import Container from proton import Message +from TCP_echo_server import TcpEchoServer + # # Various codepoints from the VanFlow specification. These are @@ -321,5 +326,180 @@ def run(self): Container(self).run() +class RecordCounter(dict): + """ + Counts the total number of each record type present in the list of records + """ + def __init__(self, records): + super(dict, self).__init__() + for record in records: + if 'RECORD_TYPE' in record: + rtype = record['RECORD_TYPE'] + self[rtype] = self.get(rtype, 0) + 1 + + +class VFlowInterRouterTest(TestCase): + """ + Verify that a multi-hop router network generates the proper Vanflow records + for the topology. + """ + @classmethod + def setUpClass(cls): + """ + Create a three router network: two interiors and an edge hanging off + one of the interior. + """ + super(VFlowInterRouterTest, cls).setUpClass() + + cls.inter_router_port = cls.tester.get_port() + cls.edge_router_port = cls.tester.get_port() + cls.tcp_listener_port = cls.tester.get_port() + cls.tcp_connector_port = cls.tester.get_port() + + configs = [ + # Router INTA + [ + ('router', {'id': 'INTA', + 'mode': 'interior', + 'dataConnectionCount': 4}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('connector', {'role': 'inter-router', + 'port': cls.inter_router_port}), + ('tcpConnector', {'host': '127.0.0.1', + 'port': cls.tcp_connector_port, + 'address': 'tcpServiceAddress'}), + # a dummy connector which never connects (operStatus == down) + ('connector', {'role': 'inter-router', + 'port': cls.tester.get_port()}), + # health-check listener + ('listener', {'role': 'normal', + 'host': '0.0.0.0', + 'port': cls.tester.get_port(), + 'http': 'true', + 'healthz': 'true'}) + ], + # Router INTB + [ + ('router', {'id': 'INTB', + 'mode': 'interior', + 'dataConnectionCount': 4}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('listener', {'role': 'inter-router', + 'port': cls.inter_router_port}), + ('listener', {'role': 'edge', + 'port': cls.edge_router_port}), + ], + # Router EdgeB + [ + ('router', {'id': 'EdgeB', + 'mode': 'edge'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('connector', {'role': 'edge', + 'port': cls.edge_router_port}), + ('tcpListener', {'host': '0.0.0.0', + 'port': cls.tcp_listener_port, + 'address': 'tcpServiceAddress'}), + # metrics listener + ('listener', {'role': 'normal', + 'host': '0.0.0.0', + 'port': cls.tester.get_port(), + 'http': 'true', + 'metrics': 'true'}) + ] + ] + + # fire up the TCP echo server + + logger = Logger(title="VFlowEchoServer") + cls.echo_server = TcpEchoServer(port=cls.tcp_connector_port, logger=logger) + assert cls.echo_server.is_running + + # bring up the routers + + cls.inta = cls.tester.qdrouterd('INTA', Qdrouterd.Config(configs[0]), wait=False, cl_args=["-T"]) + cls.intb = cls.tester.qdrouterd('INTB', Qdrouterd.Config(configs[1]), wait=False, cl_args=["-T"]) + cls.edgeb = cls.tester.qdrouterd('EdgeB', Qdrouterd.Config(configs[2]), wait=False, cl_args=["-T"]) + cls.inta.wait_router_connected('INTB') + cls.intb.wait_router_connected('INTA') + cls.intb.is_edge_routers_connected() + cls.edgeb.wait_ports() + wait_tcp_listeners_up(cls.edgeb.addresses[0]) + + # start the vanflow event collector thread + + cls.snooper_thread = VFlowSnooperThread(cls.inta.addresses[0]) + + def _inta_check(self, records): + # Verify the expected records are present for router inta's configuration + counts = RecordCounter(records) + return counts.get('ROUTER') == 1 and \ + counts.get('CONNECTOR') == 1 and \ + counts.get('LISTENER') is None and \ + counts.get('LINK') == 2 and \ + counts.get('ACCESS_POINT') is None + + def _intb_check(self, records): + # Verify the expected records are present for router intb's configuration + counts = RecordCounter(records) + return counts.get('ROUTER') == 1 and \ + counts.get('CONNECTOR') is None and \ + counts.get('LISTENER') is None and \ + counts.get('LINK') is None and \ + counts.get('ACCESS_POINT') == 2 + + def _edgeb_check(self, records): + # Verify the expected records are present for router edgeb's configuration + counts = RecordCounter(records) + return counts.get('ROUTER') == 1 and \ + counts.get('CONNECTOR') is None and \ + counts.get('LISTENER') == 1 and \ + counts.get('LINK') == 2 and \ + counts.get('ACCESS_POINT') is None + + def _check_routers(self): + """ + Check the database of received events for each router to verify that + the expected records for that router are present + """ + inta_ok = False + intb_ok = False + edgeb_ok = False + if self.snooper_thread.sources_ready == 3: + results = self.snooper_thread.get_results() + for router, records in results.items(): + for record in records: + rtype = record.get('RECORD_TYPE') + if rtype == 'ROUTER': + name = record.get('NAME') + if 'INTA' in name: + inta_ok = self._inta_check(records) + elif 'INTB' in name: + intb_ok = self._intb_check(records) + elif 'EdgeB' in name: + edgeb_ok = self._edgeb_check(records) + return inta_ok and intb_ok and edgeb_ok + + def test_01_check_topology(self): + """ + Verify the records related to the router configuration and topology are + present and correct + """ + success = retry(self._check_routers, delay=1.0) + self.assertTrue(success, + f"Failed record check: {self.snooper_thread.get_results()}") + + @classmethod + def tearDownClass(cls): + cls.echo_server.wait() + + # we need to manually teardown the router to force the snooper thread + # to exit + cls.inta.teardown() + cls.snooper_thread.join(timeout=TIMEOUT) + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/vanflow_snooper.py b/tests/vanflow_snooper.py index 82151b957..3e959b94f 100755 --- a/tests/vanflow_snooper.py +++ b/tests/vanflow_snooper.py @@ -22,8 +22,10 @@ import json import logging import sys +from copy import deepcopy from os import path from threading import Thread +from threading import Lock from proton.handlers import MessagingHandler from proton.reactor import Container @@ -50,6 +52,7 @@ 12: "PROCESS_GROUP", 13: "HOST", 14: "LOG", + 15: "ACCESS_POINT" } RECORD_TYPE = 0 @@ -107,7 +110,10 @@ 48: "LOG_SEVERITY", 49: "LOG_TEXT", 50: "SOURCE_FILE", - 51: "SOURCE_LINE" + 51: "SOURCE_LINE", + 52: "LINK_COUNT", + 53: "OPER_STATUS", + 54: "ROLE" } @@ -153,11 +159,11 @@ def get_records(self): for key, value in attributes.items(): if key not in attribute_types: # need to update attribute_types with new type? - raise Exception(f"Missing VanFlow attribute type '{key}'") + raise Exception(f"Unknown VanFlow attribute type '{key}'") if key == RECORD_TYPE: if value not in record_types: # need to update record_types with new type? - raise Exception(f"Missing VanFlow record type '{value}'") + raise Exception(f"Unknown VanFlow record type '{value}'") record['RECORD_TYPE'] = record_types[value] else: record[attribute_types[key]] = value @@ -178,6 +184,7 @@ def __init__(self, address, idle_timeout=0): self.address = address # router address self.conn = None self.beacon_receiver = None + self.sources_lock = Lock() self.sources = {} self._error = None self._sources_subscribed = 0 @@ -199,13 +206,14 @@ def on_link_opened(self, event): addr = event.link.source.address logger.debug("%s link opened: %s", ltype, addr) - for name in self.sources.keys(): - if name in addr: - self.sources[name].links_pending -= 1 - assert self.sources[name].links_pending >= 0 - if self.sources[name].links_pending == 0: - self._sources_subscribed += 1 - logger.debug("%s sources ready", self.sources_ready) + with self.sources_lock: + for name in self.sources.keys(): + if name in addr: + self.sources[name].links_pending -= 1 + assert self.sources[name].links_pending >= 0 + if self.sources[name].links_pending == 0: + self._sources_subscribed += 1 + logger.debug("%s sources ready", self.sources_ready) def on_link_closed(self, event): if event.link.is_sender: @@ -272,6 +280,7 @@ def add_source(self, name, base_address, command_address): self.container.create_receiver(self.conn, base_address) self.container.create_receiver(self.conn, flow_address) sender = self.container.create_sender(self.conn, command_address) + assert self.sources_lock.locked() self.sources[name] = EventSource(name, base_address, sender) def handle_beacon(self, message): @@ -279,30 +288,34 @@ def handle_beacon(self, message): logger.debug("Beacon from %s", source_id) name = id_2_source(source_id) - if name not in self.sources: - self.add_source(name, message.properties['address'], message.properties['direct']) + with self.sources_lock: + if name not in self.sources: + self.add_source(name, message.properties['address'], message.properties['direct']) def handle_heartbeat(self, message): source_id = message.properties['id'] logger.debug("Heartbeat from %s", source_id) + sender = None name = id_2_source(source_id) - source = self.sources.get(name) - if source is not None: - source.idle_heartbeats += 1 - if self._idle_timeout > 0: - # Check all known sources for idle timeout - idle = True - for src in self.sources.values(): - if src.idle_heartbeats < self._idle_timeout: - idle = False - break - if idle: - logger.debug("Exiting due to idle_timeout") - self.exit() - return - - if source.sender.credit > 0: - source.sender.send(Message(subject='FLUSH')) + with self.sources_lock: + source = self.sources.get(name) + if source is not None: + source.idle_heartbeats += 1 + if self._idle_timeout > 0: + # Check all known sources for idle timeout + idle = True + for src in self.sources.values(): + if src.idle_heartbeats < self._idle_timeout: + idle = False + break + if idle: + logger.debug("Exiting due to idle_timeout") + self.exit() + return + sender = source.sender + + if sender is not None and sender.credit > 0: + sender.send(Message(subject='FLUSH')) def handle_records(self, message): for record in message.body: @@ -311,20 +324,21 @@ def handle_records(self, message): err = f"ERROR: received record with no id: {record}" logger.error(err) self.exit(err) - source = self.sources.get(id_2_source(identity)) - if source is None: - err = f"ERROR: source {identity} not in sources!!" - logger.error(err) - self.exit(err) - - if identity not in source.records: - logger.debug("New record: %s", identity) - self._total_records += 1 - source.records[identity] = record - source.idle_heartbeats = 0 # new record - else: - logger.debug("Record update: %s", identity) - source.records[identity].update(record) + with self.sources_lock: + source = self.sources.get(id_2_source(identity)) + if source is None: + err = f"ERROR: source {identity} not in sources!!" + logger.error(err) + self.exit(err) + + if identity not in source.records: + logger.debug("New record: %s", identity) + self._total_records += 1 + source.records[identity] = record + source.idle_heartbeats = 0 # new record + else: + logger.debug("Record update: %s", identity) + source.records[identity].update(record) def get_results(self): """ @@ -333,8 +347,9 @@ def get_results(self): """ results = {} if self.error is None: - for source_id, event in self.sources.items(): - results[f"{source_id}:0"] = event.get_records() + with self.sources_lock: + for source_id, event in self.sources.items(): + results[f"{source_id}:0"] = deepcopy(event.get_records()) return results