From f5ab808df784f3aa66aff1d1a5c4d413a1a7e247 Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Fri, 22 May 2020 11:10:53 +0800 Subject: [PATCH] refactor: make aio decoupled from dsn_runtime (#466) --- include/dsn/tool-api/file_io.h | 2 + include/dsn/tool-api/global_config.h | 4 +- include/dsn/tool-api/task.h | 11 +--- include/dsn/tool_api.h | 3 - src/core/CMakeLists.txt | 1 + src/core/aio/CMakeLists.txt | 17 ++++++ src/core/{core => aio}/aio_provider.cpp | 13 +---- .../tool-api => src/core/aio}/aio_provider.h | 2 - src/core/aio/aio_task.cpp | 57 +++++++++++++++++++ src/core/{core => aio}/disk_engine.cpp | 36 ++++-------- src/core/{core => aio}/disk_engine.h | 25 ++++---- src/core/{core => aio}/file_io.cpp | 20 ++++--- .../native_linux_aio_provider.cpp} | 20 +++---- .../native_linux_aio_provider.h} | 6 +- .../sim_aio_provider.cpp} | 18 ++---- .../diske.sim.h => aio/sim_aio_provider.h} | 17 ++---- src/core/core/global_config.cpp | 3 + src/core/core/service_api_c.cpp | 1 - src/core/core/service_engine.cpp | 21 +------ src/core/core/service_engine.h | 5 -- src/core/core/task.cpp | 51 ----------------- src/core/core/tool_api.cpp | 7 --- src/core/tests/CMakeLists.txt | 1 + src/core/tests/aio.cpp | 13 ----- src/core/tests/async_call.cpp | 1 - src/core/tests/lpc.cpp | 1 - src/core/tests/netprovider.cpp | 1 - src/core/tools/common/nativerun.cpp | 4 +- src/core/tools/common/providers.common.cpp | 3 - src/core/tools/simulator/simulator.cpp | 7 +-- src/dist/nfs/CMakeLists.txt | 2 +- src/dist/nfs/nfs_server_impl.h | 2 +- src/dist/nfs/test/CMakeLists.txt | 2 +- src/dist/replication/lib/CMakeLists.txt | 1 + .../replication/meta_server/CMakeLists.txt | 1 + .../test/meta_test/unit_test/CMakeLists.txt | 1 + 36 files changed, 152 insertions(+), 228 deletions(-) create mode 100644 src/core/aio/CMakeLists.txt rename src/core/{core => aio}/aio_provider.cpp (88%) rename {include/dsn/tool-api => src/core/aio}/aio_provider.h (99%) create mode 100644 src/core/aio/aio_task.cpp rename src/core/{core => aio}/disk_engine.cpp (95%) rename src/core/{core => aio}/disk_engine.h (91%) rename src/core/{core => aio}/file_io.cpp (88%) rename src/core/{tools/common/native_aio_provider.linux.cpp => aio/native_linux_aio_provider.cpp} (97%) rename src/core/{tools/common/native_aio_provider.linux.h => aio/native_linux_aio_provider.h} (97%) rename src/core/{tools/simulator/diske.sim.cpp => aio/sim_aio_provider.cpp} (88%) rename src/core/{tools/simulator/diske.sim.h => aio/sim_aio_provider.h} (83%) diff --git a/include/dsn/tool-api/file_io.h b/include/dsn/tool-api/file_io.h index 9e41adc10c..cf8383b32d 100644 --- a/include/dsn/tool-api/file_io.h +++ b/include/dsn/tool-api/file_io.h @@ -87,5 +87,7 @@ extern aio_task_ptr write_vector(disk_file *file, aio_handler &&callback, int hash = 0); +extern aio_context_ptr prepare_aio_context(aio_task *tsk); + } // namespace file } // namespace dsn diff --git a/include/dsn/tool-api/global_config.h b/include/dsn/tool-api/global_config.h index 5fac7b78d6..b6741384d1 100644 --- a/include/dsn/tool-api/global_config.h +++ b/include/dsn/tool-api/global_config.h @@ -152,7 +152,6 @@ struct service_spec bool enable_default_app_mimic; std::string timer_factory_name; - std::string aio_factory_name; std::string env_factory_name; std::string lock_factory_name; std::string lock_nr_factory_name; @@ -189,7 +188,6 @@ CONFIG_FLD( "; in this case, a [apps.mimic] section must be defined in config files"); CONFIG_FLD_STRING(timer_factory_name, "", "timer service provider") -CONFIG_FLD_STRING(aio_factory_name, "", "asynchonous file system provider") CONFIG_FLD_STRING(env_factory_name, "", "environment provider") CONFIG_FLD_STRING(lock_factory_name, "", "recursive exclusive lock provider") CONFIG_FLD_STRING(lock_nr_factory_name, "", "non-recurisve exclusive lock provider") @@ -213,4 +211,6 @@ ENUM_REG(SYS_EXIT_NORMAL) ENUM_REG(SYS_EXIT_BREAK) ENUM_REG(SYS_EXIT_EXCEPTION) ENUM_END(sys_exit_type) + +extern const char *FLAGS_aio_factory_name; } // namespace dsn diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 090b87efbd..9c0ce5285a 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -75,7 +75,6 @@ struct __tls_dsn__ int node_id; rpc_engine *rpc; - disk_engine *disk; env_provider *env; int last_worker_queue_size; @@ -248,7 +247,6 @@ class task : public ref_counter, public extensible_object, public trans static int get_current_worker_index(); static const char *get_current_node_name(); static rpc_engine *get_current_rpc(); - static disk_engine *get_current_disk(); static env_provider *get_current_env(); static void set_tls_dsn_context( @@ -572,6 +570,7 @@ class aio_context : public ref_counter { } }; +typedef dsn::ref_ptr aio_context_ptr; class aio_task : public task { @@ -608,7 +607,7 @@ class aio_task : public task void clear_non_trivial_on_task_end() override { _cb = nullptr; } private: - dsn::ref_ptr _aio_ctx; + aio_context_ptr _aio_ctx; size_t _transferred_size; aio_handler _cb; }; @@ -677,12 +676,6 @@ __inline /*static*/ rpc_engine *task::get_current_rpc() return tls_dsn.rpc; } -__inline /*static*/ disk_engine *task::get_current_disk() -{ - check_tls_dsn(); - return tls_dsn.disk; -} - __inline /*static*/ env_provider *task::get_current_env() { check_tls_dsn(); diff --git a/include/dsn/tool_api.h b/include/dsn/tool_api.h index 00928da8da..cd40bd6db1 100644 --- a/include/dsn/tool_api.h +++ b/include/dsn/tool_api.h @@ -57,7 +57,6 @@ Component providers define the interface for the local components (e.g., network #include #include #include -#include #include #include #include @@ -140,8 +139,6 @@ DSN_API bool register_component_provider(const char *name, DSN_API bool register_component_provider(const char *name, network::factory f, ::dsn::provider_type type); DSN_API bool -register_component_provider(const char *name, aio_provider::factory f, ::dsn::provider_type type); -DSN_API bool register_component_provider(const char *name, env_provider::factory f, ::dsn::provider_type type); DSN_API bool register_component_provider(const char *name, logging_provider::factory f, diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 01686fff05..21f55f8bd3 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(core) add_subdirectory(perf_counter) add_subdirectory(tools) add_subdirectory(tests) +add_subdirectory(aio) add_library(dsn_runtime STATIC $ diff --git a/src/core/aio/CMakeLists.txt b/src/core/aio/CMakeLists.txt new file mode 100644 index 0000000000..6fad9e66a2 --- /dev/null +++ b/src/core/aio/CMakeLists.txt @@ -0,0 +1,17 @@ +set(MY_PROJ_NAME dsn_aio) + +#Source files under CURRENT project directory will be automatically included. +#You can manually set MY_PROJ_SRC to include source files under other directories. +set(MY_PROJ_SRC "") + +#Search mode for source files under CURRENT project directory ? +#"GLOB_RECURSE" for recursive search +#"GLOB" for non - recursive search +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_LIBS dsn_runtime) + +#Extra files that will be installed +set(MY_BINPLACES "") + +dsn_add_static_library() diff --git a/src/core/core/aio_provider.cpp b/src/core/aio/aio_provider.cpp similarity index 88% rename from src/core/core/aio_provider.cpp rename to src/core/aio/aio_provider.cpp index 7bc3d893cb..ffe3b0df9b 100644 --- a/src/core/core/aio_provider.cpp +++ b/src/core/aio/aio_provider.cpp @@ -24,16 +24,7 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#include +#include "aio_provider.h" #include "disk_engine.h" namespace dsn { @@ -50,4 +41,4 @@ void aio_provider::complete_io(aio_task *aio, _engine->complete_io(aio, err, bytes, delay_milliseconds); } -} // end namespace dsn +} // namespace dsn diff --git a/include/dsn/tool-api/aio_provider.h b/src/core/aio/aio_provider.h similarity index 99% rename from include/dsn/tool-api/aio_provider.h rename to src/core/aio/aio_provider.h index ea9a0eb7e5..558f1b4ce2 100644 --- a/include/dsn/tool-api/aio_provider.h +++ b/src/core/aio/aio_provider.h @@ -89,8 +89,6 @@ class aio_provider virtual aio_context *prepare_aio_context(aio_task *) = 0; - virtual void start() = 0; - protected: DSN_API void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0); diff --git a/src/core/aio/aio_task.cpp b/src/core/aio/aio_task.cpp new file mode 100644 index 0000000000..d74ce20573 --- /dev/null +++ b/src/core/aio/aio_task.cpp @@ -0,0 +1,57 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "core/core/task_engine.h" +#include +#include + +namespace dsn { + +aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service_node *node) + : aio_task(code, aio_handler(cb), hash, node) +{ +} + +aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node) + : task(code, hash, node), _cb(std::move(cb)) +{ + _is_null = (_cb == nullptr); + + dassert(TASK_TYPE_AIO == spec().type, + "%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code", + spec().name.c_str()); + set_error_code(ERR_IO_PENDING); + + _aio_ctx = file::prepare_aio_context(this); +} + +void aio_task::collapse() +{ + if (!_unmerged_write_buffers.empty()) { + std::shared_ptr buffer(dsn::utils::make_shared_array(_aio_ctx->buffer_size)); + char *dest = buffer.get(); + for (const dsn_file_buffer_t &b : _unmerged_write_buffers) { + ::memcpy(dest, b.buffer, b.size); + dest += b.size; + } + dassert(dest - buffer.get() == _aio_ctx->buffer_size, + "%u VS %u", + dest - buffer.get(), + _aio_ctx->buffer_size); + _aio_ctx->buffer = buffer.get(); + _merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size); + } +} + +void aio_task::enqueue(error_code err, size_t transferred_size) +{ + set_error_code(err); + _transferred_size = transferred_size; + + spec().on_aio_enqueue.execute(this); + + task::enqueue(node()->computation()->get_pool(spec().pool_code)); +} + +} // namespace dsn diff --git a/src/core/core/disk_engine.cpp b/src/core/aio/disk_engine.cpp similarity index 95% rename from src/core/core/disk_engine.cpp rename to src/core/aio/disk_engine.cpp index ddd3998830..f1389f03a3 100644 --- a/src/core/core/disk_engine.cpp +++ b/src/core/aio/disk_engine.cpp @@ -25,6 +25,8 @@ */ #include "disk_engine.h" +#include "sim_aio_provider.h" +#include "core/core/service_engine.h" using namespace dsn::utils; @@ -132,25 +134,20 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, } //----------------- disk_engine ------------------------ -disk_engine::disk_engine(service_node *node) +disk_engine::disk_engine() { - _is_running = false; - _provider = nullptr; - _node = node; + _node = service_engine::instance().get_all_nodes().begin()->second.get(); + + // use native_linux_aio_provider in default + if (!strcmp(FLAGS_aio_factory_name, "dsn::tools::sim_aio_provider")) { + _provider.reset(new aio::sim_aio_provider(this, nullptr)); + } else { + _provider.reset(new native_linux_aio_provider(this, nullptr)); + } } disk_engine::~disk_engine() {} -void disk_engine::start(aio_provider *provider) -{ - if (_is_running) - return; - - _provider = provider; - _provider->start(); - _is_running = true; -} - disk_file *disk_engine::open(const char *file_name, int flag, int pmode) { dsn_handle_t nh = _provider->open(file_name, flag, pmode); @@ -185,11 +182,6 @@ error_code disk_engine::flush(disk_file *fh) void disk_engine::read(aio_task *aio) { - if (!_is_running) { - aio->enqueue(ERR_SERVICE_NOT_FOUND, 0); - return; - } - if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) { aio->enqueue(ERR_FILE_OPERATION_FAILED, 0); return; @@ -233,11 +225,6 @@ class batch_write_io_task : public aio_task void disk_engine::write(aio_task *aio) { - if (!_is_running) { - aio->enqueue(ERR_SERVICE_NOT_FOUND, 0); - return; - } - if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) { aio->enqueue(ERR_FILE_OPERATION_FAILED, 0); return; @@ -342,5 +329,4 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int } } } - } // namespace dsn diff --git a/src/core/core/disk_engine.h b/src/core/aio/disk_engine.h similarity index 91% rename from src/core/core/disk_engine.h rename to src/core/aio/disk_engine.h index 774d0056e6..397c4891e8 100644 --- a/src/core/core/disk_engine.h +++ b/src/core/aio/disk_engine.h @@ -26,10 +26,9 @@ #pragma once -#include "service_engine.h" +#include "aio_provider.h" #include -#include #include namespace dsn { @@ -68,14 +67,9 @@ class disk_file work_queue _read_queue; }; -class disk_engine +class disk_engine : public utils::singleton { public: - disk_engine(service_node *node); - ~disk_engine(); - - void start(aio_provider *provider); - // asynchonous file read/write disk_file *open(const char *file_name, int flag, int pmode); error_code close(disk_file *fh); @@ -84,19 +78,22 @@ class disk_engine void write(aio_task *aio); aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); } - service_node *node() const { return _node; } private: - friend class aio_provider; - friend class batch_write_io_task; + // the object of disk_engine must be created by `singleton::instance` + disk_engine(); + ~disk_engine(); + void process_write(aio_task *wk, uint32_t sz); void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0); -private: - volatile bool _is_running; - aio_provider *_provider; + std::unique_ptr _provider; service_node *_node; + + friend class aio_provider; + friend class batch_write_io_task; + friend class utils::singleton; }; } // namespace dsn diff --git a/src/core/core/file_io.cpp b/src/core/aio/file_io.cpp similarity index 88% rename from src/core/core/file_io.cpp rename to src/core/aio/file_io.cpp index 229f95c4c2..1f736a8ae5 100644 --- a/src/core/core/file_io.cpp +++ b/src/core/aio/file_io.cpp @@ -24,21 +24,21 @@ * THE SOFTWARE. */ -#include - #include "disk_engine.h" +#include + namespace dsn { namespace file { /*extern*/ disk_file *open(const char *file_name, int flag, int pmode) { - return task::get_current_disk()->open(file_name, flag, pmode); + return disk_engine::instance().open(file_name, flag, pmode); } -/*extern*/ error_code close(disk_file *file) { return task::get_current_disk()->close(file); } +/*extern*/ error_code close(disk_file *file) { return disk_engine::instance().close(file); } -/*extern*/ error_code flush(disk_file *file) { return task::get_current_disk()->flush(file); } +/*extern*/ error_code flush(disk_file *file) { return disk_engine::instance().flush(file); } /*extern*/ aio_task_ptr read(disk_file *file, char *buffer, @@ -56,7 +56,7 @@ namespace file { cb->get_aio_context()->file_offset = offset; cb->get_aio_context()->type = AIO_Read; - task::get_current_disk()->read(cb); + disk_engine::instance().read(cb); return cb; } @@ -76,7 +76,7 @@ namespace file { cb->get_aio_context()->file_offset = offset; cb->get_aio_context()->type = AIO_Write; - task::get_current_disk()->write(cb); + disk_engine::instance().write(cb); return cb; } @@ -100,9 +100,13 @@ namespace file { } } - task::get_current_disk()->write(cb); + disk_engine::instance().write(cb); return cb; } +aio_context_ptr prepare_aio_context(aio_task *tsk) +{ + return disk_engine::instance().prepare_aio_context(tsk); +} } // namespace file } // namespace dsn diff --git a/src/core/tools/common/native_aio_provider.linux.cpp b/src/core/aio/native_linux_aio_provider.cpp similarity index 97% rename from src/core/tools/common/native_aio_provider.linux.cpp rename to src/core/aio/native_linux_aio_provider.cpp index 5b90a485d6..55869aa859 100644 --- a/src/core/tools/common/native_aio_provider.linux.cpp +++ b/src/core/aio/native_linux_aio_provider.cpp @@ -24,22 +24,26 @@ * THE SOFTWARE. */ -#include "native_aio_provider.linux.h" +#include "native_linux_aio_provider.h" #include #include namespace dsn { -namespace tools { native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk, aio_provider *inner_provider) : aio_provider(disk, inner_provider) { - memset(&_ctx, 0, sizeof(_ctx)); auto ret = io_setup(128, &_ctx); // 128 concurrent events dassert(ret == 0, "io_setup error, ret = %d", ret); + + _is_running = true; + _worker = std::thread([this, disk]() { + task::set_tls_dsn_context(node(), nullptr); + get_event(); + }); } native_linux_aio_provider::~native_linux_aio_provider() @@ -55,15 +59,6 @@ native_linux_aio_provider::~native_linux_aio_provider() _worker.join(); } -void native_linux_aio_provider::start() -{ - _is_running = true; - _worker = std::thread([this]() { - task::set_tls_dsn_context(node(), nullptr); - get_event(); - }); -} - dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag, int pmode) { dsn_handle_t fh = (dsn_handle_t)(uintptr_t)::open(file_name, flag, pmode); @@ -233,5 +228,4 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk, } } -} // namespace tools } // namespace dsn diff --git a/src/core/tools/common/native_aio_provider.linux.h b/src/core/aio/native_linux_aio_provider.h similarity index 97% rename from src/core/tools/common/native_aio_provider.linux.h rename to src/core/aio/native_linux_aio_provider.h index d5f6cc6566..eb4353e6e8 100644 --- a/src/core/tools/common/native_aio_provider.linux.h +++ b/src/core/aio/native_linux_aio_provider.h @@ -26,6 +26,8 @@ #pragma once +#include "aio_provider.h" + #include #include #include @@ -37,7 +39,6 @@ #include /* uint64_t */ namespace dsn { -namespace tools { class native_linux_aio_provider : public aio_provider { @@ -51,8 +52,6 @@ class native_linux_aio_provider : public aio_provider virtual void aio(aio_task *aio) override; virtual aio_context *prepare_aio_context(aio_task *tsk) override; - virtual void start() override; - class linux_disk_aio_context : public aio_context { public: @@ -80,5 +79,4 @@ class native_linux_aio_provider : public aio_provider std::thread _worker; }; -} // namespace tools } // namespace dsn diff --git a/src/core/tools/simulator/diske.sim.cpp b/src/core/aio/sim_aio_provider.cpp similarity index 88% rename from src/core/tools/simulator/diske.sim.cpp rename to src/core/aio/sim_aio_provider.cpp index 263bae9129..f8aad8f569 100644 --- a/src/core/tools/simulator/diske.sim.cpp +++ b/src/core/aio/sim_aio_provider.cpp @@ -24,19 +24,10 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#include "diske.sim.h" +#include "sim_aio_provider.h" namespace dsn { -namespace tools { +namespace aio { DEFINE_TASK_CODE(LPC_NATIVE_AIO_REDIRECT, TASK_PRIORITY_HIGH, THREAD_POOL_DEFAULT) @@ -55,5 +46,6 @@ void sim_aio_provider::aio(aio_task *aio) err = aio_internal(aio, false, &bytes); complete_io(aio, err, bytes, 0); } -} -} // end namespace + +} // namespace aio +} // namespace dsn diff --git a/src/core/tools/simulator/diske.sim.h b/src/core/aio/sim_aio_provider.h similarity index 83% rename from src/core/tools/simulator/diske.sim.h rename to src/core/aio/sim_aio_provider.h index a431afe228..b37342796b 100644 --- a/src/core/tools/simulator/diske.sim.h +++ b/src/core/aio/sim_aio_provider.h @@ -24,22 +24,12 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once -#include -#include "core/tools/common/native_aio_provider.linux.h" +#include "native_linux_aio_provider.h" namespace dsn { -namespace tools { +namespace aio { class sim_aio_provider : public native_linux_aio_provider { @@ -49,5 +39,6 @@ class sim_aio_provider : public native_linux_aio_provider virtual void aio(aio_task *aio) override; }; -} // namespace tools + +} // namespace aio } // namespace dsn diff --git a/src/core/core/global_config.cpp b/src/core/core/global_config.cpp index 701a90ca88..745d216397 100644 --- a/src/core/core/global_config.cpp +++ b/src/core/core/global_config.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -46,6 +47,8 @@ namespace dsn { +DSN_DEFINE_string("core", aio_factory_name, "", "asynchonous file system provider"); + static bool build_client_network_confs(const char *section, /*out*/ network_client_configs &nss, network_client_configs *default_spec) diff --git a/src/core/core/service_api_c.cpp b/src/core/core/service_api_c.cpp index 093f423ce3..565d3ee158 100644 --- a/src/core/core/service_api_c.cpp +++ b/src/core/core/service_api_c.cpp @@ -38,7 +38,6 @@ #include "service_engine.h" #include "rpc_engine.h" -#include "disk_engine.h" #include "task_engine.h" #include "coredump.h" diff --git a/src/core/core/service_engine.cpp b/src/core/core/service_engine.cpp index ef59248d45..99026b061d 100644 --- a/src/core/core/service_engine.cpp +++ b/src/core/core/service_engine.cpp @@ -26,7 +26,6 @@ #include "service_engine.h" #include "task_engine.h" -#include "disk_engine.h" #include "rpc_engine.h" #include @@ -56,31 +55,15 @@ bool service_node::rpc_unregister_handler(dsn::task_code rpc_code) error_code service_node::init_io_engine() { - auto &spec = service_engine::instance().spec(); - error_code err = ERR_OK; - - // init disk engine - _node_io.disk = make_unique(this); - aio_provider *aio = factory_store::create( - spec.aio_factory_name.c_str(), ::dsn::PROVIDER_TYPE_MAIN, _node_io.disk.get(), nullptr); - _node_io.aio.reset(aio); - // init rpc engine _node_io.rpc = make_unique(this); - - return err; + return ERR_OK; } error_code service_node::start_io_engine_in_main() { - error_code err = ERR_OK; - - // start disk engine - _node_io.disk->start(_node_io.aio.get()); - // start rpc engine - err = _node_io.rpc->start(_app_spec); - return err; + return _node_io.rpc->start(_app_spec); } dsn::error_code service_node::start_app() diff --git a/src/core/core/service_engine.h b/src/core/core/service_engine.h index b1a0310704..29afad40b4 100644 --- a/src/core/core/service_engine.h +++ b/src/core/core/service_engine.h @@ -49,13 +49,11 @@ namespace dsn { class task_engine; class rpc_engine; -class disk_engine; class env_provider; class nfs_node; class task_queue; class task_worker_pool; class timer_service; -class aio_provider; // // @@ -66,8 +64,6 @@ class service_node struct io_engine { std::unique_ptr rpc; - std::unique_ptr disk; - std::unique_ptr aio; }; public: @@ -76,7 +72,6 @@ class service_node ~service_node(); rpc_engine *rpc() const { return _node_io.rpc.get(); } - disk_engine *disk() const { return _node_io.disk.get(); } task_engine *computation() const { return _computation.get(); } void get_runtime_info(const std::string &indent, diff --git a/src/core/core/task.cpp b/src/core/core/task.cpp index 4e9f114475..c2f9595e1a 100644 --- a/src/core/core/task.cpp +++ b/src/core/core/task.cpp @@ -36,7 +36,6 @@ #include #include -#include #include #include #include @@ -46,8 +45,6 @@ #include "task_engine.h" #include "service_engine.h" -#include "service_engine.h" -#include "disk_engine.h" #include "rpc_engine.h" namespace dsn { @@ -76,7 +73,6 @@ __thread uint16_t tls_dsn_lower32_task_id_mask = 0; tls_dsn.worker_index = worker ? worker->index() : -1; tls_dsn.current_task = nullptr; tls_dsn.rpc = node->rpc(); - tls_dsn.disk = node->disk(); tls_dsn.env = service_engine::instance().env(); } @@ -591,51 +587,4 @@ void rpc_response_task::enqueue() } } -aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service_node *node) - : aio_task(code, aio_handler(cb), hash, node) -{ -} - -aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node) - : task(code, hash, node), _cb(std::move(cb)) -{ - _is_null = (_cb == nullptr); - - dassert(TASK_TYPE_AIO == spec().type, - "%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code", - spec().name.c_str()); - set_error_code(ERR_IO_PENDING); - - disk_engine *disk = task::get_current_disk(); - _aio_ctx = disk->prepare_aio_context(this); -} - -void aio_task::collapse() -{ - if (!_unmerged_write_buffers.empty()) { - std::shared_ptr buffer(dsn::utils::make_shared_array(_aio_ctx->buffer_size)); - char *dest = buffer.get(); - for (const dsn_file_buffer_t &b : _unmerged_write_buffers) { - ::memcpy(dest, b.buffer, b.size); - dest += b.size; - } - dassert(dest - buffer.get() == _aio_ctx->buffer_size, - "%u VS %u", - dest - buffer.get(), - _aio_ctx->buffer_size); - _aio_ctx->buffer = buffer.get(); - _merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size); - } -} - -void aio_task::enqueue(error_code err, size_t transferred_size) -{ - set_error_code(err); - _transferred_size = transferred_size; - - spec().on_aio_enqueue.execute(this); - - task::enqueue(node()->computation()->get_pool(spec().pool_code)); -} - } // namespace dsn diff --git a/src/core/core/tool_api.cpp b/src/core/core/tool_api.cpp index f216352308..4e26e05628 100644 --- a/src/core/core/tool_api.cpp +++ b/src/core/core/tool_api.cpp @@ -149,13 +149,6 @@ bool register_component_provider(const char *name, network::factory f, ::dsn::pr return dsn::utils::factory_store::register_factory(name, f, type); } -bool register_component_provider(const char *name, - aio_provider::factory f, - ::dsn::provider_type type) -{ - return dsn::utils::factory_store::register_factory(name, f, type); -} - bool register_component_provider(const char *name, env_provider::factory f, ::dsn::provider_type type) diff --git a/src/core/tests/CMakeLists.txt b/src/core/tests/CMakeLists.txt index 8e7a5779b3..72d6b36ff2 100644 --- a/src/core/tests/CMakeLists.txt +++ b/src/core/tests/CMakeLists.txt @@ -7,6 +7,7 @@ set(MY_SRC_SEARCH_MODE "GLOB") set(MY_PROJ_LIBS gtest dsn_runtime + dsn_aio ) set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) diff --git a/src/core/tests/aio.cpp b/src/core/tests/aio.cpp index 9b000f32f6..0927c77bab 100644 --- a/src/core/tests/aio.cpp +++ b/src/core/tests/aio.cpp @@ -33,7 +33,6 @@ * xxxx-xx-xx, author, fix bug about xxx */ -#include #include #include #include @@ -47,9 +46,6 @@ DEFINE_TASK_CODE_AIO(LPC_AIO_TEST, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER TEST(core, aio) { - if (task::get_current_disk() == nullptr) - return; - const char *buffer = "hello, world"; int len = (int)strlen(buffer); @@ -142,9 +138,6 @@ TEST(core, aio) TEST(core, aio_share) { - if (task::get_current_disk() == nullptr) - return; - auto fp = file::open("tmp", O_WRONLY | O_CREAT | O_BINARY, 0666); EXPECT_TRUE(fp != nullptr); @@ -159,9 +152,6 @@ TEST(core, aio_share) TEST(core, operation_failed) { - if (task::get_current_disk() == nullptr) - return; - auto fp = file::open("tmp_test_file", O_WRONLY, 0600); EXPECT_TRUE(fp == nullptr); @@ -213,9 +203,6 @@ struct aio_result }; TEST(core, dsn_file) { - if (task::get_current_disk() == nullptr) - return; - int64_t fin_size, fout_size; ASSERT_TRUE(utils::filesystem::file_size("command.txt", fin_size)); ASSERT_LT(0, fin_size); diff --git a/src/core/tests/async_call.cpp b/src/core/tests/async_call.cpp index 22261bb315..91a72d1327 100644 --- a/src/core/tests/async_call.cpp +++ b/src/core/tests/async_call.cpp @@ -33,7 +33,6 @@ * xxxx-xx-xx, author, fix bug about xxx */ -#include #include #include #include diff --git a/src/core/tests/lpc.cpp b/src/core/tests/lpc.cpp index d441e4fc40..e8d2fbb800 100644 --- a/src/core/tests/lpc.cpp +++ b/src/core/tests/lpc.cpp @@ -33,7 +33,6 @@ * xxxx-xx-xx, author, fix bug about xxx */ -#include #include #include #include "test_utils.h" diff --git a/src/core/tests/netprovider.cpp b/src/core/tests/netprovider.cpp index b0d35709e3..e0148b1b58 100644 --- a/src/core/tests/netprovider.cpp +++ b/src/core/tests/netprovider.cpp @@ -38,7 +38,6 @@ #include -#include #include #include diff --git a/src/core/tools/common/nativerun.cpp b/src/core/tools/common/nativerun.cpp index 30dbbd696a..5359fe8ceb 100644 --- a/src/core/tools/common/nativerun.cpp +++ b/src/core/tools/common/nativerun.cpp @@ -40,8 +40,8 @@ namespace tools { void nativerun::install(service_spec &spec) { - if (spec.aio_factory_name == "") { - spec.aio_factory_name = ("dsn::tools::native_aio_provider"); + if (0 == strlen(FLAGS_aio_factory_name)) { + FLAGS_aio_factory_name = "dsn::tools::native_aio_provider"; } if (spec.env_factory_name == "") diff --git a/src/core/tools/common/providers.common.cpp b/src/core/tools/common/providers.common.cpp index 165119ebc2..9523bdd73c 100644 --- a/src/core/tools/common/providers.common.cpp +++ b/src/core/tools/common/providers.common.cpp @@ -36,7 +36,6 @@ #include "asio_net_provider.h" #include #include "lockp.std.h" -#include "native_aio_provider.linux.h" #include "simple_task_queue.h" #include "network.sim.h" #include "simple_logger.h" @@ -64,8 +63,6 @@ void register_common_providers() register_message_header_parser(NET_HDR_DSN, {"RDSN"}); register_message_header_parser(NET_HDR_THRIFT, {"THFT"}); register_message_header_parser(NET_HDR_RAW, {"_RAW"}); - - register_component_provider("dsn::tools::native_aio_provider"); } } // namespace tools } // namespace dsn diff --git a/src/core/tools/simulator/simulator.cpp b/src/core/tools/simulator/simulator.cpp index e9da95959c..c10a154035 100644 --- a/src/core/tools/simulator/simulator.cpp +++ b/src/core/tools/simulator/simulator.cpp @@ -36,7 +36,6 @@ #include #include "scheduler.h" -#include "diske.sim.h" #include "env.sim.h" #include "task_engine.sim.h" #include "sim_clock.h" @@ -52,7 +51,6 @@ void simulator::register_checker(const std::string &name, checker::factory f) void simulator::install(service_spec &spec) { - register_component_provider("dsn::tools::sim_aio_provider"); register_component_provider("dsn::tools::sim_env_provider"); register_component_provider("dsn::tools::sim_task_queue"); register_component_provider("dsn::tools::sim_timer_service"); @@ -66,8 +64,9 @@ void simulator::install(service_spec &spec) scheduler::instance(); - if (spec.aio_factory_name == "") - spec.aio_factory_name = ("dsn::tools::sim_aio_provider"); + if (0 == strlen(FLAGS_aio_factory_name)) { + FLAGS_aio_factory_name = "dsn::tools::sim_aio_provider"; + } if (spec.env_factory_name == "") spec.env_factory_name = ("dsn::tools::sim_env_provider"); diff --git a/src/dist/nfs/CMakeLists.txt b/src/dist/nfs/CMakeLists.txt index 8e41f22cd8..085cfce6d3 100644 --- a/src/dist/nfs/CMakeLists.txt +++ b/src/dist/nfs/CMakeLists.txt @@ -9,7 +9,7 @@ set(MY_PROJ_SRC "") # "GLOB" for non-recursive search set(MY_SRC_SEARCH_MODE "GLOB") -set(MY_PROJ_LIBS "") +set(MY_PROJ_LIBS dsn_aio) # Extra files that will be installed set(MY_BINPLACES "") diff --git a/src/dist/nfs/nfs_server_impl.h b/src/dist/nfs/nfs_server_impl.h index cd62f002d3..e15aef42c6 100644 --- a/src/dist/nfs/nfs_server_impl.h +++ b/src/dist/nfs/nfs_server_impl.h @@ -36,7 +36,7 @@ #include #include -#include "core/core/disk_engine.h" +#include "core/aio/disk_engine.h" #include "nfs_server.h" #include "nfs_client_impl.h" diff --git a/src/dist/nfs/test/CMakeLists.txt b/src/dist/nfs/test/CMakeLists.txt index 67c3a52ea2..3272682fa7 100644 --- a/src/dist/nfs/test/CMakeLists.txt +++ b/src/dist/nfs/test/CMakeLists.txt @@ -9,7 +9,7 @@ set(MY_PROJ_SRC "") # "GLOB" for non-recursive search set(MY_SRC_SEARCH_MODE "GLOB") -set(MY_PROJ_LIBS dsn_nfs dsn_runtime gtest) +set(MY_PROJ_LIBS dsn_nfs dsn_runtime gtest dsn_aio) set(MY_BOOST_LIBS Boost::system Boost::filesystem) diff --git a/src/dist/replication/lib/CMakeLists.txt b/src/dist/replication/lib/CMakeLists.txt index bd8a71c2ac..82cfd0eec3 100644 --- a/src/dist/replication/lib/CMakeLists.txt +++ b/src/dist/replication/lib/CMakeLists.txt @@ -37,6 +37,7 @@ set(MY_PROJ_LIBS dsn_cli dsn_http dsn_runtime + dsn_aio ) set(MY_BOOST_LIBS Boost::regex) diff --git a/src/dist/replication/meta_server/CMakeLists.txt b/src/dist/replication/meta_server/CMakeLists.txt index 75878dbb8f..ee8476b395 100644 --- a/src/dist/replication/meta_server/CMakeLists.txt +++ b/src/dist/replication/meta_server/CMakeLists.txt @@ -23,6 +23,7 @@ set(MY_PROJ_LIBS dsn_cli dsn_http dsn_runtime + dsn_aio zookeeper_mt galaxy-fds-sdk-cpp PocoNet diff --git a/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt b/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt index 33708072fb..e12801b872 100644 --- a/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt +++ b/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt @@ -22,6 +22,7 @@ set(MY_PROJ_LIBS dsn_cli dsn_http dsn_runtime + dsn_aio zookeeper_mt galaxy-fds-sdk-cpp PocoNet