Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Abstract event manager #1

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions 3rdparty/libprocess/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@ libprocess_la_SOURCES = \
src/config.hpp \
src/decoder.hpp \
src/encoder.hpp \
src/event_manager.cpp \
src/event_manager.hpp \
src/event_manager_base.hpp \
src/gate.hpp \
src/http.cpp \
src/http_proxy.cpp \
src/latch.cpp \
src/libev_event_manager.cpp \
src/libev_event_manager.hpp \
src/metrics/metrics.cpp \
src/pid.cpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
src/subprocess.cpp \
src/synchronized.hpp \
Expand Down Expand Up @@ -85,7 +92,7 @@ libprocess_la_LIBADD += $(GPERFTOOLS)/libprofiler.la
endif

# Tests.
check_PROGRAMS = tests
check_PROGRAMS = tests benchmarks

tests_SOURCES = \
src/tests/decoder_tests.cpp \
Expand Down Expand Up @@ -120,12 +127,29 @@ tests_LDADD = \
$(HTTP_PARSER_LIB) \
$(LIBEV_LIB)

benchmarks_SOURCES = \
src/tests/benchmarks.cpp

benchmarks_CPPFLAGS = \
-I$(top_srcdir)/src \
-I$(GTEST)/include \
-I$(GMOCK)/include \
$(libprocess_la_CPPFLAGS)

benchmarks_LDADD = \
3rdparty/libgmock.la \
libprocess.la \
$(LIBGLOG) \
$(HTTP_PARSER_LIB) \
$(LIBEV_LIB)

# We use a check-local target for now to avoid the parallel test
# runner that ships with newer versions of autotools.
# See the following discussion for the workaround:
# http://lists.gnu.org/archive/html/automake/2013-01/msg00051.html
check-local: tests
check-local: tests benchmarks
./tests
./benchmarks

# TODO(benh): Fix shared builds (tests need libglog, libev, etc).

Expand Down
6 changes: 3 additions & 3 deletions 3rdparty/libprocess/include/process/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ struct MessageEvent : Event

struct HttpEvent : Event
{
HttpEvent(const Socket& _socket, http::Request* _request)
: socket(_socket), request(_request) {}
HttpEvent(const ConnectionHandle& _connection_handle, http::Request* _request)
: connection_handle(_connection_handle), request(_request) {}

virtual ~HttpEvent()
{
Expand All @@ -115,7 +115,7 @@ struct HttpEvent : Event
visitor->visit(*this);
}

const Socket socket;
const ConnectionHandle connection_handle;
http::Request* const request;

private:
Expand Down
2 changes: 1 addition & 1 deletion 3rdparty/libprocess/include/process/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class ProcessBase : public EventVisitor
}

private:
friend class SocketManager;
friend class EventManager;
friend class ProcessManager;
friend class ProcessReference;
friend void* schedule(void*);
Expand Down
73 changes: 12 additions & 61 deletions 3rdparty/libprocess/include/process/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,76 +34,27 @@ inline Try<int> socket(int family, int type, int protocol) {
}


// An abstraction around a socket (file descriptor) that provides
// reference counting such that the socket is only closed (and thus,
// has the possiblity of being reused) after there are no more
// references.

class Socket
{
class Connection {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely worth a comment block to elaborate on relation to Socket. No one likes vernacular changes unless they are justified.

public:
Socket()
: refs(new int(1)), s(-1) {}

explicit Socket(int _s)
: refs(new int(1)), s(_s) {}

~Socket()
{
cleanup();
}
typedef int id_t;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? isn't this just the socket fd?

virtual ~Connection() {}

Socket(const Socket& that)
inline operator const id_t&() const
{
copy(that);
return id;
}

Socket& operator = (const Socket& that)
{
if (this != &that) {
cleanup();
copy(that);
}
return *this;
}

bool operator == (const Socket& that) const
{
return s == that.s && refs == that.refs;
}
protected:
Connection(const id_t& _id) : id(_id) {}

operator int () const
{
return s;
}
/* This is good enough for now. In the future we might not want to
* limit ourselves to representing the identifier as an int (fd). */
const id_t id;
};

private:
void copy(const Socket& that)
{
assert(that.refs > 0);
__sync_fetch_and_add(that.refs, 1);
refs = that.refs;
s = that.s;
}

void cleanup()
{
assert(refs != NULL);
if (__sync_sub_and_fetch(refs, 1) == 0) {
delete refs;
if (s >= 0) {
Try<Nothing> close = os::close(s);
if (close.isError()) {
std::cerr << "Failed to close socket: " << close.error() << std::endl;
abort();
}
}
}
}
typedef std::shared_ptr<Connection> ConnectionHandle;

int* refs;
int s;
};

} // namespace process {

