From 5b3d6a2a37fa2c9d5e910058dfd55575de46c764 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 7 May 2024 09:38:03 -0400 Subject: [PATCH] Issue #1498: Initial HTTP/1.x protocol observer --- include/qpid/dispatch/log.h | 1 + src/adaptors/tcp/tcp_adaptor.c | 20 +-- src/decoders/http1/http1_decoder.c | 35 ++-- src/decoders/http1/http1_decoder.h | 8 +- src/log.c | 5 +- src/observers/http1/http1_observer.c | 238 ++++++++++++++++++--------- src/observers/private.h | 7 +- src/observers/tcp_observer.c | 29 +--- tests/CMakeLists.txt | 3 +- tests/http1_relay.c | 2 +- tests/system_tests_http1_observer.py | 167 +++++++++++++++++++ 11 files changed, 381 insertions(+), 134 deletions(-) create mode 100644 tests/system_tests_http1_observer.py diff --git a/include/qpid/dispatch/log.h b/include/qpid/dispatch/log.h index 8d9d7905f..c158db6cb 100644 --- a/include/qpid/dispatch/log.h +++ b/include/qpid/dispatch/log.h @@ -56,6 +56,7 @@ typedef enum { LOG_HTTP_ADAPTOR, LOG_FLOW_LOG, LOG_ADDRESS_WATCH, + LOG_HTTP_OBSERVER, LOG_DEFAULT } qd_log_module_t; diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index e0be3b1ed..b17e44df4 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -489,11 +489,6 @@ static void free_connection_IO(void *context) sys_mutex_unlock(&conn->activation_lock); // Do NOT free the core_activation lock since the core may be holding it - if (conn->observer_handle) { - qdpo_end(conn->observer_handle); - conn->observer_handle = 0; - } - if (conn->common.parent) { if (conn->common.parent->context_type == TL_LISTENER) { qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent; @@ -610,10 +605,8 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) qdr_link_detach(conn->outbound_link, QD_LOST, 0); } - if (!!conn->core_conn) { - qdr_connection_closed(conn->core_conn); - conn->core_conn = 0; - qd_connection_counter_dec(QD_PROTOCOL_TCP); + if (conn->observer_handle) { + qdpo_end(conn->observer_handle); } if (!!conn->common.vflow) { @@ -621,6 +614,12 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) vflow_end_record(conn->common.vflow); } + if (!!conn->core_conn) { + qdr_connection_closed(conn->core_conn); + conn->core_conn = 0; + qd_connection_counter_dec(QD_PROTOCOL_TCP); + } + qd_tls_free2(conn->tls); qd_tls_domain_decref(conn->tls_domain); free(conn->alpn_protocol); @@ -632,8 +631,9 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) conn->outbound_link = 0; conn->outbound_stream = 0; conn->outbound_delivery = 0; - conn->core_conn = 0; + conn->observer_handle = 0; conn->common.vflow = 0; + conn->core_conn = 0; conn->tls = 0; conn->tls_domain = 0; diff --git a/src/decoders/http1/http1_decoder.c b/src/decoders/http1/http1_decoder.c index 9231730da..63ac28f70 100644 --- a/src/decoders/http1/http1_decoder.c +++ b/src/decoders/http1/http1_decoder.c @@ -209,8 +209,7 @@ static void parser_error(qd_http1_decoder_connection_t *hconn, const char *reaso hconn->parse_error = reason; decoder_new_state(&hconn->client, HTTP1_DECODE_ERROR); decoder_new_state(&hconn->server, HTTP1_DECODE_ERROR); - if (hconn->config->protocol_error) - hconn->config->protocol_error(hconn, reason); + hconn->config->protocol_error(hconn, reason); } } @@ -439,10 +438,12 @@ static bool message_done(qd_http1_decoder_connection_t *hconn, decoder_t *decode } // signal the message receive is complete - int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client); - if (rc) { - parser_error(hconn, "message_done callback failed"); - return false; + if (hconn->config->message_done) { + int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client); + if (rc) { + parser_error(hconn, "message_done callback failed"); + return false; + } } decoder_reset(decoder); // resets decode state to parse_request or parse_response @@ -619,10 +620,12 @@ static bool headers_done(qd_http1_decoder_connection_t *hconn, struct decoder_t h1_decode_request_state_t *hrs = decoder->hrs; assert(hrs); - int rc = hconn->config->rx_headers_done(hconn, hrs->user_context, decoder->is_client); - if (rc) { - parser_error(hconn, "rx_headers_done callback failed"); - return false; + if (hconn->config->rx_headers_done) { + int rc = hconn->config->rx_headers_done(hconn, hrs->user_context, decoder->is_client); + if (rc) { + parser_error(hconn, "rx_headers_done callback failed"); + return false; + } } // Determine if a body is present. See RFC9112 Message Body Length @@ -783,11 +786,13 @@ static bool parse_header(qd_http1_decoder_connection_t *hconn, decoder_t *decode truncate_whitespace(value); } - assert(decoder->hrs); - int rc = hconn->config->rx_header(hconn, decoder->hrs->user_context, decoder->is_client, key, value); - if (rc) { - parser_error(hconn, "rx_header callback failed"); - return false; + if (hconn->config->rx_header) { + assert(decoder->hrs); + int rc = hconn->config->rx_header(hconn, decoder->hrs->user_context, decoder->is_client, key, value); + if (rc) { + parser_error(hconn, "rx_header callback failed"); + return false; + } } if (!process_header(hconn, decoder, key, value)) diff --git a/src/decoders/http1/http1_decoder.h b/src/decoders/http1/http1_decoder.h index ff75d3dd7..babd7d751 100644 --- a/src/decoders/http1/http1_decoder.h +++ b/src/decoders/http1/http1_decoder.h @@ -78,14 +78,14 @@ struct qd_http1_decoder_config_t { uint32_t version_major, uint32_t version_minor); - // Invoked for each HTTP header received. from_client is true if the header was read from the stream sent by the - // client, false if the stream is sent from the server. Neither key nor value is preserved on return from this - // call. The callback must copy the data associated with these values if they need to be saved. + // (Optional) invoked for each HTTP header received. from_client is true if the header was read from the stream sent + // by the client, false if the stream is sent from the server. Neither key nor value is preserved on return from + // this call. The callback must copy the data associated with these values if they need to be saved. // int (*rx_header)(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client, const char *key, const char *value); - // Invoked after the last HTTP header (after the last rx_header() callback for this message). + // (Optional) invoked after the last HTTP header (after the last rx_header() callback for this message). // int (*rx_headers_done)(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client); diff --git a/src/log.c b/src/log.c index 46c19b12a..a07a298a5 100644 --- a/src/log.c +++ b/src/log.c @@ -44,7 +44,7 @@ #define TEXT_MAX QD_LOG_TEXT_MAX #define LOG_MAX (QD_LOG_TEXT_MAX+128) #define LIST_MAX 1000 -#define NUM_LOG_SOURCES 20 +#define NUM_LOG_SOURCES 21 const char *QD_LOG_STATS_TYPE = "logStats"; @@ -58,7 +58,8 @@ int qd_log_max_len(void) const char *log_module_names[] = {"ROUTER", "ROUTER_CORE", "ROUTER_HELLO", "ROUTER_LS", "ROUTER_MA", "MESSAGE", "SERVER", "AGENT", "CONTAINER", "ERROR", "POLICY", "HTTP", "CONN_MGR", "PYTHON", "PROTOCOL", - "TCP_ADAPTOR", "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", "DEFAULT"}; + "TCP_ADAPTOR", "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", "HTTP_OBSERVER", + "DEFAULT"}; typedef struct qd_log_entry_t qd_log_entry_t; diff --git a/src/observers/http1/http1_observer.c b/src/observers/http1/http1_observer.c index d9e3ffc65..98be901ac 100644 --- a/src/observers/http1/http1_observer.c +++ b/src/observers/http1/http1_observer.c @@ -19,106 +19,190 @@ #include "observers/private.h" #include "decoders/http1/http1_decoder.h" +#include "qpid/dispatch/alloc_pool.h" +#include "qpid/dispatch/connection_counters.h" #include +struct http1_request_state_t { + DEQ_LINKS(http1_request_state_t); + vflow_record_t *vflow; + bool latency_done:1; // true: latency timing complete +}; +ALLOC_DECLARE(http1_request_state_t); +ALLOC_DEFINE(http1_request_state_t); + /** - ### START RECORD - ### - # connection level record (listener based) - vflow_start_record(VFLOW_RECORD_FLOW, li->vflow); - # connection level record (connector) - vflow_start_record(VFLOW_RECORD_FLOW, connector->vflow); - # Client side request start flow - vflow_start_record(VFLOW_RECORD_FLOW, hconn->vflow) - # Server side request start flow - vflow_start_record(VFLOW_RECORD_FLOW, hconn->vflow) - - ### END RECORD - ### - vflow_end_record(hreq->vflow); End of request vflow - - # connection level flow end - vflow_end_record(hconn->vflow); End of connection vflow - - ### ATTRIBUTES CLIENT-FACING CONNECTION LEVEL - ### - ./http1_client.c:213: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_client.c:214: vflow_add_rate(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, VFLOW_ATTRIBUTE_OCTET_RATE); - ./http1_client.c:499: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - ./http1_client.c:597: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - ./http1_client.c:691: qd_set_vflow_netaddr_string(hconn->vflow, hconn->raw_conn, true); - # moves any connection condition to vflow on DISCONNECT: - ./http1_client.c:713: qd_set_condition_on_vflow(hconn->raw_conn, hconn->vflow); - - ### ATTRIBUTES SERVER-FACING CONNECTION LEVEL - ### - ./http1_server.c:276: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_server.c:277: vflow_add_rate(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, VFLOW_ATTRIBUTE_OCTET_RATE); - ./http1_server.c:603: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - ./http1_server.c:707: vflow_set_uint64(hconn->vflow, VFLOW_ATTRIBUTE_OCTETS, hconn->in_http1_octets); - # gets addr from raw conn - ./http1_server.c:831: qd_set_vflow_netaddr_string(hconn->vflow, hconn->raw_conn, false); - # moves connection condition to vflow on DISCONNECT - ./http1_server.c:856: qd_set_condition_on_vflow(hconn->raw_conn, hconn->vflow); - - - ### ATTRIBUTES CLIENT-FACING PER-REQUEST LEVEL - ### - ./http1_client.c:954: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - ./http1_client.c:1209: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, in_octets); - - ./http1_client.c:955: vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_METHOD, method); - ./http1_client.c:2074: vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_RESULT, code_str); - ./http1_client.c:2076: vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase); - - ./http1_client.c:969: vflow_serialize_identity(hreq->base.vflow, hreq->request_props); - - ./http1_client.c:959: vflow_latency_start(hreq->base.vflow); - ./http1_client.c:1855: vflow_latency_end(hreq->base.vflow); - - ### ATTRIBUTES SERVER-FACING PER-REQUEST LEVEL - ### - ./http1_server.c:1381: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, in_octets); - ./http1_server.c:1566: vflow_set_uint64(hreq->base.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); - - ./http1_server.c:1702: vflow_set_ref_from_parsed(hreq->base.vflow, VFLOW_ATTRIBUTE_COUNTERFLOW, value); - - ./http1_server.c:1572: vflow_latency_start(hreq->base.vflow); - ./http1_server.c:1119: vflow_latency_end(hreq->base.vflow); - - ### WHAT ABOUT VFLOW_ATTRIBUTE_PROTOCOL ?? - - -https://www.rfc-editor.org/rfc/rfc9112.html (HTTP/1.1) -https://www.rfc-editor.org/rfc/rfc1945 (HTTP/1.0) -https://www.rfc-editor.org/rfc/rfc9110 (HTTP Semantics) + hreq->base.vflow = vflow_start_record(VFLOW_RECORD_FLOW, hconn->vflow); + vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_METHOD, method); + vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_RESULT, code_str); + vflow_set_string(hreq->base.vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase); + + vflow_latency_start(hreq->base.vflow); + vflow_latency_end(hreq->base.vflow); + + + + */ +/* + * */ -static void http1_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) +/* + * HTTP/1.x decoder callbacks + */ + + +// A new request has arrived, start observing it +// +static int rx_request(qd_http1_decoder_connection_t *hconn, const char *method, const char *target, uint32_t version_major, uint32_t version_minor, uintptr_t *request_context) { - qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, - "[C%" PRIu64 "] HTTP/1.1 observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server"); + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); + + qd_log(LOG_HTTP_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer rx_request(method=%s target=%s)", th->conn_id, method, target); + + http1_request_state_t *hreq = new_http1_request_state_t(); + ZERO(hreq); + DEQ_ITEM_INIT(hreq); + hreq->vflow = vflow_start_record(VFLOW_RECORD_FLOW, th->vflow); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_METHOD, method); + vflow_latency_start(hreq->vflow); + hreq->latency_done = false; + DEQ_INSERT_TAIL(th->http1.requests, hreq); + *request_context = (uintptr_t) hreq; + return 0; } +static int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, int status_code, const char *reason_phrase, uint32_t version_major, uint32_t version_minor) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); + + qd_log(LOG_HTTP_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer rx_response(status=%d reason=%s)", th->conn_id, status_code, reason_phrase); + + http1_request_state_t *hreq = (http1_request_state_t *) request_context; + assert(hreq); + + // stop latency even if the request is not complete (1xx response code) + if (!hreq->latency_done) { + vflow_latency_end(hreq->vflow); + hreq->latency_done = true; + } + + if (status_code / 100 != 1) { // terminal response code + char status_code_str[16]; + snprintf(status_code_str, sizeof(status_code_str), "%d", status_code); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_RESULT, status_code_str); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase); + } + + return 0; +} + + +static int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t request_context) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); + + qd_log(LOG_HTTP_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer transaction complete", th->conn_id); + + http1_request_state_t *hreq = (http1_request_state_t *) request_context; + assert(hreq); + + DEQ_REMOVE(th->http1.requests, hreq); + vflow_end_record(hreq->vflow); + free_http1_request_state_t(hreq); + return 0; +} + + +static void protocol_error(qd_http1_decoder_connection_t *hconn, const char *reason) +{ + qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); + assert(th); + + // only complain if we are failing while there are active requests. It may be that the TCP stream is not + // carrying HTTP/1.x data. + + if (DEQ_SIZE(th->http1.requests)) { + qd_log(LOG_HTTP_OBSERVER, QD_LOG_ERROR, + "[C%" PRIu64 "] HTTP/1.1 observer disabled due to protocol error: %s", th->conn_id, reason); + } else { + qd_log(LOG_HTTP_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer protocol error: %s", th->conn_id, reason); + } + + // An error code will be returned to the http1_observe() call and this observer will be cleaned up there. +} + + +static qd_http1_decoder_config_t decoder_config = { + .rx_request = rx_request, + .rx_response = rx_response, + // .rx_header = rx_header, + // .rx_headers_done = rx_headers_done, + // .rx_body = rx_body, + // .message_done = message_done, + .transaction_complete = transaction_complete, + .protocol_error = protocol_error +}; + + +static void http1_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) +{ + assert(th->http1.decoder); + + int rc = qd_http1_decoder_connection_rx_data(th->http1.decoder, from_client, data, length); + if (rc) { + qdpo_http1_final(th); + } +} + void qdpo_http1_init(qdpo_transport_handle_t *th) { - qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer initialized", th->conn_id); + qd_log(LOG_HTTP_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer initialized", th->conn_id); th->protocol = QD_PROTOCOL_HTTP1; th->observe = http1_observe; - th->http1.tbd = 42; // whatever; + memset(&th->http1, 0, sizeof(th->http1)); + DEQ_INIT(th->http1.requests); + th->http1.decoder = qd_http1_decoder_connection(&decoder_config, (uintptr_t) th); + + // adjust the router's active protocol connection counter since we are replacing the parent TCP connection with an + // HTTP/1.x connection + qd_connection_counter_dec(QD_PROTOCOL_TCP); + qd_connection_counter_inc(QD_PROTOCOL_HTTP1); } + void qdpo_http1_final(qdpo_transport_handle_t *th) { - qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer finalized", th->conn_id); - th->observe = 0; + qd_log(LOG_HTTP_OBSERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer finalized", th->conn_id); + + if (th->observe) { + th->observe = 0; + qd_http1_decoder_connection_free(th->http1.decoder); + th->http1.decoder = 0; + http1_request_state_t *hreq = DEQ_HEAD(th->http1.requests); + while (hreq) { + DEQ_REMOVE_HEAD(th->http1.requests); + if (hreq->vflow) { + vflow_end_record(hreq->vflow); + } + free_http1_request_state_t(hreq); + hreq = DEQ_HEAD(th->http1.requests); + } + + // restore the router's protocol connection counter for the parent TCP connection + qd_connection_counter_dec(QD_PROTOCOL_HTTP1); + qd_connection_counter_inc(QD_PROTOCOL_TCP); + } } diff --git a/src/observers/private.h b/src/observers/private.h index 40f28a0ed..2ddfc79a4 100644 --- a/src/observers/private.h +++ b/src/observers/private.h @@ -50,9 +50,14 @@ struct tcp_observer_state_t { /** * state machine for observing HTTP/1.x */ +typedef struct qd_http1_decoder_connection_t qd_http1_decoder_connection_t; +typedef struct http1_request_state_t http1_request_state_t; +DEQ_DECLARE(http1_request_state_t, http1_request_state_list_t); + typedef struct http1_observer_state_t http1_observer_state_t; struct http1_observer_state_t { - int tbd; + qd_http1_decoder_connection_t *decoder; + http1_request_state_list_t requests; }; diff --git a/src/observers/tcp_observer.c b/src/observers/tcp_observer.c index 85bb0cdf6..83e426899 100644 --- a/src/observers/tcp_observer.c +++ b/src/observers/tcp_observer.c @@ -79,8 +79,6 @@ static void tcp_observe(qdpo_transport_handle_t *th, bool from_client, const uns qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] TCP observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server"); - // wicked brain-dead classification just for POC - if (from_client) { // fill up the protocol classification prefix buffer @@ -107,27 +105,12 @@ static void tcp_observe(qdpo_transport_handle_t *th, bool from_client, const uns } else { - // HTTP/2.0 check failed, try HTTP/1.x - - // hack for now. probably best to just start handing off the http1 observer and let it - // try to find a proper HTTP/1.x request line. - - const char * const http1_commands[] = { - "GET ", "HEAD ", "POST ", "PUT ", "DELETE ", 0 - }; - - for (int i = 0; http1_commands[i] != 0; ++i) { - size_t cmd_len = strlen(http1_commands[i]); - size_t to_match = MIN(th->tcp.prefix_len, cmd_len); - if (memcmp(th->tcp.prefix, http1_commands[i], to_match) == 0) { - if (to_match == cmd_len) { - activate_inner(th, QD_PROTOCOL_HTTP1, data, length); - return; - } else { - return; // partial match need more - } - } - } + // HTTP/2.0 check failed. Currently the only other supported protocol is HTTP/1.x so try that + // unconditionally. If the HTTP/1.x observer fails to find HTTP/1.x traffic it will disable itself without + // posting an error. + + activate_inner(th, QD_PROTOCOL_HTTP1, data, length); + return; } } else { // !from_client diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 23a3fba5b..607d0d9c9 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -163,10 +163,11 @@ foreach(py_test_module system_tests_log_level_update system_tests_routing_protocol system_tests_open_properties - system_tests_http2 + system_tests_http2 system_tests_grpc system_tests_http1_adaptor system_tests_http1_over_tcp + system_tests_http1_observer system_tests_tcp_adaptor system_tests_tcp_adaptor_tls system_tests_http2_tls diff --git a/tests/http1_relay.c b/tests/http1_relay.c index 71b4b3205..ac4741189 100644 --- a/tests/http1_relay.c +++ b/tests/http1_relay.c @@ -228,7 +228,7 @@ void protocol_error(qd_http1_decoder_connection_t *hconn, const char *reason) } -qd_http1_decoder_config_t decoder_config = { +static qd_http1_decoder_config_t decoder_config = { .rx_request = rx_request, .rx_response = rx_response, .rx_header = rx_header, diff --git a/tests/system_tests_http1_observer.py b/tests/system_tests_http1_observer.py new file mode 100644 index 000000000..aad597385 --- /dev/null +++ b/tests/system_tests_http1_observer.py @@ -0,0 +1,167 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Test the HTTP/1.x Protocol observer +# + +import os +import re + +from system_test import TestCase, unittest, main_module, Qdrouterd +from system_test import curl_available, run_curl +from system_test import nginx_available, NginxServer + + +def filter_flow_log(log_file): + """ + Search log_file for flow log entries. Extract the key=value pairs for each + entry and append them as a map to a list + """ + # FLOW_LOG (loglevel) FLOW [flow-id] [BEGIN] [END] key=value key2=value2..." + pattern = re.compile(r'^FLOW_LOG\s+\([^\s\)]+\)\s+FLOW\s+\[[^\s\]]+\]\s+(.+)') + flow_log = [] + with open(log_file, 'rt') as fp: + for line in fp: + m = pattern.match(line) + if m is None: + continue + + # extract all the key=value elements and store them in a map + try: + flow_log.append(dict([z.split('=') for z in m.group(1).split() if '=' in z])) + except Exception: + pass + + return flow_log + + +def spawn_nginx(port, tester): + """ + Spawn the Nginx server listening on port + """ + env = dict() + env['nginx-base-folder'] = NginxServer.BASE_FOLDER + env['setupclass-folder'] = tester.directory + env['nginx-configs-folder'] = NginxServer.CONFIGS_FOLDER + env['listening-port'] = str(port) + env['http2'] = '' # disable HTTP/2 + # disable ssl/tls for now + env['ssl'] = '' + env['tls-enabled'] = '#' # Will comment out TLS configuration lines + + # TBD: TLS stuff + # env['ssl'] = 'ssl' + # env['tls-enabled'] = '' # Will enable TLS lines + # env['chained-pem'] = CHAINED_CERT + # env['server-private-key-no-pass-pem'] = SERVER_PRIVATE_KEY_NO_PASS + # env['ssl-verify-client'] = 'on' + # env['ca-certificate'] = CA_CERT + + return tester.nginxserver(config_path=NginxServer.CONFIG_FILE, env=env) + + +@unittest.skipUnless(nginx_available() and curl_available(), + "Requires both nginx and curl tools") +class Http1ObserverTest(TestCase): + """ + Verify the HTTP/1.x observer produces the expected VanFlow records + """ + @classmethod + def router(cls, name, listener_port, server_port, extra_config=None): + """ + Create a router with a tcpConnector and a tcpListener. Set the logging + config to only enable Van Flow record logs. + """ + config = [ + ('router', {'mode': 'interior', + 'id': name}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + + ('tcpListener', {'host': "0.0.0.0", + 'port': listener_port, + 'address': 'Http1ObserverTest'}), + ('tcpConnector', {'host': "localhost", + 'port': server_port, + 'address': 'Http1ObserverTest'}), + + ('log', {'module': 'DEFAULT', + 'enable': 'warning+', + 'includeTimestamp': 'false', + 'includeSource': 'false', + 'outputFile': os.path.abspath(f"{name}-flow.log")}), + ('log', {'module': 'FLOW_LOG', + 'enable': 'debug+'}) + ] + + if extra_config is not None: + config.extend(extra_config) + config = Qdrouterd.Config(config) + router = cls.tester.qdrouterd(name, config, wait=False) + router.wait_ports() + router.wait_address('Http1ObserverTest', subscribers=1) + return router + + @classmethod + def setUpClass(cls): + """Configure server and listener addresses and ports""" + super(Http1ObserverTest, cls).setUpClass() + cls.nginx_port = cls.tester.get_port() + cls.nginx_server = spawn_nginx(cls.nginx_port, cls.tester) + + def test_01_get(self): + """ + Simple pipelined GET request + """ + l_port = self.tester.get_port() + router = self.router("test_01", l_port, self.nginx_port) + + curl_args = [ + '--http1.1', + '--output', "/dev/null", + '-G' + ] + + pages = ['index.html', 't100K.html', 't10K.html', 't1K.html'] + for page in pages: + curl_args.append(f"http://localhost:{l_port}/{page}") + (rc, _, err) = run_curl(args=curl_args) + self.assertEqual(0, rc, f"curl failed: {err}") + + router.teardown() + + flow_log = filter_flow_log(router.logfile_path) + self.assertTrue(len(flow_log) > 0, "No flow entries in the log file!") + + get_ct = 0 + response_ct = 0 + for record in flow_log: + if 'method' in record: + if record['method'] == 'GET': + get_ct += 1 + if 'result' in record and 'reason' in record: + response_ct += 1 + + self.assertEqual(len(pages), get_ct, f"Expected {len(pages)} GETS: {flow_log}") + self.assertEqual(len(pages), response_ct, f"Expected {len(pages)} responses: {flow_log}") + + +if __name__ == '__main__': + unittest.main(main_module())