Skip to content

Commit

Permalink
project: Integrate lib-datalane
Browse files Browse the repository at this point in the history
This is a rather huge change that only affects how things are run internally. From the outside, everything should still run as it did before, while from the inside things now run drastically different. The new internals of lib-datalane are designed around the kernel API instead of being designed around the target use, which results in a much smoother experience, less lost data and higher control.

Internally the new API allows us to remove the use of Signals, allowing us to instead properly wait using the Kernel itself. This results in quicker and much more stabler messages. We also now send the message size with the message itself, no longer rely on the message pipe type (everything is bytes) and largely have switched to callbacks instead of one huge loop. While things still execute on the same thread anyway, this results in cleaner overall code.

ipc::server is now only using a single thread to keep track of connected and disconnected clients, which results in less overhead, but makes tracking new and old clients a little bit more difficult to do.

This breaks:
- Client Id (not implemented yet)
  • Loading branch information
Xaymar committed Jul 13, 2018
1 parent 95b37ce commit 43ecb14
Show file tree
Hide file tree
Showing 9 changed files with 619 additions and 493 deletions.
18 changes: 1 addition & 17 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ SET(lib-streamlabs-ipc_SOURCES
"${PROJECT_SOURCE_DIR}/include/ipc-server-instance.hpp"
"${PROJECT_SOURCE_DIR}/source/ipc-value.cpp"
"${PROJECT_SOURCE_DIR}/include/ipc-value.hpp"
"${PROJECT_SOURCE_DIR}/include/os-error.hpp"
"${PROJECT_SOURCE_DIR}/source/os-namedsocket.cpp"
"${PROJECT_SOURCE_DIR}/include/os-namedsocket.hpp"
"${PROJECT_SOURCE_DIR}/source/os-signal.cpp"
"${PROJECT_SOURCE_DIR}/include/os-signal.hpp"
"${PROJECT_SOURCE_DIR}/source/os-signal-win.cpp"
"${PROJECT_SOURCE_DIR}/include/os-signal-win.hpp"
"${PROJECT_SOURCE_DIR}/include/util.h"
)
SET(Protobuf_IMPORT_DIRS
Expand Down Expand Up @@ -95,34 +88,27 @@ IF(WIN32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)

LIST(APPEND lib-streamlabs-ipc_SOURCES
"${PROJECT_SOURCE_DIR}/include/os-namedsocket-win.hpp"
"${PROJECT_SOURCE_DIR}/source/os-namedsocket-win.cpp"
)
LIST(APPEND lib-streamlabs-ipc_DEPS
advapi32
ole32
)
ELSEIF(APPLE)
# MacOSX

LIST(APPEND lib-streamlabs-ipc_SOURCES
"${PROJECT_SOURCE_DIR}/source/os-namedsocket-mac.cpp"
)
LIST(APPEND lib-streamlabs-ipc_DEPS
)
ELSEIF("${CMAKE_SYSTEM_NAME}" MATCHES "Linux")
# Linux

LIST(APPEND lib-streamlabs-ipc_SOURCES
"${PROJECT_SOURCE_DIR}/source/os-namedsocket-linux.cpp"
)
LIST(APPEND lib-streamlabs-ipc_DEPS
)
ELSEIF("${CMAKE_SYSTEM_NAME}" MATCHES "FreeBSD")
# FreeBSD

LIST(APPEND lib-streamlabs-ipc_SOURCES
"${PROJECT_SOURCE_DIR}/source/os-namedsocket-freebsd.cpp"
)
LIST(APPEND lib-streamlabs-ipc_DEPS
)
Expand All @@ -139,6 +125,7 @@ INCLUDE_DIRECTORIES(
${PROJECT_SOURCE_DIR}/source
${PROJECT_SOURCE_DIR}/include
${PROJECT_SOURCE_DIR}/third-party/boost/
${PROJECT_SOURCE_DIR}/third-party/lib-datalane/
${Protobuf_INCLUDE_DIRS}
${PROJECT_BINARY_DIR}
)
Expand All @@ -163,9 +150,6 @@ TARGET_LINK_LIBRARIES(${PROJECT_NAME}
################################################################################
IF(BUILD_TESTS)
ADD_SUBDIRECTORY(tests/shared)
ADD_SUBDIRECTORY(tests/os-namedsocket/raw-data)
ADD_SUBDIRECTORY(tests/os-namedsocket/read-no-sleep)
ADD_SUBDIRECTORY(tests/os-namedsocket/single-in-single-out)
ADD_SUBDIRECTORY(tests/ipc/simple-multi-client)
ADD_SUBDIRECTORY(tests/ipc/synchronous-call)
ENDIF(BUILD_TESTS)
Expand Down
34 changes: 19 additions & 15 deletions include/ipc-client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,37 @@

#pragma once
#include "ipc.hpp"
#include "os-namedsocket.hpp"
#include "os-signal.hpp"
#include <string>
#include <vector>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include "source/os/windows/named-pipe.hpp"

namespace ipc {
typedef void(*call_return_t)(const void* data, const std::vector<ipc::value>& rval);

class client {
std::unique_ptr<os::windows::named_pipe> m_socket;
std::shared_ptr<os::async_op> m_wop, m_rop;
std::vector<char> m_wbuf, m_rbuf;

bool m_authenticated = false;
std::mutex m_lock;
std::map<int64_t, std::pair<call_return_t, void*>> m_cb;

// Threading
struct {
std::thread worker;
bool stop = false;
} m_watcher;

void worker();
void read_callback_init(os::error ec, size_t size);
void read_callback_msg(os::error ec, size_t size);

public:
client(std::string socketPath);
virtual ~client();
Expand All @@ -44,18 +61,5 @@ namespace ipc {
// Temporary helper
std::vector<ipc::value> call_synchronous_helper(std::string cname, std::string fname, std::vector<ipc::value> args,
std::chrono::nanoseconds timeout = std::chrono::milliseconds(5000000000));

private:
std::unique_ptr<os::named_socket> m_socket;
bool m_authenticated = false;
std::map<int64_t, std::pair<call_return_t, void*>> m_cb;

private: // Threading
bool m_stopWorkers = false;
std::thread m_worker;
std::mutex m_lock;
static void worker_thread(client* ptr);
std::shared_ptr<os::signal> m_readSignal;
std::shared_ptr<os::signal> m_writeSignal;
};
}
20 changes: 10 additions & 10 deletions include/ipc-server-instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

#pragma once
#include "ipc-server.hpp"
#include "os-namedsocket.hpp"
#include "os-signal.hpp"
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include "source/os/windows/named-pipe.hpp"

namespace ipc {
class server;
Expand All @@ -34,7 +33,7 @@ namespace ipc {

public:
server_instance();
server_instance(server* owner, std::shared_ptr<os::named_socket_connection> conn);
server_instance(server* owner, std::shared_ptr<os::windows::named_pipe> conn);
~server_instance();

bool is_alive();
Expand All @@ -44,18 +43,19 @@ namespace ipc {
std::thread m_worker;

void worker();
static void worker_main(server_instance* ptr) {
ptr->worker();
};
void read_callback_init(os::error ec, size_t size);
void read_callback_msg(os::error ec, size_t size);
void write_callback(os::error ec, size_t size);

protected:
std::shared_ptr<os::named_socket_connection> m_socket;
std::shared_ptr<os::windows::named_pipe> m_socket;
std::shared_ptr<os::async_op> m_wop, m_rop;
std::vector<char> m_wbuf, m_rbuf;
std::queue<std::vector<char>> m_write_queue;

private:
server* m_parent = nullptr;
os::ClientId_t m_clientId;
int64_t m_clientId;
bool m_isAuthenticated = false;
std::shared_ptr<os::signal> m_readSignal;
std::shared_ptr<os::signal> m_writeSignal;
};
}
63 changes: 36 additions & 27 deletions include/ipc-server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,52 @@
#include "ipc.hpp"
#include "ipc-class.hpp"
#include "ipc-server-instance.hpp"
#include "os-namedsocket.hpp"
#include <list>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include "source/os/windows/named-pipe.hpp"

namespace ipc {
class server_instance;

typedef bool(*server_connect_handler_t)(void*, os::ClientId_t);
typedef void(*server_disconnect_handler_t)(void*, os::ClientId_t);
typedef void(*server_message_handler_t)(void*, os::ClientId_t, const std::vector<char>&);
typedef bool(*server_connect_handler_t)(void*, int64_t);
typedef void(*server_disconnect_handler_t)(void*, int64_t);
typedef void(*server_message_handler_t)(void*, int64_t, const std::vector<char>&);

class server {
friend class server_instance;
bool m_isInitialized = false;

// Functions
std::map<std::string, std::shared_ptr<ipc::collection>> m_classes;

// Socket
size_t backlog = 4;
std::mutex m_sockets_mtx;
std::list<std::shared_ptr<os::windows::named_pipe>> m_sockets;
std::string m_socketPath = "";

// Client management.
std::mutex m_clients_mtx;
std::map<std::shared_ptr<os::windows::named_pipe>, std::shared_ptr<server_instance>> m_clients;

// Event Handlers
std::pair<server_connect_handler_t, void*> m_handlerConnect;
std::pair<server_disconnect_handler_t, void*> m_handlerDisconnect;
std::pair<server_message_handler_t, void*> m_handlerMessage;

// Worker
struct {
std::thread worker;
bool stop = false;
} m_watcher;

void watcher();

void spawn_client(std::shared_ptr<os::windows::named_pipe> socket);
void kill_client(std::shared_ptr<os::windows::named_pipe> socket);

public:
server();
Expand All @@ -55,29 +84,9 @@ namespace ipc {
bool register_collection(std::shared_ptr<ipc::collection> cls);

protected: // Client -> Server
bool client_call_function(os::ClientId_t cid, std::string cname, std::string fname,
bool client_call_function(int64_t cid, std::string cname, std::string fname,
std::vector<ipc::value>& args, std::vector<ipc::value>& rval, std::string& errormsg);

private: // Threading
std::thread m_worker;
bool m_stopWorker = false;

static void worker_main(server* ptr);
void worker_local();

private:
std::unique_ptr<os::named_socket> m_socket;
bool m_isInitialized = false;
std::string m_socketPath = "";

// Client management.
std::mutex m_clientLock;
std::map<os::ClientId_t, std::shared_ptr<server_instance>> m_clients;
std::map<std::string, std::shared_ptr<ipc::collection>> m_classes;

// Event Handlers
std::pair<server_connect_handler_t, void*> m_handlerConnect;
std::pair<server_disconnect_handler_t, void*> m_handlerDisconnect;
std::pair<server_message_handler_t, void*> m_handlerMessage;
friend class server_instance;
};
}
12 changes: 12 additions & 0 deletions include/ipc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@
#include <vector>

namespace ipc {
typedef uint32_t ipc_size_t;

inline void make_sendable(std::vector<char>& out, std::vector<char> const& in) {
out.resize(in.size() + sizeof(ipc_size_t));
memcpy(out.data() + sizeof(ipc_size_t), in.data(), in.size());
reinterpret_cast<ipc_size_t&>(out[0]) = ipc_size_t(in.size());
}

inline ipc_size_t read_size(std::vector<char> const& in) {
return reinterpret_cast<const ipc_size_t&>(in[0]);
}

class base {
public:
static std::string make_unique_id(std::string name, std::vector<type> parameters);
Expand Down
3 changes: 1 addition & 2 deletions proto/ipc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ message Authenticate {
}

message AuthenticateReply {
string read_event = 1;
string write_event = 2;
string password = 1;
}

Loading

0 comments on commit 43ecb14

Please sign in to comment.