Skip to content

Commit

Permalink
Fixes skupperproject#1621: add bi-flow octet counters to HTTP/1.x obs…
Browse files Browse the repository at this point in the history
…erver
  • Loading branch information
kgiusti committed Nov 18, 2024
1 parent 349007f commit ce15355
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 18 deletions.
10 changes: 10 additions & 0 deletions include/qpid/dispatch/ctools.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,15 @@ static inline char *qd_strdup(const char *s)
return ptr;
}

static inline char *qd_strndup(const char *s, size_t n)
{
assert(s);
char *ptr = strndup(s, n);
if (!ptr) {
perror("qd_strndup");
abort();
}
return ptr;
}

#endif
5 changes: 0 additions & 5 deletions src/decoders/http1/http1_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ struct h1_decode_request_state_t {
qd_http1_decoder_connection_t *hconn;
uintptr_t user_context;

#if 0 // TBD
uint64_t client_octets; // # total octets arriving from client for this request
uint64_t server_octets; // # total octets arriving from server for this response
#endif

int32_t response_code; // sent by server

bool head_request:1; // HEAD method
Expand Down
65 changes: 53 additions & 12 deletions src/observers/http1/http1_observer.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,28 @@
struct http1_request_state_t {
DEQ_LINKS(http1_request_state_t);
vflow_record_t *vflow;
bool latency_done:1; // true: latency timing complete
char *response_phrase; // reason phrase in response msg
int response_status; // result code in response msg
uint64_t client_body_octets; // total bytes received in client request msg body
uint64_t server_body_octets; // total bytes received in server response msg body
bool latency_done:1; // true: vflow latency timing complete
};
ALLOC_DECLARE(http1_request_state_t);
ALLOC_DEFINE(http1_request_state_t);


static void http1_request_state_free(http1_request_state_t *hreq)
{
if (hreq) {
if (hreq->vflow) {
vflow_end_record(hreq->vflow);
}
free(hreq->response_phrase);
free_http1_request_state_t(hreq);
}
}


/*
* HTTP/1.x decoder callbacks
*/
Expand All @@ -55,6 +71,10 @@ static int rx_request(qd_http1_decoder_connection_t *hconn, const char *method,
hreq->vflow = vflow_start_record(VFLOW_RECORD_BIFLOW_APP, th->vflow);
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_PROTOCOL, version_minor == 1 ? "HTTP/1.1" : "HTTP/1.0");
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_METHOD, method);
vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS, 0);
vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0);
vflow_add_rate(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS, VFLOW_ATTRIBUTE_OCTET_RATE);
vflow_add_rate(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS_REVERSE, VFLOW_ATTRIBUTE_OCTET_RATE_REVERSE);
vflow_latency_start(hreq->vflow);
hreq->latency_done = false;
DEQ_INSERT_TAIL(th->http1.requests, hreq);
Expand Down Expand Up @@ -82,10 +102,28 @@ static int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_c
}

if (status_code / 100 != 1) { // terminal response code
char status_code_str[16];
snprintf(status_code_str, sizeof(status_code_str), "%d", status_code);
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_RESULT, status_code_str);
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_REASON, reason_phrase);
hreq->response_status = status_code;
if (reason_phrase) {
assert(!hreq->response_phrase);
hreq->response_phrase = qd_strndup(reason_phrase, 256);
}
}

return 0;
}


static int rx_body(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client, const unsigned char *body, size_t length)
{
http1_request_state_t *hreq = (http1_request_state_t *) request_context;
assert(hreq);

if (from_client) {
hreq->client_body_octets += length;
vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS, hreq->client_body_octets);
} else {
hreq->server_body_octets += length;
vflow_set_uint64(hreq->vflow, VFLOW_ATTRIBUTE_OCTETS_REVERSE, hreq->server_body_octets);
}

return 0;
Expand All @@ -94,6 +132,7 @@ static int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_c

