diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 85c8bc45c..36251f63d 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -68,18 +68,17 @@ struct qdr_tcp_connection_t { char *reply_to; qdr_connection_t *qdr_conn; uint64_t conn_id; - qdr_link_t *incoming; + qdr_link_t *incoming_link; uint64_t incoming_id; - qdr_link_t *outgoing; + qdr_link_t *outgoing_link; uint64_t outgoing_id; pn_raw_connection_t *pn_raw_conn; sys_mutex_t *activation_lock; - qdr_delivery_t *instream; - qdr_delivery_t *outstream; + qdr_delivery_t *in_dlv_stream; + qdr_delivery_t *out_dlv_stream; bool ingress; bool flow_enabled; - bool incoming_started; - bool egress_dispatcher; + bool is_egress_dispatcher_conn; bool connector_closed;//only used if egress_dispatcher=true bool in_list; // This connection is in the adaptor's connections list sys_atomic_t raw_closed_read; // proton event seen @@ -142,7 +141,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo static void handle_disconnected(qdr_tcp_connection_t* conn); static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); static void free_bridge_config(qd_tcp_bridge_t *config); -static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc); +static void qdr_tcp_create_server_side_connection(qdr_tcp_connection_t* tc); static void detach_links(qdr_tcp_connection_t *tc); static void qd_tcp_connector_decref(qd_tcp_connector_t* c); @@ -176,7 +175,7 @@ static void allocate_tcp_write_buffer(pn_raw_buffer_t *buffer) static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) { assert(conn); - return conn->instream ? conn->incoming_id : conn->outgoing_id; + return conn->in_dlv_stream ? conn->incoming_id : conn->outgoing_id; } static inline const char * qdr_tcp_connection_role_name(const qdr_tcp_connection_t *tc) @@ -199,7 +198,7 @@ static void on_activate(void *context) qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} - if (conn->egress_dispatcher && conn->connector_closed) { + if (conn->is_egress_dispatcher_conn && conn->connector_closed) { detach_links(conn); qdr_connection_set_context(conn->qdr_conn, 0); qdr_connection_closed(conn->qdr_conn); @@ -330,7 +329,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) } // Ensure existence of ingress stream message - if (!conn->instream) { + if (!conn->in_dlv_stream) { qd_message_t *msg = qd_message(); qd_message_set_streaming_annotation(msg); @@ -393,23 +392,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) plog_latency_end(conn->plog); } - - conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); + conn->in_dlv_stream = qdr_link_deliver(conn->incoming_link, msg, 0, false, 0, 0, 0, 0); qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"][D%"PRIu32"] Initiating empty %s incoming stream message", - conn->conn_id, conn->incoming_id, conn->instream->delivery_id, + conn->conn_id, conn->incoming_id, conn->in_dlv_stream->delivery_id, qdr_tcp_connection_role_name(conn)); - conn->incoming_started = true; } - qdr_delivery_t *conn_instream = conn->instream; // Don't read from proton if in Q2 holdoff if (conn->q2_blocked) { qd_log(log, QD_LOG_DEBUG, DLV_FMT" handle_incoming q2_blocked for %s connection", - DLV_ARGS(conn_instream), qdr_tcp_connection_role_name(conn)); + DLV_ARGS(conn->in_dlv_stream), qdr_tcp_connection_role_name(conn)); return 0; } @@ -424,18 +420,18 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) // Push the bytes just read into the streaming message if (count > 0) { - qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked); + qd_message_stream_data_append(qdr_delivery_message(conn->in_dlv_stream), &buffers, &conn->q2_blocked); if (conn->q2_blocked) { // note: unit tests grep for this log! qd_log(log, QD_LOG_DEBUG, DLV_FMT" %s client link blocked on Q2 limit", - DLV_ARGS(conn_instream), qdr_tcp_connection_role_name(conn)); + DLV_ARGS(conn->in_dlv_stream), qdr_tcp_connection_role_name(conn)); } - qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); + qdr_delivery_continue(tcp_adaptor->core, conn->in_dlv_stream, false); qd_log(log, QD_LOG_TRACE, DLV_FMT" Continuing %s message with %i bytes", - DLV_ARGS(conn_instream), qdr_tcp_connection_role_name(conn), count); + DLV_ARGS(conn->in_dlv_stream), qdr_tcp_connection_role_name(conn), count); } else { assert (DEQ_SIZE(buffers) == 0); } @@ -443,10 +439,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) // Close the stream message if read side has closed if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read)) { qd_log(log, QD_LOG_DEBUG, - DLV_FMT" close %s instream delivery", - DLV_ARGS(conn_instream), qdr_tcp_connection_role_name(conn)); - qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); - qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); + DLV_FMT" close %s in_dlv_stream delivery", + DLV_ARGS(conn->in_dlv_stream), qdr_tcp_connection_role_name(conn)); + qd_message_set_receive_complete(qdr_delivery_message(conn->in_dlv_stream)); + qdr_delivery_continue(tcp_adaptor->core, conn->in_dlv_stream, true); conn->raw_read_shutdown = true; } @@ -493,21 +489,21 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) conn->previous_stream_data = 0; } - if (conn->instream) { + if (conn->in_dlv_stream) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", + "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close in_dlv_stream", conn->conn_id, conn->incoming_id); - qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); - qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); - qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream"); - conn->instream = 0; + qd_message_set_receive_complete(qdr_delivery_message(conn->in_dlv_stream)); + qdr_delivery_continue(tcp_adaptor->core, conn->in_dlv_stream, true); + qdr_delivery_decref(tcp_adaptor->core, conn->in_dlv_stream, "tcp-adaptor.handle_disconnected - in_dlv_stream"); + conn->in_dlv_stream = 0; } - if (conn->outstream) { + if (conn->out_dlv_stream) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close outstream", + "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close out_dlv_stream", conn->conn_id, conn->outgoing_id); - qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream"); - conn->outstream = 0; + qdr_delivery_decref(tcp_adaptor->core, conn->out_dlv_stream, "tcp-adaptor.handle_disconnected - out_dlv_stream"); + conn->out_dlv_stream = 0; } detach_links(conn); @@ -638,12 +634,12 @@ static bool copy_outgoing_buffs(qdr_tcp_connection_t *conn) static void handle_outgoing(qdr_tcp_connection_t *conn) { - if (conn->outstream) { + if (conn->out_dlv_stream) { if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write)) { // give no more buffers to raw connection return; } - qd_message_t *msg = qdr_delivery_message(conn->outstream); + qd_message_t *msg = qdr_delivery_message(conn->out_dlv_stream); bool read_more_body = true; if (conn->outgoing_buff_count > 0) { @@ -780,7 +776,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) qdr_terminus_t *target = qdr_terminus(0); qdr_terminus_set_address(target, tc->bridge->address); - tc->outgoing = qdr_link_first_attach(conn, + tc->outgoing_link = qdr_link_first_attach(conn, QD_OUTGOING, dynamic_source, //qdr_terminus_t *source, qdr_terminus(0), //qdr_terminus_t *target, @@ -789,8 +785,8 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) false, NULL, &(tc->outgoing_id)); - qdr_link_set_context(tc->outgoing, tc); - tc->incoming = qdr_link_first_attach(conn, + qdr_link_set_context(tc->outgoing_link, tc); + tc->incoming_link = qdr_link_first_attach(conn, QD_INCOMING, qdr_terminus(0), //qdr_terminus_t *source, target, //qdr_terminus_t *target, @@ -800,7 +796,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) NULL, &(tc->incoming_id)); tc->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core); - qdr_link_set_context(tc->incoming, tc); + qdr_link_set_context(tc->incoming_link, tc); qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); action->args.general.context_1 = tc; @@ -839,7 +835,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Connector egress connected to %s", conn->conn_id, conn->remote_address); if (!!conn->initial_delivery) { - qdr_tcp_open_server_side_connection(conn); + qdr_tcp_create_server_side_connection(conn); } while (qdr_connection_process(conn->qdr_conn)) {} handle_outgoing(conn); @@ -887,7 +883,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_READ_BUFFERS %s", conn->conn_id, qdr_tcp_connection_role_name(conn)); while (qdr_connection_process(conn->qdr_conn)) {} - if (conn->incoming_started) { + if (conn->in_dlv_stream) { grant_read_buffers(conn); handle_incoming(conn, "PNRC_NEED_READ_BUFFERS"); } @@ -915,7 +911,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void "[C%"PRIu64"] PN_RAW_CONNECTION_READ %s Event ", conn->conn_id, qdr_tcp_connection_role_name(conn)); int read = 0; - if (conn->incoming_started) { + if (conn->in_dlv_stream) { // Streaming message exists. Process read normally. read = handle_incoming(conn, "PNRC_READ"); } @@ -952,7 +948,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void qd_delivery_state_t *dstate = qd_delivery_state(); dstate->section_number = 0; dstate->section_offset = conn->bytes_out; - qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->outstream, + qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->out_dlv_stream, PN_RECEIVED, false, // settled dstate, @@ -973,28 +969,36 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } } -static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener) + +static qdr_tcp_connection_t *qdr_tcp_connection(bool ingress, qd_server_t *server, qd_tcp_bridge_t *bridge) { qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t(); ZERO(tc); - tc->activation_lock = sys_mutex(); - tc->ingress = true; tc->context.context = tc; tc->context.handler = &handle_connection_event; - tc->bridge = listener->config; - sys_atomic_inc(&tc->bridge->ref_count); - tc->server = listener->server; sys_atomic_init(&tc->q2_restart, 0); sys_atomic_init(&tc->raw_closed_read, 0); sys_atomic_init(&tc->raw_closed_write, 0); - - tc->plog = plog_start_record(PLOG_RECORD_FLOW, listener->plog); - plog_set_uint64(tc->plog, PLOG_ATTRIBUTE_OCTETS, 0); + tc->activation_lock = sys_mutex(); + tc->ingress = ingress; + tc->server = server; + tc->bridge = bridge; + sys_atomic_inc(&tc->bridge->ref_count); LOCK(tc->bridge->stats_lock); tc->bridge->connections_opened +=1; UNLOCK(tc->bridge->stats_lock); + return tc; +} + +static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener) +{ + qdr_tcp_connection_t* tc = qdr_tcp_connection(true, listener->server, listener->config); + + tc->plog = plog_start_record(PLOG_RECORD_FLOW, listener->plog); + plog_set_uint64(tc->plog, PLOG_ATTRIBUTE_OCTETS, 0); + tc->pn_raw_conn = pn_raw_connection(); pn_raw_connection_set_context(tc->pn_raw_conn, tc); @@ -1019,9 +1023,13 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste } -static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) +/** + * Creates the connection objects that are necessary to communicate with the core. + * This function does not open any socket level connections + */ +static void qdr_tcp_create_server_side_connection(qdr_tcp_connection_t* tc) { - const char *host = tc->egress_dispatcher ? "egress-dispatch" : tc->bridge->host_port; + const char *host = tc->is_egress_dispatcher_conn ? "egress-dispatch" : tc->bridge->host_port; qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening server-side core connection %s", tc->conn_id, host); // @@ -1075,24 +1083,25 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) i_conn_id = tc->initial_delivery->conn_id; i_link_id = tc->initial_delivery->link_id; } - tc->outgoing = qdr_link_first_attach(conn, - QD_OUTGOING, - source, //qdr_terminus_t *source, - qdr_terminus(0), //qdr_terminus_t *target, - "tcp.egress.out", //const char *name, - 0, //const char *terminus_addr, - !(tc->egress_dispatcher), - tc->initial_delivery, - &(tc->outgoing_id)); + + tc->outgoing_link = qdr_link_first_attach(conn, + QD_OUTGOING, + source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "tcp.egress.out", //const char *name, + 0, //const char *terminus_addr, + !(tc->is_egress_dispatcher_conn), + tc->initial_delivery, + &(tc->outgoing_id)); if (!!tc->initial_delivery) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" initial_delivery ownership passed to "DLV_FMT, i_conn_id, i_link_id, tc->initial_delivery->delivery_id, - tc->outgoing->conn_id, tc->outgoing->identity, tc->initial_delivery->delivery_id); + tc->outgoing_link->conn_id, tc->outgoing_link->identity, tc->initial_delivery->delivery_id); qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link"); tc->initial_delivery = 0; } - qdr_link_set_context(tc->outgoing, tc); + qdr_link_set_context(tc->outgoing_link, tc); } @@ -1101,38 +1110,11 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_tcp_connector_t *conne qd_server_t *server, qdr_delivery_t *initial_delivery) { - qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t(); - ZERO(tc); + qdr_tcp_connection_t* tc = qdr_tcp_connection(false, server, config); tc->connector = connector; sys_atomic_inc(&connector->ref_count); - tc->activation_lock = sys_mutex(); - if (initial_delivery) { - tc->egress_dispatcher = false; - tc->initial_delivery = initial_delivery; - qdr_delivery_incref(initial_delivery, "qdr_tcp_connection_egress - held initial delivery"); - } else { - tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc); - tc->egress_dispatcher = true; - } - tc->ingress = false; - tc->context.context = tc; - tc->context.handler = &handle_connection_event; - tc->bridge = config; - sys_atomic_inc(&tc->bridge->ref_count); - tc->server = server; - sys_atomic_init(&tc->q2_restart, 0); - sys_atomic_init(&tc->raw_closed_read, 0); - sys_atomic_init(&tc->raw_closed_write, 0); - tc->conn_id = qd_server_allocate_connection_id(tc->server); - - if (!tc->egress_dispatcher) { - tc->plog = plog_start_record(PLOG_RECORD_FLOW, connector->plog); - plog_set_uint64(tc->plog, PLOG_ATTRIBUTE_OCTETS, 0); - } - LOCK(tc->bridge->stats_lock); - tc->bridge->connections_opened +=1; - UNLOCK(tc->bridge->stats_lock); + tc->conn_id = qd_server_allocate_connection_id(tc->server); // // If this is the egress dispatcher, set up the core connection now. @@ -1140,11 +1122,20 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_tcp_connector_t *conne // running in that connection's context to set up the core // connection. // - if (tc->egress_dispatcher) { - qdr_tcp_open_server_side_connection(tc); - return tc; - } - else { + if (initial_delivery) { + // + // This is not an egress dispatcher connection. + // Real TCP traffic flows thru this connection. + // There is one of these connection per every client that is attaching to the router + // network, i.e. there are N of these non-egress dispatcher connections for N clients respectively. + // + tc->is_egress_dispatcher_conn = false; + tc->initial_delivery = initial_delivery; + qdr_delivery_incref(initial_delivery, "qdr_tcp_connection_egress - held initial delivery"); + + tc->plog = plog_start_record(PLOG_RECORD_FLOW, connector->plog); + plog_set_uint64(tc->plog, PLOG_ATTRIBUTE_OCTETS, 0); + allocate_tcp_write_buffer(&tc->write_buffer); allocate_tcp_buffer(&tc->read_buffer); qd_log(tcp_adaptor->log_source, QD_LOG_INFO, @@ -1162,8 +1153,23 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_tcp_connector_t *conne // and freeing this qdr_tcp_connection_t from underneath. pn_proactor_raw_connect(qd_server_proactor(tc->server), tc->pn_raw_conn, tc->bridge->host_port); return 0; - } + } else { + // + // This is just an egress dispatcher connection. It is initially created so an + // outgoing link can be created on it. When a delivery arrives on the outgoing link + // in the egress dispatcher connection, it is moved to another connection/link + // + tc->is_egress_dispatcher_conn = true; + tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc); + + // + // Create a server side dispatcher connection. + // We don't want to create any socket level connection here. + // + qdr_tcp_create_server_side_connection(tc); + return tc; + } } @@ -1426,7 +1432,7 @@ QD_EXPORT qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t return c; } -static void close_egress_dispatcher(qdr_tcp_connection_t *context) +static void close_egress_dispatcher_connection(qdr_tcp_connection_t *context) { //actual close needs to happen on connection thread context->connector_closed = true; @@ -1442,7 +1448,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s", ct->config->address, ct->config->host, ct->config->port); - close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher_conn); + close_egress_dispatcher_connection((qdr_tcp_connection_t*) ct->dispatcher_conn); DEQ_REMOVE(tcp_adaptor->connectors, ct); qd_tcp_connector_decref(ct); } @@ -1626,7 +1632,7 @@ static int qdr_tcp_push(void *context, qdr_link_t *link, int limit) /** * @brief Find the flow-id in the message's application properties, it it's there and use * it as the counterflow reference of the connection's flow record. - * + * * @param tc Pointer to the tcp connection state * @param msg Pointer to the message received from the ingress (listener) side */ @@ -1682,14 +1688,19 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_deliver Delivery event", DLV_ARGS(delivery)); - if (tc->egress_dispatcher) { + if (tc->is_egress_dispatcher_conn) { + // + // We have received a delivery on the egress dispatcher connection. + // Move it to a new link on a new connection. This new connection + // will be used to communicate with the TCP server. + // qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" tcp_adaptor initiating egress connection", DLV_ARGS(delivery)); qdr_tcp_connection_egress(tc->connector, tc->bridge, tc->server, delivery); return QD_DELIVERY_MOVED_TO_NEW_LINK; - } else if (!tc->outstream) { - tc->outstream = delivery; - qdr_delivery_incref(delivery, "tcp_adaptor - new outstream"); + } else if (!tc->out_dlv_stream) { + tc->out_dlv_stream = delivery; + qdr_delivery_incref(delivery, "tcp_adaptor - new out_dlv_stream"); if (tc->ingress) { plog_latency_end(tc->plog); } else { @@ -1707,7 +1718,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t plog_set_trace(tc->plog, msg); qdr_terminus_t *target = qdr_terminus(0); qdr_terminus_set_address(target, tc->reply_to); - tc->incoming = qdr_link_first_attach(tc->qdr_conn, + tc->incoming_link = qdr_link_first_attach(tc->qdr_conn, QD_INCOMING, qdr_terminus(0), //qdr_terminus_t *source, target, //qdr_terminus_t *target, @@ -1718,10 +1729,12 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t &(tc->incoming_id)); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] %s Created link to %s", - tc->conn_id, tc->incoming->identity, - qdr_tcp_quadrant_id(tc, tc->incoming), tc->reply_to); - qdr_link_set_context(tc->incoming, tc); + tc->conn_id, tc->incoming_link->identity, + qdr_tcp_quadrant_id(tc, tc->incoming_link), tc->reply_to); + qdr_link_set_context(tc->incoming_link, tc); + // //add this connection to those visible through management now that we have the global_id + // qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); action->args.general.context_1 = tc; qdr_action_enqueue(tcp_adaptor->core, action); @@ -2219,18 +2232,18 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo static void detach_links(qdr_tcp_connection_t *conn) { - if (conn->incoming) { + if (conn->incoming_link) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] detaching incoming link", conn->conn_id, conn->incoming_id); - qdr_link_detach(conn->incoming, QD_LOST, 0); - conn->incoming = 0; + qdr_link_detach(conn->incoming_link, QD_LOST, 0); + conn->incoming_link = 0; } - if (conn->outgoing) { + if (conn->outgoing_link) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] detaching outgoing link", conn->conn_id, conn->outgoing_id); - qdr_link_detach(conn->outgoing, QD_LOST, 0); - conn->outgoing = 0; + qdr_link_detach(conn->outgoing_link, QD_LOST, 0); + conn->outgoing_link = 0; } }