Skip to content

Commit

Permalink
HttpServer:WebSocket support: (#1265)
Browse files Browse the repository at this point in the history
- Fixes #1262
- Fixed memory leak(s).
Core: Added "endless" memory stream.
  • Loading branch information
slaff authored Oct 18, 2017
1 parent 6ac7c4a commit 77cc289
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 18 deletions.
62 changes: 62 additions & 0 deletions Sming/SmingCore/DataSourceStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ uint16_t FileStream::readMemoryBlock(char* data, int bufSize)
int len = min(bufSize, size - pos);
int available = fileRead(handle, data, len);
fileSeek(handle, pos, eSO_FileStart); // Don't move cursor now (waiting seek)
if(available < 0) {
available = 0;
}
return available;
}

Expand Down Expand Up @@ -359,3 +362,62 @@ int JsonObjectStream::length()

return rootNode.measureLength();
}

EndlessMemoryStream::~EndlessMemoryStream()
{
delete stream;
stream = NULL;
}

StreamType EndlessMemoryStream::getStreamType()
{
return eSST_Memory;
}

uint16_t EndlessMemoryStream::readMemoryBlock(char* data, int bufSize)
{
if(stream == NULL) {
return 0;
}

return stream->readMemoryBlock(data, bufSize);
}

//Use base class documentation
bool EndlessMemoryStream::seek(int len)
{
if(stream == NULL) {
return false;
}

int res = stream->seek(len);
if(stream->isFinished()) {
delete stream;
stream = NULL;
}

return res;
}

size_t EndlessMemoryStream::write(uint8_t charToWrite)
{
if(stream == NULL) {
stream = new MemoryDataStream();
}

return stream->write(charToWrite);
}

size_t EndlessMemoryStream::write(const uint8_t *buffer, size_t size)
{
if(stream == NULL) {
stream = new MemoryDataStream();
}

return stream->write(buffer, size);
}

bool EndlessMemoryStream::isFinished()
{
return false;
}
38 changes: 35 additions & 3 deletions Sming/SmingCore/DataSourceStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ReadWriteStream : public IDataSourceStream

/** @brief Write chars to stream
* @param buffer Pointer to buffer to write to the stream
* @param size Quantity of chars to writen
* @param size Quantity of chars to write
* @retval size_t Quantity of chars written to stream
*/
virtual size_t write(const uint8_t *buffer, size_t size) = 0;
Expand Down Expand Up @@ -145,7 +145,7 @@ class MemoryDataStream : public Print, public ReadWriteStream

/** @brief Write chars to stream
* @param buffer Pointer to buffer to write to the stream
* @param size Quantity of chars to writen
* @param size Quantity of chars to write
* @retval size_t Quantity of chars written to stream
*/
virtual size_t write(const uint8_t *buffer, size_t size);
Expand Down Expand Up @@ -256,7 +256,7 @@ class TemplateFileStream : public FileStream
/** @brief Set value of a variable in the template file
* @param name Name of variable
* @param value Value to assign to the variable
* @note Sets and existing varible or adds a new variable if variable does not already exist
* @note Sets and existing variable or adds a new variable if variable does not already exist
*/
void setVar(String name, String value);

Expand Down Expand Up @@ -317,5 +317,37 @@ class JsonObjectStream : public MemoryDataStream
bool send;
};

class EndlessMemoryStream: public ReadWriteStream
{
public:
virtual ~EndlessMemoryStream();

//Use base class documentation
virtual StreamType getStreamType();

virtual uint16_t readMemoryBlock(char* data, int bufSize);

//Use base class documentation
virtual bool seek(int len);

/** @brief Write a single char to stream
* @param charToWrite Char to write to the stream
* @retval size_t Quantity of chars written to stream (always 1)
*/
virtual size_t write(uint8_t charToWrite);

/** @brief Write chars to stream
* @param buffer Pointer to buffer to write to the stream
* @param size Quantity of chars to write
* @retval size_t Quantity of chars written to stream
*/
virtual size_t write(const uint8_t *buffer, size_t size);

virtual bool isFinished();

private:
MemoryDataStream* stream = NULL;
};

