Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Abstract event manager #1

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions 3rdparty/libprocess/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ libprocess_la_SOURCES = \
src/config.hpp \
src/decoder.hpp \
src/encoder.hpp \
src/event_manager.hpp \
src/event_manager_base.hpp \
src/gate.hpp \
src/http.cpp \
src/http_proxy.cpp \
src/latch.cpp \
src/libev_event_manager.cpp \
src/libev_event_manager.hpp \
src/metrics/metrics.cpp \
src/pid.cpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
src/subprocess.cpp \
src/synchronized.hpp \
Expand Down Expand Up @@ -85,7 +91,7 @@ libprocess_la_LIBADD += $(GPERFTOOLS)/libprofiler.la
endif

# Tests.
check_PROGRAMS = tests
check_PROGRAMS = tests benchmarks

tests_SOURCES = \
src/tests/decoder_tests.cpp \
Expand Down Expand Up @@ -120,12 +126,29 @@ tests_LDADD = \
$(HTTP_PARSER_LIB) \
$(LIBEV_LIB)

benchmarks_SOURCES = \
src/tests/benchmarks.cpp

benchmarks_CPPFLAGS = \
-I$(top_srcdir)/src \
-I$(GTEST)/include \
-I$(GMOCK)/include \
$(libprocess_la_CPPFLAGS)

benchmarks_LDADD = \
3rdparty/libgmock.la \
libprocess.la \
$(LIBGLOG) \
$(HTTP_PARSER_LIB) \
$(LIBEV_LIB)

# We use a check-local target for now to avoid the parallel test
# runner that ships with newer versions of autotools.
# See the following discussion for the workaround:
# http://lists.gnu.org/archive/html/automake/2013-01/msg00051.html
check-local: tests
check-local: tests benchmarks
./tests
./benchmarks

# TODO(benh): Fix shared builds (tests need libglog, libev, etc).

Expand Down
2 changes: 1 addition & 1 deletion 3rdparty/libprocess/include/process/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class ProcessBase : public EventVisitor
}

private:
friend class SocketManager;
friend class EventManager;
friend class ProcessManager;
friend class ProcessReference;
friend void* schedule(void*);
Expand Down
23 changes: 9 additions & 14 deletions 3rdparty/libprocess/src/encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,24 @@
#include <stout/numify.hpp>
#include <stout/os.hpp>

// NOTE: We forward declare "ev_loop" and "ev_io" here because,
// on OSX, including "ev.h" causes conflict with "EV_ERROR" declared
// in "/usr/include/sys/event.h".
struct ev_loop;
struct ev_io;

