Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed May 3, 2024
1 parent f93c6da commit 429e345
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 47 deletions.
19 changes: 9 additions & 10 deletions include/qpid/dispatch/protocol_observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/protocols.h>
#include <qpid/dispatch/vanflow.h>

/**
Expand Down Expand Up @@ -68,7 +69,7 @@ void qdpo_config_add_address(qdpo_config_t *config, const char *field, const cha


typedef struct qdpo_t qdpo_t;
typedef void* qdpo_transport_handle_t;
typedef struct qdpo_transport_handle_t qdpo_transport_handle_t;

/**
* Create a new protocol observer. Protocol observers take raw octets and attempt to detect the application protocol
Expand All @@ -78,7 +79,7 @@ typedef void* qdpo_transport_handle_t;
* @param config Configuration returned by qdpo_config
* @return qdpo_t* Newly allocated observer
*/
qdpo_t *protocol_observer(const char *base, qdpo_config_t *config);
qdpo_t *protocol_observer(qd_protocol_t base, qdpo_config_t *config);

/**
* Free an allocated observer
Expand All @@ -88,33 +89,31 @@ qdpo_t *protocol_observer(const char *base, qdpo_config_t *config);
void qdpo_free(qdpo_t *observer);

/**
* Provide the first buffer for a new connection. This is payload sent from the client of the connection.
* Create an observer for a new connection.
*
* @param observer The observer returned by protocol_observer
* @param vflow The vanflow record for the client-side transport flow
* @param transport_context A context unique to this connections transport (used in callbacks)
* @param buf The buffer containing the protocol payload
* @param offset The offset into the buffer where the first protocol octet is found
* @return void* A transport handle that references the observer's state for this connection
*/
qdpo_transport_handle_t qdpo_first(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, qd_buffer_t *buf, size_t offset);
qdpo_transport_handle_t *qdpo_begin(qdpo_t *observer, vflow_record_t *vflow, void *transport_context);

/**
* Provide subsequent payload data to an already established connection.
*
* @param transport_handle The handle returned by qdpo_first
* @param from_client True if this payload is from the client, false if from the server
* @param buf The buffer containing the protocol payload
* @param offset The offset into the buffer where the next protocol octet is found
* @param data The raw protocol data octets
* @param length The length of the data in octets
*/
void qdpo_data(qdpo_transport_handle_t transport_handle, bool from_client, qd_buffer_t *buf, size_t offset);
void qdpo_data(qdpo_transport_handle_t *transport_handle, bool from_client, const unsigned char *data, size_t length);

/**
* Indicate the end of a connection.
*
* @param connection_handle The handle returned by qdpo_first. This handle
* should not be used after making this call.
*/
void qdpo_end(qdpo_transport_handle_t transport_handle);
void qdpo_end(qdpo_transport_handle_t *transport_handle);

#endif
5 changes: 4 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ set(qpid_dispatch_SOURCES
adaptors/amqp/qd_connection.c
adaptors/amqp/qd_connector.c
adaptors/amqp/server_config.c
observers/protocol_observer.c
observers/tcp_observer.c
observers/http1_observer.c
observers/http2_observer.c
alloc.c
alloc_pool.c
aprintf.c
Expand Down Expand Up @@ -87,7 +91,6 @@ set(qpid_dispatch_SOURCES
policy.c
policy_spec.c
protocol_adaptor.c
protocol_observer.c
proton_utils.c
posix/threading.c
python_embedded.c
Expand Down
39 changes: 26 additions & 13 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ static void TL_setup_listener(qd_tcp_listener_t *li)
// TODO - add configuration to the listener to influence whether and how the observer is set up.
//
li->protocol_observer_config = qdpo_config(0, true);
li->protocol_observer = protocol_observer("tcp", li->protocol_observer_config);
li->protocol_observer = protocol_observer(QD_PROTOCOL_TCP, li->protocol_observer_config);

//
// Create an adaptor listener. This listener will automatically create a listening socket when there is at least one
Expand Down Expand Up @@ -489,6 +489,11 @@ static void free_connection_IO(void *context)
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->observer_handle) {
qdpo_end(conn->observer_handle);
conn->observer_handle = 0;
}

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
Expand All @@ -513,7 +518,7 @@ static void free_connection_IO(void *context)
}
}

// Pass connector to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}
Expand Down Expand Up @@ -758,7 +763,7 @@ static uint64_t produce_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
if (qd_buffer_size(buf) > 0) {
DEQ_INSERT_TAIL(qd_buffers, buf);
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, true, buf, 0);
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf));
}
} else {
qd_buffer_free(buf);
Expand Down Expand Up @@ -797,7 +802,7 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
qd_buffer_t *buf = DEQ_HEAD(buffers);
for (size_t i = 0; i < actual; i++) {
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, false, buf, 0);
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf));
}
raw_buffers[i].context = (uintptr_t) buf;
raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
Expand Down Expand Up @@ -839,7 +844,7 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
clone->size = size;
memcpy(qd_buffer_base(clone), qd_buffer_base(conn->outbound_body) + offset, size);
if (observe) {
qdpo_data(conn->observer_handle, false, clone, 0);
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(clone), qd_buffer_size(clone));
}
DEQ_INSERT_TAIL(*buffers, clone);
}
Expand Down Expand Up @@ -879,16 +884,20 @@ static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
// Note: There may be a non-zero offset only on the first body buffer. It is assumed that
// every subsequent buffer will have an offset of 0.
//
const bool observe = conn->listener_side && !!conn->observer_handle;
while (!!conn->outbound_body && pn_raw_connection_write_buffers_capacity(conn->raw_conn) > 0) {
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, false, conn->outbound_body, offset);
unsigned char *bytes = qd_buffer_base(conn->outbound_body) + offset;
size_t size = qd_buffer_size(conn->outbound_body) - offset;

