Skip to content

Commit

Permalink
Assemble frames in OutgoingWebSocketMessage in a single buffer. Fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
s-ludwig committed Jul 7, 2017
1 parent 7ec2f44 commit c0131fe
Showing 1 changed file with 66 additions and 44 deletions.
110 changes: 66 additions & 44 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,6 @@ final class WebSocket {
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:

private {
RandomNumberStream m_rng;
Stream m_conn;
Expand All @@ -657,7 +656,7 @@ final class OutgoingWebSocketMessage : OutputStream {
bool m_finalized = false;
}

this( Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng )
this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
Expand All @@ -668,34 +667,47 @@ final class OutgoingWebSocketMessage : OutputStream {
size_t write(in ubyte[] bytes, IOMode mode)
{
assert(!m_finalized);

if (!m_buffer.data.length) {
ubyte[Frame.maxHeaderSize] header_padding;
m_buffer.put(header_padding[]);
}

m_buffer.put(bytes);
return bytes.length;
}

void flush()
{
assert(!m_finalized);
Frame frame;
frame.opcode = m_frameOpcode;
frame.fin = false;
frame.payload = m_buffer.data;
frame.writeFrame(m_conn, m_rng);
m_buffer.clear();
m_conn.flush();
if (m_buffer.data.length > 0)
sendFrame(false);
}

void finalize()
{
if (m_finalized) return;
m_finalized = true;
sendFrame(true);
}

private void sendFrame(bool fin)
{
if (!m_buffer.data.length)
write(null, IOMode.once);

assert(m_buffer.data.length >= Frame.maxHeaderSize);

Frame frame;
frame.fin = true;
frame.fin = fin;
frame.opcode = m_frameOpcode;
frame.payload = m_buffer.data;
frame.writeFrame(m_conn, m_rng);
m_buffer.clear();
frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
auto hsize = frame.getHeaderSize(m_rng !is null);
auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
frame.writeHeader(msg[0 .. hsize], m_rng);
m_conn.write(msg);
m_conn.flush();
m_buffer.clear();
}

alias write = OutputStream.write;
Expand All @@ -707,7 +719,6 @@ final class OutgoingWebSocketMessage : OutputStream {
*/
final class IncomingWebSocketMessage : InputStream {
@safe:

private {
RandomNumberStream m_rng;
Stream m_conn;
Expand Down Expand Up @@ -797,7 +808,7 @@ private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
* Currently only 6 values are defined, however the opcode is defined as
* taking 4 bytes.
*/
enum FrameOpcode : uint {
private enum FrameOpcode : uint {
continuation = 0x0,
text = 0x1,
binary = 0x2,
Expand All @@ -807,51 +818,62 @@ enum FrameOpcode : uint {
}


struct Frame {
private struct Frame {
@safe:
enum maxHeaderSize = 14;

bool fin;
FrameOpcode opcode;
ubyte[] payload;

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
size_t getHeaderSize(bool mask)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
size_t ret = 1;
if (payload.length < 126) ret += 1;
else if (payload.length <= 65536) ret += 3;
else ret += 9;
if (mask) ret += 4;
return ret;
}

void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
{
ubyte[4] buff;
ubyte firstByte = cast(ubyte)opcode;
if (fin) firstByte |= 0x80;
rng.put(firstByte);

auto b1 = 0;
if (sys_rng) {
b1 = 0x80;
}

if( payload.length < 126 ) {
rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)(b1 | payload.length)));
} else if( payload.length <= 65536 ) {
buff[0] = cast(ubyte) (b1 | 126);
rng.put(buff[0 .. 1]);
rng.put(std.bitmanip.nativeToBigEndian(cast(ushort)payload.length));
dst[0] = firstByte;
dst = dst[1 .. $];

auto b1 = sys_rng ? 0x80 : 0x00;

if (payload.length < 126) {
dst[0] = cast(ubyte)(b1 | payload.length);
dst = dst[1 .. $];
} else if (payload.length <= 65536) {
dst[0] = cast(ubyte) (b1 | 126);
dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
dst = dst[3 .. $];
} else {
buff[0] = cast(ubyte) (b1 | 127);
rng.put(buff[0 .. 1]);
rng.put(std.bitmanip.nativeToBigEndian(payload.length));
dst[0] = cast(ubyte) (b1 | 127);
dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
dst = dst[9 .. $];
}

if (sys_rng) {
sys_rng.read(buff);
rng.put(buff);
for (size_t i = 0; i < payload.length; i++) {
payload[i] ^= buff[i % 4];
}
rng.put(payload);
}else {
rng.put(payload);
sys_rng.read(dst[0 .. 4]);
for (size_t i = 0; i < payload.length; i++)
payload[i] ^= dst[i % 4];
}
}

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
ubyte[maxHeaderSize] hdr;
writeHeader(hdr[], sys_rng);
rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]);
rng.flush();
stream.flush();
}
Expand Down

0 comments on commit c0131fe

Please sign in to comment.