From dfb80e72e5bb4bbd3c9b701e3258ee548f3d86e6 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Wed, 11 Sep 2019 16:17:41 +0800 Subject: [PATCH 1/6] refactor: rename aio in aio_task to aio_context --- include/dsn/tool-api/aio_provider.h | 10 +++++-- include/dsn/tool-api/task.h | 15 +++++----- src/core/core/disk_engine.cpp | 26 ++++++++--------- src/core/core/disk_engine.h | 3 +- src/core/core/file_io.cpp | 28 +++++++++---------- src/core/core/task.cpp | 22 ++++++--------- src/core/tools/common/fault_injector.cpp | 2 +- .../tools/common/native_aio_provider.linux.h | 4 +-- src/core/tools/common/tracer.cpp | 4 +-- src/dist/replication/lib/mutation_log.cpp | 7 +++-- src/dist/replication/test/simple_kv/case.cpp | 8 +++--- 11 files changed, 67 insertions(+), 62 deletions(-) diff --git a/include/dsn/tool-api/aio_provider.h b/include/dsn/tool-api/aio_provider.h index ec57ed8f1f..ea9a0eb7e5 100644 --- a/include/dsn/tool-api/aio_provider.h +++ b/include/dsn/tool-api/aio_provider.h @@ -80,8 +80,14 @@ class aio_provider virtual error_code close(dsn_handle_t fh) = 0; virtual error_code flush(dsn_handle_t fh) = 0; + + // Submits the aio_task to the underlying disk-io executor. + // This task may not be executed immediately, call `aio_task::wait` + // to wait until it completes. + // TODO(wutao1): Call it aio_submit(). virtual void aio(aio_task *aio) = 0; - virtual disk_aio *prepare_aio_context(aio_task *) = 0; + + virtual aio_context *prepare_aio_context(aio_task *) = 0; virtual void start() = 0; @@ -94,4 +100,4 @@ class aio_provider }; /*@}*/ -} // end namespace +} // namespace dsn diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 4620a4541c..0926e2d662 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -543,7 +543,7 @@ enum aio_type }; class disk_engine; -class disk_aio +class aio_context : public ref_counter { public: // filled by apps @@ -557,9 +557,9 @@ class disk_aio // filled by frameworks aio_type type; disk_engine *engine; - void *file_object; + void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it from `file` - disk_aio() + aio_context() : file(nullptr), buffer(nullptr), support_write_vec(false), @@ -578,7 +578,6 @@ class aio_task : public task public: aio_task(task_code code, const aio_handler &cb, int hash = 0, service_node *node = nullptr); aio_task(task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr); - ~aio_task(); // tell the compiler that we want both the enqueue from base task and ours // to prevent the compiler complaining -Werror,-Woverloaded-virtual. @@ -586,7 +585,9 @@ class aio_task : public task void enqueue(error_code err, size_t transferred_size); size_t get_transferred_size() const { return _transferred_size; } - disk_aio *aio() { return _aio; } + + // The ownership of `aio_context` is held by `aio_task`. + aio_context *get_aio_context() { return _aio_ctx.get(); } // merge buffers in _unmerged_write_buffers to a single merged buffer. // and store it in _merged_write_buffer_holder. @@ -606,8 +607,8 @@ class aio_task : public task protected: void clear_non_trivial_on_task_end() override { _cb = nullptr; } -protected: - disk_aio *_aio; +private: + dsn::ref_ptr _aio_ctx; size_t _transferred_size; aio_handler _cb; }; diff --git a/src/core/core/disk_engine.cpp b/src/core/core/disk_engine.cpp index 5b53c34898..ddd3998830 100644 --- a/src/core/core/disk_engine.cpp +++ b/src/core/core/disk_engine.cpp @@ -41,7 +41,7 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength) aio_task *first = _hdr._first, *current = first, *last = first; while (nullptr != current) { - auto io = current->aio(); + auto io = current->get_aio_context(); if (sz == 0) { sz = io->buffer_size; next_offset = io->file_offset + sz; @@ -78,7 +78,7 @@ disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {} aio_task *disk_file::read(aio_task *tsk) { - tsk->add_ref(); // release on completion + tsk->add_ref(); // release on completion, see `on_read_completed`. return _read_queue.add_work(tsk, nullptr); } @@ -107,7 +107,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, wk->next = nullptr; if (err == ERR_OK) { - size_t this_size = (size_t)wk->aio()->buffer_size; + size_t this_size = (size_t)wk->get_aio_context()->buffer_size; dassert(size >= this_size, "written buffer size does not equal to input buffer's size: %d vs %d", (int)size, @@ -195,7 +195,7 @@ void disk_engine::read(aio_task *aio) return; } - auto dio = aio->aio(); + auto dio = aio->get_aio_context(); auto df = (disk_file *)dio->file; dio->file = df->native_handle(); dio->file_object = df; @@ -218,12 +218,12 @@ class batch_write_io_task : public aio_task virtual void exec() override { - auto df = (disk_file *)_tasks->aio()->file_object; + auto df = (disk_file *)_tasks->get_aio_context()->file_object; uint32_t sz; - auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), _transferred_size); + auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size()); if (wk) { - wk->aio()->engine->process_write(wk, sz); + wk->get_aio_context()->engine->process_write(wk, sz); } } @@ -243,7 +243,7 @@ void disk_engine::write(aio_task *aio) return; } - auto dio = aio->aio(); + auto dio = aio->get_aio_context(); auto df = (disk_file *)dio->file; dio->file = df->native_handle(); dio->file_object = df; @@ -259,7 +259,7 @@ void disk_engine::write(aio_task *aio) void disk_engine::process_write(aio_task *aio, uint32_t sz) { - disk_aio *dio = aio->aio(); + aio_context *dio = aio->get_aio_context(); // no batching if (dio->buffer_size == sz) { @@ -278,7 +278,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz) else { // setup io task auto new_task = new batch_write_io_task(aio); - auto new_dio = new_task->aio(); + auto new_dio = new_task->get_aio_context(); new_dio->buffer_size = sz; new_dio->file_offset = dio->file_offset; new_dio->file = dio->file; @@ -288,7 +288,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz) auto cur_task = aio; do { - auto cur_dio = cur_task->aio(); + auto cur_dio = cur_task->get_aio_context(); if (cur_dio->buffer) { dsn_file_buffer_t buf; buf.buffer = cur_dio->buffer; @@ -324,8 +324,8 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int // no batching else { - auto df = (disk_file *)(aio->aio()->file_object); - if (aio->aio()->type == AIO_Read) { + auto df = (disk_file *)(aio->get_aio_context()->file_object); + if (aio->get_aio_context()->type == AIO_Read) { auto wk = df->on_read_completed(aio, err, (size_t)bytes); if (wk) { _provider->aio(wk); diff --git a/src/core/core/disk_engine.h b/src/core/core/disk_engine.h index 16af16071b..774d0056e6 100644 --- a/src/core/core/disk_engine.h +++ b/src/core/core/disk_engine.h @@ -83,7 +83,8 @@ class disk_engine void read(aio_task *aio); void write(aio_task *aio); - disk_aio *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); } + aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); } + service_node *node() const { return _node; } private: diff --git a/src/core/core/file_io.cpp b/src/core/core/file_io.cpp index 11922054b6..229f95c4c2 100644 --- a/src/core/core/file_io.cpp +++ b/src/core/core/file_io.cpp @@ -50,11 +50,11 @@ namespace file { int hash /*= 0*/) { auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); - cb->aio()->buffer = buffer; - cb->aio()->buffer_size = count; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = AIO_Read; + cb->get_aio_context()->buffer = buffer; + cb->get_aio_context()->buffer_size = count; + cb->get_aio_context()->file = file; + cb->get_aio_context()->file_offset = offset; + cb->get_aio_context()->type = AIO_Read; task::get_current_disk()->read(cb); return cb; @@ -70,11 +70,11 @@ namespace file { int hash /*= 0*/) { auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); - cb->aio()->buffer = (char *)buffer; - cb->aio()->buffer_size = count; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = AIO_Write; + cb->get_aio_context()->buffer = (char *)buffer; + cb->get_aio_context()->buffer_size = count; + cb->get_aio_context()->file = file; + cb->get_aio_context()->file_offset = offset; + cb->get_aio_context()->type = AIO_Write; task::get_current_disk()->write(cb); return cb; @@ -90,13 +90,13 @@ namespace file { int hash /*= 0*/) { auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = AIO_Write; + cb->get_aio_context()->file = file; + cb->get_aio_context()->file_offset = offset; + cb->get_aio_context()->type = AIO_Write; for (int i = 0; i < buffer_count; i++) { if (buffers[i].size > 0) { cb->_unmerged_write_buffers.push_back(buffers[i]); - cb->aio()->buffer_size += buffers[i].size; + cb->get_aio_context()->buffer_size += buffers[i].size; } } diff --git a/src/core/core/task.cpp b/src/core/core/task.cpp index a7876b5970..ab98e73c65 100644 --- a/src/core/core/task.cpp +++ b/src/core/core/task.cpp @@ -601,34 +601,28 @@ aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node spec().name.c_str()); set_error_code(ERR_IO_PENDING); - auto disk = get_current_disk(); - _aio = disk->prepare_aio_context(this); + disk_engine *disk = task::get_current_disk(); + _aio_ctx = disk->prepare_aio_context(this); } void aio_task::collapse() { if (!_unmerged_write_buffers.empty()) { - std::shared_ptr buffer(dsn::utils::make_shared_array(_aio->buffer_size)); + std::shared_ptr buffer(dsn::utils::make_shared_array(_aio_ctx->buffer_size)); char *dest = buffer.get(); for (const dsn_file_buffer_t &b : _unmerged_write_buffers) { ::memcpy(dest, b.buffer, b.size); dest += b.size; } - dassert(dest - buffer.get() == _aio->buffer_size, + dassert(dest - buffer.get() == _aio_ctx->buffer_size, "%u VS %u", dest - buffer.get(), - _aio->buffer_size); - _aio->buffer = buffer.get(); - _merged_write_buffer_holder.assign(std::move(buffer), 0, _aio->buffer_size); + _aio_ctx->buffer_size); + _aio_ctx->buffer = buffer.get(); + _merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size); } } -aio_task::~aio_task() -{ - delete _aio; - _aio = nullptr; -} - void aio_task::enqueue(error_code err, size_t transferred_size) { set_error_code(err); @@ -639,4 +633,4 @@ void aio_task::enqueue(error_code err, size_t transferred_size) task::enqueue(node()->computation()->get_pool(spec().pool_code)); } -} // end namespace +} // namespace dsn diff --git a/src/core/tools/common/fault_injector.cpp b/src/core/tools/common/fault_injector.cpp index 8f7f6cc73b..f34e5173b3 100644 --- a/src/core/tools/common/fault_injector.cpp +++ b/src/core/tools/common/fault_injector.cpp @@ -161,7 +161,7 @@ static void fault_on_task_cancel_post(task *caller, task *callee, bool succ) {} // return true means continue, otherwise early terminate with task::set_error_code static bool fault_on_aio_call(task *caller, aio_task *callee) { - switch (callee->aio()->type) { + switch (callee->get_aio_context()->type) { case AIO_Read: if (rand::next_double01() < s_fj_opts[callee->spec().code].disk_read_fail_ratio) { ddebug("fault inject %s at %s", callee->spec().name.c_str(), __FUNCTION__); diff --git a/src/core/tools/common/native_aio_provider.linux.h b/src/core/tools/common/native_aio_provider.linux.h index 45acb75a92..be1d959cda 100644 --- a/src/core/tools/common/native_aio_provider.linux.h +++ b/src/core/tools/common/native_aio_provider.linux.h @@ -49,11 +49,11 @@ class native_linux_aio_provider : public aio_provider virtual error_code close(dsn_handle_t fh) override; virtual error_code flush(dsn_handle_t fh) override; virtual void aio(aio_task *aio) override; - virtual disk_aio *prepare_aio_context(aio_task *tsk) override; + virtual aio_context *prepare_aio_context(aio_task *tsk) override; virtual void start() override; - class linux_disk_aio_context : public disk_aio + class linux_disk_aio_context : public aio_context { public: struct iocb cb; diff --git a/src/core/tools/common/tracer.cpp b/src/core/tools/common/tracer.cpp index 823047cf1b..53eafa0031 100644 --- a/src/core/tools/common/tracer.cpp +++ b/src/core/tools/common/tracer.cpp @@ -133,8 +133,8 @@ static void tracer_on_aio_call(task *caller, aio_task *callee) ddebug("%s AIO.CALL, task_id = %016" PRIx64 ", offset = %" PRIu64 ", size = %d", callee->spec().name.c_str(), callee->id(), - callee->aio()->file_offset, - callee->aio()->buffer_size); + callee->get_aio_context()->file_offset, + callee->get_aio_context()->buffer_size); } static void tracer_on_aio_enqueue(aio_task *this_) diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index 860f5020d3..12aed73ffa 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -1913,11 +1913,12 @@ class log_file::file_streamer // possible error_code: // ERR_OK result would always size as expected // ERR_HANDLE_EOF if there are not enough data in file. result would still be - // filled with possible data + // filled with possible data // ERR_FILE_OPERATION_FAILED filesystem failure error_code read_next(size_t size, /*out*/ blob &result) { binary_writer writer(size); + #define TRY(x) \ do { \ auto _x = (x); \ @@ -1926,6 +1927,7 @@ class log_file::file_streamer return _x; \ } \ } while (0) + TRY(_current_buffer->wait_ongoing_task()); if (size < _current_buffer->length()) { result.assign(_current_buffer->_buffer.get(), _current_buffer->_begin, size); @@ -1981,7 +1983,8 @@ class log_file::file_streamer } // buffer size, in bytes - static const size_t block_size_bytes = 1024 * 1024; + // TODO(wutao1): call it BLOCK_BYTES_SIZE + static constexpr size_t block_size_bytes = 1024 * 1024; // 1MB struct buffer_t { std::unique_ptr _buffer; // with block_size diff --git a/src/dist/replication/test/simple_kv/case.cpp b/src/dist/replication/test/simple_kv/case.cpp index fe4151220f..0f05293d5f 100644 --- a/src/dist/replication/test/simple_kv/case.cpp +++ b/src/dist/replication/test/simple_kv/case.cpp @@ -627,11 +627,11 @@ bool event_on_aio::check_satisfied(const event *ev) const void event_on_aio::init(aio_task *tsk) { event_on_task::init(tsk); - if (tsk->aio()->type == dsn::AIO_Invalid) + if (tsk->get_aio_context()->type == dsn::AIO_Invalid) return; // for flush task, the type is AIO_Invalid - _type = (tsk->aio()->type == dsn::AIO_Read ? "READ" : "WRITE"); - _file_offset = boost::lexical_cast(tsk->aio()->file_offset); - _buffer_size = boost::lexical_cast(tsk->aio()->buffer_size); + _type = (tsk->get_aio_context()->type == dsn::AIO_Read ? "READ" : "WRITE"); + _file_offset = boost::lexical_cast(tsk->get_aio_context()->file_offset); + _buffer_size = boost::lexical_cast(tsk->get_aio_context()->buffer_size); } void event_on_aio_enqueue::internal_to_string(std::ostream &oss) const From 245359b2fc9414e0b441f1cc9b12bde88816a03f Mon Sep 17 00:00:00 2001 From: neverchanje Date: Wed, 11 Sep 2019 23:50:01 +0800 Subject: [PATCH 2/6] refactor: rename disk_aio to aio_context --- src/core/tools/common/empty_aio_provider.cpp | 4 ++-- src/core/tools/common/empty_aio_provider.h | 2 +- src/core/tools/common/native_aio_provider.linux.cpp | 4 ++-- src/core/tools/common/native_aio_provider.linux.h | 2 +- src/core/tools/common/native_aio_provider.posix.cpp | 8 ++++---- src/core/tools/common/native_aio_provider.posix.h | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/core/tools/common/empty_aio_provider.cpp b/src/core/tools/common/empty_aio_provider.cpp index a3476f2961..25c395b389 100644 --- a/src/core/tools/common/empty_aio_provider.cpp +++ b/src/core/tools/common/empty_aio_provider.cpp @@ -56,9 +56,9 @@ error_code empty_aio_provider::flush(dsn_handle_t fh) { return ERR_OK; } void empty_aio_provider::aio(aio_task *aio) { - complete_io(aio, ERR_OK, aio->aio()->buffer_size, 0); + complete_io(aio, ERR_OK, aio->get_aio_context()->buffer_size, 0); } -disk_aio *empty_aio_provider::prepare_aio_context(aio_task *tsk) { return new disk_aio(); } +aio_context *empty_aio_provider::prepare_aio_context(aio_task *tsk) { return new aio_context(); } } } diff --git a/src/core/tools/common/empty_aio_provider.h b/src/core/tools/common/empty_aio_provider.h index 941df8efc8..5cc72f5ecc 100644 --- a/src/core/tools/common/empty_aio_provider.h +++ b/src/core/tools/common/empty_aio_provider.h @@ -50,7 +50,7 @@ class empty_aio_provider : public aio_provider virtual error_code close(dsn_handle_t fh) override; virtual error_code flush(dsn_handle_t fh) override; virtual void aio(aio_task *aio) override; - virtual disk_aio *prepare_aio_context(aio_task *tsk) override; + virtual aio_context *prepare_aio_context(aio_task *tsk) override; virtual void start() override {} }; diff --git a/src/core/tools/common/native_aio_provider.linux.cpp b/src/core/tools/common/native_aio_provider.linux.cpp index f6df8acbf7..59687cc551 100644 --- a/src/core/tools/common/native_aio_provider.linux.cpp +++ b/src/core/tools/common/native_aio_provider.linux.cpp @@ -103,7 +103,7 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh) } } -disk_aio *native_linux_aio_provider::prepare_aio_context(aio_task *tsk) +aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk) { return new linux_disk_aio_context(tsk); } @@ -169,7 +169,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk, linux_disk_aio_context *aio; int ret; - aio = (linux_disk_aio_context *)aio_tsk->aio(); + aio = (linux_disk_aio_context *)aio_tsk->get_aio_context(); memset(&aio->cb, 0, sizeof(aio->cb)); diff --git a/src/core/tools/common/native_aio_provider.linux.h b/src/core/tools/common/native_aio_provider.linux.h index be1d959cda..d5f6cc6566 100644 --- a/src/core/tools/common/native_aio_provider.linux.h +++ b/src/core/tools/common/native_aio_provider.linux.h @@ -64,7 +64,7 @@ class native_linux_aio_provider : public aio_provider uint32_t bytes; explicit linux_disk_aio_context(aio_task *tsk_) - : disk_aio(), tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) + : tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) { } }; diff --git a/src/core/tools/common/native_aio_provider.posix.cpp b/src/core/tools/common/native_aio_provider.posix.cpp index cde38331f6..e343ce3d2c 100644 --- a/src/core/tools/common/native_aio_provider.posix.cpp +++ b/src/core/tools/common/native_aio_provider.posix.cpp @@ -81,7 +81,7 @@ error_code native_posix_aio_provider::flush(dsn_handle_t fh) } } -class posix_disk_aio_context : public disk_aio +class posix_disk_aio_context : public aio_context { public: struct aiocb cb; @@ -92,13 +92,13 @@ class posix_disk_aio_context : public disk_aio uint32_t bytes; explicit posix_disk_aio_context(aio_task *tsk_) - : disk_aio(), tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) + : tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) { ::bzero((char *)&cb, sizeof(cb)); } }; -disk_aio *native_posix_aio_provider::prepare_aio_context(aio_task *tsk) +aio_context *native_posix_aio_provider::prepare_aio_context(aio_task *tsk) { return new posix_disk_aio_context(tsk); } @@ -138,7 +138,7 @@ error_code native_posix_aio_provider::aio_internal(aio_task *aio_tsk, bool async, /*out*/ uint32_t *pbytes /*= nullptr*/) { - auto aio = (posix_disk_aio_context *)aio_tsk->aio(); + auto aio = (posix_disk_aio_context *)aio_tsk->get_aio_context(); int r = 0; aio->this_ = this; diff --git a/src/core/tools/common/native_aio_provider.posix.h b/src/core/tools/common/native_aio_provider.posix.h index 1e44a1810e..5347e4b1ae 100644 --- a/src/core/tools/common/native_aio_provider.posix.h +++ b/src/core/tools/common/native_aio_provider.posix.h @@ -55,7 +55,7 @@ class native_posix_aio_provider : public aio_provider virtual error_code close(dsn_handle_t fh) override; virtual error_code flush(dsn_handle_t fh) override; virtual void aio(aio_task *aio) override; - virtual disk_aio *prepare_aio_context(aio_task *tsk) override; + virtual aio_context *prepare_aio_context(aio_task *tsk) override; virtual void start() override {} From c6fa4d1d9cdcb4fb7f5550d633a47bb401ce8839 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 12 Sep 2019 11:41:00 +0800 Subject: [PATCH 3/6] fix travis --- src/core/tests/task_test.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/core/tests/task_test.cpp b/src/core/tests/task_test.cpp index 49fd027a4b..06e6901003 100644 --- a/src/core/tests/task_test.cpp +++ b/src/core/tests/task_test.cpp @@ -46,14 +46,13 @@ class task_test : public ::testing::Test { disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0); - // this aio task is enqueued into read_queue of disk_engine + // this aio task is enqueued into read-queue of disk_engine char buffer[128]; // in simulator environment this task will be executed immediately, // so we excluded config-test-sim.ini for this test. auto t = file::read(fp, buffer, 128, 0, LPC_TASK_TEST, nullptr, nullptr); t->wait(10000); - ASSERT_TRUE(t->_wait_event.load() != nullptr); ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED); // signal a finished task won't cause failure From a799b5a0e8331d7ce971ff08081b3d2cfd843e51 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 16 Sep 2019 15:21:00 +0800 Subject: [PATCH 4/6] fix travis --- src/core/tests/task_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/tests/task_test.cpp b/src/core/tests/task_test.cpp index 06e6901003..748553d7e8 100644 --- a/src/core/tests/task_test.cpp +++ b/src/core/tests/task_test.cpp @@ -56,8 +56,8 @@ class task_test : public ::testing::Test ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED); // signal a finished task won't cause failure - ASSERT_TRUE(t->signal_waiters()); - ASSERT_TRUE(t->signal_waiters()); + t->signal_waiters(); + t->signal_waiters(); } }; From 1896ce5fd697a27d4e0a0d7b5520b09f24a013f9 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 16 Sep 2019 15:25:11 +0800 Subject: [PATCH 5/6] add comment on ut --- src/core/tests/task_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/tests/task_test.cpp b/src/core/tests/task_test.cpp index 748553d7e8..5a7432d0a2 100644 --- a/src/core/tests/task_test.cpp +++ b/src/core/tests/task_test.cpp @@ -56,7 +56,7 @@ class task_test : public ::testing::Test ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED); // signal a finished task won't cause failure - t->signal_waiters(); + t->signal_waiters(); // signal_waiters may return false t->signal_waiters(); } }; From 239ecba948dd0065a8a9c13c06648604ff9559fe Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 16 Sep 2019 18:14:14 +0800 Subject: [PATCH 6/6] remove files --- src/core/tools/common/empty_aio_provider.cpp | 64 ------ src/core/tools/common/empty_aio_provider.h | 58 ----- .../common/native_aio_provider.posix.cpp | 207 ------------------ .../tools/common/native_aio_provider.posix.h | 71 ------ 4 files changed, 400 deletions(-) delete mode 100644 src/core/tools/common/empty_aio_provider.cpp delete mode 100644 src/core/tools/common/empty_aio_provider.h delete mode 100644 src/core/tools/common/native_aio_provider.posix.cpp delete mode 100644 src/core/tools/common/native_aio_provider.posix.h diff --git a/src/core/tools/common/empty_aio_provider.cpp b/src/core/tools/common/empty_aio_provider.cpp deleted file mode 100644 index 25c395b389..0000000000 --- a/src/core/tools/common/empty_aio_provider.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#include "empty_aio_provider.h" - -namespace dsn { -namespace tools { - -empty_aio_provider::empty_aio_provider(disk_engine *disk, aio_provider *inner_provider) - : aio_provider(disk, inner_provider) -{ -} - -empty_aio_provider::~empty_aio_provider() {} - -dsn_handle_t empty_aio_provider::open(const char *file_name, int flag, int pmode) -{ - return (dsn_handle_t)(size_t)(1); -} - -error_code empty_aio_provider::close(dsn_handle_t fh) { return ERR_OK; } - -error_code empty_aio_provider::flush(dsn_handle_t fh) { return ERR_OK; } - -void empty_aio_provider::aio(aio_task *aio) -{ - complete_io(aio, ERR_OK, aio->get_aio_context()->buffer_size, 0); -} - -aio_context *empty_aio_provider::prepare_aio_context(aio_task *tsk) { return new aio_context(); } -} -} diff --git a/src/core/tools/common/empty_aio_provider.h b/src/core/tools/common/empty_aio_provider.h deleted file mode 100644 index 5cc72f5ecc..0000000000 --- a/src/core/tools/common/empty_aio_provider.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#pragma once - -#include - -namespace dsn { -namespace tools { - -class empty_aio_provider : public aio_provider -{ -public: - empty_aio_provider(disk_engine *disk, aio_provider *inner_provider); - ~empty_aio_provider(); - - virtual dsn_handle_t open(const char *file_name, int flag, int pmode) override; - virtual error_code close(dsn_handle_t fh) override; - virtual error_code flush(dsn_handle_t fh) override; - virtual void aio(aio_task *aio) override; - virtual aio_context *prepare_aio_context(aio_task *tsk) override; - - virtual void start() override {} -}; -} -} diff --git a/src/core/tools/common/native_aio_provider.posix.cpp b/src/core/tools/common/native_aio_provider.posix.cpp deleted file mode 100644 index e343ce3d2c..0000000000 --- a/src/core/tools/common/native_aio_provider.posix.cpp +++ /dev/null @@ -1,207 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#ifndef _WIN32 - -#include "native_aio_provider.posix.h" - -#include -#include -#include - -namespace dsn { -namespace tools { - -native_posix_aio_provider::native_posix_aio_provider(disk_engine *disk, - aio_provider *inner_provider) - : aio_provider(disk, inner_provider) -{ -} - -native_posix_aio_provider::~native_posix_aio_provider() {} - -dsn_handle_t native_posix_aio_provider::open(const char *file_name, int flag, int pmode) -{ - dsn_handle_t fh = (dsn_handle_t)(uintptr_t)::open(file_name, flag, pmode); - if (fh == DSN_INVALID_FILE_HANDLE) { - derror("create file failed, err = %s", strerror(errno)); - } - return fh; -} - -error_code native_posix_aio_provider::close(dsn_handle_t fh) -{ - if (fh == DSN_INVALID_FILE_HANDLE || ::close((int)(uintptr_t)(fh)) == 0) { - return ERR_OK; - } else { - derror("close file failed, err = %s", strerror(errno)); - return ERR_FILE_OPERATION_FAILED; - } -} - -error_code native_posix_aio_provider::flush(dsn_handle_t fh) -{ - if (fh == DSN_INVALID_FILE_HANDLE || ::fsync((int)(uintptr_t)(fh)) == 0) { - return ERR_OK; - } else { - derror("flush file failed, err = %s", strerror(errno)); - return ERR_FILE_OPERATION_FAILED; - } -} - -class posix_disk_aio_context : public aio_context -{ -public: - struct aiocb cb; - aio_task *tsk; - native_posix_aio_provider *this_; - utils::notify_event *evt; - error_code err; - uint32_t bytes; - - explicit posix_disk_aio_context(aio_task *tsk_) - : tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) - { - ::bzero((char *)&cb, sizeof(cb)); - } -}; - -aio_context *native_posix_aio_provider::prepare_aio_context(aio_task *tsk) -{ - return new posix_disk_aio_context(tsk); -} - -void native_posix_aio_provider::aio(aio_task *aio_tsk) { aio_internal(aio_tsk, true); } - -void aio_completed(sigval sigval) -{ - auto ctx = (posix_disk_aio_context *)sigval.sival_ptr; - - if (dsn::tls_dsn.magic != 0xdeadbeef) - task::set_tls_dsn_context(ctx->tsk->node(), nullptr); - - int err = aio_error(&ctx->cb); - if (err != EINPROGRESS) { - size_t bytes = aio_return(&ctx->cb); // from e.g., read or write - error_code ec; - if (err != 0) { - derror("aio error, err = %s", strerror(err)); - ec = ERR_FILE_OPERATION_FAILED; - } else { - ec = bytes > 0 ? ERR_OK : ERR_HANDLE_EOF; - } - - if (!ctx->evt) { - aio_task *aio(ctx->tsk); - ctx->this_->complete_io(aio, ec, bytes); - } else { - ctx->err = ec; - ctx->bytes = bytes; - ctx->evt->notify(); - } - } -} - -error_code native_posix_aio_provider::aio_internal(aio_task *aio_tsk, - bool async, - /*out*/ uint32_t *pbytes /*= nullptr*/) -{ - auto aio = (posix_disk_aio_context *)aio_tsk->get_aio_context(); - int r = 0; - - aio->this_ = this; - memset(&aio->cb, 0, sizeof(aio->cb)); - aio->cb.aio_reqprio = 0; - aio->cb.aio_lio_opcode = (aio->type == AIO_Read ? LIO_READ : LIO_WRITE); - aio->cb.aio_fildes = static_cast((ssize_t)aio->file); - aio->cb.aio_buf = aio->buffer; - aio->cb.aio_nbytes = aio->buffer_size; - aio->cb.aio_offset = aio->file_offset; - - // set up callback - aio->cb.aio_sigevent.sigev_notify = SIGEV_THREAD; - aio->cb.aio_sigevent.sigev_notify_function = aio_completed; - aio->cb.aio_sigevent.sigev_notify_attributes = nullptr; - aio->cb.aio_sigevent.sigev_value.sival_ptr = aio; - - if (!async) { - aio->evt = new utils::notify_event(); - aio->err = ERR_OK; - aio->bytes = 0; - } - - switch (aio->type) { - case AIO_Read: - r = aio_read(&aio->cb); - break; - case AIO_Write: - r = aio_write(&aio->cb); - break; - default: - dassert(false, "unknown aio type %u", static_cast(aio->type)); - break; - } - - if (r != 0) { - derror("file op failed, err = %d (%s). On FreeBSD, you may need to load" - " aio kernel module by running 'sudo kldload aio'.", - errno, - strerror(errno)); - - if (async) { - complete_io(aio_tsk, ERR_FILE_OPERATION_FAILED, 0); - } else { - delete aio->evt; - aio->evt = nullptr; - } - return ERR_FILE_OPERATION_FAILED; - } else { - if (async) { - return ERR_IO_PENDING; - } else { - aio->evt->wait(); - delete aio->evt; - aio->evt = nullptr; - if (pbytes != nullptr) { - *pbytes = aio->bytes; - } - return aio->err; - } - } -} -} -} // end namespace dsn::tools - -#endif diff --git a/src/core/tools/common/native_aio_provider.posix.h b/src/core/tools/common/native_aio_provider.posix.h deleted file mode 100644 index 5347e4b1ae..0000000000 --- a/src/core/tools/common/native_aio_provider.posix.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#pragma once - -#ifndef _WIN32 - -#include -#include - -#include -#include - -namespace dsn { -namespace tools { -class native_posix_aio_provider : public aio_provider -{ -public: - native_posix_aio_provider(disk_engine *disk, aio_provider *inner_provider); - ~native_posix_aio_provider(); - - virtual dsn_handle_t open(const char *file_name, int flag, int pmode) override; - virtual error_code close(dsn_handle_t fh) override; - virtual error_code flush(dsn_handle_t fh) override; - virtual void aio(aio_task *aio) override; - virtual aio_context *prepare_aio_context(aio_task *tsk) override; - - virtual void start() override {} - -protected: - error_code aio_internal(aio_task *aio, bool async, /*out*/ uint32_t *pbytes = nullptr); - -private: - friend void aio_completed(sigval sigval); -}; -} -} - -#endif