Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coro_http_server][feat]update http server #611

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <asio/connect.hpp>
#include <asio/dispatch.hpp>
#include <asio/experimental/channel.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/read.hpp>
#include <asio/read_at.hpp>
Expand All @@ -36,6 +37,7 @@
#include <deque>

#include "io_context_pool.hpp"
#include "ylt/util/type_traits.h"

namespace coro_io {

Expand Down Expand Up @@ -347,6 +349,29 @@ post(Func func,
co_return co_await awaitor.await_resume(helper);
}

template <typename T>
async_simple::coro::Lazy<std::error_code> async_send(
asio::experimental::channel<void(std::error_code, T)> &channel, T val) {
callback_awaitor<std::error_code> awaitor;
co_return co_await awaitor.await_resume(
[&, val = std::move(val)](auto handler) {
channel.async_send({}, std::move(val), [handler](const auto &ec) {
handler.set_value_then_resume(ec);
});
});
}

template <typename R>
async_simple::coro::Lazy<std::pair<std::error_code, R>> async_receive(
asio::experimental::channel<void(std::error_code, R)> &channel) {
callback_awaitor<std::pair<std::error_code, R>> awaitor;
co_return co_await awaitor.await_resume([&](auto handler) {
channel.async_receive([handler](auto ec, auto val) {
handler.set_value_then_resume(std::make_pair(ec, std::move(val)));
});
});
}

template <typename Socket, typename AsioBuffer>
std::pair<asio::error_code, size_t> read_some(Socket &sock,
AsioBuffer &&buffer) {
Expand Down
26 changes: 20 additions & 6 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
#include <thread>
#include <type_traits>
#include <vector>
#include <ylt/easylog.hpp>
#ifdef __linux__
#include <pthread.h>
#include <sched.h>
#endif

namespace coro_io {

Expand Down Expand Up @@ -108,12 +111,12 @@ get_current_executor() {
class io_context_pool {
public:
using executor_type = asio::io_context::executor_type;
explicit io_context_pool(std::size_t pool_size) : next_io_context_(0) {
explicit io_context_pool(std::size_t pool_size, bool cpu_affinity = false)
: next_io_context_(0), cpu_affinity_(cpu_affinity) {
if (pool_size == 0) {
pool_size = 1; // set default value as 1
}

easylog::logger<>::instance();
for (std::size_t i = 0; i < pool_size; ++i) {
io_context_ptr io_context(new asio::io_context(1));
work_ptr work(new asio::io_context::work(*io_context));
Expand Down Expand Up @@ -141,6 +144,16 @@ class io_context_pool {
svr->run();
},
io_contexts_[i]));

#ifdef __linux__
if (cpu_affinity_) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
pthread_setaffinity_np(threads.back()->native_handle(),
sizeof(cpu_set_t), &cpuset);
}
#endif
}

for (std::size_t i = 0; i < threads.size(); ++i) {
Expand Down Expand Up @@ -199,6 +212,7 @@ class io_context_pool {
std::promise<void> promise_;
std::atomic<bool> has_run_or_stop_ = false;
std::once_flag flag_;
bool cpu_affinity_ = false;
};

class multithread_context_pool {
Expand All @@ -211,7 +225,7 @@ class multithread_context_pool {
~multithread_context_pool() { stop(); }

void run() {
for (std::size_t i = 0; i < thd_num_; i++) {
for (int i = 0; i < thd_num_; i++) {
thds_.emplace_back([this] {
ioc_.run();
});
Expand Down Expand Up @@ -248,7 +262,7 @@ template <typename T = io_context_pool>
inline T &g_io_context_pool(
unsigned pool_size = std::thread::hardware_concurrency()) {
static auto _g_io_context_pool = std::make_shared<T>(pool_size);
static bool run_helper = [](auto pool) {
[[maybe_unused]] static bool run_helper = [](auto pool) {
std::thread thrd{[pool] {
pool->run();
}};
Expand All @@ -262,7 +276,7 @@ template <typename T = io_context_pool>
inline T &g_block_io_context_pool(
unsigned pool_size = std::thread::hardware_concurrency()) {
static auto _g_io_context_pool = std::make_shared<T>(pool_size);
static bool run_helper = [](auto pool) {
[[maybe_unused]] static bool run_helper = [](auto pool) {
std::thread thrd{[pool] {
pool->run();
}};
Expand Down
31 changes: 21 additions & 10 deletions include/ylt/thirdparty/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,29 @@ class coro_http_connection
buffers_.clear();
body_.clear();
resp_str_.clear();
multi_buf_ = true;
if (need_shrink_every_time_) {
body_.shrink_to_fit();
}
}
}

async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) {
// avoid duplicate reply
if (need_to_bufffer) {
response_.to_buffers(buffers_);
std::error_code ec;
size_t size;
if (multi_buf_) {
if (need_to_bufffer) {
response_.to_buffers(buffers_);
}
std::tie(ec, size) = co_await async_write(buffers_);
}
auto [ec, _] = co_await async_write(buffers_);
else {
if (need_to_bufffer) {
response_.build_resp_str(resp_str_);
}
std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_));
}

if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
close();
Expand Down Expand Up @@ -394,6 +405,8 @@ class coro_http_connection
return ss.str();
}

void set_multi_buf(bool r) { multi_buf_ = r; }

async_simple::coro::Lazy<bool> write_data(std::string_view message) {
std::vector<asio::const_buffer> buffers;
buffers.push_back(asio::buffer(message));
Expand Down Expand Up @@ -761,13 +774,10 @@ class coro_http_connection

private:
bool check_keep_alive() {
bool keep_alive = true;
auto val = request_.get_header_value("connection");
if (!val.empty() && iequal0(val, "close")) {
keep_alive = false;
if (parser_.has_close()) {
return false;
}

return keep_alive;
return true;
}

void build_ws_handshake_head() {
Expand Down Expand Up @@ -823,5 +833,6 @@ class coro_http_connection
bool use_ssl_ = false;
#endif
bool need_shrink_every_time_ = false;
bool multi_buf_ = true;
};
} // namespace cinatra
6 changes: 1 addition & 5 deletions include/ylt/thirdparty/cinatra/coro_http_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,13 @@ class coro_http_request {
coro_http_connection *get_conn() { return conn_; }

bool is_upgrade() {
auto h = get_header_value("Connection");
if (h.empty())
if (!parser_.has_upgrade())
return false;

auto u = get_header_value("Upgrade");
if (u.empty())
return false;

if (h != UPGRADE)
return false;

if (u != WEBSOCKET)
return false;

Expand Down
1 change: 1 addition & 0 deletions include/ylt/thirdparty/cinatra/coro_http_response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class coro_http_response {

status_type status() { return status_; }
std::string_view content() { return content_; }
size_t content_size() { return content_.size(); }

void add_header(auto k, auto v) {
resp_headers_.emplace_back(resp_header{std::move(k), std::move(v)});
Expand Down
14 changes: 4 additions & 10 deletions include/ylt/thirdparty/cinatra/coro_http_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ class coro_http_router {
}

if (whole_str.find(":") != std::string::npos) {
std::vector<std::string> coro_method_names = {};
std::string coro_method_str;
coro_method_str.append(method_name);
coro_method_names.push_back(coro_method_str);
std::string method_str(method_name);
coro_router_tree_->coro_insert(key, std::move(http_handler),
coro_method_names);
method_str);
}
else {
if (whole_str.find("{") != std::string::npos ||
Expand Down Expand Up @@ -138,11 +135,8 @@ class coro_http_router {
}

if (whole_str.find(':') != std::string::npos) {
std::vector<std::string> method_names = {};
std::string method_str;
method_str.append(method_name);
method_names.push_back(method_str);
router_tree_->insert(whole_str, std::move(http_handler), method_names);
std::string method_str(method_name);
router_tree_->insert(whole_str, std::move(http_handler), method_str);
}
else if (whole_str.find("{") != std::string::npos ||
whole_str.find(")") != std::string::npos) {
Expand Down
6 changes: 4 additions & 2 deletions include/ylt/thirdparty/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ class coro_http_server {
coro_http_server(asio::io_context &ctx, unsigned short port)
: out_ctx_(&ctx), port_(port), acceptor_(ctx), check_timer_(ctx) {}

coro_http_server(size_t thread_num, unsigned short port)
: pool_(std::make_unique<coro_io::io_context_pool>(thread_num)),
coro_http_server(size_t thread_num, unsigned short port,
bool cpu_affinity = false)
: pool_(std::make_unique<coro_io::io_context_pool>(thread_num,
cpu_affinity)),
port_(port),
acceptor_(pool_->get_executor()->get_asio_executor()),
check_timer_(pool_->get_executor()->get_asio_executor()) {}
Expand Down
Loading
Loading