namespace process {

const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;

typedef void (*Sender)(struct ev_loop*, ev_io*, int);

extern void send_data(struct ev_loop*, ev_io*, int);
extern void send_file(struct ev_loop*, ev_io*, int);


class Encoder
{
public:
enum IOKind

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for the name?

{
send_data,
send_file
};

explicit Encoder(const Socket& _s) : s(_s) {}
virtual ~Encoder() {}

virtual Sender sender() = 0;
virtual IOKind io_kind() = 0;

Socket socket() const
{
Expand All @@ -57,7 +52,7 @@ class DataEncoder : public Encoder

virtual ~DataEncoder() {}

virtual Sender sender()
virtual IOKind io_kind()
{
return send_data;
}
Expand Down Expand Up @@ -235,7 +230,7 @@ class FileEncoder : public Encoder
os::close(fd);
}

virtual Sender sender()
virtual IOKind io_kind()
{
return send_file;
}
Expand Down
150 changes: 150 additions & 0 deletions 3rdparty/libprocess/src/event_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#ifndef EVENT_MANAGER_HPP
#define EVENT_MANAGER_HPP

#include <process/process.hpp>

#include "event_manager_base.hpp"
#include "http_proxy.hpp"
#include "process_reference.hpp"
#include "synchronized.hpp"

namespace process {

class EventManager : public internal::EventManager

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description please, it looks to be collapsing the encapsulation of Process + Connection + Events into Event Manager. So anything that large, it's nice to outline it's role.

{
public:

/* A dependency injection of ProcessManager. This exposes just the
* behavior that the EventManager cares about:
* - The ability to talk about a Process.
* - The ability to dispatch a request to be run. */
class ProcessManager
{
protected:
ProcessManager() {}

public:
virtual ~ProcessManager() {}

/* Return a reference counted handle to the given process. */
virtual ProcessReference use(const UPID& pid) = 0;

/* Handle the given request for this socket. */
virtual bool handle(
const Socket& socket,
http::Request* request) = 0;
};

virtual ~EventManager() {}

/* Forward the enqueue call from a more derived class. This is a proxy call
* as ProcessBase can not friend all derived versions of this class. They may
* not be known at compile time. */
inline void enqueue(ProcessBase* proc_base,
Event* event,
bool inject = false);

/* Return the pid from the given process. This is a proxy call similar to the
* above. */
inline const UPID &get_pid(ProcessBase* process) const;

/* A hook for initializing required state to run this manager. For
* example initializing event loops. */
virtual void initialize() = 0;

/* Functions from original SocketManager. These are used by
* ProcessManager to implement delivery of messages. */

/* Establish a persistent connection between the given process and
* the process represented by the UPID to. This is one way
* connection from process -> to. See process::link() for more
* details. */
virtual void link(ProcessBase* process, const UPID& to) = 0;

/* Return a handle to the HttpProxy representing the connection on
* the given socket. */
virtual PID<HttpProxy> proxy(const Socket& socket) = 0;

/* Send the given message to the remote host identified in the
* message. */
virtual void send(Message* message) = 0;

/* Created exited events for linked processes. See usage in
* ProcessManager */
virtual void exited(ProcessBase* process) = 0;

/* Functions related to timer behavior. This behavior is usually
* associated with io event managers as they can block indefinitely
* for IO, and timers are used to set time-outs on waiting. */

// Return the current time.
virtual double get_time() const = 0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this exist here?


// Return true if there are pending timers that need to be executed.
virtual bool has_pending_timers() const = 0;

// Update the timer on async interrupt.
virtual void update_timer() = 0;

/* Update the timer on async interrupt iff it is not already set to
* do so. */
virtual void try_update_timer() = 0;

/* Functions coming from process/io.hpp. The following are pure
* virtuals that provide a nice hook for implementing each of
* these functions in a clean way. */

// see process/io.hpp
virtual Future<short> poll(int fd, short events) = 0;

// see process/io.hpp
virtual Future<size_t> read(int fd, void* data, size_t size) = 0;

// see process/io.hpp
virtual Future<std::string> read(int fd) = 0;

// see process/io.hpp
virtual Future<size_t> write(int fd, void* data, size_t size) = 0;

// see process/io.hpp
virtual Future<Nothing> write(int fd, const std::string& data) = 0;

// see process/io.hpp
virtual Future<Nothing> redirect(int from, Option<int> to, size_t chunk) = 0;

protected:
EventManager() {}

};

inline void EventManager::enqueue(
ProcessBase* proc_base,
Event* event,
bool inject)
{
proc_base->enqueue(event, inject);
}

inline const UPID &EventManager::get_pid(ProcessBase* process) const
{
return process->pid;
}

extern std::map<Time, std::list<Timer> >* timeouts;
extern synchronizable(timeouts);

// Unique id that can be assigned to each process.
extern uint32_t __id__;

// Local server socket.
extern int __s__;

// Local IP address.
extern uint32_t __ip__;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why are we declaring the externs here?


// Local port.
extern uint16_t __port__;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should change esp in leiu of IPv6 requests.


} // namespace process {

#endif // EVENT_MANAGER_HPP
31 changes: 31 additions & 0 deletions 3rdparty/libprocess/src/event_manager_base.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#ifndef EVENT_MANAGER_BASE_HPP
#define EVENT_MANAGER_BASE_HPP

#include "encoder.hpp"

namespace process {

namespace internal {

class EventManager
{
public:
virtual ~EventManager() {}

virtual void send(Encoder* encoder, bool persist) = 0;

virtual void send(
const http::Response& response,
const http::Request& request,
const Socket& socket) = 0;

protected:
EventManager() {}

};

} // namespace internal {

} // namespace process {

#endif // EVENT_MANAGER_BASE_HPP
Loading