if (observe) {
qdpo_data(conn->observer_handle, conn->listener_side, bytes, size);
}
pn_raw_buffer_t raw_buffer;
raw_buffer.context = 0;
raw_buffer.bytes = (char*) qd_buffer_base(conn->outbound_body);
raw_buffer.capacity = qd_buffer_capacity(conn->outbound_body);
raw_buffer.size = qd_buffer_size(conn->outbound_body) - offset;
raw_buffer.offset = offset;
raw_buffer.bytes = (char*) bytes;
raw_buffer.capacity = 0;
raw_buffer.size = size;
raw_buffer.offset = 0;
octets += raw_buffer.size;
pn_raw_connection_write_buffers(conn->raw_conn, &raw_buffer, 1);
conn->outbound_body = DEQ_NEXT(conn->outbound_body);
Expand Down Expand Up @@ -1454,7 +1463,7 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers
while (buf) {
octets += qd_buffer_size(buf);
if (observe) {
qdpo_data(conn->observer_handle, false, buf, 0);
qdpo_data(conn->observer_handle, conn->listener_side, qd_buffer_base(buf), qd_buffer_size(buf));
}
buf = DEQ_NEXT(buf);
}
Expand Down Expand Up @@ -2013,7 +2022,6 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
//
qd_tcp_listener_incref(listener);


sys_mutex_init(&conn->activation_lock);
sys_atomic_init(&conn->core_activation, 0);
sys_atomic_init(&conn->raw_opened, 0);
Expand All @@ -2036,8 +2044,13 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened);
sys_mutex_unlock(&listener->lock);

if (listener->protocol_observer) {
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn);
}

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);
// Note: this will trigger the connection's event handler on another thread:
pn_listener_raw_accept(pn_listener, conn->raw_conn);
}

Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/tcp/tcp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct qd_tcp_listener_t {
qd_adaptor_config_t *adaptor_config;
qd_tls_domain_t *tls_domain;
qd_adaptor_listener_t *adaptor_listener;
qd_tcp_connection_list_t connections;
qd_tcp_connection_list_t connections;
qdpo_config_t *protocol_observer_config;
qdpo_t *protocol_observer;
uint64_t connections_opened;
Expand Down
47 changes: 47 additions & 0 deletions src/observers/http1_observer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "private.h"


static void http1_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG,
"HTTP OBSERVE: %zu %s octets", length, from_client ? "client" : "server");

}



void qdpo_http1_init(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "HTTP OBSERVER INIT");

th->protocol = QD_PROTOCOL_HTTP1;
th->observe = http1_observe;
th->http1.tbd = 42; // whatever;

}

void qdpo_http1_final(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "HTTP OBSERVER FINAL");
th->observe = 0;
}

47 changes: 47 additions & 0 deletions src/observers/http2_observer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "private.h"


static void http2_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG,
"HTTP/2 OBSERVE: %zu %s octets", length, from_client ? "client" : "server");

}


void qdpo_http2_init(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "HTTP/2 OBSERVER INIT");

th->protocol = QD_PROTOCOL_HTTP2;
th->observe = http2_observe;
th->http2.tbd = 42; // whatever

}


void qdpo_http2_final(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "HTTP/2 OBSERVER FINAL");
th->observe = 0;
}

88 changes: 88 additions & 0 deletions src/observers/private.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#ifndef __observers_private_h__
#define __observers_private_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <qpid/dispatch/protocol_observer.h>

struct qdpo_config_t {
qdpo_use_address_t use_address;
bool allow_all;
};


struct qdpo_t {
qd_protocol_t base; // initial observed protocol
qdpo_config_t *config;
};

/**
* state machine for observing TCP
*/
typedef struct tcp_observer_state_t tcp_observer_state_t;
struct tcp_observer_state_t {
int tbd;
};

/**
* state machine for observing HTTP/1.x
*/
typedef struct http1_observer_state_t http1_observer_state_t;
struct http1_observer_state_t {
int tbd;
};


/**
* state machine for observing HTTP/2.0
*/
typedef struct http2_observer_state_t http2_observer_state_t;
struct http2_observer_state_t {
int tbd;
};


/**
* Per flow protocol observer
*/
struct qdpo_transport_handle_t {
qdpo_t *parent;
vflow_record_t *vflow;
void *transport_context;
qd_protocol_t protocol; // current observed protocol

void (*observe)(qdpo_transport_handle_t *, bool from_client, const unsigned char *buf, size_t length);

union {
tcp_observer_state_t tcp;
http1_observer_state_t http1;
http2_observer_state_t http2;
};
};

void qdpo_tcp_init(qdpo_transport_handle_t *handle);
void qdpo_tcp_final(qdpo_transport_handle_t *handle);

void qdpo_http1_init(qdpo_transport_handle_t *handle);
void qdpo_http1_final(qdpo_transport_handle_t *handle);

void qdpo_http2_init(qdpo_transport_handle_t *handle);
void qdpo_http2_final(qdpo_transport_handle_t *handle);

#endif
Loading

0 comments on commit 429e345

Please sign in to comment.