diff --git a/src/decoders/http1/http1_decoder.c b/src/decoders/http1/http1_decoder.c index ff612e783..9231730da 100644 --- a/src/decoders/http1/http1_decoder.c +++ b/src/decoders/http1/http1_decoder.c @@ -65,7 +65,8 @@ static const char * const LWS_CHARS = " \t"; typedef enum { - HTTP1_DECODE_REQUEST = 0, // parsing request start line + HTTP1_INVALID_STATE = 0, + HTTP1_DECODE_REQUEST, // parsing request start line HTTP1_DECODE_RESPONSE, // parsing response start line HTTP1_DECODE_HEADERS, // parsing headers HTTP1_DECODE_BODY, // parsing fixed-length body @@ -77,6 +78,7 @@ typedef enum { #if DEBUG_DECODER const char * const decoder_state[] = { + "HTTP1_INVALID_STATE", "HTTP1_DECODE_REQUEST", "HTTP1_DECODE_RESPONSE", "HTTP1_DECODE_HEADERS", @@ -187,8 +189,13 @@ static void ensure_buffer_size(parse_buffer_t *b, size_t required) static inline void decoder_new_state(decoder_t *decoder, http1_decoder_state_t new_state) { #if DEBUG_DECODER - fprintf(stdout, "%s decoder state %s -> %s\n", decoder->is_client ? "client" : "server", - decoder_state[decoder->state], decoder_state[new_state]); + if (decoder->state == HTTP1_INVALID_STATE) { + fprintf(stdout, "%s decoder initial state: %s\n", decoder->is_client ? "client" : "server", + decoder_state[new_state]); + } else { + fprintf(stdout, "%s decoder state %s -> %s\n", decoder->is_client ? "client" : "server", + decoder_state[decoder->state], decoder_state[new_state]); + } #endif decoder->state = new_state; } @@ -256,7 +263,7 @@ static void h1_decode_request_state_free(h1_decode_request_state_t *hrs) // Create a new connection state instance. // -qd_http1_decoder_connection_t *h1_decode_connection(const qd_http1_decoder_config_t *config, uintptr_t user_context) +qd_http1_decoder_connection_t *qd_http1_decoder_connection(const qd_http1_decoder_config_t *config, uintptr_t user_context) { assert(config); @@ -281,7 +288,7 @@ qd_http1_decoder_connection_t *h1_decode_connection(const qd_http1_decoder_confi } -uintptr_t h1_decode_connection_get_context(const qd_http1_decoder_connection_t *hconn) +uintptr_t qd_http1_decoder_connection_get_context(const qd_http1_decoder_connection_t *hconn) { assert(hconn); return hconn->user_context; @@ -290,7 +297,7 @@ uintptr_t h1_decode_connection_get_context(const qd_http1_decoder_connection_t * // Free the connection // -void h1_decode_connection_free(qd_http1_decoder_connection_t *conn) +void qd_http1_decoder_connection_free(qd_http1_decoder_connection_t *conn) { if (conn) { h1_decode_request_state_t *hrs = DEQ_HEAD(conn->hrs_queue); @@ -314,7 +321,7 @@ void h1_decode_connection_free(qd_http1_decoder_connection_t *conn) static void debug_print_line(const char *prefix, const char *line) { #if DEBUG_DECODER - fprintf(stdout, "%s: '%s'\n", prefix, line); + fprintf(stdout, "%s '%s'\n", prefix, line); fflush(stdout); #endif } @@ -376,8 +383,8 @@ static char *read_line(decoder_t *decoder, const unsigned char **data, size_t *l assert(decoder->buffer.length > 0); unsigned char *trim = &decoder->buffer.data[decoder->buffer.length - 1]; while (*trim == LF_TOKEN || *trim == CR_TOKEN) { - *trim-- = '\0'; - if (trim == &decoder->buffer.data[0]) + *trim = '\0'; + if (trim-- == &decoder->buffer.data[0]) break; } decoder->buffer.length = 0; // reset line parser @@ -433,7 +440,7 @@ static bool message_done(qd_http1_decoder_connection_t *hconn, decoder_t *decode // signal the message receive is complete int rc = hconn->config->message_done(hconn, hrs->user_context, decoder->is_client); - if (!rc) { + if (rc) { parser_error(hconn, "message_done callback failed"); return false; } @@ -446,7 +453,7 @@ static bool message_done(qd_http1_decoder_connection_t *hconn, decoder_t *decode DEQ_REMOVE_HEAD(hconn->hrs_queue); h1_decode_request_state_free(hrs); hrs = 0; - if (!rc) { + if (rc) { parser_error(hconn, "transaction_complete callback failed"); return false; } @@ -529,8 +536,6 @@ static bool parse_response_line(qd_http1_decoder_connection_t *hconn, decoder_t return false; // need more data const size_t in_octets = strlen(line); - char *eol = &line[in_octets]; // eol points to null terminator - if (in_octets == 0) { // RFC9112 ignore blank lines before the response return !!(*length); @@ -554,6 +559,7 @@ static bool parse_response_line(qd_http1_decoder_connection_t *hconn, decoder_t char *saveptr = 0; char *version = strtok_r(line, LWS_CHARS, &saveptr); char *status_code = strtok_r(0, LWS_CHARS, &saveptr); + char *reason = strtok_r(0, "", &saveptr); if (!version || !status_code) { parser_error(hconn, "Malformed response line"); @@ -580,14 +586,11 @@ static bool parse_response_line(qd_http1_decoder_connection_t *hconn, decoder_t return false; } - // The reason phrase is optional and may contain spaces. + // The reason phrase is optional - while (eoc < eol) { - // There is something trailing the status code. - eoc = trim_whitespace(eoc); - } + reason = !!reason ? trim_whitespace(reason) : ""; - int rc = hconn->config->rx_response(hconn, hrs->user_context, hrs->response_code, *eoc ? eoc : 0, 1, minor); + int rc = hconn->config->rx_response(hconn, hrs->user_context, hrs->response_code, reason, 1, minor); if (rc) { parser_error(hconn, "rx_response callback failed"); return false; @@ -606,6 +609,9 @@ static bool parse_response_line(qd_http1_decoder_connection_t *hconn, decoder_t // Called after the last incoming header was decoded and passed to the // application. // +// Determines if there is any content to the message. If there is no content we proceed directly to message_done +// otherwise transition to the content consume state. +// // Returns true on success, false on error // static bool headers_done(qd_http1_decoder_connection_t *hconn, struct decoder_t *decoder) @@ -631,8 +637,7 @@ static bool headers_done(qd_http1_decoder_connection_t *hconn, struct decoder_t decoder->is_chunked = false; decoder->hdr_content_length = true; decoder->body_length = 0; - decoder_new_state(decoder, HTTP1_DECODE_BODY); - return true; + return message_done(hconn, decoder); } } @@ -669,13 +674,22 @@ static bool headers_done(qd_http1_decoder_connection_t *hconn, struct decoder_t decoder->body_length = INT64_MAX; decoder_new_state(decoder, HTTP1_DECODE_BODY); return true; + } else { + // Case 7: assume zero length request body + decoder->hdr_content_length = true; + decoder->body_length = 0; } - decoder->body_length = 0; // Case 7: assume zero length body } - // case 6: use content-length - decoder_new_state(decoder, HTTP1_DECODE_BODY); - return true; + // at this point we have elminated chunked and should be set to decode a fixed content length which may be zero + // (Case 6): + assert(decoder->hdr_content_length); + if (decoder->body_length) { + decoder_new_state(decoder, HTTP1_DECODE_BODY); + return true; + } else { + return message_done(hconn, decoder); + } } @@ -806,6 +820,8 @@ static bool consume_body(qd_http1_decoder_connection_t *hconn, decoder_t *decode *data += amount; *length -= amount; + decoder->body_length -= amount; + return true; } @@ -913,7 +929,7 @@ static bool parse_body(qd_http1_decoder_connection_t *hconn, struct decoder_t *d // This is the main decode loop. All callbacks take place in the context of this call. // // -int h1_decode_connection_rx_data(qd_http1_decoder_connection_t *hconn, bool from_client, const unsigned char *data, size_t length) +int qd_http1_decoder_connection_rx_data(qd_http1_decoder_connection_t *hconn, bool from_client, const unsigned char *data, size_t length) { bool more = true; @@ -956,6 +972,7 @@ int h1_decode_connection_rx_data(qd_http1_decoder_connection_t *hconn, bool from break; case HTTP1_DECODE_ERROR: + case HTTP1_INVALID_STATE: more = false; break; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ee67922b0..23a3fba5b 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -70,6 +70,9 @@ target_link_libraries(threaded_timer_test skupper-router) add_executable(adaptor_buffer_test adaptor_buffer_test.c) target_link_libraries(adaptor_buffer_test skupper-router) +add_executable(http1-relay http1_relay.c) +target_link_libraries(http1-relay skupper-router) + set(TEST_WRAP ${Python_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/run.py) add_test(unit_tests_size_10000 ${TEST_WRAP} unit_tests_size 10000) @@ -174,6 +177,7 @@ foreach(py_test_module system_tests_expandvars system_tests_panic_handler system_tests_resend_released + system_tests_http1_decoder ) string(CONFIGURE "${PYTHON_TEST_COMMAND}" CONFIGURED_PYTHON_TEST_COMMAND) diff --git a/tests/http1_relay.c b/tests/http1_relay.c new file mode 100644 index 000000000..71b4b3205 --- /dev/null +++ b/tests/http1_relay.c @@ -0,0 +1,711 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * HTTP/1.x test proxy for the HTTP/1.x protocol observer. + * + * Sits between an HTTP/1.x client(s) and a server and invokes the protocol observer codec on the proxied TCP traffic. + */ + +#include "decoders/http1/http1_decoder.h" +#include "qpid/dispatch/alloc.h" +#include "qpid/dispatch/buffer.h" + +#include "proton/proactor.h" +#include "proton/raw_connection.h" +#include "proton/listener.h" +#include "proton/netaddr.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define BOOL2STR(b) ((b)?"true":"false") + +bool running = true; // exit main loop when false +bool verbose = false; + +const char *listener_host = "localhost"; +const char *listener_port = "8888"; +char listener_address[1024]; + +const char *server_host = "localhost"; +const char *server_port = "8800"; +char server_address[1024]; + +pn_proactor_t *proactor; +pn_listener_t *listener; + +uintptr_t next_request_id; +uintptr_t next_conn_id; + +// State for one side of the stream +// +typedef struct endpoint_state_t endpoint_state_t; +struct endpoint_state_t { + pn_raw_connection_t *raw_conn; + qd_buffer_list_t outgoing; // buffers staged to send out raw_conn + + bool close_read:1; + bool close_write:1; +}; + +typedef struct h1_stream_t h1_stream_t; +struct h1_stream_t { + DEQ_LINKS(h1_stream_t); + + qd_http1_decoder_connection_t *decoder; + + uintptr_t conn_id; + endpoint_state_t client; // client-facing endpoint + endpoint_state_t server; // server-facing endpoint +}; + +ALLOC_DECLARE(h1_stream_t); +ALLOC_DEFINE(h1_stream_t); +DEQ_DECLARE(h1_stream_t, h1_stream_list_t); + + +h1_stream_list_t stream_list = DEQ_EMPTY; + + +__attribute__((format(printf, 1, 2))) void debug(const char *format, ...) +{ + va_list args; + + if (!verbose) return; + + struct timeval tv_time = {0}; + static const char time_fmt[] = "%Y-%m-%d %H:%M:%S"; + + if (gettimeofday(&tv_time, NULL) != 0) { + perror("gettimeofday failed"); + exit(1); + } + + struct tm tm_time; + char temp[100]; + + if (localtime_r(&tv_time.tv_sec, &tm_time) != &tm_time) { + perror("localtime_r failed"); + exit(1); + } + + if (strftime(temp, sizeof(temp), time_fmt, &tm_time) == 0) { + perror("strftime failed"); + exit(1); + } + + fprintf(stdout, "DEBUG: [%s.%03lld] ", temp, (long long) (tv_time.tv_usec / 1000)); + va_start(args, format); + vfprintf(stdout, format, args); + va_end(args); + fflush(stdout); +} + + +static void signal_handler(int signum) +{ + signal(signum, SIG_IGN); + if (proactor) + pn_proactor_interrupt(proactor); +} + + +/* + * HTTP/1.x decoder callbacks + */ + + +int rx_request(qd_http1_decoder_connection_t *hconn, const char *method, const char *target, uint32_t version_major, uint32_t version_minor, uintptr_t *request_context) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + *request_context = ++next_request_id; + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] RX-REQ METHOD=%s TARGET=%s VMAJOR=%" PRIu32 " VMINOR=%" PRIu32 "\n", + stream->conn_id, *request_context, method, target, version_major, version_minor); + return 0; +} + + +int rx_response(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, int status_code, const char *reason_phrase, uint32_t version_major, uint32_t version_minor) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] RX-RESP STATUS=%d VMAJOR=%" PRIu32 " VMINOR=%" PRIu32 " REASON=%s\n", + stream->conn_id, request_context, status_code, version_major, version_minor, reason_phrase); + return 0; +} + + +int rx_header(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client, const char *key, const char *value) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + const char *cmd = from_client ? "CLIENT-HEADER" : "SERVER-HEADER"; + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] %s KEY=%s VALUE=%s\n", stream->conn_id, request_context, cmd, key, value); + return 0; +} + + +int rx_headers_done(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + const char *cmd = from_client ? "CLIENT-HEADER-DONE" : "SERVER-HEADER-DONE"; + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] %s\n", stream->conn_id, request_context, cmd); + return 0; +} + + +int rx_body(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client, const unsigned char *body, size_t length) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + const char *cmd = from_client ? "CLIENT-BODY" : "SERVER-BODY"; + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] %s LENGTH=%zu\n", stream->conn_id, request_context, cmd, length); + return 0; +} + + +int message_done(qd_http1_decoder_connection_t *hconn, uintptr_t request_context, bool from_client) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + const char *cmd = from_client ? "CLIENT-MSG-DONE" : "SERVER-MSG-DONE"; + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] %s\n", stream->conn_id, request_context, cmd); + return 0; +} + + +int transaction_complete(qd_http1_decoder_connection_t *hconn, uintptr_t request_context) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + fprintf(stdout, "[C%" PRIuPTR ":R%" PRIuPTR "] TRANSACTION-COMPLETE\n", stream->conn_id, request_context); + return 0; +} + + +void protocol_error(qd_http1_decoder_connection_t *hconn, const char *reason) +{ + h1_stream_t *stream = (h1_stream_t *) qd_http1_decoder_connection_get_context(hconn); + assert(stream); + + fprintf(stdout, "[C%" PRIuPTR "] PROTOCOL-ERROR REASON=%s\n", stream->conn_id, reason); +} + + +qd_http1_decoder_config_t decoder_config = { + .rx_request = rx_request, + .rx_response = rx_response, + .rx_header = rx_header, + .rx_headers_done = rx_headers_done, + .rx_body = rx_body, + .message_done = message_done, + .transaction_complete = transaction_complete, + .protocol_error = protocol_error +}; + + +/** + * h1_stream_t constructor + */ +h1_stream_t *h1_stream(void) +{ + h1_stream_t *stream = new_h1_stream_t(); + ZERO(stream); + + stream->conn_id = ++next_conn_id; + stream->decoder = qd_http1_decoder_connection(&decoder_config, (uintptr_t) stream); + + stream->client.raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(stream->client.raw_conn, stream); + DEQ_INIT(stream->client.outgoing); + + stream->server.raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(stream->server.raw_conn, stream); + DEQ_INIT(stream->server.outgoing); + + return stream; +} + + +/** + * h1_stream_t destructor + */ +void h1_stream_free(h1_stream_t *stream) +{ + if (stream) { + qd_http1_decoder_connection_free(stream->decoder); + qd_buffer_list_free_buffers(&stream->client.outgoing); + qd_buffer_list_free_buffers(&stream->server.outgoing); + free_h1_stream_t(stream); + } +} + + +// fill the raw_conn with empty buffers for receiving input +// +static void grant_read_buffers(pn_raw_connection_t *raw_conn) +{ + pn_raw_buffer_t pn_desc; + + size_t count = pn_raw_connection_read_buffers_capacity(raw_conn); + debug("Granting %zu read buffers\n", count); + + while (count--) { + qd_buffer_t *buf = qd_buffer(); + + pn_desc.context = (uintptr_t) buf; + pn_desc.bytes = (char *) qd_buffer_base(buf); + pn_desc.capacity = qd_buffer_capacity(buf); + pn_desc.size = 0; + pn_desc.offset = 0; + + if (pn_raw_connection_give_read_buffers(raw_conn, &pn_desc, 1) != 1) { + fprintf(stderr, "raw give read buffer failed\n"); + exit(1); + } + } +} + + +// Free used write buffers +// +static void release_written_buffers(pn_raw_connection_t *raw_conn) +{ + pn_raw_buffer_t pn_desc = {0}; + + size_t taken = pn_raw_connection_take_written_buffers(raw_conn, &pn_desc, 1); + while (taken) { + qd_buffer_t *buf = (qd_buffer_t *)pn_desc.context; + assert(buf); + qd_buffer_free(buf); + taken = pn_raw_connection_take_written_buffers(raw_conn, &pn_desc, 1); + } +} + + +// Move full read buffers from src endpoint to the write channel of dst. +// Pass the buffers through the decoder as they are forwarded +// +static void forward_read_buffers(h1_stream_t *stream, bool is_client, endpoint_state_t *src, endpoint_state_t *dst) +{ + pn_raw_buffer_t pn_desc = {0}; + unsigned int count = 0; + size_t amount = 0; + + size_t taken = pn_raw_connection_take_read_buffers(src->raw_conn, &pn_desc, 1); + while (taken == 1) { + qd_buffer_t *buf = (qd_buffer_t *) pn_desc.context; + assert(buf); + + if (pn_desc.size > 0) { + // move to peer + buf->size = pn_desc.size; + DEQ_INSERT_TAIL(dst->outgoing, buf); + amount += pn_desc.size; + count += 1; + + // Pass the data to the decoder. There is no recovery if the decoder fails. + // + if (stream->decoder) { + int rc = qd_http1_decoder_connection_rx_data(stream->decoder, is_client, qd_buffer_base(buf), qd_buffer_size(buf)); + if (rc) { + debug("Decoder failed! rc=%d\n", rc); + qd_http1_decoder_connection_free(stream->decoder); + stream->decoder = 0; + } + } + + } else { + qd_buffer_free(buf); + } + + taken = pn_raw_connection_take_read_buffers(src->raw_conn, &pn_desc, 1); + } + + if (count && dst->raw_conn) { + debug("Forwarded %u buffers (%zu octets)\n", count, amount); + pn_raw_connection_wake(dst->raw_conn); + } + +} + + +// send outgoing buffers out the endpoint +// +static void send_buffers(endpoint_state_t *endpoint) +{ + pn_raw_buffer_t pn_desc = {0}; + unsigned int count = 0; + size_t amount = 0; + + size_t capacity = pn_raw_connection_write_buffers_capacity(endpoint->raw_conn); + while (capacity > 0) { + qd_buffer_t *buf = DEQ_HEAD(endpoint->outgoing); + if (!buf) + break; + + DEQ_REMOVE_HEAD(endpoint->outgoing); + + pn_desc.context = (uintptr_t) buf; + pn_desc.bytes = (char *) qd_buffer_base(buf); + pn_desc.size = qd_buffer_size(buf); + + size_t given = pn_raw_connection_write_buffers(endpoint->raw_conn, &pn_desc, 1); + if (given != 1) + break; + + amount += pn_desc.size; + count += 1; + capacity -= 1; + } + + if (count) { + debug("Wrote %u buffers (%zu octets)\n", count, amount); + } + + if (DEQ_IS_EMPTY(endpoint->outgoing) && endpoint->close_write) { + endpoint->close_write = false; + pn_raw_connection_write_close(endpoint->raw_conn); + debug("endpoint write closed\n"); + } +} + + +/** + * discard any buffers held by raw_conn + */ +static void flush_buffers(pn_raw_connection_t *raw_conn) +{ + pn_raw_buffer_t pn_desc = {0}; + + release_written_buffers(raw_conn); + size_t taken = pn_raw_connection_take_read_buffers(raw_conn, &pn_desc, 1); + while (taken == 1) { + assert(pn_desc.context); + qd_buffer_free((qd_buffer_t *) pn_desc.context); + taken = pn_raw_connection_take_read_buffers(raw_conn, &pn_desc, 1); + } +} + + +/** + * Raw connection event handler - dispatch to the handlers for client or server. + */ +static void raw_event_handler(pn_event_t *event) +{ + pn_event_type_t etype = pn_event_type(event); + pn_raw_connection_t *raw_conn = pn_event_raw_connection(event); + assert(raw_conn); + h1_stream_t *stream = pn_raw_connection_get_context(raw_conn); + assert(stream); + + endpoint_state_t *endpoint; + endpoint_state_t *peer; + bool is_client; + + if (raw_conn == stream->client.raw_conn) { + debug("client connection (%p) event=%s\n", (void *) raw_conn, pn_event_type_name(etype)); + is_client = true; + endpoint = &stream->client; + peer = &stream->server; + } else { + debug("server connection (%p) event=%s\n", (void *) raw_conn, pn_event_type_name(etype)); + assert(raw_conn == stream->server.raw_conn); + is_client = false; + endpoint = &stream->server; + peer = &stream->client; + } + + switch (etype) { + case PN_RAW_CONNECTION_CONNECTED: { + const pn_netaddr_t *local_addr = pn_raw_connection_local_addr(raw_conn); + const pn_netaddr_t *remote_addr = pn_raw_connection_remote_addr(raw_conn); + char remote_host[200]; + char local_host[200]; + char remote_port[32]; + char local_port[32]; + if (pn_netaddr_host_port(remote_addr, remote_host, sizeof(remote_host), remote_port, sizeof(remote_port)) == 0 + && pn_netaddr_host_port(local_addr, local_host, sizeof(local_host), local_port, sizeof(local_port)) == 0) { + debug("%s connected, local=%s:%s remote=%s:%s\n", (is_client) ? "client" : "server", + local_host, local_port, remote_host, remote_port); + } + } break; + + case PN_RAW_CONNECTION_CLOSED_READ: + if (peer->raw_conn) { + debug("Closing peer write side\n"); + peer->close_write = true; + pn_raw_connection_wake(peer->raw_conn); + } + break; + + case PN_RAW_CONNECTION_CLOSED_WRITE: + if (peer->raw_conn) { + debug("Closing peer read side\n"); + peer->close_read = true; + pn_raw_connection_wake(peer->raw_conn); + } + break; + + case PN_RAW_CONNECTION_DISCONNECTED: { + flush_buffers(endpoint->raw_conn); + pn_raw_connection_set_context(endpoint->raw_conn, 0); + endpoint->raw_conn = 0; + if (peer->raw_conn == 0) { + debug("Connection %" PRIuPTR " closed.\n", stream->conn_id); + DEQ_REMOVE(stream_list, stream); + h1_stream_free(stream); + } else { + debug("force-closing peer connection\n"); + peer->close_read = true; + peer->close_write = true; + pn_raw_connection_wake(peer->raw_conn); + } + return; // endpoint.raw_conn no longer valid + } + + case PN_RAW_CONNECTION_NEED_READ_BUFFERS: + grant_read_buffers(endpoint->raw_conn); + break; + + case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: + send_buffers(endpoint); + break; + + case PN_RAW_CONNECTION_READ: + forward_read_buffers(stream, is_client, endpoint, peer); + break; + + case PN_RAW_CONNECTION_WRITTEN: + release_written_buffers(endpoint->raw_conn); + send_buffers(endpoint); + break; + + case PN_RAW_CONNECTION_WAKE: + send_buffers(endpoint); + + if (endpoint->close_read) { + endpoint->close_read = false; + pn_raw_connection_read_close(endpoint->raw_conn); + debug("endpoint read closed\n"); + } + break; + + case PN_RAW_CONNECTION_DRAIN_BUFFERS: + flush_buffers(endpoint->raw_conn); + break; + + default: + break; + } +} + + +/** + * Listener event handler + */ +static void listener_event_handler(pn_event_t *event) +{ + const pn_event_type_t etype = pn_event_type(event); + debug("new listener event=%s\n", pn_event_type_name(etype)); + + switch (etype) { + + case PN_LISTENER_ACCEPT: { + h1_stream_t *stream = h1_stream(); + DEQ_INSERT_TAIL(stream_list, stream); + + // raw connection events for these connections will arrive after these calls + debug("Accepting new client connection, initiating server connection to %s\n", server_address); + pn_listener_raw_accept(listener, stream->client.raw_conn); + pn_proactor_raw_connect(proactor, stream->server.raw_conn, server_address); + } break; + + case PN_LISTENER_OPEN: + break; + + case PN_LISTENER_CLOSE: + break; + + default: + break; + } +} + + +/** + * Handle generic (non-raw-connection) proactor events + */ +static void proactor_event_handler(pn_event_t *event) +{ + const pn_event_type_t etype = pn_event_type(event); + debug("new proactor event=%s\n", pn_event_type_name(etype)); + + switch (etype) { + + case PN_PROACTOR_INTERRUPT: { + running = false; + } break; + + case PN_PROACTOR_TIMEOUT: + default: + break; + } +} + + +static void usage(const char *prog) +{ + printf("Usage: %s \n", prog); + printf("-l \tThe listener address for incoming client connections [%s:%s]\n", listener_host, listener_port); + printf("-s \tThe server address to connect to [%s:%s]\n", server_host, server_port); + printf("-v \tEnable debug output\n"); + exit(1); +} + + +int main_loop(void ) +{ + void (*event_handler)(pn_event_t *event); + + proactor = pn_proactor(); + listener = pn_listener(); + + int rc = pn_proactor_addr(listener_address, sizeof(listener_address), listener_host, listener_port); + if (rc >= sizeof(listener_address)) { + fprintf(stderr, "listener address too long!\n"); + exit(1); + } + + rc = pn_proactor_addr(server_address, sizeof(server_address), server_host, server_port); + if (rc >= sizeof(server_address)) { + fprintf(stderr, "server address too long!\n"); + exit(1); + } + + debug("Creating listener on address %s\n", listener_address); + pn_proactor_listen(proactor, listener, listener_address, 10); + + while (running) { + debug("Waiting for proactor event...\n"); + pn_event_batch_t *events = pn_proactor_wait(proactor); + debug("Start new proactor batch\n"); + + if (pn_event_batch_raw_connection(events)) { + event_handler = raw_event_handler; + } else if (pn_event_batch_listener(events)) { + event_handler = listener_event_handler; + } else { + event_handler = proactor_event_handler; + } + + pn_event_t *event = pn_event_batch_next(events); + while (event) { + event_handler(event); + event = pn_event_batch_next(events); + } + + debug("Proactor batch processing done\n"); + pn_proactor_done(proactor, events); + } + + debug("Send complete!\n"); + pn_proactor_free(proactor); + + return 0; +} + + +int main(int argc, char** argv) +{ + /* command line options */ + opterr = 0; + int c; + while ((c = getopt(argc, argv, "hl:s:v")) != -1) { + switch(c) { + case 'h': usage(argv[0]); break; + case 'v': verbose = true; break; + case 'l': { + char *colon = strrchr(optarg, ':'); + if (!colon) { + fprintf(stderr, "Error: invalid listener address format: missing port\n"); + usage(argv[0]); + } else { + *colon = '\0'; + listener_port = ++colon; + listener_host = optarg; + } + } break; + case 's': { + char *colon = strrchr(optarg, ':'); + if (!colon) { + fprintf(stderr, "Error: invalid server address format: missing port\n"); + usage(argv[0]); + } else { + *colon = '\0'; + server_port = ++colon; + server_host = optarg; + } + } break; + default: + usage(argv[0]); + break; + } + } + + signal(SIGQUIT, signal_handler); + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + + qd_alloc_initialize(); + + int rc = main_loop(); + + h1_stream_t *stream = DEQ_HEAD(stream_list); + while (stream) { + DEQ_REMOVE_HEAD(stream_list); + h1_stream_free(stream); + stream = DEQ_HEAD(stream_list); + } + + qd_alloc_finalize(); + + return rc; +} + diff --git a/tests/system_test.py b/tests/system_test.py index 3bb3c2e94..f4a88b6eb 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -303,7 +303,9 @@ def wait_port(port, socket_address_family='IPv4', **retry_kwargs): Takes same keyword arguments as retry to control the timeout""" def check(e): """Only retry on connection refused""" - assert isinstance(e, socket.error) or not e.errno == errno.ECONNREFUSED + if isinstance(e, socket.error) and e.errno == errno.ECONNREFUSED: + return # try again + raise e host = None @@ -312,14 +314,13 @@ def connect(): # man 3 connect: "If connect() fails, the state of the socket is unspecified. [...]" s, host = get_local_host_socket(socket_address_family) try: + s.settimeout(retry_kwargs.get('timeout', TIMEOUT)) s.connect((host, port)) + s.shutdown(socket.SHUT_RDWR) finally: s.close() - try: - retry_exception(connect, exception_test=check, **retry_kwargs) - except Exception as e: - raise Exception("wait_port timeout on host %s port %s: %s" % (host, port, e)) + retry_exception(connect, exception_test=check, **retry_kwargs) def wait_ports(ports, **retry_kwargs): diff --git a/tests/system_tests_http1_decoder.py b/tests/system_tests_http1_decoder.py new file mode 100644 index 000000000..bfd67164b --- /dev/null +++ b/tests/system_tests_http1_decoder.py @@ -0,0 +1,831 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Test the HTTP/1.x Protocol Decoder +# + +import socket +import subprocess +from time import sleep + +from system_test import TestCase, unittest, main_module +from system_test import Process, TIMEOUT, retry_exception + + +# commands for the log file parser +# +MATCH = 1 +CLIENT_BODY = 2 +SERVER_BODY = 3 + + +def _drain_socket(sock): + """ + Read data from socket until the socket is closed + """ + data = b'' + while True: + chunk = sock.recv(4096) + if not chunk: # socket closed + break + data += chunk + return data + + +class Http1TestRelay(Process): + """ + Run the HTTP/1.x test relay program (http1-relay) as a subprocess + """ + def __init__(self, listener_address, server_address, name="Http1TestRelay", + **kwargs): + self.listener_address = listener_address + self.server_address = server_address + + kwargs.setdefault('stdin', subprocess.DEVNULL) # else file descriptor leak + kwargs.setdefault('bufsize', 1) # line-buffer output + kwargs.setdefault('universal_newlines', True) # text output + args = ['http1-relay', + '-l', f"{listener_address[0]}:{listener_address[1]}", + '-s', f"{server_address[0]}:{server_address[1]}", + # '-v' + ] + super(Http1TestRelay, self).__init__(args, name=name, + expect=Process.EXIT_OK, **kwargs) + + def read_log(self): + log = [] + with open(self.outfile_path, 'r') as out: + line = out.readline() + while line: + log.append(line) + line = out.readline() + return log + + def shutdown(self): + if self.poll() is None: + self.terminate() + self.wait(TIMEOUT) + + +class Http1DecoderTest(TestCase): + """ + Simulate various client and server flows across the HTTP/1.x test proxy. + Verify expected behavior of the decoder + """ + @classmethod + def setUpClass(cls): + super(Http1DecoderTest, cls).setUpClass() + + def _spawn_relay(self, name, listener_address, server_address): + return Http1TestRelay(listener_address=listener_address, server_address=server_address, name=name) + + def _run_io(self, proxy, client_stream, server_stream): + """ + Pass client_stream and server_stream through the proxy. + Validate that the inputs are passed through completely. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener: + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.settimeout(TIMEOUT) + listener.bind(proxy.server_address) + listener.listen(10) + + sleep(0.1) # paranoia: ensure listener is active + + # attach a client to the proxy + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + client.settimeout(TIMEOUT) + retry_exception(lambda cs=client: + cs.connect(proxy.listener_address), + delay=0.25, + exception=(ConnectionRefusedError, + ConnectionAbortedError)) + + # wait for the server connection from the proxy + + server, _ = listener.accept() + try: + client.sendall(client_stream) + client.shutdown(socket.SHUT_WR) + server_output = _drain_socket(server) + server.sendall(server_stream) + server.shutdown(socket.SHUT_WR) + client_output = _drain_socket(client) + return (client_output, server_output) + + finally: + server.close() + + def _test_runner(self, name, client_stream, server_stream): + """ + Simple test runner that passes client and server stream through the + HTTP/1.x decoder proxy. Returns the log output generated by the proxy. + """ + listener_address = ("127.0.0.1", self.get_port()) + server_address = ("127.0.0.1", self.get_port()) + + proxy = self._spawn_relay(name, listener_address, server_address) + try: + cout, sout = self._run_io(proxy, client_stream, server_stream) + finally: + proxy.shutdown() + + return proxy.read_log() + + def _compare_log(self, expected, log): + """ + Compare the output log of the relay against the expected output. + Do some special hacky fu to verify the body length count + """ + try: + index = 0 + for opcode in expected: + if opcode[0] == MATCH: + # opcode = (MATCH, "") + while log[index].split()[0] == 'DEBUG:': + index += 1 + self.assertEqual(opcode[1], log[index]) + index += 1 + elif opcode[0] in [CLIENT_BODY, SERVER_BODY]: + # opcode = (SERVER/CLIENT_BODY, "", " + # log = "[Cx:Rx] SERVER/CLIENT_BODY LENGTH=\n" + stream_id = opcode[1] + body_length = opcode[2] + while True: + token = log[index].split() + index += 1 + if token[0] != 'DEBUG:': + self.assertEqual(stream_id, token[0], "stream id mismatch") + if opcode[0] == CLIENT_BODY: + self.assertEqual(token[1], 'CLIENT-BODY', "expected body identifier") + else: + self.assertEqual(token[1], 'SERVER-BODY', "expected body identifier") + chunk_len = int(token[2].split("=")[1]) + self.assertLessEqual(chunk_len, body_length, "body overflow") + body_length -= chunk_len + if body_length <= 0: + break + self.assertEqual(0, body_length, f"Expected body_length={opcode[2]}") + except IndexError: + self.fail(f"log failed to match (too short)\n{log}") + + def test_01_simple_get(self): + """ + Simple GET request with zero content reply + """ + client_stream = b'GET / HTTP/1.1\r\n\r\n' + server_stream = b'HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n' + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/ VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n') + ] + log = self._test_runner("test_01", client_stream, server_stream) + self._compare_log(expected, log) + + def test_02_get_content_len_pipeline(self): + """ + Pipelined GET requests with content-length replies + """ + client_stream = b'GET /one HTTP/1.1\r\n\r\n' \ + + b'GET /two HTTP/1.1\r\n\r\n' + server_stream = b'HTTP/1.1 200 OK\r\ncontent-length: 51\r\n' \ + + b'\r\n' + b'2' * 51 \ + + b'HTTP/1.1 200 OK\r\ncontent-length: 1\r\n' \ + + b'\r\n' + b'3' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=GET TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=51\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R1]', 51), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=1\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 1), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n') + ] + + log = self._test_runner("test_02", client_stream, server_stream) + self._compare_log(expected, log) + + def test_03_post_content_len_pipeline(self): + """ + Pipelined POST requests with content-length request and replies + """ + client_stream = b'POST /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'CoNteNT-lenGth: 12\r\n' \ + + b'Hi: There\r\n' \ + + b'\r\n' \ + + b'ABCDEFGHIJKL' \ + + b'POST /two HTTP/1.1\r\n' \ + + b'content-length: +4\r\n' \ + + b'\r\n' \ + + b'1234' + server_stream = b'HTTP/1.0 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'content-length: 1\r\n' \ + + b'\r\n' \ + + b'X' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=POST TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=CoNteNT-lenGth VALUE=12\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=Hi VALUE=There\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (CLIENT_BODY, '[C1:R1]', 12), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=POST TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=+4\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (CLIENT_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=0 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=1\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 1), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n') + ] + + log = self._test_runner("test_03", client_stream, server_stream) + self._compare_log(expected, log) + + def test_04_post_content_len_big_msg(self): + """ + Pipelined POST requests with multi-buffer request and reply bodies + """ + client_stream = b'POST /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'CoNteNT-lenGth: 32777\r\n' \ + + b'Hi: There\r\n' \ + + b'\r\n' \ + + b'?' * 32777 \ + + b'POST /two HTTP/1.1\r\n' \ + + b'content-length: +4\r\n' \ + + b'\r\n' \ + + b'1234' + server_stream = b'HTTP/1.0 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 52831\r\n' \ + + b'\r\n' \ + + b'!' * 52831 \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'content-length: 1\r\n' \ + + b'\r\n' \ + + b'X' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=POST TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=CoNteNT-lenGth VALUE=32777\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=Hi VALUE=There\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (CLIENT_BODY, '[C1:R1]', 32777), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=POST TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=+4\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (CLIENT_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=0 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=52831\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R1]', 52831), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=1\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 1), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n'), + ] + + log = self._test_runner("test_04", client_stream, server_stream) + self._compare_log(expected, log) + + def test_05_chunked_get(self): + """ + GET with chunked body response + """ + client_stream = b'GET /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'CoNteNT-lenGth: 0 \r\n' \ + + b'\r\n' \ + + b'POST /two HTTP/1.1\r\n' \ + + b'content-length: +4\r\n' \ + + b'\r\n' \ + + b'1234' + server_stream = b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'TransfeR-enCoding: biz,baz,chunked\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'0A\r\n' \ + + b'1234567890\r\n' \ + + b'1\r\n' \ + + b'1\r\n' \ + + b'00\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'content-length: 1\r\n' \ + + b'\r\n' \ + + b'X' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=CoNteNT-lenGth VALUE=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=POST TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=+4\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (CLIENT_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=TransfeR-enCoding VALUE=biz,baz,chunked\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R1]', 10), + (SERVER_BODY, '[C1:R1]', 1), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=1\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 1), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n') + ] + + log = self._test_runner("test_05", client_stream, server_stream) + self._compare_log(expected, log) + + def test_06_chunked_extensions(self): + """ + GET with chunked body response with chunk extensions + """ + client_stream = b'GET /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'CoNteNT-lenGth: 0 \r\n' \ + + b'\r\n' \ + + b'POST /two HTTP/1.1\r\n' \ + + b'content-length: +4\r\n' \ + + b'\r\n' \ + + b'1234' + server_stream = b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'TransfeR-enCoding: biz ,, baz, chunked,\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'0A;ex1=hi\r\n' \ + + b'1234567890\r\n' \ + + b'1 ; ex2 = bad\r\n' \ + + b'1\r\n' \ + + b'FF ; ex3 = bad\r\n' \ + + b'F' * 0xFF \ + + b'\r\n' \ + + b'00;boo= bar; zim=zam\r\n' \ + + b'trailer=1\r\n' \ + + b't=trailer,two\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'content-length: 1\r\n' \ + + b'\r\n' \ + + b'X' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=CoNteNT-lenGth VALUE=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=POST TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=+4\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (CLIENT_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=TransfeR-enCoding VALUE=biz ,, baz, chunked,\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R1]', 10), + (SERVER_BODY, '[C1:R1]', 1), + (SERVER_BODY, '[C1:R1]', 255), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=1\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 1), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n'), + ] + + log = self._test_runner("test_06", client_stream, server_stream) + self._compare_log(expected, log) + + def test_07_http1_0(self): + """ + HTTP/1.0 implied length + """ + client_stream = b'GET /one HTTP/1.0\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.0 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'CONTENT' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=0 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R1]', 7) + # note: connection drop indicates end of message (no output logged) + ] + + log = self._test_runner("test_07", client_stream, server_stream) + self._compare_log(expected, log) + + def test_08_invalid_server_version(self): + """ + Unknown server version + """ + client_stream = b'GET /one HTTP/1.0\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'\r\n' + server_stream = b'HTTP/banana 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'CONTENT' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + (MATCH, '[C1] PROTOCOL-ERROR REASON=Unsupported HTTP/1.x version\n') + ] + + log = self._test_runner("test_08", client_stream, server_stream) + self._compare_log(expected, log) + + def test_09_invalid_request_version(self): + """ + Unknown request version + """ + client_stream = b'GET /one HTTP/0.9\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 0\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' + + expected = [ + (MATCH, '[C1] PROTOCOL-ERROR REASON=Unsupported HTTP/1.x version\n'), + ] + log = self._test_runner("test_09", client_stream, server_stream) + #self._compare_log(expected, log) + + def test_10_invalid_request_line(self): + """ + Invalid request line (illegal chars) + """ + client_stream = b'GET\x80 /one HTTP/1.0\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.0 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 0\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' + + expected = [ + (MATCH, '[C1] PROTOCOL-ERROR REASON=protocol error: non USASCII data\n'), + ] + log = self._test_runner("test_10", client_stream, server_stream) + self._compare_log(expected, log) + + def test_11_invalid_request_line(self): + """ + Invalid request line (wrong format) + """ + client_stream = b'GET/oneHTTP/1.0\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.0 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 0\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' + + expected = [ + (MATCH, '[C1] PROTOCOL-ERROR REASON=Malformed request line\n'), + ] + log = self._test_runner("test_11", client_stream, server_stream) + self._compare_log(expected, log) + + def test_12_invalid_response_line(self): + """ + Invalid response line (illegal chars) + """ + client_stream = b'GET /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' \ + + b'GET /two HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.1 \x00200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 0\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 0\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=GET TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1] PROTOCOL-ERROR REASON=protocol error: non USASCII data\n') + ] + log = self._test_runner("test_12", client_stream, server_stream) + self._compare_log(expected, log) + + def test_13_HEAD_request(self): + """ + No body allowed + """ + client_stream = b'HEAD /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' \ + + b'GET /two HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 900\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 4\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'1234' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=HEAD TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=GET TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=900\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=4\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n') + ] + log = self._test_runner("test_13", client_stream, server_stream) + self._compare_log(expected, log) + + def test_14_continue_response(self): + """ + Handle continue response + """ + client_stream = b'GET /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' \ + + b'GET /two HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.1 100 Continue\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 4\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 4\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'4321' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 4\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'1234' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=GET TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=100 VMAJOR=1 VMINOR=1 REASON=Continue\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=4\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=4\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R1]', 4), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=4\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n') + ] + log = self._test_runner("test_14", client_stream, server_stream) + self._compare_log(expected, log) + + def test_15_no_content_response(self): + """ + Handle No Content response + """ + client_stream = b'GET /one HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' \ + + b'GET /two HTTP/1.1\r\n' \ + + b'IAmAHeader: Header value\r\n' \ + + b'content-length: 0\r\n' \ + + b'\r\n' + server_stream = b'HTTP/1.1 204 No Content\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 4\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'HTTP/1.1 200 OK\r\n' \ + + b'header: foo\r\n' \ + + b'content-length: 4\r\n' \ + + b'header: bar\r\n' \ + + b'\r\n' \ + + b'1234' + + expected = [ + (MATCH, '[C1:R1] RX-REQ METHOD=GET TARGET=/one VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R1] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R1] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R1] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R2] RX-REQ METHOD=GET TARGET=/two VMAJOR=1 VMINOR=1\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=IAmAHeader VALUE=Header value\n'), + (MATCH, '[C1:R2] CLIENT-HEADER KEY=content-length VALUE=0\n'), + (MATCH, '[C1:R2] CLIENT-HEADER-DONE\n'), + (MATCH, '[C1:R2] CLIENT-MSG-DONE\n'), + + (MATCH, '[C1:R1] RX-RESP STATUS=204 VMAJOR=1 VMINOR=1 REASON=No Content\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=content-length VALUE=4\n'), + (MATCH, '[C1:R1] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R1] SERVER-HEADER-DONE\n'), + (MATCH, '[C1:R1] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R1] TRANSACTION-COMPLETE\n'), + + (MATCH, '[C1:R2] RX-RESP STATUS=200 VMAJOR=1 VMINOR=1 REASON=OK\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=header VALUE=foo\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=content-length VALUE=4\n'), + (MATCH, '[C1:R2] SERVER-HEADER KEY=header VALUE=bar\n'), + (MATCH, '[C1:R2] SERVER-HEADER-DONE\n'), + (SERVER_BODY, '[C1:R2]', 4), + (MATCH, '[C1:R2] SERVER-MSG-DONE\n'), + (MATCH, '[C1:R2] TRANSACTION-COMPLETE\n') + ] + log = self._test_runner("test_15", client_stream, server_stream) + self._compare_log(expected, log) + + +if __name__ == '__main__': + unittest.main(main_module())