diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am index edbe54be1e7..59e79ca4f11 100644 --- a/3rdparty/libprocess/Makefile.am +++ b/3rdparty/libprocess/Makefile.am @@ -33,12 +33,19 @@ libprocess_la_SOURCES = \ src/config.hpp \ src/decoder.hpp \ src/encoder.hpp \ + src/event_manager.cpp \ + src/event_manager.hpp \ + src/event_manager_base.hpp \ src/gate.hpp \ src/http.cpp \ + src/http_proxy.cpp \ src/latch.cpp \ + src/libev_event_manager.cpp \ + src/libev_event_manager.hpp \ src/metrics/metrics.cpp \ src/pid.cpp \ src/process.cpp \ + src/process_reference.hpp \ src/reap.cpp \ src/subprocess.cpp \ src/synchronized.hpp \ @@ -85,7 +92,7 @@ libprocess_la_LIBADD += $(GPERFTOOLS)/libprofiler.la endif # Tests. -check_PROGRAMS = tests +check_PROGRAMS = tests benchmarks tests_SOURCES = \ src/tests/decoder_tests.cpp \ @@ -120,12 +127,29 @@ tests_LDADD = \ $(HTTP_PARSER_LIB) \ $(LIBEV_LIB) +benchmarks_SOURCES = \ + src/tests/benchmarks.cpp + +benchmarks_CPPFLAGS = \ + -I$(top_srcdir)/src \ + -I$(GTEST)/include \ + -I$(GMOCK)/include \ + $(libprocess_la_CPPFLAGS) + +benchmarks_LDADD = \ + 3rdparty/libgmock.la \ + libprocess.la \ + $(LIBGLOG) \ + $(HTTP_PARSER_LIB) \ + $(LIBEV_LIB) + # We use a check-local target for now to avoid the parallel test # runner that ships with newer versions of autotools. # See the following discussion for the workaround: # http://lists.gnu.org/archive/html/automake/2013-01/msg00051.html -check-local: tests +check-local: tests benchmarks ./tests + ./benchmarks # TODO(benh): Fix shared builds (tests need libglog, libev, etc). diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp index bf689d7270d..b5428ec6534 100644 --- a/3rdparty/libprocess/include/process/event.hpp +++ b/3rdparty/libprocess/include/process/event.hpp @@ -102,8 +102,8 @@ struct MessageEvent : Event struct HttpEvent : Event { - HttpEvent(const Socket& _socket, http::Request* _request) - : socket(_socket), request(_request) {} + HttpEvent(const ConnectionHandle& _connection_handle, http::Request* _request) + : connection_handle(_connection_handle), request(_request) {} virtual ~HttpEvent() { @@ -115,7 +115,7 @@ struct HttpEvent : Event visitor->visit(*this); } - const Socket socket; + const ConnectionHandle connection_handle; http::Request* const request; private: diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp index 270ca28d034..b67af1549e5 100644 --- a/3rdparty/libprocess/include/process/process.hpp +++ b/3rdparty/libprocess/include/process/process.hpp @@ -185,7 +185,7 @@ class ProcessBase : public EventVisitor } private: - friend class SocketManager; + friend class EventManager; friend class ProcessManager; friend class ProcessReference; friend void* schedule(void*); diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index dbcb4f4c2eb..56985d3d463 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -34,76 +34,27 @@ inline Try socket(int family, int type, int protocol) { } -// An abstraction around a socket (file descriptor) that provides -// reference counting such that the socket is only closed (and thus, -// has the possiblity of being reused) after there are no more -// references. 
- -class Socket -{ +class Connection { public: - Socket() - : refs(new int(1)), s(-1) {} - - explicit Socket(int _s) - : refs(new int(1)), s(_s) {} - - ~Socket() - { - cleanup(); - } + typedef int id_t; + virtual ~Connection() {} - Socket(const Socket& that) + inline operator const id_t&() const { - copy(that); + return id; } - Socket& operator = (const Socket& that) - { - if (this != &that) { - cleanup(); - copy(that); - } - return *this; - } - - bool operator == (const Socket& that) const - { - return s == that.s && refs == that.refs; - } +protected: + Connection(const id_t& _id) : id(_id) {} - operator int () const - { - return s; - } + /* This is good enough for now. In the future we might not want to + * limit ourselves to representing the identifier as an int (fd). */ + const id_t id; +}; -private: - void copy(const Socket& that) - { - assert(that.refs > 0); - __sync_fetch_and_add(that.refs, 1); - refs = that.refs; - s = that.s; - } - void cleanup() - { - assert(refs != NULL); - if (__sync_sub_and_fetch(refs, 1) == 0) { - delete refs; - if (s >= 0) { - Try close = os::close(s); - if (close.isError()) { - std::cerr << "Failed to close socket: " << close.error() << std::endl; - abort(); - } - } - } - } +typedef std::shared_ptr ConnectionHandle; - int* refs; - int s; -}; } // namespace process { diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp index b66f378b4ec..84d78a7b8cf 100644 --- a/3rdparty/libprocess/src/decoder.hpp +++ b/3rdparty/libprocess/src/decoder.hpp @@ -23,8 +23,8 @@ namespace process { class DataDecoder { public: - explicit DataDecoder(const Socket& _s) - : s(_s), failure(false), request(NULL) + explicit DataDecoder(const ConnectionHandle& _conn_handle) + : conn_handle(_conn_handle), failure(false), request(NULL) { settings.on_message_begin = &DataDecoder::on_message_begin; settings.on_header_field = &DataDecoder::on_header_field; @@ -67,9 +67,9 @@ class DataDecoder return failure; } - Socket socket() const + const ConnectionHandle& connection_handle() const { - return s; + return conn_handle; } private: @@ -240,7 +240,8 @@ class DataDecoder return 0; } - const Socket s; // The socket this decoder is associated with. + // The connection handle this decoder is associated with. + const ConnectionHandle conn_handle; bool failure; diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp index 9c5aa8134c1..8919b481e06 100644 --- a/3rdparty/libprocess/src/encoder.hpp +++ b/3rdparty/libprocess/src/encoder.hpp @@ -15,49 +15,49 @@ #include #include -// NOTE: We forward declare "ev_loop" and "ev_io" here because, -// on OSX, including "ev.h" causes conflict with "EV_ERROR" declared -// in "/usr/include/sys/event.h". -struct ev_loop; -struct ev_io; - namespace process { const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024; -typedef void (*Sender)(struct ev_loop*, ev_io*, int); - -extern void send_data(struct ev_loop*, ev_io*, int); -extern void send_file(struct ev_loop*, ev_io*, int); - class Encoder { public: - explicit Encoder(const Socket& _s) : s(_s) {} + enum IOKind + { + send_data, + send_file + }; + + explicit Encoder(const ConnectionHandle& _connection_handle) + : conn_handle(_connection_handle) {} virtual ~Encoder() {} - virtual Sender sender() = 0; + virtual IOKind io_kind() = 0; - Socket socket() const + const ConnectionHandle& connection_handle() const { - return s; + return conn_handle; } private: - const Socket s; // The socket this encoder is associated with. 
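(Editorial aside, not part of the patch: a minimal sketch of how the new Connection/ConnectionHandle pair is meant to be used. A ConnectionHandle is a reference-counted std::shared_ptr to a Connection, so the underlying descriptor lives until the last handle is dropped. The subclass name below is illustrative only; the concrete implementation this patch actually adds is LibevConnection in libev_event_manager.cpp.)

#include <memory>

#include <process/socket.hpp>

#include <stout/os.hpp>

// Illustrative concrete Connection that owns a file descriptor.
class FdConnection : public process::Connection
{
public:
  explicit FdConnection(int fd) : Connection(fd) {}

  virtual ~FdConnection()
  {
    // Runs only when the last ConnectionHandle referencing us is destroyed.
    if (id >= 0) {
      os::close(id);
    }
  }
};

// Usage: handles are copied freely, and the implicit conversion to
// Connection::id_t keeps fd-based helpers (io::poll, io::read, ...) working:
//   process::ConnectionHandle handle = std::make_shared<FdConnection>(fd);
//   process::io::poll(*handle, process::io::READ);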
+ + // The connection handle this encoder is associated with. + const ConnectionHandle conn_handle; }; class DataEncoder : public Encoder { public: - DataEncoder(const Socket& s, const std::string& _data) - : Encoder(s), data(_data), index(0) {} + DataEncoder( + const ConnectionHandle& _connection_handle, + const std::string& _data) + : Encoder(_connection_handle), data(_data), index(0) {} virtual ~DataEncoder() {} - virtual Sender sender() + virtual IOKind io_kind() { return send_data; } @@ -91,8 +91,8 @@ class DataEncoder : public Encoder class MessageEncoder : public DataEncoder { public: - MessageEncoder(const Socket& s, Message* _message) - : DataEncoder(s, encode(_message)), message(_message) {} + MessageEncoder(const ConnectionHandle& _connection_handle, Message* _message) + : DataEncoder(_connection_handle, encode(_message)), message(_message) {} virtual ~MessageEncoder() { @@ -143,10 +143,10 @@ class HttpResponseEncoder : public DataEncoder { public: HttpResponseEncoder( - const Socket& s, + const ConnectionHandle& _connection_handle, const http::Response& response, const http::Request& request) - : DataEncoder(s, encode(response, request)) {} + : DataEncoder(_connection_handle, encode(response, request)) {} static std::string encode( const http::Response& response, @@ -227,15 +227,15 @@ class HttpResponseEncoder : public DataEncoder class FileEncoder : public Encoder { public: - FileEncoder(const Socket& s, int _fd, size_t _size) - : Encoder(s), fd(_fd), size(_size), index(0) {} + FileEncoder(const ConnectionHandle& _connection_handle, int _fd, size_t _size) + : Encoder(_connection_handle), fd(_fd), size(_size), index(0) {} virtual ~FileEncoder() { os::close(fd); } - virtual Sender sender() + virtual IOKind io_kind() { return send_file; } diff --git a/3rdparty/libprocess/src/event_manager.hpp b/3rdparty/libprocess/src/event_manager.hpp new file mode 100644 index 00000000000..b30f8a6a162 --- /dev/null +++ b/3rdparty/libprocess/src/event_manager.hpp @@ -0,0 +1,261 @@ +#ifndef EVENT_MANAGER_HPP +#define EVENT_MANAGER_HPP + +#include + +#include "event_manager_base.hpp" +#include "http_proxy.hpp" +#include "process_reference.hpp" +#include "synchronized.hpp" + +namespace process { + +class EventManager : public internal::EventManager +{ +public: + + /* A dependency injection of ProcessManager. This exposes just the + * behavior that the EventManager cares about: + * - The ability to talk about a Process. + * - The ability to dispatch a request to be run. */ + class ProcessManager + { + protected: + ProcessManager() {} + + public: + virtual ~ProcessManager() {} + + /* Return a reference counted handle to the given process. */ + virtual ProcessReference use(const UPID& pid) = 0; + + /* Handle the given request for this socket. */ + virtual bool handle( + const ConnectionHandle& connection_handle, + http::Request* request) = 0; + }; + + virtual ~EventManager() {} + + static inline Try new_connection( + uint32_t ip, + uint16_t port, + int protocol); + + static inline Future conn_poll( + const ConnectionHandle& conn_handle, + short events); + + static inline Future conn_read_data( + const ConnectionHandle& conn_handle, + void* data, + size_t size); + + static inline Future conn_read( + const ConnectionHandle& conn_handle); + + static inline Future conn_write( + const ConnectionHandle& conn_handle, + const std::string& data); + + static inline Future conn_write( + const ConnectionHandle& conn_handle, + void* data, + size_t size); + + /* Forward the enqueue call from a more derived class. 
This is a proxy call + * as ProcessBase can not friend all derived versions of this class. They may + * not be known at compile time. */ + inline void enqueue(ProcessBase* proc_base, + Event* event, + bool inject = false); + + /* Return the pid from the given process. This is a proxy call similar to the + * above. */ + inline const UPID &get_pid(ProcessBase* process) const; + + /* A hook for initializing required state to run this manager. For + * example initializing event loops. */ + virtual void initialize() = 0; + + /* Functions from original SocketManager. These are used by + * ProcessManager to implement delivery of messages. */ + + /* Establish a persistent connection between the given process and + * the process represented by the UPID to. This is one way + * connection from process -> to. See process::link() for more + * details. */ + virtual void link(ProcessBase* process, const UPID& to) = 0; + + /* Return a handle to the HttpProxy representing the connection on + * the given connection handle. */ + virtual PID proxy(const ConnectionHandle& connection_handle) = 0; + + /* Send the given message to the remote host identified in the + * message. */ + virtual void send(Message* message) = 0; + + /* Created exited events for linked processes. See usage in + * ProcessManager */ + virtual void exited(ProcessBase* process) = 0; + + /* Functions related to timer behavior. This behavior is usually + * associated with io event managers as they can block indefinitely + * for IO, and timers are used to set time-outs on waiting. */ + + // Return the current time. + virtual double get_time() const = 0; + + // Return true if there are pending timers that need to be executed. + virtual bool has_pending_timers() const = 0; + + // Update the timer on async interrupt. + virtual void update_timer() = 0; + + /* Update the timer on async interrupt iff it is not already set to + * do so. */ + virtual void try_update_timer() = 0; + + /* Functions coming from process/io.hpp. The following are pure + * virtuals that provide a nice hook for implementing each of + * these functions in a clean way. */ + + // see process/io.hpp + virtual Future poll(int fd, short events) = 0; + + // see process/io.hpp + virtual Future read(int fd, void* data, size_t size) = 0; + + // see process/io.hpp + virtual Future read(int fd) = 0; + + // see process/io.hpp + virtual Future write(int fd, void* data, size_t size) = 0; + + // see process/io.hpp + virtual Future write(int fd, const std::string& data) = 0; + + // see process/io.hpp + virtual Future redirect(int from, Option to, size_t chunk) = 0; + + virtual Future do_poll( + const ConnectionHandle& conn_handle, + short events) = 0; + + virtual Future do_read( + const ConnectionHandle& conn_handle, + void* data, + size_t size) = 0; + + virtual Future do_read(const ConnectionHandle& conn_handle) = 0; + + virtual Future do_write( + const ConnectionHandle& conn_handle, + const std::string& data) = 0; + + virtual Future do_write( + const ConnectionHandle& conn_handle, + void* data, + size_t size) = 0; + +protected: + EventManager(); + + /* Construct a new connection and return a handle to it. This is a + TCP connection. 
*/ + virtual Try make_new_connection( + uint32_t ip, + uint16_t port, + int protocol) = 0; + +private: + static EventManager* singleton; +}; + +inline EventManager::EventManager() +{ + CHECK(!singleton) << "Can not instantiate multiple event managers"; + singleton = this; +} + +inline Try EventManager::new_connection( + uint32_t ip, + uint16_t port, + int protocol) +{ + CHECK(singleton) << "new_connection requires an initialized EventManager"; + return singleton->make_new_connection(ip, port, protocol); +} + +inline Future EventManager::conn_poll( + const ConnectionHandle& conn_handle, + short events) +{ + CHECK(singleton) << "poll requires an initialized EventManager"; + return singleton->do_poll(conn_handle, events); +} + +inline Future EventManager::conn_read_data( + const ConnectionHandle& conn_handle, + void* data, + size_t size) +{ + CHECK(singleton) << "read requires an initialized EventManager"; + return singleton->do_read(conn_handle, data, size); +} + +inline Future EventManager::conn_read( + const ConnectionHandle& conn_handle) +{ + CHECK(singleton) << "read requires an initialized EventManager"; + return singleton->do_read(conn_handle); +} + +inline Future EventManager::conn_write( + const ConnectionHandle& conn_handle, + const std::string& data) +{ + CHECK(singleton) << "write requires an initialized EventManager"; + return singleton->do_write(conn_handle, data); +} + +inline Future EventManager::conn_write( + const ConnectionHandle& conn_handle, + void* data, + size_t size) +{ + CHECK(singleton) << "write requires an initialized EventManager"; + return singleton->do_write(conn_handle, data, size); +} + +inline void EventManager::enqueue( + ProcessBase* proc_base, + Event* event, + bool inject) +{ + proc_base->enqueue(event, inject); +} + +inline const UPID &EventManager::get_pid(ProcessBase* process) const +{ + return process->pid; +} + +extern std::map >* timeouts; +extern synchronizable(timeouts); + +// Unique id that can be assigned to each process. +extern uint32_t __id__; + +// Local server socket. +extern int __s__; + +// Local IP address. +extern uint32_t __ip__; + +// Local port. 
+extern uint16_t __port__; + +} // namespace process { + +#endif // EVENT_MANAGER_HPP diff --git a/3rdparty/libprocess/src/event_manager_base.hpp b/3rdparty/libprocess/src/event_manager_base.hpp new file mode 100644 index 00000000000..5ad810dd28f --- /dev/null +++ b/3rdparty/libprocess/src/event_manager_base.hpp @@ -0,0 +1,31 @@ +#ifndef EVENT_MANAGER_BASE_HPP +#define EVENT_MANAGER_BASE_HPP + +#include "encoder.hpp" + +namespace process { + +namespace internal { + +class EventManager +{ +public: + virtual ~EventManager() {} + + virtual void send(Encoder* encoder, bool persist) = 0; + + virtual void send( + const http::Response& response, + const http::Request& request, + const ConnectionHandle& connection_handle) = 0; + +protected: + EventManager() {} + +}; + +} // namespace internal { + +} // namespace process { + +#endif // EVENT_MANAGER_BASE_HPP diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index 4ef00d11de4..08f04e8f36f 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -18,6 +18,7 @@ #include #include "decoder.hpp" +#include "event_manager.hpp" using std::deque; using std::string; @@ -65,20 +66,6 @@ Future request( const Option& body, const Option& contentType) { - Try socket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - - if (socket.isError()) { - return Failure("Failed to create socket: " + socket.error()); - } - - int s = socket.get(); - - Try cloexec = os::cloexec(s); - if (!cloexec.isSome()) { - os::close(s); - return Failure("Failed to cloexec: " + cloexec.error()); - } - // We use inet_ntop since inet_ntoa is not thread-safe! char ip[INET_ADDRSTRLEN]; if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) == NULL) { @@ -88,16 +75,14 @@ Future request( const string host = string(ip) + ":" + stringify(upid.port); - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(upid.port); - addr.sin_addr.s_addr = upid.ip; - - if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { - os::close(s); - return Failure(ErrnoError("Failed to connect to '" + host + "'")); + Try conn_handle = EventManager::new_connection( + upid.ip, + upid.port, + IPPROTO_IP); + if (conn_handle.isError()) { + return Failure("Failed to create Http connection: " + conn_handle.error()); } + ConnectionHandle connection_handle = conn_handle.get(); std::ostringstream out; @@ -147,19 +132,9 @@ Future request( out << body.get(); } - Try nonblock = os::nonblock(s); - if (!nonblock.isSome()) { - os::close(s); - return Failure("Failed to set nonblock: " + nonblock.error()); - } - - // Need to disambiguate the io::read we want when binding below. 
- Future (*read)(int) = io::read; - - return io::write(s, out.str()) - .then(lambda::bind(read, s)) - .then(lambda::bind(&internal::decode, lambda::_1)) - .onAny(lambda::bind(&os::close, s)); + return EventManager::conn_write(connection_handle, out.str()) + .then(lambda::bind(&EventManager::conn_read, connection_handle)) + .then(lambda::bind(&internal::decode, lambda::_1)); } diff --git a/3rdparty/libprocess/src/http_proxy.cpp b/3rdparty/libprocess/src/http_proxy.cpp new file mode 100644 index 00000000000..a8321c226be --- /dev/null +++ b/3rdparty/libprocess/src/http_proxy.cpp @@ -0,0 +1,264 @@ +#include + +#include + +#include "http_proxy.hpp" + +using process::http::InternalServerError; +using process::http::NotFound; +using process::http::Request; +using process::http::Response; +using process::http::ServiceUnavailable; + +using std::string; +using std::stringstream; + +namespace process { + +HttpProxy::HttpProxy( + const ConnectionHandle& _connection_handle, + internal::EventManager* _ev_man) + : ProcessBase(ID::generate("__http__")), + connection_handle(_connection_handle), + ev_man(_ev_man) {} + + +HttpProxy::~HttpProxy() +{ + // Need to make sure response producers know not to continue to + // create a response (streaming or otherwise). + if (pipe.isSome()) { + os::close(pipe.get()); + } + pipe = None(); + + while (!items.empty()) { + Item* item = items.front(); + + // Attempt to discard the future. + item->future->discard(); + + // But it might have already been ready. In general, we need to + // wait until this future is potentially ready in order to attempt + // to close a pipe if one exists. + item->future->onReady(lambda::bind(&Item::cleanup, lambda::_1)); + + items.pop(); + delete item; + } +} + + +void HttpProxy::enqueue(const Response& response, const Request& request) +{ + handle(new Future(response), request); +} + + +void HttpProxy::handle(Future* future, const Request& request) +{ + items.push(new Item(request, future)); + + if (items.size() == 1) { + next(); + } +} + + +void HttpProxy::next() +{ + if (items.size() > 0) { + // Wait for any transition of the future. + items.front()->future->onAny( + defer(self(), &HttpProxy::waited, lambda::_1)); + } +} + + +void HttpProxy::waited(const Future& future) +{ + CHECK(items.size() > 0); + Item* item = items.front(); + + CHECK(future == *item->future); + + // Process the item and determine if we're done or not (so we know + // whether to start waiting on the next responses). + bool processed = process(*item->future, item->request); + + items.pop(); + delete item; + + if (processed) { + next(); + } +} + + +bool HttpProxy::process(const Future& future, const Request& request) +{ + if (!future.isReady()) { + // TODO(benh): Consider handling other "states" of future + // (discarded, failed, etc) with different HTTP statuses. + ev_man->send(ServiceUnavailable(), request, connection_handle); + return true; // All done, can process next response. + } + + Response response = future.get(); + + // If the response specifies a path, try and perform a sendfile. + if (response.type == Response::PATH) { + // Make sure no body is sent (this is really an error and + // should be reported and no response sent. 
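+    // For Response::PATH the content is served from the file below (via the
+    // FileEncoder / sendfile path), so any explicitly set body is dropped.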
+ response.body.clear(); + + const string& path = response.path; + int fd = open(path.c_str(), O_RDONLY); + if (fd < 0) { + if (errno == ENOENT || errno == ENOTDIR) { + VLOG(1) << "Returning '404 Not Found' for path '" << path << "'"; + ev_man->send(NotFound(), request, connection_handle); + } else { + const char* error = strerror(errno); + VLOG(1) << "Failed to send file at '" << path << "': " << error; + ev_man->send(InternalServerError(), request, connection_handle); + } + } else { + struct stat s; // Need 'struct' because of function named 'stat'. + if (fstat(fd, &s) != 0) { + const char* error = strerror(errno); + VLOG(1) << "Failed to send file at '" << path << "': " << error; + ev_man->send(InternalServerError(), request, connection_handle); + } else if (S_ISDIR(s.st_mode)) { + VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'"; + ev_man->send(NotFound(), request, connection_handle); + } else { + // While the user is expected to properly set a 'Content-Type' + // header, we fill in (or overwrite) 'Content-Length' header. + stringstream out; + out << s.st_size; + response.headers["Content-Length"] = out.str(); + + if (s.st_size == 0) { + ev_man->send(response, request, connection_handle); + return true; // All done, can process next request. + } + + VLOG(1) << "Sending file at '" << path << "' with length " << s.st_size; + + // TODO(benh): Consider a way to have the socket manager turn + // on TCP_CORK for both sends and then turn it off. + ev_man->send( + new HttpResponseEncoder(connection_handle, response, request), + true); + + // Note the file descriptor gets closed by FileEncoder. + ev_man->send( + new FileEncoder(connection_handle, fd, s.st_size), + request.keepAlive); + } + } + } else if (response.type == Response::PIPE) { + // Make sure no body is sent (this is really an error and + // should be reported and no response sent. + response.body.clear(); + + // Make sure the pipe is nonblocking. + Try nonblock = os::nonblock(response.pipe); + if (nonblock.isError()) { + const char* error = strerror(errno); + VLOG(1) << "Failed make pipe nonblocking: " << error; + ev_man->send(InternalServerError(), request, connection_handle); + return true; // All done, can process next response. + } + + // While the user is expected to properly set a 'Content-Type' + // header, we fill in (or overwrite) 'Transfer-Encoding' header. + response.headers["Transfer-Encoding"] = "chunked"; + + VLOG(1) << "Starting \"chunked\" streaming"; + + ev_man->send( + new HttpResponseEncoder(connection_handle, response, request), + true); + + pipe = response.pipe; + + io::poll(pipe.get(), io::READ).onAny( + defer(self(), &Self::stream, lambda::_1, request)); + + return false; // Streaming, don't process next response (yet)! + } else { + ev_man->send(response, request, connection_handle); + } + + return true; // All done, can process next response. +} + + +void HttpProxy::stream(const Future& poll, const Request& request) +{ + // TODO(benh): Use 'splice' on Linux. + + CHECK(pipe.isSome()); + + bool finished = false; // Whether we're done streaming. + + if (poll.isReady()) { + // Read and write. + CHECK(poll.get() == io::READ); + const size_t size = 4 * 1024; // 4K. + char data[size]; + while (!finished) { + ssize_t length = ::read(pipe.get(), data, size); + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. 
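+        // Re-arm the poll on the pipe and resume stream() via defer once it
+        // is readable again; 'finished' stays false so the connection
+        // persists in the meantime.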
+ io::poll(pipe.get(), io::READ).onAny( + defer(self(), &Self::stream, lambda::_1, request)); + break; + } else { + std::ostringstream out; + if (length <= 0) { + // Error or closed, treat both as closed. + if (length < 0) { + // Error. + const char* error = strerror(errno); + VLOG(1) << "Read error while streaming: " << error; + } + out << "0\r\n" << "\r\n"; + finished = true; + } else { + // Data! + out << std::hex << length << "\r\n"; + out.write(data, length); + out << "\r\n"; + } + + // We always persist the connection when we're not finished + // streaming. + ev_man->send( + new DataEncoder(connection_handle, out.str()), + finished ? request.keepAlive : true); + } + } + } else if (poll.isFailed()) { + VLOG(1) << "Failed to poll: " << poll.failure(); + ev_man->send(InternalServerError(), request, connection_handle); + finished = true; + } else { + VLOG(1) << "Unexpected discarded future while polling"; + ev_man->send(InternalServerError(), request, connection_handle); + finished = true; + } + + if (finished) { + os::close(pipe.get()); + pipe = None(); + next(); + } +} + +} // namespace process { \ No newline at end of file diff --git a/3rdparty/libprocess/src/http_proxy.hpp b/3rdparty/libprocess/src/http_proxy.hpp new file mode 100644 index 00000000000..61554676b90 --- /dev/null +++ b/3rdparty/libprocess/src/http_proxy.hpp @@ -0,0 +1,90 @@ +#ifndef HTTP_PROXY_HPP +#define HTTP_PROXY_HPP + +#include + +#include "event_manager_base.hpp" + +namespace process { + +// Provides a process that manages sending HTTP responses so as to +// satisfy HTTP/1.1 pipelining. Each request should either enqueue a +// response, or ask the proxy to handle a future response. The process +// is responsible for making sure the responses are sent in the same +// order as the requests. Note that we use a 'Socket' in order to keep +// the underyling file descriptor from getting closed while there +// might still be outstanding responses even though the client might +// have closed the connection (see more discussion in +// SocketManger::close and SocketManager::proxy). +class HttpProxy : public Process +{ +public: + explicit HttpProxy( + const ConnectionHandle& _connection_handle, + internal::EventManager* _ev_man); + virtual ~HttpProxy(); + + // Enqueues the response to be sent once all previously enqueued + // responses have been processed (e.g., waited for and sent). + void enqueue(const http::Response& response, const http::Request& request); + + // Enqueues a future to a response that will get waited on (up to + // some timeout) and then sent once all previously enqueued + // responses have been processed (e.g., waited for and sent). + void handle(Future* future, const http::Request& request); + +private: + // Starts "waiting" on the next available future response. + void next(); + + // Invoked once a future response has been satisfied. + void waited(const Future& future); + + // Demuxes and handles a response. + bool process( + const Future& future, + const http::Request& request); + + // Handles stream (i.e., pipe) based responses. + void stream(const Future& poll, const http::Request& request); + + // Copy the handle to keep the connection from getting destroyed. + const ConnectionHandle connection_handle; + + internal::EventManager* ev_man; + + // Describes a queue "item" that wraps the future to the response + // and the original request. + // The original request contains needed information such as what encodings + // are acceptable and whether to persist the connection. 
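+  // The Item owns the future (deleted in ~Item), and items are processed
+  // strictly in FIFO order so responses go out in the order the requests
+  // arrived.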
+ struct Item + { + Item(const http::Request& _request, Future* _future) + : request(_request), future(_future) {} + + ~Item() + { + delete future; + } + + // Helper for cleaning up a response (i.e., closing any open pipes + // in the event Response::type is PIPE). + static void cleanup(const http::Response& response) + { + if (response.type == http::Response::PIPE) { + os::close(response.pipe); + } + } + + const http::Request request; // Make a copy. + Future* future; + }; + + std::queue items; + + Option pipe; // Current pipe, if streaming. +}; + +} // namespace process { + +#endif // HTTP_PROXY_HPP diff --git a/3rdparty/libprocess/src/libev_event_manager.cpp b/3rdparty/libprocess/src/libev_event_manager.cpp new file mode 100644 index 00000000000..3a0eee39e42 --- /dev/null +++ b/3rdparty/libprocess/src/libev_event_manager.cpp @@ -0,0 +1,2089 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include "decoder.hpp" +#include "libev_event_manager.hpp" + +using process::http::Request; +using process::http::Response; + +using std::deque; +using std::list; +using std::map; +using std::queue; +using std::set; +using std::string; + +namespace process { + +class LibevConnection : public Connection { +public: + LibevConnection(const Connection::id_t& fd) : Connection(fd) {} + virtual ~LibevConnection() + { + if (id >= 0) { + Try close = os::close(id); + if (close.isError()) { + std::cerr << "Failed to close socket: " << close.error() << std::endl; + abort(); + } + } + } + +}; + +class LibevEventManager : public EventManager +{ +public: + LibevEventManager(EventManager::ProcessManager* _proc_man); + + virtual ~LibevEventManager() {} + + virtual void initialize() override; + + virtual Try make_new_connection( + uint32_t ip, + uint16_t port, + int protocol) override; + + virtual Future do_poll( + const ConnectionHandle& conn_handle, + short events) override; + + virtual Future do_read( + const ConnectionHandle& conn_handle, + void* data, + size_t size) override; + + virtual Future do_read( + const ConnectionHandle& conn_handle) override; + + virtual Future do_write( + const ConnectionHandle& conn_handle, + const std::string& data) override; + + virtual Future do_write( + const ConnectionHandle& conn_handle, + void* data, + size_t size) override; + + virtual double get_time() const override; + + ConnectionHandle accepted(int s); + + virtual void link(ProcessBase* process, const UPID& to) override; + + virtual PID proxy( + const ConnectionHandle& connection_handle) override; + + virtual void send(Encoder* encoder, bool persist) override; + virtual void send(const Response& response, + const Request& request, + const ConnectionHandle& connection_handle) override; + virtual void send(Message* message) override; + + Encoder* next(int s); + + void close(int s); + + void exited(const Node& node); + virtual void exited(ProcessBase* process) override; + + virtual bool has_pending_timers() const override; + + virtual void update_timer() override; + + virtual void try_update_timer() override; + + virtual Future poll(int fd, short events) override; + + virtual Future read(int fd, void* data, size_t size) override; + + virtual Future read(int fd) override; + + virtual Future write(int fd, void* data, size_t size) override; + + virtual Future write(int fd, const std::string& data) override; + + virtual Future redirect( + int from, + Option to, + size_t chunk) override; + + /* A pointer to the dependency injection base class of 
ProcessManager. This + can be publicly accessible since we're in a PIMPL pattern and using this + pointer in libev function callbacks. */ + EventManager::ProcessManager* proc_man; + +private: + // Map from UPID (local/remote) to process. + map > links; + + // Collection of all active connection handles. + map connection_handles; + + // Collection of sockets that should be disposed when they are + // finished being used (e.g., when there is no more data to send on + // them). + set dispose; + + // Map from socket to node (ip, port). + map nodes; + + // Maps from node (ip, port) to temporary sockets (i.e., they will + // get closed once there is no more data to send on them). + map temps; + + // Maps from node (ip, port) to persistent sockets (i.e., they will + // remain open even if there is no more data to send on them). We + // distinguish these from the 'temps' collection so we can tell when + // a persistant socket has been lost (and thus generate + // ExitedEvents). + map persists; + + // Map from socket to outgoing queue. + map > outgoing; + + // HTTP proxies. + map proxies; + + // Protects instance variables. + synchronizable(this); + +}; + +static LibevEventManager* LibevMan; + +EventManager* GetLibevEventManager( + EventManager::ProcessManager* proc_man) +{ + return LibevMan ? LibevMan : (LibevMan = new LibevEventManager(proc_man)); +} + +// Event loop. +static struct ev_loop* loop = NULL; + +// Asynchronous watcher for interrupting loop. +static ev_async async_watcher; + +// Watcher for timeouts. +static ev_timer timeouts_watcher; + +// Server watcher for accepting connections. +static ev_io server_watcher; + +// Queue of functions to be invoked asynchronously within the vent +// loop (protected by 'watchers' below). +static queue >* functions = + new queue >(); + +// Queue of I/O watchers. +// (protected by 'watchers' below). +// TODO(benh): Replace this queue with functions that we put in +// 'functions' below that perform the ev_io_start themselves. +static queue* watchers = new queue(); +static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER; + +// Flag to indicate whether or to update the timer on async interrupt. +static bool update_timer_flag = false; + +// For supporting Clock::settle(), true if timers have been removed +// from 'timeouts' but may not have been executed yet. Protected by +// the timeouts lock. This is only used when the clock is paused. +static bool pending_timers = false; + +typedef void (*Sender)(struct ev_loop*, ev_io*, int); + +void send_data(struct ev_loop*, ev_io*, int); +void send_file(struct ev_loop*, ev_io*, int); + +Sender get_send_function(Encoder::IOKind kind) { + switch (kind) { + case Encoder::send_data: { + return send_data; + } + case Encoder::send_file: { + return send_file; + } + default: { + std::cerr << "Unhandled Encoder IOKind" << std::endl; + abort(); + } + } +} + +// Wrapper around function we want to run in the event loop. +template +void _run_in_event_loop( + const lambda::function(void)>& f, + const Owned >& promise) +{ + // Don't bother running the function if the future has been discarded. + if (promise->future().hasDiscard()) { + promise->discard(); + } else { + promise->set(f()); + } +} + + +// Helper for running a function in the event loop. +template +Future run_in_event_loop(const lambda::function(void)>& f) +{ + Owned > promise(new Promise()); + + Future future = promise->future(); + + // Enqueue the function. 
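+  // Note: 'watchers' is the synchronization primitive that also guards
+  // 'functions'; handle_async() drains both queues on the event loop thread.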
+ synchronized (watchers) { + functions->push(lambda::bind(&_run_in_event_loop, f, promise)); + } + + // Interrupt the loop. + ev_async_send(loop, &async_watcher); + + return future; +} + +void* serve(void* arg) +{ + ev_loop(((struct ev_loop*) arg), 0); + + return NULL; +} + +void handle_async(struct ev_loop* loop, ev_async* _, int revents) +{ + synchronized (watchers) { + // Start all the new I/O watchers. + while (!watchers->empty()) { + ev_io* watcher = watchers->front(); + watchers->pop(); + ev_io_start(loop, watcher); + } + + while (!functions->empty()) { + (functions->front())(); + functions->pop(); + } + } + + synchronized (timeouts) { + if (update_timer_flag) { + if (!timeouts->empty()) { + // Determine when the next timer should fire. + timeouts_watcher.repeat = + (timeouts->begin()->first - Clock::now()).secs(); + + if (timeouts_watcher.repeat <= 0) { + // Feed the event now! + timeouts_watcher.repeat = 0; + ev_timer_again(loop, &timeouts_watcher); + ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT); + } else { + // Don't fire the timer if the clock is paused since we + // don't want time to advance (instead a call to + // clock::advance() will handle the timer). + if (Clock::paused() && timeouts_watcher.repeat > 0) { + timeouts_watcher.repeat = 0; + } + + ev_timer_again(loop, &timeouts_watcher); + } + } + + update_timer_flag = false; + } + } +} + + +void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) +{ + list timedout; + + synchronized (timeouts) { + Time now = Clock::now(); + + VLOG(3) << "Handling timeouts up to " << now; + + foreachkey (const Time& timeout, *timeouts) { + if (timeout > now) { + break; + } + + VLOG(3) << "Have timeout(s) at " << timeout; + + // Record that we have pending timers to execute so the + // Clock::settle() operation can wait until we're done. + pending_timers = true; + + foreach (const Timer& timer, (*timeouts)[timeout]) { + timedout.push_back(timer); + } + } + + // Now erase the range of timeouts that timed out. + timeouts->erase(timeouts->begin(), timeouts->upper_bound(now)); + + // Okay, so the timeout for the next timer should not have fired. + CHECK(timeouts->empty() || (timeouts->begin()->first > now)); + + // Update the timer as necessary. + if (!timeouts->empty()) { + // Determine when the next timer should fire. + timeouts_watcher.repeat = + (timeouts->begin()->first - Clock::now()).secs(); + + if (timeouts_watcher.repeat <= 0) { + // Feed the event now! + timeouts_watcher.repeat = 0; + ev_timer_again(loop, &timeouts_watcher); + ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT); + } else { + // Don't fire the timer if the clock is paused since we don't + // want time to advance (instead a call to Clock::advance() + // will handle the timer). + if (Clock::paused() && timeouts_watcher.repeat > 0) { + timeouts_watcher.repeat = 0; + } + + ev_timer_again(loop, &timeouts_watcher); + } + } + + update_timer_flag = false; // Since we might have a queued update_timer_flag. + } + + // Update current time of process (if it's present/valid). It might + // be necessary to actually add some more synchronization around + // this so that, for example, pausing and resuming the clock doesn't + // cause some processes to get thier current times updated and + // others not. Since ProcessManager::use acquires the 'processes' + // lock we had to move this out of the synchronized (timeouts) above + // since there was a deadlock with acquring 'processes' then + // 'timeouts' (reverse order) in ProcessManager::cleanup. 
Note that + // current time may be greater than the timeout if a local message + // was received (and happens-before kicks in). + if (Clock::paused()) { + foreach (const Timer& timer, timedout) { + if (ProcessReference process = LibevMan->proc_man->use(timer.creator())) { + Clock::update(process, timer.timeout().time()); + } + } + } + + // Invoke the timers that timed out (TODO(benh): Do this + // asynchronously so that we don't tie up the event thread!). + foreach (const Timer& timer, timedout) { + timer(); + } + + // Mark ourselves as done executing the timers since it's now safe + // for a call to Clock::settle() to check if there will be any + // future timeouts reached. + synchronized (timeouts) { + pending_timers = false; + } +} + + +void recv_data(struct ev_loop* loop, ev_io* watcher, int revents) +{ + DataDecoder* decoder = (DataDecoder*) watcher->data; + + int s = watcher->fd; + + while (true) { + const ssize_t size = 80 * 1024; + ssize_t length = 0; + + char data[size]; + + length = recv(s, data, size, 0); + + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. + break; + } else if (length <= 0) { + // Socket error or closed. + if (length < 0) { + const char* error = strerror(errno); + VLOG(1) << "Socket error while receiving: " << error; + } else { + VLOG(2) << "Socket closed while receiving"; + } + LibevMan->close(s); + delete decoder; + ev_io_stop(loop, watcher); + delete watcher; + break; + } else { + CHECK(length > 0); + + // Decode as much of the data as possible into HTTP requests. + const deque& requests = decoder->decode(data, length); + + if (!requests.empty()) { + foreach (Request* request, requests) { + LibevMan->proc_man->handle(decoder->connection_handle(), request); + } + } else if (requests.empty() && decoder->failed()) { + VLOG(1) << "Decoder error while receiving"; + LibevMan->close(s); + delete decoder; + ev_io_stop(loop, watcher); + delete watcher; + break; + } + } + } +} + + +// A variant of 'recv_data' that doesn't do anything with the +// data. Used by sockets created via SocketManager::link as well as +// SocketManager::send(Message) where we don't care about the data +// received we mostly just want to know when the socket has been +// closed. +void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents) +{ + ConnectionHandle* connection_handle = (ConnectionHandle*) watcher->data; + + int s = watcher->fd; + + while (true) { + const ssize_t size = 80 * 1024; + ssize_t length = 0; + + char data[size]; + + length = recv(s, data, size, 0); + + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. + break; + } else if (length <= 0) { + // Socket error or closed. 
+ if (length < 0) { + const char* error = strerror(errno); + VLOG(1) << "Socket error while receiving: " << error; + } else { + VLOG(2) << "Socket closed while receiving"; + } + LibevMan->close(s); + ev_io_stop(loop, watcher); + delete connection_handle; + delete watcher; + break; + } else { + VLOG(2) << "Ignoring " << length << " bytes of data received " + << "on socket used only for sending"; + } + } +} + + +void send_data(struct ev_loop* loop, ev_io* watcher, int revents) +{ + DataEncoder* encoder = (DataEncoder*) watcher->data; + + int s = watcher->fd; + + while (true) { + const void* data; + size_t size; + + data = encoder->next(&size); + CHECK(size > 0); + + ssize_t length = send(s, data, size, MSG_NOSIGNAL); + + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + encoder->backup(size); + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. + encoder->backup(size); + break; + } else if (length <= 0) { + // Socket error or closed. + if (length < 0) { + const char* error = strerror(errno); + VLOG(1) << "Socket error while sending: " << error; + } else { + VLOG(1) << "Socket closed while sending"; + } + LibevMan->close(s); + delete encoder; + ev_io_stop(loop, watcher); + delete watcher; + break; + } else { + CHECK(length > 0); + + // Update the encoder with the amount sent. + encoder->backup(size - length); + + // See if there is any more of the message to send. + if (encoder->remaining() == 0) { + delete encoder; + + // Stop this watcher for now. + ev_io_stop(loop, watcher); + + // Check for more stuff to send on socket. + Encoder* next = LibevMan->next(s); + if (next != NULL) { + watcher->data = next; + ev_io_init(watcher, get_send_function(next->io_kind()), s, EV_WRITE); + ev_io_start(loop, watcher); + } else { + // Nothing more to send right now, clean up. + delete watcher; + } + break; + } + } + } +} + + +void send_file(struct ev_loop* loop, ev_io* watcher, int revents) +{ + FileEncoder* encoder = (FileEncoder*) watcher->data; + + int s = watcher->fd; + + while (true) { + int fd; + off_t offset; + size_t size; + + fd = encoder->next(&offset, &size); + CHECK(size > 0); + + ssize_t length = os::sendfile(s, fd, offset, size); + + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + encoder->backup(size); + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. + encoder->backup(size); + break; + } else if (length <= 0) { + // Socket error or closed. + if (length < 0) { + const char* error = strerror(errno); + VLOG(1) << "Socket error while sending: " << error; + } else { + VLOG(1) << "Socket closed while sending"; + } + LibevMan->close(s); + delete encoder; + ev_io_stop(loop, watcher); + delete watcher; + break; + } else { + CHECK(length > 0); + + // Update the encoder with the amount sent. + encoder->backup(size - length); + + // See if there is any more of the message to send. + if (encoder->remaining() == 0) { + delete encoder; + + // Stop this watcher for now. + ev_io_stop(loop, watcher); + + // Check for more stuff to send on socket. + Encoder* next = LibevMan->next(s); + if (next != NULL) { + watcher->data = next; + ev_io_init(watcher, get_send_function(next->io_kind()), s, EV_WRITE); + ev_io_start(loop, watcher); + } else { + // Nothing more to send right now, clean up. 
+ delete watcher; + } + break; + } + } + } +} + + +void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents) +{ + int s = watcher->fd; + + // Now check that a successful connection was made. + int opt; + socklen_t optlen = sizeof(opt); + + if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) { + // Connect failure. + VLOG(1) << "Socket error while connecting"; + LibevMan->close(s); + MessageEncoder* encoder = (MessageEncoder*) watcher->data; + delete encoder; + ev_io_stop(loop, watcher); + delete watcher; + } else { + // We're connected! Now let's do some sending. + ev_io_stop(loop, watcher); + ev_io_init(watcher, send_data, s, EV_WRITE); + ev_io_start(loop, watcher); + } +} + + +void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents) +{ + int s = watcher->fd; + + // Now check that a successful connection was made. + int opt; + socklen_t optlen = sizeof(opt); + + if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) { + // Connect failure. + VLOG(1) << "Socket error while connecting"; + LibevMan->close(s); + ConnectionHandle* connection_handle = (ConnectionHandle*) watcher->data; + delete connection_handle; + ev_io_stop(loop, watcher); + delete watcher; + } else { + // We're connected! Now let's do some receiving. + ev_io_stop(loop, watcher); + ev_io_init(watcher, ignore_data, s, EV_READ); + ev_io_start(loop, watcher); + } +} + + +void accept(struct ev_loop* loop, ev_io* watcher, int revents) +{ + CHECK_EQ(__s__, watcher->fd); + + sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + int s = ::accept(__s__, (sockaddr*) &addr, &addrlen); + + if (s < 0) { + return; + } + + Try nonblock = os::nonblock(s); + if (nonblock.isError()) { + LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: " + << nonblock.error(); + os::close(s); + return; + } + + Try cloexec = os::cloexec(s); + if (cloexec.isError()) { + LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: " + << cloexec.error(); + os::close(s); + return; + } + + // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait. + int on = 1; + if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) { + const char* error = strerror(errno); + VLOG(1) << "Failed to turn off the Nagle algorithm: " << error; + os::close(s); + } else { + // Inform the socket manager for proper bookkeeping. + ConnectionHandle connection_handle = LibevMan->accepted(s); + + // Allocate and initialize the decoder and watcher. + DataDecoder* decoder = new DataDecoder(connection_handle); + + ev_io* watcher = new ev_io(); + watcher->data = decoder; + + ev_io_init(watcher, recv_data, s, EV_READ); + ev_io_start(loop, watcher); + } +} + +LibevEventManager::LibevEventManager(EventManager::ProcessManager* _proc_man) + : proc_man(_proc_man) +{ + synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE; +} + +void LibevEventManager::initialize() +{ + // Setup event loop. +#ifdef __sun__ + loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT); +#else + loop = ev_default_loop(EVFLAG_AUTO); +#endif // __sun__ + + ev_async_init(&async_watcher, handle_async); + ev_async_start(loop, &async_watcher); + + ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0); + ev_timer_again(loop, &timeouts_watcher); + + ev_io_init(&server_watcher, accept, __s__, EV_READ); + ev_io_start(loop, &server_watcher); + +// ev_child_init(&child_watcher, child_exited, pid, 0); +// ev_child_start(loop, &cw); + +// /* Install signal handler. 
*/ +// struct sigaction sa; + +// sa.sa_handler = ev_sighandler; +// sigfillset (&sa.sa_mask); +// sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */ +// sigaction (w->signum, &sa, 0); + +// sigemptyset (&sa.sa_mask); +// sigaddset (&sa.sa_mask, w->signum); +// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0); + + pthread_t thread; // For now, not saving handles on our threads. + if (pthread_create(&thread, NULL, serve, loop) != 0) { + LOG(FATAL) << "Failed to initialize, pthread_create"; + } +} + +Try LibevEventManager::make_new_connection( + uint32_t ip, + uint16_t port, + int protocol) +{ + Try socket = process::socket(AF_INET, SOCK_STREAM, protocol); + if (socket.isError()) { + return Error("Failed to create socket: " + socket.error()); + } + + int s = socket.get(); + + Try cloexec = os::cloexec(s); + if (!cloexec.isSome()) { + os::close(s); + return Error("Failed to cloexec: " + cloexec.error()); + } + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = ip; + if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { + os::close(s); + return ErrnoError("Failed to connect to '" + stringify(ip) + ":" + + stringify(port) + "'"); + } + Try nonblock = os::nonblock(s); + if (!nonblock.isSome()) { + os::close(s); + return Error("Failed to set nonblock: " + nonblock.error()); + } + return std::make_shared(s); +} + +Future LibevEventManager::do_poll( + const ConnectionHandle& conn_handle, + short events) +{ + return io::poll(*conn_handle, events); +} + +Future LibevEventManager::do_read( + const ConnectionHandle& conn_handle, + void* data, + size_t size) +{ + return io::read(*conn_handle, data, size); +} + +Future LibevEventManager::do_read( + const ConnectionHandle& conn_handle) +{ + return io::read(*conn_handle); +} + +Future LibevEventManager::do_write( + const ConnectionHandle& conn_handle, + const std::string& data) +{ + return io::write(*conn_handle, data); +} + +Future LibevEventManager::do_write( + const ConnectionHandle& conn_handle, + void* data, + size_t size) +{ + return io::write(*conn_handle, data, size); +} + +double LibevEventManager::get_time() const +{ + return ev_time(); +} + +ConnectionHandle LibevEventManager::accepted(int s) +{ + synchronized (this) { + return connection_handles[s] = std::make_shared(s); + } + + return UNREACHABLE(); // Quiet the compiler. +} + + +void LibevEventManager::link(ProcessBase* process, const UPID& to) +{ + // TODO(benh): The semantics we want to support for link are such + // that if there is nobody to link to (local or remote) then an + // ExitedEvent gets generated. This functionality has only been + // implemented when the link is local, not remote. Of course, if + // there is nobody listening on the remote side, then this should + // work remotely ... but if there is someone listening remotely just + // not at that id, then it will silently continue executing. + + CHECK(process != NULL); + + Node node(to.ip, to.port); + + synchronized (this) { + // Check if node is remote and there isn't a persistant link. + if ((node.ip != __ip__ || node.port != __port__) + && persists.count(node) == 0) { + + // Okay, no link, let's create a socket. 
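+      // The connection created below is recorded in 'connection_handles',
+      // 'nodes' and 'persists' so that close() can later generate
+      // ExitedEvents if the remote end goes away.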
+ Try connection_handle = make_new_connection( + to.ip, + to.port, + 0); + if (connection_handle.isError()) { + LOG(FATAL) << "Fail to link, connection: " << connection_handle.error(); + } + ConnectionHandle conn_handle = connection_handle.get(); + const Connection::id_t &conn_id = *conn_handle; + connection_handles[conn_id] = conn_handle; + + nodes[conn_id] = node; + + persists[node] = conn_id; + + // Allocate and initialize a watcher for reading data from this + // socket. Note that we don't expect to receive anything other + // than HTTP '202 Accepted' responses which we anyway ignore. + // We do, however, want to react when it gets closed so we can + // generate appropriate lost events (since this is a 'link'). + ev_io* watcher = new ev_io(); + watcher->data = new ConnectionHandle(conn_handle); + + // Enqueue the watcher. + synchronized (watchers) { + watchers->push(watcher); + } + + // Interrupt the loop. + ev_async_send(loop, &async_watcher); + } + + links[to].insert(process); + } +} + + +PID LibevEventManager::proxy( + const ConnectionHandle& connection_handle) +{ + HttpProxy* proxy = NULL; + + synchronized (this) { + // This socket might have been asked to get closed (e.g., remote + // side hang up) while a process is attempting to handle an HTTP + // request. Thus, if there is no more socket, return an empty PID. + const Connection::id_t &conn_id = *connection_handle; + if (connection_handles.count(conn_id) > 0) { + if (proxies.count(conn_id) > 0) { + return proxies[conn_id]->self(); + } else { + proxy = new HttpProxy(connection_handles[conn_id], this); + proxies[conn_id] = proxy; + } + } + } + + // Now check if we need to spawn a newly created proxy. Note that we + // need to do this outside of the synchronized block above to avoid + // a possible deadlock (because spawn eventually synchronizes on + // ProcessManager and ProcessManager::cleanup synchronizes on + // ProcessManager and then SocketManager, so a deadlock results if + // we do spawn within the synchronized block above). + if (proxy != NULL) { + return spawn(proxy, true); + } + + return PID(); +} + + +void LibevEventManager::send(Encoder* encoder, bool persist) +{ + CHECK(encoder != NULL); + + synchronized (this) { + const Connection::id_t &conn_id = *encoder->connection_handle(); + if (connection_handles.count(conn_id) > 0) { + // Update whether or not this socket should get disposed after + // there is no more data to send. + if (!persist) { + dispose.insert(conn_id); + } + + if (outgoing.count(conn_id) > 0) { + outgoing[conn_id].push(encoder); + } else { + // Initialize the outgoing queue. + outgoing[conn_id]; + + // Allocate and initialize the watcher. + ev_io* watcher = new ev_io(); + watcher->data = encoder; + + ev_io_init( + watcher, + get_send_function(encoder->io_kind()), + conn_id, + EV_WRITE); + + synchronized (watchers) { + watchers->push(watcher); + } + + ev_async_send(loop, &async_watcher); + } + } else { + VLOG(1) << "Attempting to send on a no longer valid socket!"; + delete encoder; + } + } +} + + +void LibevEventManager::send( + const Response& response, + const Request& request, + const ConnectionHandle& connection_handle) +{ + bool persist = request.keepAlive; + + // Don't persist the connection if the headers include + // 'Connection: close'. 
+ if (response.headers.contains("Connection")) { + if (response.headers.get("Connection").get() == "close") { + persist = false; + } + } + + send(new HttpResponseEncoder(connection_handle, response, request), persist); +} + + +void LibevEventManager::send(Message* message) +{ + CHECK(message != NULL); + + Node node(message->to.ip, message->to.port); + + synchronized (this) { + // Check if there is already a socket. + bool persist = persists.count(node) > 0; + bool temp = temps.count(node) > 0; + if (persist || temp) { + int s = persist ? persists[node] : temps[node]; + CHECK(connection_handles.count(s) > 0); + send(new MessageEncoder(connection_handles[s], message), persist); + } else { + // No peristent or temporary socket to the node currently + // exists, so we create a temporary one. + Try socket = process::socket(AF_INET, SOCK_STREAM, 0); + if (socket.isError()) { + LOG(FATAL) << "Failed to send, socket: " << socket.error(); + } + + int s = socket.get(); + + Try nonblock = os::nonblock(s); + if (nonblock.isError()) { + LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error(); + } + + Try cloexec = os::cloexec(s); + if (cloexec.isError()) { + LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error(); + } + + connection_handles[s] = std::make_shared(s); + nodes[s] = node; + temps[node] = s; + + dispose.insert(s); + + // Initialize the outgoing queue. + outgoing[s]; + + // Allocate and initialize a watcher for reading data from this + // socket. Note that we don't expect to receive anything other + // than HTTP '202 Accepted' responses which we anyway ignore. + ev_io* watcher = new ev_io(); + watcher->data = new ConnectionHandle(connection_handles[s]); + + ev_io_init(watcher, ignore_data, s, EV_READ); + + // Enqueue the watcher. + synchronized (watchers) { + watchers->push(watcher); + } + + // Allocate and initialize a watcher for sending the message. + watcher = new ev_io(); + watcher->data = new MessageEncoder(connection_handles[s], message); + + // Try and connect to the node using this socket. + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = PF_INET; + addr.sin_port = htons(message->to.port); + addr.sin_addr.s_addr = message->to.ip; + + if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { + if (errno != EINPROGRESS) { + PLOG(FATAL) << "Failed to send, connect"; + } + + // Initialize watcher for connecting. + ev_io_init(watcher, sending_connect, s, EV_WRITE); + } else { + // Initialize watcher for sending. + ev_io_init(watcher, send_data, s, EV_WRITE); + } + + // Enqueue the watcher. + synchronized (watchers) { + watchers->push(watcher); + } + + ev_async_send(loop, &async_watcher); + } + } +} + + +Encoder* LibevEventManager::next(int s) +{ + HttpProxy* proxy = NULL; // Non-null if needs to be terminated. + + synchronized (this) { + // We cannot assume 'sockets.count(s) > 0' here because it's + // possible that 's' has been removed with a a call to + // SocketManager::close. For example, it could be the case that a + // socket has gone to CLOSE_WAIT and the call to 'recv' in + // recv_data returned 0 causing SocketManager::close to get + // invoked. Later a call to 'send' or 'sendfile' (e.g., in + // send_data or send_file) can "succeed" (because the socket is + // not "closed" yet because there are still some Socket + // references, namely the reference being used in send_data or + // send_file!). However, when SocketManger::next is actually + // invoked we find out there there is no more data and thus stop + // sending. 
+ // TODO(benh): Should we actually finish sending the data!? + if (connection_handles.count(s) > 0) { + CHECK(outgoing.count(s) > 0); + + if (!outgoing[s].empty()) { + // More messages! + Encoder* encoder = outgoing[s].front(); + outgoing[s].pop(); + return encoder; + } else { + // No more messages ... erase the outgoing queue. + outgoing.erase(s); + + if (dispose.count(s) > 0) { + // This is either a temporary socket we created or it's a + // socket that we were receiving data from and possibly + // sending HTTP responses back on. Clean up either way. + if (nodes.count(s) > 0) { + const Node& node = nodes[s]; + CHECK(temps.count(node) > 0 && temps[node] == s); + temps.erase(node); + nodes.erase(s); + } + + if (proxies.count(s) > 0) { + proxy = proxies[s]; + proxies.erase(s); + } + + dispose.erase(s); + connection_handles.erase(s); + + // We don't actually close the socket (we wait for the Socket + // abstraction to close it once there are no more references), + // but we do shutdown the receiving end so any DataDecoder + // will get cleaned up (which might have the last reference). + shutdown(s, SHUT_RD); + } + } + } + } + + // We terminate the proxy outside the synchronized block to avoid + // possible deadlock between the ProcessManager and SocketManager + // (see comment in SocketManager::proxy for more information). + if (proxy != NULL) { + terminate(proxy); + } + + return NULL; +} + + +void LibevEventManager::close(int s) +{ + HttpProxy* proxy = NULL; // Non-null if needs to be terminated. + + synchronized (this) { + // This socket might not be active if it was already asked to get + // closed (e.g., a write on the socket failed so we try and close + // it and then later the read side of the socket gets closed so we + // try and close it again). Thus, ignore the request if we don't + // know about the socket. + if (connection_handles.count(s) > 0) { + // Clean up any remaining encoders for this socket. + if (outgoing.count(s) > 0) { + while (!outgoing[s].empty()) { + Encoder* encoder = outgoing[s].front(); + delete encoder; + outgoing[s].pop(); + } + + outgoing.erase(s); + } + + // Clean up after sockets used for node communication. + if (nodes.count(s) > 0) { + const Node& node = nodes[s]; + + // Don't bother invoking exited unless socket was persistant. + if (persists.count(node) > 0 && persists[node] == s) { + persists.erase(node); + exited(node); // Generate ExitedEvent(s)! + } else if (temps.count(node) > 0 && temps[node] == s) { + temps.erase(node); + } + + nodes.erase(s); + } + + // Clean up any proxy associated with this socket. + if (proxies.count(s) > 0) { + proxy = proxies[s]; + proxies.erase(s); + } + + // We need to stop any 'ignore_data' readers as they may have + // the last Socket reference so we shutdown reads but don't do a + // full close (since that will be taken care of by ~Socket, see + // comment below). Calling 'shutdown' will trigger 'ignore_data' + // which will get back a 0 (i.e., EOF) when it tries to read + // from the socket. Note we need to do this before we call + // 'sockets.erase(s)' to avoid the potential race with the last + // reference being in 'sockets'. + shutdown(s, SHUT_RD); + + dispose.erase(s); + connection_handles.erase(s); + } + } + + // We terminate the proxy outside the synchronized block to avoid + // possible deadlock between the ProcessManager and SocketManager. 
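[Editor's note] Both next() and close() above deliberately avoid close(2): they shutdown(SHUT_RD) so any reader sees EOF immediately, but the descriptor itself stays open until the last ConnectionHandle reference is dropped, which prevents the fd from being reused while an encoder is still writing on it. A minimal sketch of that ownership model; ExampleConnection, descriptor() and dispose() are invented names for illustration, the real patch keeps the fd inside its Connection/ConnectionHandle types.

#include <memory>
#include <sys/socket.h>
#include <unistd.h>

// Illustrative stand-in: the object that owns the descriptor and
// closes it only when the final reference disappears.
class ExampleConnection {
public:
  explicit ExampleConnection(int fd) : fd(fd) {}
  ~ExampleConnection() { if (fd >= 0) ::close(fd); }
  int descriptor() const { return fd; }
private:
  const int fd;
};

typedef std::shared_ptr<ExampleConnection> ExampleConnectionHandle;

// What the manager does when it is finished with a connection: wake
// any reader with EOF, but leave the descriptor valid for writers
// (e.g., an HttpProxy response) that still hold their own handle.
void dispose(ExampleConnectionHandle& managed) {
  ::shutdown(managed->descriptor(), SHUT_RD);
  managed.reset();  // Closes the fd only if this was the last handle.
}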
+ if (proxy != NULL) { + terminate(proxy); + } + + // Note that we don't actually: + // + // close(s); + // + // Because, for example, there could be a race between an HttpProxy + // trying to do send a response with SocketManager::send() or a + // process might be responding to another Request (e.g., trying + // to do a sendfile) since these things may be happening + // asynchronously we can't close the socket yet, because it might + // get reused before any of the above things have finished, and then + // we'll end up sending data on the wrong socket! Instead, we rely + // on the last reference of our Socket object to close the + // socket. Note, however, that since socket is no longer in + // 'sockets' any attempt to send with it will just get ignored. + // TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that + // should keep the file descriptor valid until the last Socket + // reference does a close but force all libev watchers to stop? +} + + +void LibevEventManager::exited(const Node& node) +{ + // TODO(benh): It would be cleaner if this routine could call back + // into ProcessManager ... then we wouldn't have to convince + // ourselves that the accesses to each Process object will always be + // valid. + synchronized (this) { + list removed; + // Look up all linked processes. + foreachpair (const UPID& linkee, set& processes, links) { + if (linkee.ip == node.ip && linkee.port == node.port) { + foreach (ProcessBase* linker, processes) { + LibevMan->enqueue(linker, new ExitedEvent(linkee)); + } + removed.push_back(linkee); + } + } + + foreach (const UPID& pid, removed) { + links.erase(pid); + } + } +} + + +void LibevEventManager::exited(ProcessBase* process) +{ + // An exited event is enough to cause the process to get deleted + // (e.g., by the garbage collector), which means we can't + // dereference process (or even use the address) after we enqueue at + // least one exited event. Thus, we save the process pid. + const UPID pid = LibevMan->get_pid(process); + + // Likewise, we need to save the current time of the process so we + // can update the clocks of linked processes as appropriate. + const Time time = Clock::now(process); + + synchronized (this) { + // Iterate through the links, removing any links the process might + // have had and creating exited events for any linked processes. + foreachpair (const UPID& linkee, set& processes, links) { + processes.erase(process); + + if (linkee == pid) { + foreach (ProcessBase* linker, processes) { + CHECK(linker != process) << "Process linked with itself"; + synchronized (timeouts) { + if (Clock::paused()) { + Clock::update(linker, time); + } + } + LibevMan->enqueue(linker, new ExitedEvent(linkee)); + } + } + } + + links.erase(pid); + } +} + +void LibevEventManager::update_timer() +{ + update_timer_flag = true; + ev_async_send(loop, &async_watcher); +} + + +void LibevEventManager::try_update_timer() +{ + if (!update_timer_flag) { + update_timer_flag = true; + ev_async_send(loop, &async_watcher); + } +} + +bool LibevEventManager::has_pending_timers() const { + return pending_timers; +} + +// Data necessary for polling so we can discard polling and actually +// stop it in the event loop. +struct Poll +{ + Poll() + { + // Need to explicitly instantiate the watchers. + watcher.io.reset(new ev_io()); + watcher.async.reset(new ev_async()); + } + + // An I/O watcher for checking for readability or writeability and + // an async watcher for being able to discard the polling. 
+ struct { + memory::shared_ptr io; + memory::shared_ptr async; + } watcher; + + Promise promise; +}; + +// Event loop callback when I/O is ready on polling file descriptor. +void polled(struct ev_loop* loop, ev_io* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + ev_io_stop(loop, poll->watcher.io.get()); + + // Stop the async watcher (also clears if pending so 'discard_poll' + // will not get invoked and we can delete 'poll' here). + ev_async_stop(loop, poll->watcher.async.get()); + + poll->promise.set(revents); + + delete poll; +} + + +// Event loop callback when future associated with polling file +// descriptor has been discarded. +void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + // Check and see if we have a pending 'polled' callback and if so + // let it "win". + if (ev_is_pending(poll->watcher.io.get())) { + return; + } + + ev_async_stop(loop, poll->watcher.async.get()); + + // Stop the I/O watcher (but note we check if pending above) so it + // won't get invoked and we can delete 'poll' here. + ev_io_stop(loop, poll->watcher.io.get()); + + poll->promise.discard(); + + delete poll; +} + +namespace io { + +namespace internal { + +// Helper/continuation of 'poll' on future discard. +void _poll(const memory::shared_ptr& async) +{ + ev_async_send(loop, async.get()); +} + + +Future poll(int fd, short events) +{ + Poll* poll = new Poll(); + + // Have the watchers data point back to the struct. + poll->watcher.async->data = poll; + poll->watcher.io->data = poll; + + // Get a copy of the future to avoid any races with the event loop. + Future future = poll->promise.future(); + + // Initialize and start the async watcher. + ev_async_init(poll->watcher.async.get(), discard_poll); + ev_async_start(loop, poll->watcher.async.get()); + + // Make sure we stop polling if a discard occurs on our future. + // Note that it's possible that we'll invoke '_poll' when someone + // does a discard even after the polling has already completed, but + // in this case while we will interrupt the event loop since the + // async watcher has already been stopped we won't cause + // 'discard_poll' to get invoked. + future.onDiscard(lambda::bind(&_poll, poll->watcher.async)); + + // Initialize and start the I/O watcher. + ev_io_init(poll->watcher.io.get(), polled, fd, events); + ev_io_start(loop, poll->watcher.io.get()); + + return future; +} + + +void read( + int fd, + void* data, + size_t size, + const memory::shared_ptr >& promise, + const Future& future) +{ + // Ignore this function if the read operation has been discarded. + if (promise->future().hasDiscard()) { + CHECK(!future.isPending()); + promise->discard(); + return; + } + + if (size == 0) { + promise->set(0); + return; + } + + if (future.isDiscarded()) { + promise->fail("Failed to poll: discarded future"); + } else if (future.isFailed()) { + promise->fail(future.failure()); + } else { + ssize_t length = ::read(fd, data, size); + if (length < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + // Restart the read operation. + Future future = + io::poll(fd, process::io::READ).onAny( + lambda::bind(&internal::read, + fd, + data, + size, + promise, + lambda::_1)); + + // Stop polling if a discard occurs on our future. + promise->future().onDiscard( + lambda::bind(&process::internal::discard, + WeakFuture(future))); + } else { + // Error occurred. 
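[Editor's note] internal::read above never blocks: it issues the read on a non-blocking descriptor and, when the kernel reports EINTR/EAGAIN/EWOULDBLOCK, it schedules another io::poll and re-invokes itself from that future. The control flow is easier to see in a synchronous analogue that uses poll(2) directly instead of futures; this is only a sketch of the retry discipline, not part of the libprocess API, and the function name is invented.

#include <cerrno>
#include <poll.h>
#include <unistd.h>

// Read up to 'size' bytes from a non-blocking fd, waiting for
// readability whenever the kernel has nothing buffered yet.
ssize_t read_retrying(int fd, void* data, size_t size) {
  while (true) {
    ssize_t length = ::read(fd, data, size);
    if (length >= 0) {
      return length;  // Includes 0 for EOF.
    }
    if (errno == EINTR) {
      continue;  // Interrupted by a signal; just retry the read.
    }
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // Nothing available yet: wait until readable, then retry.
      pollfd pfd = { fd, POLLIN, 0 };
      if (::poll(&pfd, 1, -1) < 0 && errno != EINTR) {
        return -1;
      }
      continue;
    }
    return -1;  // Genuine error.
  }
}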
+ promise->fail(strerror(errno)); + } + } else { + promise->set(length); + } + } +} + + +void write( + int fd, + void* data, + size_t size, + const memory::shared_ptr >& promise, + const Future& future) +{ + // Ignore this function if the write operation has been discarded. + if (promise->future().hasDiscard()) { + promise->discard(); + return; + } + + if (size == 0) { + promise->set(0); + return; + } + + if (future.isDiscarded()) { + promise->fail("Failed to poll: discarded future"); + } else if (future.isFailed()) { + promise->fail(future.failure()); + } else { + // Do a write but ignore SIGPIPE so we can return an error when + // writing to a pipe or socket where the reading end is closed. + // TODO(benh): The 'suppress' macro failed to work on OS X as it + // appears that signal delivery was happening asynchronously. + // That is, the signal would not appear to be pending when the + // 'suppress' block was closed thus the destructor for + // 'Suppressor' was not waiting/removing the signal via 'sigwait'. + // It also appeared that the signal would be delivered to another + // thread even if it remained blocked in this thiread. The + // workaround here is to check explicitly for EPIPE and then do + // 'sigwait' regardless of what 'os::signals::pending' returns. We + // don't have that luxury with 'Suppressor' and arbitrary signals + // because we don't always have something like EPIPE to tell us + // that a signal is (or will soon be) pending. + bool pending = os::signals::pending(SIGPIPE); + bool unblock = !pending ? os::signals::block(SIGPIPE) : false; + + ssize_t length = ::write(fd, data, size); + + // Save the errno so we can restore it after doing sig* functions + // below. + int errno_ = errno; + + if (length < 0 && errno == EPIPE && !pending) { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + + int result; + do { + int ignored; + result = sigwait(&mask, &ignored); + } while (result == -1 && errno == EINTR); + } + + if (unblock) { + os::signals::unblock(SIGPIPE); + } + + errno = errno_; + + if (length < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + // Restart the write operation. + Future future = + io::poll(fd, process::io::WRITE).onAny( + lambda::bind(&internal::write, + fd, + data, + size, + promise, + lambda::_1)); + + // Stop polling if a discard occurs on our future. + promise->future().onDiscard( + lambda::bind(&process::internal::discard, + WeakFuture(future))); + } else { + // Error occurred. + promise->fail(strerror(errno)); + } + } else { + // TODO(benh): Retry if 'length' is 0? + promise->set(length); + } + } +} + + +} // namespace internal { + + +namespace internal { + +#if __cplusplus >= 201103L +Future _read( + int fd, + const memory::shared_ptr& buffer, + const boost::shared_array& data, + size_t length) +{ + return io::read(fd, data.get(), length) + .then([=] (size_t size) -> Future { + if (size == 0) { // EOF. + return string(*buffer); + } + buffer->append(data.get(), size); + return _read(fd, buffer, data, length); + }); +} +#else +// Forward declataion. +Future _read( + int fd, + const memory::shared_ptr& buffer, + const boost::shared_array& data, + size_t length); + + +Future __read( + size_t size, + int fd, + const memory::shared_ptr& buffer, + const boost::shared_array& data, + size_t length) +{ + if (size == 0) { // EOF. 
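[Editor's note] The write path above goes to some length to turn a broken pipe into an EPIPE error instead of a fatal SIGPIPE: block the signal, write, consume the signal the write generated if it was not already pending, then restore the mask and errno. A condensed, Linux-flavored sketch of that sequence follows; it uses pthread_sigmask and sigtimedwait directly rather than the os::signals helpers in the patch (the patch uses a sigwait loop), and write_no_sigpipe is an invented name.

#include <errno.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>

// Write to a pipe/socket without the process being killed by SIGPIPE;
// returns -1 with errno == EPIPE if the read end is gone.
ssize_t write_no_sigpipe(int fd, const void* data, size_t size) {
  sigset_t pipe_mask, old_mask;
  sigemptyset(&pipe_mask);
  sigaddset(&pipe_mask, SIGPIPE);

  // Block SIGPIPE so a broken pipe surfaces as EPIPE, not a signal.
  pthread_sigmask(SIG_BLOCK, &pipe_mask, &old_mask);

  ssize_t length = ::write(fd, data, size);
  int saved_errno = errno;

  if (length < 0 && saved_errno == EPIPE) {
    // Consume the SIGPIPE this write generated so it is not delivered
    // once the original mask is restored.
    struct timespec zero = {0, 0};
    while (sigtimedwait(&pipe_mask, NULL, &zero) < 0 && errno == EINTR) {}
  }

  pthread_sigmask(SIG_SETMASK, &old_mask, NULL);
  errno = saved_errno;
  return length;
}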
+ return string(*buffer); + } + + buffer->append(data.get(), size); + + return _read(fd, buffer, data, length); +} + + +Future _read( + int fd, + const memory::shared_ptr& buffer, + const boost::shared_array& data, + size_t length) +{ + return io::read(fd, data.get(), length) + .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length)); +} +#endif // __cplusplus >= 201103L + + +#if __cplusplus >= 201103L +Future _write( + int fd, + Owned data, + size_t index) +{ + return io::write(fd, (void*) (data->data() + index), data->size() - index) + .then([=] (size_t length) -> Future { + if (index + length == data->size()) { + return Nothing(); + } + return _write(fd, data, index + length); + }); +} +#else +// Forward declaration. +Future _write( + int fd, + Owned data, + size_t index); + + +Future __write( + int fd, + Owned data, + size_t index, + size_t length) +{ + if (index + length == data->size()) { + return Nothing(); + } + return _write(fd, data, index + length); +} + + +Future _write( + int fd, + Owned data, + size_t index) +{ + return io::write(fd, (void*) (data->data() + index), data->size() - index) + .then(lambda::bind(&__write, fd, data, index, lambda::_1)); +} +#endif // __cplusplus >= 201103L + + +#if __cplusplus >= 201103L +void _splice( + int from, + int to, + size_t chunk, + boost::shared_array data, + memory::shared_ptr> promise) +{ + // Stop splicing if a discard occured on our future. + if (promise->future().hasDiscard()) { + // TODO(benh): Consider returning the number of bytes already + // spliced on discarded, or a failure. Same for the 'onDiscarded' + // callbacks below. + promise->discard(); + return; + } + + // Note that only one of io::read or io::write is outstanding at any + // one point in time thus the reuse of 'data' for both operations. + + Future read = io::read(from, data.get(), chunk); + + // Stop reading (or potentially indefinitely polling) if a discard + // occcurs on our future. + promise->future().onDiscard( + lambda::bind(&process::internal::discard, + WeakFuture(read))); + + read + .onReady([=] (size_t size) { + if (size == 0) { // EOF. + promise->set(Nothing()); + } else { + // Note that we always try and complete the write, even if a + // discard has occured on our future, in order to provide + // semantics where everything read is written. The promise + // will eventually be discarded in the next read. + io::write(to, string(data.get(), size)) + .onReady([=] () { _splice(from, to, chunk, data, promise); }) + .onFailed([=] (const string& message) { promise->fail(message); }) + .onDiscarded([=] () { promise->discard(); }); + } + }) + .onFailed([=] (const string& message) { promise->fail(message); }) + .onDiscarded([=] () { promise->discard(); }); +} +#else +// Forward declarations. +void __splice( + int from, + int to, + size_t chunk, + boost::shared_array data, + memory::shared_ptr > promise, + size_t size); + +void ___splice( + memory::shared_ptr > promise, + const string& message); + +void ____splice( + memory::shared_ptr > promise); + + +void _splice( + int from, + int to, + size_t chunk, + boost::shared_array data, + memory::shared_ptr > promise) +{ + // Stop splicing if a discard occured on our future. + if (promise->future().hasDiscard()) { + // TODO(benh): Consider returning the number of bytes already + // spliced on discarded, or a failure. Same for the 'onDiscarded' + // callbacks below. 
+ promise->discard(); + return; + } + + Future read = io::read(from, data.get(), chunk); + + // Stop reading (or potentially indefinitely polling) if a discard + // occurs on our future. + promise->future().onDiscard( + lambda::bind(&process::internal::discard, + WeakFuture(read))); + + read + .onReady( + lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1)) + .onFailed(lambda::bind(&___splice, promise, lambda::_1)) + .onDiscarded(lambda::bind(&____splice, promise)); +} + + +void __splice( + int from, + int to, + size_t chunk, + boost::shared_array data, + memory::shared_ptr > promise, + size_t size) +{ + if (size == 0) { // EOF. + promise->set(Nothing()); + } else { + // Note that we always try and complete the write, even if a + // discard has occured on our future, in order to provide + // semantics where everything read is written. The promise will + // eventually be discarded in the next read. + io::write(to, string(data.get(), size)) + .onReady(lambda::bind(&_splice, from, to, chunk, data, promise)) + .onFailed(lambda::bind(&___splice, promise, lambda::_1)) + .onDiscarded(lambda::bind(&____splice, promise)); + } +} + + +void ___splice( + memory::shared_ptr > promise, + const string& message) +{ + promise->fail(message); +} + + +void ____splice( + memory::shared_ptr > promise) +{ + promise->discard(); +} +#endif // __cplusplus >= 201103L + + +Future splice(int from, int to, size_t chunk) +{ + boost::shared_array data(new char[chunk]); + + // Rather than having internal::_splice return a future and + // implementing internal::_splice as a chain of io::read and + // io::write calls, we use an explicit promise that we pass around + // so that we don't increase memory usage the longer that we splice. + memory::shared_ptr > promise(new Promise()); + + Future future = promise->future(); + + _splice(from, to, chunk, data, promise); + + return future; +} + +} // namespace internal { + + +} // namespace io { + +Future LibevEventManager::poll(int fd, short events) +{ + process::initialize(); + + // TODO(benh): Check if the file descriptor is non-blocking? + + return run_in_event_loop(lambda::bind(&io::internal::poll, fd, events)); +} + +Future LibevEventManager::read(int fd, void* data, size_t size) +{ + process::initialize(); + + memory::shared_ptr > promise(new Promise()); + + // Check the file descriptor. + Try nonblock = os::isNonblock(fd); + if (nonblock.isError()) { + // The file descriptor is not valid (e.g., has been closed). + promise->fail( + "Failed to check if file descriptor was non-blocking: " + + nonblock.error()); + return promise->future(); + } else if (!nonblock.get()) { + // The file descriptor is not non-blocking. + promise->fail("Expected a non-blocking file descriptor"); + return promise->future(); + } + + // Because the file descriptor is non-blocking, we call read() + // immediately. The read may in turn call poll if necessary, + // avoiding unnecessary polling. We also observed that for some + // combination of libev and Linux kernel versions, the poll would + // block for non-deterministically long periods of time. This may be + // fixed in a newer version of libev (we use 3.8 at the time of + // writing this comment). + io::internal::read(fd, data, size, promise, io::READ); + + return promise->future(); +} + + +Future LibevEventManager::write(int fd, void* data, size_t size) +{ + process::initialize(); + + memory::shared_ptr > promise(new Promise()); + + // Check the file descriptor. 
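[Editor's note] io::splice above is an asynchronous read/write pump: it reuses a single chunk buffer, guarantees that everything read is also written, and finishes when the read side hits EOF. Its data flow is equivalent to the plain blocking loop below, which is shown purely for illustration (it assumes blocking descriptors and is not how libprocess schedules the work); copy_fd is an invented name.

#include <unistd.h>
#include <vector>

// Copy everything from 'from' to 'to' in 'chunk'-sized pieces,
// returning false on any read or write error.
bool copy_fd(int from, int to, size_t chunk) {
  std::vector<char> data(chunk);
  while (true) {
    ssize_t size = ::read(from, data.data(), data.size());
    if (size < 0) return false;
    if (size == 0) return true;  // EOF: everything read has been written.

    // Always complete the write for what was read, mirroring the
    // "everything read is written" semantics of the splice above.
    ssize_t written = 0;
    while (written < size) {
      ssize_t n = ::write(to, data.data() + written, size - written);
      if (n < 0) return false;
      written += n;
    }
  }
}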
+ Try nonblock = os::isNonblock(fd); + if (nonblock.isError()) { + // The file descriptor is not valid (e.g., has been closed). + promise->fail( + "Failed to check if file descriptor was non-blocking: " + + nonblock.error()); + return promise->future(); + } else if (!nonblock.get()) { + // The file descriptor is not non-blocking. + promise->fail("Expected a non-blocking file descriptor"); + return promise->future(); + } + + // Because the file descriptor is non-blocking, we call write() + // immediately. The write may in turn call poll if necessary, + // avoiding unnecessary polling. We also observed that for some + // combination of libev and Linux kernel versions, the poll would + // block for non-deterministically long periods of time. This may be + // fixed in a newer version of libev (we use 3.8 at the time of + // writing this comment). + io::internal::write(fd, data, size, promise, io::WRITE); + + return promise->future(); +} + +Future LibevEventManager::read(int fd) +{ + process::initialize(); + + // Get our own copy of the file descriptor so that we're in control + // of the lifetime and don't crash if/when someone by accidently + // closes the file descriptor before discarding this future. We can + // also make sure it's non-blocking and will close-on-exec. Start by + // checking we've got a "valid" file descriptor before dup'ing. + if (fd < 0) { + return Failure(strerror(EBADF)); + } + + fd = dup(fd); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate file descriptor")); + } + + // Set the close-on-exec flag. + Try cloexec = os::cloexec(fd); + if (cloexec.isError()) { + os::close(fd); + return Failure( + "Failed to set close-on-exec on duplicated file descriptor: " + + cloexec.error()); + } + + // Make the file descriptor is non-blocking. + Try nonblock = os::nonblock(fd); + if (nonblock.isError()) { + os::close(fd); + return Failure( + "Failed to make duplicated file descriptor non-blocking: " + + nonblock.error()); + } + + // TODO(benh): Wrap up this data as a struct, use 'Owner'. + // TODO(bmahler): For efficiency, use a rope for the buffer. + memory::shared_ptr buffer(new string()); + boost::shared_array data(new char[io::BUFFERED_READ_SIZE]); + + return io::internal::_read(fd, buffer, data, io::BUFFERED_READ_SIZE) + .onAny(lambda::bind(&os::close, fd)); +} + + +Future LibevEventManager::write(int fd, const std::string& data) +{ + process::initialize(); + + // Get our own copy of the file descriptor so that we're in control + // of the lifetime and don't crash if/when someone by accidently + // closes the file descriptor before discarding this future. We can + // also make sure it's non-blocking and will close-on-exec. Start by + // checking we've got a "valid" file descriptor before dup'ing. + if (fd < 0) { + return Failure(strerror(EBADF)); + } + + fd = dup(fd); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate file descriptor")); + } + + // Set the close-on-exec flag. + Try cloexec = os::cloexec(fd); + if (cloexec.isError()) { + os::close(fd); + return Failure( + "Failed to set close-on-exec on duplicated file descriptor: " + + cloexec.error()); + } + + // Make the file descriptor is non-blocking. 
+ Try nonblock = os::nonblock(fd); + if (nonblock.isError()) { + os::close(fd); + return Failure( + "Failed to make duplicated file descriptor non-blocking: " + + nonblock.error()); + } + + return io::internal::_write(fd, Owned(new string(data)), 0) + .onAny(lambda::bind(&os::close, fd)); +} + + +Future LibevEventManager::redirect( + int from, + Option to, + size_t chunk) +{ + // Make sure we've got "valid" file descriptors. + if (from < 0 || (to.isSome() && to.get() < 0)) { + return Failure(strerror(EBADF)); + } + + if (to.isNone()) { + // Open up /dev/null that we can splice into. + Try open = os::open("/dev/null", O_WRONLY); + + if (open.isError()) { + return Failure("Failed to open /dev/null for writing: " + open.error()); + } + + to = open.get(); + } else { + // Duplicate 'to' so that we're in control of its lifetime. + int fd = dup(to.get()); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate 'to' file descriptor")); + } + + to = fd; + } + + CHECK_SOME(to); + + // Duplicate 'from' so that we're in control of its lifetime. + from = dup(from); + if (from == -1) { + return Failure(ErrnoError("Failed to duplicate 'from' file descriptor")); + } + + // Set the close-on-exec flag (no-op if already set). + Try cloexec = os::cloexec(from); + if (cloexec.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to set close-on-exec on 'from': " + cloexec.error()); + } + + cloexec = os::cloexec(to.get()); + if (cloexec.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to set close-on-exec on 'to': " + cloexec.error()); + } + + // Make the file descriptors non-blocking (no-op if already set). + Try nonblock = os::nonblock(from); + if (nonblock.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to make 'from' non-blocking: " + nonblock.error()); + } + + nonblock = os::nonblock(to.get()); + if (nonblock.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to make 'to' non-blocking: " + nonblock.error()); + } + + return io::internal::splice(from, to.get(), chunk) + .onAny(lambda::bind(&os::close, from)) + .onAny(lambda::bind(&os::close, to.get())); +} + +} // namespace process { \ No newline at end of file diff --git a/3rdparty/libprocess/src/libev_event_manager.hpp b/3rdparty/libprocess/src/libev_event_manager.hpp new file mode 100644 index 00000000000..5624a3b962c --- /dev/null +++ b/3rdparty/libprocess/src/libev_event_manager.hpp @@ -0,0 +1,14 @@ +#ifndef LIBEV_EVENT_MANAGER_HPP +#define LIBEV_EVENT_MANAGER_HPP + +#include "event_manager.hpp" + +namespace process { + +// singelton for PIMPL pattern libev based event manager + extern EventManager* GetLibevEventManager( + EventManager::ProcessManager* process_manager); + +} // namespace process { + +#endif // LIBEV_EVENT_MANAGER_HPP diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 8adc8093ab0..d4a1a97adaf 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -81,7 +80,11 @@ #include "config.hpp" #include "decoder.hpp" #include "encoder.hpp" +#include "event_manager.hpp" +#include "http_proxy.hpp" #include "gate.hpp" +#include "libev_event_manager.hpp" +#include "process_reference.hpp" #include "synchronized.hpp" using namespace process::metrics::internal; @@ -137,154 +140,6 @@ map types; } // namespace mime { - -// Provides reference counting semantics for a process pointer. 
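[Editor's note] read(fd), write(fd, data) and redirect() above all follow the same discipline: dup the caller's descriptor so the operation controls its own lifetime, mark the duplicate close-on-exec and non-blocking, and close it when the returned future completes. A condensed sketch of that pattern using raw fcntl calls in place of the os::cloexec/os::nonblock helpers; own_nonblocking_duplicate is an invented name.

#include <fcntl.h>
#include <unistd.h>

// Duplicate 'fd' so the caller controls its lifetime, then mark the
// duplicate close-on-exec and non-blocking. Returns -1 on failure.
int own_nonblocking_duplicate(int fd) {
  if (fd < 0) {
    return -1;  // Not a valid descriptor to begin with.
  }

  int owned = ::dup(fd);
  if (owned == -1) {
    return -1;
  }

  // Close-on-exec: child processes must not inherit our duplicate.
  if (::fcntl(owned, F_SETFD, FD_CLOEXEC) == -1 ||
      // Non-blocking: reads/writes must never wedge the event loop.
      ::fcntl(owned, F_SETFL, ::fcntl(owned, F_GETFL) | O_NONBLOCK) == -1) {
    ::close(owned);
    return -1;
  }

  return owned;
}

// The caller later closes 'owned' itself (e.g., from an onAny
// callback), independent of what happens to the original 'fd'.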
-class ProcessReference -{ -public: - ProcessReference() : process(NULL) {} - - ~ProcessReference() - { - cleanup(); - } - - ProcessReference(const ProcessReference& that) - { - copy(that); - } - - ProcessReference& operator = (const ProcessReference& that) - { - if (this != &that) { - cleanup(); - copy(that); - } - return *this; - } - - ProcessBase* operator -> () - { - return process; - } - - operator ProcessBase* () - { - return process; - } - - operator bool () const - { - return process != NULL; - } - -private: - friend class ProcessManager; // For ProcessManager::use. - - explicit ProcessReference(ProcessBase* _process) - : process(_process) - { - if (process != NULL) { - __sync_fetch_and_add(&(process->refs), 1); - } - } - - void copy(const ProcessReference& that) - { - process = that.process; - - if (process != NULL) { - // There should be at least one reference to the process, so - // we don't need to worry about checking if it's exiting or - // not, since we know we can always create another reference. - CHECK(process->refs > 0); - __sync_fetch_and_add(&(process->refs), 1); - } - } - - void cleanup() - { - if (process != NULL) { - __sync_fetch_and_sub(&(process->refs), 1); - } - } - - ProcessBase* process; -}; - - -// Provides a process that manages sending HTTP responses so as to -// satisfy HTTP/1.1 pipelining. Each request should either enqueue a -// response, or ask the proxy to handle a future response. The process -// is responsible for making sure the responses are sent in the same -// order as the requests. Note that we use a 'Socket' in order to keep -// the underyling file descriptor from getting closed while there -// might still be outstanding responses even though the client might -// have closed the connection (see more discussion in -// SocketManger::close and SocketManager::proxy). -class HttpProxy : public Process -{ -public: - explicit HttpProxy(const Socket& _socket); - virtual ~HttpProxy(); - - // Enqueues the response to be sent once all previously enqueued - // responses have been processed (e.g., waited for and sent). - void enqueue(const Response& response, const Request& request); - - // Enqueues a future to a response that will get waited on (up to - // some timeout) and then sent once all previously enqueued - // responses have been processed (e.g., waited for and sent). - void handle(Future* future, const Request& request); - -private: - // Starts "waiting" on the next available future response. - void next(); - - // Invoked once a future response has been satisfied. - void waited(const Future& future); - - // Demuxes and handles a response. - bool process(const Future& future, const Request& request); - - // Handles stream (i.e., pipe) based responses. - void stream(const Future& poll, const Request& request); - - Socket socket; // Wrap the socket to keep it from getting closed. - - // Describes a queue "item" that wraps the future to the response - // and the original request. - // The original request contains needed information such as what encodings - // are acceptable and whether to persist the connection. - struct Item - { - Item(const Request& _request, Future* _future) - : request(_request), future(_future) {} - - ~Item() - { - delete future; - } - - // Helper for cleaning up a response (i.e., closing any open pipes - // in the event Response::type is PIPE). - static void cleanup(const Response& response) - { - if (response.type == Response::PIPE) { - os::close(response.pipe); - } - } - - const Request request; // Make a copy. 
- Future* future; - }; - - queue items; - - Option pipe; // Current pipe, if streaming. -}; - - // Helper for creating routes without a process. // TODO(benh): Move this into route.hpp. class Route @@ -335,79 +190,17 @@ class Route }; -class SocketManager -{ -public: - SocketManager(); - ~SocketManager(); - - Socket accepted(int s); - - void link(ProcessBase* process, const UPID& to); - - PID proxy(const Socket& socket); - - void send(Encoder* encoder, bool persist); - void send(const Response& response, - const Request& request, - const Socket& socket); - void send(Message* message); - - Encoder* next(int s); - - void close(int s); - - void exited(const Node& node); - void exited(ProcessBase* process); - -private: - // Map from UPID (local/remote) to process. - map > links; - - // Collection of all actice sockets. - map sockets; - - // Collection of sockets that should be disposed when they are - // finished being used (e.g., when there is no more data to send on - // them). - set dispose; - - // Map from socket to node (ip, port). - map nodes; - - // Maps from node (ip, port) to temporary sockets (i.e., they will - // get closed once there is no more data to send on them). - map temps; - - // Maps from node (ip, port) to persistent sockets (i.e., they will - // remain open even if there is no more data to send on them). We - // distinguish these from the 'temps' collection so we can tell when - // a persistant socket has been lost (and thus generate - // ExitedEvents). - map persists; - - // Map from socket to outgoing queue. - map > outgoing; - - // HTTP proxies. - map proxies; - - // Protects instance variables. - synchronizable(this); -}; - - -class ProcessManager +class ProcessManager : public EventManager::ProcessManager { public: explicit ProcessManager(const string& delegate); - ~ProcessManager(); + virtual ~ProcessManager(); - ProcessReference use(const UPID& pid); + virtual ProcessReference use(const UPID& pid) override; - bool handle( - const Socket& socket, - Request* request); + virtual bool handle( + const ConnectionHandle& connection_handle, + Request* request) override; bool deliver( ProcessBase* receiver, @@ -504,61 +297,25 @@ const string Profiler::STOP_HELP = HELP( "> param=VALUE Some description here")); -// Unique id that can be assigned to each process. -static uint32_t __id__ = 0; +uint32_t __id__ = 0; -// Local server socket. -static int __s__ = -1; +int __s__ = -1; -// Local IP address. -static uint32_t __ip__ = 0; +uint32_t __ip__ = 0; -// Local port. -static uint16_t __port__ = 0; +uint16_t __port__ = 0; -// Active SocketManager (eventually will probably be thread-local). -static SocketManager* socket_manager = NULL; +// Active EventManager (eventually will probably be thread-local). +static EventManager* event_manager = NULL; // Active ProcessManager (eventually will probably be thread-local). static ProcessManager* process_manager = NULL; -// Event loop. -static struct ev_loop* loop = NULL; - -// Asynchronous watcher for interrupting loop. -static ev_async async_watcher; - -// Watcher for timeouts. -static ev_timer timeouts_watcher; - -// Server watcher for accepting connections. -static ev_io server_watcher; - -// Queue of I/O watchers to be asynchronously added to the event loop -// (protected by 'watchers' below). -// TODO(benh): Replace this queue with functions that we put in -// 'functions' below that perform the ev_io_start themselves. 
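[Editor's note] This hunk removes the concrete SocketManager and has ProcessManager implement the EventManager::ProcessManager interface instead, with the libev specifics hidden behind the GetLibevEventManager() accessor declared in libev_event_manager.hpp earlier in the patch. A stripped-down illustration of that interface-plus-singleton split; every class and member name here is invented for the example, only the overall shape mirrors the patch.

#include <string>

// What the event manager needs from the process side.
class ExampleProcessManager {
public:
  virtual ~ExampleProcessManager() {}
  virtual bool handle(int connection_id, const std::string& request) = 0;
};

// What the rest of the library codes against; no libev types leak out.
class ExampleEventManager {
public:
  virtual ~ExampleEventManager() {}
  virtual double get_time() const = 0;
};

// The libev-backed implementation lives in one translation unit...
class ExampleLibevEventManager : public ExampleEventManager {
public:
  explicit ExampleLibevEventManager(ExampleProcessManager* pm) : pm(pm) {}
  virtual double get_time() const { return 0.0; }  // e.g., wraps ev_time().
private:
  ExampleProcessManager* pm;  // Callbacks into the process side.
};

// ...and is handed out through a singleton accessor, mirroring the
// role of GetLibevEventManager() in the patch.
ExampleEventManager* GetExampleLibevEventManager(ExampleProcessManager* pm) {
  static ExampleEventManager* singleton = new ExampleLibevEventManager(pm);
  return singleton;
}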
-static queue* watchers = new queue(); -static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER; - -// Queue of functions to be invoked asynchronously within the vent -// loop (protected by 'watchers' below). -static queue >* functions = - new queue >(); - // We store the timers in a map of lists indexed by the timeout of the // timer so that we can have two timers that have the same timeout. We // exploit that the map is SORTED! -static map >* timeouts = new map >(); -static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE; - -// For supporting Clock::settle(), true if timers have been removed -// from 'timeouts' but may not have been executed yet. Protected by -// the timeouts lock. This is only used when the clock is paused. -static bool pending_timers = false; - -// Flag to indicate whether or to update the timer on async interrupt. -static bool update_timer = false; +map >* timeouts = new map >(); +synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE; // Scheduling gate that threads wait at when there is nothing to run. static Gate* gate = new Gate(); @@ -627,8 +384,7 @@ Time Clock::now(ProcessBase* process) } } - // TODO(benh): Versus ev_now()? - double d = ev_time(); + double d = event_manager->get_time(); Try