diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index c1b59d5f0..136931f81 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -63,7 +63,6 @@ typedef struct qd_link_t qd_link_t; // Callback for status change (confirmed persistent, loaded-in-memory, etc.) typedef struct qd_message_t qd_message_t; -typedef struct qd_message_stream_data_t qd_message_stream_data_t; /** Amount of message to be parsed. */ typedef enum { @@ -388,130 +387,6 @@ void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_com int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked); -/** - * qd_message_stream_data_iterator - * - * Return an iterator that references the content (not the performative headers) - * of the entire body-data section. - * - * The returned iterator must eventually be freed by the caller. - * - * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data - * @return Pointer to an iterator referencing the stream_data content - */ -qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data); - - -/** - * qd_message_stream_data_buffer_count - * - * Return the number of buffers that are needed to hold this body-data's content. - * - * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data - * @return Number of pn_raw_buffers needed to contain the entire content of this stream_data. - */ -int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data); - - -/** - * qd_message_stream_data_buffers - * - * Populate an array of pn_raw_buffer_t objects with references to the stream_data's content. - * - * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data - * @param buffers Pointer to an array of pn_raw_buffer_t objects - * @param offset The offset (in the stream_data's buffer set) from which copying should begin - * @param count The number of pn_raw_buffer_t objects in the buffers array - * @return The number of pn_raw_buffer_t objects that were overwritten - */ -int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count); - -/** - * qd_message_stream_data_payload_length - * - * Given a stream_data object, return the length of the payload. - * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the stream_data object - * - * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data - * @return The length of the payload of the passed in body data object. - */ -size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data); - - -/** - * qd_message_stream_data_release - * - * Release buffers that were associated with a body-data section. It is not required that body-data - * objects be released in the same order in which they were offered. - * - * Once this function is called, the caller must drop its reference to the stream_data object - * and not use it again. - * - * @param stream_data Pointer to a body data object returned by qd_message_next_stream_data - */ -void qd_message_stream_data_release(qd_message_stream_data_t *stream_data); - - -/** - * qd_message_stream_data_release_up_to - * - * Release this stream data and all the previous ones also. - * - * @param stream_data Pointer to a body data object returned by qd_message_next_stream_data - */ -void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data); - - -typedef enum { - QD_MESSAGE_STREAM_DATA_BODY_OK, // A valid body data object has been returned - QD_MESSAGE_STREAM_DATA_FOOTER_OK, // A valid footer has been returned - QD_MESSAGE_STREAM_DATA_INCOMPLETE, // The next body data is incomplete, try again later - QD_MESSAGE_STREAM_DATA_NO_MORE, // There are no more body data objects in this stream - QD_MESSAGE_STREAM_DATA_INVALID, // The next body data is invalid, the stream is corrupted - QD_MESSAGE_STREAM_DATA_ABORTED // sender has terminated the transfer, message is incomplete -} qd_message_stream_data_result_t; - - -/** - * qd_message_next_stream_data - * - * Get the next body-data section from this streaming message return the result and - * possibly the valid, completed stream_data object. - * - * @param msg Pointer to a message - * @param stream_data Output pointer to a stream_data object (or 0 if not OK) - * @return The stream_data_result describing the result of this operation - */ -qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, qd_message_stream_data_t **stream_data); - - -/** - * qd_message_stream_data_footer_append - * - * Constructs a footer field by calling the qd_compose(QD_PERFORMATIVE_FOOTER, field); - * It then inserts the passed in buffer list to the composed field and proceeds to disable q2 before finally adding the footer - * field to the message. - * - * Use this function if you have the complete footer data available in the passed in buffer list - */ -int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props); - - -/** - * qd_message_stream_data_append - * - * Append the buffers in data as a sequence of one or more BODY_DATA sections - * to the given message. The buffers in data are moved into the message - * content by this function. - * - * @param msg Pointer to message under construction - * @param data List of buffers containing body data. - * @param qd_blocked Set to true if this call caused Q2 to block - * @return The number of buffers stored in the message's content - */ -int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked); - - /** Put string representation of a message suitable for logging in buffer. Note that log message text is limited to * QD_LOG_TEXT_MAX bytes which includes the terminating null byte. * diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4ac0dc320..9ddce35e4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,7 +38,6 @@ add_custom_command ( # Build the skupper-router library. set(qpid_dispatch_SOURCES - adaptors/reference_adaptor.c adaptors/adaptor_common.c adaptors/tcp/tcp_adaptor.c adaptors/test_adaptor.c diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c deleted file mode 100644 index 296bfcbe7..000000000 --- a/src/adaptors/reference_adaptor.c +++ /dev/null @@ -1,597 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "delivery.h" - -#include "qpid/dispatch/ctools.h" -#include "qpid/dispatch/message.h" -#include "qpid/dispatch/protocol_adaptor.h" -#include "qpid/dispatch/timer.h" -#include "qpid/dispatch/connection_counters.h" - -#include -#include - -static char *address1 = "examples"; -static char *address2 = "stream"; - -typedef struct qdr_ref_adaptor_t { - qdr_core_t *core; - qdr_protocol_adaptor_t *adaptor; - qd_timer_t *startup_timer; - qd_timer_t *activate_timer; - qd_timer_t *stream_timer; - qdr_connection_t *conn; - qdr_link_t *out_link_1; - qdr_link_t *out_link_2; - qdr_link_t *in_link_2; - qdr_link_t *dynamic_in_link; - char *reply_to; - qd_message_t *streaming_message; - qdr_delivery_t *streaming_delivery; - qd_message_t *incoming_message; - int stream_count; -} qdr_ref_adaptor_t; - - -void qdr_ref_connection_activate_CT(void *context, qdr_connection_t *conn) -{ - // - // Use a zero-delay timer to defer this call to an IO thread - // - // Note that this may not be generally safe to do. There's no guarantee that multiple - // activations won't schedule multiple IO threads running this code concurrently. - // Normally, we would rely on assurances provided by the IO scheduler (Proton) that no - // connection shall ever be served by more than one thread concurrently. - // - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - qd_timer_schedule(adaptor->activate_timer, 0); -} - - -static void qdr_ref_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link, - qdr_terminus_t *source, qdr_terminus_t *target, - qd_session_class_t session_class) -{ -} - - -static void qdr_ref_second_attach(void *context, qdr_link_t *link, - qdr_terminus_t *source, qdr_terminus_t *target) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; -#define TERM_SIZE 200 - char ftarget[TERM_SIZE]; - char fsource[TERM_SIZE]; - - ftarget[0] = '\0'; - fsource[0] = '\0'; - - size_t size = TERM_SIZE; - qdr_terminus_format(source, fsource, &size); - - size = TERM_SIZE; - qdr_terminus_format(target, ftarget, &size); - - printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget); - - if (link == adaptor->dynamic_in_link) { - // - // The dynamic in-link has been attached. Get the reply-to address and open - // a couple of out-links. - // - qd_iterator_t *reply_iter = qdr_terminus_get_address(source); - adaptor->reply_to = (char*) qd_iterator_copy(reply_iter); - printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to); - - // - // Open an out-link for each address - // - uint64_t link_id; - qdr_terminus_t *target = qdr_terminus(0); - - qdr_terminus_set_address(target, address1); - adaptor->out_link_1 = qdr_link_first_attach(adaptor->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "ref.1", //const char *name, - 0, //const char *terminus_addr, - false, //bool no_route - 0, //qdr_delivery_t *initial_delivery - &link_id); - - target = qdr_terminus(0); - qdr_terminus_set_address(target, address2); - adaptor->out_link_2 = qdr_link_first_attach(adaptor->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "ref.2", //const char *name, - 0, //const char *terminus_addr, - false, //bool no_route - 0, //qdr_delivery_t *initial_delivery - &link_id); - - source = qdr_terminus(0); - qdr_terminus_set_address(source, address2); - adaptor->in_link_2 = qdr_link_first_attach(adaptor->conn, - QD_OUTGOING, - source, //qdr_terminus_t *source, - qdr_terminus(0), //qdr_terminus_t *target, - "ref.3", //const char *name, - 0, //const char *terminus_addr, - false, //bool no_route - 0, //qdr_delivery_t *initial_delivery - &link_id); - } -} - - -static void qdr_ref_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) -{ -} - - -static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - - printf("qdr_ref_flow: %d credits issued\n", credit); - - if (link == adaptor->out_link_1) { - qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); - qd_compose_start_list(props); - qd_compose_insert_null(props); // message-id - qd_compose_insert_null(props); // user-id - qd_compose_insert_null(props); // to - qd_compose_insert_null(props); // subject - qd_compose_insert_string(props, adaptor->reply_to); // reply-to - /* - qd_compose_insert_null(props); // correlation-id - qd_compose_insert_null(props); // content-type - qd_compose_insert_null(props); // content-encoding - qd_compose_insert_timestamp(props, 0); // absolute-expiry-time - qd_compose_insert_timestamp(props, 0); // creation-time - qd_compose_insert_null(props); // group-id - qd_compose_insert_uint(props, 0); // group-sequence - qd_compose_insert_null(props); // reply-to-group-id - */ - qd_compose_end_list(props); - - props = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, props); - qd_compose_insert_string(props, "Test Payload"); - - qd_message_t *msg = qd_message(); - - qd_message_compose_2(msg, props, true); - qd_compose_free(props); - - qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0, 0, 0); - // Keep return-protection delivery reference as the adaptor's reference - } else if (link == adaptor->out_link_2) { - // - // Begin streaming a long message on the link. - // - qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); - qd_compose_start_list(props); - qd_compose_insert_null(props); // message-id - qd_compose_insert_null(props); // user-id - qd_compose_insert_null(props); // to - qd_compose_insert_null(props); // subject - qd_compose_insert_string(props, adaptor->reply_to); // reply-to - qd_compose_end_list(props); - - adaptor->streaming_message = qd_message(); - - qd_message_compose_2(adaptor->streaming_message, props, false); - qd_compose_free(props); - qd_message_set_streaming_annotation(adaptor->streaming_message); - - printf("qdr_ref_flow: Starting a streaming delivery\n"); - adaptor->streaming_delivery = - qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0, 0, 0); - adaptor->stream_count = 0; - // Keep return-protection delivery reference as the adaptor's reference - - qd_timer_schedule(adaptor->stream_timer, 1000); - } -} - - -static void qdr_ref_offer(void *context, qdr_link_t *link, int delivery_count) -{ -} - - -static void qdr_ref_drained(void *context, qdr_link_t *link) -{ -} - - -static void qdr_ref_drain(void *context, qdr_link_t *link, bool mode) -{ -} - - -static int qdr_ref_push(void *context, qdr_link_t *link, int limit) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - return qdr_link_process_deliveries(adaptor->core, link, limit); -} - - -static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - qd_message_t *msg = qdr_delivery_message(delivery); - - adaptor->incoming_message = msg; - - printf("qdr_ref_deliver called\n"); - - qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY); - - switch (status) { - case QD_MESSAGE_DEPTH_OK: { - // - // At least one complete body performative has arrived. It is now safe to switch - // over to the per-message extraction of body-data segments. - // - printf("qdr_ref_deliver: depth ok\n"); - qd_message_stream_data_t *stream_data; - qd_message_stream_data_result_t stream_data_result; - - // - // Process as many body-data segments as are available. - // - while (true) { - stream_data_result = qd_message_next_stream_data(msg, &stream_data); - - switch (stream_data_result) { - case QD_MESSAGE_STREAM_DATA_BODY_OK: { - // - // We have a new valid body-data segment. Handle it - // - printf("qdr_ref_deliver: stream_data_buffer_count: %d\n", qd_message_stream_data_buffer_count(stream_data)); - - qd_iterator_t *body_iter = qd_message_stream_data_iterator(stream_data); - char *body = (char*) qd_iterator_copy(body_iter); - printf("qdr_ref_deliver: message body-data received: %s\n", body); - free(body); - qd_iterator_free(body_iter); - qd_message_stream_data_release(stream_data); - break; - } - - case QD_MESSAGE_STREAM_DATA_FOOTER_OK: { - printf("qdr_ref_deliver: Received message footer\n"); - qd_iterator_t *footer_iter = qd_message_stream_data_iterator(stream_data); - qd_parsed_field_t *footer = qd_parse(footer_iter); - - if (qd_parse_ok(footer)) { - uint8_t tag = qd_parse_tag(footer); - if (tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32) { - uint32_t item_count = qd_parse_sub_count(footer); - for (uint32_t i = 0; i < item_count; i++) { - qd_iterator_t *key_iter = qd_parse_raw(qd_parse_sub_key(footer, i)); - qd_iterator_t *value_iter = qd_parse_raw(qd_parse_sub_value(footer, i)); - char *key = (char*) qd_iterator_copy(key_iter); - char *value = (char*) qd_iterator_copy(value_iter); - printf("qdr_ref_deliver: %s: %s\n", key, value); - free(key); - free(value); - } - } else - printf("qdr_ref_deliver: Unexpected tag in footer: %02x\n", tag); - } else - printf("qdr_ref_deliver: Footer parse error: %s\n", qd_parse_error(footer)); - - qd_parse_free(footer); - qd_iterator_free(footer_iter); - qd_message_stream_data_release(stream_data); - break; - } - - case QD_MESSAGE_STREAM_DATA_INCOMPLETE: - // - // A new segment has not completely arrived yet. Check again later. - // - printf("qdr_ref_deliver: body-data incomplete\n"); - return 0; - - case QD_MESSAGE_STREAM_DATA_NO_MORE: - // - // We have already handled the last body-data segment for this delivery. - // Complete the "sending" of this delivery and replenish credit. - // - // Note that depending on the adaptor, it might be desirable to delay the - // acceptance and settlement of this delivery until a later event (i.e. when - // a requested action has completed). - // - qd_message_set_send_complete(msg); - qdr_link_flow(adaptor->core, link, 1, false); - adaptor->incoming_message = 0; - return PN_ACCEPTED; // This will cause the delivery to be settled - - case QD_MESSAGE_STREAM_DATA_INVALID: - case QD_MESSAGE_STREAM_DATA_ABORTED: - // - // The body-data is corrupted or the sender has aborted the message mid-flight. Stop handling the - // delivery and reject it. - // - printf("qdr_ref_deliver: body-data invalid/aborted\n"); - qdr_link_flow(adaptor->core, link, 1, false); - adaptor->incoming_message = 0; - return PN_REJECTED; - } - } - - break; - } - - case QD_MESSAGE_DEPTH_INVALID: - printf("qdr_ref_deliver: message invalid\n"); - qdr_link_flow(adaptor->core, link, 1, false); - adaptor->incoming_message = 0; - return PN_REJECTED; - break; - - case QD_MESSAGE_DEPTH_INCOMPLETE: - printf("qdr_ref_deliver: message incomplete\n"); - break; - } - - return 0; -} - - -static int qdr_ref_get_credit(void *context, qdr_link_t *link) -{ - return 8; -} - - -static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - char *dispname; - - switch (disp) { - case PN_ACCEPTED: dispname = "ACCEPTED"; break; - case PN_REJECTED: dispname = "REJECTED"; break; - case PN_RELEASED: dispname = "RELEASED"; break; - case PN_MODIFIED: dispname = "MODIFIED"; break; - default: - dispname = ""; - } - printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? "true" : "false"); - - if (qdr_delivery_link(dlv) == adaptor->out_link_2 && qdr_delivery_message(dlv) == adaptor->streaming_message) { - adaptor->streaming_message = 0; - adaptor->stream_count = 0; - } - - if (settled) - qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery"); -} - - -static void qdr_ref_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) -{ - qd_connection_counter_dec(QD_PROTOCOL_TCP); -} - - -static void qdr_ref_conn_trace(void *context, qdr_connection_t *conn, bool trace) -{ -} - - -static void on_startup(void *context) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - - qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, - false, //bool is_authenticated, - true, //bool opened, - "", //char *sasl_mechanisms, - QD_INCOMING, //qd_direction_t dir, - "", //const char *host, - "", //const char *ssl_proto, - "", //const char *ssl_cipher, - "", //const char *user, - "reference-adaptor", //const char *container, - 0, //pn_data_t *connection_properties, - 0, //int ssl_ssf, - false, //bool ssl, - "", // peer router version, - false, // streaming links - false); // connection trunking - - adaptor->conn = qdr_connection_opened(adaptor->core, // core - adaptor->adaptor, // protocol_adaptor - true, // incoming - QDR_ROLE_NORMAL, // role - 1, // cost - qd_server_allocate_connection_id(adaptor->core->qd->server), - 0, // label - 0, // remote_container_id - false, // strip_annotations_in - false, // strip_annotations_out - 250, // link_capacity - 0, // policy_spec - info, // connection_info - 0, // context_binder - 0); // bind_token - qd_connection_counter_inc(QD_PROTOCOL_TCP); - - uint64_t link_id; - - // - // Create a dynamic receiver - // - qdr_terminus_t *dynamic_source = qdr_terminus(0); - qdr_terminus_set_dynamic(dynamic_source); - - adaptor->dynamic_in_link = qdr_link_first_attach(adaptor->conn, - QD_OUTGOING, - dynamic_source, //qdr_terminus_t *source, - qdr_terminus(0), //qdr_terminus_t *target, - "ref.0", //const char *name, - 0, //const char *terminus_addr, - false, //bool no_route - 0, //qdr_delivery_t *initial_delivery - &link_id); -} - - -static void on_activate(void *context) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - - while (qdr_connection_process(adaptor->conn)) {} -} - - -static void on_stream(void *context) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - const char *content = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; - const size_t content_length = strlen(content); - int depth; - - if (!adaptor->streaming_message) - return; - - { - // - // This section shows the proper way to extend a streaming message with content. - // Note that the buffer list may be accumulated over the course of many asynchronous - // events before it is placed in the composed field and appended to the message stream. - // - - // - // Accumulated buffer list - // - for (int sections = 0; sections < 3; sections++) { - qd_buffer_list_t buffer_list; - DEQ_INIT(buffer_list); - qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); - qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); - - // - // Compose a DATA performative for this section of the stream - // - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - qd_compose_insert_binary_buffers(field, &buffer_list); - - // - // Extend the streaming message and free the composed field - // - // TODO(kgiusti): need to handle Q2 blocking event - depth = qd_message_extend(adaptor->streaming_message, field, 0); - qd_compose_free(field); - } - - // - // Notify the router that more data is ready to be pushed out on the delivery - // - qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false); - } - - if (adaptor->stream_count < 10) { - qd_timer_schedule(adaptor->stream_timer, 100); - adaptor->stream_count++; - printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth); - } else { - qd_composed_field_t *footer = qd_compose(QD_PERFORMATIVE_FOOTER, 0); - qd_compose_start_map(footer); - qd_compose_insert_symbol(footer, "trailer"); - qd_compose_insert_string(footer, "value"); - qd_compose_insert_symbol(footer, "second"); - qd_compose_insert_string(footer, "value2"); - qd_compose_end_map(footer); - // @TODO(kgiusti): need to handle Q2 blocking event - depth = qd_message_extend(adaptor->streaming_message, footer, 0); - qd_compose_free(footer); - - qd_message_set_receive_complete(adaptor->streaming_message); - adaptor->streaming_message = 0; - adaptor->stream_count = 0; - printf("on_stream: completed streaming send, depth=%d\n", depth); - } -} - - -/** - * This initialization function will be invoked when the router core is ready for the protocol - * adaptor to be created. This function must: - * - * 1) Register the protocol adaptor with the router-core. - * 2) Prepare the protocol adaptor to be configured. - */ -void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) -{ - qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t); - ZERO(adaptor); - adaptor->core = core; - adaptor->adaptor = qdr_protocol_adaptor(core, - "reference", // name - adaptor, // context - qdr_ref_connection_activate_CT, - qdr_ref_first_attach, - qdr_ref_second_attach, - qdr_ref_detach, - qdr_ref_flow, - qdr_ref_offer, - qdr_ref_drained, - qdr_ref_drain, - qdr_ref_push, - qdr_ref_deliver, - qdr_ref_get_credit, - qdr_ref_delivery_update, - qdr_ref_conn_close, - qdr_ref_conn_trace); - *adaptor_context = adaptor; - - adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor); - qd_timer_schedule(adaptor->startup_timer, 0); - - adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor); - adaptor->stream_timer = qd_timer(core->qd, on_stream, adaptor); -} - - -void qdr_ref_adaptor_final(void *adaptor_context) -{ - qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) adaptor_context; - qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); - qd_timer_free(adaptor->startup_timer); - qd_timer_free(adaptor->activate_timer); - qd_message_free(adaptor->streaming_message); - qd_message_free(adaptor->incoming_message); - free(adaptor->reply_to); - free(adaptor); -} - -/** - * Declare the adaptor so that it will self-register on process startup. - */ -//QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final) diff --git a/src/message.c b/src/message.c index 8807e93db..0e44e3733 100644 --- a/src/message.c +++ b/src/message.c @@ -96,7 +96,6 @@ PN_HANDLE(PN_DELIVERY_CTX) ALLOC_DEFINE_CONFIG_SAFE(qd_message_t, sizeof(qd_message_pvt_t), 0, 0); ALLOC_DEFINE(qd_message_content_t); -ALLOC_DEFINE(qd_message_stream_data_t); typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length); @@ -731,9 +730,7 @@ static qd_section_status_t message_section_check_LH(qd_message_content_t *conten bool protect_buffer) TA_REQ(content->lock) { // Note well: do NOT modify the input buffer and cursor values if there is - // no match! We need to preserve the original cursor/buffer positions when - // parsing out body data sections so we can correctly manage the buffer - // fanout counts when releasing qd_message_stream_data_t instances. + // no match! Otherwise alternative long pattern matches will fail. qd_buffer_t *test_buffer = *buffer; unsigned char *test_cursor = *cursor; @@ -1151,11 +1148,6 @@ void qd_message_free(qd_message_t *in_msg) // LOCK(&content->lock); - // DISPATCH-2099: ensure all outstanding stream_data items associated - // with this message have been returned since the underlying buffers - // may be released - assert(DEQ_IS_EMPTY(msg->stream_data_list)); - const bool was_blocked = !_Q2_holdoff_should_unblock_LH(content); qd_buffer_t *buf = msg->cursor.buffer; while (buf) { @@ -2784,390 +2776,6 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_bl } -/** - * find_last_buffer_LH - * - * Given a field location, find the following: - * - * - *cursor - The pointer to the octet _past_ the last octet in the field. If this is the last octet in - * the buffer, the cursor must point one octet past the buffer. - * - *buffer - The last buffer that contains content for this field. - * - * Important: If the last octet of the field is the last octet of a buffer and there are more buffers in the - * buffer list, *buffer _must_ refer to the buffer that contains the last octet of the field and *cursor must - * point at the octet following that octet, even if it points past the end of the buffer. - */ -static void find_last_buffer_LH(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer) -{ - qd_buffer_t *buf = location->buffer; - size_t remaining = location->hdr_length + location->length; - - while (!!buf && remaining > 0) { - size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0); - if (remaining <= this_buf_size) { - *buffer = buf; - *cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining; - return; - } - remaining -= this_buf_size; - buf = DEQ_NEXT(buf); - } - - assert(false); // The field should already have been validated as complete. -} - - -void trim_stream_data_headers_LH(qd_message_stream_data_t *stream_data, bool remove_vbin_header) //TA_REQ(stream_data->owning_message->content->lock) -{ - const qd_field_location_t *location = &stream_data->section; - qd_buffer_t *buffer = location->buffer; - unsigned char *cursor = qd_buffer_base(buffer) + location->offset; - - bool good = advance(&cursor, &buffer, location->hdr_length); - assert(good); - if (good) { - size_t vbin_hdr_len = 0; - unsigned char tag = 0; - - if (remove_vbin_header) { - vbin_hdr_len = 1; - // coverity[check_return] - next_octet(&cursor, &buffer, &tag); - if (tag == QD_AMQP_VBIN8) { - advance(&cursor, &buffer, 1); - vbin_hdr_len += 1; - } else if (tag == QD_AMQP_VBIN32) { - advance(&cursor, &buffer, 4); - vbin_hdr_len += 4; - } - } - - // coverity[check_return] - can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary - - stream_data->payload.buffer = buffer; - stream_data->payload.offset = cursor - qd_buffer_base(buffer); - stream_data->payload.length = location->length - vbin_hdr_len; - stream_data->payload.hdr_length = 0; - stream_data->payload.parsed = true; - stream_data->payload.tag = tag; - } -} - - -/** - * qd_message_stream_data_iterator - * - * Given a stream_data object, return an iterator that refers to the content of that body data. This iterator - * shall not refer to the 3-byte performative header or the header for the vbin{8,32} field. - * - * The iterator must be freed eventually by the caller. - */ -qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data) -{ - const qd_field_location_t *location = &stream_data->payload; - - return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL); -} - -/** - * qd_message_stream_data_payload_length - * - * Given a stream_data object, return the length of the payload. - */ -size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data) -{ - return stream_data->payload.length; -} - - -/** - * qd_message_stream_data_buffer_count - * - * Return the number of buffers contained in payload portion of the stream_data object. - */ -int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data) -{ - if (stream_data->payload.length == 0) - return 0; - - int count = 1; - qd_buffer_t *buffer = stream_data->payload.buffer; - while (!!buffer && buffer != stream_data->last_buffer) { - buffer = DEQ_NEXT(buffer); - count++; - } - - return count; -} - - -/** - * qd_message_stream_data_buffers - * - * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the stream_data - * object. Don't fill more than count raw_buffers with data. Start at offset from the zero-th buffer in the - * stream_data. - */ -int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count) -{ - qd_buffer_t *buffer = stream_data->payload.buffer; - size_t data_offset = stream_data->payload.offset; - size_t payload_len = stream_data->payload.length; - - qd_message_pvt_t *owning_message = stream_data->owning_message; - - - LOCK(&owning_message->content->lock); - // - // Skip the buffer offset - // - if (offset > 0) { - assert(offset < qd_message_stream_data_buffer_count(stream_data)); - while (offset > 0 && payload_len > 0) { - payload_len -= qd_buffer_size(buffer) - data_offset; - offset--; - data_offset = 0; - buffer = DEQ_NEXT(buffer); - } - } - - // - // Fill the buffer array - // - int idx = 0; - while (idx < count && payload_len > 0) { - size_t buf_size = MIN(payload_len, qd_buffer_size(buffer) - data_offset); - buffers[idx].context = 0; // reserved for use by caller - do not modify! - buffers[idx].bytes = (char*) qd_buffer_base(buffer) + data_offset; - buffers[idx].capacity = QD_BUFFER_SIZE; - buffers[idx].size = buf_size; - buffers[idx].offset = 0; - - data_offset = 0; - payload_len -= buf_size; - buffer = DEQ_NEXT(buffer); - idx++; - } - UNLOCK(&owning_message->content->lock); - - return idx; -} - -void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data) -{ - if (!stream_data) - return; - - qd_message_pvt_t *msg = stream_data->owning_message; - qd_message_stream_data_t *next = DEQ_HEAD(msg->stream_data_list); - qd_message_stream_data_t *current = NULL; - while (next && current != stream_data) { - current = next; - next = DEQ_NEXT(next); - qd_message_stream_data_release(current); - } -} - -/** - * qd_message_stream_data_release - * - * Decrement the fanout ref-counts for all of the buffers referred to in the stream_data. If any have reached zero, - * remove them from the buffer list and free them. - */ -void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) -{ - if (!stream_data) - return; - - qd_message_pvt_t *pvt = stream_data->owning_message; - qd_message_content_t *content = pvt->content; - - // Must free these in-order (oldest first). This simplifies the code. - assert(DEQ_PREV(stream_data) == 0); - - // find the range of buffers that do not overlap other stream_data - // or msg->body_buffer - - qd_buffer_t *start_buf = stream_data->first_buffer; - qd_buffer_t *stop_buf = stream_data->last_buffer; - - // end_boundary is the next buffer that is referenced by this message past the section that is being freed. - // That buffer cannot be released. - - qd_buffer_t *end_boundary = DEQ_NEXT(stream_data) ? DEQ_NEXT(stream_data)->first_buffer : pvt->body_buffer; - while (stop_buf != end_boundary) - // safe to free last_buffer since this message no longer references it - stop_buf = DEQ_NEXT(stop_buf); - assert(stop_buf); - - DEQ_REMOVE(pvt->stream_data_list, stream_data); - free_qd_message_stream_data_t(stream_data); - - if (start_buf == stop_buf) // no unreferenced buffers to free - return; - - LOCK(&content->lock); - - bool was_blocked = !_Q2_holdoff_should_unblock_LH(content); - const bool fanout = pvt->is_fanout; - qd_message_q2_unblocker_t q2_unblock = {0}; - - // - // Free non-overlapping buffers with zero refcounts. - // Check against 1 because the last value is returned - // - while (start_buf != stop_buf) { - qd_buffer_t *next = DEQ_NEXT(start_buf); - uint32_t refcnt = (fanout) ? qd_buffer_dec_fanout(start_buf) : 1; - assert(refcnt > 0); - if (refcnt == 1) { - DEQ_REMOVE(content->buffers, start_buf); - qd_buffer_free(start_buf); - } - start_buf = next; - } - - // - // it is possible that we've freed enough buffers to clear Q2 holdoff - // - if (content->q2_input_holdoff - && was_blocked - && _Q2_holdoff_should_unblock_LH(content)) { - content->q2_input_holdoff = false; - q2_unblock = content->q2_unblocker; - } - - UNLOCK(&content->lock); - - if (q2_unblock.handler) - q2_unblock.handler(q2_unblock.context); -} - - -qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg, qd_message_stream_data_t **out_stream_data) -{ - qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - qd_message_content_t *content = msg->content; - qd_message_stream_data_t *stream_data = 0; - - *out_stream_data = 0; - if (!msg->body_cursor) { - // - // We haven't returned a body-data record for this message yet. Start - // by ensuring the message has been parsed up to the first body section - // - - qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY); - if (status == QD_MESSAGE_DEPTH_OK) { - // Even if DEPTH_OK, body is optional. If there is no body then move to - // the footer - if (msg->content->section_body.buffer) { - msg->body_buffer = msg->content->section_body.buffer; - msg->body_cursor = qd_buffer_base(msg->body_buffer) + msg->content->section_body.offset; - } else { - // No body. Look for footer - status = qd_message_check_depth(in_msg, QD_DEPTH_ALL); - if (status == QD_MESSAGE_DEPTH_OK) { - if (msg->content->section_footer.buffer) { - // footer is also optional - msg->body_buffer = msg->content->section_footer.buffer; - msg->body_cursor = qd_buffer_base(msg->body_buffer) + msg->content->section_footer.offset; - } - } - } - } - - switch (status) { - case QD_MESSAGE_DEPTH_OK: - if (!msg->body_buffer) { - // neither data nor footer found - return IS_ATOMIC_FLAG_SET(&msg->content->aborted) ? QD_MESSAGE_STREAM_DATA_ABORTED - : QD_MESSAGE_STREAM_DATA_NO_MORE; - } - // proceed to parse out the VBIN - break; - - case QD_MESSAGE_DEPTH_INVALID: - return QD_MESSAGE_STREAM_DATA_INVALID; - - case QD_MESSAGE_DEPTH_INCOMPLETE: - return IS_ATOMIC_FLAG_SET(&msg->content->aborted) ? QD_MESSAGE_STREAM_DATA_ABORTED - : QD_MESSAGE_STREAM_DATA_INCOMPLETE; - } - } - - // parse out the body data section, or the footer if we're past the - // last data section - - qd_section_status_t section_status; - qd_field_location_t location; - ZERO(&location); - - // remember starting buffer since section check may advance location.buffer - // pointer to the next buffer and it may never get freed. - qd_buffer_t *start_buffer = msg->body_buffer; - bool is_footer = false; - qd_message_stream_data_result_t result = QD_MESSAGE_STREAM_DATA_NO_MORE; - - LOCK(&content->lock); - - section_status = message_section_check_LH(content, - &msg->body_buffer, &msg->body_cursor, - BODY_DATA_SHORT, 3, TAGS_BINARY, - &location, - true, // allow duplicates - false); // do not inc buffer fanout - if (section_status == QD_SECTION_NO_MATCH) { - is_footer = true; - section_status = message_section_check_LH(content, - &msg->body_buffer, &msg->body_cursor, - FOOTER_SHORT, 3, TAGS_MAP, - &location, true, false); - } - - switch (section_status) { - case QD_SECTION_INVALID: - case QD_SECTION_NO_MATCH: - result = QD_MESSAGE_STREAM_DATA_INVALID; - break; - - case QD_SECTION_MATCH: - stream_data = new_qd_message_stream_data_t(); - ZERO(stream_data); - stream_data->owning_message = msg; - stream_data->section = location; - stream_data->first_buffer = start_buffer; - // TODO - find_last_buffer_LH(&stream_data->section, &msg->body_cursor, &msg->body_buffer); - stream_data->last_buffer = msg->body_buffer; - // TODO - trim_stream_data_headers_LH(stream_data, !is_footer); - DEQ_INSERT_TAIL(msg->stream_data_list, stream_data); - *out_stream_data = stream_data; - result = is_footer ? QD_MESSAGE_STREAM_DATA_FOOTER_OK : QD_MESSAGE_STREAM_DATA_BODY_OK; - break; - - case QD_SECTION_NEED_MORE: - result = IS_ATOMIC_FLAG_SET(&msg->content->receive_complete) ? - QD_MESSAGE_STREAM_DATA_NO_MORE : QD_MESSAGE_STREAM_DATA_INCOMPLETE; - break; - } - - UNLOCK(&content->lock); - - // clang-format off - if (IS_ATOMIC_FLAG_SET(&msg->content->aborted)) { - result = QD_MESSAGE_STREAM_DATA_ABORTED; - qd_message_stream_data_release(*out_stream_data); - *out_stream_data = 0; - } - // clang-format on - - return result; -} - - qd_parsed_field_t *qd_message_get_ingress_router(qd_message_t *msg) { return ((qd_message_pvt_t*) msg)->content->ra_pf_ingress; @@ -3274,73 +2882,6 @@ bool qd_message_oversize(const qd_message_t *msg) } -int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props) -{ - qd_composed_field_t *field = 0; - int rc = 0; - - field = qd_compose(QD_PERFORMATIVE_FOOTER, field); - - // Stick the buffers into the footer compose field. - qd_compose_insert_binary_buffers(field, footer_props); - - rc = qd_message_extend(message, field, 0); - - qd_compose_free(field); - return rc; - -} - -int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked) -{ - unsigned int length = DEQ_SIZE(*data); - - qd_composed_field_t *field = 0; - int rc = 0; - - if (q2_blocked) - *q2_blocked = false; - - // DISPATCH-1803: ensure no body data section can exceed the - // QD_QLIMIT_Q2_LOWER. This allows the egress router to wait for an entire - // body data section to arrive and be validated before sending it out to - // the endpoint without preventing Q2 from being relieved (DISPATCH-2191). - // - const size_t buf_limit = QD_QLIMIT_Q2_LOWER - 2; // reserve 1 extra for performative header - assert(buf_limit); - while (length > buf_limit) { - qd_buffer_t *buf = DEQ_HEAD(*data); - for (int i = 0; i < buf_limit; ++i) { - buf = DEQ_NEXT(buf); - } - - // split the list at buf. buf becomes head of trailing list - - qd_buffer_list_t trailer = DEQ_EMPTY; - DEQ_HEAD(trailer) = buf; - DEQ_TAIL(trailer) = DEQ_TAIL(*data); - DEQ_TAIL(*data) = DEQ_PREV(buf); - DEQ_NEXT(DEQ_TAIL(*data)) = 0; - DEQ_PREV(buf) = 0; - DEQ_SIZE(trailer) = length - buf_limit; - DEQ_SIZE(*data) = buf_limit; - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - qd_compose_insert_binary_buffers(field, data); - - DEQ_MOVE(trailer, *data); - length -= buf_limit; - } - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - qd_compose_insert_binary_buffers(field, data); - - rc = qd_message_extend(message, field, q2_blocked); - qd_compose_free(field); - return rc; -} - - void qd_message_set_q2_unblocked_handler(qd_message_t *msg, qd_message_q2_unblocked_handler_t callback, qd_alloc_safe_ptr_t context) diff --git a/src/message_private.h b/src/message_private.h index 899064fff..800a8edf1 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -62,19 +62,6 @@ typedef struct { } qd_field_location_t; -struct qd_message_stream_data_t { - DEQ_LINKS(qd_message_stream_data_t); // Linkage to form a DEQ - qd_message_pvt_t *owning_message; // Pointer to the owning message - qd_field_location_t section; // Section descriptor for the field - qd_field_location_t payload; // Descriptor for the payload of the body data - qd_buffer_t *first_buffer; // for freeing, may be before section buffer! - qd_buffer_t *last_buffer; // Pointer to the last buffer in the field -}; - -ALLOC_DECLARE(qd_message_stream_data_t); -DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t); - - typedef struct { qd_message_q2_unblocked_handler_t handler; qd_alloc_safe_ptr_t context; @@ -175,11 +162,6 @@ struct qd_message_pvt_t { bool tag_sent; // Tags are sent bool is_fanout; // Message is an outgoing fanout bool uct_started; // Cut-through has been started for this message - - qd_message_stream_data_list_t stream_data_list;// Stream data parse structure - // TODO - move this to the content for one-time parsing (TLR) - unsigned char *body_cursor; // Stream: tracks the point in the content buffer chain - qd_buffer_t *body_buffer; // Stream: to parse the next body data section, if any sys_atomic_t send_complete; // Message has been been completely sent }; diff --git a/tests/message_test.c b/tests/message_test.c index 9a777419e..feaba8bf5 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ -846,241 +846,6 @@ static char *test_check_weird_messages(void *context) return result; } -// -// Testing protocol adapter 'stream_data' interfaces -// - -static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks) -{ - // Fill a message with n_chunks of vbin chunk_size body data. - - int chunk_size = atoi(s_chunk_size); - int n_chunks = atoi(s_n_chunks); - - // Add message headers - qd_message_compose_1(msg, "whom-it-may-concern", 0); - - // Add the chunks. This creates the test state for not-flattened buffers. - for (int j=0; jbuffers); - qd_message_extend(msg, field, 0); - - // Clean up temporary resources - free(buf2); - qd_compose_free(field); - qd_message_free(mule); - } -} - -static void free_stream_data_list(qd_message_t *msg_in) -{ - // DISPATCH-1800 - this should not be required here - qd_message_pvt_t *msg = (qd_message_pvt_t *)msg_in; - qd_message_stream_data_t *bd = DEQ_HEAD(msg->stream_data_list); - while (bd) { - qd_message_stream_data_t *next = DEQ_NEXT(bd); - free_qd_message_stream_data_t(bd); - bd = next; - } - -} - -static char *check_stream_data(char *s_chunk_size, char *s_n_chunks, bool flatten) -{ - // Fill a message with n chunks of vbin chunk_size body data. - // Then test by retrieving n chunks from a message copy and verifying. - // - // 'flatten' messes with message buffers after they have been composed. - // * Not flattened means that vbin headers stand alone in separate buffers and - // vbin data always starts in the first byte of a new buffer. This is the - // buffer condition when a message is forwarded between adaptors on a single - // router. The receiver and sender have two messages but share message content. - // * Flattened means that vbin headers and vbin data are packed into the buffer - // list. This is the buffer condition when a message is forwarded between - // routers and the receiver is handling the vbin segments. - - int chunk_size = atoi(s_chunk_size); - int n_chunks = atoi(s_n_chunks); - - char *result = 0; - int received; // got this much of chunk_size chunk - - // Messages for setting/sensing body data - qd_message_t *msg = qd_message(); - qd_message_t *copy = qd_message_copy(msg); - qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg; - - // Set the original message content - stream_data_generate_message(msg, s_chunk_size, s_n_chunks); - - // flatten if required - if (flatten) { - // check that the flatten buffer is big enough - assert(FLAT_BUF_SIZE > (n_chunks * (chunk_size - // per-chunk vbin descriptor overhead: - + (chunk_size > 511 ? 8 : 5)) - + 100)); // leave plenty of allocation for header - - // compress message into flatten buffer - size_t flat_size = flatten_bufs(MSG_CONTENT(msg)); - - // erase buffer list in msg and copy - qd_buffer_list_free_buffers(&msg_pvt->content->buffers); - - // reconstruct buffer list from flat buffer - qd_buffer_list_append(&msg_pvt->content->buffers, buffer, flat_size); - } - - // check the chunks - // Define the number of raw buffers to be extracted on each loop -#define N_PN_RAW_BUFFS (2) - - qd_message_stream_data_t *stream_data; - - for (int j=0; jpayload.length != chunk_size) { - printf( - "********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " - "chunk_size:%s, n_chunks:%s, payload length error : %zu \n", - QD_BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, stream_data->payload.length); - fflush(stdout); - result = "qd_message_next_stream_data returned wrong payload length."; - break; - } - - // Loop to extract the body data - // * verify content - // * verify body data length - - // buffs - body data is extracted through this array of raw buffers - pn_raw_buffer_t buffs[N_PN_RAW_BUFFS]; - - // used_buffers - Number of qd_buffers in content buffer chain consumed so far. - // This number must increase as dictated by qd_message_stream_data_buffers() - // when vbin segments are consumed from the current stream_data chunk. - // A single vbin segment may consume 0, 1, or many qd_buffers. - size_t used_buffers = 0; - - while (received < chunk_size) { - ZERO(buffs); - size_t n_used = qd_message_stream_data_buffers(stream_data, buffs, used_buffers, N_PN_RAW_BUFFS); - if (n_used > 0) { - for (size_t ii=0; ii chunk_size) { - printf( - "********** check_stream_data: QD_BUFFER_SIZE=%zu, pn-buf-array-size:%d, " - "chunk_size:%s, n_chunks:%s, received %d bytes (too many) \n", - QD_BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received); - result = "Received too much data"; - break; - } - } - // successful check - - } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) { - result = "DATA_INCOMPLETE"; break; - } else { - switch (stream_data_result) { - case QD_MESSAGE_STREAM_DATA_NO_MORE: - result = "EOS"; break; - case QD_MESSAGE_STREAM_DATA_INVALID: - result = "Invalid body data for streaming message"; break; - default: - result = "result: default"; break; - } - } - } - - free_stream_data_list(msg); - qd_message_free(msg); - if (!!copy) { - free_stream_data_list(copy); - qd_message_free(copy); - } - return result; -} - -static char *test_check_stream_data(void * context) -{ - char *result = 0; - -#define N_CHUNK_SIZES (10) - char *chunk_sizes[N_CHUNK_SIZES] = {"1", "10", "100", "510", "511", "512", "513", "1023", "1024", "1025"}; - -#define N_N_CHUNKS (4) - char *n_chunks[N_N_CHUNKS] = {"1", "2", "10", "25"}; - - for (int i=0; ibuffers); - - bool blocked; - int depth = qd_message_stream_data_append(msg, &bin_data, &blocked); - if (depth <= body_bufct) { - // expected to add extra buffer(s) for meta-data - result = "append length is incorrect"; - goto exit; - } - - // expected that the append has triggered Q2 blocking: - if (!blocked) { - result = "expected Q2 block event did not occur!"; - goto exit; - } - - // And while we're at it, stuff in a footer - field = qd_compose(QD_PERFORMATIVE_FOOTER, 0); - qd_compose_start_map(field); - qd_compose_insert_symbol(field, "Key1"); - qd_compose_insert_string(field, "Value1"); - qd_compose_insert_symbol(field, "Key2"); - qd_compose_insert_string(field, "Value2"); - qd_compose_end_map(field); - qd_message_extend(msg, field, 0); - qd_compose_free(field); - - qd_message_set_receive_complete(msg); - - // "forward" the message - out_msg = qd_message_copy(msg); - - // walk the data streams... - int bd_count = 0; - int body_buffers = 0; - qd_message_stream_data_t *stream_data = 0; - bool done = false; - int footer_found = 0; - while (!done) { - switch (qd_message_next_stream_data(out_msg, &stream_data)) { - case QD_MESSAGE_STREAM_DATA_INCOMPLETE: - case QD_MESSAGE_STREAM_DATA_INVALID: - case QD_MESSAGE_STREAM_DATA_ABORTED: - result = "Next body data failed to get next body data"; - goto exit; - case QD_MESSAGE_STREAM_DATA_NO_MORE: - done = true; - break; - case QD_MESSAGE_STREAM_DATA_FOOTER_OK: - bd_count += 1; - footer_found += 1; - qd_message_stream_data_release(stream_data); - break; - case QD_MESSAGE_STREAM_DATA_BODY_OK: - bd_count += 1; - // qd_message_stream_data_append() breaks the buffer list up into - // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers - // long - body_buffers += qd_message_stream_data_buffer_count(stream_data); - if (qd_message_stream_data_buffer_count(stream_data) >= QD_QLIMIT_Q2_LOWER) { - result = "Body data list length too long!"; - goto exit; - } - qd_message_stream_data_release(stream_data); - break; - } - } - - // verify: - - if (body_bufct != body_buffers) { - result = "Not all body data buffers were decoded!"; - goto exit; - } - - if (footer_found != 1) { - result = "I ordered a side of 'footer' with that message!"; - goto exit; - } - - // +2 for 1 extra 5 buffers and 1 for footer - if (bd_count != (body_bufct / QD_QLIMIT_Q2_LOWER) + 2) { - result = "Unexpected count of body data sections!"; - goto exit; - } - - // expect: free all the body and footer buffers except for the very last - // buffer. Remember kids: perfect is good, but done is better. - if (DEQ_SIZE(MSG_CONTENT(out_msg)->buffers) != base_bufct + 1) { - result = "Possible buffer leak detected!"; - goto exit; - } - - // and Q2 should be unblocked - if (qd_message_is_Q2_blocked(msg)) { - result = "Q2 expected to be unblocked!"; - goto exit; - } - - if (unblock_called != 1) { - result = "Q2 unblock handler not called!"; - goto exit; - } - - -exit: - qd_message_free(msg); - qd_message_free(out_msg); - return result; -} - - -// Verify that decoding streaming body data across two -// "outgoing" messages works -static char *test_check_stream_data_fanout(void *context) -{ - char *result = 0; - qd_message_t *in_msg = 0; - qd_message_t *out_msg1 = 0; - qd_message_t *out_msg2 = 0; - - // simulate building a message as an adaptor would: - in_msg = qd_message(); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(field); - qd_compose_insert_bool(field, 0); // durable - qd_compose_insert_null(field); // priority - qd_compose_end_list(field); - field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); - qd_compose_start_list(field); - qd_compose_insert_ulong(field, 666); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string(field, "/whereevah"); // to - qd_compose_insert_string(field, "my-subject"); // subject - qd_compose_insert_string(field, "/reply-to"); // reply-to - qd_compose_end_list(field); - - qd_message_compose_2(in_msg, field, false); - qd_compose_free(field); - - // snapshot the message buffer count to use as a baseline - const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers); - - // construct a couple of body data sections, cheek-to-jowl in a buffer - // chain -#define sd_count 6 - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - memset(buffer, '1', 99); - qd_compose_insert_binary(field, buffer, 99); - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - memset(buffer, '2', 1); - qd_compose_insert_binary(field, buffer, 1); - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - memset(buffer, '3', 1); - qd_compose_insert_binary(field, buffer, 1); - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - memset(buffer, '4', 1001); - qd_compose_insert_binary(field, buffer, 1001); - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - qd_compose_insert_binary(field, buffer, 0); - - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - memset(buffer, '5', 5); - qd_compose_insert_binary(field, buffer, 5); - - qd_message_extend(in_msg, field, 0); - qd_compose_free(field); - - qd_message_set_receive_complete(in_msg); - - // "fan out" the message - out_msg1 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg1); - out_msg2 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg2); - - // walk the data streams for both messages: - qd_message_stream_data_t *out_sd1[sd_count] = {0}; - qd_message_stream_data_t *out_sd2[sd_count] = {0}; - - qd_message_stream_data_t *stream_data = 0; - bool done = false; - int index = 0; - while (!done) { - switch (qd_message_next_stream_data(out_msg1, &stream_data)) { - case QD_MESSAGE_STREAM_DATA_NO_MORE: - done = true; - break; - case QD_MESSAGE_STREAM_DATA_BODY_OK: - out_sd1[index++] = stream_data; - break; - default: - result = "Next body data failed to get next body data"; - goto exit; - } - } - if (index != sd_count) { - result = "wrong stream data count out1"; - goto exit; - } - - index = 0; - done = false; - while (!done) { - switch (qd_message_next_stream_data(out_msg2, &stream_data)) { - case QD_MESSAGE_STREAM_DATA_NO_MORE: - done = true; - break; - case QD_MESSAGE_STREAM_DATA_BODY_OK: - out_sd2[index++] = stream_data; - break; - default: - result = "Next body data failed to get next body data"; - goto exit; - } - } - if (index != sd_count) { - result = "wrong stream data count out2"; - goto exit; - } - - for (index = 0; index < sd_count; ++index) { - qd_message_stream_data_release(out_sd1[index]); - qd_message_stream_data_release(out_sd2[index]); - } - - // expect: all but the last body buffer is freed: - if (DEQ_SIZE(MSG_CONTENT(out_msg1)->buffers) != base_bufct + 1 - || DEQ_SIZE(MSG_CONTENT(out_msg2)->buffers) != base_bufct + 1) { - result = "Possible buffer leak detected!"; - goto exit; - } - -exit: - qd_message_free(in_msg); - qd_message_free(out_msg1); - qd_message_free(out_msg2); - return result; -} - - -// Verify that decoding a message that has only a footer (no body data) -// messages works -static char *test_check_stream_data_footer(void *context) -{ - char *result = 0; - qd_message_t *in_msg = 0; - qd_message_t *out_msg1 = 0; - qd_message_t *out_msg2 = 0; - - // simulate building a message as an adaptor would: - in_msg = qd_message(); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(field); - qd_compose_insert_bool(field, 0); // durable - qd_compose_insert_null(field); // priority - qd_compose_end_list(field); - field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); - qd_compose_start_list(field); - qd_compose_insert_ulong(field, 666); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string(field, "/whereevah"); // to - qd_compose_insert_string(field, "my-subject"); // subject - qd_compose_insert_string(field, "/reply-to"); // reply-to - qd_compose_end_list(field); - - qd_message_compose_2(in_msg, field, false); - qd_compose_free(field); - - // snapshot the message buffer count to use as a baseline - const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers); - - // Append a footer - bool q2_blocked; - field = qd_compose(QD_PERFORMATIVE_FOOTER, 0); - qd_compose_start_map(field); - qd_compose_insert_symbol(field, "Key1"); - qd_compose_insert_string(field, "Value1"); - qd_compose_insert_symbol(field, "Key2"); - qd_compose_insert_string(field, "Value2"); - qd_compose_end_map(field); - qd_message_extend(in_msg, field, &q2_blocked); - qd_compose_free(field); - - qd_message_set_receive_complete(in_msg); - - // "fan out" the message - out_msg1 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg1); - out_msg2 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg2); - - qd_message_stream_data_t *stream_data = 0; - bool done = false; - bool footer = false; - while (!done) { - switch (qd_message_next_stream_data(out_msg1, &stream_data)) { - case QD_MESSAGE_STREAM_DATA_NO_MORE: - done = true; - break; - case QD_MESSAGE_STREAM_DATA_FOOTER_OK: - footer = true; - qd_message_stream_data_release(stream_data); - break; - case QD_MESSAGE_STREAM_DATA_BODY_OK: - result = "Unexpected body data present"; - goto exit; - default: - result = "Next body data failed to get next body data"; - goto exit; - } - } - if (!footer) { - result = "No footer found in out_msg1"; - goto exit; - } - - done = false; - footer = false; - while (!done) { - switch (qd_message_next_stream_data(out_msg2, &stream_data)) { - case QD_MESSAGE_STREAM_DATA_NO_MORE: - done = true; - break; - case QD_MESSAGE_STREAM_DATA_FOOTER_OK: - footer = true; - qd_message_stream_data_release(stream_data); - break; - case QD_MESSAGE_STREAM_DATA_BODY_OK: - result = "Unexpected body data present"; - goto exit; - default: - result = "Next body data failed to get next body data"; - goto exit; - } - } - if (!footer) { - result = "No footer found in out_msg2"; - goto exit; - } - - // expect: all but the last body buffer is freed: - if (DEQ_SIZE(MSG_CONTENT(out_msg1)->buffers) != base_bufct + 1 - || DEQ_SIZE(MSG_CONTENT(out_msg2)->buffers) != base_bufct + 1) { - result = "Possible buffer leak detected!"; - goto exit; - } - -exit: - qd_message_free(in_msg); - qd_message_free(out_msg1); - qd_message_free(out_msg2); - return result; -} - - -// Verify that alternating decode and release across fanout -// "outgoing" messages works -static char *test_check_stream_data_fanout_leak(void *context) -{ - char *result = 0; - qd_message_t *in_msg = 0; - qd_message_t *out_msg1 = 0; - qd_message_t *out_msg2 = 0; - - // simulate building a message as an adaptor would: - in_msg = qd_message(); - qd_message_content_t *content = MSG_CONTENT(in_msg); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(field); - qd_compose_insert_bool(field, 0); // durable - qd_compose_insert_null(field); // priority - qd_compose_end_list(field); - field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); - qd_compose_start_list(field); - qd_compose_insert_ulong(field, 666); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string(field, "/whereevah"); // to - qd_compose_insert_string(field, "my-subject"); // subject - qd_compose_insert_string(field, "/reply-to"); // reply-to - qd_compose_end_list(field); - - qd_message_compose_2(in_msg, field, false); - qd_compose_free(field); - - // snapshot the message buffer count to use as a baseline - const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers); - - // "fan out" the message - out_msg1 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg1); - out_msg2 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg2); - - // alternate adding and releasing body data buffers - // Trigger/release Q2 as well - - memset(buffer, 0, sizeof(buffer)); - - const int max_bodies = 25; - for (int j = 1; j < max_bodies; ++j) { - - field = 0; - for (int k = 1; k <= j; ++k) { - // next body datas "arrive" - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); - long len = random() % 0xFF; - qd_compose_insert_binary(field, buffer, len); - } - qd_message_extend(in_msg, field, 0); - qd_compose_free(field); - - if (j == (max_bodies - 1)) { - // last body buffers to add - SET_ATOMIC_FLAG(&content->receive_complete); - } - - // consume from both messages: - qd_message_t *msgs[2] = {out_msg1, out_msg2}; - for (int i = 0; i < 2; ++i) { - qd_message_t *mptr = msgs[i]; - - int free_count = 0; - qd_message_stream_data_t *marray[max_bodies]; - bool done = false; - - - while (!done) { - //qd_message_stream_data_t *stream_data = 0; - //switch (qd_message_next_stream_data(mptr, &stream_data)) { - switch (qd_message_next_stream_data(mptr, &marray[free_count])) { - case QD_MESSAGE_STREAM_DATA_BODY_OK: { - //qd_message_stream_data_release(stream_data); - free_count += 1; - break; - } - case QD_MESSAGE_STREAM_DATA_NO_MORE: - case QD_MESSAGE_STREAM_DATA_INCOMPLETE: - done = true; - break; - default: - result = "Next body data failed to get next body data"; - goto exit; - } - } - - if (free_count != j) { - result = "expected more body datas"; - goto exit; - } - - for (int y = 0; y < free_count; ++y) - qd_message_stream_data_release(marray[y]); - } - } - - // expect: all but the last body buffer is freed: - if (DEQ_SIZE(MSG_CONTENT(out_msg1)->buffers) != base_bufct + 1 - || DEQ_SIZE(MSG_CONTENT(out_msg2)->buffers) != base_bufct + 1) { - result = "Possible buffer leak detected!"; - goto exit; - } - -exit: - qd_message_free(in_msg); - qd_message_free(out_msg1); - qd_message_free(out_msg2); - return result; -} - - static char *test_q2_callback_on_disable(void *context) { char *result = 0; @@ -1665,7 +908,10 @@ static char *test_q2_callback_on_disable(void *context) while (!blocked) { qd_buffer_list_t bin_data = DEQ_EMPTY; qd_buffer_list_append(&bin_data, data, sizeof(data)); - qd_message_stream_data_append(msg, &bin_data, &blocked); + field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary_buffers(field, &bin_data); + qd_message_extend(msg, field, &blocked); + qd_compose_free(field); } // now ensure callback is made @@ -1773,252 +1019,6 @@ static char *test_q2_ignore_headers(void *context) } -// Verify that partial stream data entries do no leak buffers -// -static char *test_check_stream_data_partial(void *context) -{ - char *result = 0; - qd_message_t *in_msg = 0; - qd_message_t *out_msg1 = 0; - qd_message_t *out_msg2 = 0; - - // simulate building a message as an adaptor would: - in_msg = qd_message(); - qd_message_content_t *content = MSG_CONTENT(in_msg); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(field); - qd_compose_insert_bool(field, 0); // durable - qd_compose_insert_null(field); // priority - qd_compose_end_list(field); - field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); - qd_compose_start_list(field); - qd_compose_insert_ulong(field, 666); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string(field, "/whereevah"); // to - qd_compose_insert_string(field, "my-subject"); // subject - qd_compose_insert_string(field, "/reply-to"); // reply-to - qd_compose_end_list(field); - - qd_message_compose_2(in_msg, field, false); - qd_compose_free(field); - - // snapshot the message buffer count to use as a baseline - const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers); - - // "fan out" the message - out_msg1 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg1); - out_msg2 = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg2); - - // add a complete body data section - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - memset(buffer, '1', 15); - qd_compose_insert_binary(field, buffer, 15); - qd_message_extend(in_msg, field, 0); - qd_compose_free(field); - - // encoded body section: - const uint8_t section[] = {0x00, 0x53, 0x75}; - const uint8_t body[] = {0xA0, 5, 1, 2, 3, 4, 5}; - - // append only part of the body section: - qd_buffer_list_t blist = DEQ_EMPTY; - qd_buffer_list_append(&blist, section, 3); - for (qd_buffer_t *bf = DEQ_HEAD(blist); bf; bf = DEQ_NEXT(bf)) - qd_buffer_set_fanout(bf, content->fanout); - DEQ_APPEND(content->buffers, blist); - - qd_message_stream_data_t *sdata = 0; - qd_message_stream_data_result_t rc = qd_message_next_stream_data(out_msg1, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_BODY_OK) { - result = "Did not get first body data entry!"; - goto exit; - } - qd_message_stream_data_release(sdata); - - rc = qd_message_next_stream_data(out_msg1, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_INCOMPLETE) { - result = "Expected incomplete next body data entry!"; - goto exit; - } - - // append the remainder to complete this section - qd_buffer_list_append(&blist, body, 7); - for (qd_buffer_t *bf = DEQ_HEAD(blist); bf; bf = DEQ_NEXT(bf)) - qd_buffer_set_fanout(bf, content->fanout); - DEQ_APPEND(content->buffers, blist); - - // add another complete body data section - field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - memset(buffer, '1', 15); - qd_compose_insert_binary(field, buffer, 15); - qd_message_extend(in_msg, field, 0); - qd_compose_free(field); - SET_ATOMIC_FLAG(&content->receive_complete); - - rc = qd_message_next_stream_data(out_msg1, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_BODY_OK) { - result = "Did not get second body data entry!"; - goto exit; - } - qd_message_stream_data_release(sdata); - - rc = qd_message_next_stream_data(out_msg1, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_BODY_OK) { - result = "Did not get last body data entry!"; - goto exit; - } - qd_message_stream_data_release(sdata); - - rc = qd_message_next_stream_data(out_msg1, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_NO_MORE) { - result = "Did not get NO MORE!"; - goto exit; - } - - int i = 0; - rc = qd_message_next_stream_data(out_msg2, &sdata); - while (rc == QD_MESSAGE_STREAM_DATA_BODY_OK) { - ++i; - qd_message_stream_data_release(sdata); - rc = qd_message_next_stream_data(out_msg2, &sdata); - } - - if (rc != QD_MESSAGE_STREAM_DATA_NO_MORE) { - result = "Did not get NO MORE for msg2!"; - goto exit; - } - - if (i != 3) { - result = "Did not get 3 sections for msg2!"; - goto exit; - } - - if (DEQ_SIZE(MSG_CONTENT(out_msg1)->buffers) != base_bufct + 1 - || DEQ_SIZE(MSG_CONTENT(out_msg2)->buffers) != base_bufct + 1) { - result = "Possible buffer leak detected!"; - goto exit; - } - -exit: - qd_message_free(in_msg); - qd_message_free(out_msg1); - qd_message_free(out_msg2); - return result; -} - -// Verify that aborted messages are detected by the data stream parser -// -static char *test_check_stream_data_aborted_body(void *context) -{ - char *result = 0; - qd_message_t *in_msg = 0; - qd_message_t *out_msg = 0; - qd_message_stream_data_t *sdata = 0; - - // simulate building a message as an adaptor would: - in_msg = qd_message(); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(field); - qd_compose_insert_bool(field, 0); // durable - qd_compose_insert_null(field); // priority - qd_compose_end_list(field); - field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); - qd_compose_start_list(field); - qd_compose_insert_ulong(field, 666); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string(field, "/whereevah"); // to - qd_compose_insert_string(field, "my-subject"); // subject - qd_compose_insert_string(field, "/reply-to"); // reply-to - qd_compose_end_list(field); - - qd_message_compose_2(in_msg, field, false); - qd_compose_free(field); - - // "fan out" the message - out_msg = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg); - - bool ignore = false; - qd_buffer_list_t blist = DEQ_EMPTY; - uint8_t data[3] = {0x01, 0x02, 0x03}; - qd_buffer_list_append(&blist, data, 3); - qd_message_stream_data_append(in_msg, &blist, &ignore); - - // now simulate the abort: - qd_message_set_aborted(in_msg); - qd_message_set_receive_complete(in_msg); - - // expect next stream data to return the ABORT status - qd_message_stream_data_result_t rc = qd_message_next_stream_data(out_msg, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_ABORTED) { - fprintf(stderr, "Expected ABORTED stream data result - got: %d\n", (int) rc); - result = "Did not get the aborted status"; - goto exit; - } - - -exit: - qd_message_stream_data_release(sdata); - qd_message_free(in_msg); - qd_message_free(out_msg); - return result; -} - - -// Verify that aborted messages are detected by the data stream parser even if no body present -// -static char *test_check_stream_data_aborted_no_body(void *context) -{ - char *result = 0; - qd_message_t *in_msg = 0; - qd_message_t *out_msg = 0; - qd_message_stream_data_t *sdata = 0; - - // simulate building a message as an adaptor would: - in_msg = qd_message(); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(field); - qd_compose_insert_bool(field, 0); // durable - qd_compose_insert_null(field); // priority - qd_compose_end_list(field); - field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); - qd_compose_start_list(field); - qd_compose_insert_ulong(field, 666); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string(field, "/whereevah"); // to - qd_compose_insert_string(field, "my-subject"); // subject - qd_compose_insert_string(field, "/reply-to"); // reply-to - qd_compose_end_list(field); - - qd_message_compose_2(in_msg, field, false); - qd_compose_free(field); - - // "fan out" the message - out_msg = qd_message_copy(in_msg); - qd_message_add_fanout(out_msg); - - // now simulate the abort: - qd_message_set_aborted(in_msg); - qd_message_set_receive_complete(in_msg); - - // expect next stream data to return the ABORT status immediately since there are no complete body data sections - qd_message_stream_data_result_t rc = qd_message_next_stream_data(out_msg, &sdata); - if (rc != QD_MESSAGE_STREAM_DATA_ABORTED) { - fprintf(stderr, "Expected ABORTED stream data result - got: %d\n", (int) rc); - result = "Did not get the aborted status"; - goto exit; - } - - -exit: - qd_message_stream_data_release(sdata); - qd_message_free(in_msg); - qd_message_free(out_msg); - return result; -} - int message_tests(void) { int result = 0; @@ -2032,14 +1032,6 @@ int message_tests(void) TEST_CASE(test_q2_input_holdoff_sensing, 0); TEST_CASE(test_incomplete_annotations, 0); TEST_CASE(test_check_weird_messages, 0); - TEST_CASE(test_check_stream_data, 0); - TEST_CASE(test_check_stream_data_append, 0); - TEST_CASE(test_check_stream_data_fanout, 0); - TEST_CASE(test_check_stream_data_footer, 0); - TEST_CASE(test_check_stream_data_fanout_leak, 0); - TEST_CASE(test_check_stream_data_partial, 0); - TEST_CASE(test_check_stream_data_aborted_body, 0); - TEST_CASE(test_check_stream_data_aborted_no_body, 0); TEST_CASE(test_q2_callback_on_disable, 0); TEST_CASE(test_q2_ignore_headers, 0);