/** @} */
#endif /* _SMING_CORE_DATASTREAM_H_ */
4 changes: 4 additions & 0 deletions Sming/SmingCore/Network/Http/HttpResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ typedef Delegate<void(HttpRequest&, HttpResponse&)> HttpPathDelegate; // << depr
class HttpResource {
public:
virtual ~HttpResource() {}
/**
* @brief Takes care to cleanup the connection
*/
virtual void shutdown(HttpServerConnection& connection) {}

public:
HttpServerConnectionBodyDelegate onBody = 0; // << called when the resource wants to process the raw body data
Expand Down
3 changes: 3 additions & 0 deletions Sming/SmingCore/Network/Http/HttpServerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ HttpServerConnection::HttpServerConnection(tcp_pcb *clientTcp)

HttpServerConnection::~HttpServerConnection()
{
if(this->resource) {
this->resource->shutdown(*this);
}
}

void HttpServerConnection::setResourceTree(ResourceTree* resourceTree)
Expand Down
25 changes: 14 additions & 11 deletions Sming/SmingCore/Network/Http/Websocket/WebSocketConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ WebSocketConnection::WebSocketConnection(HttpServerConnection* conn)

WebSocketConnection::~WebSocketConnection()
{
websocketList.removeElement(*this);
websocketList.removeElement(this);
}

bool WebSocketConnection::initialize(HttpRequest& request, HttpResponse& response)
Expand All @@ -41,9 +41,11 @@ bool WebSocketConnection::initialize(HttpRequest& request, HttpResponse& respons
response.setHeader("Upgrade", "websocket");
response.setHeader("Sec-WebSocket-Accept", secure);

connection->userData = (void *)this;
delete stream;
stream = new EndlessMemoryStream();
response.sendDataStream(stream);

websocketList.addElement(*this);
connection->userData = (void *)this;

memset(&parserSettings, 0, sizeof(parserSettings));
parserSettings.on_data_begin = staticOnDataBegin;
Expand All @@ -56,6 +58,7 @@ bool WebSocketConnection::initialize(HttpRequest& request, HttpResponse& respons
ws_parser_init(&parser, &parserSettings);
parser.user_data = (void*)this;

websocketList.addElement(this);
if(wsConnect) {
wsConnect(*this);
}
Expand Down Expand Up @@ -152,19 +155,21 @@ int WebSocketConnection::staticOnControlEnd(void* userData)

void WebSocketConnection::send(const char* message, int length, wsFrameType type /* = WS_TEXT_FRAME*/)
{
if(stream == NULL) {
return;
}

uint8_t frameHeader[16] = {0};
size_t headSize = sizeof(frameHeader);
wsMakeFrame(nullptr, length, frameHeader, &headSize, type);
connection->send((const char* )frameHeader, (uint16_t )headSize);
if(length > 0) {
connection->send((const char* )message, (uint16_t )length);
}
stream->write((uint8_t *)frameHeader, (uint16_t )headSize);
stream->write((uint8_t *)message, (uint16_t )length);
}

void WebSocketConnection::broadcast(const char* message, int length, wsFrameType type /* = WS_TEXT_FRAME*/)
{
for (int i = 0; i < websocketList.count(); i++) {
websocketList[i].send(message, length, type);
websocketList[i]->send(message, length, type);
}
}

Expand All @@ -190,9 +195,8 @@ WebSocketsList& WebSocketConnection::getActiveWebSockets()

void WebSocketConnection::close()
{
websocketList.removeElement((const WebSocketConnection)*this);
websocketList.removeElement(this);
state = eWSCS_Closed;

if(wsDisconnect) {
wsDisconnect(*this);
}
Expand All @@ -210,7 +214,6 @@ void* WebSocketConnection::getUserData()
return userData;
}


void WebSocketConnection::setConnectionHandler(WebSocketDelegate handler)
{
wsConnect = handler;
Expand Down
6 changes: 3 additions & 3 deletions Sming/SmingCore/Network/Http/Websocket/WebSocketConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ extern "C" {

class WebSocketConnection;

typedef Vector<WebSocketConnection> WebSocketsList;
typedef Vector<WebSocketConnection*> WebSocketsList;

typedef Delegate<void(WebSocketConnection&)> WebSocketDelegate;
typedef Delegate<void(WebSocketConnection&, const String&)> WebSocketMessageDelegate;
Expand Down Expand Up @@ -89,9 +89,9 @@ class WebSocketConnection
ws_parser_t parser;
ws_parser_callbacks_t parserSettings;

// @deprecated
static WebSocketsList websocketList;
// @end deprecated

EndlessMemoryStream* stream = NULL;
};

#endif /* SMINGCORE_NETWORK_WEBSOCKETCONNECTION_H_ */
8 changes: 7 additions & 1 deletion Sming/SmingCore/Network/Http/Websocket/WebsocketResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ int WebsocketResource::checkHeaders(HttpServerConnection& connection, HttpReques
}

connection.setTimeOut(USHRT_MAX); //Disable disconnection on connection idle (no rx/tx)
connection.userData = (void *)socket;

// TODO: Re-Enable Command Executor...

return 0;
}

void WebsocketResource::shutdown(HttpServerConnection& connection)
{
WebSocketConnection* socket = (WebSocketConnection *)connection.userData;
delete socket;
connection.userData = NULL;
}

int WebsocketResource::processData(HttpServerConnection& connection, HttpRequest& request, char *at, int size)
{
WebSocketConnection *socket = (WebSocketConnection *)connection.userData;
Expand Down
2 changes: 2 additions & 0 deletions Sming/SmingCore/Network/Http/Websocket/WebsocketResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class WebsocketResource: public HttpResource {
int checkHeaders(HttpServerConnection& connection, HttpRequest& request, HttpResponse& response);
int processData(HttpServerConnection& connection, HttpRequest& request, char *at, int size);

virtual void shutdown(HttpServerConnection& connection);

void setConnectionHandler(WebSocketDelegate handler);
void setMessageHandler(WebSocketMessageDelegate handler);
void setBinaryHandler(WebSocketBinaryDelegate handler);
Expand Down

0 comments on commit 77cc289

Please sign in to comment.