diff --git a/include/qpid/dispatch/log.h b/include/qpid/dispatch/log.h index 8d9d7905f..6f3cf4439 100644 --- a/include/qpid/dispatch/log.h +++ b/include/qpid/dispatch/log.h @@ -56,6 +56,7 @@ typedef enum { LOG_HTTP_ADAPTOR, LOG_FLOW_LOG, LOG_ADDRESS_WATCH, + LOG_HTTP1_OBSERVER, LOG_DEFAULT } qd_log_module_t; diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index e2c2c64b6..8da24cbfd 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -1366,6 +1366,7 @@ "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", + "HTTP1_OBSERVER", "DEFAULT" ], "required": true, diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index e0be3b1ed..b17e44df4 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -489,11 +489,6 @@ static void free_connection_IO(void *context) sys_mutex_unlock(&conn->activation_lock); // Do NOT free the core_activation lock since the core may be holding it - if (conn->observer_handle) { - qdpo_end(conn->observer_handle); - conn->observer_handle = 0; - } - if (conn->common.parent) { if (conn->common.parent->context_type == TL_LISTENER) { qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent; @@ -610,10 +605,8 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) qdr_link_detach(conn->outbound_link, QD_LOST, 0); } - if (!!conn->core_conn) { - qdr_connection_closed(conn->core_conn); - conn->core_conn = 0; - qd_connection_counter_dec(QD_PROTOCOL_TCP); + if (conn->observer_handle) { + qdpo_end(conn->observer_handle); } if (!!conn->common.vflow) { @@ -621,6 +614,12 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) vflow_end_record(conn->common.vflow); } + if (!!conn->core_conn) { + qdr_connection_closed(conn->core_conn); + conn->core_conn = 0; + qd_connection_counter_dec(QD_PROTOCOL_TCP); + } + qd_tls_free2(conn->tls); 
qd_tls_domain_decref(conn->tls_domain); free(conn->alpn_protocol); @@ -632,8 +631,9 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) conn->outbound_link = 0; conn->outbound_stream = 0; conn->outbound_delivery = 0; - conn->core_conn = 0; + conn->observer_handle = 0; conn->common.vflow = 0; + conn->core_conn = 0; conn->tls = 0; conn->tls_domain = 0; diff --git a/src/decoders/http1/http1_decoder.c b/src/decoders/http1/http1_decoder.c index 9231730da..63ac28f70 100644 --- a/src/decoders/http1/http1_decoder.c +++ b/src/decoders/http1/http1_decoder.c @@ -209,8 +209,7 @@ static void parser_error(qd_http1_decoder_connection_t *hconn, const char *reaso hconn->parse_error = reason; decoder_new_state(&hconn->client, HTTP1_DECODE_ERROR); decoder_new_state(&hconn->server, HTTP1_DECODE_ERROR); - if (hconn->config->protocol_error) - hconn->config->protocol_error(hconn, reason); + hconn->config->protocol_error(hconn, reason); } } @@ -439,10 +438,12 @@ static bool message_done(qd_http1_decoder_connection_t *hconn, decoder_t *decode } // signal the message receive is complete - int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client); - if (rc) { - parser_error(hconn, "message_done callback failed"); - return false; + if (hconn->config->message_done) { + int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client); + if (rc) { + parser_error(hconn, "message_done callback failed"); + return false; + } } decoder_reset(decoder); // resets decode state to parse_request or parse_response @@ -619,10 +620,12 @@ static bool headers_done(qd_http1_decoder_connection_t *hconn, struct decoder_t h1_decode_request_state_t *hrs = decoder->hrs; assert(hrs); - int rc = hconn->config->rx_headers_done(hconn, hrs->user_context, decoder->is_client); - if (rc) { - parser_error(hconn, "rx_headers_done callback failed"); - return false; + if (hconn->config->rx_headers_done) { + int rc = hconn->config->rx_headers_done(hconn, 
hrs->user_context, decoder->is_client); + if (rc) { + parser_error(hconn, "rx_headers_done callback failed"); + return false; + } } // Determine if a body is present. See RFC9112 Message Body Length @@ -783,11 +786,13 @@ static bool parse_header(qd_http1_decoder_connection_t *hconn, decoder_t *decode truncate_whitespace(value); } - assert(decoder->hrs); - int rc = hconn->config->rx_header(hconn, decoder->hrs->user_context, decoder->is_client, key, value); - if (rc) { - parser_error(hconn, "rx_header callback failed"); - return false; + if (hconn->config->rx_header) { + assert(decoder->hrs); + int rc = hconn->config->rx_header(hconn, decoder->hrs->user_context, decoder->is_client, key, value); + if (rc) { + parser_error(hconn, "rx_header callback failed"); + return false; + } } if (!process_header(hconn, decoder, key, value)) diff --git a/src/decoders/http1/http1_decoder.h b/src/decoders/http1/http1_decoder.h index ff75d3dd7..babd7d751 100644 --- a/src/decoders/http1/http1_decoder.h +++ b/src/decoders/http1/http1_decoder.h @@ -78,14 +78,14 @@ struct qd_http1_decoder_config_t { uint32_t version_major, uint32_t version_minor); - // Invoked for each HTTP header received. from_client is true if the header was read from the stream sent by the - // client, false if the stream is sent from the server. Neither key nor value is preserved on return from this - // call. The callback must copy the data associated with these values if they need to be saved. + // (Optional) invoked for each HTTP header received. from_client is true if the header was read from the stream sent + // by the client, false if the stream is sent from the server. Neither key nor value is preserved on return from + // this call. The callback must copy the data associated with these values if they need to be saved. 
// int (*rx_header)(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client, const char *key, const char *value); - // Invoked after the last HTTP header (after the last rx_header() callback for this message). + // (Optional) invoked after the last HTTP header (after the last rx_header() callback for this message). // int (*rx_headers_done)(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client); diff --git a/src/log.c b/src/log.c index 46c19b12a..5b96e8318 100644 --- a/src/log.c +++ b/src/log.c @@ -44,7 +44,7 @@ #define TEXT_MAX QD_LOG_TEXT_MAX #define LOG_MAX (QD_LOG_TEXT_MAX+128) #define LIST_MAX 1000 -#define NUM_LOG_SOURCES 20 +#define NUM_LOG_SOURCES 21 const char *QD_LOG_STATS_TYPE = "logStats"; @@ -58,7 +58,8 @@ int qd_log_max_len(void) const char *log_module_names[] = {"ROUTER", "ROUTER_CORE", "ROUTER_HELLO", "ROUTER_LS", "ROUTER_MA", "MESSAGE", "SERVER", "AGENT", "CONTAINER", "ERROR", "POLICY", "HTTP", "CONN_MGR", "PYTHON", "PROTOCOL", - "TCP_ADAPTOR", "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", "DEFAULT"}; + "TCP_ADAPTOR", "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", "HTTP1_OBSERVER", + "DEFAULT"}; typedef struct qd_log_entry_t qd_log_entry_t; diff --git a/src/observers/http1/http1_observer.c b/src/observers/http1/http1_observer.c index d9e3ffc65..4ede03c98 100644 --- a/src/observers/http1/http1_observer.c +++ b/src/observers/http1/http1_observer.c @@ -19,106 +19,177 @@ #include "observers/private.h" #include "decoders/http1/http1_decoder.h" +#include "qpid/dispatch/alloc_pool.h" +#include "qpid/dispatch/connection_counters.h" #include -/** +struct http1_request_state_t { + DEQ_LINKS(http1_request_state_t); + vflow_record_t *vflow; + bool latency_done:1; // true: latency timing complete +}; +ALLOC_DECLARE(http1_request_state_t); +ALLOC_DEFINE(http1_request_state_t); - ### START RECORD - ### - # connection level record (listener based) - vflow_start_record(VFLOW_RECORD_FLOW, li->vflow); - # connection 
level record (connector) - vflow_start_record(VFLOW_RECORD_FLOW, connector->vflow); - # Client side request start flow - vflow_start_record(VFLOW_RECORD_FLOW, hconn->vflow) - # Server side request start flow - vflow_start_record(VFLOW_RECORD_FLOW, hconn->vflow) - ### END RECORD - ### - vflow_end_record(hreq->vflow); End of request vflow +/* + * HTTP/1.x decoder callbacks + */ - # connection level flow end - vflow_end_record(hconn->vflow); End of connection vflow - ### ATTRIBUTES CLIENT-FACING CONNECTION LEVEL - ### - ./http1_client.c:213: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_client.c:214: vflow_add_rate(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, VFLOW_ATTRIBUTE_OCTET_RATE); - ./http1_client.c:499: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - ./http1_client.c:597: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - ./http1_client.c:691: qd_set_vflow_netaddr_string(hconn->vflow, hconn->raw_conn, true); - # moves any connection condition to vflow on DISCONNECT: - ./http1_client.c:713: qd_set_condition_on_vflow(hconn->raw_conn, hconn->vflow); +// A new request has arrived, start observing it +// +static int rx_request(qd_http1_decoder_connection_t *hconn, const char *method, const char *target, uint32_t version_major, uint32_t version_minor, uintptr_t *request_context) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); + + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_DEBUG, + "[C%" PRIu64 "] HTTP/1.1 observer rx_request(method=%s target=%s version=%"PRIu32".%"PRIu32")", + th->conn_id, method, target, version_major, version_minor); + + http1_request_state_t *hreq = new_http1_request_state_t(); + ZERO(hreq); + DEQ_ITEM_INIT(hreq); + hreq->vflow = vflow_start_record(VFLOW_RECORD_FLOW, th->vflow); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_PROTOCOL, version_minor == 1 ? 
"HTTP/1.1" : "HTTP/1.0"); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_METHOD, method); + vflow_latency_start(hreq->vflow); + hreq->latency_done = false; + DEQ_INSERT_TAIL(th->http1.requests, hreq); + *request_context = (uintptr_t) hreq; + return 0; +} + - ### ATTRIBUTES SERVER-FACING CONNECTION LEVEL - ### - ./http1_server.c:276: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_server.c:277: vflow_add_rate(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, VFLOW_ATTRIBUTE_OCTET_RATE); - ./http1_server.c:603: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - ./http1_server.c:707: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - # gets addr from raw conn - ./http1_server.c:831: qd_set_vflow_netaddr_string(hconn->vflow, hconn->raw_conn, false); - # moves connection condition to vflow on DISCONNECT - ./http1_server.c:856: qd_set_condition_on_vflow(hconn->raw_conn, hconn->vflow); +static int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, int status_code, const char *reason_phrase, uint32_t version_major, uint32_t version_minor) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); + + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_DEBUG, + "[C%" PRIu64 "] HTTP/1.1 observer rx_response(status=%d reason=%s version=%"PRIu32".%"PRIu32")", + th->conn_id, status_code, reason_phrase, version_major, version_minor); + + http1_request_state_t *hreq = (http1_request_state_t *) request_context; + assert(hreq); + + // stop latency even if the request is not complete (1xx response code) + if (!hreq->latency_done) { + vflow_latency_end(hreq->vflow); + hreq->latency_done = true; + } + + if (status_code / 100 != 1) { // terminal response code + char status_code_str[16]; + snprintf(status_code_str, sizeof(status_code_str), "%d", status_code); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_RESULT, status_code_str); + 
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase); + } + + return 0; +} - ### ATTRIBUTES CLIENT-FACING PER-REQUEST LEVEL - ### - ./http1_client.c:954: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_client.c:1209: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, in_octets); +static int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t request_context) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); - ./http1_client.c:955: vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_METHOD, method); - ./http1_client.c:2074: vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_RESULT, code_str); - ./http1_client.c:2076: vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase); + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer transaction complete", th->conn_id); - ./http1_client.c:969: vflow_serialize_identity(hreq->base.vflow, hreq->request_props); + http1_request_state_t *hreq = (http1_request_state_t *) request_context; + assert(hreq); - ./http1_client.c:959: vflow_latency_start(hreq->base.vflow); - ./http1_client.c:1855: vflow_latency_end(hreq->base.vflow); + DEQ_REMOVE(th->http1.requests, hreq); + vflow_end_record(hreq->vflow); + free_http1_request_state_t(hreq); + return 0; +} - ### ATTRIBUTES SERVER-FACING PER-REQUEST LEVEL - ### - ./http1_server.c:1381: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, in_octets); - ./http1_server.c:1566: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_server.c:1702: vflow_set_ref_from_parsed(hreq->base.vflow, VFLOW_ATTRIBUTE_COUNTERFLOW, value); +static void protocol_error(qd_http1_decoder_connection_t *hconn, const char *reason) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); - ./http1_server.c:1572: 
vflow_latency_start(hreq->base.vflow); - ./http1_server.c:1119: vflow_latency_end(hreq->base.vflow); + // only complain if we are failing while there are active requests. It may be that the TCP stream is not + // carrying HTTP/1.x data. - ### WHAT ABOUT VFLOW_ATTRIBUTE_PROTOCOL ?? + if (DEQ_SIZE(th->http1.requests)) { + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_ERROR, + "[C%" PRIu64 "] HTTP/1.1 observer disabled due to protocol error: %s", th->conn_id, reason); + } else { + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer protocol error: %s", th->conn_id, reason); + } + // An error code will be returned to the http1_observe() call and this observer will be cleaned up there. +} -https://www.rfc-editor.org/rfc/rfc9112.html (HTTP/1.1) -https://www.rfc-editor.org/rfc/rfc1945 (HTTP/1.0) -https://www.rfc-editor.org/rfc/rfc9110 (HTTP Semantics) - */ +static qd_http1_decoder_config_t decoder_config = { + .rx_request = rx_request, + .rx_response = rx_response, + // .rx_header = rx_header, + // .rx_headers_done = rx_headers_done, + // .rx_body = rx_body, + // .message_done = message_done, + .transaction_complete = transaction_complete, + .protocol_error = protocol_error +}; static void http1_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) { - qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, - "[C%" PRIu64 "] HTTP/1.1 observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? 
"client" : "server"); -} + assert(th->http1.decoder); + int rc = qd_http1_decoder_connection_rx_data(th->http1.decoder, from_client, data, length); + if (rc) { + qdpo_http1_final(th); + } +} void qdpo_http1_init(qdpo_transport_handle_t *th) { - qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer initialized", th->conn_id); + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer initialized", th->conn_id); th->protocol = QD_PROTOCOL_HTTP1; th->observe = http1_observe; - th->http1.tbd = 42; // whatever; + memset(&th->http1, 0, sizeof(th->http1)); + DEQ_INIT(th->http1.requests); + th->http1.decoder = qd_http1_decoder_connection(&decoder_config, (uintptr_t) th); + + // adjust the router's active protocol connection counter since we are replacing the parent TCP connection with an + // HTTP/1.x connection + qd_connection_counter_dec(QD_PROTOCOL_TCP); + qd_connection_counter_inc(QD_PROTOCOL_HTTP1); } + void qdpo_http1_final(qdpo_transport_handle_t *th) { - qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer finalized", th->conn_id); - th->observe = 0; + qd_log(LOG_HTTP1_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer finalized", th->conn_id); + + if (th->observe) { + th->observe = 0; + qd_http1_decoder_connection_free(th->http1.decoder); + th->http1.decoder = 0; + http1_request_state_t *hreq = DEQ_HEAD(th->http1.requests); + while (hreq) { + DEQ_REMOVE_HEAD(th->http1.requests); + if (hreq->vflow) { + vflow_end_record(hreq->vflow); + } + free_http1_request_state_t(hreq); + hreq = DEQ_HEAD(th->http1.requests); + } + + // restore the router's protocol connection counter for the parent TCP connection + qd_connection_counter_dec(QD_PROTOCOL_HTTP1); + qd_connection_counter_inc(QD_PROTOCOL_TCP); + } } diff --git a/src/observers/private.h b/src/observers/private.h index 40f28a0ed..2ddfc79a4 100644 --- a/src/observers/private.h +++ b/src/observers/private.h @@ -50,9 +50,14 @@ struct tcp_observer_state_t { /** 
* state machine for observing HTTP/1.x */ +typedef struct qd_http1_decoder_connection_t qd_http1_decoder_connection_t; +typedef struct http1_request_state_t http1_request_state_t; +DEQ_DECLARE(http1_request_state_t, http1_request_state_list_t); + typedef struct http1_observer_state_t http1_observer_state_t; struct http1_observer_state_t { - int tbd; + qd_http1_decoder_connection_t *decoder; + http1_request_state_list_t requests; }; diff --git a/src/observers/tcp_observer.c b/src/observers/tcp_observer.c index 85bb0cdf6..83e426899 100644 --- a/src/observers/tcp_observer.c +++ b/src/observers/tcp_observer.c @@ -79,8 +79,6 @@ static void tcp_observe(qdpo_transport_handle_t *th, bool from_client, const uns qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] TCP observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server"); - // wicked brain-dead classification just for POC - if (from_client) { // fill up the protocol classification prefix buffer @@ -107,27 +105,12 @@ static void tcp_observe(qdpo_transport_handle_t *th, bool from_client, const uns } else { - // HTTP/2.0 check failed, try HTTP/1.x - - // hack for now. probably best to just start handing off the http1 observer and let it - // try to find a proper HTTP/1.x request line. - - const char * const http1_commands[] = { - "GET ", "HEAD ", "POST ", "PUT ", "DELETE ", 0 - }; - - for (int i = 0; http1_commands[i] != 0; ++i) { - size_t cmd_len = strlen(http1_commands[i]); - size_t to_match = MIN(th->tcp.prefix_len, cmd_len); - if (memcmp(th->tcp.prefix, http1_commands[i], to_match) == 0) { - if (to_match == cmd_len) { - activate_inner(th, QD_PROTOCOL_HTTP1, data, length); - return; - } else { - return; // partial match need more - } - } - } + // HTTP/2.0 check failed. Currently the only other supported protocol is HTTP/1.x so try that + // unconditionally. If the HTTP/1.x observer fails to find HTTP/1.x traffic it will disable itself without + // posting an error. 
+ + activate_inner(th, QD_PROTOCOL_HTTP1, data, length); + return; } } else { // !from_client diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 23a3fba5b..607d0d9c9 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -163,10 +163,11 @@ foreach(py_test_module system_tests_log_level_update system_tests_routing_protocol system_tests_open_properties - system_tests_http2 + system_tests_http2 system_tests_grpc system_tests_http1_adaptor system_tests_http1_over_tcp + system_tests_http1_observer system_tests_tcp_adaptor system_tests_tcp_adaptor_tls system_tests_http2_tls diff --git a/tests/http1-data/cgi-bin/script.py b/tests/http1-data/cgi-bin/script.py new file mode 100755 index 000000000..a5c828c54 --- /dev/null +++ b/tests/http1-data/cgi-bin/script.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Dummy CGI script for handling HTTP/1.1 POST requests + +import sys + +body = "

Dummy CGI output!

" +sys.stdout.write(f"content-length: {len(body)}\r\n\r\n") +sys.stdout.write(body) diff --git a/tests/http1-data/index.html b/tests/http1-data/index.html new file mode 100644 index 000000000..93a0a7ea8 --- /dev/null +++ b/tests/http1-data/index.html @@ -0,0 +1,27 @@ + + + + + HTTP/1.x Test Content + + +

+ This directory contains content used by the HTTP/1.x tests. +

+ + diff --git a/tests/http1_relay.c b/tests/http1_relay.c index 71b4b3205..ac4741189 100644 --- a/tests/http1_relay.c +++ b/tests/http1_relay.c @@ -228,7 +228,7 @@ void protocol_error(qd_http1_decoder_connection_t *hconn, const char *reason) } -qd_http1_decoder_config_t decoder_config = { +static qd_http1_decoder_config_t decoder_config = { .rx_request = rx_request, .rx_response = rx_response, .rx_header = rx_header, diff --git a/tests/system_test.py b/tests/system_test.py index f4a88b6eb..821f4b34a 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -513,12 +513,45 @@ def write(self, name, suffix=".conf"): return name -class HttpServer(Process): - def __init__(self, args, name=None, expect=Process.RUNNING, **kwargs): - super(HttpServer, self).__init__(args, name=name, expect=expect, **kwargs) +class Http1Server(Process): + """ + Run the Python library http.server as a background process. + Default content can be found in the http1-data subdirectory. + """ + def __init__(self, port, name=None, expect=Process.RUNNING, **kwargs): + name = name or "http.server" + kwargs.setdefault('stdin', subprocess.DEVNULL) # no input accepted + kwargs.setdefault('directory', os.path.join(current_dir, "http1-data")) + + args = [sys.executable, + "-m", "http.server", + "-d", kwargs['directory'], + "--cgi"] + + protocol_supported = sys.version_info >= (3, 11) + if protocol_supported: + kwargs.setdefault('protocol', "HTTP/1.1") + args.append("-p") + args.append(kwargs['protocol']) + elif 'protocol' in kwargs: + raise NotImplementedError("http.server does not support '--protocol' param") + + args.append(str(port)) + + # remove keywords not used by super class + kwargs.pop('directory') + if 'protocol' in kwargs: + kwargs.pop('protocol') + super(Http1Server, self).__init__(args, name=name, expect=expect, **kwargs) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.teardown() -class Http2Server(HttpServer): +class 
Http2Server(Process): """A HTTP2 Server that will respond to requests made via the router.""" def __init__(self, name=None, listen_port=None, wait=True, diff --git a/tests/system_tests_http1_observer.py b/tests/system_tests_http1_observer.py new file mode 100644 index 000000000..f8542766f --- /dev/null +++ b/tests/system_tests_http1_observer.py @@ -0,0 +1,237 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +# +# Test the HTTP/1.x Protocol observer +# + +import sys + +from system_test import TestCase, unittest, main_module, Qdrouterd +from system_test import curl_available, run_curl +from system_test import nginx_available, NginxServer +from system_test import Http1Server, retry, TIMEOUT +from vanflow_snooper import VFlowSnooperThread + + +def spawn_nginx(port, tester): + """ + Spawn the Nginx server listening on port + """ + env = dict() + env['nginx-base-folder'] = NginxServer.BASE_FOLDER + env['setupclass-folder'] = tester.directory + env['nginx-configs-folder'] = NginxServer.CONFIGS_FOLDER + env['listening-port'] = str(port) + env['http2'] = '' # disable HTTP/2 + # disable ssl/tls for now + env['ssl'] = '' + env['tls-enabled'] = '#' # Will comment out TLS configuration lines + + # TBD: TLS stuff + # env['ssl'] = 'ssl' + # env['tls-enabled'] = '' # Will enable TLS lines + # env['chained-pem'] = CHAINED_CERT + # env['server-private-key-no-pass-pem'] = SERVER_PRIVATE_KEY_NO_PASS + # env['ssl-verify-client'] = 'on' + # env['ca-certificate'] = CA_CERT + + return tester.nginxserver(config_path=NginxServer.CONFIG_FILE, env=env) + + +@unittest.skipUnless(nginx_available() and curl_available(), + "Requires both nginx and curl tools") +class Http1ObserverTest(TestCase): + """ + Verify the HTTP/1.x observer produces the expected VanFlow records + """ + @classmethod + def router(cls, name, listener_port, server_port, extra_config=None): + """ + Create a router with a tcpConnector and a tcpListener. Set the logging + config to only enable Van Flow record logs. 
+ """ + config = [ + ('router', {'mode': 'interior', + 'id': name}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + + ('tcpListener', {'host': "0.0.0.0", + 'port': listener_port, + 'address': 'Http1ObserverTest'}), + ('tcpConnector', {'host': "localhost", + 'port': server_port, + 'address': 'Http1ObserverTest'}) + ] + + if extra_config is not None: + config.extend(extra_config) + config = Qdrouterd.Config(config) + router = cls.tester.qdrouterd(name, config, wait=False, cl_args=["-T"]) + router.wait_ports() + router.wait_address('Http1ObserverTest', subscribers=1) + return router + + @classmethod + def setUpClass(cls): + """ + Start the HTTP1 servers + """ + super(Http1ObserverTest, cls).setUpClass() + cls.nginx_port = cls.tester.get_port() + cls.nginx_server = spawn_nginx(cls.nginx_port, cls.tester) + cls.http1_port = cls.tester.get_port() + cls.http1_server = cls.tester.cleanup(Http1Server(cls.http1_port)) + + @unittest.skipUnless(sys.version_info >= (3, 11), "Requires HTTP/1.1 support") + def test_01_get(self): + """ + Simple pipelined GET request. 
+ """ + l_port = self.tester.get_port() + router = self.router("test_01", l_port, self.nginx_port) + + snooper_thread = VFlowSnooperThread(router.addresses[0]) + retry(lambda: snooper_thread.sources_ready == 1, delay=0.25) + self.assertEqual(1, snooper_thread.sources_ready, "timed out waiting for router beacon") + + curl_args = [ + '--http1.1', + '-G' + ] + + pages = ['index.html', 't100K.html', 't10K.html', 't1K.html'] + for page in pages: + curl_args.append(f"http://localhost:{l_port}/{page}") + (rc, out, err) = run_curl(args=curl_args) + self.assertEqual(0, rc, f"curl failed: {rc}, {err}, {out}") + + # Expect at least 8 records: + # 1 - Listener + # 1 - Connector + # 1 - TCP Flow + # 1 - TCP Counter-flow + # 4 - HTTP requests + + retry(lambda: snooper_thread.total_records > 7, delay=0.25) + self.assertLess(7, snooper_thread.total_records, f"{snooper_thread.total_records}") + + router.teardown() + snooper_thread.join(timeout=TIMEOUT) + results = snooper_thread.get_results() + + # + # Expect 4 'GET' requests with 200 (OK) status + # + + matches = 0 + self.assertEqual(1, len(results), f"Expected one router entry: {results}") + records = results.popitem()[1] + for record in records: + if 'METHOD' in record: + self.assertEqual('GET', record['METHOD']) + self.assertIn('RESULT', record) + self.assertEqual('200', record['RESULT']) + self.assertIn('REASON', record) + self.assertEqual('OK', record['REASON']) + self.assertIn('PROTOCOL', record) + self.assertEqual('HTTP/1.1', record['PROTOCOL']) + matches += 1 + + self.assertEqual(len(pages), matches, f"unexpected results {results}") + + @unittest.skipUnless(sys.version_info >= (3, 11), "Requires HTTP/1.1 support") + def test_02_post(self): + """ + Simple POST request (chunked). Uses the Http server from the Python + standard library. + + Note: no pipelining POST requests! It is illegal and not supported by + the server. 
+ """ + l_port = self.tester.get_port() + router = self.router("test_02", l_port, self.http1_port) + + snooper_thread = VFlowSnooperThread(router.addresses[0]) + retry(lambda: snooper_thread.sources_ready == 1, delay=0.25) + self.assertEqual(1, snooper_thread.sources_ready, "timed out waiting for router beacon") + + curl_args = [ + '--http1.1', + '-H', "Transfer-Encoding: chunked", + '--data-ascii', "Start", + '--data-ascii', "End", + f"http://localhost:{l_port}/cgi-bin/script.py" + ] + + (rc, out, err) = run_curl(args=curl_args) + self.assertEqual(0, rc, f"curl post failed: {rc}, {err}, {out}") + + # this will pipeline 3 get requests due to the globbing parameter + # 'ignore': + curl_args = [ + '--http1.1', + '-G', + f"http://localhost:{l_port}/index.html?ignore=[1-3]" + ] + + (rc, out, err) = run_curl(args=curl_args) + self.assertEqual(0, rc, f"curl get failed: {rc}, {err}, {out}") + + # Expect at least 10 records: listener, connector, two tcp flows, two + # counter flows, and 4 HTTP requests: + retry(lambda: snooper_thread.total_records > 9, delay=0.25) + self.assertLess(9, snooper_thread.total_records, f"{snooper_thread.total_records}") + + router.teardown() + snooper_thread.join(timeout=TIMEOUT) + results = snooper_thread.get_results() + + get_count = 0 + post_count = 0 + self.assertEqual(1, len(results), f"Expected one router entry: {results}") + records = results.popitem()[1] + for record in records: + if 'METHOD' in record: + if record['METHOD'] == 'GET': + self.assertIn('RESULT', record) + self.assertEqual('200', record['RESULT']) + self.assertIn('REASON', record) + self.assertEqual('OK', record['REASON']) + self.assertIn('PROTOCOL', record) + self.assertEqual('HTTP/1.1', record['PROTOCOL']) + get_count += 1 + else: + self.assertEqual('POST', record['METHOD']) + self.assertIn('RESULT', record) + self.assertEqual('200', record['RESULT']) + self.assertIn('PROTOCOL', record) + self.assertEqual('HTTP/1.1', record['PROTOCOL']) + # reason is hardcoded by Python 
Http server: + self.assertIn('REASON', record) + self.assertEqual('Script output follows', record['REASON']) + post_count += 1 + + self.assertEqual(3, get_count, f"{results}") + self.assertEqual(1, post_count, f"{results}") + + +if __name__ == '__main__': + unittest.main(main_module()) diff --git a/tests/vanflow_snooper.py b/tests/vanflow_snooper.py new file mode 100755 index 000000000..37c19fa3a --- /dev/null +++ b/tests/vanflow_snooper.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import argparse +import json +import logging +import sys +from enum import IntEnum, unique +from threading import Thread + +from proton.handlers import MessagingHandler +from proton.reactor import Container +from proton import Message + + +# These mappings come directly from the vanflow.h source. 
# They will need to be updated as new records/attributes/etc are added
#

@unique
class RecordTypes(IntEnum):
    """Numeric codes identifying the type of a VanFlow record."""
    SITE = 0
    ROUTER = 1
    LINK = 2
    CONTROLLER = 3
    LISTENER = 4
    CONNECTOR = 5
    FLOW = 6
    PROCESS = 7
    IMAGE = 8
    INGRESS = 9
    EGRESS = 10
    COLLECTOR = 11
    PROCESS_GROUP = 12
    HOST = 13
    LOG = 14


@unique
class AttributeTypes(IntEnum):
    """Numeric codes used as keys in a VanFlow record's attribute map."""
    RECORD_TYPE = 0
    IDENTITY = 1
    PARENT = 2
    START_TIME = 3
    END_TIME = 4
    COUNTERFLOW = 5
    PEER = 6
    PROCESS = 7
    SIBLING_ORDINAL = 8
    LOCATION = 9
    PROVIDER = 10
    PLATFORM = 11
    NAMESPACE = 12
    MODE = 13
    SOURCE_HOST = 14
    DESTINATION_HOST = 15
    PROTOCOL = 16
    SOURCE_PORT = 17
    DESTINATION_PORT = 18
    VAN_ADDRESS = 19
    IMAGE_NAME = 20
    IMAGE_VERSION = 21
    HOST_NAME = 22
    OCTETS = 23
    LATENCY = 24
    TRANSIT_LATENCY = 25
    BACKLOG = 26
    METHOD = 27
    RESULT = 28
    REASON = 29
    NAME = 30
    TRACE = 31
    BUILD_VERSION = 32
    LINK_COST = 33
    DIRECTION = 34
    OCTET_RATE = 35
    OCTETS_OUT = 36
    OCTETS_UNACKED = 37
    WINDOW_CLOSURES = 38
    WINDOW_SIZE = 39
    FLOW_COUNT_L4 = 40
    FLOW_COUNT_L7 = 41
    FLOW_RATE_L4 = 42
    FLOW_RATE_L7 = 43
    DURATION = 44
    IMAGE = 45
    GROUP = 46
    STREAM_ID = 47
    LOG_SEVERITY = 48
    LOG_TEXT = 49
    SOURCE_FILE = 50
    SOURCE_LINE = 51


logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARNING)


def id_2_source(identity):
    """
    Return the base source-id from an identity value.

    The source-id is everything before the first ':' in the identity string
    (e.g. "abc:7" yields "abc"). An identity without a ':' is returned
    unchanged.
    """
    return identity.split(":")[0]


class EventSource:
    """
    Events from a given source (router). There is an instance of this class for
    each source the snooper has detected. All records emitted by the source are
    gathered into the records map.

    The records map is indexed by the record identity and the value is the
    record's attributes.
    """
    def __init__(self, name, base_address, sender):
        self.name = name
        self.base_address = base_address
        self.sender = sender
        self.records = {}  # indexed by identifier
        self.links_pending = 3  # 1 sender, 1 for events, 1 for flows
        logger.debug("New EventSource name=%s base-addr=%s", name, base_address)

    def get_records(self):
        """
        Return a list of all records, translating attribute and record type
        values to human-friendly names. Each record is a map.

        Raises Exception if a record carries an attribute key or a record
        type value that has no entry in AttributeTypes / RecordTypes.
        """
        records = []
        for attributes in self.records.values():
            record = {}
            for key, value in attributes.items():
                # Look the code up by value rather than using `in`: a
                # containment test of a plain integer against an Enum class
                # raises TypeError on Python 3.8 - 3.11.
                try:
                    attribute = AttributeTypes(key)
                except ValueError:
                    raise Exception(f"Missing VanFlow attribute type '{key}'") from None

                if attribute is AttributeTypes.RECORD_TYPE:
                    try:
                        record[attribute.name] = RecordTypes(value).name
                    except ValueError:
                        raise Exception(f"Missing VanFlow record type '{value}'") from None
                else:
                    record[attribute.name] = value
            records.append(record)
        return records


class VFlowSnooper(MessagingHandler):
    """
    Open a receiver for BEACON messages on the indicated address.

    Use BEACONS to detect event sources (routers). When discovered,
    instantiate a new EventSource for that source. Open receiver links to
    subscribe to that source's event and flow records.
    """
    def __init__(self, address):
        super().__init__()
        self.address = address  # router address
        self.conn = None
        self.beacon_receiver = None
        self.sources = {}  # EventSource instances keyed by source-id
        self.heartbeats_seen = 0
        self._error = None
        self._sources_subscribed = 0
        self._total_records = 0

    def on_connection_opened(self, event):
        logger.debug("Connection opened")

    def on_connection_closed(self, event):
        logger.debug("Connection closed")

    def on_link_opened(self, event):
        """
        Account for a newly opened link. A source counts as subscribed once
        all of its pending links (see EventSource.links_pending) have opened.
        """
        if event.link.is_sender:
            ltype = "sender"
            addr = event.link.target.address
        else:
            ltype = "receiver"
            addr = event.link.source.address
        logger.debug("%s link opened: %s", ltype, addr)

        # A link belongs to a source when the source name appears in the
        # link's address.
        for name, source in self.sources.items():
            if name in addr:
                source.links_pending -= 1
                assert source.links_pending >= 0
                if source.links_pending == 0:
                    self._sources_subscribed += 1
                    logger.debug("%s sources ready", self.sources_ready)

    def on_link_closed(self, event):
        if event.link.is_sender:
            ltype = "sender"
            addr = event.link.target.address
        else:
            ltype = "receiver"
            addr = event.link.source.address
        logger.debug("%s link closed: %s", ltype, addr)

    def on_transport_error(self, event):
        """
        Close the connection on any transport error other than a refused
        connection attempt.
        """
        cond = event.transport.condition
        logger.debug("Transport error %s %s %s", cond.name, cond.description, cond.info)
        # The description may be absent; treat that as "not a refused connection".
        description = cond.description or ""
        # ignore connection retries errors
        if "connection refused" not in description.lower():
            self.exit()  # assume router terminated

    def on_start(self, event):
        """Connect to the router and subscribe to the beacon address."""
        self.container = event.container
        self.conn = event.container.connect(self.address)
        self.beacon_receiver = event.container.create_receiver(self.conn, 'mc/sfe.all')

    def on_sendable(self, event):
        return

    def on_message(self, event):
        """Dispatch an incoming message based on its subject."""
        subject = event.message.subject
        if subject == 'BEACON':
            self.handle_beacon(event.message)
        elif subject == 'HEARTBEAT':
            self.handle_heartbeat(event.message)
        elif subject == 'RECORD':
            self.handle_records(event.message)
        else:
            self.exit(f"Message received with unknown subject '{subject}'")

    @property
    def sources_ready(self):
        """
        Total sources for which all expected links have opened
        """
        return self._sources_subscribed

    @property
    def total_records(self):
        """
        Total records received from all sources
        """
        return self._total_records

    @property
    def error(self):
        return self._error

    def exit(self, error=None):
        """
        Close the connection. When an error is given it is stored, logged and
        raised as an Exception.
        """
        self._error = error
        self.conn.close()
        if self._error is not None:
            logger.error("exit error: %s", self._error)
            raise Exception(self._error)

    def add_source(self, name, base_address, command_address):
        """
        Start tracking a new source: open receivers on its base address and
        flow address, plus a sender to its command address.
        """
        flow_address = f"{base_address}.flows"
        self.container.create_receiver(self.conn, base_address)
        self.container.create_receiver(self.conn, flow_address)
        sender = self.container.create_sender(self.conn, command_address)
        self.sources[name] = EventSource(name, base_address, sender)

    def handle_beacon(self, message):
        """Register the beacon's source if it has not been seen before."""
        source_id = message.properties['id']
        logger.debug("Beacon from %s", source_id)

        name = id_2_source(source_id)
        if name not in self.sources:
            self.add_source(name, message.properties['address'], message.properties['direct'])

    def handle_heartbeat(self, message):
        """
        Count the heartbeat and, if the source is known and its sender has
        credit, send it a FLUSH message.
        """
        source_id = message.properties['id']
        logger.debug("Heartbeat from %s", source_id)
        self.heartbeats_seen += 1
        name = id_2_source(source_id)
        source = self.sources.get(name)
        if source is not None:
            if source.sender.credit > 0:
                source.sender.send(Message(subject='FLUSH'))

    def handle_records(self, message):
        """
        Merge each record in the message body into the owning source's
        records map. A record with no identity, or from an unknown source,
        aborts the snooper via exit(), which raises.
        """
        for record in message.body:
            identity = record.get(AttributeTypes.IDENTITY)
            if identity is None:
                err = f"ERROR: received record with no id: {record}"
                logger.error(err)
                self.exit(err)
            source = self.sources.get(id_2_source(identity))
            if source is None:
                err = f"ERROR: source {identity} not in sources!!"
                logger.error(err)
                self.exit(err)

            if identity not in source.records:
                logger.debug("New record: %s", identity)
                self._total_records += 1
                source.records[identity] = record
            else:
                # later messages carry updates to an existing record
                source.records[identity].update(record)

    def get_results(self):
        """
        Return a map keyed by source-id. Value is a list of records emitted by
        that source (in no particular order). The map is empty if an error
        has been recorded.
        """
        results = {}
        if self.error is None:
            for source_id, event in self.sources.items():
                results[f"{source_id}:0"] = event.get_records()
        return results


class VFlowSnooperThread:
    """
    Run the vanflow snooper as a Python thread
    """
    def __init__(self, address, verbose=False):
        if verbose is True:
            logger.setLevel(logging.DEBUG)
        self.address = address
        # Create the snooper exactly once, before the thread starts, so the
        # properties below always read the instance the thread is running.
        self._snooper = VFlowSnooper(self.address)
        self._thread = Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def _run(self):
        cid = f"vanflow-snooper-{self.address}"
        try:
            Container(self._snooper, container_id=cid).run()
        except Exception as exc:
            # caller must check error property
            logger.debug("snooper container terminated: %s", exc)

    def join(self, timeout):
        """Wait for the thread to finish; raise if it is still running."""
        self._thread.join(timeout)
        if self._thread.is_alive():
            raise Exception("VFlowSnooperThread failed to join!")

    def get_results(self):
        """Return the snooper's results. Only valid after join()."""
        assert not self._thread.is_alive()
        return self._snooper.get_results()

    @property
    def total_records(self):
        """
        Return total number of records received to date.
        """
        return self._snooper.total_records

    @property
    def sources_ready(self):
        """
        Total sources for which all expected links have opened
        """
        return self._snooper.sources_ready

    @property
    def error(self):
        return self._snooper.error


def main():
    """
    Command line entry point: run the snooper until interrupted, then print
    the gathered records as JSON. Returns the process exit code.
    """
    parser = argparse.ArgumentParser(description="Display Vanflow Records")
    parser.add_argument("-a", "--address",
                        help="Address of the router",
                        type=str,
                        default="localhost:5672")
    parser.add_argument("-d", "--debug", help="Verbose logging",
                        action='store_true')
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)

    snooper = VFlowSnooper(args.address)
    cid = f"vanflow-snooper-{args.address}"
    try:
        Container(snooper, container_id=cid).run()
    except KeyboardInterrupt:
        pass

    if snooper.error:
        # `file=` is required: a positional sys.stderr is printed as text
        print(f"ERROR: {snooper.error}", file=sys.stderr)
        return 1

    results = json.dumps(snooper.get_results(), indent=2, sort_keys=True)
    print(f"{results}")
    return 0


if __name__ == "__main__":
    sys.exit(main())