diff --git a/include/qpid/dispatch/ctools.h b/include/qpid/dispatch/ctools.h index 18d60d5c4..fc6cc1665 100644 --- a/include/qpid/dispatch/ctools.h +++ b/include/qpid/dispatch/ctools.h @@ -262,5 +262,15 @@ static inline char *qd_strdup(const char *s) return ptr; } +static inline char *qd_strndup(const char *s, size_t n) +{ + assert(s); + char *ptr = strndup(s, n); + if (!ptr) { + perror("qd_strndup"); + abort(); + } + return ptr; +} #endif diff --git a/src/decoders/http1/http1_decoder.c b/src/decoders/http1/http1_decoder.c index 983af2801..2a6fe2e20 100644 --- a/src/decoders/http1/http1_decoder.c +++ b/src/decoders/http1/http1_decoder.c @@ -111,11 +111,6 @@ struct h1_decode_request_state_t { qd_http1_decoder_connection_t *hconn; uintptr_t user_context; -#if 0 // TBD - uint64_t client_octets; // # total octets arriving from client for this request - uint64_t server_octets; // # total octets arriving from server for this response -#endif - int32_t response_code; // sent by server bool head_request:1; // HEAD method diff --git a/src/observers/http1/http1_observer.c b/src/observers/http1/http1_observer.c index 479f4afd6..42cf80634 100644 --- a/src/observers/http1/http1_observer.c +++ b/src/observers/http1/http1_observer.c @@ -27,12 +27,28 @@ struct http1_request_state_t { DEQ_LINKS(http1_request_state_t); vflow_record_t *vflow; - bool latency_done:1; // true: latency timing complete + char *response_phrase; // reason phrase in response msg + int response_status; // result code in response msg + uint64_t client_body_octets; // total bytes received in client request msg body + uint64_t server_body_octets; // total bytes received in server response msg body + bool latency_done:1; // true: vflow latency timing complete }; ALLOC_DECLARE(http1_request_state_t); ALLOC_DEFINE(http1_request_state_t); +static void http1_request_state_free(http1_request_state_t *hreq) +{ + if (hreq) { + if (hreq->vflow) { + vflow_end_record(hreq->vflow); + } + free(hreq->response_phrase); + free_http1_request_state_t(hreq); + } +} + + /* * HTTP/1.x decoder callbacks */ @@ -55,6 +71,10 @@ static int rx_request(qd_http1_decoder_connection_t *hconn, const char *method, hreq->vflow = vflow_start_record(VFLOW_RECORD_BIFLOW_APP, th->vflow); vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_PROTOCOL, version_minor == 1 ? "HTTP/1.1" : "HTTP/1.0"); vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_METHOD, method); + vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS, 0); + vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0); + vflow_add_rate(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS, VFLOW_ATTRIBUTE_OCTET_RATE); + vflow_add_rate(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS_REVERSE, VFLOW_ATTRIBUTE_OCTET_RATE_REVERSE); vflow_latency_start(hreq->vflow); hreq->latency_done = false; DEQ_INSERT_TAIL(th->http1.requests, hreq); @@ -82,10 +102,28 @@ static int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_c } if (status_code / 100 != 1) { // terminal response code - char status_code_str[16]; - snprintf(status_code_str, sizeof(status_code_str), "%d", status_code); - vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_RESULT, status_code_str); - vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase); + hreq->response_status = status_code; + if (reason_phrase) { + assert(!hreq->response_phrase); + hreq->response_phrase = qd_strndup(reason_phrase, 256); + } + } + + return 0; +} + + +static int rx_body(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client, const unsigned char *body, size_t length) +{ + http1_request_state_t *hreq = (http1_request_state_t *) request_context; + assert(hreq); + + if (from_client) { + hreq->client_body_octets += length; + vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS, hreq->client_body_octets); + } else { + hreq->server_body_octets += length; + vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS_REVERSE, hreq->server_body_octets); } return 0; @@ -94,6 +132,7 @@ static int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_c static int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t request_context) { + char status_code_str[16]; qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn); assert(th); @@ -102,9 +141,14 @@ static int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t http1_request_state_t *hreq = (http1_request_state_t *) request_context; assert(hreq); + snprintf(status_code_str, sizeof(status_code_str), "%d", hreq->response_status); + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_RESULT, status_code_str); + if (hreq->response_phrase) + vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_REASON, hreq->response_phrase); + + DEQ_REMOVE(th->http1.requests, hreq); - vflow_end_record(hreq->vflow); - free_http1_request_state_t(hreq); + http1_request_state_free(hreq); return 0; } @@ -133,7 +177,7 @@ static qd_http1_decoder_config_t decoder_config = { .rx_response = rx_response, // .rx_header = rx_header, // .rx_headers_done = rx_headers_done, - // .rx_body = rx_body, + .rx_body = rx_body, // .message_done = message_done, .transaction_complete = transaction_complete, .protocol_error = protocol_error @@ -180,10 +224,7 @@ void qdpo_http1_final(qdpo_transport_handle_t *th) http1_request_state_t *hreq = DEQ_HEAD(th->http1.requests); while (hreq) { DEQ_REMOVE_HEAD(th->http1.requests); - if (hreq->vflow) { - vflow_end_record(hreq->vflow); - } - free_http1_request_state_t(hreq); + http1_request_state_free(hreq); hreq = DEQ_HEAD(th->http1.requests); } diff --git a/tests/system_tests_http_observer.py b/tests/system_tests_http_observer.py index 54078d118..587ca2779 100644 --- a/tests/system_tests_http_observer.py +++ b/tests/system_tests_http_observer.py @@ -186,6 +186,7 @@ def test_01_get(self): '-G' ] + # Note: the lengths of these files are hardcoded in the expected map below: pages = ['index.html', 't100K.html', 't10K.html', 't1K.html'] for page in pages: curl_args.append(f"http://localhost:{l_port}/{page}") @@ -201,21 +202,29 @@ def test_01_get(self): "RESULT": "200", "REASON": "OK", "PROTOCOL": "HTTP/1.1", + "OCTETS": 0, + "OCTETS_REVERSE": 45, # index.html length 'END_TIME': ANY_VALUE}), ('BIFLOW_APP', {"METHOD": "GET", "RESULT": "200", "REASON": "OK", "PROTOCOL": "HTTP/1.1", + "OCTETS": 0, + "OCTETS_REVERSE": 108803, # t100K.html length 'END_TIME': ANY_VALUE}), ('BIFLOW_APP', {"METHOD": "GET", "RESULT": "200", "REASON": "OK", + "OCTETS": 0, + "OCTETS_REVERSE": 10972, # t10K.html length "PROTOCOL": "HTTP/1.1", 'END_TIME': ANY_VALUE}), ('BIFLOW_APP', {'METHOD': "GET", 'RESULT': "200", 'REASON': "OK", 'PROTOCOL': 'HTTP/1.1', + "OCTETS": 0, + "OCTETS_REVERSE": 1188, # t1K.html length 'END_TIME': ANY_VALUE}) ] } @@ -286,7 +295,13 @@ def test_02_post(self): ('BIFLOW_APP', {'PROTOCOL': 'HTTP/1.1', 'METHOD': 'POST', 'REASON': ANY_VALUE, - 'END_TIME': ANY_VALUE}) + 'END_TIME': ANY_VALUE, + # curl sends 'Start&End' as the POST request + # message body: + 'OCTETS': 9, + # The python HTTP test server replies with + # '

Dummy CGI output!

' + 'OCTETS_REVERSE': 39}) ] } success = retry(lambda: snooper_thread.match_records(expected), delay=1)