diff --git a/docs/notes/prometheus.adoc b/docs/notes/prometheus.adoc new file mode 100644 index 000000000..e9e6284bb --- /dev/null +++ b/docs/notes/prometheus.adoc @@ -0,0 +1,111 @@ +//// +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License +//// + += Monitoring Router Metrics Via Prometheus + +The router can be configured to enable metrics scraping via +Prometheus. Metrics are provided via an HTTP service running in the +router. A snapshot of the metrics can be obtained by issuing an HTTP +GET request to the service for the */metrics* URL path. + +== Configuration + +Prometheus support is disabled by default. To enable metrics access an +HTTP service must be configured on the router. This is done by +specifying an *io.skupper.router.listener* entry in the router +configuration (or via in-band management). The listener entry must +provide: + +* The host IP address/name +* The TCP port number +* The _http_ attribute set to True + +For example, the following listener entry enables an HTTP server +listening on localhost port 22967. + + listener { + port: 22976 + http: True + host: localhost + saslMechanisms: ANONYMOUS + idleTimeoutSeconds: 120 + authenticatePeer: no + role: normal + } + +The Prometheus server must also be configured to scrape the +router. This requires adding a job in the Prometheus server's +*scrape-config* configuration for the router. An example job +configuration for the above example listener could be: + + scrape_configs: + - job_name: skupper-router + metrics_path: /metrics + static_configs: + - targets: + - localhost:22976 + +== Metrics + +The metrics provided by the router are intended for use by developers +to aid fault monitoring and debugging. Therefore the metrics content +may change between releases as features are added or removed. + +=== Heap Allocation Metrics + +A subset of the router metrics are concerned with the router's heap +memory utilization. The router uses a cache to manage instances of +data objects that have been allocated from the heap. This cache avoids +the overhead of allocating and freeing frequently used data objects +from the system's heap. + +See alloc_pool.c for implementation details. + +The cache is a pool of data objects that have been allocated from the +heap for use by the router. Each data type has its own dedicated +cache. When the router needs an instance of said data type it will +first attempt to claim an object from the cache. If the cache is +empty, the router will instead allocate a batch of data objects from +the system heap. It will reserve one data object instance from the +batch for immediate use and place the remaining into the cache. When +the router no longer needs a particular instance of a data object it +will be placed back into the cache and can be re-used at a later time. + +Given this implementation, a particular instance of a data object may +be either: + +* in the cache (in standby - available for use when needed) +* or currently in use by the router. + +Each data type will have a set of 4 metrics associated with it: + +* allocated: total number of objects that are currently allocated from the heap +* in_use: total objects currently being used by the router +* cached: total objects in the cache +* memory_use_bytes: the sum of all memory allocated from the heap for the given data type + +These metrics adhere to the following relationships: + +* allocated = in_use + cached +* memory_use_bytes = (sizeof() * allocated) + + + + + diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h index 49c9efb61..f30c3e3de 100644 --- a/include/qpid/dispatch/alloc_pool.h +++ b/include/qpid/dispatch/alloc_pool.h @@ -122,7 +122,7 @@ static inline void *qd_alloc_deref_safe_ptr(const qd_alloc_safe_ptr_t *sp) */ void qd_alloc_desc_init(const char *name, qd_alloc_type_desc_t *desc, size_t size, const size_t *additional_size, const qd_alloc_config_t *config); -qd_alloc_stats_t qd_alloc_desc_stats(qd_alloc_type_desc_t *desc); +qd_alloc_stats_t qd_alloc_desc_stats(const qd_alloc_type_desc_t *desc); // thread safe // clang-format off #define ALLOC_DEFINE_CONFIG(T,S,A,C) \ qd_alloc_type_desc_t __desc_##T __attribute__((aligned(64))); \ @@ -152,5 +152,5 @@ qd_alloc_stats_t qd_alloc_desc_stats(qd_alloc_type_desc_t *desc); void qd_alloc_initialize(void); void qd_alloc_debug_dump(const char *file); void qd_alloc_finalize(void); - +size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc); // thread safe #endif diff --git a/src/alloc_pool.c b/src/alloc_pool.c index 86a95f6aa..1ea9ef2cc 100644 --- a/src/alloc_pool.c +++ b/src/alloc_pool.c @@ -24,6 +24,7 @@ #include "config.h" #include "entity.h" #include "entity_cache.h" +#include "http.h" #include "qd_asan_interface.h" #include "qpid/dispatch/alloc.h" @@ -500,9 +501,17 @@ void qd_alloc_initialize(void) desc->debug = (void *) items; #endif + // cycle the lock to flush the initialized desc before handing it off to other threads (avoids a spurious tsan + // error) + + sys_mutex_lock(&desc->lock); + sys_mutex_unlock(&desc->lock); + // now add the descriptor to the management entity database + // and telemetry metrics qd_entity_cache_add(QD_ALLOCATOR_TYPE, desc); + qd_http_add_alloc_metric(desc->type_name, desc); } #ifdef QD_MEMORY_DEBUG @@ -545,6 +554,7 @@ void qd_alloc_finalize(void) for (qd_alloc_type_desc_t *desc = DEQ_HEAD(desc_list); desc; desc = DEQ_NEXT(desc)) { qd_entity_cache_remove(QD_ALLOCATOR_TYPE, desc); + qd_http_remove_alloc_metric(desc->type_name); // // Reclaim the items on the global free pool @@ -672,15 +682,21 @@ QD_EXPORT qd_error_t qd_entity_refresh_allocator(qd_entity_t* entity, void *impl return qd_error_code(); } -qd_alloc_stats_t qd_alloc_desc_stats(qd_alloc_type_desc_t *desc) +qd_alloc_stats_t qd_alloc_desc_stats(const qd_alloc_type_desc_t *desc) { - sys_mutex_lock(&desc->lock); + sys_mutex_t *lock = (sys_mutex_t *) &desc->lock; // cast away const + sys_mutex_lock(lock); qd_alloc_stats_t stats = desc->stats; - sys_mutex_unlock(&desc->lock); + sys_mutex_unlock(lock); return stats; } +size_t qd_alloc_type_size(const qd_alloc_type_desc_t *desc) +{ + return desc->total_size; +} + void qd_alloc_debug_dump(const char *file) { debug_dump = file ? strdup(file) : 0; } diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index 3921c3882..b6690f309 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -22,8 +22,10 @@ #include "server_private.h" #include "qd_connection.h" +#include "qpid/dispatch/alloc_pool.h" #include "qpid/dispatch/amqp.h" #include "qpid/dispatch/atomic.h" +#include "qpid/dispatch/ctools.h" #include "qpid/dispatch/protocol_adaptor.h" #include "qpid/dispatch/threading.h" #include "qpid/dispatch/timer.h" @@ -96,18 +98,28 @@ typedef struct connection_t { struct lws *wsi; } connection_t; +// Instantiated for every HTTP request, this holds the statistics to be written in the response +// typedef struct stats_request_state_t { - bool callback_completed; - bool wsi_deleted; + bool callback_completed; // T: the core has written the global statistics to the stats field + bool wsi_deleted; // T: client has closed, may release this state instance qdr_global_stats_t stats; qd_http_server_t *server; struct lws *wsi; + size_t buffer_size; // extra octets past lws_prefix[LWS_PRE] for HTTP output + uint8_t lws_prefix[LWS_PRE]; + // buffer_size extra octets are appended to this structure when it is allocated. This space is used for the HTTP + // response. See new_stats_request_state(), Use &lws_prefix[LWS_PRE] as the start of output buffer. } stats_request_state_t; +static stats_request_state_t *new_stats_request_state(size_t buffer_size); +static void free_stats_request_state(stats_request_state_t *); +// Context passed to metrics and healthz protocol callbacks. Instantiated by the LWS thread, represents a single HTTP +// request transaction. +// typedef struct stats_t { - size_t current; - bool headers_sent; - stats_request_state_t *context; + stats_request_state_t *state; + bool response_complete; // T: HTTP response sent } stats_t; /* Navigating from WSI pointer to qd objects */ @@ -445,14 +457,53 @@ static void connection_wake(qd_connection_t *qd_conn) } } +// +// Metrics +// +// Metrics are reported via an HTTP get request on the url "http:///metrics". The metrics in the HTTP response +// are formatted for consumption by Prometheus - see the description "exposition formats" at the Prometheus website for +// details. +// +// Each metric is rendered as ASCII text. Two lines of text are generated for each metric. The format of these lines +// are (minus quotes): +// +// "# TYPE \n" +// " \n" +// +// Currently all metric values are uint64_t integers, and the metric-type is either "counter" or "gauge". Counters are +// those metrics that only increase (may allow reset to zero). Gauges are those metrics whose values may increase or +// decrease over time. +// +// The current http-response body buffering implementation is... interesting. When a request arrives all counters are +// fetched then rendered into an output buffer. After all metrics have been written to the buffer the buffer is written +// to the LWS internal network buffer(s). Metrics can either be fetch synchronously (alloc-pool metrics) or require an +// asynchronous callback (router core metrics). The output buffer rendering and writing all occur on the http thread - +// see callback_metrics(). +// +// Given this implementation it is necessary to ensure that the output buffer is large enough to hold all metrics. The +// following definitions are used to compute the necessary buffer size. These values may need updating occasionally +// should metric be added/removed. I've added many debug asserts to prevent accidental buffer overflow should the +// metrics not be updated properly. +// +// TODO(kgiusti): refactor this to use a smaller buffer with a chunked-output approach if possible with LWS. +#define MAX_METRIC_NAME_LEN 48 +#define MAX_METRIC_VALUE_LEN 20 // uint64_t in decimal +#define MAX_METRIC_TYPE_LEN 7 // strlen("counter") +#define PER_METRIC_BUF_SIZE ((2 * MAX_METRIC_NAME_LEN) + MAX_METRIC_VALUE_LEN + MAX_METRIC_TYPE_LEN + 11) +#define PER_ALLOC_METRIC_COUNT 4 // 4 metrics per alloc type + +#define HTTP_HEADER_LEN 128 // reserve space for headers added by LWS (128 is a guess, asserted in callback). +#define HEALTHZ_BUF_SIZE 2048 // for /healthz url response data + + /** - * Called on router worker thread + * Called on router worker thread: passes latest router stats to the http thread for processing */ static void handle_stats_results(void *context, bool discard) { stats_request_state_t* state = (stats_request_state_t*) context; if (state->wsi_deleted || discard) { - free(state); + free_stats_request_state(state); } else { qd_http_server_t *hs = state->server; if (hs) { @@ -463,90 +514,48 @@ static void handle_stats_results(void *context, bool discard) } /** - * Called on http thread + * Called on http thread: process the stats arriving from the router thread */ static void handle_stats_result_HT(stats_request_state_t* state) { if (state->wsi_deleted) { - free(state); + free_stats_request_state(state); } else { state->callback_completed = true; lws_callback_on_writable(state->wsi); } } -typedef int (*int_metric) (qdr_global_stats_t *stats); +typedef uint64_t (*uint64_metric) (const qdr_global_stats_t *stats); typedef struct metric_definition { const char* name; const char* type; - int_metric value; + uint64_metric get_value; } metric_definition; -typedef struct allocator_metric_definition { - const char* name; - qd_alloc_stats_t (*fn)(void); -} allocator_metric_definition; - -static int stats_get_connections(qdr_global_stats_t *stats) { return stats->connections; } -static int stats_get_links(qdr_global_stats_t *stats) { return stats->links; } -static int stats_get_addrs(qdr_global_stats_t *stats) { return stats->addrs; } -static int stats_get_routers(qdr_global_stats_t *stats) { return stats->routers; } -static int stats_get_auto_links(qdr_global_stats_t *stats) { return stats->auto_links; } -static int stats_get_presettled_deliveries(qdr_global_stats_t *stats) { return stats->presettled_deliveries; } -static int stats_get_dropped_presettled_deliveries(qdr_global_stats_t *stats) { return stats->dropped_presettled_deliveries; } -static int stats_get_accepted_deliveries(qdr_global_stats_t *stats) { return stats->accepted_deliveries; } -static int stats_get_released_deliveries(qdr_global_stats_t *stats) { return stats->released_deliveries; } -static int stats_get_rejected_deliveries(qdr_global_stats_t *stats) { return stats->rejected_deliveries; } -static int stats_get_modified_deliveries(qdr_global_stats_t *stats) { return stats->modified_deliveries; } -static int stats_get_deliveries_ingress(qdr_global_stats_t *stats) { return stats->deliveries_ingress; } -static int stats_get_deliveries_egress(qdr_global_stats_t *stats) { return stats->deliveries_egress; } -static int stats_get_deliveries_transit(qdr_global_stats_t *stats) { return stats->deliveries_transit; } -static int stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_ingress_route_container; } -static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; } -static int stats_get_deliveries_delayed_1sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_1sec; } -static int stats_get_deliveries_delayed_10sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_10sec; } -static int stats_get_deliveries_stuck(qdr_global_stats_t *stats) { return stats->deliveries_stuck; } -static int stats_get_links_blocked(qdr_global_stats_t *stats) { return stats->links_blocked; } -static int stats_get_deliveries_redirected_to_fallback(qdr_global_stats_t *stats) { return stats->deliveries_redirected_to_fallback; } - -qd_alloc_stats_t alloc_stats_qd_bitmask_t(void); -qd_alloc_stats_t alloc_stats_qd_buffer_t(void); -qd_alloc_stats_t alloc_stats_qd_composed_field_t(void); -qd_alloc_stats_t alloc_stats_qd_composite_t(void); -qd_alloc_stats_t alloc_stats_qd_connection_t(void); -qd_alloc_stats_t alloc_stats_qd_hash_handle_t(void); -qd_alloc_stats_t alloc_stats_qd_hash_item_t(void); -qd_alloc_stats_t alloc_stats_qd_iterator_t(void); -qd_alloc_stats_t alloc_stats_qd_link_ref_t(void); -qd_alloc_stats_t alloc_stats_qd_link_t(void); -qd_alloc_stats_t alloc_stats_qd_listener_t(void); -qd_alloc_stats_t alloc_stats_qd_log_entry_t(void); -qd_alloc_stats_t alloc_stats_qd_management_context_t(void); -qd_alloc_stats_t alloc_stats_qd_message_content_t(void); -qd_alloc_stats_t alloc_stats_qd_message_t(void); -qd_alloc_stats_t alloc_stats_qd_node_t(void); -qd_alloc_stats_t alloc_stats_qd_parse_node_t(void); -qd_alloc_stats_t alloc_stats_qd_parsed_field_t(void); -qd_alloc_stats_t alloc_stats_qd_timer_t(void); -qd_alloc_stats_t alloc_stats_qdr_action_t(void); -qd_alloc_stats_t alloc_stats_qdr_address_config_t(void); -qd_alloc_stats_t alloc_stats_qdr_address_t(void); -qd_alloc_stats_t alloc_stats_qdr_connection_info_t(void); -qd_alloc_stats_t alloc_stats_qdr_connection_t(void); -qd_alloc_stats_t alloc_stats_qdr_connection_work_t(void); -qd_alloc_stats_t alloc_stats_qdr_core_timer_t(void); -qd_alloc_stats_t alloc_stats_qdr_delivery_cleanup_t(void); -qd_alloc_stats_t alloc_stats_qdr_delivery_ref_t(void); -qd_alloc_stats_t alloc_stats_qdr_delivery_t(void); -qd_alloc_stats_t alloc_stats_qdr_field_t(void); -qd_alloc_stats_t alloc_stats_qdr_general_work_t(void); -qd_alloc_stats_t alloc_stats_qdr_link_ref_t(void); -qd_alloc_stats_t alloc_stats_qdr_link_t(void); -qd_alloc_stats_t alloc_stats_qdr_link_work_t(void); -qd_alloc_stats_t alloc_stats_qdr_query_t(void); -qd_alloc_stats_t alloc_stats_qdr_terminus_t(void); - -static struct metric_definition metrics[] = { +static uint64_t stats_get_connections(const qdr_global_stats_t *stats) { return stats->connections; } +static uint64_t stats_get_links(const qdr_global_stats_t *stats) { return stats->links; } +static uint64_t stats_get_addrs(const qdr_global_stats_t *stats) { return stats->addrs; } +static uint64_t stats_get_routers(const qdr_global_stats_t *stats) { return stats->routers; } +static uint64_t stats_get_auto_links(const qdr_global_stats_t *stats) { return stats->auto_links; } +static uint64_t stats_get_presettled_deliveries(const qdr_global_stats_t *stats) { return stats->presettled_deliveries; } +static uint64_t stats_get_dropped_presettled_deliveries(const qdr_global_stats_t *stats) { return stats->dropped_presettled_deliveries; } +static uint64_t stats_get_accepted_deliveries(const qdr_global_stats_t *stats) { return stats->accepted_deliveries; } +static uint64_t stats_get_released_deliveries(const qdr_global_stats_t *stats) { return stats->released_deliveries; } +static uint64_t stats_get_rejected_deliveries(const qdr_global_stats_t *stats) { return stats->rejected_deliveries; } +static uint64_t stats_get_modified_deliveries(const qdr_global_stats_t *stats) { return stats->modified_deliveries; } +static uint64_t stats_get_deliveries_ingress(const qdr_global_stats_t *stats) { return stats->deliveries_ingress; } +static uint64_t stats_get_deliveries_egress(const qdr_global_stats_t *stats) { return stats->deliveries_egress; } +static uint64_t stats_get_deliveries_transit(const qdr_global_stats_t *stats) { return stats->deliveries_transit; } +static uint64_t stats_get_deliveries_ingress_route_container(const qdr_global_stats_t *stats) { return stats->deliveries_ingress_route_container; } +static uint64_t stats_get_deliveries_egress_route_container(const qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; } +static uint64_t stats_get_deliveries_delayed_1sec(const qdr_global_stats_t *stats) { return stats->deliveries_delayed_1sec; } +static uint64_t stats_get_deliveries_delayed_10sec(const qdr_global_stats_t *stats) { return stats->deliveries_delayed_10sec; } +static uint64_t stats_get_deliveries_stuck(const qdr_global_stats_t *stats) { return stats->deliveries_stuck; } +static uint64_t stats_get_links_blocked(const qdr_global_stats_t *stats) { return stats->links_blocked; } +static uint64_t stats_get_deliveries_redirected_to_fallback(const qdr_global_stats_t *stats) { return stats->deliveries_redirected_to_fallback; } + +static const struct metric_definition metrics[] = { {"qdr_connections_total", "gauge", stats_get_connections}, {"qdr_links_total", "gauge", stats_get_links}, {"qdr_addresses_total", "gauge", stats_get_addrs}, @@ -569,102 +578,152 @@ static struct metric_definition metrics[] = { {"qdr_links_blocked_total", "gauge", stats_get_links_blocked}, {"qdr_deliveries_redirected_to_fallback_total", "counter", stats_get_deliveries_redirected_to_fallback} }; -static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]); - -static struct allocator_metric_definition allocator_metrics[] = { - {"qdr_allocator_qd_bitmask_t", alloc_stats_qd_bitmask_t}, - {"qdr_allocator_qd_buffer_t", alloc_stats_qd_buffer_t}, - {"qdr_allocator_qd_composed_field_t", alloc_stats_qd_composed_field_t}, - {"qdr_allocator_qd_composite_t", alloc_stats_qd_composite_t}, - {"qdr_allocator_qd_connection_t", alloc_stats_qd_connection_t}, - {"qdr_allocator_qd_hash_handle_t", alloc_stats_qd_hash_handle_t}, - {"qdr_allocator_qd_hash_item_t", alloc_stats_qd_hash_item_t}, - {"qdr_allocator_qd_iterator_t", alloc_stats_qd_iterator_t}, - {"qdr_allocator_qd_link_ref_t", alloc_stats_qd_link_ref_t}, - {"qdr_allocator_qd_link_t", alloc_stats_qd_link_t}, - {"qdr_allocator_qd_listener_t", alloc_stats_qd_listener_t}, - {"qdr_allocator_qd_log_entry_t", alloc_stats_qd_log_entry_t}, - {"qdr_allocator_qd_management_context_t", alloc_stats_qd_management_context_t}, - {"qdr_allocator_qd_message_content_t", alloc_stats_qd_message_content_t}, - {"qdr_allocator_qd_message_t", alloc_stats_qd_message_t}, - {"qdr_allocator_qd_node_t", alloc_stats_qd_node_t}, - {"qdr_allocator_qd_parse_node_t", alloc_stats_qd_parse_node_t}, - {"qdr_allocator_qd_parsed_field_t", alloc_stats_qd_parsed_field_t}, - {"qdr_allocator_qd_timer_t", alloc_stats_qd_timer_t}, - {"qdr_allocator_qdr_action_t", alloc_stats_qdr_action_t}, - {"qdr_allocator_qdr_address_config_t", alloc_stats_qdr_address_config_t}, - {"qdr_allocator_qdr_address_t", alloc_stats_qdr_address_t}, - {"qdr_allocator_qdr_connection_info_t", alloc_stats_qdr_connection_info_t}, - {"qdr_allocator_qdr_connection_t", alloc_stats_qdr_connection_t}, - {"qdr_allocator_qdr_connection_work_t", alloc_stats_qdr_connection_work_t}, - {"qdr_allocator_qdr_core_timer_t", alloc_stats_qdr_core_timer_t}, - {"qdr_allocator_qdr_delivery_cleanup_t", alloc_stats_qdr_delivery_cleanup_t}, - {"qdr_allocator_qdr_delivery_ref_t", alloc_stats_qdr_delivery_ref_t}, - {"qdr_allocator_qdr_delivery_t", alloc_stats_qdr_delivery_t}, - {"qdr_allocator_qdr_field_t", alloc_stats_qdr_field_t}, - {"qdr_allocator_qdr_general_work_t", alloc_stats_qdr_general_work_t}, - {"qdr_allocator_qdr_link_ref_t", alloc_stats_qdr_link_ref_t}, - {"qdr_allocator_qdr_link_t", alloc_stats_qdr_link_t}, - {"qdr_allocator_qdr_link_work_t", alloc_stats_qdr_link_work_t}, - {"qdr_allocator_qdr_query_t", alloc_stats_qdr_query_t}, - {"qdr_allocator_qdr_terminus_t", alloc_stats_qdr_terminus_t} +static const size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]); + +// +// Metrics provided by the alloc_pool memory object cache. +// +// The alloc_pool module will register a name and descriptor for each memory object maintained by the pool during +// initialization. This information can be used to gather the metrics associated with the given object. The alloc_pool +// will deregister these on shutdown. See qd_http_add/remove_alloc_metric(). +// +typedef struct allocator_metric_definition_t allocator_metric_definition_t; +struct allocator_metric_definition_t { + DEQ_LINKS(allocator_metric_definition_t); + const char *name; + const qd_alloc_type_desc_t *desc; }; -static size_t allocator_metrics_length = sizeof(allocator_metrics)/sizeof(allocator_metrics[0]); +DEQ_DECLARE(allocator_metric_definition_t, allocator_metric_definition_list_t); +static allocator_metric_definition_list_t allocator_metrics = DEQ_EMPTY; -#define ALLOC_DATA(S, F) ((allocator_field){#F, S.F}) +// Write a single metric to the output buffer. Advance (*start) past the written data (to the null terminator) and +// return the total octets written (not including null terminator). Return zero on error (abort() if debug build). +// +static size_t _write_metric(uint8_t **start, size_t available, const char *name, const char *type, uint64_t value) +{ + // if you modify this please update any buffer sizing info above -typedef struct allocator_field { - const char* name; - uint64_t value; -} allocator_field; + int rc1 = snprintf((char *) *start, available, "# TYPE %s %s\n", name, type); + if (rc1 < 0 || rc1 >= available) { // overrun! + assert(false); // you need to increase the output_buffer size! + return 0; + } + *start += rc1; + available -= rc1; -static bool write_stats(uint8_t **position, const uint8_t * const end, const char* name, const char* type, int value) + int rc2 = snprintf((char *) *start, available, "%s %" PRIu64 "\n", name, value); + if (rc2 < 0 || rc2 >= available) { // overrun! + assert(false); // you need to increase the output_buffer size! + return 0; + } + *start += rc2; + + return rc1 + rc2; +} + +// Write all the router global metrics to the output buffer. Return the total octets written (not including null +// terminator) or zero on error. +// +// On successful return (*start) will be advanced to the terminating null byte. +// +static size_t _write_global_metrics(const stats_request_state_t *state, uint8_t **start, size_t available) { - //11 chars + type + 2*name + 20 chars for int - // average metric name size is 30 bytes - // average metric type size is 8 bytes - // current number of metrics is 22 - // total metric buffer size = 22 * (11 + 8 + 2*30 + 20) = 2178 - size_t length = 11 + strlen(type) + strlen(name)*2 + 20; - if (end - *position >= length) { - *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s %s\n", name, type); - *position += lws_snprintf((char*) *position, end - *position, "%s %i\n", name, value); - return true; - } else { - return false; + assert(state && state->callback_completed); + + const size_t save = available; + + for (int index = 0; index < metrics_length; ++index) { + const metric_definition *metric = &metrics[index]; + size_t rc = _write_metric(start, available, metric->name, metric->type, metric->get_value(&state->stats)); + if (rc == 0) { + return 0; // error writing, close the connection + } + available -= rc; } + + return save - available; } -static bool write_allocator_stats(uint8_t **position, const uint8_t * const end, const char* name, allocator_field field) + +// Write a single allocator metric to the output buffer. Generate the metric name using the name and subname. Return the +// total octets written (not including null terminator) or zero on error. +// +// On successful return (*start) will be advanced to the terminating null byte. +// +static size_t _write_allocator_metric(uint8_t **start, size_t available, const char *name, const char *subname, uint64_t value) { - // 30 chars (static) + 2*name + 2*field.name + 20 for int - // average allocator metric name size is 54 bytes (name:field.name) - // current number of metrics is 180 - // total allocator buffer size = 180 * (30 + 2*54 + 20) = 28440 - size_t length = 30 + strlen(name)*2 + strlen(field.name)*2 + 20; - if (end - *position >= length) { - *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s:%s_bytes gauge\n", name, field.name); - *position += lws_snprintf((char*) *position, end - *position, "%s:%s_bytes %"PRIu64"\n", name, field.name, field.value); - return true; - } else { - return false; + char name_buffer[MAX_METRIC_NAME_LEN + 1]; + int rc = snprintf(name_buffer, sizeof(name_buffer), "%s:%s", name, subname); + if (rc < 0 || rc >= sizeof(name_buffer)) { // overrun! + assert(false); // you need to increase the output_buffer size! + return 0; } + + return _write_metric(start, available, name_buffer, "gauge", value); } -static bool write_metric(uint8_t **position, const uint8_t * const end, metric_definition* definition, qdr_global_stats_t* stats) +// Write all the allocator metrics to the output buffer. Return the total octets written (not including null terminator) +// or zero on error. +// +// On successful return (*start) will be advanced to the terminating null byte. +// +static size_t _write_allocator_metrics(uint8_t **start, size_t available) { - return write_stats(position, end, definition->name, definition->type, definition->value(stats)); + const size_t save = available; + uint64_t pool_total_bytes = 0; // total memory allocated across all types + + allocator_metric_definition_t *metric = DEQ_HEAD(allocator_metrics); + assert(metric); // unexpected if null no metrics? + + while (metric) { + qd_alloc_stats_t stats = qd_alloc_desc_stats(metric->desc); + uint64_t total_allocated = stats.total_alloc_from_heap - stats.total_free_to_heap; + uint64_t total_in_use = stats.held_by_threads; + uint64_t total_in_cache = total_allocated - total_in_use; + uint64_t total_bytes = total_allocated * qd_alloc_type_size(metric->desc); + + pool_total_bytes += total_bytes; + + size_t rc = _write_allocator_metric(start, available, metric->name, "total_allocated", total_allocated); + if (rc == 0) return 0; + available -= rc; + + rc = _write_allocator_metric(start, available, metric->name, "total_in_use", total_in_use); + if (rc == 0) return 0; + available -= rc; + + rc = _write_allocator_metric(start, available, metric->name, "total_in_cache", total_in_cache); + if (rc == 0) return 0; + available -= rc; + + rc = _write_allocator_metric(start, available, metric->name, "total_bytes", total_bytes); + if (rc == 0) return 0; + available -= rc; + + metric = DEQ_NEXT(metric); + } + + size_t rc = _write_metric(start, available, "alloc_pool_total_bytes", "gauge", pool_total_bytes); + if (rc == 0) return 0; + available -= rc; + + return save - available; } -static bool write_allocator_metric(uint8_t **position, const uint8_t * const end, allocator_metric_definition* definition) +// Gather the current metrics and write them to the output buffer. Return the total bytes written to the buffer (not +// including null terminator) or zero on error. +// +// On successful return *start is advanced to the terminating null byte +// +static size_t _generate_metrics_response(stats_request_state_t *state, uint8_t **start, const uint8_t * const end) { - qd_alloc_stats_t allocator_stats = definition->fn(); - if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, total_alloc_from_heap))) return false; - if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, total_free_to_heap))) return false; - if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, held_by_threads))) return false; - if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, batches_rebalanced_to_threads))) return false; - if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, batches_rebalanced_to_global))) return false; - return true; + if (_write_global_metrics(state, start, end - *start) == 0 + || _write_allocator_metrics(start, end - *start) == 0) { + // error, close the connection + return 0; + } + + return end - *start; } static int add_header_by_name(struct lws *wsi, const char* name, const char* value, uint8_t** position, uint8_t* end) @@ -673,77 +732,106 @@ static int add_header_by_name(struct lws *wsi, const char* name, const char* val } static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason, - void *user, void *in, size_t len) + void *user, void *in, size_t len) { qd_http_server_t *hs = wsi_server(wsi); stats_t *stats = (stats_t*) user; - // rationale for buffer size is explained at write_stats and write_allocator_stats - uint8_t buffer[LWS_PRE + 30618]; - uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1]; + + if (!stats) // ignore any non-http request events + return 0; switch (reason) { case LWS_CALLBACK_HTTP: { - stats->context = NEW(stats_request_state_t); - ZERO(stats->context); - stats->context->wsi = wsi; - stats->context->server = hs; + // New HTTP request received, setup per-request state with output buffer + assert(!stats->state); + // see the comments above regarding output buffer size for metrics: + size_t buf_size = HTTP_HEADER_LEN + // router global metrics: + + (metrics_length * PER_METRIC_BUF_SIZE) + // alloc_pool metrics (+ 1 for alloc_pool_total_bytes): + + (DEQ_SIZE(allocator_metrics) * PER_METRIC_BUF_SIZE * PER_ALLOC_METRIC_COUNT) + + PER_METRIC_BUF_SIZE + // 1 terminating null + + 1; + stats->state = new_stats_request_state(buf_size); + stats->state->wsi = wsi; + stats->state->server = hs; //request stats from core thread - qdr_request_global_stats(hs->core, &stats->context->stats, handle_stats_results, (void*) stats->context); + qdr_request_global_stats(hs->core, &stats->state->stats, handle_stats_results, (void*) stats->state); return 0; } case LWS_CALLBACK_HTTP_WRITEABLE: { - //encode stats into buffer - if (!stats->headers_sent) { - if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end) - || add_header_by_name(wsi, "content-type:", "text/plain", &position, end) - || add_header_by_name(wsi, "connection:", "close", &position, end)) - return 1; - if (lws_finalize_http_header(wsi, &position, end)) - return 1; - stats->headers_sent = true; + // LWS HTTP server ready to send to HTTP response data + assert(stats->state); // expect LWS_CALLBACK_HTTP event occurs first! + + if (stats->response_complete) { // ignore spurious WRITABLE events once response complete + return 0; } - while (stats->current < metrics_length) { - if (write_metric(&position, end, &metrics[stats->current], &stats->context->stats)) { - stats->current++; - qd_log(LOG_HTTP, QD_LOG_DEBUG, "wrote metric %lu of %lu", stats->current, metrics_length); - } else { - qd_log(LOG_HTTP, QD_LOG_WARNING, "insufficient space in buffer"); - break; - } + if (!stats->state->callback_completed) { + // the asynchronous request for global metrics has not yet completed. When it does another + // LWS_CALLBACK_HTTP_WRITABLE event will be generated and then we can send the response. + return 0; } - int alloc_cur = 0; - while (alloc_cur < allocator_metrics_length) { - if (write_allocator_metric(&position, end, &allocator_metrics[alloc_cur])) { - qd_log(LOG_HTTP, QD_LOG_DEBUG, "wrote allocator metric %i of %lu", alloc_cur, - allocator_metrics_length); - alloc_cur++; - } else { - qd_log(LOG_HTTP, QD_LOG_WARNING, "insufficient space in buffer"); - break; - } + uint8_t *start = &stats->state->lws_prefix[LWS_PRE]; + uint8_t *end = start + stats->state->buffer_size; // first byte past buffer + + // encode stats into buffer + + if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &start, end) + || add_header_by_name(wsi, "content-type:", "text/plain", &start, end) + || add_header_by_name(wsi, "connection:", "close", &start, end) + || lws_finalize_http_header(wsi, &start, end)) { + + qd_log(LOG_HTTP, QD_LOG_WARNING, "Metrics request failed: cannot send headers"); + return 1; + } + + // if this fails make HTTP_HEADER_LEN larger (LWS does not document the required size) + assert(HTTP_HEADER_LEN >= (start - &stats->state->lws_prefix[LWS_PRE])); + + if (_generate_metrics_response(stats->state, &start, end) == 0) { + // Failed to generate output. This is not expected. Terminate the connection + qd_log(LOG_HTTP, QD_LOG_WARNING, "Metrics request failed: cannot access metrics"); + return 1; } - int n = (stats->current < metrics_length) || (alloc_cur < allocator_metrics_length) ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL; - //write buffer - size_t available = position - start; - if (lws_write(wsi, (unsigned char*) start, available, n) != available) + // Write the entire output buffer to LWS in one call. Best I can tell from the docs this should not fail + // unless the connection has closed. + + size_t available = (size_t) (start - &stats->state->lws_prefix[LWS_PRE]); + size_t amount = lws_write(wsi, (unsigned char *) &stats->state->lws_prefix[LWS_PRE], + available, LWS_WRITE_HTTP_FINAL); + + if (amount < available) { + // according to the lws_write header, this is an error. It may return more than available, which is ok + qd_log(LOG_HTTP, QD_LOG_WARNING, "Metrics request failed: connection closed while writing"); + return 1; + } + + stats->response_complete = true; + + if (lws_http_transaction_completed(wsi)) { + // I do not think this is an error, but according to the examples we close the connection when this happens return 1; - if (n == LWS_WRITE_HTTP_FINAL) { - if (lws_http_transaction_completed(wsi)) return -1; - } else { - lws_callback_on_writable(wsi); } return 0; } + case LWS_CALLBACK_HTTP_DROP_PROTOCOL: case LWS_CALLBACK_CLOSED_HTTP: { - stats->context->wsi_deleted = true; - if (stats->context->callback_completed) { - free(stats->context); + // request complete (added DROP_PROTOCOL since we do not get CLOSED_HTTP from curl clients (?)) + if (stats->state) { + stats->state->wsi_deleted = true; + // if the callback is still running then we cannot free the state since the callback will access it. We rely + // on the callback to free the state in this case. See handle_stats_result_HT(). + if (stats->state->callback_completed) { + free_stats_request_state(stats->state); + stats->state = 0; + } } return 0; } @@ -758,50 +846,83 @@ static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason, { qd_http_server_t *hs = wsi_server(wsi); stats_t *stats = (stats_t*) user; - uint8_t buffer[LWS_PRE + 2048]; - uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1]; + + if (!stats) // ignore any non-http request events + return 0; switch (reason) { case LWS_CALLBACK_HTTP: { - stats->context = NEW(stats_request_state_t); - ZERO(stats->context); - stats->context->wsi = wsi; - stats->context->server = hs; + assert(!stats->state); + stats->state = new_stats_request_state(HEALTHZ_BUF_SIZE); + stats->state->wsi = wsi; + stats->state->server = hs; //make dummy request for stats (pass in null ptr); this still exercises the //path through core thread and back through callback on io thread which is //a reasonable initial liveness check - qdr_request_global_stats(hs->core, 0, handle_stats_results, (void*) stats->context); + qdr_request_global_stats(hs->core, 0, handle_stats_results, (void*) stats->state); return 0; } case LWS_CALLBACK_HTTP_WRITEABLE: { - //encode stats into buffer - if (!stats->headers_sent) { - if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end) - || add_header_by_name(wsi, "content-type:", "text/plain", &position, end) - || lws_add_http_header_content_length(wsi, 3, &position, end)) - return 1; - if (lws_finalize_http_header(wsi, &position, end)) - return 1; - stats->headers_sent = true; + assert(stats->state); // expect LWS_CALLBACK_HTTP event occurs first! + + if (stats->response_complete) { // ignore spurious WRITABLE events once response complete + return 0; + } + + if (!stats->state->callback_completed) { + // the asynchronous request for global metrics has not yet completed. When it does another + // LWS_CALLBACK_HTTP_WRITABLE event will be generated and then we can send the response. + return 0; + } + + uint8_t *start = &stats->state->lws_prefix[LWS_PRE]; + uint8_t *end = start + HEALTHZ_BUF_SIZE; // first byte past buffer + + // encode stats into buffer + + if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &start, end) + || add_header_by_name(wsi, "content-type:", "text/plain", &start, end) + || lws_add_http_header_content_length(wsi, 3, &start, end) + || lws_finalize_http_header(wsi, &start, end)) { + + qd_log(LOG_HTTP, QD_LOG_WARNING, "Healthz request failed: cannot send headers"); + return 1; } - position += lws_snprintf((char*) position, end - position, "OK\n"); - int n = LWS_WRITE_HTTP_FINAL; - //write buffer - size_t available = position - start; - if (lws_write(wsi, (unsigned char*) start, available, n) != available) + // if this fails make HTTP_HEADER_LEN larger (LWS does not document the required size) + assert(HTTP_HEADER_LEN >= (start - &stats->state->lws_prefix[LWS_PRE])); + + start += lws_snprintf((char*) start, end - start, "OK\n"); + + size_t available = (size_t) (start - &stats->state->lws_prefix[LWS_PRE]); + size_t amount = lws_write(wsi, (unsigned char *) &stats->state->lws_prefix[LWS_PRE], + available, LWS_WRITE_HTTP_FINAL); + if (amount < available) { + // according to the lws_write header, this is an error. It may return more than available, which is ok + qd_log(LOG_HTTP, QD_LOG_WARNING, "Healthz request failed: connection closed while writing"); + return 1; + } + + stats->response_complete = true; + + if (lws_http_transaction_completed(wsi)) { + // I do not think this is an error, but according to the examples we close the connection when this happens return 1; - else if (lws_http_transaction_completed(wsi)) - return -1; - else return 0; + } + + return 0; } + case LWS_CALLBACK_HTTP_DROP_PROTOCOL: // won't get CLOSED_HTTP from curl (?) case LWS_CALLBACK_CLOSED_HTTP: { - stats->context->wsi_deleted = true; - if (stats->context->callback_completed) { - free(stats->context); + if (stats->state) { + stats->state->wsi_deleted = true; + if (stats->state->callback_completed) { + free_stats_request_state(stats->state); + stats->state = 0; + } } return 0; } @@ -916,10 +1037,20 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, #define DEFAULT_TICK 1000 -static void* http_thread_run(void* v) { +#ifndef NDEBUG +static int threads_running; +#endif + +static void* http_thread_run(void* v) +{ qd_http_server_t *hs = v; qd_log(LOG_HTTP, QD_LOG_INFO, "HTTP server thread running"); int result = 0; + +#ifndef NDEBUG + ++threads_running; +#endif + while(result >= 0) { /* Send a USER event to run transport ticks, may decrease hs->next_tick. */ hs->now = qd_timer_now(); @@ -956,6 +1087,11 @@ static void* http_thread_run(void* v) { } } } + +#ifndef NDEBUG + --threads_running; +#endif + qd_log(LOG_HTTP, QD_LOG_INFO, "HTTP server thread exit"); return NULL; } @@ -1026,6 +1162,7 @@ qd_lws_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li work_t w = { W_LISTEN, hl }; work_push(hs, w); } + return hl; } @@ -1048,3 +1185,53 @@ static qd_lws_listener_t *wsi_listener(struct lws *wsi) { } return hl; } + +void qd_http_add_alloc_metric(const char *name, const qd_alloc_type_desc_t *desc) +{ + allocator_metric_definition_t *md = qd_malloc(sizeof(allocator_metric_definition_t)); + ZERO(md); + DEQ_ITEM_INIT(md); + // name and desc remain valid until qd_http_remove_alloc_metric() is called + md->name = name; + md->desc = desc; + DEQ_INSERT_TAIL(allocator_metrics, md); + +#ifdef NDEBUG + // Attempting to add a metric after the server threads have started will crash stuff. If you hit this assert then + // qd_alloc_initialize() has not been called. qd_alloc_initialize() MUST be called before starting the http threads! + assert(threads_running == 0); +#endif +} + +void qd_http_remove_alloc_metric(const char *name) +{ + allocator_metric_definition_t *md = DEQ_HEAD(allocator_metrics); + DEQ_FIND(md, strcmp(md->name, name) == 0); + if (md) { + DEQ_REMOVE(allocator_metrics, md); + free(md); + } + +#ifndef NDEBUG + // Attempting to remove a metric while the server threads are running will crash stuff. If you hit this assert then + // qd_alloc_finalize() has been called prior to stopping all http threads. qd_alloc_finalize() MUST NOT be called + // while http threads are running! + assert(threads_running == 0); +#endif +} + +// allocate a new stats_request_state_t instance, include buffer_size additional octets past the structure for rendering +// the HTML response +// +static stats_request_state_t *new_stats_request_state(size_t buffer_size) +{ + stats_request_state_t *state = qd_malloc(sizeof(stats_request_state_t) + buffer_size); + ZERO(state); // do not bother initializing buffer space - it will be overwritten + state->buffer_size = buffer_size; + return state; +} + +static void free_stats_request_state(stats_request_state_t *state) +{ + free(state); +} diff --git a/src/http.h b/src/http.h index fffa2f671..771396345 100644 --- a/src/http.h +++ b/src/http.h @@ -45,4 +45,9 @@ qd_lws_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener */ void qd_lws_listener_close(qd_lws_listener_t *hl); +/* register/deregister alloc_pool metrics (thread safe) */ +struct qd_alloc_type_desc_t; +void qd_http_add_alloc_metric(const char *name, const struct qd_alloc_type_desc_t *desc); +void qd_http_remove_alloc_metric(const char *name); + #endif // QD_HTTP_H diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py index 4f3afac2d..38aad2e99 100644 --- a/tests/system_tests_http.py +++ b/tests/system_tests_http.py @@ -20,16 +20,20 @@ import os import threading import ssl -from subprocess import PIPE, STDOUT from urllib.request import urlopen, build_opener, HTTPSHandler from urllib.error import HTTPError, URLError import skupper_router_site -from system_test import TIMEOUT, Process, QdManager, retry +from system_test import Process, QdManager, retry from system_test import TestCase, Qdrouterd, main_module, DIR from system_test import unittest +# +# Note: these tests exercise the management interface accessed via HTTP. These +# tests have nothing to do with the HTTP adaptors! +# + class RouterTestHttp(TestCase): @@ -61,18 +65,6 @@ def get_cert(cls, url): opener = build_opener(HTTPSHandler(context=context)) return opener.open(url).read().decode('utf-8') - def run_skmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): - p = self.popen( - ['skmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], - stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, - universal_newlines=True) - out = p.communicate(input)[0] - try: - p.teardown() - except Exception as e: - raise Exception(out if out else str(e)) - return out - def assert_get(self, url): self.assertEqual('HTTP test\n', self.get("%s/system_tests_http.txt" % url)) @@ -202,26 +194,62 @@ def run(self): self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0]) def test_http_metrics(self): + """ Verify the prometheus metrics provided by the router """ + metrics_ports = [self.get_port(), self.get_port()] config = Qdrouterd.Config([ ('router', {'id': 'QDR.METRICS'}), - ('listener', {'port': self.get_port(), 'http': 'yes'}), - ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}), + ('listener', {'role': 'normal', 'port': self.get_port()}), + ('listener', {'port': metrics_ports[0], 'http': 'yes'}), + ('listener', {'port': metrics_ports[1], 'httpRootDir': os.path.dirname(__file__)}), ]) r = self.qdrouterd('metrics-test-router', config) - def test(port): - result = urlopen("http://localhost:%d/metrics" % port, cafile=self.ssl_file('ca-certificate.pem')) - self.assertEqual(200, result.getcode()) - data = result.read().decode('utf-8') - assert 'connections' in data - assert 'deliveries_ingress' in data - assert 'deliveries_delayed_1sec' in data - assert 'deliveries_delayed_10sec' in data - assert 'deliveries_redirected_to_fallback' in data + # generate a list of all metric names expected to be provided via HTTP: + + stat_names = ["qdr_connections_total", "qdr_links_total", + "qdr_addresses_total", "qdr_routers_total", + "qdr_auto_links_total", + "qdr_presettled_deliveries_total", + "qdr_dropped_presettled_deliveries_total", + "qdr_accepted_deliveries_total", + "qdr_released_deliveries_total", + "qdr_rejected_deliveries_total", + "qdr_modified_deliveries_total", + "qdr_deliveries_ingress_total", + "qdr_deliveries_egress_total", + "qdr_deliveries_transit_total", + "qdr_deliveries_ingress_route_container_total", + "qdr_deliveries_egress_route_container_total", + "qdr_deliveries_delayed_1sec_total", + "qdr_deliveries_delayed_10sec_total", + "qdr_deliveries_stuck_total", + "qdr_links_blocked_total", + "qdr_deliveries_redirected_to_fallback_total"] + for stat in r.management.query(type="io.skupper.router.allocator").get_dicts(): + stat_names.append(stat['typeName']) + + def _test(stat_names, port): + # sanity check that all expected stats are reported + resp = urlopen(f"http://localhost:{port}/metrics", cafile=self.ssl_file('ca-certificate.pem')) + self.assertEqual(200, resp.getcode()) + metrics = [x for x in resp.read().decode('utf-8').splitlines() if not x.startswith("#")] + + # Verify that all expected stats are reported by the metrics URL + + for name in stat_names: + found = False + for metric in metrics: + # remove the counter and strip the allocator name suffix + # (if present) + mname = metric.strip().split()[0].split(':')[0] + if mname == name: + found = True + break + self.assertTrue(found, f"Did not find {name} in returned metrics!") # Sequential calls on multiple ports - for port in r.ports: - test(port) + for port in metrics_ports: + _test(stat_names, port) # Concurrent calls on multiple ports class TestThread(threading.Thread): @@ -232,10 +260,11 @@ def __init__(self, port): def run(self): try: - test(self.port) + _test(stat_names, self.port) except Exception as e: self.ex = e - threads = [TestThread(p) for p in r.ports + r.ports] + + threads = [TestThread(p) for p in metrics_ports * 4] for t in threads: t.join() for t in threads: