diff --git a/include/qpid/dispatch/protocol_observer.h b/include/qpid/dispatch/protocol_observer.h index 9a7f3cfda..7dd64b9e4 100644 --- a/include/qpid/dispatch/protocol_observer.h +++ b/include/qpid/dispatch/protocol_observer.h @@ -20,6 +20,7 @@ */ #include +#include #include /** @@ -68,7 +69,7 @@ void qdpo_config_add_address(qdpo_config_t *config, const char *field, const cha typedef struct qdpo_t qdpo_t; -typedef void* qdpo_transport_handle_t; +typedef struct qdpo_transport_handle_t qdpo_transport_handle_t; /** * Create a new protocol observer. Protocol observers take raw octets and attempt to detect the application protocol @@ -78,7 +79,7 @@ typedef void* qdpo_transport_handle_t; * @param config Configuration returned by qdpo_config * @return qdpo_t* Newly allocated observer */ -qdpo_t *protocol_observer(const char *base, qdpo_config_t *config); +qdpo_t *protocol_observer(qd_protocol_t base, qdpo_config_t *config); /** * Free an allocated observer @@ -88,26 +89,25 @@ qdpo_t *protocol_observer(const char *base, qdpo_config_t *config); void qdpo_free(qdpo_t *observer); /** - * Provide the first buffer for a new connection. This is payload sent from the client of the connection. + * Create an observer for a new connection. * * @param observer The observer returned by protocol_observer * @param vflow The vanflow record for the client-side transport flow * @param transport_context A context unique to this connections transport (used in callbacks) - * @param buf The buffer containing the protocol payload - * @param offset The offset into the buffer where the first protocol octet is found - * @return void* A transport handle that references the observer's state for this connection + * @param conn_id The routers connection identifier associated with this flow. + * @return A transport handle that references the observer's state for this connection */ -qdpo_transport_handle_t qdpo_first(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, qd_buffer_t *buf, size_t offset); +qdpo_transport_handle_t *qdpo_begin(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, uint64_t conn_id); /** * Provide subsequent payload data to an already established connection. * * @param transport_handle The handle returned by qdpo_first * @param from_client True if this payload is from the client, false if from the server - * @param buf The buffer containing the protocol payload - * @param offset The offset into the buffer where the next protocol octet is found + * @param data The raw protocol data octets + * @param length The length of the data in octets */ -void qdpo_data(qdpo_transport_handle_t transport_handle, bool from_client, qd_buffer_t *buf, size_t offset); +void qdpo_data(qdpo_transport_handle_t *transport_handle, bool from_client, const unsigned char *data, size_t length); /** * Indicate the end of a connection. @@ -115,6 +115,6 @@ void qdpo_data(qdpo_transport_handle_t transport_handle, bool from_client, qd_bu * @param connection_handle The handle returned by qdpo_first. This handle * should not be used after making this call. */ -void qdpo_end(qdpo_transport_handle_t transport_handle); +void qdpo_end(qdpo_transport_handle_t *transport_handle); #endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0600d1f3a..2f56a9ac8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -59,6 +59,10 @@ set(qpid_dispatch_SOURCES adaptors/amqp/qd_connection.c adaptors/amqp/qd_connector.c adaptors/amqp/server_config.c + observers/protocol_observer.c + observers/tcp_observer.c + observers/http1_observer.c + observers/http2_observer.c alloc.c alloc_pool.c aprintf.c @@ -87,7 +91,6 @@ set(qpid_dispatch_SOURCES policy.c policy_spec.c protocol_adaptor.c - protocol_observer.c proton_utils.c posix/threading.c python_embedded.c diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 357d7c989..e0be3b1ed 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -347,7 +347,7 @@ static void TL_setup_listener(qd_tcp_listener_t *li) // TODO - add configuration to the listener to influence whether and how the observer is set up. // li->protocol_observer_config = qdpo_config(0, true); - li->protocol_observer = protocol_observer("tcp", li->protocol_observer_config); + li->protocol_observer = protocol_observer(QD_PROTOCOL_TCP, li->protocol_observer_config); // // Create an adaptor listener. This listener will automatically create a listening socket when there is at least one @@ -489,6 +489,11 @@ static void free_connection_IO(void *context) sys_mutex_unlock(&conn->activation_lock); // Do NOT free the core_activation lock since the core may be holding it + if (conn->observer_handle) { + qdpo_end(conn->observer_handle); + conn->observer_handle = 0; + } + if (conn->common.parent) { if (conn->common.parent->context_type == TL_LISTENER) { qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent; @@ -513,7 +518,7 @@ static void free_connection_IO(void *context) } } - // Pass connector to Core for final deallocation. The Core will free the activation_lock and the related flags. See + // Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See // qdr_core_free_tcp_resource_CT() free_tcp_resource(&conn->common); } @@ -758,7 +763,7 @@ static uint64_t produce_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess if (qd_buffer_size(buf) > 0) { DEQ_INSERT_TAIL(qd_buffers, buf); if (conn->listener_side && !!conn->observer_handle) { - qdpo_data(conn->observer_handle, true, buf, 0); + qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf)); } } else { qd_buffer_free(buf); @@ -797,7 +802,7 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes qd_buffer_t *buf = DEQ_HEAD(buffers); for (size_t i = 0; i < actual; i++) { if (conn->listener_side && !!conn->observer_handle) { - qdpo_data(conn->observer_handle, false, buf, 0); + qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf)); } raw_buffers[i].context = (uintptr_t) buf; raw_buffers[i].bytes = (char*) qd_buffer_base(buf); @@ -819,11 +824,9 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes // output message buffers and will free them as they are processed. Due to that we need to make a copy of these buffers // in order to avoid freeing buffers that are part of the message (double-free). // -static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit) +static void copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit) { size_t offset = 0; - uint64_t octets = 0; - const bool observe = (conn->listener_side && !!conn->observer_handle); assert(conn->tls); @@ -838,12 +841,8 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes qd_buffer_t *clone = qd_buffer(); clone->size = size; memcpy(qd_buffer_base(clone), qd_buffer_base(conn->outbound_body) + offset, size); - if (observe) { - qdpo_data(conn->observer_handle, false, clone, 0); - } DEQ_INSERT_TAIL(*buffers, clone); } - octets += size; offset = 0; conn->outbound_body = DEQ_NEXT(conn->outbound_body); } @@ -852,8 +851,6 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes conn->outbound_body_complete = true; qd_message_release_raw_body(stream); } - - return octets; } static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream) @@ -879,16 +876,20 @@ static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess // Note: There may be a non-zero offset only on the first body buffer. It is assumed that // every subsequent buffer will have an offset of 0. // + const bool observe = conn->listener_side && !!conn->observer_handle; while (!!conn->outbound_body && pn_raw_connection_write_buffers_capacity(conn->raw_conn) > 0) { - if (conn->listener_side && !!conn->observer_handle) { - qdpo_data(conn->observer_handle, false, conn->outbound_body, offset); + unsigned char *bytes = qd_buffer_base(conn->outbound_body) + offset; + size_t size = qd_buffer_size(conn->outbound_body) - offset; + + if (observe) { + qdpo_data(conn->observer_handle, false, bytes, size); } pn_raw_buffer_t raw_buffer; raw_buffer.context = 0; - raw_buffer.bytes = (char*) qd_buffer_base(conn->outbound_body); - raw_buffer.capacity = qd_buffer_capacity(conn->outbound_body); - raw_buffer.size = qd_buffer_size(conn->outbound_body) - offset; - raw_buffer.offset = offset; + raw_buffer.bytes = (char*) bytes; + raw_buffer.capacity = 0; + raw_buffer.size = size; + raw_buffer.offset = 0; octets += raw_buffer.size; pn_raw_connection_write_buffers(conn->raw_conn, &raw_buffer, 1); conn->outbound_body = DEQ_NEXT(conn->outbound_body); @@ -1429,9 +1430,8 @@ static bool manage_flow_XSIDE_IO(qd_tcp_connection_t *conn) // static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers, size_t limit) { - qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context; - const bool observe = conn->listener_side && !!conn->observer_handle; - uint64_t octets = 0; + qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context; + uint64_t octets = 0; assert(limit > 0); assert(DEQ_IS_EMPTY(*buffers)); @@ -1440,7 +1440,7 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers return octets; if (!conn->outbound_body_complete) { - octets = copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit); + copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit); assert(limit >= DEQ_SIZE(*buffers)); limit -= DEQ_SIZE(*buffers); } @@ -1449,19 +1449,21 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers qd_buffer_list_t tmp = DEQ_EMPTY; qd_message_consume_buffers(conn->outbound_stream, &tmp, limit); assert(limit >= DEQ_SIZE(tmp)); - limit -= DEQ_SIZE(tmp); - qd_buffer_t *buf = DEQ_HEAD(tmp); + DEQ_APPEND(*buffers, tmp); + } + + if (!DEQ_IS_EMPTY(*buffers)) { + const bool observe = conn->listener_side && !!conn->observer_handle; + + qd_buffer_t *buf = DEQ_HEAD(*buffers); while (buf) { octets += qd_buffer_size(buf); if (observe) { - qdpo_data(conn->observer_handle, false, buf, 0); + qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf)); } buf = DEQ_NEXT(buf); } - DEQ_APPEND(*buffers, tmp); - } - if (octets) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] TLS consumed %"PRIu64" cleartext octets from stream", conn->conn_id, octets); @@ -1536,6 +1538,7 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn) const int tls_status = qd_tls_do_io2(conn->tls, conn->raw_conn, tls_consume_data_buffers, conn, (can_produce) ? &decrypted_buffers : 0, &decrypted_octets); + // // Process inbound cleartext data. // @@ -1544,6 +1547,15 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn) more_work = true; conn->inbound_octets += decrypted_octets; vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, conn->inbound_octets); + + if (conn->listener_side && !!conn->observer_handle) { + qd_buffer_t *buf = DEQ_HEAD(decrypted_buffers); + while (buf) { + qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf)); + buf = DEQ_NEXT(buf); + } + } + qd_message_produce_buffers(conn->inbound_stream, &decrypted_buffers); qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE TLS read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', decrypted_octets); @@ -2013,7 +2025,6 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn // qd_tcp_listener_incref(listener); - sys_mutex_init(&conn->activation_lock); sys_atomic_init(&conn->core_activation, 0); sys_atomic_init(&conn->raw_opened, 0); @@ -2036,8 +2047,13 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened); sys_mutex_unlock(&listener->lock); + if (listener->protocol_observer) { + conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id); + } + conn->raw_conn = pn_raw_connection(); pn_raw_connection_set_context(conn->raw_conn, &conn->context); + // Note: this will trigger the connection's event handler on another thread: pn_listener_raw_accept(pn_listener, conn->raw_conn); } diff --git a/src/adaptors/tcp/tcp_adaptor.h b/src/adaptors/tcp/tcp_adaptor.h index aaae84293..86532ff23 100644 --- a/src/adaptors/tcp/tcp_adaptor.h +++ b/src/adaptors/tcp/tcp_adaptor.h @@ -62,7 +62,7 @@ struct qd_tcp_listener_t { qd_adaptor_config_t *adaptor_config; qd_tls_domain_t *tls_domain; qd_adaptor_listener_t *adaptor_listener; - qd_tcp_connection_list_t connections; + qd_tcp_connection_list_t connections; qdpo_config_t *protocol_observer_config; qdpo_t *protocol_observer; uint64_t connections_opened; diff --git a/src/observers/http1_observer.c b/src/observers/http1_observer.c new file mode 100644 index 000000000..8468c44eb --- /dev/null +++ b/src/observers/http1_observer.c @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "private.h" + +#include + + +static void http1_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) +{ + qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, + "[C%" PRIu64 "] HTTP/1.1 observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server"); +} + + + +void qdpo_http1_init(qdpo_transport_handle_t *th) +{ + qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer initialized", th->conn_id); + + th->protocol = QD_PROTOCOL_HTTP1; + th->observe = http1_observe; + th->http1.tbd = 42; // whatever; + +} + +void qdpo_http1_final(qdpo_transport_handle_t *th) +{ + qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer finalized", th->conn_id); + th->observe = 0; +} + diff --git a/src/observers/http2_observer.c b/src/observers/http2_observer.c new file mode 100644 index 000000000..1be17bc12 --- /dev/null +++ b/src/observers/http2_observer.c @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "private.h" + +#include + + +static void http2_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) +{ + qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, + "[C%" PRIu64 "] HTTP/2.0 observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server"); + +} + + +void qdpo_http2_init(qdpo_transport_handle_t *th) +{ + qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/2.0 observer initialized", th->conn_id); + + th->protocol = QD_PROTOCOL_HTTP2; + th->observe = http2_observe; + th->http2.tbd = 42; // whatever +} + + +void qdpo_http2_final(qdpo_transport_handle_t *th) +{ + qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/2.0 observer finalized", th->conn_id); + th->observe = 0; +} + diff --git a/src/observers/private.h b/src/observers/private.h new file mode 100644 index 000000000..40f28a0ed --- /dev/null +++ b/src/observers/private.h @@ -0,0 +1,96 @@ +#ifndef __observers_private_h__ +#define __observers_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +struct qdpo_config_t { + qdpo_use_address_t use_address; + bool allow_all; +}; + + +struct qdpo_t { + qd_protocol_t base; // initial observed protocol + qdpo_config_t *config; +}; + +/** + * state machine for observing TCP + */ +#define TCP_PREFIX_LEN 32 +typedef struct tcp_observer_state_t tcp_observer_state_t; +struct tcp_observer_state_t { + // buffer holding enough of the stream to classify the protocol + uint8_t prefix[TCP_PREFIX_LEN]; + int prefix_len; + + // store incoming server data arriving prior to completing classification. + qd_buffer_list_t server_data; + size_t server_bytes; +}; + +/** + * state machine for observing HTTP/1.x + */ +typedef struct http1_observer_state_t http1_observer_state_t; +struct http1_observer_state_t { + int tbd; +}; + + +/** + * state machine for observing HTTP/2.0 + */ +typedef struct http2_observer_state_t http2_observer_state_t; +struct http2_observer_state_t { + int tbd; +}; + + +/** + * Per flow protocol observer + */ +struct qdpo_transport_handle_t { + qdpo_t *parent; + vflow_record_t *vflow; + void *transport_context; + uint64_t conn_id; + qd_protocol_t protocol; // current observed protocol + + void (*observe)(qdpo_transport_handle_t *, bool from_client, const unsigned char *buf, size_t length); + + union { + tcp_observer_state_t tcp; + http1_observer_state_t http1; + http2_observer_state_t http2; + }; +}; + +void qdpo_tcp_init(qdpo_transport_handle_t *handle); +void qdpo_tcp_final(qdpo_transport_handle_t *handle); + +void qdpo_http1_init(qdpo_transport_handle_t *handle); +void qdpo_http1_final(qdpo_transport_handle_t *handle); + +void qdpo_http2_init(qdpo_transport_handle_t *handle); +void qdpo_http2_final(qdpo_transport_handle_t *handle); + +#endif diff --git a/src/protocol_observer.c b/src/observers/protocol_observer.c similarity index 50% rename from src/protocol_observer.c rename to src/observers/protocol_observer.c index ddf9d3b79..01992e194 100644 --- a/src/protocol_observer.c +++ b/src/observers/protocol_observer.c @@ -17,16 +17,16 @@ * under the License. */ +#include "private.h" #include -#include - -struct qdpo_config_t { - qdpo_use_address_t use_address; - bool allow_all; -}; ALLOC_DECLARE(qdpo_config_t); ALLOC_DEFINE(qdpo_config_t); +ALLOC_DECLARE(qdpo_t); +ALLOC_DEFINE(qdpo_t); +ALLOC_DECLARE(qdpo_transport_handle_t); +ALLOC_DEFINE(qdpo_transport_handle_t); + qdpo_config_t *qdpo_config(qdpo_use_address_t use_address, bool allow_all_protocols) { @@ -58,19 +58,12 @@ void qdpo_config_add_address(qdpo_config_t *config, const char *field, const cha } -struct qdpo_t { - qdpo_config_t *config; -}; - -ALLOC_DECLARE(qdpo_t); -ALLOC_DEFINE(qdpo_t); - - -qdpo_t *protocol_observer(const char *base, qdpo_config_t *config) +qdpo_t *protocol_observer(qd_protocol_t base, qdpo_config_t *config) { qdpo_t *observer = new_qdpo_t(); ZERO(observer); + observer->base = base; observer->config = config; return observer; @@ -83,21 +76,65 @@ void qdpo_free(qdpo_t *observer) } -qdpo_transport_handle_t qdpo_first(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, qd_buffer_t *buf, size_t offset) +qdpo_transport_handle_t *qdpo_begin(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, uint64_t conn_id) { - // TODO - return 0; + assert(observer); + + qdpo_transport_handle_t *th = new_qdpo_transport_handle_t(); + ZERO(th); + + th->parent = observer; + th->vflow = vflow; + th->transport_context = transport_context; + th->protocol = observer->base; + th->conn_id = conn_id; + + switch (th->protocol) { + case QD_PROTOCOL_TCP: + qdpo_tcp_init(th); + break; + case QD_PROTOCOL_HTTP1: + qdpo_http1_init(th); + break; + case QD_PROTOCOL_HTTP2: + qdpo_http2_init(th); + break; + default: + assert(false); // unsupported protocol + break; + } + + return th; } -void qdpo_data(qdpo_transport_handle_t transport_handle, bool from_client, qd_buffer_t *buf, size_t offset) +void qdpo_data(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) { - // TODO + assert(th); + if (th->observe) + th->observe(th, from_client, data, length); } -void qdpo_end(qdpo_transport_handle_t transport_handle) +void qdpo_end(qdpo_transport_handle_t *th) { - // TODO + if (th) { + switch (th->protocol) { + case QD_PROTOCOL_TCP: + qdpo_tcp_final(th); + break; + case QD_PROTOCOL_HTTP1: + qdpo_http1_final(th); + break; + case QD_PROTOCOL_HTTP2: + qdpo_http2_final(th); + break; + default: + assert(false); // unsupported protocol + break; + } + + free_qdpo_transport_handle_t(th); + } } diff --git a/src/observers/tcp_observer.c b/src/observers/tcp_observer.c new file mode 100644 index 000000000..85bb0cdf6 --- /dev/null +++ b/src/observers/tcp_observer.c @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "private.h" + +#include +#include +#include + +#define MAX_SERVER_DATA_LEN 16384 + +// HTTP/2.0 protocol prefix (RFC 9113) +// "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" +// +#define HTTP2_PREFIX_LEN 24 +const uint8_t http2_prefix[HTTP2_PREFIX_LEN] = { + 0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x32, + 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a +}; +STATIC_ASSERT(TCP_PREFIX_LEN >= HTTP2_PREFIX_LEN, "Increase TCP_PREFIX_LEN"); + + +// Start a new protocol observer for the content of the tcp data stream +// +// This is a bit tricky since the handle uses an union for protocol state data we need to +// make a separate copy of the saved data for the new observer. +// +static void activate_inner(qdpo_transport_handle_t *th, qd_protocol_t inner_protocol, const unsigned char *data, size_t length) +{ + tcp_observer_state_t save = th->tcp; + + DEQ_INIT(th->tcp.server_data); // prevent free by qdpo_tcp_final() + qdpo_tcp_final(th); + + switch (inner_protocol) { + case QD_PROTOCOL_HTTP1: + qdpo_http1_init(th); + break; + case QD_PROTOCOL_HTTP2: + qdpo_http2_init(th); + break; + default: + assert(false); // unsupported protocol + break; + } + + // pass save data to new observer + + th->observe(th, true, save.prefix, save.prefix_len); + th->observe(th, true, data, length); + qd_buffer_t *buf = DEQ_HEAD(save.server_data); + while (buf) { + DEQ_REMOVE_HEAD(save.server_data); + th->observe(th, false, qd_buffer_base(buf), qd_buffer_size(buf)); + qd_buffer_free(buf); + buf = DEQ_HEAD(save.server_data); + } +} + + +static void tcp_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length) +{ + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + "[C%" PRIu64 "] TCP observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server"); + + // wicked brain-dead classification just for POC + + if (from_client) { + + // fill up the protocol classification prefix buffer + + if (th->tcp.prefix_len < TCP_PREFIX_LEN) { + size_t to_copy = MIN(length, TCP_PREFIX_LEN - th->tcp.prefix_len); + memcpy(&th->tcp.prefix[th->tcp.prefix_len], data, to_copy); + th->tcp.prefix_len += to_copy; + data += to_copy; + length -= to_copy; + } + + // Check for HTTP/2.0 + + size_t to_match = MIN(HTTP2_PREFIX_LEN, th->tcp.prefix_len); + if (memcmp(th->tcp.prefix, http2_prefix, to_match) == 0) { + // did we match the entire http2_prefix? + if (to_match == HTTP2_PREFIX_LEN) { + activate_inner(th, QD_PROTOCOL_HTTP2, data, length); + return; + } else { + return; // Partial match for HTTP/2 prefix, wait for more + } + + } else { + + // HTTP/2.0 check failed, try HTTP/1.x + + // hack for now. probably best to just start handing off the http1 observer and let it + // try to find a proper HTTP/1.x request line. + + const char * const http1_commands[] = { + "GET ", "HEAD ", "POST ", "PUT ", "DELETE ", 0 + }; + + for (int i = 0; http1_commands[i] != 0; ++i) { + size_t cmd_len = strlen(http1_commands[i]); + size_t to_match = MIN(th->tcp.prefix_len, cmd_len); + if (memcmp(th->tcp.prefix, http1_commands[i], to_match) == 0) { + if (to_match == cmd_len) { + activate_inner(th, QD_PROTOCOL_HTTP1, data, length); + return; + } else { + return; // partial match need more + } + } + } + } + + } else { // !from_client + + // hold server data until protocol is classified + if (length + th->tcp.server_bytes < MAX_SERVER_DATA_LEN) { + qd_buffer_list_append(&th->tcp.server_data, data, length); + th->tcp.server_bytes += length; + return; + } + + // Server has sent too much data for us to hold while classifying. Protocol unknown. + // + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] TCP observer response received before classification completed", th->conn_id); + } + + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + "[C%" PRIu64 "] TCP observer terminated: unknown protocol: %.2X %.2X %.2X %.2X ...", + th->conn_id, th->tcp.prefix[0], th->tcp.prefix[1], th->tcp.prefix[2], th->tcp.prefix[3]); + qdpo_tcp_final(th); +} + + +void qdpo_tcp_init(qdpo_transport_handle_t *th) +{ + th->protocol = QD_PROTOCOL_TCP; + th->observe = tcp_observe; + + th->tcp.prefix_len = 0; + th->tcp.server_bytes = 0; + DEQ_INIT(th->tcp.server_data); + memset(th->tcp.prefix, 0, TCP_PREFIX_LEN); + + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] TCP observer initialized", th->conn_id); +} + +void qdpo_tcp_final(qdpo_transport_handle_t *th) +{ + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] TCP observer finalized", th->conn_id); + th->observe = 0; + qd_buffer_list_free_buffers(&th->tcp.server_data); +} +