diff --git a/CMakeLists.txt b/CMakeLists.txt index ab7b904..7a3a1ff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,13 +55,6 @@ SET(lib-streamlabs-ipc_SOURCES "${PROJECT_SOURCE_DIR}/include/ipc-server-instance.hpp" "${PROJECT_SOURCE_DIR}/source/ipc-value.cpp" "${PROJECT_SOURCE_DIR}/include/ipc-value.hpp" - "${PROJECT_SOURCE_DIR}/include/os-error.hpp" - "${PROJECT_SOURCE_DIR}/source/os-namedsocket.cpp" - "${PROJECT_SOURCE_DIR}/include/os-namedsocket.hpp" - "${PROJECT_SOURCE_DIR}/source/os-signal.cpp" - "${PROJECT_SOURCE_DIR}/include/os-signal.hpp" - "${PROJECT_SOURCE_DIR}/source/os-signal-win.cpp" - "${PROJECT_SOURCE_DIR}/include/os-signal-win.hpp" "${PROJECT_SOURCE_DIR}/include/util.h" ) SET(Protobuf_IMPORT_DIRS @@ -95,18 +88,13 @@ IF(WIN32) add_definitions(-D_CRT_SECURE_NO_WARNINGS) LIST(APPEND lib-streamlabs-ipc_SOURCES - "${PROJECT_SOURCE_DIR}/include/os-namedsocket-win.hpp" - "${PROJECT_SOURCE_DIR}/source/os-namedsocket-win.cpp" ) LIST(APPEND lib-streamlabs-ipc_DEPS - advapi32 - ole32 ) ELSEIF(APPLE) # MacOSX LIST(APPEND lib-streamlabs-ipc_SOURCES - "${PROJECT_SOURCE_DIR}/source/os-namedsocket-mac.cpp" ) LIST(APPEND lib-streamlabs-ipc_DEPS ) @@ -114,7 +102,6 @@ ELSEIF("${CMAKE_SYSTEM_NAME}" MATCHES "Linux") # Linux LIST(APPEND lib-streamlabs-ipc_SOURCES - "${PROJECT_SOURCE_DIR}/source/os-namedsocket-linux.cpp" ) LIST(APPEND lib-streamlabs-ipc_DEPS ) @@ -122,7 +109,6 @@ ELSEIF("${CMAKE_SYSTEM_NAME}" MATCHES "FreeBSD") # FreeBSD LIST(APPEND lib-streamlabs-ipc_SOURCES - "${PROJECT_SOURCE_DIR}/source/os-namedsocket-freebsd.cpp" ) LIST(APPEND lib-streamlabs-ipc_DEPS ) @@ -139,6 +125,7 @@ INCLUDE_DIRECTORIES( ${PROJECT_SOURCE_DIR}/source ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/third-party/boost/ + ${PROJECT_SOURCE_DIR}/third-party/lib-datalane/ ${Protobuf_INCLUDE_DIRS} ${PROJECT_BINARY_DIR} ) @@ -163,9 +150,6 @@ TARGET_LINK_LIBRARIES(${PROJECT_NAME} ################################################################################ IF(BUILD_TESTS) ADD_SUBDIRECTORY(tests/shared) - ADD_SUBDIRECTORY(tests/os-namedsocket/raw-data) - ADD_SUBDIRECTORY(tests/os-namedsocket/read-no-sleep) - ADD_SUBDIRECTORY(tests/os-namedsocket/single-in-single-out) ADD_SUBDIRECTORY(tests/ipc/simple-multi-client) ADD_SUBDIRECTORY(tests/ipc/synchronous-call) ENDIF(BUILD_TESTS) diff --git a/include/ipc-client.hpp b/include/ipc-client.hpp index 55e3fcb..8191a5a 100644 --- a/include/ipc-client.hpp +++ b/include/ipc-client.hpp @@ -17,8 +17,6 @@ #pragma once #include "ipc.hpp" -#include "os-namedsocket.hpp" -#include "os-signal.hpp" #include #include #include @@ -26,11 +24,30 @@ #include #include #include +#include "source/os/windows/named-pipe.hpp" namespace ipc { typedef void(*call_return_t)(const void* data, const std::vector& rval); class client { + std::unique_ptr m_socket; + std::shared_ptr m_wop, m_rop; + std::vector m_wbuf, m_rbuf; + + bool m_authenticated = false; + std::mutex m_lock; + std::map> m_cb; + + // Threading + struct { + std::thread worker; + bool stop = false; + } m_watcher; + + void worker(); + void read_callback_init(os::error ec, size_t size); + void read_callback_msg(os::error ec, size_t size); + public: client(std::string socketPath); virtual ~client(); @@ -44,18 +61,5 @@ namespace ipc { // Temporary helper std::vector call_synchronous_helper(std::string cname, std::string fname, std::vector args, std::chrono::nanoseconds timeout = std::chrono::milliseconds(5000000000)); - - private: - std::unique_ptr m_socket; - bool m_authenticated = false; - std::map> m_cb; - - private: // Threading - bool m_stopWorkers = false; - std::thread m_worker; - std::mutex m_lock; - static void worker_thread(client* ptr); - std::shared_ptr m_readSignal; - std::shared_ptr m_writeSignal; }; } \ No newline at end of file diff --git a/include/ipc-server-instance.hpp b/include/ipc-server-instance.hpp index 2d16925..ca95bc5 100644 --- a/include/ipc-server-instance.hpp +++ b/include/ipc-server-instance.hpp @@ -17,14 +17,13 @@ #pragma once #include "ipc-server.hpp" -#include "os-namedsocket.hpp" -#include "os-signal.hpp" #include #include #include #include #include #include +#include "source/os/windows/named-pipe.hpp" namespace ipc { class server; @@ -34,7 +33,7 @@ namespace ipc { public: server_instance(); - server_instance(server* owner, std::shared_ptr conn); + server_instance(server* owner, std::shared_ptr conn); ~server_instance(); bool is_alive(); @@ -44,18 +43,19 @@ namespace ipc { std::thread m_worker; void worker(); - static void worker_main(server_instance* ptr) { - ptr->worker(); - }; + void read_callback_init(os::error ec, size_t size); + void read_callback_msg(os::error ec, size_t size); + void write_callback(os::error ec, size_t size); protected: - std::shared_ptr m_socket; + std::shared_ptr m_socket; + std::shared_ptr m_wop, m_rop; + std::vector m_wbuf, m_rbuf; + std::queue> m_write_queue; private: server* m_parent = nullptr; - os::ClientId_t m_clientId; + int64_t m_clientId; bool m_isAuthenticated = false; - std::shared_ptr m_readSignal; - std::shared_ptr m_writeSignal; }; } \ No newline at end of file diff --git a/include/ipc-server.hpp b/include/ipc-server.hpp index f5d5ea3..dd63ace 100644 --- a/include/ipc-server.hpp +++ b/include/ipc-server.hpp @@ -19,23 +19,52 @@ #include "ipc.hpp" #include "ipc-class.hpp" #include "ipc-server-instance.hpp" -#include "os-namedsocket.hpp" #include #include #include #include #include #include +#include "source/os/windows/named-pipe.hpp" namespace ipc { class server_instance; - typedef bool(*server_connect_handler_t)(void*, os::ClientId_t); - typedef void(*server_disconnect_handler_t)(void*, os::ClientId_t); - typedef void(*server_message_handler_t)(void*, os::ClientId_t, const std::vector&); + typedef bool(*server_connect_handler_t)(void*, int64_t); + typedef void(*server_disconnect_handler_t)(void*, int64_t); + typedef void(*server_message_handler_t)(void*, int64_t, const std::vector&); class server { - friend class server_instance; + bool m_isInitialized = false; + + // Functions + std::map> m_classes; + + // Socket + size_t backlog = 4; + std::mutex m_sockets_mtx; + std::list> m_sockets; + std::string m_socketPath = ""; + + // Client management. + std::mutex m_clients_mtx; + std::map, std::shared_ptr> m_clients; + + // Event Handlers + std::pair m_handlerConnect; + std::pair m_handlerDisconnect; + std::pair m_handlerMessage; + + // Worker + struct { + std::thread worker; + bool stop = false; + } m_watcher; + + void watcher(); + + void spawn_client(std::shared_ptr socket); + void kill_client(std::shared_ptr socket); public: server(); @@ -55,29 +84,9 @@ namespace ipc { bool register_collection(std::shared_ptr cls); protected: // Client -> Server - bool client_call_function(os::ClientId_t cid, std::string cname, std::string fname, + bool client_call_function(int64_t cid, std::string cname, std::string fname, std::vector& args, std::vector& rval, std::string& errormsg); - private: // Threading - std::thread m_worker; - bool m_stopWorker = false; - - static void worker_main(server* ptr); - void worker_local(); - - private: - std::unique_ptr m_socket; - bool m_isInitialized = false; - std::string m_socketPath = ""; - - // Client management. - std::mutex m_clientLock; - std::map> m_clients; - std::map> m_classes; - - // Event Handlers - std::pair m_handlerConnect; - std::pair m_handlerDisconnect; - std::pair m_handlerMessage; + friend class server_instance; }; } diff --git a/include/ipc.hpp b/include/ipc.hpp index 76903e5..9ac06c3 100644 --- a/include/ipc.hpp +++ b/include/ipc.hpp @@ -21,6 +21,18 @@ #include namespace ipc { + typedef uint32_t ipc_size_t; + + inline void make_sendable(std::vector& out, std::vector const& in) { + out.resize(in.size() + sizeof(ipc_size_t)); + memcpy(out.data() + sizeof(ipc_size_t), in.data(), in.size()); + reinterpret_cast(out[0]) = ipc_size_t(in.size()); + } + + inline ipc_size_t read_size(std::vector const& in) { + return reinterpret_cast(in[0]); + } + class base { public: static std::string make_unique_id(std::string name, std::vector parameters); diff --git a/proto/ipc.proto b/proto/ipc.proto index 50fd9cb..f338604 100644 --- a/proto/ipc.proto +++ b/proto/ipc.proto @@ -33,7 +33,6 @@ message Authenticate { } message AuthenticateReply { - string read_event = 1; - string write_event = 2; + string password = 1; } diff --git a/source/ipc-client.cpp b/source/ipc-client.cpp index 6b593ae..9a2d50b 100644 --- a/source/ipc-client.cpp +++ b/source/ipc-client.cpp @@ -17,11 +17,13 @@ #include "ipc-client.hpp" #include "ipc.pb.h" -#include "os-error.hpp" #include #include #include -#include "os-signal.hpp" +#include +#include "source/os/error.hpp" +#include "source/os/tags.hpp" +#include "source/os/windows/semaphore.hpp" #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN @@ -29,104 +31,245 @@ #include #endif +using namespace std::placeholders; + static const size_t buffer_size = 128 * 1024 * 1024; -ipc::client::client(std::string socketPath) { - m_socket = os::named_socket::create(); - m_socket->set_receive_timeout(std::chrono::nanoseconds(1000000ull)); - m_socket->set_send_timeout(std::chrono::nanoseconds(1000000ull)); - m_socket->set_wait_timeout(std::chrono::nanoseconds(10000000ull)); - m_socket->set_receive_buffer_size(buffer_size); - m_socket->set_send_buffer_size(buffer_size); - if (!m_socket->connect(socketPath)) { - throw std::exception("Failed to initialize socket."); +void ipc::client::worker() { + os::error ec = os::error::Success; + std::vector proc_rval; + + while (m_socket->is_connected() && !m_watcher.stop) { + if (!m_rop || !m_rop->is_valid()) { + m_rbuf.resize(sizeof(ipc_size_t)); + ec = m_socket->read(m_rbuf.data(), m_rbuf.size(), m_rop, std::bind(&client::read_callback_init, this, _1, _2)); + if (ec != os::error::Pending && ec != os::error::Success) { + if (ec == os::error::Disconnected) { + break; + } else { + throw std::exception("Unexpected error."); + } + } + } + + os::waitable * waits[] = { m_rop.get() }; + size_t wait_index = -1; + for (size_t idx = 0; idx < 1; idx++) { + if (waits[idx] != nullptr) { + if (waits[idx]->wait(std::chrono::milliseconds(0)) == os::error::Success) { + wait_index = idx; + break; + } + } + } + if (wait_index == -1) { + os::error code = os::waitable::wait_any(waits, 1, wait_index, std::chrono::milliseconds(20)); + if (code == os::error::TimedOut) { + continue; + } else if (code == os::error::Disconnected) { + break; + } else if (code == os::error::Error) { + throw std::exception("Error"); + } + } } - m_stopWorkers = false; - // Worker is created on authenatication. + // Call any remaining callbacks. + proc_rval.resize(1); + proc_rval[0].type = ipc::type::Null; + proc_rval[0].value_str = "Lost IPC Connection"; + + { // ToDo: Figure out better way of registering functions, perhaps even a way to have "events" across a IPC connection. + std::unique_lock ulock(m_lock); + for (auto& cb : m_cb) { + cb.second.first(cb.second.second, proc_rval); + } + + m_cb.clear(); + } +} + +void ipc::client::read_callback_init(os::error ec, size_t size) { + os::error ec2 = os::error::Success; + + m_rop->invalidate(); + + if (ec == os::error::Success || ec == os::error::MoreData) { + ipc_size_t n_size = read_size(m_rbuf); + if (n_size != 0) { + m_rbuf.resize(n_size); + ec2 = m_socket->read(m_rbuf.data(), m_rbuf.size(), m_rop, std::bind(&client::read_callback_msg, this, _1, _2)); + if (ec2 != os::error::Pending && ec2 != os::error::Success) { + if (ec2 == os::error::Disconnected) { + return; + } else { + throw std::exception("Unexpected error."); + } + } + } + } +} + +void ipc::client::read_callback_msg(os::error ec, size_t size) { + ::FunctionResult proc_pb_result; + std::vector proc_rval; + std::pair cb; + + m_rop->invalidate(); + + bool success = proc_pb_result.ParsePartialFromArray(m_rbuf.data(), int(m_rbuf.size())); + if (!success) { + return; + } + + // Find the callback function. + std::unique_lock ulock(m_lock); + auto cb2 = m_cb.find(proc_pb_result.timestamp()); + if (cb2 == m_cb.end()) { + return; + } + cb = cb2->second; + + // Decode return values or errors. + if (proc_pb_result.error().length() > 0) { + proc_rval.resize(1); + proc_rval.at(0).type = ipc::type::Null; + proc_rval.at(0).value_str = proc_pb_result.error(); + } else if (proc_pb_result.value_size() > 0) { + proc_rval.resize(proc_pb_result.value_size()); + for (size_t n = 0; n < proc_rval.size(); n++) { + auto& v = proc_pb_result.value((int)n); + switch (v.value_case()) { + case ::Value::ValueCase::kValBool: + proc_rval.at(n).type = ipc::type::Int32; + proc_rval.at(n).value_union.i32 = v.val_bool() ? 1 : 0; + break; + case ::Value::ValueCase::kValFloat: + proc_rval.at(n).type = ipc::type::Float; + proc_rval.at(n).value_union.fp32 = v.val_float(); + break; + case ::Value::ValueCase::kValDouble: + proc_rval.at(n).type = ipc::type::Double; + proc_rval.at(n).value_union.fp64 = v.val_double(); + break; + case ::Value::ValueCase::kValInt32: + proc_rval.at(n).type = ipc::type::Int32; + proc_rval.at(n).value_union.i32 = v.val_int32(); + break; + case ::Value::ValueCase::kValInt64: + proc_rval.at(n).type = ipc::type::Int64; + proc_rval.at(n).value_union.i64 = v.val_int64(); + break; + case ::Value::ValueCase::kValUint32: + proc_rval.at(n).type = ipc::type::UInt32; + proc_rval.at(n).value_union.ui32 = v.val_uint32(); + break; + case ::Value::ValueCase::kValUint64: + proc_rval.at(n).type = ipc::type::UInt64; + proc_rval.at(n).value_union.ui64 = v.val_uint64(); + break; + case ::Value::ValueCase::kValString: + proc_rval.at(n).type = ipc::type::String; + proc_rval.at(n).value_str = v.val_string(); + break; + case ::Value::ValueCase::kValBinary: + proc_rval.at(n).type = ipc::type::Binary; + proc_rval.at(n).value_bin.resize(v.val_binary().size()); + memcpy(proc_rval.at(n).value_bin.data(), v.val_binary().data(), v.val_binary().size()); + break; + } + } + } + + // Call Callback + cb.first(cb.second, proc_rval); + + // Remove cb entry + /// ToDo: Figure out better way of registering functions, perhaps even a way to have "events" across a IPC connection. + m_cb.erase(proc_pb_result.timestamp()); +} + +ipc::client::client(std::string socketPath) { + m_socket = std::make_unique(os::open_only, socketPath, os::windows::pipe_read_mode::Byte); } ipc::client::~client() { - m_stopWorkers = true; - if (m_worker.joinable()) - m_worker.join(); - m_socket->close(); + m_watcher.stop = true; + if (m_watcher.worker.joinable()) { + m_watcher.worker.join(); + } + m_socket = nullptr; } bool ipc::client::authenticate() { - if (m_authenticated) - return true; + os::error ec = os::error::Success; + ::Authenticate msg; + ::AuthenticateReply rpl; - auto sock = m_socket->get_connection(); - if (sock->bad()) + if (!m_socket) return false; - ::Authenticate msg; + if (m_authenticated) + return true; + + // Build Message msg.set_password("Hello World"); // Eventually will be used. - std::string signalname = ""; -#ifdef _WIN32 - GUID guid; - CoCreateGuid(&guid); - std::vector guid_buffer(8 + 8 + 8 + 8 + 4 + 1); - snprintf(guid_buffer.data(), guid_buffer.size(), "%08X-%08X-%08X-%08X", - guid.Data1, guid.Data2, guid.Data3, *reinterpret_cast(guid.Data4)); - signalname = std::string(guid_buffer.data(), guid_buffer.data() + guid_buffer.size()); -#endif - msg.set_name(signalname); + msg.set_name(""); std::vector buf(msg.ByteSizeLong()); - if (!msg.SerializePartialToArray(buf.data(), (int)buf.size())) + if (!msg.SerializePartialToArray(buf.data(), (int)buf.size())) { return false; + } - bool success = false; - auto tp_begin = std::chrono::high_resolution_clock::now(); - while ((std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - tp_begin).count() < 100) - && sock->good()) { - - size_t temp = 0; - if (sock->write(buf.data(), buf.size(), temp) == os::error::Ok) { - #ifdef _WIN32 - Sleep(0); - #endif - success = true; - break; - } + ipc::make_sendable(m_wbuf, buf); + ec = m_socket->write(m_wbuf.data(), m_wbuf.size(), m_wop, nullptr); + if (ec != os::error::Success && ec != os::error::Pending) { + m_wop->invalidate(); + return false; } - if (!success) { + + ec = m_wop->wait(std::chrono::milliseconds(5000)); + if (ec != os::error::Success) { + m_wop->invalidate(); return false; } + m_wop->invalidate(); - // Read reply - ::AuthenticateReply rpl; - tp_begin = std::chrono::high_resolution_clock::now(); - success = false; - while ((std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - tp_begin).count() < 500000) - && sock->good()) { - if (sock->read_avail() == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - continue; - } + m_rbuf.resize(sizeof(ipc_size_t)); + ec = m_socket->read(m_rbuf.data(), m_rbuf.size(), m_rop, nullptr); + if (ec != os::error::Success && ec != os::error::Pending) { + m_rop->invalidate(); + return false; + } - size_t avail = sock->read_avail(); - buf.resize(avail); - os::error err = sock->read(buf.data(), buf.size(), avail); - if (err == os::error::Ok) { - success = true; - break; - } + ec = m_rop->wait(std::chrono::milliseconds(5000)); + if (ec != os::error::Success) { + m_rop->invalidate(); + return false; } - if (!success) { + m_rop->invalidate(); + + ipc_size_t size = ipc::read_size(m_rbuf); + m_rbuf.resize(size); + ec = m_socket->read(m_rbuf.data(), m_rbuf.size(), m_rop, nullptr); + if (ec != os::error::Success && ec != os::error::Pending) { + m_rop->invalidate(); return false; } - success = rpl.ParsePartialFromArray(buf.data(), (int)buf.size()); - if (!success) { + ec = m_rop->wait(std::chrono::milliseconds(5000)); + if (ec != os::error::Success) { + m_rop->invalidate(); return false; } - m_readSignal = os::signal::create(rpl.write_event()); - m_writeSignal = os::signal::create(rpl.read_event()); + m_rop->invalidate(); - m_worker = std::thread(std::bind(worker_thread, this)); + if (!rpl.ParsePartialFromArray(buf.data(), (int)buf.size())) { + return false; + } + + m_watcher.stop = false; + m_watcher.worker = std::thread(std::bind(&client::worker, this)); return true; } @@ -141,19 +284,20 @@ bool ipc::client::cancel(int64_t const& id) { } bool ipc::client::call(std::string cname, std::string fname, std::vector args, call_return_t fn, void* data, int64_t& cbid) { - auto sock = m_socket->get_connection(); - if (sock->bad()) - return false; - static std::mutex mtx; static uint64_t timestamp = 0; - ::FunctionCall msg; + os::error ec; + + if (!m_socket) + return false; + { std::unique_lock ulock(mtx); timestamp++; msg.set_timestamp(timestamp); } + msg.set_classname(cname); msg.set_functionname(fname); auto b = msg.mutable_arguments(); @@ -198,35 +342,35 @@ bool ipc::client::call(std::string cname, std::string fname, std::vectorbad()) - break; - - write_error = sock->write(buf.data(), buf.size(), write_length); - if ((write_error == os::error::Ok) && (write_length == buf.size())) { - m_writeSignal->set(); - #ifdef _WIN32 - Sleep(0); - #endif - return true; + ipc::make_sendable(m_wbuf, buf); + ec = m_socket->write(m_wbuf.data(), m_wbuf.size(), m_wop, nullptr); + if (ec != os::error::Success && ec != os::error::Pending) { + cancel(cbid); + if (ec == os::error::Disconnected) { + return false; + } else { + throw std::exception("Unexpected Error"); } } - if (fn != nullptr) { - std::unique_lock ulock(m_lock); - m_cb.erase(msg.timestamp()); - cbid = 0; + ec = m_wop->wait(std::chrono::milliseconds(5000)); + if (ec != os::error::Success) { + cancel(cbid); + if (ec == os::error::Disconnected) { + return false; + } else { + throw std::exception("Unexpected Error"); + } } - return false; + + return true; } std::vector ipc::client::call_synchronous_helper(std::string cname, std::string fname, std::vector args, std::chrono::nanoseconds timeout) { // Set up call reference data. struct CallData { - std::shared_ptr sgn = os::signal::create(); + std::shared_ptr sgn = std::make_shared(); std::mutex mtx; bool called = false; std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); @@ -242,10 +386,7 @@ std::vector ipc::client::call_synchronous_helper(std::string cname, std::copy(rval.begin(), rval.end(), std::back_inserter(cd.values)); cd.called = true; - cd.sgn->set(); - #ifdef _WIN32 - Sleep(0); - #endif + cd.sgn->signal(); }; std::unique_lock ulock(cd.mtx); @@ -256,7 +397,7 @@ std::vector ipc::client::call_synchronous_helper(std::string cname, return {}; } - cd.sgn->wait(std::chrono::nanoseconds(INFINITE * 1000000ull)); + cd.sgn->wait(timeout); if (!cd.called) { cancel(cbid); return {}; @@ -264,133 +405,3 @@ std::vector ipc::client::call_synchronous_helper(std::string cname, return std::move(cd.values); } - -void ipc::client::worker_thread(client* ptr) { - // Variables - auto conn = ptr->m_socket->get_connection(); - /// Reading - os::error read_error = os::error::Ok; - size_t read_length = 0, read_full_length = 0; - char read_buffer_temp = 0; - std::vector read_buffer; - /// Processing - ::FunctionResult proc_pb_result; - std::vector proc_rval; - - // Prepare Buffers - read_buffer.reserve(ptr->m_socket->get_receive_buffer_size()); - - while ((!ptr->m_stopWorkers) && conn->good()) { - if (conn->read_avail() == 0) { - switch (ptr->m_readSignal->wait(std::chrono::milliseconds(10))) { - default: - #ifdef _WIN32 - Sleep(0); - #endif - continue; - case os::error::Ok: - break; - } - ptr->m_readSignal->clear(); - } - - read_full_length = conn->read_avail(); - read_buffer.resize(read_full_length); - read_error = conn->read(read_buffer.data(), read_buffer.size(), read_length); - if (read_error != os::error::Ok) { - #ifdef _WIN32 - Sleep(0); - #endif - continue; - } - - // Process read message. - { - bool success = proc_pb_result.ParsePartialFromArray(read_buffer.data(), (int)read_full_length); - if (!success) { - continue; - } - - // Find the callback function. - std::pair cb; - std::unique_lock ulock(ptr->m_lock); - auto cb2 = ptr->m_cb.find(proc_pb_result.timestamp()); - if (cb2 == ptr->m_cb.end()) { - continue; - } - cb = cb2->second; - - // Decode return values or errors. - if (proc_pb_result.error().length() > 0) { - proc_rval.resize(1); - proc_rval.at(0).type = ipc::type::Null; - proc_rval.at(0).value_str = proc_pb_result.error(); - } else if (proc_pb_result.value_size() > 0) { - proc_rval.resize(proc_pb_result.value_size()); - for (size_t n = 0; n < proc_rval.size(); n++) { - auto& v = proc_pb_result.value((int)n); - switch (v.value_case()) { - case ::Value::ValueCase::kValBool: - proc_rval.at(n).type = ipc::type::Int32; - proc_rval.at(n).value_union.i32 = v.val_bool() ? 1 : 0; - break; - case ::Value::ValueCase::kValFloat: - proc_rval.at(n).type = ipc::type::Float; - proc_rval.at(n).value_union.fp32 = v.val_float(); - break; - case ::Value::ValueCase::kValDouble: - proc_rval.at(n).type = ipc::type::Double; - proc_rval.at(n).value_union.fp64 = v.val_double(); - break; - case ::Value::ValueCase::kValInt32: - proc_rval.at(n).type = ipc::type::Int32; - proc_rval.at(n).value_union.i32 = v.val_int32(); - break; - case ::Value::ValueCase::kValInt64: - proc_rval.at(n).type = ipc::type::Int64; - proc_rval.at(n).value_union.i64 = v.val_int64(); - break; - case ::Value::ValueCase::kValUint32: - proc_rval.at(n).type = ipc::type::UInt32; - proc_rval.at(n).value_union.ui32 = v.val_uint32(); - break; - case ::Value::ValueCase::kValUint64: - proc_rval.at(n).type = ipc::type::UInt64; - proc_rval.at(n).value_union.ui64 = v.val_uint64(); - break; - case ::Value::ValueCase::kValString: - proc_rval.at(n).type = ipc::type::String; - proc_rval.at(n).value_str = v.val_string(); - break; - case ::Value::ValueCase::kValBinary: - proc_rval.at(n).type = ipc::type::Binary; - proc_rval.at(n).value_bin.resize(v.val_binary().size()); - memcpy(proc_rval.at(n).value_bin.data(), v.val_binary().data(), v.val_binary().size()); - break; - } - } - } - - // Call Callback - cb.first(cb.second, proc_rval); - - // Remove cb entry - /// ToDo: Figure out better way of registering functions, perhaps even a way to have "events" across a IPC connection. - ptr->m_cb.erase(proc_pb_result.timestamp()); - } - } - - // Call any remaining callbacks. - proc_rval.resize(1); - proc_rval[0].type = ipc::type::Null; - proc_rval[0].value_str = "Lost IPC Connection"; - - { // ToDo: Figure out better way of registering functions, perhaps even a way to have "events" across a IPC connection. - std::unique_lock ulock(ptr->m_lock); - for (auto& cb : ptr->m_cb) { - cb.second.first(cb.second.second, proc_rval); - } - - ptr->m_cb.clear(); - } -} diff --git a/source/ipc-server-instance.cpp b/source/ipc-server-instance.cpp index 7ef804f..ab56cb8 100644 --- a/source/ipc-server-instance.cpp +++ b/source/ipc-server-instance.cpp @@ -17,8 +17,10 @@ #include "ipc-server-instance.hpp" #include "ipc.pb.h" -#include "os-error.hpp" #include +#include + +using namespace std::placeholders; #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN @@ -27,12 +29,13 @@ ipc::server_instance::server_instance() {} -ipc::server_instance::server_instance(ipc::server* owner, std::shared_ptr conn) { +ipc::server_instance::server_instance(ipc::server* owner, std::shared_ptr conn) { m_parent = owner; m_socket = conn; - m_clientId = m_socket->get_client_id(); + m_clientId = 0; + m_stopWorkers = false; - m_worker = std::thread(worker_main, this); + m_worker = std::thread(std::bind(&server_instance::worker, this)); } ipc::server_instance::~server_instance() { @@ -43,7 +46,7 @@ ipc::server_instance::~server_instance() { } bool ipc::server_instance::is_alive() { - if (m_socket->bad()) + if (!m_socket->is_connected()) return false; if (m_stopWorkers) @@ -129,178 +132,207 @@ void EncodeIPCToProtobuf(const ipc::value& v, ::Value* val) { } void ipc::server_instance::worker() { - // Variables - /// Reading - os::error read_error = os::error::Ok; - size_t read_length = 0; - size_t read_full_length = 0; - char read_buffer_temp = 0; - std::vector read_buffer; - /// Processing - ::Authenticate proc_pb_auth; - ::AuthenticateReply proc_pb_auth_reply; - ::FunctionCall proc_pb_call; - ::FunctionResult proc_pb_result; - std::vector proc_args; - std::vector proc_rval; - ipc::value proc_tempval; - std::string proc_error; - size_t proc_reply_size = 0; - /// Writing - os::error write_error = os::error::Ok; - size_t write_length = 0; - std::vector write_buffer; - std::queue> write_queue; + os::error ec = os::error::Success; // Prepare Buffers - read_buffer.reserve(m_parent->m_socket->get_receive_buffer_size()); - write_buffer.reserve(m_parent->m_socket->get_send_buffer_size()); + m_rbuf.reserve(65535); + m_wbuf.reserve(65535); // Loop - while ((!m_stopWorkers) && m_socket->good()) { - // Attempt to clear the output queue. - if (write_queue.size() > 0) { - while ((write_queue.size() > 0) || (write_length != write_buffer.size())) { - auto& buf = write_queue.front(); - write_error = m_socket->write(buf.data(), buf.size(), write_length); - if (write_error != os::error::Ok) { + while ((!m_stopWorkers) && m_socket->is_connected()) { + if (!m_rop || !m_rop->is_valid()) { + m_rbuf.resize(sizeof(ipc_size_t)); + ec = m_socket->read(m_rbuf.data(), m_rbuf.size(), m_rop, std::bind(&server_instance::read_callback_init, this, _1, _2)); + if (ec != os::error::Pending && ec != os::error::Success) { + if (ec == os::error::Disconnected) { break; } else { - write_queue.pop(); + throw std::exception("Unexpected error."); } } - /// Flush and give up time slice. - if (m_isAuthenticated) { - m_writeSignal->set(); + } + if (!m_wop || !m_wop->is_valid()) { + if (m_write_queue.size() > 0) { + std::vector& fbuf = m_write_queue.front(); + ipc::make_sendable(m_wbuf, fbuf); + ec = m_socket->write(m_wbuf.data(), m_wbuf.size(), m_wop, std::bind(&server_instance::write_callback, this, _1, _2)); + if (ec != os::error::Pending && ec != os::error::Success) { + if (ec == os::error::Disconnected) { + break; + } else { + throw std::exception("Unexpected error."); + } + } + m_write_queue.pop(); } - #ifdef _WIN32 - Sleep(0); - #endif } - // Read Message - if (!m_isAuthenticated) { - if (m_socket->read_avail() == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + os::waitable * waits[] = { m_rop.get(), m_wop.get() }; + size_t wait_index = -1; + for (size_t idx = 0; idx < 2; idx++) { + if (waits[idx] != nullptr) { + if (waits[idx]->wait(std::chrono::milliseconds(0)) == os::error::Success) { + wait_index = idx; + break; + } + } + } + if (wait_index == -1) { + os::error code = os::waitable::wait_any(waits, 2, wait_index, std::chrono::milliseconds(20)); + if (code == os::error::TimedOut) { continue; + } else if (code == os::error::Disconnected) { + break; + } else if (code == os::error::Error) { + throw std::exception("Error"); } - } else { - if (m_socket->read_avail() == 0) { - switch (m_readSignal->wait(std::chrono::milliseconds(10))) { - case os::error::Error: - case os::error::Abandoned: - case os::error::TimedOut: - #ifdef _WIN32 - Sleep(0); - #endif - continue; + } + } +} + +void ipc::server_instance::read_callback_init(os::error ec, size_t size) { + os::error ec2 = os::error::Success; + + m_rop->invalidate(); + + if (ec == os::error::Success || ec == os::error::MoreData) { + ipc_size_t n_size = read_size(m_rbuf); + if (n_size != 0) { + m_rbuf.resize(n_size); + ec2 = m_socket->read(m_rbuf.data(), m_rbuf.size(), m_rop, std::bind(&server_instance::read_callback_msg, this, _1, _2)); + if (ec2 != os::error::Pending && ec2 != os::error::Success) { + if (ec2 == os::error::Disconnected) { + return; + } else { + throw std::exception("Unexpected error."); } } - m_readSignal->clear(); } + } +} - read_full_length = m_socket->read_avail(); - read_buffer.resize(read_full_length); - read_error = m_socket->read(read_buffer.data(), read_full_length, read_length); - if (read_error != os::error::Ok) { - continue; - } +void ipc::server_instance::read_callback_msg(os::error ec, size_t size) { + /// Processing + ::Authenticate proc_pb_auth; + ::AuthenticateReply proc_pb_auth_reply; + ::FunctionCall proc_pb_call; + ::FunctionResult proc_pb_result; + std::vector proc_args; + std::vector proc_rval; + ipc::value proc_tempval; + std::string proc_error; + size_t proc_reply_size = 0; + std::vector write_buffer; - // Decode the new message. - bool success = false; - if (!m_isAuthenticated) { - // Client is not authenticated. + m_rop->invalidate(); - /// Authentication is required so the two sides have an event to actually wait on. This is necessary to - /// avoid a race condition that would cause us to lose data to a read that timed out just before it was - /// done. + if (ec != os::error::Success) { + return; + } - success = proc_pb_auth.ParsePartialFromArray(read_buffer.data(), int(read_full_length)); - if (!success) { - continue; - } + bool success = false; + if (!m_isAuthenticated) { + // Client is not authenticated. + /// Authentication is required so that both sides know that the other is ready. - std::string read_event_name = "Global\\" + proc_pb_auth.name() + "_r"; - std::string write_event_name = "Global\\" + proc_pb_auth.name() + "_w"; - m_readSignal = os::signal::create(read_event_name); - m_writeSignal = os::signal::create(write_event_name); + success = proc_pb_auth.ParsePartialFromArray(m_rbuf.data(), int(m_rbuf.size())); + if (!success) { + std::cerr << "Failed to parse Authenticate message." << std::endl; + return; + } - proc_pb_auth_reply.Clear(); - proc_pb_auth_reply.set_read_event(read_event_name); - proc_pb_auth_reply.set_write_event(write_event_name); + proc_pb_auth_reply.Clear(); + proc_pb_auth_reply.set_password(proc_pb_auth.password()); - // Encode - proc_reply_size = proc_pb_auth_reply.ByteSizeLong(); - if (proc_reply_size == 0) { - continue; - } + // Encode + proc_reply_size = proc_pb_auth_reply.ByteSizeLong(); + if (proc_reply_size == 0) { + std::cerr << "Failed to encode AuthenticateReply message." << std::endl; + return; + } - write_buffer.resize(proc_reply_size); - success = proc_pb_auth_reply.SerializePartialToArray(write_buffer.data(), int(proc_reply_size)); - if (!success) { - continue; - } + write_buffer.resize(proc_reply_size); + success = proc_pb_auth_reply.SerializePartialToArray(write_buffer.data(), int(proc_reply_size)); + if (!success) { + std::cerr << "Failed to serialize AuthenticateReply message." << std::endl; + return; + } - m_isAuthenticated = true; - } else { - // Client is authenticated. + m_isAuthenticated = true; + } else { + // Client is authenticated. - // Parse - success = proc_pb_call.ParsePartialFromArray(read_buffer.data(), int(read_full_length)); - if (!success) { - continue; - } + // Parse + success = proc_pb_call.ParsePartialFromArray(m_rbuf.data(), int(m_rbuf.size())); + if (!success) { + std::cerr << "Failed to parse FunctionCall message." << std::endl; + return; + } - // Decode Arguments - proc_args.resize(proc_pb_call.arguments_size()); - for (size_t n = 0; n < proc_args.size(); n++) { - DecodeProtobufToIPC(proc_pb_call.arguments((int)n), proc_tempval); - proc_args[n] = std::move(proc_tempval); - } + // Decode Arguments + proc_args.resize(proc_pb_call.arguments_size()); + for (size_t n = 0; n < proc_args.size(); n++) { + DecodeProtobufToIPC(proc_pb_call.arguments((int)n), proc_tempval); + proc_args[n] = std::move(proc_tempval); + } - // Execute - proc_pb_result.Clear(); - proc_pb_result.set_timestamp(proc_pb_call.timestamp()); - proc_rval.resize(0); - success = m_parent->client_call_function(m_clientId, - proc_pb_call.classname(), proc_pb_call.functionname(), - proc_args, proc_rval, proc_error); - if (success) { - for (size_t n = 0; n < proc_rval.size(); n++) { - ::Value* rv = proc_pb_result.add_value(); - EncodeIPCToProtobuf(proc_rval[n], rv); - } - } else { - proc_pb_result.set_error(proc_error); + // Execute + proc_pb_result.Clear(); + proc_pb_result.set_timestamp(proc_pb_call.timestamp()); + proc_rval.resize(0); + success = m_parent->client_call_function(m_clientId, + proc_pb_call.classname(), proc_pb_call.functionname(), + proc_args, proc_rval, proc_error); + if (success) { + for (size_t n = 0; n < proc_rval.size(); n++) { + ::Value* rv = proc_pb_result.add_value(); + EncodeIPCToProtobuf(proc_rval[n], rv); } + } else { + proc_pb_result.set_error(proc_error); + } - // Encode - proc_reply_size = proc_pb_result.ByteSizeLong(); - if (proc_reply_size == 0) { - continue; - } + // Encode + proc_reply_size = proc_pb_result.ByteSizeLong(); + if (proc_reply_size == 0) { + std::cerr << "Failed to encode FunctionResult message." << std::endl; + return; + } - write_buffer.resize(proc_reply_size); - success = proc_pb_result.SerializePartialToArray(write_buffer.data(), int(proc_reply_size)); - if (!success) { - continue; - } + write_buffer.resize(proc_reply_size); + success = proc_pb_result.SerializePartialToArray(write_buffer.data(), int(proc_reply_size)); + if (!success) { + std::cerr << "Failed to serialize FunctionResult message." << std::endl; + return; } + } - // Write new output. - write_error = m_socket->write(write_buffer.data(), write_buffer.size(), write_length); - if ((write_error != os::error::Ok) || (write_length != write_buffer.size())) { - // Failed to write? Put it into the queue. - write_queue.push(std::move(write_buffer)); - write_buffer.reserve(m_parent->m_socket->get_send_buffer_size()); - } else { - /// Flush and give up current time slice to any signaled threads. - if (m_isAuthenticated) { - m_writeSignal->set(); + if (write_buffer.size() != 0) { + if ((!m_wop || !m_wop->is_valid()) && (m_write_queue.size() == 0)) { + ipc::make_sendable(m_wbuf, write_buffer); + os::error ec2 = m_socket->write(m_wbuf.data(), m_wbuf.size(), m_wop, std::bind(&server_instance::write_callback, this, _1, _2)); + if (ec2 != os::error::Success && ec2 != os::error::Pending) { + if (ec2 == os::error::Disconnected) { + return; + } else { + throw std::exception("Unexpected Error"); + } } - #ifdef _WIN32 - Sleep(0); - #endif + } else { + m_write_queue.push(std::move(write_buffer)); } } } + +void ipc::server_instance::write_callback(os::error ec, size_t size) { + m_wop->invalidate(); + // Do we need to do anything here? Not really. + + // Uncomment this to give up the rest of the time slice to the next thread. + // Not recommended since we do this anyway with the next wait. + /* + #ifdef _WIN32 + Sleep(0); + #endif + */ +} diff --git a/source/ipc-server.cpp b/source/ipc-server.cpp index df44895..5f343dd 100644 --- a/source/ipc-server.cpp +++ b/source/ipc-server.cpp @@ -18,37 +18,148 @@ #include "ipc-server.hpp" #include "ipc.pb.h" #include +#include "source/os/error.hpp" +#include "source/os/tags.hpp" static const size_t buffer_size = 128 * 1024 * 1024; +void ipc::server::watcher() { + os::error ec; + + struct pending_accept { + std::shared_ptr op; + std::chrono::high_resolution_clock::time_point start; + }; + + std::map, pending_accept> pa_map; + + while (!m_watcher.stop) { + { // Always try to keep sockets in a connected state. + std::unique_lock ul(m_sockets_mtx); + for (std::shared_ptr socket : m_sockets) { + if (!socket->is_connected()) { + if (!pa_map.count(socket)) { + pending_accept pa; + pa.start = std::chrono::high_resolution_clock::now(); + ec = socket->accept(pa.op, nullptr); + if (ec == os::error::Success) { + pa_map.insert_or_assign(socket, pa); + } + } else { // Client is no longer there, kill. + kill_client(socket); + } + } else { + std::unique_lock ul(m_clients_mtx); + if (!m_clients.count(socket)) { + ul.unlock(); + ul.release(); + spawn_client(socket); + } + } + } + } + + // Wait for sockets to connect. + std::vector waits; + std::vector> idx_to_socket; + for (auto kv : pa_map) { + waits.push_back(kv.second.op.get()); + idx_to_socket.push_back(kv.first); + } + + if (waits.size() == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + continue; + } + + size_t idx = -1; + ec = os::waitable::wait_any(waits, idx, std::chrono::milliseconds(20)); + if (ec == os::error::TimedOut) { + continue; + } else if (ec == os::error::Success) { + pending_accept pa; + auto kv = pa_map.find(idx_to_socket[idx]); + if (kv != pa_map.end()) { + pa_map.erase(idx_to_socket[idx]); + // New client, spawn. + spawn_client(idx_to_socket[idx]); + } + } else { + // Unknown error. + } + } +} + +void ipc::server::spawn_client(std::shared_ptr socket) { + std::unique_lock ul(m_clients_mtx); + std::shared_ptr client = std::make_shared(this, socket); + if (m_handlerConnect.first) { + m_handlerConnect.first(m_handlerConnect.second, 0); + } + m_clients.insert_or_assign(socket, client); +} + +void ipc::server::kill_client(std::shared_ptr socket) { + std::unique_lock ul(m_clients_mtx); + if (m_handlerDisconnect.first) { + m_handlerDisconnect.first(m_handlerDisconnect.second, 0); + } + m_clients.erase(socket); +} + ipc::server::server() { - m_socket = os::named_socket::create(); - m_socket->set_send_timeout(std::chrono::nanoseconds(1000000ull)); - m_socket->set_receive_timeout(std::chrono::nanoseconds(1000000ull)); - m_socket->set_receive_buffer_size(buffer_size); - m_socket->set_send_buffer_size(buffer_size); + // Start Watcher + m_watcher.stop = false; + m_watcher.worker = std::thread(std::bind(&ipc::server::watcher, this)); } ipc::server::~server() { finalize(); + + m_watcher.stop = true; + if (m_watcher.worker.joinable()) { + m_watcher.worker.join(); + } } void ipc::server::initialize(std::string socketPath) { - if (!m_socket->listen(socketPath, 4)) - throw std::exception("Failed to initialize socket."); - m_worker = std::thread(worker_main, this); + // Start a few sockets. + + try { + std::unique_lock ul(m_sockets_mtx); + m_sockets.insert(m_sockets.end(), + std::make_shared(os::create_only, socketPath, 255, + os::windows::pipe_type::Byte, os::windows::pipe_read_mode::Byte, true)); + for (size_t idx = 1; idx < backlog; idx++) { + m_sockets.insert(m_sockets.end(), + std::make_shared(os::create_only, socketPath, 255, + os::windows::pipe_type::Byte, os::windows::pipe_read_mode::Byte, false)); + } + } catch (std::exception e) { + throw e; + } + m_isInitialized = true; m_socketPath = socketPath; } void ipc::server::finalize() { - if (m_isInitialized) { - m_stopWorker = true; - if (m_worker.joinable()) - m_worker.join(); + if (!m_isInitialized) { + return; + } + + // Lock sockets mutex so that watcher pauses. + std::unique_lock ul(m_sockets_mtx); + + { // Kill/Disconnect any clients + for (auto kv : m_clients) { + kill_client(kv.first); + } m_clients.clear(); - m_socket->close(); } + + // Kill any remaining sockets + m_sockets.clear(); } void ipc::server::set_connect_handler(server_connect_handler_t handler, void* data) { @@ -75,7 +186,7 @@ bool ipc::server::register_collection(std::shared_ptr cls) { return true; } -bool ipc::server::client_call_function(os::ClientId_t cid, std::string cname, std::string fname, std::vector& args, std::vector& rval, std::string& errormsg) { +bool ipc::server::client_call_function(int64_t cid, std::string cname, std::string fname, std::vector& args, std::vector& rval, std::string& errormsg) { if (m_classes.count(cname) == 0) { errormsg = "Class '" + cname + "' is not registered."; return false; @@ -92,39 +203,3 @@ bool ipc::server::client_call_function(os::ClientId_t cid, std::string cname, st return true; } - -void ipc::server::worker_main(server* ptr) { - ptr->worker_local(); -} - -void ipc::server::worker_local() { - std::queue dcQueue; - while (m_stopWorker == false) { - std::shared_ptr conn = m_socket->accept().lock(); - if (conn) { - bool allow = true; - if (m_handlerConnect.first != nullptr) - allow = m_handlerConnect.first(m_handlerConnect.second, conn->get_client_id()); - - if (allow && conn->connect()) { - std::unique_lock ulock(m_clientLock); - std::shared_ptr instance = std::make_shared(this, conn); - m_clients.insert(std::make_pair(conn->get_client_id(), instance)); - } - } - - for (auto kv = m_clients.begin(); kv != m_clients.end(); kv++) { - if (!kv->second->m_socket->is_connected()) - dcQueue.push(kv->first); - } - while (dcQueue.size() > 0) { - os::ClientId_t id = dcQueue.front(); - if (m_handlerDisconnect.first != nullptr) - m_handlerDisconnect.first(m_handlerDisconnect.second, id); - m_clients.erase(id); - dcQueue.pop(); - } - - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } -}