Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve web sockets and add autobahn test suite client #1836

Merged
merged 13 commits into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 117 additions & 44 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
Implements WebSocket support and fallbacks for older browsers.

Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
Copyright: © 2012-2014 RejectedSoftware e.K.
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Jan Krüger
Expand Down Expand Up @@ -62,7 +62,7 @@ alias WebSocketHandshakeDelegate = void delegate(scope WebSocket);
/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
@safe:
@safe pure nothrow:

///
this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
Expand Down Expand Up @@ -337,7 +337,9 @@ final class WebSocket {
bool m_pongSkipped;
short m_closeCode;
const(char)[] m_closeReason;
RandomNumberStream m_rng;
/// The entropy generator to use
/// If not null, it means this is a server socket.
RandomNumberStream m_rng;
}

/**
Expand Down Expand Up @@ -450,7 +452,9 @@ final class WebSocket {
*/
void send(scope const(char)[] data)
{
send((scope message){ message.write(cast(const ubyte[])data); });
send(
(scope message) { message.write(cast(const ubyte[])data); },
FrameOpcode.text);
}

/**
Expand All @@ -462,7 +466,7 @@ final class WebSocket {
A `WebSocketException` is thrown if the connection gets closed
before or during the transfer of the message.
*/
void send(ubyte[] data)
void send(in ubyte[] data)
{
send((scope message){ message.write(data); }, FrameOpcode.binary);
}
Expand All @@ -474,7 +478,7 @@ final class WebSocket {
A `WebSocketException` is thrown if the connection gets closed
before or during the transfer of the message.
*/
void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode = FrameOpcode.text)
void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
{
m_writeMutex.performLocked!({
enforceEx!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
Expand All @@ -484,6 +488,13 @@ final class WebSocket {
});
}

/// Compatibility overload - will be removed soon.
deprecated("Call the overload which requires an explicit FrameOpcode.")
void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender)
{
send(sender, FrameOpcode.text);
}

/**
Actively closes the connection.

Expand Down Expand Up @@ -649,7 +660,7 @@ final class WebSocket {
final class OutgoingWebSocketMessage : OutputStream {
@safe:
private {
RandomNumberStream m_rng;
RandomNumberStream m_rng;
Stream m_conn;
FrameOpcode m_frameOpcode;
Appender!(ubyte[]) m_buffer;
Expand All @@ -661,7 +672,7 @@ final class OutgoingWebSocketMessage : OutputStream {
assert(conn !is null);
m_conn = conn;
m_frameOpcode = frameOpcode;
m_rng = rng;
m_rng = rng;
}

size_t write(in ubyte[] bytes, IOMode mode)
Expand Down Expand Up @@ -720,7 +731,7 @@ final class OutgoingWebSocketMessage : OutputStream {
final class IncomingWebSocketMessage : InputStream {
@safe:
private {
RandomNumberStream m_rng;
RandomNumberStream m_rng;
Stream m_conn;
Frame m_currentFrame;
}
Expand All @@ -729,8 +740,8 @@ final class IncomingWebSocketMessage : InputStream {
{
assert(conn !is null);
m_conn = conn;
m_rng = rng;
readFrame();
m_rng = rng;
readFrame();
}

@property bool empty() const { return m_currentFrame.payload.length == 0; }
Expand All @@ -744,6 +755,27 @@ final class IncomingWebSocketMessage : InputStream {

const(ubyte)[] peek() { return m_currentFrame.payload; }

/**
* Retrieve the next websocket frame of the stream and discard the current
* one
*
* This function is helpful if one wish to process frames by frames,
* or minimize memory allocation, as `peek` will only return the current
* frame data, and read requires a pre-allocated buffer.
*
* Returns:
* `false` if the current frame is the final one, `true` if a new frame
* was read.
*/
bool skipFrame()
{
if (m_currentFrame.fin)
return false;

m_currentFrame = Frame.readFrame(m_conn);
return true;
}

