diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 00000000000..9ce1f1ad19c
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,44 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index ff5e9f565d6..413c3480191 100644
--- a/README.md
+++ b/README.md
@@ -32,8 +32,8 @@ Recommended hardware configuration for SeaStar
* CPUs - As much as you need. SeaStar is highly friendly for multi-core and NUMA
* NICs - As fast as possible, we recommend 10G or 40G cards. It's possible to use
- 1G to but you may be limited by their capacity.
- In addition, the more hardware queue per cpu the better for SeaStar.
+ 1G too but you may be limited by their capacity.
+ 	In addition, the more hardware queues per CPU the better for SeaStar.
Otherwise we have to emulate that in software.
* Disks - Fast SSDs with high number of IOPS.
* Client machines - Usually a single client machine can't load our servers.
diff --git a/apps/fair_queue_tester/fair_queue_tester.cc b/apps/fair_queue_tester/fair_queue_tester.cc
index 9b75d2d9674..4ddb0705b0f 100644
--- a/apps/fair_queue_tester/fair_queue_tester.cc
+++ b/apps/fair_queue_tester/fair_queue_tester.cc
@@ -33,6 +33,7 @@
#include
#include
+using namespace seastar;
using namespace std::chrono_literals;
static auto random_seed = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
diff --git a/apps/httpd/main.cc b/apps/httpd/main.cc
index f112a6ed5c3..15fabed887d 100644
--- a/apps/httpd/main.cc
+++ b/apps/httpd/main.cc
@@ -19,8 +19,11 @@
* Copyright 2015 Cloudius Systems
*/
+#include
+#include
#include "http/httpd.hh"
#include "http/handlers.hh"
+#include "http/websocket_handler.hh"
#include "http/function_handlers.hh"
#include "http/file_handler.hh"
#include "apps/httpd/demo.json.hh"
@@ -28,7 +31,9 @@
namespace bpo = boost::program_options;
+using namespace seastar;
using namespace httpd;
+using namespace websocket;
class handl : public httpd::handler_base {
public:
@@ -47,10 +52,59 @@ void set_routes(routes& r) {
function_handler* h2 = new function_handler([](std::unique_ptr req) {
return make_ready_future("json-future");
});
+
+ auto ws_echo_handler = new websocket::ws_handler();
+
+ ws_echo_handler->on_message_future([] (const std::unique_ptr& req, duplex_stream& stream,
+ message message) {
+ return stream.write(std::move(message)).then([&stream] {
+ return stream.flush();
+ });
+ });
+
+ auto ws_sha1_handler = new websocket::ws_handler();
+
+ ws_sha1_handler->on_connection_future([] (const std::unique_ptr& req, duplex_stream& stream) {
+ return stream.write(websocket::message(TEXT, "Hello from seastar ! Send any message to this endpoint"
+ " and get back it's payload SHA1 in base64 format !")).then([&stream] {
+ return stream.flush();
+ });
+ });
+
+ ws_sha1_handler->on_message_future([] (const std::unique_ptr& req, duplex_stream& stream,
+ message message) {
+ CryptoPP::SHA hash;
+ byte digest[CryptoPP::SHA::DIGESTSIZE];
+ hash.Update((byte*) message.payload.begin(), message.payload.size());
+ hash.Final(digest);
+
+ sstring base64;
+
+ CryptoPP::Base64Encoder encoder;
+ encoder.Put(digest, sizeof(digest));
+ encoder.MessageEnd();
+ CryptoPP::word64 size = encoder.MaxRetrievable();
+ if (size) {
+ base64.resize(size);
+ encoder.Get((byte*) base64.data(), base64.size());
+
+ return stream.write(websocket::message(TEXT, base64.substr(0, base64.size() - 1))).then([&stream] {
+ return stream.flush();
+ });
+ }
+ return make_ready_future();
+ });
+
+ ws_sha1_handler->on_disconnection([] (const std::unique_ptr& req) {
+ print("websocket client disconnected\n");
+ });
+
r.add(operation_type::GET, url("/"), h1);
r.add(operation_type::GET, url("/jf"), h2);
- r.add(operation_type::GET, url("/file").remainder("path"),
- new directory_handler("/"));
+ r.add(operation_type::GET, url("/file").remainder("path"), new directory_handler("/"));
+ r.put("/", ws_echo_handler);
+ r.put("/sha1", ws_sha1_handler);
+
demo_json::hello_world.set(r, [] (const_req req) {
demo_json::my_object obj;
obj.var1 = req.param.at("var1");
@@ -65,18 +119,18 @@ void set_routes(routes& r) {
int main(int ac, char** av) {
app_template app;
app.add_options()("port", bpo::value()->default_value(10000),
- "HTTP Server port");
+ "HTTP Server port");
return app.run_deprecated(ac, av, [&] {
- auto&& config = app.configuration();
+ auto &&config = app.configuration();
uint16_t port = config["port"].as();
auto server = new http_server_control();
auto rb = make_shared("apps/httpd/");
server->start().then([server] {
return server->set_routes(set_routes);
- }).then([server, rb]{
- return server->set_routes([rb](routes& r){rb->set_api_doc(r);});
- }).then([server, rb]{
- return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");});
+ }).then([server, rb] {
+ return server->set_routes([rb](routes &r) { rb->set_api_doc(r); });
+ }).then([server, rb] {
+ return server->set_routes([rb](routes &r) { rb->register_function(r, "demo", "hello world application"); });
}).then([server, port] {
return server->listen(port);
}).then([server, port] {
@@ -85,6 +139,5 @@ int main(int ac, char** av) {
return server->stop();
});
});
-
});
}
diff --git a/apps/iotune/iotune.cc b/apps/iotune/iotune.cc
index 2c42d1bf5ee..304328ebc2d 100644
--- a/apps/iotune/iotune.cc
+++ b/apps/iotune/iotune.cc
@@ -44,9 +44,13 @@
#include "core/aligned_buffer.hh"
#include "util/defer.hh"
+using namespace seastar;
using namespace std::chrono_literals;
+namespace seastar {
bool filesystem_has_good_aio_support(sstring directory, bool verbose);
+}
+
class iotune_manager;
class iotune_timeout_exception : public std::exception {
diff --git a/apps/memcached/ascii.rl b/apps/memcached/ascii.rl
index f6f577c4126..fe5ad0a9b6c 100644
--- a/apps/memcached/ascii.rl
+++ b/apps/memcached/ascii.rl
@@ -25,6 +25,8 @@
#include
#include
+using namespace seastar;
+
%%{
machine memcache_ascii_protocol;
diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc
index ef53ff0a38c..9a03677e3cc 100644
--- a/apps/memcached/memcache.cc
+++ b/apps/memcached/memcache.cc
@@ -48,12 +48,13 @@
#define VERSION "v1.0"
#define VERSION_STRING PLATFORM " " VERSION
+using namespace seastar;
using namespace net;
-namespace bi = boost::intrusive;
-
namespace memcache {
+namespace bi = boost::intrusive;
+
static constexpr double default_slab_growth_factor = 1.25;
static constexpr uint64_t default_slab_page_size = 1UL*MB;
static constexpr uint64_t default_per_cpu_slab_size = 0UL; // zero means reclaimer is enabled.
diff --git a/apps/memcached/memcached.hh b/apps/memcached/memcached.hh
index 54fa217fccd..10a355328fd 100644
--- a/apps/memcached/memcached.hh
+++ b/apps/memcached/memcached.hh
@@ -22,6 +22,8 @@
namespace memcache {
+using namespace seastar;
+
class item;
class cache;
diff --git a/apps/seawreck/seawreck.cc b/apps/seawreck/seawreck.cc
index fc9fbd72e3b..af87018601b 100644
--- a/apps/seawreck/seawreck.cc
+++ b/apps/seawreck/seawreck.cc
@@ -20,6 +20,7 @@
*/
#include "http/http_response_parser.hh"
+#include "http/websocket.hh"
#include "core/print.hh"
#include "core/reactor.hh"
#include "core/app-template.hh"
@@ -28,6 +29,9 @@
#include "core/semaphore.hh"
#include "core/future-util.hh"
#include
+#include
+
+using namespace seastar;
template
void http_debug(const char* fmt, Args&&... args) {
@@ -37,6 +41,7 @@ void http_debug(const char* fmt, Args&&... args) {
}
class http_client {
+
private:
unsigned _duration;
unsigned _conn_per_core;
@@ -48,13 +53,21 @@ class http_client {
bool _timer_based;
bool _timer_done{false};
uint64_t _total_reqs{0};
+ bool _websocket{false};
+ long _max_latency{0};
+ long _min_latency{INT_MAX};
+ double _sum_avr_latency{0};
+ unsigned _payload_size;
+
public:
- http_client(unsigned duration, unsigned total_conn, unsigned reqs_per_conn)
+ http_client(unsigned duration, unsigned total_conn, unsigned reqs_per_conn, bool websocket, unsigned payload_size)
: _duration(duration)
, _conn_per_core(total_conn / smp::count)
, _reqs_per_conn(reqs_per_conn)
, _run_timer([this] { _timer_done = true; })
- , _timer_based(reqs_per_conn == 0) {
+ , _timer_based(reqs_per_conn == 0)
+ , _websocket(websocket)
+ , _payload_size(payload_size) {
}
class connection {
@@ -65,6 +78,10 @@ class http_client {
http_response_parser _parser;
http_client* _http_client;
uint64_t _nr_done{0};
+ long _max_latency{0};
+ long _min_latency{INT_MAX};
+ long _sum_latency{0};
+
public:
connection(connected_socket&& fd, http_client* client)
: _fd(std::move(fd))
@@ -77,12 +94,25 @@ class http_client {
return _nr_done;
}
+ long max_latency() {
+ return _max_latency;
+ }
+
+ long min_latency() {
+ return _min_latency;
+ }
+
+ double avr_latency() {
+ return (double)_sum_latency / (double)_nr_done;
+ }
+
future<> do_req() {
+ auto start = std::chrono::steady_clock::now();
return _write_buf.write("GET / HTTP/1.1\r\nHost: 127.0.0.1:10000\r\n\r\n").then([this] {
return _write_buf.flush();
- }).then([this] {
+ }).then([this, start] {
_parser.init();
- return _read_buf.consume(_parser).then([this] {
+ return _read_buf.consume(_parser).then([this, start] {
// Read HTTP response header first
if (_parser.eof()) {
return make_ready_future<>();
@@ -96,8 +126,14 @@ class http_client {
auto content_len = std::stoi(it->second);
http_debug("Content-Length = %d\n", content_len);
// Read HTTP response body
- return _read_buf.read_exactly(content_len).then([this] (temporary_buffer buf) {
+ return _read_buf.read_exactly(content_len).then([this, start] (temporary_buffer buf) {
_nr_done++;
+ auto ping = std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count();
+ _sum_latency += ping;
+ if (ping > _max_latency)
+ _max_latency = ping;
+ if (ping < _min_latency)
+ _min_latency = ping;
http_debug("%s\n", buf.get());
if (_http_client->done(_nr_done)) {
return make_ready_future();
@@ -110,11 +146,94 @@ class http_client {
}
};
+ class ws_connection {
+ private:
+ connected_socket _fd;
+ input_stream _read_buf;
+ output_stream _write_buf;
+ http_client* _http_client;
+ uint64_t _nr_done{0};
+ long _max_latency{0};
+ long _min_latency{INT_MAX};
+ long _sum_latency{0};
+ httpd::websocket::message _message;
+ temporary_buffer _header;
+ long _read_value;
+ public:
+ ws_connection(connected_socket&& fd, http_client* client)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output())
+ , _http_client(client) {
+ using random_bytes_engine = std::independent_bits_engine<
+ std::default_random_engine, std::numeric_limits::digits, unsigned char>;
+ random_bytes_engine rbe;
+
+ sstring payload(client->_payload_size, '\0');
+
+ std::generate(payload.begin(), payload.end(), std::ref(rbe));
+ _message = httpd::websocket::message(httpd::websocket::opcode::BINARY, payload);
+ _header = _message.get_header();
+ _read_value = _header.size() - 4 + _message.payload.size();
+ }
+
+ uint64_t nr_done() {
+ return _nr_done;
+ }
+
+ long max_latency() {
+ return _max_latency;
+ }
+
+ long min_latency() {
+ return _min_latency;
+ }
+
+ double avr_latency() {
+ return (double)_sum_latency / (double)_nr_done;
+ }
+
+ future<> do_req() {
+
+ auto start = std::chrono::steady_clock::now();
+ return _write_buf.write(temporary_buffer(_header.begin(), _header.size())).then([this] {
+ _write_buf.write(temporary_buffer(_message.payload.begin(), _message.payload.size()));
+ }).then([this] { return _write_buf.flush(); }).then([this, start] {
+ return _read_buf.skip(_read_value).then([this, start] {
+ auto ping = std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count();
+ _sum_latency += ping;
+ if (ping > _max_latency)
+ _max_latency = ping;
+ if (ping < _min_latency)
+ _min_latency = ping;
+ _nr_done++;
+ if (_http_client->done(_nr_done)) {
+ return make_ready_future();
+ } else {
+ return do_req();
+ }
+ });
+ });
+ }
+ };
+
future total_reqs() {
print("Requests on cpu %2d: %ld\n", engine().cpu_id(), _total_reqs);
return make_ready_future(_total_reqs);
}
+ long max_latency() {
+ return _max_latency;
+ }
+
+ long min_latency() {
+ return _min_latency;
+ }
+
+ double avr_latency() {
+ return _sum_avr_latency / (double)_conn_per_core;
+ }
+
bool done(uint64_t nr_done) {
if (_timer_based) {
return _timer_done;
@@ -125,12 +244,25 @@ class http_client {
future<> connect(ipv4_addr server_addr) {
// Establish all the TCP connections first
- for (unsigned i = 0; i < _conn_per_core; i++) {
- engine().net().connect(make_ipv4_address(server_addr)).then([this] (connected_socket fd) {
- _sockets.push_back(std::move(fd));
- http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(), engine().cpu_id());
- _conn_connected.signal();
- }).or_terminate();
+ if (_websocket) {
+ for (unsigned i = 0; i < _conn_per_core; i++) {
+ httpd::websocket::connect(make_ipv4_address(server_addr)).then(
+ [this](connected_socket fd) {
+ _sockets.push_back(std::move(fd));
+ http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(),
+ engine().cpu_id());
+ _conn_connected.signal();
+ }).or_terminate();
+ }
+ }
+ else {
+ for (unsigned i = 0; i < _conn_per_core; i++) {
+ engine().net().connect(make_ipv4_address(server_addr)).then([this] (connected_socket fd) {
+ _sockets.push_back(std::move(fd));
+ http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(), engine().cpu_id());
+ _conn_connected.signal();
+ }).or_terminate();
+ }
}
return _conn_connected.wait(_conn_per_core);
}
@@ -141,19 +273,48 @@ class http_client {
if (_timer_based) {
_run_timer.arm(std::chrono::seconds(_duration));
}
- for (auto&& fd : _sockets) {
- auto conn = new connection(std::move(fd), this);
- conn->do_req().then_wrapped([this, conn] (auto&& f) {
- http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id());
- _total_reqs += conn->nr_done();
- _conn_finished.signal();
- delete conn;
- try {
- f.get();
- } catch (std::exception& ex) {
- print("http request error: %s\n", ex.what());
- }
- });
+
+ if (_websocket) {
+ for (auto&& fd : _sockets) {
+ auto conn = new ws_connection(std::move(fd), this);
+ conn->do_req().then_wrapped([this, conn] (auto&& f) {
+ http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id());
+ _total_reqs += conn->nr_done();
+ _sum_avr_latency += conn->avr_latency();
+ if (conn->max_latency() > _max_latency)
+ _max_latency = conn->max_latency();
+ if (conn->min_latency() < _min_latency)
+ _min_latency = conn->min_latency();
+ _conn_finished.signal();
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ print("websocket error: %s\n", ex.what());
+ }
+ });
+ }
+ }
+ else {
+ for (auto&& fd : _sockets) {
+ auto conn = new connection(std::move(fd), this);
+ conn->do_req().then_wrapped([this, conn] (auto&& f) {
+ http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id());
+ _total_reqs += conn->nr_done();
+ _sum_avr_latency += conn->avr_latency();
+ if (conn->max_latency() > _max_latency)
+ _max_latency = conn->max_latency();
+ if (conn->min_latency() < _min_latency)
+ _min_latency = conn->min_latency();
+ _conn_finished.signal();
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ print("http request error: %s\n", ex.what());
+ }
+ });
+ }
}
// All finished
@@ -164,6 +325,42 @@ class http_client {
}
};
+// Implements @Reducer concept: tracks the largest value seen across shards.
+template
+class max {
+private:
+    Result _result{}; // value-initialize: never read an indeterminate value
+public:
+    future<> operator()(const Addend& value) {
+        if (value > _result)
+            _result = value; // keep the maximum seen (was `+=`, which summed instead)
+        return make_ready_future<>();
+    }
+    Result get() && {
+        return std::move(_result);
+    }
+};
+
+// Implements @Reducer concept: tracks the smallest value seen across shards.
+template
+class min {
+private:
+    Result _result{}; // value-initialize: never read an indeterminate value
+    bool init{false}; // first call always stores, so _result is only compared after init
+public:
+    future<> operator()(const Addend& value) {
+        if (value < _result || !init)
+        {
+            _result = value; // keep the minimum seen (was `+=`, which summed instead)
+            init = true;
+        }
+        return make_ready_future<>();
+    }
+    Result get() && {
+        return std::move(_result);
+    }
+};
+
namespace bpo = boost::program_options;
int main(int ac, char** av) {
@@ -172,7 +369,9 @@ int main(int ac, char** av) {
("server,s", bpo::value()->default_value("192.168.66.100:10000"), "Server address")
("conn,c", bpo::value()->default_value(100), "total connections")
("reqs,r", bpo::value()->default_value(0), "reqs per connection")
- ("duration,d", bpo::value()->default_value(10), "duration of the test in seconds)");
+ ("duration,d", bpo::value()->default_value(10), "duration of the test in seconds)")
+ ("websocket,w", bpo::bool_switch()->default_value(false), "benchmark websocket")
+ ("size,z", bpo::value()->default_value(20), "websocket request payload size");
return app.run(ac, av, [&app] () -> future {
auto& config = app.configuration();
@@ -180,6 +379,8 @@ int main(int ac, char** av) {
auto reqs_per_conn = config["reqs"].as();
auto total_conn= config["conn"].as();
auto duration = config["duration"].as();
+ auto websocket = config["websocket"].as();
+ auto payload_size = config["size"].as();
if (total_conn % smp::count != 0) {
print("Error: conn needs to be n * cpu_nr\n");
@@ -191,33 +392,44 @@ int main(int ac, char** av) {
// Start http requests on all the cores
auto started = steady_clock_type::now();
print("========== http_client ============\n");
+ print("Benchmark: %s\n", websocket ? "websocket" : "http");
print("Server: %s\n", server);
print("Connections: %u\n", total_conn);
print("Requests/connection: %s\n", reqs_per_conn == 0 ? "dynamic (timer based)" : std::to_string(reqs_per_conn));
- return http_clients->start(std::move(duration), std::move(total_conn), std::move(reqs_per_conn)).then([http_clients, started, server] {
+ return http_clients->start(std::move(duration), std::move(total_conn), std::move(reqs_per_conn), std::move
+ (websocket), std::move(payload_size)).then([http_clients, started, server] {
return http_clients->invoke_on_all(&http_client::connect, ipv4_addr{server});
}).then([http_clients] {
return http_clients->invoke_on_all(&http_client::run);
}).then([http_clients] {
return http_clients->map_reduce(adder(), &http_client::total_reqs);
}).then([http_clients, started] (auto total_reqs) {
- // All the http requests are finished
- auto finished = steady_clock_type::now();
- auto elapsed = finished - started;
- auto secs = static_cast(elapsed.count() / 1000000000.0);
- print("Total cpus: %u\n", smp::count);
- print("Total requests: %u\n", total_reqs);
- print("Total time: %f\n", secs);
- print("Requests/sec: %f\n", static_cast(total_reqs) / secs);
- print("========== done ============\n");
- return http_clients->stop().then([http_clients] {
- // FIXME: If we call engine().exit(0) here to exit when
- // requests are done. The tcp connection will not be closed
- // properly, becasue we exit too earily and the FIN packets are
- // not exchanged.
+ // All the http requests are finished
+ auto finished = steady_clock_type::now();
+ auto elapsed = finished - started;
+ auto secs = elapsed.count() / 1000000000.0;
+ print("Total cpus: %u\n", smp::count);
+ print("Total requests: %u\n", total_reqs);
+ print("Total time: %f\n", secs);
+ print("Requests/sec: %f\n", static_cast(total_reqs) / secs);
+ }).then([http_clients] {
+ return when_all(http_clients->map_reduce(max(), &http_client::max_latency),
+ http_clients->map_reduce(min(), &http_client::min_latency),
+ http_clients->map_reduce(adder(), &http_client::avr_latency));
+ }).then([http_clients] (std::tuple, future, future> joined) {
+ print("Max latency : %u ms\n", std::get<0>(joined).get0());
+ print("Min latency : %u ms\n", std::get<1>(joined).get0());
+ print("Avr latency : %f ms\n", std::get<2>(joined).get0() / (double)smp::count);
+ }).then([http_clients] {
+ print("========== done ============\n");
+ return http_clients->stop().then([http_clients] {
+ // FIXME: If we call engine().exit(0) here to exit when
+ // requests are done. The tcp connection will not be closed
+                // properly, because we exit too early and the FIN packets are
+ // not exchanged.
delete http_clients;
return make_ready_future(0);
- });
+ });
});
});
}
diff --git a/configure.py b/configure.py
index a4808f6744e..de8f34e998f 100755
--- a/configure.py
+++ b/configure.py
@@ -213,7 +213,6 @@ def sanitize_vptr_flag(compiler):
'tests/connect_test',
'tests/chunked_fifo_test',
'tests/circular_buffer_test',
- 'tests/scollectd_test',
'tests/perf/perf_fstream',
'tests/json_formatter_test',
'tests/dns_test',
@@ -329,6 +328,8 @@ def sanitize_vptr_flag(compiler):
'http/reply.cc',
'http/request_parser.rl',
'http/api_docs.cc',
+ 'http/websocket.cc',
+ 'http/websocket_message.cc'
]
boost_test_lib = [
@@ -426,7 +427,7 @@ def have_xen():
'tests/tcp_sctp_client': ['tests/tcp_sctp_client.cc'] + core + libnet,
'tests/tls_test': ['tests/tls_test.cc'] + core + libnet,
'tests/fair_queue_test': ['tests/fair_queue_test.cc'] + core,
- 'apps/seawreck/seawreck': ['apps/seawreck/seawreck.cc', 'http/http_response_parser.rl'] + core + libnet,
+ 'apps/seawreck/seawreck': ['apps/seawreck/seawreck.cc', 'http/http_response_parser.rl'] + core + libnet + http,
'apps/fair_queue_tester/fair_queue_tester': ['apps/fair_queue_tester/fair_queue_tester.cc'] + core,
'apps/iotune/iotune': ['apps/iotune/iotune.cc'] + ['core/resource.cc', 'core/fsqual.cc'],
'tests/blkdiscard_test': ['tests/blkdiscard_test.cc'] + core,
@@ -449,7 +450,6 @@ def have_xen():
'tests/connect_test': ['tests/connect_test.cc'] + core + libnet,
'tests/chunked_fifo_test': ['tests/chunked_fifo_test.cc'] + core,
'tests/circular_buffer_test': ['tests/circular_buffer_test.cc'] + core,
- 'tests/scollectd_test': ['tests/scollectd_test.cc'] + core,
'tests/perf/perf_fstream': ['tests/perf/perf_fstream.cc'] + core,
'tests/json_formatter_test': ['tests/json_formatter_test.cc'] + core + http,
'tests/dns_test': ['tests/dns_test.cc'] + core + libnet,
@@ -472,7 +472,6 @@ def have_xen():
'tests/fstream_test',
'tests/rpc_test',
'tests/connect_test',
- 'tests/scollectd_test',
'tests/json_formatter_test',
'tests/dns_test',
'tests/execution_stage_test',
@@ -570,7 +569,7 @@ def update(lines, vars):
if args.with_osv:
libs += '-lintel_dpdk -lrt -lm -ldl'
else:
- libs += '-Wl,--whole-archive -lrte_pmd_vmxnet3_uio -lrte_pmd_i40e -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -lrte_hash -lrte_kvargs -lrte_mbuf -lrte_ethdev -lrte_eal -lrte_mempool -lrte_ring -lrte_cmdline -lrte_cfgfile -Wl,--no-whole-archive -lrt -lm -ldl'
+ libs += '-Wl,--whole-archive -lrte_pmd_vmxnet3_uio -lrte_pmd_i40e -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -lrte_pmd_bnxt -lrte_pmd_cxgbe -lrte_pmd_ena -lrte_pmd_enic -lrte_pmd_fm10k -lrte_pmd_nfp -lrte_pmd_qede -lrte_pmd_sfc_efx -lrte_hash -lrte_kvargs -lrte_mbuf -lrte_ethdev -lrte_eal -lrte_mempool -lrte_ring -lrte_cmdline -lrte_cfgfile -Wl,--no-whole-archive -lrt -lm -ldl'
args.user_cflags += ' -Ifmt'
@@ -840,7 +839,7 @@ def have_hwloc():
f.write('build {}: ragel {}\n'.format(hh, src))
for hh in swaggers:
src = swaggers[hh]
- f.write('build {}: swagger {}\n'.format(hh,src))
+ f.write('build {}: swagger {} | json/json2code.py\n'.format(hh,src))
for pb in protobufs:
src = protobufs[pb]
c_pb = pb.replace('.h','.cc')
diff --git a/core/align.hh b/core/align.hh
index e53c5dd91e4..d24a231c10f 100644
--- a/core/align.hh
+++ b/core/align.hh
@@ -25,6 +25,8 @@
#include
#include
+namespace seastar {
+
template
inline constexpr
T align_up(T v, T align) {
@@ -51,4 +53,12 @@ T* align_down(T* v, size_t align) {
return reinterpret_cast(align_down(reinterpret_cast(v), align));
}
+
+template
+inline bool is_aligned(const T* v) {
+ return (uintptr_t)(const void *)(v) % sizeof(T) == 0;
+}
+
+}
+
#endif /* ALIGN_HH_ */
diff --git a/core/aligned_buffer.hh b/core/aligned_buffer.hh
index 1d8245a7a9c..ad6e0db02b1 100644
--- a/core/aligned_buffer.hh
+++ b/core/aligned_buffer.hh
@@ -24,6 +24,9 @@
#include
#include "print.hh"
+namespace seastar {
+
+
struct free_deleter {
void operator()(void* p) { ::free(p); }
};
@@ -45,3 +48,4 @@ std::unique_ptr allocate_aligned_buffer(size_t size, s
}
+}
diff --git a/core/app-template.cc b/core/app-template.cc
index 69430fe82da..29d73b8b774 100644
--- a/core/app-template.cc
+++ b/core/app-template.cc
@@ -30,6 +30,8 @@
#include
#include
+namespace seastar {
+
namespace bpo = boost::program_options;
app_template::app_template(app_template::config cfg)
@@ -141,3 +143,5 @@ app_template::run_deprecated(int ac, char ** av, std::function&& func)
smp::cleanup();
return exit_code;
}
+
+}
diff --git a/core/app-template.hh b/core/app-template.hh
index 9e85197d00a..be5b0718833 100644
--- a/core/app-template.hh
+++ b/core/app-template.hh
@@ -27,6 +27,8 @@
#include
#include
+namespace seastar {
+
class app_template {
public:
struct config {
@@ -63,4 +65,6 @@ public:
int run(int ac, char ** av, std::function ()>&& func);
};
+}
+
#endif
diff --git a/core/apply.hh b/core/apply.hh
index c015274fde7..437166ad5bd 100644
--- a/core/apply.hh
+++ b/core/apply.hh
@@ -25,6 +25,8 @@
#include
#include
+namespace seastar {
+
template
struct apply_helper;
@@ -56,4 +58,6 @@ auto apply(Func&& func, const std::tuple& args) {
return helper::apply(std::forward(func), args);
}
+}
+
#endif /* APPLY_HH_ */
diff --git a/core/array_map.hh b/core/array_map.hh
index 9e0a1f60d18..e9d46319cd8 100644
--- a/core/array_map.hh
+++ b/core/array_map.hh
@@ -24,6 +24,8 @@
#include
+namespace seastar {
+
// unordered_map implemented as a simple array
template
@@ -46,6 +48,6 @@ public:
}
};
-
+}
#endif /* ARRAY_MAP_HH_ */
diff --git a/core/bitops.hh b/core/bitops.hh
index c33768f5602..22458dba6a5 100644
--- a/core/bitops.hh
+++ b/core/bitops.hh
@@ -24,6 +24,8 @@
#include
+namespace seastar {
+
inline
constexpr unsigned count_leading_zeros(unsigned x) {
return __builtin_clz(x);
@@ -66,4 +68,6 @@ inline constexpr unsigned log2floor(T n) {
return std::numeric_limits::digits - count_leading_zeros(n) - 1;
}
+}
+
#endif /* BITOPS_HH_ */
diff --git a/core/bitset-iter.hh b/core/bitset-iter.hh
index a79a097f512..cec275fbc72 100644
--- a/core/bitset-iter.hh
+++ b/core/bitset-iter.hh
@@ -17,6 +17,8 @@
#include
#include
+namespace seastar {
+
namespace bitsets {
static constexpr int ulong_bits = std::numeric_limits::digits;
@@ -172,6 +174,7 @@ static inline set_range for_each_set(std::bitset bitset, int offset = 0)
return set_range(bitset, offset);
}
+}
}
diff --git a/core/byteorder.hh b/core/byteorder.hh
index beabdcfe17d..a3ac3f26499 100644
--- a/core/byteorder.hh
+++ b/core/byteorder.hh
@@ -25,6 +25,8 @@
#include
#include "unaligned.hh"
+namespace seastar {
+
inline uint8_t cpu_to_le(uint8_t x) { return x; }
inline uint8_t le_to_cpu(uint8_t x) { return x; }
inline uint16_t cpu_to_le(uint16_t x) { return htole16(x); }
@@ -121,3 +123,5 @@ produce_be(char*& p, T datum) {
write_be(p, datum);
p += sizeof(T);
}
+
+}
diff --git a/core/chunked_fifo.hh b/core/chunked_fifo.hh
index a60b796c4b9..217c43ddd0a 100644
--- a/core/chunked_fifo.hh
+++ b/core/chunked_fifo.hh
@@ -21,6 +21,11 @@
#pragma once
+#include
+#include
+
+namespace seastar {
+
// An unbounded FIFO queue of objects of type T.
//
// It provides operations to push items in one end of the queue, and pop them
@@ -75,9 +80,6 @@
// uses move/copy constructors instead of move/copy assignments, which are
// less efficient.
-#include
-#include
-
template
class chunked_fifo {
static_assert((items_per_chunk & (items_per_chunk - 1)) == 0,
@@ -462,3 +464,5 @@ void chunked_fifo::reserve(size_t n) {
++_nfree_chunks;
}
}
+
+}
diff --git a/core/circular_buffer.hh b/core/circular_buffer.hh
index 838dfe379a8..316792ad110 100644
--- a/core/circular_buffer.hh
+++ b/core/circular_buffer.hh
@@ -37,6 +37,8 @@
#include
#include
+namespace seastar {
+
template >
class circular_buffer {
struct impl : Alloc {
@@ -440,4 +442,6 @@ circular_buffer::erase(iterator first, iterator last) {
}
}
+}
+
#endif /* CIRCULAR_BUFFER_HH_ */
diff --git a/core/condition-variable.hh b/core/condition-variable.hh
index 33eef0e8431..9f948d913dc 100644
--- a/core/condition-variable.hh
+++ b/core/condition-variable.hh
@@ -24,6 +24,8 @@
#include "core/future-util.hh"
#include "core/semaphore.hh"
+namespace seastar {
+
/// \addtogroup fiber-module
/// @{
@@ -174,3 +176,5 @@ public:
};
/// @}
+
+}
diff --git a/core/deleter.hh b/core/deleter.hh
index 7749f624b34..ea3b0ef7e73 100644
--- a/core/deleter.hh
+++ b/core/deleter.hh
@@ -27,6 +27,8 @@
#include
#include
+namespace seastar {
+
/// \addtogroup memory-module
/// @{
@@ -272,4 +274,6 @@ make_object_deleter(deleter d, T&& obj) {
/// @}
+}
+
#endif /* DELETER_HH_ */
diff --git a/core/distributed.hh b/core/distributed.hh
index d95a6eea39f..0c4b62f7928 100644
--- a/core/distributed.hh
+++ b/core/distributed.hh
@@ -23,5 +23,10 @@
#include "sharded.hh"
+namespace seastar {
+
+
template
-using distributed = seastar::sharded;
+using distributed = sharded;
+
+}
diff --git a/core/do_with.hh b/core/do_with.hh
index 630bf8d2a3a..f991be69611 100644
--- a/core/do_with.hh
+++ b/core/do_with.hh
@@ -26,6 +26,8 @@
#include
#include
+namespace seastar {
+
/// \addtogroup future-util
/// @{
@@ -112,3 +114,5 @@ do_with(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more) {
}
/// @}
+
+}
diff --git a/core/dpdk_rte.cc b/core/dpdk_rte.cc
index 46cc08897cd..5e358fcddf1 100644
--- a/core/dpdk_rte.cc
+++ b/core/dpdk_rte.cc
@@ -24,6 +24,8 @@
#include
#include
+namespace seastar {
+
namespace dpdk {
bool eal::initialized = false;
@@ -111,4 +113,6 @@ size_t eal::mem_size(int num_cpus, bool hugetlbfs_membackend)
} // namespace dpdk
+}
+
#endif // HAVE_DPDK
diff --git a/core/dpdk_rte.hh b/core/dpdk_rte.hh
index 215c1481add..52cec58a19a 100644
--- a/core/dpdk_rte.hh
+++ b/core/dpdk_rte.hh
@@ -38,6 +38,8 @@
#endif
/******************************************************************************/
+namespace seastar {
+
namespace dpdk {
// DPDK Environment Abstraction Layer
@@ -57,5 +59,8 @@ public:
};
} // namespace dpdk
+
+}
+
#endif // HAVE_DPDK
#endif // DPDK_RTE_HH_
diff --git a/core/enum.hh b/core/enum.hh
index be6c4ff9843..1ea3423224b 100644
--- a/core/enum.hh
+++ b/core/enum.hh
@@ -31,6 +31,8 @@
#include
#include
+namespace seastar {
+
template
class enum_hash {
static_assert(std::is_enum::value, "must be an enum");
@@ -40,3 +42,5 @@ public:
return std::hash()(static_cast(e));
}
};
+
+}
diff --git a/core/execution_stage.hh b/core/execution_stage.hh
index d5dcf5cc460..39b6d31f180 100644
--- a/core/execution_stage.hh
+++ b/core/execution_stage.hh
@@ -28,6 +28,7 @@
#include "metrics.hh"
#include "util/reference_wrapper.hh"
#include "util/gcc6-concepts.hh"
+#include "../util/defer.hh"
namespace seastar {
@@ -308,7 +309,7 @@ public:
/// Usage example:
/// ```
/// void do_something(int&, int, std::vector&&);
- /// thread_local auto stage = seastar::make_execution_stage(do_something);
+ /// thread_local auto stage = seastar::make_execution_stage("execution-stage", do_something);
///
/// int global_value;
///
@@ -346,14 +347,14 @@ public:
/// Usage example:
/// ```
/// double do_something(int);
-/// thread_local auto stage1 = seastar::make_execution_stage(do_something);
+/// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
///
/// future func1(int val) {
/// return stage1(val);
/// }
///
/// future do_some_io(int);
-/// thread_local auto stage2 = seastar::make_execution_stage(do_some_io);
+/// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
///
/// future func2(int val) {
/// return stage2(val);
@@ -382,14 +383,14 @@ auto make_execution_stage(const sstring& name, Function&& fn) {
/// void do_something(int);
/// };
///
-/// thread_local auto stage = seastar::make_execution_stage(&foo::do_something);
+/// thread_local auto stage = seastar::make_execution_stage("execution-stage", &foo::do_something);
///
/// future<> func(foo& obj, int val) {
/// return stage(&obj, val);
/// }
/// ```
///
-/// \see make_execution_stage(Function&&)
+/// \see make_execution_stage(const sstring&, Function&&)
/// \param name unique name of the execution stage
/// \param fn member function to be executed by the stage
/// \return concrete_execution_stage
@@ -407,34 +408,37 @@ auto make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const)
inline execution_stage::execution_stage(const sstring& name)
: _name(name)
- , _metric_group("execution_stages", {
- metrics::make_derive("tasks_scheduled",
- metrics::description("Counts tasks scheduled by execution stages"),
- { metrics::label_instance("execution_stage", name), },
- [name, &esm = internal::execution_stage_manager::get()] {
- return esm.get_stage(name)->get_stats().tasks_scheduled;
- }),
- metrics::make_derive("tasks_preempted",
- metrics::description("Counts tasks which were preempted before execution all queued operations"),
- { metrics::label_instance("execution_stage", name), },
- [name, &esm = internal::execution_stage_manager::get()] {
- return esm.get_stage(name)->get_stats().tasks_preempted;
- }),
- metrics::make_derive("function_calls_enqueued",
- metrics::description("Counts function calls added to execution stages queues"),
- { metrics::label_instance("execution_stage", name), },
- [name, &esm = internal::execution_stage_manager::get()] {
- return esm.get_stage(name)->get_stats().function_calls_enqueued;
- }),
- metrics::make_derive("function_calls_executed",
- metrics::description("Counts function calls executed by execution stages"),
- { metrics::label_instance("execution_stage", name), },
- [name, &esm = internal::execution_stage_manager::get()] {
- return esm.get_stage(name)->get_stats().function_calls_executed;
- }),
- })
+
{
internal::execution_stage_manager::get().register_execution_stage(*this);
+ auto undo = defer([&] { internal::execution_stage_manager::get().unregister_execution_stage(*this); });
+ _metric_group = metrics::metric_group("execution_stages", {
+ metrics::make_derive("tasks_scheduled",
+ metrics::description("Counts tasks scheduled by execution stages"),
+ { metrics::label_instance("execution_stage", name), },
+ [name, &esm = internal::execution_stage_manager::get()] {
+ return esm.get_stage(name)->get_stats().tasks_scheduled;
+ }),
+ metrics::make_derive("tasks_preempted",
+ metrics::description("Counts tasks which were preempted before execution all queued operations"),
+ { metrics::label_instance("execution_stage", name), },
+ [name, &esm = internal::execution_stage_manager::get()] {
+ return esm.get_stage(name)->get_stats().tasks_preempted;
+ }),
+ metrics::make_derive("function_calls_enqueued",
+ metrics::description("Counts function calls added to execution stages queues"),
+ { metrics::label_instance("execution_stage", name), },
+ [name, &esm = internal::execution_stage_manager::get()] {
+ return esm.get_stage(name)->get_stats().function_calls_enqueued;
+ }),
+ metrics::make_derive("function_calls_executed",
+ metrics::description("Counts function calls executed by execution stages"),
+ { metrics::label_instance("execution_stage", name), },
+ [name, &esm = internal::execution_stage_manager::get()] {
+ return esm.get_stage(name)->get_stats().function_calls_executed;
+ }),
+ });
+ undo.cancel();
}
inline execution_stage::~execution_stage()
diff --git a/core/expiring_fifo.hh b/core/expiring_fifo.hh
index d5b8ceee696..63fe96259be 100644
--- a/core/expiring_fifo.hh
+++ b/core/expiring_fifo.hh
@@ -29,6 +29,8 @@
#include "future-util.hh"
#include "lowres_clock.hh"
+namespace seastar {
+
template
struct dummy_expiry {
void operator()(T&) noexcept {};
@@ -165,3 +167,5 @@ public:
drop_expired_front();
}
};
+
+}
diff --git a/core/fair_queue.hh b/core/fair_queue.hh
index 0862868ec6d..be8aa37244a 100644
--- a/core/fair_queue.hh
+++ b/core/fair_queue.hh
@@ -33,6 +33,8 @@
#include
#include
+namespace seastar {
+
/// \addtogroup io-module
/// @{
@@ -227,3 +229,5 @@ public:
}
};
/// @}
+
+}
diff --git a/core/file-impl.hh b/core/file-impl.hh
index f9cf465f63c..c35a9f9f887 100644
--- a/core/file-impl.hh
+++ b/core/file-impl.hh
@@ -25,6 +25,8 @@
#include
#include
+namespace seastar {
+
class posix_file_handle_impl : public seastar::file_handle_impl {
int _fd;
std::atomic* _refcount;
@@ -157,3 +159,4 @@ public:
virtual future<> allocate(uint64_t position, uint64_t length) override;
};
+}
diff --git a/core/file.hh b/core/file.hh
index 30028fa4c71..d8e6576c0a2 100644
--- a/core/file.hh
+++ b/core/file.hh
@@ -36,6 +36,8 @@
#include
#include
+namespace seastar {
+
/// \addtogroup fileio-module
/// @{
@@ -99,8 +101,6 @@ const io_priority_class& default_priority_class();
class file;
class file_impl;
-namespace seastar {
-
class file_handle;
// A handle that can be transported across shards and used to
@@ -112,8 +112,6 @@ public:
virtual shared_ptr to_file() && = 0;
};
-}
-
class file_impl {
protected:
static file_impl* get_file_impl(file& f);
@@ -135,7 +133,7 @@ public:
virtual future<> allocate(uint64_t position, uint64_t length) = 0;
virtual future size(void) = 0;
virtual future<> close() = 0;
- virtual std::unique_ptr dup();
+ virtual std::unique_ptr dup();
virtual subscription list_directory(std::function (directory_entry de)> next) = 0;
virtual future> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) = 0;
@@ -174,7 +172,7 @@ public:
: _file_impl(std::move(impl)) {}
/// Constructs a file object from a \ref file_handle obtained from another shard
- explicit file(seastar::file_handle&& handle);
+ explicit file(file_handle&& handle);
/// Checks whether the file object was initialized.
///
@@ -430,7 +428,7 @@ public:
///
/// \note Use on read-only files.
///
- seastar::file_handle dup();
+ file_handle dup();
template
struct read_state;
@@ -439,8 +437,6 @@ private:
friend class file_impl;
};
-namespace seastar {
-
/// \brief A shard-transportable handle to a file
///
/// If you need to access a file (for reads only) across multiple shards,
@@ -449,9 +445,9 @@ namespace seastar {
/// object on that shard. This is more efficient than calling open_file_dma()
/// again.
class file_handle {
- std::unique_ptr _impl;
+ std::unique_ptr _impl;
private:
- explicit file_handle(std::unique_ptr impl) : _impl(std::move(impl)) {}
+ explicit file_handle(std::unique_ptr impl) : _impl(std::move(impl)) {}
public:
/// Copies a file handle object
file_handle(const file_handle&);
@@ -466,11 +462,9 @@ public:
/// Converts the file handle object to a \ref file.
file to_file() &&;
- friend class ::file;
+ friend class file;
};
-}
-
/// \cond internal
template
@@ -543,4 +537,6 @@ private:
/// @}
+}
+
#endif /* FILE_HH_ */
diff --git a/core/fsqual.cc b/core/fsqual.cc
index a7c50bb8b12..96c7ea6d9ff 100644
--- a/core/fsqual.cc
+++ b/core/fsqual.cc
@@ -31,6 +31,8 @@
#include
#include "fsqual.hh"
+namespace seastar {
+
// Runs func(), and also adds the number of context switches
// that happened during func() to counter.
template
@@ -92,3 +94,5 @@ bool filesystem_has_good_aio_support(sstring directory, bool verbose) {
}
return ok;
}
+
+}
diff --git a/core/fsqual.hh b/core/fsqual.hh
index c00a93ce2fd..5c6ee028346 100644
--- a/core/fsqual.hh
+++ b/core/fsqual.hh
@@ -25,6 +25,10 @@
#include "sstring.hh"
+namespace seastar {
+
bool filesystem_has_good_aio_support(sstring directory, bool verbose = false);
+}
+
#endif /* CORE_FSQUAL_HH_ */
diff --git a/core/fstream.cc b/core/fstream.cc
index 6c3e1317b4a..354cea75948 100644
--- a/core/fstream.cc
+++ b/core/fstream.cc
@@ -27,6 +27,8 @@
#include
#include
+namespace seastar {
+
class file_data_source_impl : public data_source_impl {
struct issued_read {
uint64_t _pos;
@@ -245,7 +247,7 @@ class file_data_source_impl : public data_source_impl {
_read_buffers.emplace_back(_pos, actual_size, futurize>>::apply([&] {
return _file.dma_read_bulk(start, len, _options.io_priority_class);
}).then_wrapped(
- [this, start, end, pos = _pos, remain = _remain] (future> ret) {
+ [this, start, pos = _pos, remain = _remain] (future> ret) {
--_reads_in_progress;
if (_done && !_reads_in_progress) {
_done->set_value();
@@ -426,3 +428,5 @@ output_stream make_file_output_stream(file f, file_output_stream_options o
return output_stream(file_data_sink(std::move(f), options), options.buffer_size, true);
}
+}
+
diff --git a/core/fstream.hh b/core/fstream.hh
index 79f7d00e3cb..e92622e9244 100644
--- a/core/fstream.hh
+++ b/core/fstream.hh
@@ -34,6 +34,8 @@
#include "iostream.hh"
#include "shared_ptr.hh"
+namespace seastar {
+
class file_input_stream_history {
static constexpr uint64_t window_size = 4 * 1024 * 1024;
struct window {
@@ -51,7 +53,7 @@ class file_input_stream_history {
struct file_input_stream_options {
size_t buffer_size = 8192; ///< I/O buffer size
unsigned read_ahead = 0; ///< Maximum number of extra read-ahead operations
- ::io_priority_class io_priority_class = default_priority_class();
+ ::seastar::io_priority_class io_priority_class = default_priority_class();
lw_shared_ptr dynamic_adjustments = { }; ///< Input stream history, if null dynamic adjustments are disabled
};
@@ -83,7 +85,7 @@ struct file_output_stream_options {
unsigned buffer_size = 8192;
unsigned preallocation_size = 1024*1024; // 1MB
unsigned write_behind = 1; ///< Number of buffers to write in parallel
- ::io_priority_class io_priority_class = default_priority_class();
+ ::seastar::io_priority_class io_priority_class = default_priority_class();
};
// Create an output_stream for writing starting at the position zero of a
@@ -100,3 +102,4 @@ output_stream make_file_output_stream(
file file,
file_output_stream_options options);
+}
diff --git a/core/function_traits.hh b/core/function_traits.hh
index 1d278a6207f..a3b9b9d31a8 100644
--- a/core/function_traits.hh
+++ b/core/function_traits.hh
@@ -23,6 +23,8 @@
#include
+namespace seastar {
+
template
struct function_traits;
@@ -63,3 +65,4 @@ template
struct function_traits : public function_traits>
{};
+}
diff --git a/core/future-util.hh b/core/future-util.hh
index 840f25f9f33..cfe1dbe31f6 100644
--- a/core/future-util.hh
+++ b/core/future-util.hh
@@ -37,6 +37,8 @@
#include
#include "util/tuple_utils.hh"
+namespace seastar {
+
/// \cond internal
extern __thread size_t task_quota;
/// \endcond
@@ -372,7 +374,7 @@ future<> do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
if (begin == end) {
return f;
}
- if (!f.available()) {
+ if (!f.available() || need_preempt()) {
return std::move(f).then([action = std::forward(action),
begin = std::move(begin), end = std::move(end)] () mutable {
return do_for_each(std::move(begin), std::move(end), std::forward(action));
@@ -403,7 +405,6 @@ future<> do_for_each(Container& c, AsyncAction&& action) {
}
/// \cond internal
-namespace seastar {
namespace internal {
template
@@ -446,12 +447,9 @@ public:
}
};
-}
}
/// \endcond
-namespace seastar {
-
GCC6_CONCEPT(
/// \cond internal
@@ -480,8 +478,6 @@ concept bool AllAreFutures = impl::is_tuple_of_futures>::val
)
-}
-
/// Wait for many futures to complete, capturing possible errors (variadic version).
///
@@ -497,14 +493,13 @@ GCC6_CONCEPT( requires seastar::AllAreFutures )
inline
future>
when_all(Futs&&... futs) {
- namespace si = seastar::internal;
+ namespace si = internal;
using state = si::when_all_state, Futs...>;
auto s = make_lw_shared(std::forward(futs)...);
return s->wait_all(std::make_index_sequence());
}
/// \cond internal
-namespace seastar {
namespace internal {
template
@@ -563,7 +558,6 @@ do_when_all(FutureIterator begin, FutureIterator end) {
return complete_when_all(std::move(ret), ret.begin());
}
-}
}
/// \endcond
@@ -582,7 +576,7 @@ GCC6_CONCEPT( requires requires (FutureIterator i) { { *i++ }; requires is_futur
inline
future::value_type>>
when_all(FutureIterator begin, FutureIterator end) {
- namespace si = seastar::internal;
+ namespace si = internal;
using itraits = std::iterator_traits;
using result_transform = si::identity_futures_vector;
return si::do_when_all(std::move(begin), std::move(end));
@@ -848,8 +842,6 @@ future with_timeout(std::chrono::time_point timeout, futu
return result;
}
-namespace seastar {
-
namespace internal {
template
diff --git a/core/future.hh b/core/future.hh
index de551fb6257..7e82684ea34 100644
--- a/core/future.hh
+++ b/core/future.hh
@@ -35,6 +35,8 @@
#include "function_traits.hh"
#include "../util/gcc6-concepts.hh"
+namespace seastar {
+
/// \defgroup future-module Futures and Promises
///
/// \brief
@@ -74,6 +76,9 @@ class promise;
template
class future;
+template
+class shared_future;
+
/// \brief Creates a \ref future in an available, value state.
///
/// Creates a \ref future object that is already resolved. This
@@ -648,8 +653,6 @@ using futurize_t = typename futurize::type;
/// @}
-namespace seastar {
-
GCC6_CONCEPT(
template
@@ -672,8 +675,6 @@ concept bool ApplyReturnsAnyFuture = requires (Func f, T... args) {
)
-}
-
/// \addtogroup future-module
/// @{
@@ -721,7 +722,7 @@ private:
template
void schedule(Func&& func) {
if (state()->available()) {
- ::schedule(std::make_unique>(std::move(func), std::move(*state())));
+ ::seastar::schedule(std::make_unique>(std::move(func), std::move(*state())));
} else {
assert(_promise);
_promise->schedule(std::move(func));
@@ -811,8 +812,8 @@ public:
std::tuple get() {
if (!state()->available()) {
wait();
- } else if (seastar::thread_impl::get() && seastar::thread_impl::should_yield()) {
- seastar::thread_impl::yield();
+ } else if (thread_impl::get() && thread_impl::should_yield()) {
+ thread_impl::yield();
}
return get_available_state().get();
}
@@ -836,13 +837,13 @@ public:
/// \cond internal
void wait() {
- auto thread = seastar::thread_impl::get();
+ auto thread = thread_impl::get();
assert(thread);
schedule([this, thread] (future_state&& new_state) {
*state() = std::move(new_state);
- seastar::thread_impl::switch_in(thread);
+ thread_impl::switch_in(thread);
});
- seastar::thread_impl::switch_out(thread);
+ thread_impl::switch_out(thread);
}
/// \endcond
@@ -878,7 +879,7 @@ public:
/// \return a \c future representing the return value of \c func, applied
/// to the eventual value of this future.
template >>
- GCC6_CONCEPT( requires seastar::CanApply )
+ GCC6_CONCEPT( requires ::seastar::CanApply )
Result
then(Func&& func) noexcept {
using futurator = futurize>;
@@ -925,7 +926,7 @@ public:
/// \return a \c future representing the return value of \c func, applied
/// to the eventual value of this future.
template >>
- GCC6_CONCEPT( requires seastar::CanApply )
+ GCC6_CONCEPT( requires ::seastar::CanApply )
Result
then_wrapped(Func&& func) noexcept {
using futurator = futurize>;
@@ -985,7 +986,7 @@ public:
* nested will be propagated.
*/
template
- GCC6_CONCEPT( requires seastar::CanApply )
+ GCC6_CONCEPT( requires ::seastar::CanApply )
future finally(Func&& func) noexcept {
return then_wrapped(finally_body>::value>(std::forward(func)));
}
@@ -1072,9 +1073,9 @@ public:
/// future<>, the handler function does not need to return anything.
template
/* Broken?
- GCC6_CONCEPT( requires seastar::ApplyReturns, std::exception_ptr>
- || (sizeof...(T) == 0 && seastar::ApplyReturns)
- || (sizeof...(T) == 1 && seastar::ApplyReturns)
+ GCC6_CONCEPT( requires ::seastar::ApplyReturns, std::exception_ptr>
+ || (sizeof...(T) == 0 && ::seastar::ApplyReturns)
+ || (sizeof...(T) == 1 && ::seastar::ApplyReturns)
) */
future handle_exception(Func&& func) noexcept {
using func_ret = std::result_of_t;
@@ -1162,9 +1163,9 @@ void promise::make_ready() noexcept {
if (_task) {
_state = nullptr;
if (Urgent == urgent::yes && !need_preempt()) {
- ::schedule_urgent(std::move(_task));
+ ::seastar::schedule_urgent(std::move(_task));
} else {
- ::schedule(std::move(_task));
+ ::seastar::schedule(std::move(_task));
}
}
}
@@ -1222,7 +1223,7 @@ template
template
typename futurize::type futurize::apply(Func&& func, std::tuple&& args) noexcept {
try {
- return convert(::apply(std::forward(func), std::move(args)));
+ return convert(::seastar::apply(std::forward(func), std::move(args)));
} catch (...) {
return make_exception_future(std::current_exception());
}
@@ -1266,7 +1267,7 @@ inline
std::enable_if_t>::value, future<>>
do_void_futurize_apply_tuple(Func&& func, std::tuple&& args) noexcept {
try {
- ::apply(std::forward(func), std::move(args));
+ ::seastar::apply(std::forward(func), std::move(args));
return make_ready_future<>();
} catch (...) {
return make_exception_future(std::current_exception());
@@ -1278,7 +1279,7 @@ inline
std::enable_if_t>::value, future<>>
do_void_futurize_apply_tuple(Func&& func, std::tuple&& args) noexcept {
try {
- return ::apply(std::forward(func), std::move(args));
+ return ::seastar::apply(std::forward(func), std::move(args));
} catch (...) {
return make_exception_future(std::current_exception());
}
@@ -1298,7 +1299,7 @@ template
template
typename futurize>::type futurize>::apply(Func&& func, std::tuple&& args) noexcept {
try {
- return ::apply(std::forward(func), std::move(args));
+ return ::seastar::apply(std::forward(func), std::move(args));
} catch (...) {
return make_exception_future(std::current_exception());
}
@@ -1319,7 +1320,7 @@ template
inline
future
futurize::make_exception_future(Arg&& arg) {
- return ::make_exception_future(std::forward(arg));
+ return ::seastar::make_exception_future(std::forward(arg));
}
template
@@ -1327,14 +1328,14 @@ template
inline
future
futurize>::make_exception_future(Arg&& arg) {
- return ::make_exception_future(std::forward(arg));
+ return ::seastar::make_exception_future(std::forward(arg));
}
template