static int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t request_context)
{
char status_code_str[16];
qdpo_transport_handle_t *th = (qdpo_transport_handle_t *) qd_http1_decoder_connection_get_context(hconn);
assert(th);

Expand All @@ -102,9 +141,14 @@ static int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t
http1_request_state_t *hreq = (http1_request_state_t *) request_context;
assert(hreq);

snprintf(status_code_str, sizeof(status_code_str), "%d", hreq->response_status);
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_RESULT, status_code_str);
if (hreq->response_phrase)
vflow_set_string(hreq->vflow, VFLOW_ATTRIBUTE_REASON, hreq->response_phrase);


DEQ_REMOVE(th->http1.requests, hreq);
vflow_end_record(hreq->vflow);
free_http1_request_state_t(hreq);
http1_request_state_free(hreq);
return 0;
}

Expand Down Expand Up @@ -133,7 +177,7 @@ static qd_http1_decoder_config_t decoder_config = {
.rx_response = rx_response,
// .rx_header = rx_header,
// .rx_headers_done = rx_headers_done,
// .rx_body = rx_body,
.rx_body = rx_body,
// .message_done = message_done,
.transaction_complete = transaction_complete,
.protocol_error = protocol_error
Expand Down Expand Up @@ -180,10 +224,7 @@ void qdpo_http1_final(qdpo_transport_handle_t *th)
http1_request_state_t *hreq = DEQ_HEAD(th->http1.requests);
while (hreq) {
DEQ_REMOVE_HEAD(th->http1.requests);
if (hreq->vflow) {
vflow_end_record(hreq->vflow);
}
free_http1_request_state_t(hreq);
http1_request_state_free(hreq);
hreq = DEQ_HEAD(th->http1.requests);
}

Expand Down
17 changes: 16 additions & 1 deletion tests/system_tests_http_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def test_01_get(self):
'-G'
]

# Note: the lengths of these files are hardcoded in the expected map below:
pages = ['index.html', 't100K.html', 't10K.html', 't1K.html']
for page in pages:
curl_args.append(f"http://localhost:{l_port}/{page}")
Expand All @@ -201,21 +202,29 @@ def test_01_get(self):
"RESULT": "200",
"REASON": "OK",
"PROTOCOL": "HTTP/1.1",
"OCTETS": 0,
"OCTETS_REVERSE": 45, # index.html length
'END_TIME': ANY_VALUE}),
('BIFLOW_APP', {"METHOD": "GET",
"RESULT": "200",
"REASON": "OK",
"PROTOCOL": "HTTP/1.1",
"OCTETS": 0,
"OCTETS_REVERSE": 108803, # t100K.html length
'END_TIME': ANY_VALUE}),
('BIFLOW_APP', {"METHOD": "GET",
"RESULT": "200",
"REASON": "OK",
"OCTETS": 0,
"OCTETS_REVERSE": 10972, # t10K.html length
"PROTOCOL": "HTTP/1.1",
'END_TIME': ANY_VALUE}),
('BIFLOW_APP', {'METHOD': "GET",
'RESULT': "200",
'REASON': "OK",
'PROTOCOL': 'HTTP/1.1',
"OCTETS": 0,
"OCTETS_REVERSE": 1188, # t1K.html length
'END_TIME': ANY_VALUE})
]
}
Expand Down Expand Up @@ -286,7 +295,13 @@ def test_02_post(self):
('BIFLOW_APP', {'PROTOCOL': 'HTTP/1.1',
'METHOD': 'POST',
'REASON': ANY_VALUE,
'END_TIME': ANY_VALUE})
'END_TIME': ANY_VALUE,
# curl sends 'Start&End' as the POST request
# message body:
'OCTETS': 9,
# The python HTTP test server replies with
# '<html><h1>Dummy CGI output!</h1></html>'
'OCTETS_REVERSE': 39})
]
}
success = retry(lambda: snooper_thread.match_records(expected), delay=1)
Expand Down

0 comments on commit ce15355

Please sign in to comment.