Expand Down
11 changes: 6 additions & 5 deletions 3rdparty/libprocess/src/decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ namespace process {
class DataDecoder
{
public:
explicit DataDecoder(const Socket& _s)
: s(_s), failure(false), request(NULL)
explicit DataDecoder(const ConnectionHandle& _conn_handle)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I know that you're following the internal conventions here, but it would be nice if we were to inforce standard naming conventions outlined by Sutter etc.

: conn_handle(_conn_handle), failure(false), request(NULL)
{
settings.on_message_begin = &DataDecoder::on_message_begin;
settings.on_header_field = &DataDecoder::on_header_field;
Expand Down Expand Up @@ -67,9 +67,9 @@ class DataDecoder
return failure;
}

Socket socket() const
const ConnectionHandle& connection_handle() const
{
return s;
return conn_handle;
}

private:
Expand Down Expand Up @@ -240,7 +240,8 @@ class DataDecoder
return 0;
}

const Socket s; // The socket this decoder is associated with.
// The connection handle this decoder is associated with.
const ConnectionHandle conn_handle;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you follow Sutters well established coding standards member variables begin with _ , but that is more a old-school preference thing.


bool failure;

Expand Down
52 changes: 26 additions & 26 deletions 3rdparty/libprocess/src/encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,49 @@
#include <stout/numify.hpp>
#include <stout/os.hpp>

// NOTE: We forward declare "ev_loop" and "ev_io" here because,
// on OSX, including "ev.h" causes conflict with "EV_ERROR" declared
// in "/usr/include/sys/event.h".
struct ev_loop;
struct ev_io;

namespace process {

const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;

typedef void (*Sender)(struct ev_loop*, ev_io*, int);

extern void send_data(struct ev_loop*, ev_io*, int);
extern void send_file(struct ev_loop*, ev_io*, int);


class Encoder
{
public:
explicit Encoder(const Socket& _s) : s(_s) {}
enum IOKind

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for the name?

{
send_data,
send_file
};

explicit Encoder(const ConnectionHandle& _connection_handle)
: conn_handle(_connection_handle) {}
virtual ~Encoder() {}

virtual Sender sender() = 0;
virtual IOKind io_kind() = 0;

Socket socket() const
const ConnectionHandle& connection_handle() const
{
return s;
return conn_handle;
}

private:
const Socket s; // The socket this encoder is associated with.

// The connection handle this encoder is associated with.
const ConnectionHandle conn_handle;
};


class DataEncoder : public Encoder
{
public:
DataEncoder(const Socket& s, const std::string& _data)
: Encoder(s), data(_data), index(0) {}
DataEncoder(
const ConnectionHandle& _connection_handle,
const std::string& _data)
: Encoder(_connection_handle), data(_data), index(0) {}

virtual ~DataEncoder() {}

virtual Sender sender()
virtual IOKind io_kind()
{
return send_data;
}
Expand Down Expand Up @@ -91,8 +91,8 @@ class DataEncoder : public Encoder
class MessageEncoder : public DataEncoder
{
public:
MessageEncoder(const Socket& s, Message* _message)
: DataEncoder(s, encode(_message)), message(_message) {}
MessageEncoder(const ConnectionHandle& _connection_handle, Message* _message)
: DataEncoder(_connection_handle, encode(_message)), message(_message) {}

virtual ~MessageEncoder()
{
Expand Down Expand Up @@ -143,10 +143,10 @@ class HttpResponseEncoder : public DataEncoder
{
public:
HttpResponseEncoder(
const Socket& s,
const ConnectionHandle& _connection_handle,
const http::Response& response,
const http::Request& request)
: DataEncoder(s, encode(response, request)) {}
: DataEncoder(_connection_handle, encode(response, request)) {}

static std::string encode(
const http::Response& response,
Expand Down Expand Up @@ -227,15 +227,15 @@ class HttpResponseEncoder : public DataEncoder
class FileEncoder : public Encoder
{
public:
FileEncoder(const Socket& s, int _fd, size_t _size)
: Encoder(s), fd(_fd), size(_size), index(0) {}
FileEncoder(const ConnectionHandle& _connection_handle, int _fd, size_t _size)
: Encoder(_connection_handle), fd(_fd), size(_size), index(0) {}

virtual ~FileEncoder()
{
os::close(fd);
}

virtual Sender sender()
virtual IOKind io_kind()
{
return send_file;
}
Expand Down
Loading