Skip to content

Commit

Permalink
Merge commit 'aa6673118acc90aefc4a83722ce44b16505d41b8'
Browse files Browse the repository at this point in the history
  • Loading branch information
sunilrottoo committed Apr 28, 2014
2 parents dbb0f06 + aa66731 commit d1425a9
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 288 deletions.
55 changes: 26 additions & 29 deletions soa/service/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ HttpClient(const string & baseUrl, int numParallel, size_t queueSize)
fd_(-1),
wakeup_(EFD_NONBLOCK | EFD_CLOEXEC),
timerFd_(-1),
connections_(nullptr),
connectionStash_(numParallel),
avlConnections_(numParallel),
nextAvail_(0),
queue_(queueSize)
{
fd_ = epoll_create1(EPOLL_CLOEXEC);
Expand All @@ -161,8 +162,10 @@ HttpClient(const string & baseUrl, int numParallel, size_t queueSize)
::curl_multi_setopt(handle_, CURLMOPT_TIMERFUNCTION, timerCallback);
::curl_multi_setopt(handle_, CURLMOPT_TIMERDATA, this);

/* connections */
fixConnectionStash();
/* available connections */
for (size_t i = 0; i < connectionStash_.size(); i++) {
avlConnections_[i] = &connectionStash_[i];
}

/* kick start multi */
int runningHandles;
Expand All @@ -183,6 +186,8 @@ HttpClient(HttpClient && other)
wakeup_(move(other.wakeup_)),
timerFd_(other.timerFd_),
connectionStash_(move(other.connectionStash_)),
avlConnections_(move(other.avlConnections_)),
nextAvail_(other.nextAvail_),
queue_(move(other.queue_))
{
other.fd_ = -1;
Expand All @@ -196,7 +201,6 @@ HttpClient(HttpClient && other)
::curl_multi_setopt(handle_, CURLMOPT_SOCKETDATA, this);
::curl_multi_setopt(handle_, CURLMOPT_TIMERFUNCTION, timerCallback);
::curl_multi_setopt(handle_, CURLMOPT_TIMERDATA, this);
fixConnectionStash();
}

HttpClient::
Expand All @@ -210,20 +214,6 @@ HttpClient::
}
}

void
HttpClient::
fixConnectionStash()
{
connections_ = &connectionStash_[0];
HttpConnection * current = connections_;
for (size_t i = 1; i < connectionStash_.size(); i++) {
current->next = &connectionStash_[i];
current = current->next;
}
current->next = nullptr;
avlConnections_ = connectionStash_.size();
}

void
HttpClient::
enablePipelining()
Expand All @@ -244,7 +234,8 @@ operator = (HttpClient && other)
timerFd_ = other.timerFd_;
other.timerFd_ = -1;
connectionStash_ = move(other.connectionStash_);
fixConnectionStash();
avlConnections_ = move(other.avlConnections_);
nextAvail_ = other.nextAvail_;
queue_ = move(other.queue_);

return *this;
Expand Down Expand Up @@ -369,7 +360,8 @@ handleWakeupEvent()
wakeup_.read();
// cerr << " wakeup event\n";

