diff --git a/http/vibe/http/websockets.d b/http/vibe/http/websockets.d index 5007cb4e79..64632224b4 100644 --- a/http/vibe/http/websockets.d +++ b/http/vibe/http/websockets.d @@ -1,7 +1,7 @@ /** Implements WebSocket support and fallbacks for older browsers. - Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455) + Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455) Copyright: © 2012-2014 RejectedSoftware e.K. License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Jan Krüger @@ -62,7 +62,7 @@ alias WebSocketHandshakeDelegate = void delegate(scope WebSocket); /// Exception thrown by $(D vibe.http.websockets). class WebSocketException: Exception { -@safe: + @safe pure nothrow: /// this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @@ -337,7 +337,9 @@ final class WebSocket { bool m_pongSkipped; short m_closeCode; const(char)[] m_closeReason; - RandomNumberStream m_rng; + /// The entropy generator to use + /// If not null, it means this is a server socket. + RandomNumberStream m_rng; } /** @@ -450,7 +452,9 @@ final class WebSocket { */ void send(scope const(char)[] data) { - send((scope message){ message.write(cast(const ubyte[])data); }); + send( + (scope message) { message.write(cast(const ubyte[])data); }, + FrameOpcode.text); } /** @@ -462,7 +466,7 @@ final class WebSocket { A `WebSocketException` is thrown if the connection gets closed before or during the transfer of the message. */ - void send(ubyte[] data) + void send(in ubyte[] data) { send((scope message){ message.write(data); }, FrameOpcode.binary); } @@ -474,7 +478,7 @@ final class WebSocket { A `WebSocketException` is thrown if the connection gets closed before or during the transfer of the message. */ - void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode = FrameOpcode.text) + void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode) { m_writeMutex.performLocked!({ enforceEx!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed."); @@ -484,6 +488,13 @@ final class WebSocket { }); } + /// Compatibility overload - will be removed soon. + deprecated("Call the overload which requires an explicit FrameOpcode.") + void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender) + { + send(sender, FrameOpcode.text); + } + /** Actively closes the connection. @@ -649,7 +660,7 @@ final class WebSocket { final class OutgoingWebSocketMessage : OutputStream { @safe: private { - RandomNumberStream m_rng; + RandomNumberStream m_rng; Stream m_conn; FrameOpcode m_frameOpcode; Appender!(ubyte[]) m_buffer; @@ -661,7 +672,7 @@ final class OutgoingWebSocketMessage : OutputStream { assert(conn !is null); m_conn = conn; m_frameOpcode = frameOpcode; - m_rng = rng; + m_rng = rng; } size_t write(in ubyte[] bytes, IOMode mode) @@ -720,7 +731,7 @@ final class OutgoingWebSocketMessage : OutputStream { final class IncomingWebSocketMessage : InputStream { @safe: private { - RandomNumberStream m_rng; + RandomNumberStream m_rng; Stream m_conn; Frame m_currentFrame; } @@ -729,8 +740,8 @@ final class IncomingWebSocketMessage : InputStream { { assert(conn !is null); m_conn = conn; - m_rng = rng; - readFrame(); + m_rng = rng; + readFrame(); } @property bool empty() const { return m_currentFrame.payload.length == 0; } @@ -744,6 +755,27 @@ final class IncomingWebSocketMessage : InputStream { const(ubyte)[] peek() { return m_currentFrame.payload; } + /** + * Retrieve the next websocket frame of the stream and discard the current + * one + * + * This function is helpful if one wish to process frames by frames, + * or minimize memory allocation, as `peek` will only return the current + * frame data, and read requires a pre-allocated buffer. + * + * Returns: + * `false` if the current frame is the final one, `true` if a new frame + * was read. + */ + bool skipFrame() + { + if (m_currentFrame.fin) + return false; + + m_currentFrame = Frame.readFrame(m_conn); + return true; + } + size_t read(scope ubyte[] dst, IOMode mode) { size_t nread = 0; @@ -759,10 +791,10 @@ final class IncomingWebSocketMessage : InputStream { m_currentFrame.payload = m_currentFrame.payload[sz .. $]; nread += sz; - if (leastSize == 0 && !m_currentFrame.fin) { + if (leastSize == 0) { if (mode == IOMode.immediate || mode == IOMode.once && nread > 0) break; - m_currentFrame = Frame.readFrame(m_conn); + this.skipFrame(); } } @@ -798,17 +830,18 @@ final class IncomingWebSocketMessage : InputStream { } } +/// Magic string defined by the RFC for challenging the server during upgrade +private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; -private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; /** - * The Opcode is 4 bytes, as defined in Section 5.2 + * The Opcode is 4 bits, as defined in Section 5.2 * * Values are defined in section 11.8 * Currently only 6 values are defined, however the opcode is defined as - * taking 4 bytes. + * taking 4 bits. */ -private enum FrameOpcode : uint { +private enum FrameOpcode : ubyte { continuation = 0x0, text = 0x1, binary = 0x2, @@ -816,6 +849,7 @@ private enum FrameOpcode : uint { ping = 0x9, pong = 0xA } +static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits"); private struct Frame { @@ -826,6 +860,32 @@ private struct Frame { FrameOpcode opcode; ubyte[] payload; + /** + * Return the header length encoded with the expected amount of bits + * + * The WebSocket RFC define a variable-length payload length. + * In short, it means that: + * - If the length is <= 125, it is stored as the 7 least significant + * bits of the second header byte. The first bit is reserved for MASK. + * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of + * 126 is stored in the aforementioned 7 bits, and the actual length + * is stored in the next two bytes, resulting in a 4 bytes header + * ( + masking key, if any). + * - If the length is > 65_536, a magic value of 127 will be used for + * the 7-bit field, and the next 8 bytes are expected to be the length, + * resulting in a 10 bytes header ( + masking key, if any). + * + * Those functions encapsulate all this logic and allow to just get the + * length with the desired size. + * + * Return: + * - For `ubyte`, the value to store in the 7 bits field, either the + * length or a magic value (126 or 127). + * - For `ushort`, a value in the range [126; 65_536]. + * If payload.length is not in this bound, an assertion will be triggered. + * - For `ulong`, a value in the range [65_537; size_t.max]. + * If payload.length is not in this bound, an assertion will be triggered. + */ size_t getHeaderSize(bool mask) { size_t ret = 1; @@ -838,7 +898,7 @@ private struct Frame { void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) { - ubyte[4] buff; + ubyte[4] buff; ubyte firstByte = cast(ubyte)opcode; if (fin) firstByte |= 0x80; dst[0] = firstByte; @@ -881,38 +941,51 @@ private struct Frame { static Frame readFrame(InputStream stream) { Frame frame; - ubyte[2] data2; - ubyte[8] data8; - stream.read(data2); - //enforceEx!WebSocketException( (data[0] & 0x70) != 0, "reserved bits must be unset" ); - frame.fin = (data2[0] & 0x80) == 0x80; - bool masked = (data2[1] & 0x80) == 0x80; - frame.opcode = cast(FrameOpcode)(data2[0] & 0xf); - - logDebug("Read frame: %s %s", frame.opcode, frame.fin); - //parsing length - ulong length = data2[1] & 0x7f; - if( length == 126 ) { - stream.read(data2); - length = bigEndianToNative!ushort(data2); - } else if( length == 127 ) { - stream.read(data8); - length = bigEndianToNative!ulong(data8); - } + ubyte[8] data; - //masking key - ubyte[4] maskingKey; - if( masked ) stream.read(maskingKey); + stream.read(data[0 .. 2]); + frame.fin = (data[0] & 0x80) != 0; + frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); - //payload + bool masked = !!(data[1] & 0b1000_0000); + + //parsing length + ulong length = data[1] & 0b0111_1111; + if (length == 126) { + stream.read(data[0 .. 2]); + length = bigEndianToNative!ushort(data[0 .. 2]); + } else if (length == 127) { + stream.read(data); + length = bigEndianToNative!ulong(data); + + // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes + // interpreted as a 64-bit unsigned integer (the most significant + // bit MUST be 0) + enforceEx!WebSocketException(!(length >> 63), + "Received length has a non-zero most significant bit"); + + } + logDebug("Read frame: %s %s %s length=%d", + frame.opcode, + frame.fin ? "final frame" : "continuation", + masked ? "masked" : "not masked", + length); + + // Masking key is 32 bits / uint + if (masked) + stream.read(data[0 .. 4]); + + // Read payload + // TODO: Provide a way to limit the size read, easy + // DOS for server code here (rejectedsoftware/vibe.d#1496). enforceEx!WebSocketException(length <= size_t.max); - frame.payload = new ubyte[cast(size_t)length]; + frame.payload = new ubyte[](cast(size_t)length); stream.read(frame.payload); //de-masking - for( size_t i = 0; i < length; ++i ) { - frame.payload[i] = frame.payload[i] ^ maskingKey[i % 4]; - } + if (masked) + foreach (size_t i; 0 .. cast(size_t)length) + frame.payload[i] = frame.payload[i] ^ data[i % 4]; return frame; } diff --git a/tests/not-runnable/vibe.http.websocket-autobahn-client/.gitignore b/tests/not-runnable/vibe.http.websocket-autobahn-client/.gitignore new file mode 100644 index 0000000000..103a905478 --- /dev/null +++ b/tests/not-runnable/vibe.http.websocket-autobahn-client/.gitignore @@ -0,0 +1,6 @@ +websockets-autobahn-client +.dub +docs.json +__dummy.html +*.o +*.obj diff --git a/tests/not-runnable/vibe.http.websocket-autobahn-client/dub.json b/tests/not-runnable/vibe.http.websocket-autobahn-client/dub.json new file mode 100644 index 0000000000..b383a157b7 --- /dev/null +++ b/tests/not-runnable/vibe.http.websocket-autobahn-client/dub.json @@ -0,0 +1,9 @@ +{ + "name": "websockets-autobahn-client", + "dependencies": { + "vibe-d": { "path": "../../" } + }, + "versions": [ + "VibeDefaultMain" + ] +} diff --git a/tests/not-runnable/vibe.http.websocket-autobahn-client/source/app.d b/tests/not-runnable/vibe.http.websocket-autobahn-client/source/app.d new file mode 100644 index 0000000000..5966167fc1 --- /dev/null +++ b/tests/not-runnable/vibe.http.websocket-autobahn-client/source/app.d @@ -0,0 +1,41 @@ +import vibe.d; + +shared static this () +{ + runTask(() => runTestSuite()); +} + +void runTestSuite () +{ + auto count = getCaseCount(); + logInfo("We're going to run %d test cases...", count); + + foreach (currCase; 1 .. count) + { + auto url = URL("ws://127.0.0.1:9001/runCase?agent=vibe.d&case=" + ~ to!string(currCase)); + logInfo("Running test case %d/%d", currCase, count); + connectWebSocket( + url, (scope ws) { + while (ws.waitForData) { + ws.receive((scope message) { + ws.send(message.readAll); + }); + } + }); + } +} + + +size_t getCaseCount (string base_addr = "ws://127.0.0.1:9001") +{ + size_t ret; + auto url = URL(base_addr ~ "/getCaseCount"); + connectWebSocket( + url, (scope ws) { + while (ws.waitForData) { + ret = ws.receiveText.to!size_t; + } + }); + return ret; +} diff --git a/travis-ci.sh b/travis-ci.sh index 1ef684907f..04e5f66f66 100755 --- a/travis-ci.sh +++ b/travis-ci.sh @@ -44,7 +44,9 @@ if [ ${BUILD_EXAMPLE=1} -eq 1 ]; then fi if [ ${RUN_TEST=1} -eq 1 ]; then for ex in `\ls -1 tests/`; do - echo "[INFO] Running test $ex" - (cd tests/$ex && dub --compiler=$DC --override-config=vibe-d:core/$VIBED_DRIVER $DUB_ARGS && dub clean) + if [ -r test/$ex/dub.json ] || [ -r test/$ex/dub.sdl ]; then + echo "[INFO] Running test $ex" + (cd tests/$ex && dub --compiler=$DC --override-config=vibe-d:core/$VIBED_DRIVER $DUB_ARGS && dub clean) + fi done fi