Skip to content

Commit

Permalink
refactor(core,ipc): rework conduit frame buffer, support conduit mess…
Browse files Browse the repository at this point in the history
…aging
  • Loading branch information
jwerle committed Nov 23, 2024
1 parent 6bad643 commit ab839c8
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 40 deletions.
100 changes: 66 additions & 34 deletions src/core/modules/conduit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ namespace SSC {
CoreConduit::Client::~Client () {
auto handle = reinterpret_cast<uv_handle_t*>(&this->handle);

if (frameBuffer) {
delete [] frameBuffer;
}

if (
this->isClosing == false &&
this->isClosed == false &&
Expand Down Expand Up @@ -269,52 +265,72 @@ namespace SSC {
int fin = data[0] & 0x80;
int opcode = data[0] & 0x0F;
int mask = data[1] & 0x80;
uint64_t payload_len = data[1] & 0x7F;
uint64_t payloadSize = data[1] & 0x7F;
size_t pos = 2;

if (opcode == 0x08) {
client->close();
return;
}

if (payload_len == 126) {
if (payloadSize == 126) {
if (len < 4) return; // too short to be valid
payload_len = (data[2] << 8) | data[3];
payloadSize = (data[2] << 8) | data[3];
pos = 4;
} else if (payload_len == 127) {
} else if (payloadSize == 127) {
if (len < 10) return; // too short to be valid
payload_len = 0;
payloadSize = 0;
for (int i = 0; i < 8; i++) {
payload_len = (payload_len << 8) | data[2 + i];
payloadSize = (payloadSize << 8) | data[2 + i];
}
pos = 10;
}

if (!mask) return;
if (len < pos + 4 + payload_len) return; // too short to be valid
if (len < pos + 4 + payloadSize) return; // too short to be valid

unsigned char masking_key[4];
memcpy(masking_key, data + pos, 4);
unsigned char maskingKey[4];
memcpy(maskingKey, data + pos, 4);
pos += 4;

if (payload_len > client->frameBufferSize) {
// TODO(@jwerle): refactor to drop usage of `realloc()`
client->frameBuffer = static_cast<unsigned char *>(realloc(client->frameBuffer, payload_len));
client->frameBufferSize = payload_len;
// resize client frame buffer if payload size is too big to fit
if (payloadSize > client->frameBuffer.size()) {
client->frameBuffer.resize(payloadSize);
}

unsigned char *payload = client->frameBuffer;

for (uint64_t i = 0; i < payload_len; i++) {
payload[i] = data[pos + i] ^ masking_key[i % 4];
for (uint64_t i = 0; i < payloadSize; ++i) {
client->frameBuffer[i] = data[pos + i] ^ maskingKey[i % 4];
}

pos += payload_len;
pos += payloadSize;

Vector<uint8_t> vec(payload, payload + payload_len);
auto decoded = this->decodeMessage(vec);
auto decoded = this->decodeMessage(client->frameBuffer.slice<uint8_t>(0, payloadSize));

if (!decoded.has("route")) {
if (decoded.has("to")) {
try {
const auto from = client->id;
const auto to = std::stoull(decoded.get("to"));
if (to != from) {
const auto app = App::sharedApplication();
const auto options = decoded.options;
const auto size = decoded.payload.size();
const auto payload = std::make_shared<char[]>(size);
memcpy(payload.get(), decoded.payload.data(), size);
app->dispatch([this, options, size, payload, from, to] () {
Lock lock(this->mutex);
auto recipient = this->clients[to];
auto client = this->clients[from];
if (client != nullptr && recipient != nullptr) {
recipient->emit(options, payload, size);
}
});
}
} catch (...) {
debug("Invalid 'to' parameter in encoded message");
}
}

// TODO(@jwerle,@heapwolf): handle this
return;
}
Expand Down Expand Up @@ -347,7 +363,7 @@ namespace SSC {
: nullptr;

if (window != nullptr) {
app->dispatch([app, window, uri, client, bytes, size]() {
app->dispatch([window, uri, bytes, size]() {
const auto invoked = window->bridge.router.invoke(
uri,
bytes,
Expand All @@ -360,7 +376,8 @@ namespace SSC {
}
});
} else {
app->dispatch([app, window, uri, client, bytes, size]() {
window = app->windowManager.getWindow(0);
app->dispatch([window, uri, client, bytes, size]() {
const auto invoked = window->bridge.router.invoke(
uri,
bytes,
Expand Down Expand Up @@ -487,6 +504,13 @@ namespace SSC {

this->isClosing = true;

do {
Lock lock(this->conduit->mutex);
if (this->conduit->clients.contains(this->id)) {
this->conduit->clients.erase(this->id);
}
} while (0);

if (handle->loop == nullptr || uv_is_closing(handle)) {
this->isClosed = true;
this->isClosing = false;
Expand All @@ -512,13 +536,6 @@ namespace SSC {
return;
}

do {
Lock lock(this->conduit->mutex);
if (this->conduit->clients.contains(this->id)) {
conduit->clients.erase(this->id);
}
} while (0);

auto shutdown = new uv_shutdown_t;
uv_handle_set_data(
reinterpret_cast<uv_handle_t*>(shutdown),
Expand Down Expand Up @@ -651,8 +668,23 @@ namespace SSC {
auto data = uv_handle_get_data(reinterpret_cast<uv_handle_t*>(stream));
auto client = static_cast<CoreConduit::Client*>(data);

if (client && !client->isClosing && nread > 0) {
if (client && !client->isClosing && !client->isClosed && nread > 0) {
if (client->isHandshakeDone) {
do {
Lock lock(client->conduit->mutex);
if (!client->conduit->clients.contains(client->id)) {
if (buf->base) {
delete [] buf->base;
}
client->close([client]() {
if (client->isClosed) {
delete client;
}
});
return;
}
} while (0);

client->conduit->processFrame(client, buf->base, nread);
} else {
client->conduit->handshake(client, buf->base);
Expand Down
51 changes: 48 additions & 3 deletions src/core/modules/conduit.hh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,53 @@ namespace SSC {
}
};

struct FrameBuffer {
Vector<unsigned char> vector;

inline const size_t size () const {
return this->vector.size();
}

inline const unsigned char* data () const {
return this->vector.data();
}

inline void resize (const size_t size) {
this->vector.resize(size);
}

unsigned char operator [] (const unsigned int index) const {
if (index >= this->size()) {
return 0;
}

return this->vector.at(index);
}

unsigned char& operator [] (const unsigned int index) {
if (index >= this->size()) {
this->vector.resize(index + 1);
}

return this->vector.at(index);
}

const Vector<unsigned int> slice (
Vector<unsigned int>::const_iterator& begin,
Vector<unsigned int>::const_iterator& end
) {
return std::move(Vector<unsigned int>(begin, end));
}

template<typename T = unsigned int>
const Vector<T> slice (
Vector<unsigned int>::size_type start,
Vector<unsigned int>::size_type end
) {
return Vector<T>(this->vector.begin() + start, this->vector.begin() + end);
}
};

class Client {
public:
using CloseCallback = Function<void()>;
Expand All @@ -71,9 +118,7 @@ namespace SSC {
uv_stream_t* stream = nullptr;

// websocket frame buffer state
unsigned char *frameBuffer = nullptr;
size_t frameBufferSize = 0;

FrameBuffer frameBuffer;
CoreConduit* conduit = nullptr;

Client (CoreConduit* conduit)
Expand Down
29 changes: 28 additions & 1 deletion src/ipc/result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace SSC::IPC {
this->seq = seq;
this->message = message;
this->source = message.name;
this->token = message.get("ipc-token", "");
this->post.workerId = this->message.get("runtime-worker-id");
}

Expand Down Expand Up @@ -34,19 +35,22 @@ namespace SSC::IPC {
}
}

Result::Result (const JSON::Any value) {
Result::Result (const JSON::Any value, const String& token) {
this->id = rand64();
this->value = value;
this->token = token;
}

Result::Result (const Err error): Result(error.message.seq, error.message) {
this->err = error.value;
this->token = error.message.get("ipc-token", "");
this->source = error.message.name;
}

Result::Result (const Data data): Result(data.message.seq, data.message) {
this->data = data.value;
this->post = data.post;
this->token = data.message.get("ipc-token", "");
this->source = data.message.name;
this->headers = Headers(data.post.headers);
}
Expand All @@ -58,6 +62,9 @@ namespace SSC::IPC {

if (object.has("data") || object.has("err")) {
object["source"] = this->source;
object["token"] = this->token.size() > 0
? JSON::Any(JSON::String(this->token))
: JSON::Any(JSON::Null());
object["id"] = std::to_string(this->id);
}

Expand All @@ -69,6 +76,10 @@ namespace SSC::IPC {

auto entries = JSON::Object::Entries {
{"source", this->source},
{"token", this->token.size() > 0
? JSON::Any(this->token)
: JSON::Any(JSON::Null())
},
{"id", std::to_string(this->id)}
};

Expand All @@ -78,13 +89,29 @@ namespace SSC::IPC {
if (this->err.as<JSON::Object>().has("id")) {
entries["id"] = this->err.as<JSON::Object>().get("id");
}

if (this->err.as<JSON::Object>().has("token")) {
entries["token"] = this->err.as<JSON::Object>().get("token");
}

if (this->err.as<JSON::Object>().has("source")) {
entries["source"] = this->err.as<JSON::Object>().get("source");
}
}
} else if (!this->data.isNull()) {
entries["data"] = this->data;
if (this->data.isObject()) {
if (this->data.as<JSON::Object>().has("id")) {
entries["id"] = this->data.as<JSON::Object>().get("id");
}

if (this->data.as<JSON::Object>().has("token")) {
entries["token"] = this->data.as<JSON::Object>().get("token");
}

if (this->data.as<JSON::Object>().has("source")) {
entries["source"] = this->data.as<JSON::Object>().get("source");
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/ipc/result.hh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace SSC::IPC {
Message::Seq seq = "-1";
uint64_t id = 0;
String source = "";
String token = "";
JSON::Any value = nullptr;
JSON::Any data = nullptr;
JSON::Any err = nullptr;
Expand All @@ -42,7 +43,7 @@ namespace SSC::IPC {

Result () = default;
Result (const Result&) = default;
Result (const JSON::Any);
Result (const JSON::Any, const String& token = "");
Result (const Err error);
Result (const Data data);
Result (const Message::Seq&, const Message&);
Expand Down
2 changes: 1 addition & 1 deletion src/ipc/routes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3559,8 +3559,8 @@ static void mapIPCRoutes (Router *router) {
});
}

targetWindow->eval(getEmitToRenderProcessJavaScript(event, value));
app->dispatch([=]() {
targetWindow->eval(getEmitToRenderProcessJavaScript(event, value));
reply(Result { message.seq, message });
});
});
Expand Down

0 comments on commit ab839c8

Please sign in to comment.