Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DISPATCH-2212: Set the subject field in the AMQP message with method… #1318

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 26 additions & 19 deletions src/adaptors/http2/http2_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,9 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on
DEQ_REMOVE(session_data->streams, stream_data);
nghttp2_session_set_stream_user_data(session_data->session, stream_data->stream_id, NULL);
}
if (stream_data->method) free(stream_data->method);
if (stream_data->remote_site) free(stream_data->remote_site);
free(stream_data->method);
free(stream_data->remote_site);
free(stream_data->request_status);

qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream_data in free_http2_stream_data (%lx)", conn->conn_id, stream_data->stream_id, (long) stream_data);

Expand Down Expand Up @@ -750,7 +751,7 @@ static int on_header_callback(nghttp2_session *session,
stream_data->method = qd_strdup((const char *)value);
}
if (strcmp(STATUS, (const char *)name) == 0) {
stream_data->request_status = atoi((const char *)value);
stream_data->request_status = qd_strdup((const char *)value);
}
qd_compose_insert_string_n(stream_data->app_properties, (const char *)name, namelen);
qd_compose_insert_string_n(stream_data->app_properties, (const char *)value, valuelen);
Expand All @@ -771,23 +772,23 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d
qd_composed_field_t *header_and_props = 0;
if (conn->ingress) {
header_and_props = qd_message_compose_amqp(stream_data->message,
conn->config->address, // const char *to
0, // const char *subject
stream_data->reply_to, // const char *reply_to
0, // const char *content_type
0, // const char *content_encoding
0, // int32_t correlation_id
conn->config->site);
conn->config->address, // const char *to
stream_data->method, // const char *subject
stream_data->reply_to, // const char *reply_to
0, // const char *content_type
0, // const char *content_encoding
0, // int32_t correlation_id
conn->config->site);
}
else {
header_and_props = qd_message_compose_amqp(stream_data->message,
stream_data->reply_to, // const char *to
0, // const char *subject
0, // const char *reply_to
0, // const char *content_type
0, // const char *content_encoding
0, // int32_t correlation_id
conn->config->site);
stream_data->reply_to, // const char *to
stream_data->request_status, // const char *subject
0, // const char *reply_to
0, // const char *content_type
0, // const char *content_encoding
0, // int32_t correlation_id
conn->config->site);
}

if (receive_complete) {
Expand Down Expand Up @@ -928,7 +929,7 @@ static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_
}
qd_http_record_request(http2_adaptor->core,
stream_data->method,
stream_data->request_status,
stream_data->request_status?atoi(stream_data->request_status):0,
conn->config?conn->config->address:0,
remote_addr, conn->config?conn->config->site:0,
stream_data->remote_site,
Expand Down Expand Up @@ -1691,6 +1692,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
stream_data->remote_site = (char*) qd_iterator_copy(group_id_itr);
qd_iterator_free(group_id_itr);

#ifndef NDEBUG
qd_iterator_t *subject_itr = qd_message_field_iterator(message, QD_FIELD_SUBJECT);
// Make sure there is a non-zero subject field iterator
assert(subject_itr != 0);
qd_iterator_free(subject_itr);
#endif
qd_iterator_t *app_properties_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
qd_parsed_field_t *app_properties_fld = qd_parse(app_properties_iter);

Expand All @@ -1714,7 +1721,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
stream_data->method = qd_strdup((const char *)hdrs[idx].value);
}
if (strcmp(STATUS, (const char *)hdrs[idx].name) == 0) {
stream_data->request_status = atoi((const char *)hdrs[idx].value);
stream_data->request_status = qd_strdup((const char *)hdrs[idx].value);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/adaptors/http2/http2_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ struct qdr_http2_stream_data_t {
void *context;
char *reply_to;
char *remote_site; //for stats:
char *method; //for stats:
char *method; //for stats, also used in the subject field of AMQP request message.
char *request_status; //for stats, also used in the subject field of AMQP response message.
qdr_delivery_t *in_dlv;
qdr_delivery_t *out_dlv;
uint64_t incoming_id;
Expand Down Expand Up @@ -114,7 +115,6 @@ struct qdr_http2_stream_data_t {
bool in_dlv_decrefed;
bool out_dlv_decrefed;
bool body_data_added;
int request_status;
int bytes_in;
int bytes_out;
qd_timestamp_t start;
Expand Down