diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index c73d3773c..8ddfc30bf 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -702,8 +702,10 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r conn->read_eos_seen = true; break; case QD_MESSAGE_STREAM_DATA_INVALID: + // Corrupted message, treat like EOS since there is no way to undo what has already been sent qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, - "[C%" PRIu64 "] Invalid body data for streaming message", conn->conn_id); + "[C%" PRIu64 "] Invalid body data for streaming message, closing connection", conn->conn_id); + conn->read_eos_seen = true; break; default: break; @@ -1996,7 +1998,11 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(delivery)); return 0; // retry later } - assert(depth_ok == QD_MESSAGE_DEPTH_OK); // otherwise bug in message encoding? + if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding? + qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(delivery)); + qd_message_set_send_complete(msg); + return PN_REJECTED; + } // ISSUE-1136: check if the message format is correct. For this adaptor the content-type field must be // unset. See comment in handle_incoming(). @@ -2004,7 +2010,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t qd_iterator_t *ctype = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE); if (ctype) { qd_iterator_free(ctype); - qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpConnector (wrong version)", + qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpConnector (wrong encapsulation)", DLV_ARGS(delivery)); qd_message_set_send_complete(msg); return PN_RELEASED; // allow it to be re-forwarded to a different adaptor @@ -2028,6 +2034,50 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t } else if (!tc->out_dlv_stream) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " tcp_adaptor delivery arrived on non-egress dispatcher connection", DLV_ARGS(delivery)); + + if (tc->ingress) { + // Egress (connector-side) outgoing messages are validated when they arrive on the dispatcher link (see + // above). Ingress (client-side) outbound reply messages do not arrive on the dispatch link so these + // messages need to be validated here. + + qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES); + if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + DLV_FMT " tcp_adaptor reply message incomplete, waiting for more", DLV_ARGS(delivery)); + return 0; // retry later + } + if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding? + qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(delivery)); + qd_message_set_send_complete(msg); + return PN_REJECTED; + } + + // ISSUE-1136: check if the message format is correct. For this adaptor the content-type field must be + // unset. See comment in handle_incoming(). + // + qd_iterator_t *ctype = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE); + if (ctype) { + qd_iterator_free(ctype); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpListener (wrong outgoing encapsulation)", + DLV_ARGS(delivery)); + qd_message_set_send_complete(msg); + + // What to do? This is a reply message, so it cannot be re-delivered to another service. + + if (tc->pn_raw_conn) { + // set the raw connection condition info so it will appear in the vanflow logs + // when the connection disconnects + pn_condition_t *cond = pn_raw_connection_condition(tc->pn_raw_conn); + if (!!cond) { + (void) pn_condition_set_name(cond, "delivery-failed"); + (void) pn_condition_set_description(cond, "invalid message encapsulation"); + } + pn_raw_connection_close(tc->pn_raw_conn); + } + return PN_REJECTED; + } + } + tc->out_dlv_stream = delivery; qdr_delivery_incref(delivery, "tcp_adaptor - new out_dlv_stream"); if (tc->ingress) { @@ -2115,7 +2165,7 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t pn_condition_t *cond = pn_raw_connection_condition(tc->pn_raw_conn); if (!!cond) { (void) pn_condition_set_name(cond, "delivery-failed"); - (void) pn_condition_set_description(cond, "destination unreachable"); + (void) pn_condition_set_description(cond, (disp == PN_REJECTED) ? "invalid/corrupt message" : "destination unreachable"); } pn_raw_connection_close(tc->pn_raw_conn); } diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index dbfd4e23d..906ccba7c 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -28,7 +28,7 @@ from subprocess import STDOUT from typing import List, Optional, Mapping -from proton import Message +from proton import Message, Disposition from proton.handlers import MessagingHandler from proton.reactor import Container @@ -2195,53 +2195,100 @@ def check_connection_deleted(): client_conn.close() -class TcpInvalidEncodingTest(TestCase): +class TcpLegacyInvalidEncodingTest(TestCase): """ Ensure that the TCP adaptor can recover from receiving an improperly formatted/wrong version AMQP encoded stream message. """ @classmethod def setUpClass(cls): - super(TcpInvalidEncodingTest, cls).setUpClass() + super(TcpLegacyInvalidEncodingTest, cls).setUpClass() config = [ ('router', {'mode': 'interior', 'id': 'TcpInvalidEncoding'}), # Listener for handling router management requests. ('listener', {'role': 'normal', 'port': cls.tester.get_port()}), - ('tcpConnector', {'host': "localhost", + ('tcpConnector', {'host': "127.0.0.1", 'port': cls.tester.get_port(), - 'address': 'tcp-adaptor', + 'address': 'tcp-connector', + 'encapsulation': 'legacy', 'siteId': "mySite"}), + ('tcpListener', {'host': "0.0.0.0", + 'port': cls.tester.get_port(), + 'address': 'tcp-listener', + 'encapsulation': 'legacy', + 'siteId': "mySite"}), ('address', {'prefix': 'closest', 'distribution': 'closest'}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), ] cls.router = cls.tester.qdrouterd('TcpInvalidEncoding', Qdrouterd.Config(config), wait=True) - cls.address = cls.router.addresses[0] - def test_invalid_amqp_message(self): + cls.amqp_address = cls.router.addresses[0] + cls.listener_address = cls.router.tcp_addresses[0] + + def test_invalid_egress_client_request_encaps(self): """ - Send an AMQP message addressed to the TCP service via the amqp - listener. Set values in the AMQP header which will conflict with what - is expected by the adaptor. Verify the message is RELEASED and an an - error has been logged. + Simulate an invalid message arriving at the egress connector. Verify + the message is RELEASED and an error has been logged. """ + + # send a request message with an incompatible encapsulation + msg = Message() - msg.to = "tcp-adaptor" + msg.to = "tcp-connector" msg.subject = "stuff" msg.reply_to = "invalid/reply/to" - msg.content_type = "application/octet-stream" - test = SendAMQPMessage(msg, self.address, 'tcp-adaptor') + msg.content_type = "This-is-wrong" + test = InvalidClientSendRequest(msg, self.amqp_address, 'tcp-connector') + test.run() + self.assertIsNone(test.error) + self.router.wait_log_message(pattern=r"Misconfigured tcpConnector \(wrong encapsulation\)") + + def test_invalid_ingress_server_reply_encaps(self): + """ + Simulate an invalid reply message arriving at the ingress listener. Verify + the message is RELEASED and an error has been logged. + """ + + # send a reply message with an incompatible encapsulation + + msg = Message() + msg.subject = "Subject" + msg.annotations = {":flowid": "whatever"} + msg.content_type = "This-is-wrong" + test = InvalidServerSendReply(msg, self.amqp_address, + self.listener_address, 'tcp-listener', + Disposition.REJECTED) + test.run() + self.assertIsNone(test.error) + self.router.wait_log_message(pattern=r"Misconfigured tcpListener \(wrong outgoing encapsulation\)") + + def test_invalid_ingress_server_reply_body(self): + """ + Simulate an invalid reply message arriving at the ingress listener. Verify + the message is RELEASED and an error has been logged. + """ + + # send a reply message with an incompatible body format + + msg = Message() + msg.subject = "Subject" + msg.annotations = {":flowid": "whatever"} + msg.body = "This is a STRING, NOT VBIN!" + test = InvalidServerSendReply(msg, self.amqp_address, + self.listener_address, 'tcp-listener', + Disposition.ACCEPTED) test.run() self.assertIsNone(test.error) - self.router.wait_log_message(pattern=r"Misconfigured tcpConnector \(wrong version\)") + self.router.wait_log_message(pattern=r"Invalid body data for streaming message") -class SendAMQPMessage(MessagingHandler): +class InvalidClientSendRequest(MessagingHandler): def __init__(self, msg, address, destination): - super(SendAMQPMessage, self).__init__(auto_settle=False) + super(InvalidClientSendRequest, self).__init__(auto_settle=False) self.msg = msg self.address = address self.destination = destination @@ -2283,6 +2330,102 @@ def run(self): Container(self).run() +class InvalidServerSendReply(MessagingHandler): + def __init__(self, msg, server_address, listener_address, service_address, dispo): + super(InvalidServerSendReply, self).__init__(auto_settle=False) + self.msg = msg + self.service_address = service_address + self.error = None + self.timer = None + self.expected_dispo = dispo + + # fake server connection, receive link for request, send link for reply-to + self.server_address = server_address + self.server_conn = None + self.server_sender = None + self.server_receiver = None + self.server_sent = False + + # The request message that arrives at the "server" is streaming. Proton + # does not give us an "on_message" callback since it never + # completes. Wait long enough for the headers to arrive so we can + # extract the reply-to + self.request_dlv = None + self.dlv_drain_timer = None + + # fack tcp client, just sends a request message + self.listener_address = listener_address + self.client_conn = None + self.client_sent = False + + def done(self, error=None): + self.error = error + if self.timer: + self.timer.cancel() + self.server_conn.close() + if self.client_conn is not None: + self.client_conn.close() + if self.dlv_drain_timer: + self.dlv_drain_timer.cancel() + + def timeout(self): + self.timer = None + self.done(error=f"Timeout Expired: sent={self.sent}") + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) + self.server_conn = event.container.connect(self.server_address) + self.server_receiver = event.container.create_receiver(self.server_conn, + self.service_address) + + def on_timer_task(self, event): + # at this point we expect the reply-to header to have arrived + try: + data = self.server_receiver.recv(self.request_dlv.pending) + #print(f"len={len(xxx)}\nBODY=[{xxx}]", flush=True) + msg = Message() + msg.decode(data) + self.server_sender = event.container.create_sender(self.server_conn, + msg.reply_to) + except Exception as exc: + self.bail(error=f"Incomplete request msg headers {data}") + + self.request_dlv.settle() + + def on_delivery(self, event): + if event.receiver == self.server_receiver: + if self.request_dlv is None and event.delivery.readable: + # sleep a bit to allow all the header data to arrive on the + # delivery + self.request_dlv = event.delivery + self.dlv_drain_timer = event.reactor.schedule(1.0, self) + + def on_link_opened(self, event): + if event.receiver == self.server_receiver: + # "server" ready to take requests, fire up the "client". All we + # need is to connect since that will activate the tcp adaptor + self.client_conn = event.container.connect(self.listener_address) + + def on_sendable(self, event): + if event.sender == self.server_sender: + if not self.server_sent: + # send the invalid reply + self.server_sender.send(self.msg) + self.server_sent = True + + def on_released(self, event): + self.done(None if self.expected_dispo == Disposition.RELEASED else "Unexpected PN_RELEASED") + + def on_accepted(self, event): + self.done(None if self.expected_dispo == Disposition.ACCEPTED else "Unexpected PN_ACCEPTED") + + def on_rejected(self, event): + self.done(None if self.expected_dispo == Disposition.REJECTED else "Unexpected PN_REJECTED") + + def run(self): + Container(self).run() + + class TcpAdaptorConnCounter(TestCase): """ Validate the TCP service connection counter