if (avlConnections_ > 0) {
size_t numAvail = avlConnections_.size() - nextAvail_;
if (numAvail > 0) {
/* empty the queue of events on the wakeup fd */
bool retry(false);
while (retry) {
Expand All @@ -382,8 +374,7 @@ handleWakeupEvent()
}
}

vector<HttpRequest> requests = queue_.tryPopMulti(avlConnections_);

vector<HttpRequest> requests = queue_.tryPopMulti(numAvail);
for (HttpRequest & request: requests) {
HttpConnection *conn = getConnection();
conn->request_ = move(request);
Expand Down Expand Up @@ -551,10 +542,14 @@ HttpConnection *
HttpClient::
getConnection()
{
HttpConnection * conn = connections_;
if (conn) {
connections_ = conn->next;
avlConnections_--;
HttpConnection * conn;

if (nextAvail_ < avlConnections_.size()) {
conn = avlConnections_[nextAvail_];
nextAvail_++;
}
else {
conn = nullptr;
}

return conn;
Expand All @@ -564,9 +559,10 @@ void
HttpClient::
releaseConnection(HttpConnection * oldConnection)
{
oldConnection->next = connections_;
connections_ = oldConnection;
avlConnections_++;
if (nextAvail_ > 0) {
nextAvail_--;
avlConnections_[nextAvail_] = oldConnection;
}
}


Expand Down Expand Up @@ -632,6 +628,7 @@ perform(bool noSSLChecks, bool debug)
easy_.setOpt<curlopt::HeaderFunction>(onHeader_);
easy_.setOpt<curlopt::WriteFunction>(onWrite_);
easy_.setOpt<curlopt::ReadFunction>(onRead_);
easy_.setOpt<curlopt::BufferSize>(65536);
if (request_.timeout_ != -1) {
easy_.setOpt<curlopt::Timeout>(request_.timeout_);
}
Expand Down
6 changes: 2 additions & 4 deletions soa/service/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ struct HttpClient : public AsyncEventSource {
virtual int selectFd() const;
virtual bool processOne();

void fixConnectionStash();

/* Local */
bool enqueueRequest(const std::string & verb,
const std::string & resource,
Expand Down Expand Up @@ -263,9 +261,9 @@ struct HttpClient : public AsyncEventSource {
curlpp::Multi multi_;
::CURLM * handle_;

HttpConnection * connections_;
std::vector<HttpConnection> connectionStash_;
size_t avlConnections_;
std::vector<HttpConnection *> avlConnections_;
size_t nextAvail_;

ML::RingBufferSRMW<HttpRequest> queue_; /* queued requests */
};
Expand Down
17 changes: 16 additions & 1 deletion soa/service/passive_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <unordered_map>

#include "jml/arch/futex.h"
#include "soa/service//passive_endpoint.h"
#include <poll.h>
#include <boost/date_time/gregorian/gregorian.hpp>
Expand Down Expand Up @@ -56,7 +57,7 @@ init(PortRange const & portRange, const std::string & hostname, int num_threads,

AcceptorT<SocketTransport>::
AcceptorT()
: fd(-1), endpoint(0)
: fd(-1), endpoint(0), listening_(false)
{
}

Expand Down Expand Up @@ -143,6 +144,9 @@ listen(PortRange const & portRange,
addr.set(&inAddr, inAddrLen);
}

listening_ = true;
ML::futex_wake(listening_);

shutdown = false;

acceptThread.reset(new boost::thread([=] () { this->runAcceptThread(); }));
Expand Down Expand Up @@ -306,4 +310,15 @@ runAcceptThread()
}
}

void
AcceptorT<SocketTransport>::
waitListening()
const
{
while (!listening_) {
int oldListening = listening_;
ML::futex_wait(listening_, oldListening);
}
}

} // namespace Datacratic
16 changes: 16 additions & 0 deletions soa/service/passive_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ struct Acceptor {

/** What port are we listening on? */
virtual int port() const = 0;

/** Wait until we are ready to accept connections */
virtual void waitListening() const = 0;
};


Expand Down Expand Up @@ -86,6 +89,16 @@ struct PassiveEndpoint: public EndpointBase {
return acceptor->listen(portRange, host, this, nameLookup, backlog);
}

/** Wait until we are ready to accept connections */
void waitListening()
const
{
if (!acceptor)
throw ML::Exception("can't listen without acceptor");

acceptor->waitListening();
}

/** Closing the peer in the context of a passive endpoint means
simply not accepting connections any more.
*/
Expand Down Expand Up @@ -197,13 +210,16 @@ struct AcceptorT<SocketTransport> : public Acceptor {
*/
void runAcceptThread();

/** Wait until we are ready to accept connections */
void waitListening() const;

protected:
std::shared_ptr<boost::thread> acceptThread;
ML::Wakeup_Fd wakeup;
ACE_INET_Addr addr;
int fd;
PassiveEndpoint * endpoint;
int listening_; // whether the socket is listening
bool nameLookup;
bool shutdown;
};
Expand Down
113 changes: 2 additions & 111 deletions soa/service/testing/http_client_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,121 +17,12 @@
#include "soa/service/rest_proxy.h"
#include "soa/service/rest_service_endpoint.h"

#include "test_http_services.h"

using namespace std;
using namespace Datacratic;


/* http service */

struct HttpTestConnHandler;

struct HttpService : public ServiceBase, public HttpEndpoint {
HttpService(const shared_ptr<ServiceProxies> & proxies)
: ServiceBase("http-test-service", proxies),
HttpEndpoint("http-test-service-ep"),
portToUse(0)
{
}

~HttpService()
{
shutdown();
}

void start()
{
init(portToUse, "127.0.0.1", 1);
}

virtual shared_ptr<ConnectionHandler> makeNewHandler();
virtual void handleHttpPayload(HttpTestConnHandler & handler,
const HttpHeader & header,
const string & payload) = 0;

int portToUse;
};

struct HttpTestConnHandler : HttpConnectionHandler {
virtual void handleHttpPayload(const HttpHeader & header,
const string & payload) {
HttpService *svc = (HttpService *) httpEndpoint;
svc->handleHttpPayload(*this, header, payload);
}

void sendResponse(int code, const string & body, const string & type)
{
putResponseOnWire(HttpResponse(code, type, body));
}
};

shared_ptr<ConnectionHandler>
HttpService::
makeNewHandler()
{
return make_shared<HttpTestConnHandler>();
}

struct HttpGetService : public HttpService {
HttpGetService(const shared_ptr<ServiceProxies> & proxies)
: HttpService(proxies)
{}

struct TestResponse {
TestResponse(int code = 0, const string & body = "")
: code_(code), body_(body)
{}

int code_;
string body_;
};

void handleHttpPayload(HttpTestConnHandler & handler,
const HttpHeader & header,
const string & payload)
{
string key = header.verb + ":" + header.resource;
if (header.resource == "/timeout") {
sleep(3);
handler.sendResponse(200, "Will time out", "text/plain");
}
else if (header.resource == "/headers") {
string headersBody("{\n");
bool first(true);
for (const auto & it: header.headers) {
if (first) {
first = false;
}
else {
headersBody += ",\n";
}
headersBody += " \"" + it.first + "\": \"" + it.second + "\"\n";
}
headersBody += "}\n";
handler.sendResponse(200, headersBody, "application/json");
}
else {
const auto & it = responses_.find(key);
if (it == responses_.end()) {
handler.sendResponse(404, "Not found", "text/plain");
}
else {
const TestResponse & resp = it->second;
handler.sendResponse(resp.code_, resp.body_, "text/plain");
}
}
}

void addResponse(const string & verb, const string & resource,
int code, const string & body)
{
string key = verb + ":" + resource;
responses_[key] = TestResponse(code, body);
}

map<string, TestResponse> responses_;
};


/* bench methods */

void
Expand Down
Loading

0 comments on commit d1425a9

Please sign in to comment.