Skip to content

Commit

Permalink
Merge #8: Add Connnect/Serve/Spawn/Wait functions
Browse files Browse the repository at this point in the history
f324c66 Add Connnect/Serve/Spawn/Wait functions (Russell Yanofsky)

Pull request description:

  Add forking and connecting functions to pull more IPC code out of bitcoin/bitcoin#10102 into libmultiprocess

Top commit has no ACKs.

Tree-SHA512: dcb40c4de2afe7d6ebb5b97376abe03f3f6d31d150276961ef1237f0438ad7b5fd890890ae7bf06254abe6c969a92dd44ad764a0a181f55cdfd409b702db5e22
  • Loading branch information
ryanofsky committed Aug 6, 2019
2 parents 8125688 + f324c66 commit dee0711
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 2 deletions.
35 changes: 35 additions & 0 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,41 @@ struct ThreadContext
bool loop_thread = false;
};

//! Given stream file descriptor, make a new ProxyClient object to send requests
//! over the stream. Also create a new Connection object embedded in the
//! client that is freed when the client is closed.
template <typename InitInterface>
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd, bool add_client)
{
typename InitInterface::Client init_client(nullptr);
std::unique_ptr<Connection> connection;
loop.sync([&] {
auto stream =
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
connection = std::make_unique<Connection>(loop, kj::mv(stream), add_client);
init_client = connection->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] {
loop.log() << "IPC client: unexpected network disconnect.";
delete connection_ptr;
});
});
return std::make_unique<ProxyClient<InitInterface>>(
kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
}

//! Given stream and a callback to construct a new ProxyServer object that
//! handles requests from the stream, create a new Connection callback, pass it
//! to the callback, use the returned ProxyServer to handle requests, and delete
//! the proxyserver if the connection is disconnected.
//! This should be called from the event loop thread.
void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server);

//! Same as above but accept file descriptor rather than stream object.
void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server);

extern thread_local ThreadContext g_thread_context;

} // namespace mp
Expand Down
18 changes: 18 additions & 0 deletions include/mp/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

namespace mp {

Expand Down Expand Up @@ -360,6 +361,23 @@ std::string ThreadName(const char* exe_name);
//! errors in python unit tests.
std::string LogEscape(const kj::StringTree& string);

//! Callback type used by SpawnProcess below.
using FdToArgsFn = std::function<std::vector<std::string>(int fd)>;

//! Spawn a new process that communicates with the current process over a socket
//! pair. Returns pid through an output argument, and file descriptor for the
//! local side of the socket. Invokes fd_to_args callback with the remote file
//! descriptor number which returns the command line arguments that should be
//! used to execute the process, and which should have the remote file
//! descriptor embedded in whatever format the child process expects.
int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args);

//! Call execvp with vector args.
void ExecProcess(const std::vector<std::string>& args);

//! Wait for a process to exit and return its exit code.
int WaitProcess(int pid);

inline char* CharCast(char* c) { return c; }
inline char* CharCast(unsigned char* c) { return (char*)c; }
inline const char* CharCast(const char* c) { return c; }
Expand Down
20 changes: 19 additions & 1 deletion src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,28 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)

std::atomic<int> server_reqs{0};


std::string LongThreadName(const char* exe_name)
{
return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server)
{
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), make_server);
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}

void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server)
{
ServeStream(loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
std::move(make_server));
}

} // namespace mp
65 changes: 64 additions & 1 deletion src/mp/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,32 @@
#include <mp/util.h>

#include <kj/array.h>
#include <mp/proxy.h>
#include <pthread.h>
#include <sstream>
#include <stdio.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <syscall.h>
#include <unistd.h>

namespace mp {
namespace {

//! Return highest possible file descriptor.
size_t MaxFd()
{
struct rlimit nofile;
if (getrlimit(RLIMIT_NOFILE, &nofile) == 0) {
return nofile.rlim_cur - 1;
} else {
return 1023;
}
}

} // namespace

std::string ThreadName(const char* exe_name)
{
Expand Down Expand Up @@ -54,4 +72,49 @@ std::string LogEscape(const kj::StringTree& string)
return result;
}

int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args)
{
int fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
throw std::system_error(errno, std::system_category());
}

pid = fork();
if (close(fds[pid ? 0 : 1]) != 0) {
throw std::system_error(errno, std::system_category());
}
if (!pid) {
int maxFd = MaxFd();
for (int fd = 3; fd < maxFd; ++fd) {
if (fd != fds[0]) {
close(fd);
}
}
ExecProcess(fd_to_args(fds[0]));
}
return fds[1];
}

void ExecProcess(const std::vector<std::string>& args)
{
std::vector<char*> argv;
for (const auto& arg : args) {
argv.push_back(const_cast<char*>(arg.c_str()));
}
argv.push_back(nullptr);
if (execvp(argv[0], argv.data()) != 0) {
perror("execlp failed");
_exit(1);
}
}

int WaitProcess(int pid)
{
int status;
if (::waitpid(pid, &status, 0 /* options */) != pid) {
throw std::system_error(errno, std::system_category());
}
return status;
}

} // namespace mp

0 comments on commit dee0711

Please sign in to comment.