size_t read(scope ubyte[] dst, IOMode mode)
{
size_t nread = 0;
Expand All @@ -759,10 +791,10 @@ final class IncomingWebSocketMessage : InputStream {
m_currentFrame.payload = m_currentFrame.payload[sz .. $];
nread += sz;

if (leastSize == 0 && !m_currentFrame.fin) {
if (leastSize == 0) {
if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
break;
m_currentFrame = Frame.readFrame(m_conn);
this.skipFrame();
}
}

Expand Down Expand Up @@ -798,24 +830,26 @@ final class IncomingWebSocketMessage : InputStream {
}
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/**
* The Opcode is 4 bytes, as defined in Section 5.2
* The Opcode is 4 bits, as defined in Section 5.2
*
* Values are defined in section 11.8
* Currently only 6 values are defined, however the opcode is defined as
* taking 4 bytes.
* taking 4 bits.
*/
private enum FrameOpcode : uint {
private enum FrameOpcode : ubyte {
continuation = 0x0,
text = 0x1,
binary = 0x2,
close = 0x8,
ping = 0x9,
pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
Expand All @@ -826,6 +860,32 @@ private struct Frame {
FrameOpcode opcode;
ubyte[] payload;

/**
* Return the header length encoded with the expected amount of bits
*
* The WebSocket RFC define a variable-length payload length.
* In short, it means that:
* - If the length is <= 125, it is stored as the 7 least significant
* bits of the second header byte. The first bit is reserved for MASK.
* - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
* 126 is stored in the aforementioned 7 bits, and the actual length
* is stored in the next two bytes, resulting in a 4 bytes header
* ( + masking key, if any).
* - If the length is > 65_536, a magic value of 127 will be used for
* the 7-bit field, and the next 8 bytes are expected to be the length,
* resulting in a 10 bytes header ( + masking key, if any).
*
* Those functions encapsulate all this logic and allow to just get the
* length with the desired size.
*
* Return:
* - For `ubyte`, the value to store in the 7 bits field, either the
* length or a magic value (126 or 127).
* - For `ushort`, a value in the range [126; 65_536].
* If payload.length is not in this bound, an assertion will be triggered.
* - For `ulong`, a value in the range [65_537; size_t.max].
* If payload.length is not in this bound, an assertion will be triggered.
*/
size_t getHeaderSize(bool mask)
{
size_t ret = 1;
Expand All @@ -838,7 +898,7 @@ private struct Frame {

void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
{
ubyte[4] buff;
ubyte[4] buff;
ubyte firstByte = cast(ubyte)opcode;
if (fin) firstByte |= 0x80;
dst[0] = firstByte;
Expand Down Expand Up @@ -881,38 +941,51 @@ private struct Frame {
static Frame readFrame(InputStream stream)
{
Frame frame;
ubyte[2] data2;
ubyte[8] data8;
stream.read(data2);
//enforceEx!WebSocketException( (data[0] & 0x70) != 0, "reserved bits must be unset" );
frame.fin = (data2[0] & 0x80) == 0x80;
bool masked = (data2[1] & 0x80) == 0x80;
frame.opcode = cast(FrameOpcode)(data2[0] & 0xf);

logDebug("Read frame: %s %s", frame.opcode, frame.fin);
//parsing length
ulong length = data2[1] & 0x7f;
if( length == 126 ) {
stream.read(data2);
length = bigEndianToNative!ushort(data2);
} else if( length == 127 ) {
stream.read(data8);
length = bigEndianToNative!ulong(data8);
}
ubyte[8] data;

//masking key
ubyte[4] maskingKey;
if( masked ) stream.read(maskingKey);
stream.read(data[0 .. 2]);
frame.fin = (data[0] & 0x80) != 0;
frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

//payload
bool masked = !!(data[1] & 0b1000_0000);

//parsing length
ulong length = data[1] & 0b0111_1111;
if (length == 126) {
stream.read(data[0 .. 2]);
length = bigEndianToNative!ushort(data[0 .. 2]);
} else if (length == 127) {
stream.read(data);
length = bigEndianToNative!ulong(data);

// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
// interpreted as a 64-bit unsigned integer (the most significant
// bit MUST be 0)
enforceEx!WebSocketException(!(length >> 63),
"Received length has a non-zero most significant bit");

}
logDebug("Read frame: %s %s %s length=%d",
frame.opcode,
frame.fin ? "final frame" : "continuation",
masked ? "masked" : "not masked",
length);

// Masking key is 32 bits / uint
if (masked)
stream.read(data[0 .. 4]);

// Read payload
// TODO: Provide a way to limit the size read, easy
// DOS for server code here (rejectedsoftware/vibe.d#1496).
enforceEx!WebSocketException(length <= size_t.max);
frame.payload = new ubyte[cast(size_t)length];
frame.payload = new ubyte[](cast(size_t)length);
stream.read(frame.payload);

//de-masking
for( size_t i = 0; i < length; ++i ) {
frame.payload[i] = frame.payload[i] ^ maskingKey[i % 4];
}
if (masked)
foreach (size_t i; 0 .. cast(size_t)length)
frame.payload[i] = frame.payload[i] ^ data[i % 4];

return frame;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
websockets-autobahn-client
.dub
docs.json
__dummy.html
*.o
*.obj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "websockets-autobahn-client",
"dependencies": {
"vibe-d": { "path": "../../" }
},
"versions": [
"VibeDefaultMain"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import vibe.d;

shared static this ()
{
runTask(() => runTestSuite());
}

void runTestSuite ()
{
auto count = getCaseCount();
logInfo("We're going to run %d test cases...", count);

foreach (currCase; 1 .. count)
{
auto url = URL("ws://127.0.0.1:9001/runCase?agent=vibe.d&case="
~ to!string(currCase));
logInfo("Running test case %d/%d", currCase, count);
connectWebSocket(
url, (scope ws) {
while (ws.waitForData) {
ws.receive((scope message) {
ws.send(message.readAll);
});
}
});
}
}


size_t getCaseCount (string base_addr = "ws://127.0.0.1:9001")
{
size_t ret;
auto url = URL(base_addr ~ "/getCaseCount");
connectWebSocket(
url, (scope ws) {
while (ws.waitForData) {
ret = ws.receiveText.to!size_t;
}
});
return ret;
}
6 changes: 4 additions & 2 deletions travis-ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ if [ ${BUILD_EXAMPLE=1} -eq 1 ]; then
fi
if [ ${RUN_TEST=1} -eq 1 ]; then
for ex in `\ls -1 tests/`; do
echo "[INFO] Running test $ex"
(cd tests/$ex && dub --compiler=$DC --override-config=vibe-d:core/$VIBED_DRIVER $DUB_ARGS && dub clean)
if [ -r test/$ex/dub.json ] || [ -r test/$ex/dub.sdl ]; then
echo "[INFO] Running test $ex"
(cd tests/$ex && dub --compiler=$DC --override-config=vibe-d:core/$VIBED_DRIVER $DUB_ARGS && dub clean)
fi
done
fi