From c0131fe452f3442a16d5aafe80fbfda377f8f41c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 26 Jun 2017 22:49:35 +0200 Subject: [PATCH] Assemble frames in OutgoingWebSocketMessage in a single buffer. Fixes #1791. --- http/vibe/http/websockets.d | 110 +++++++++++++++++++++--------------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/http/vibe/http/websockets.d b/http/vibe/http/websockets.d index 0e69fa16d8..6e81ade792 100644 --- a/http/vibe/http/websockets.d +++ b/http/vibe/http/websockets.d @@ -648,7 +648,6 @@ final class WebSocket { */ final class OutgoingWebSocketMessage : OutputStream { @safe: - private { RandomNumberStream m_rng; Stream m_conn; @@ -657,7 +656,7 @@ final class OutgoingWebSocketMessage : OutputStream { bool m_finalized = false; } - this( Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng ) + this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng) { assert(conn !is null); m_conn = conn; @@ -668,6 +667,12 @@ final class OutgoingWebSocketMessage : OutputStream { size_t write(in ubyte[] bytes, IOMode mode) { assert(!m_finalized); + + if (!m_buffer.data.length) { + ubyte[Frame.maxHeaderSize] header_padding; + m_buffer.put(header_padding[]); + } + m_buffer.put(bytes); return bytes.length; } @@ -675,27 +680,34 @@ final class OutgoingWebSocketMessage : OutputStream { void flush() { assert(!m_finalized); - Frame frame; - frame.opcode = m_frameOpcode; - frame.fin = false; - frame.payload = m_buffer.data; - frame.writeFrame(m_conn, m_rng); - m_buffer.clear(); - m_conn.flush(); + if (m_buffer.data.length > 0) + sendFrame(false); } void finalize() { if (m_finalized) return; m_finalized = true; + sendFrame(true); + } + + private void sendFrame(bool fin) + { + if (!m_buffer.data.length) + write(null, IOMode.once); + + assert(m_buffer.data.length >= Frame.maxHeaderSize); Frame frame; - frame.fin = true; + frame.fin = fin; frame.opcode = m_frameOpcode; - frame.payload = m_buffer.data; - frame.writeFrame(m_conn, m_rng); - m_buffer.clear(); + frame.payload = m_buffer.data[Frame.maxHeaderSize .. $]; + auto hsize = frame.getHeaderSize(m_rng !is null); + auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $]; + frame.writeHeader(msg[0 .. hsize], m_rng); + m_conn.write(msg); m_conn.flush(); + m_buffer.clear(); } alias write = OutputStream.write; @@ -707,7 +719,6 @@ final class OutgoingWebSocketMessage : OutputStream { */ final class IncomingWebSocketMessage : InputStream { @safe: - private { RandomNumberStream m_rng; Stream m_conn; @@ -797,7 +808,7 @@ private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; * Currently only 6 values are defined, however the opcode is defined as * taking 4 bytes. */ -enum FrameOpcode : uint { +private enum FrameOpcode : uint { continuation = 0x0, text = 0x1, binary = 0x2, @@ -807,51 +818,62 @@ enum FrameOpcode : uint { } -struct Frame { +private struct Frame { @safe: + enum maxHeaderSize = 14; bool fin; FrameOpcode opcode; ubyte[] payload; - void writeFrame(OutputStream stream, RandomNumberStream sys_rng) + size_t getHeaderSize(bool mask) { - import vibe.stream.wrapper; - - auto rng = StreamOutputRange(stream); + size_t ret = 1; + if (payload.length < 126) ret += 1; + else if (payload.length <= 65536) ret += 3; + else ret += 9; + if (mask) ret += 4; + return ret; + } + void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) + { ubyte[4] buff; ubyte firstByte = cast(ubyte)opcode; if (fin) firstByte |= 0x80; - rng.put(firstByte); - - auto b1 = 0; - if (sys_rng) { - b1 = 0x80; - } - - if( payload.length < 126 ) { - rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)(b1 | payload.length))); - } else if( payload.length <= 65536 ) { - buff[0] = cast(ubyte) (b1 | 126); - rng.put(buff[0 .. 1]); - rng.put(std.bitmanip.nativeToBigEndian(cast(ushort)payload.length)); + dst[0] = firstByte; + dst = dst[1 .. $]; + + auto b1 = sys_rng ? 0x80 : 0x00; + + if (payload.length < 126) { + dst[0] = cast(ubyte)(b1 | payload.length); + dst = dst[1 .. $]; + } else if (payload.length <= 65536) { + dst[0] = cast(ubyte) (b1 | 126); + dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); + dst = dst[3 .. $]; } else { - buff[0] = cast(ubyte) (b1 | 127); - rng.put(buff[0 .. 1]); - rng.put(std.bitmanip.nativeToBigEndian(payload.length)); + dst[0] = cast(ubyte) (b1 | 127); + dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); + dst = dst[9 .. $]; } if (sys_rng) { - sys_rng.read(buff); - rng.put(buff); - for (size_t i = 0; i < payload.length; i++) { - payload[i] ^= buff[i % 4]; - } - rng.put(payload); - }else { - rng.put(payload); + sys_rng.read(dst[0 .. 4]); + for (size_t i = 0; i < payload.length; i++) + payload[i] ^= dst[i % 4]; } + } + + void writeFrame(OutputStream stream, RandomNumberStream sys_rng) + { + import vibe.stream.wrapper; + + auto rng = StreamOutputRange(stream); + ubyte[maxHeaderSize] hdr; + writeHeader(hdr[], sys_rng); + rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]); rng.flush(); stream.flush(); }