Skip to content

Commit

Permalink
Issue #1498: Initial HTTP/1.x observer implementation (#1513)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti authored Jun 7, 2024
1 parent 403afe3 commit fc31f23
Show file tree
Hide file tree
Showing 16 changed files with 917 additions and 125 deletions.
1 change: 1 addition & 0 deletions include/qpid/dispatch/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef enum {
LOG_HTTP_ADAPTOR,
LOG_FLOW_LOG,
LOG_ADDRESS_WATCH,
LOG_HTTP1_OBSERVER,
LOG_DEFAULT
} qd_log_module_t;

Expand Down
1 change: 1 addition & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,7 @@
"HTTP_ADAPTOR",
"FLOW_LOG",
"ADDRESS_WATCH",
"HTTP1_OBSERVER",
"DEFAULT"
],
"required": true,
Expand Down
20 changes: 10 additions & 10 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,6 @@ static void free_connection_IO(void *context)
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->observer_handle) {
qdpo_end(conn->observer_handle);
conn->observer_handle = 0;
}

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
Expand Down Expand Up @@ -610,17 +605,21 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
qdr_link_detach(conn->outbound_link, QD_LOST, 0);
}

if (!!conn->core_conn) {
qdr_connection_closed(conn->core_conn);
conn->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
if (conn->observer_handle) {
qdpo_end(conn->observer_handle);
}

if (!!conn->common.vflow) {
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, conn->inbound_octets);
vflow_end_record(conn->common.vflow);
}

if (!!conn->core_conn) {
qdr_connection_closed(conn->core_conn);
conn->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}

qd_tls_free2(conn->tls);
qd_tls_domain_decref(conn->tls_domain);
free(conn->alpn_protocol);
Expand All @@ -632,8 +631,9 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
conn->outbound_link = 0;
conn->outbound_stream = 0;
conn->outbound_delivery = 0;
conn->core_conn = 0;
conn->observer_handle = 0;
conn->common.vflow = 0;
conn->core_conn = 0;
conn->tls = 0;
conn->tls_domain = 0;

Expand Down
35 changes: 20 additions & 15 deletions src/decoders/http1/http1_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ static void parser_error(qd_http1_decoder_connection_t *hconn, const char *reaso
hconn->parse_error = reason;
decoder_new_state(&hconn->client, HTTP1_DECODE_ERROR);
decoder_new_state(&hconn->server, HTTP1_DECODE_ERROR);
if (hconn->config->protocol_error)
hconn->config->protocol_error(hconn, reason);
hconn->config->protocol_error(hconn, reason);
}
}

Expand Down Expand Up @@ -439,10 +438,12 @@ static bool message_done(qd_http1_decoder_connection_t *hconn, decoder_t *decode
}

// signal the message receive is complete
int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client);
if (rc) {
parser_error(hconn, "message_done callback failed");
return false;
if (hconn->config->message_done) {
int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client);
if (rc) {
parser_error(hconn, "message_done callback failed");
return false;
}
}

decoder_reset(decoder); // resets decode state to parse_request or parse_response
Expand Down Expand Up @@ -619,10 +620,12 @@ static bool headers_done(qd_http1_decoder_connection_t *hconn, struct decoder_t
h1_decode_request_state_t *hrs = decoder->hrs;
assert(hrs);

int rc = hconn->config->rx_headers_done(hconn, hrs->user_context, decoder->is_client);
if (rc) {
parser_error(hconn, "rx_headers_done callback failed");
return false;
if (hconn->config->rx_headers_done) {
int rc = hconn->config->rx_headers_done(hconn, hrs->user_context, decoder->is_client);
if (rc) {
parser_error(hconn, "rx_headers_done callback failed");
return false;
}
}

// Determine if a body is present. See RFC9112 Message Body Length
Expand Down Expand Up @@ -783,11 +786,13 @@ static bool parse_header(qd_http1_decoder_connection_t *hconn, decoder_t *decode
truncate_whitespace(value);
}

assert(decoder->hrs);
int rc = hconn->config->rx_header(hconn, decoder->hrs->user_context, decoder->is_client, key, value);
if (rc) {
parser_error(hconn, "rx_header callback failed");
return false;
if (hconn->config->rx_header) {
assert(decoder->hrs);
int rc = hconn->config->rx_header(hconn, decoder->hrs->user_context, decoder->is_client, key, value);
if (rc) {
parser_error(hconn, "rx_header callback failed");
return false;
}
}

if (!process_header(hconn, decoder, key, value))
Expand Down
8 changes: 4 additions & 4 deletions src/decoders/http1/http1_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ struct qd_http1_decoder_config_t {
uint32_t version_major,
uint32_t version_minor);

// Invoked for each HTTP header received. from_client is true if the header was read from the stream sent by the
// client, false if the stream is sent from the server. Neither key nor value is preserved on return from this
// call. The callback must copy the data associated with these values if they need to be saved.
// (Optional) invoked for each HTTP header received. from_client is true if the header was read from the stream sent
// by the client, false if the stream is sent from the server. Neither key nor value is preserved on return from
// this call. The callback must copy the data associated with these values if they need to be saved.
//
int (*rx_header)(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client,
const char *key, const char *value);

// Invoked after the last HTTP header (after the last rx_header() callback for this message).
// (Optional) invoked after the last HTTP header (after the last rx_header() callback for this message).
//
int (*rx_headers_done)(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client);

Expand Down
5 changes: 3 additions & 2 deletions src/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#define TEXT_MAX QD_LOG_TEXT_MAX
#define LOG_MAX (QD_LOG_TEXT_MAX+128)
#define LIST_MAX 1000
#define NUM_LOG_SOURCES 20
#define NUM_LOG_SOURCES 21

const char *QD_LOG_STATS_TYPE = "logStats";

Expand All @@ -58,7 +58,8 @@ int qd_log_max_len(void)
const char *log_module_names[] = {"ROUTER", "ROUTER_CORE", "ROUTER_HELLO", "ROUTER_LS", "ROUTER_MA",
"MESSAGE", "SERVER", "AGENT", "CONTAINER", "ERROR",
"POLICY", "HTTP", "CONN_MGR", "PYTHON", "PROTOCOL",
"TCP_ADAPTOR", "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", "DEFAULT"};
"TCP_ADAPTOR", "HTTP_ADAPTOR", "FLOW_LOG", "ADDRESS_WATCH", "HTTP1_OBSERVER",
"DEFAULT"};

typedef struct qd_log_entry_t qd_log_entry_t;

Expand Down
Loading

0 comments on commit fc31f23

Please sign in to comment.