diff --git a/include/dsn/tool-api/aio_provider.h b/include/dsn/tool-api/aio_provider.h index ec57ed8f1f..ea9a0eb7e5 100644 --- a/include/dsn/tool-api/aio_provider.h +++ b/include/dsn/tool-api/aio_provider.h @@ -80,8 +80,14 @@ class aio_provider virtual error_code close(dsn_handle_t fh) = 0; virtual error_code flush(dsn_handle_t fh) = 0; + + // Submits the aio_task to the underlying disk-io executor. + // This task may not be executed immediately, call `aio_task::wait` + // to wait until it completes. + // TODO(wutao1): Call it aio_submit(). virtual void aio(aio_task *aio) = 0; - virtual disk_aio *prepare_aio_context(aio_task *) = 0; + + virtual aio_context *prepare_aio_context(aio_task *) = 0; virtual void start() = 0; @@ -94,4 +100,4 @@ class aio_provider }; /*@}*/ -} // end namespace +} // namespace dsn diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 4620a4541c..0926e2d662 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -543,7 +543,7 @@ enum aio_type }; class disk_engine; -class disk_aio +class aio_context : public ref_counter { public: // filled by apps @@ -557,9 +557,9 @@ class disk_aio // filled by frameworks aio_type type; disk_engine *engine; - void *file_object; + void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it from `file` - disk_aio() + aio_context() : file(nullptr), buffer(nullptr), support_write_vec(false), @@ -578,7 +578,6 @@ class aio_task : public task public: aio_task(task_code code, const aio_handler &cb, int hash = 0, service_node *node = nullptr); aio_task(task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr); - ~aio_task(); // tell the compiler that we want both the enqueue from base task and ours // to prevent the compiler complaining -Werror,-Woverloaded-virtual. @@ -586,7 +585,9 @@ class aio_task : public task void enqueue(error_code err, size_t transferred_size); size_t get_transferred_size() const { return _transferred_size; } - disk_aio *aio() { return _aio; } + + // The ownership of `aio_context` is held by `aio_task`. + aio_context *get_aio_context() { return _aio_ctx.get(); } // merge buffers in _unmerged_write_buffers to a single merged buffer. // and store it in _merged_write_buffer_holder. @@ -606,8 +607,8 @@ class aio_task : public task protected: void clear_non_trivial_on_task_end() override { _cb = nullptr; } -protected: - disk_aio *_aio; +private: + dsn::ref_ptr _aio_ctx; size_t _transferred_size; aio_handler _cb; }; diff --git a/src/core/core/disk_engine.cpp b/src/core/core/disk_engine.cpp index 5b53c34898..ddd3998830 100644 --- a/src/core/core/disk_engine.cpp +++ b/src/core/core/disk_engine.cpp @@ -41,7 +41,7 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength) aio_task *first = _hdr._first, *current = first, *last = first; while (nullptr != current) { - auto io = current->aio(); + auto io = current->get_aio_context(); if (sz == 0) { sz = io->buffer_size; next_offset = io->file_offset + sz; @@ -78,7 +78,7 @@ disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {} aio_task *disk_file::read(aio_task *tsk) { - tsk->add_ref(); // release on completion + tsk->add_ref(); // release on completion, see `on_read_completed`. return _read_queue.add_work(tsk, nullptr); } @@ -107,7 +107,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, wk->next = nullptr; if (err == ERR_OK) { - size_t this_size = (size_t)wk->aio()->buffer_size; + size_t this_size = (size_t)wk->get_aio_context()->buffer_size; dassert(size >= this_size, "written buffer size does not equal to input buffer's size: %d vs %d", (int)size, @@ -195,7 +195,7 @@ void disk_engine::read(aio_task *aio) return; } - auto dio = aio->aio(); + auto dio = aio->get_aio_context(); auto df = (disk_file *)dio->file; dio->file = df->native_handle(); dio->file_object = df; @@ -218,12 +218,12 @@ class batch_write_io_task : public aio_task virtual void exec() override { - auto df = (disk_file *)_tasks->aio()->file_object; + auto df = (disk_file *)_tasks->get_aio_context()->file_object; uint32_t sz; - auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), _transferred_size); + auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size()); if (wk) { - wk->aio()->engine->process_write(wk, sz); + wk->get_aio_context()->engine->process_write(wk, sz); } } @@ -243,7 +243,7 @@ void disk_engine::write(aio_task *aio) return; } - auto dio = aio->aio(); + auto dio = aio->get_aio_context(); auto df = (disk_file *)dio->file; dio->file = df->native_handle(); dio->file_object = df; @@ -259,7 +259,7 @@ void disk_engine::write(aio_task *aio) void disk_engine::process_write(aio_task *aio, uint32_t sz) { - disk_aio *dio = aio->aio(); + aio_context *dio = aio->get_aio_context(); // no batching if (dio->buffer_size == sz) { @@ -278,7 +278,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz) else { // setup io task auto new_task = new batch_write_io_task(aio); - auto new_dio = new_task->aio(); + auto new_dio = new_task->get_aio_context(); new_dio->buffer_size = sz; new_dio->file_offset = dio->file_offset; new_dio->file = dio->file; @@ -288,7 +288,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz) auto cur_task = aio; do { - auto cur_dio = cur_task->aio(); + auto cur_dio = cur_task->get_aio_context(); if (cur_dio->buffer) { dsn_file_buffer_t buf; buf.buffer = cur_dio->buffer; @@ -324,8 +324,8 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int // no batching else { - auto df = (disk_file *)(aio->aio()->file_object); - if (aio->aio()->type == AIO_Read) { + auto df = (disk_file *)(aio->get_aio_context()->file_object); + if (aio->get_aio_context()->type == AIO_Read) { auto wk = df->on_read_completed(aio, err, (size_t)bytes); if (wk) { _provider->aio(wk); diff --git a/src/core/core/disk_engine.h b/src/core/core/disk_engine.h index 16af16071b..774d0056e6 100644 --- a/src/core/core/disk_engine.h +++ b/src/core/core/disk_engine.h @@ -83,7 +83,8 @@ class disk_engine void read(aio_task *aio); void write(aio_task *aio); - disk_aio *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); } + aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); } + service_node *node() const { return _node; } private: diff --git a/src/core/core/file_io.cpp b/src/core/core/file_io.cpp index 11922054b6..229f95c4c2 100644 --- a/src/core/core/file_io.cpp +++ b/src/core/core/file_io.cpp @@ -50,11 +50,11 @@ namespace file { int hash /*= 0*/) { auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); - cb->aio()->buffer = buffer; - cb->aio()->buffer_size = count; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = AIO_Read; + cb->get_aio_context()->buffer = buffer; + cb->get_aio_context()->buffer_size = count; + cb->get_aio_context()->file = file; + cb->get_aio_context()->file_offset = offset; + cb->get_aio_context()->type = AIO_Read; task::get_current_disk()->read(cb); return cb; @@ -70,11 +70,11 @@ namespace file { int hash /*= 0*/) { auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); - cb->aio()->buffer = (char *)buffer; - cb->aio()->buffer_size = count; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = AIO_Write; + cb->get_aio_context()->buffer = (char *)buffer; + cb->get_aio_context()->buffer_size = count; + cb->get_aio_context()->file = file; + cb->get_aio_context()->file_offset = offset; + cb->get_aio_context()->type = AIO_Write; task::get_current_disk()->write(cb); return cb; @@ -90,13 +90,13 @@ namespace file { int hash /*= 0*/) { auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = AIO_Write; + cb->get_aio_context()->file = file; + cb->get_aio_context()->file_offset = offset; + cb->get_aio_context()->type = AIO_Write; for (int i = 0; i < buffer_count; i++) { if (buffers[i].size > 0) { cb->_unmerged_write_buffers.push_back(buffers[i]); - cb->aio()->buffer_size += buffers[i].size; + cb->get_aio_context()->buffer_size += buffers[i].size; } } diff --git a/src/core/core/task.cpp b/src/core/core/task.cpp index a7876b5970..ab98e73c65 100644 --- a/src/core/core/task.cpp +++ b/src/core/core/task.cpp @@ -601,34 +601,28 @@ aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node spec().name.c_str()); set_error_code(ERR_IO_PENDING); - auto disk = get_current_disk(); - _aio = disk->prepare_aio_context(this); + disk_engine *disk = task::get_current_disk(); + _aio_ctx = disk->prepare_aio_context(this); } void aio_task::collapse() { if (!_unmerged_write_buffers.empty()) { - std::shared_ptr buffer(dsn::utils::make_shared_array(_aio->buffer_size)); + std::shared_ptr buffer(dsn::utils::make_shared_array(_aio_ctx->buffer_size)); char *dest = buffer.get(); for (const dsn_file_buffer_t &b : _unmerged_write_buffers) { ::memcpy(dest, b.buffer, b.size); dest += b.size; } - dassert(dest - buffer.get() == _aio->buffer_size, + dassert(dest - buffer.get() == _aio_ctx->buffer_size, "%u VS %u", dest - buffer.get(), - _aio->buffer_size); - _aio->buffer = buffer.get(); - _merged_write_buffer_holder.assign(std::move(buffer), 0, _aio->buffer_size); + _aio_ctx->buffer_size); + _aio_ctx->buffer = buffer.get(); + _merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size); } } -aio_task::~aio_task() -{ - delete _aio; - _aio = nullptr; -} - void aio_task::enqueue(error_code err, size_t transferred_size) { set_error_code(err); @@ -639,4 +633,4 @@ void aio_task::enqueue(error_code err, size_t transferred_size) task::enqueue(node()->computation()->get_pool(spec().pool_code)); } -} // end namespace +} // namespace dsn diff --git a/src/core/tests/task_test.cpp b/src/core/tests/task_test.cpp index 49fd027a4b..5a7432d0a2 100644 --- a/src/core/tests/task_test.cpp +++ b/src/core/tests/task_test.cpp @@ -46,19 +46,18 @@ class task_test : public ::testing::Test { disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0); - // this aio task is enqueued into read_queue of disk_engine + // this aio task is enqueued into read-queue of disk_engine char buffer[128]; // in simulator environment this task will be executed immediately, // so we excluded config-test-sim.ini for this test. auto t = file::read(fp, buffer, 128, 0, LPC_TASK_TEST, nullptr, nullptr); t->wait(10000); - ASSERT_TRUE(t->_wait_event.load() != nullptr); ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED); // signal a finished task won't cause failure - ASSERT_TRUE(t->signal_waiters()); - ASSERT_TRUE(t->signal_waiters()); + t->signal_waiters(); // signal_waiters may return false + t->signal_waiters(); } }; diff --git a/src/core/tools/common/fault_injector.cpp b/src/core/tools/common/fault_injector.cpp index 8f7f6cc73b..f34e5173b3 100644 --- a/src/core/tools/common/fault_injector.cpp +++ b/src/core/tools/common/fault_injector.cpp @@ -161,7 +161,7 @@ static void fault_on_task_cancel_post(task *caller, task *callee, bool succ) {} // return true means continue, otherwise early terminate with task::set_error_code static bool fault_on_aio_call(task *caller, aio_task *callee) { - switch (callee->aio()->type) { + switch (callee->get_aio_context()->type) { case AIO_Read: if (rand::next_double01() < s_fj_opts[callee->spec().code].disk_read_fail_ratio) { ddebug("fault inject %s at %s", callee->spec().name.c_str(), __FUNCTION__); diff --git a/src/core/tools/common/native_aio_provider.linux.cpp b/src/core/tools/common/native_aio_provider.linux.cpp index 94f898e892..5b90a485d6 100644 --- a/src/core/tools/common/native_aio_provider.linux.cpp +++ b/src/core/tools/common/native_aio_provider.linux.cpp @@ -93,7 +93,7 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh) } } -disk_aio *native_linux_aio_provider::prepare_aio_context(aio_task *tsk) +aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk) { return new linux_disk_aio_context(tsk); } @@ -159,7 +159,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk, linux_disk_aio_context *aio; int ret; - aio = (linux_disk_aio_context *)aio_tsk->aio(); + aio = (linux_disk_aio_context *)aio_tsk->get_aio_context(); memset(&aio->cb, 0, sizeof(aio->cb)); diff --git a/src/core/tools/common/native_aio_provider.linux.h b/src/core/tools/common/native_aio_provider.linux.h index 45acb75a92..d5f6cc6566 100644 --- a/src/core/tools/common/native_aio_provider.linux.h +++ b/src/core/tools/common/native_aio_provider.linux.h @@ -49,11 +49,11 @@ class native_linux_aio_provider : public aio_provider virtual error_code close(dsn_handle_t fh) override; virtual error_code flush(dsn_handle_t fh) override; virtual void aio(aio_task *aio) override; - virtual disk_aio *prepare_aio_context(aio_task *tsk) override; + virtual aio_context *prepare_aio_context(aio_task *tsk) override; virtual void start() override; - class linux_disk_aio_context : public disk_aio + class linux_disk_aio_context : public aio_context { public: struct iocb cb; @@ -64,7 +64,7 @@ class native_linux_aio_provider : public aio_provider uint32_t bytes; explicit linux_disk_aio_context(aio_task *tsk_) - : disk_aio(), tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) + : tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) { } }; diff --git a/src/core/tools/common/tracer.cpp b/src/core/tools/common/tracer.cpp index 823047cf1b..53eafa0031 100644 --- a/src/core/tools/common/tracer.cpp +++ b/src/core/tools/common/tracer.cpp @@ -133,8 +133,8 @@ static void tracer_on_aio_call(task *caller, aio_task *callee) ddebug("%s AIO.CALL, task_id = %016" PRIx64 ", offset = %" PRIu64 ", size = %d", callee->spec().name.c_str(), callee->id(), - callee->aio()->file_offset, - callee->aio()->buffer_size); + callee->get_aio_context()->file_offset, + callee->get_aio_context()->buffer_size); } static void tracer_on_aio_enqueue(aio_task *this_) diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index de7a4aa7c6..3a22e92d8c 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -1816,11 +1816,12 @@ class log_file::file_streamer // possible error_code: // ERR_OK result would always size as expected // ERR_HANDLE_EOF if there are not enough data in file. result would still be - // filled with possible data + // filled with possible data // ERR_FILE_OPERATION_FAILED filesystem failure error_code read_next(size_t size, /*out*/ blob &result) { binary_writer writer(size); + #define TRY(x) \ do { \ auto _x = (x); \ @@ -1829,6 +1830,7 @@ class log_file::file_streamer return _x; \ } \ } while (0) + TRY(_current_buffer->wait_ongoing_task()); if (size < _current_buffer->length()) { result.assign(_current_buffer->_buffer.get(), _current_buffer->_begin, size); @@ -1884,7 +1886,8 @@ class log_file::file_streamer } // buffer size, in bytes - static const size_t block_size_bytes = 1024 * 1024; + // TODO(wutao1): call it BLOCK_BYTES_SIZE + static constexpr size_t block_size_bytes = 1024 * 1024; // 1MB struct buffer_t { std::unique_ptr _buffer; // with block_size diff --git a/src/dist/replication/test/simple_kv/case.cpp b/src/dist/replication/test/simple_kv/case.cpp index fe4151220f..0f05293d5f 100644 --- a/src/dist/replication/test/simple_kv/case.cpp +++ b/src/dist/replication/test/simple_kv/case.cpp @@ -627,11 +627,11 @@ bool event_on_aio::check_satisfied(const event *ev) const void event_on_aio::init(aio_task *tsk) { event_on_task::init(tsk); - if (tsk->aio()->type == dsn::AIO_Invalid) + if (tsk->get_aio_context()->type == dsn::AIO_Invalid) return; // for flush task, the type is AIO_Invalid - _type = (tsk->aio()->type == dsn::AIO_Read ? "READ" : "WRITE"); - _file_offset = boost::lexical_cast(tsk->aio()->file_offset); - _buffer_size = boost::lexical_cast(tsk->aio()->buffer_size); + _type = (tsk->get_aio_context()->type == dsn::AIO_Read ? "READ" : "WRITE"); + _file_offset = boost::lexical_cast(tsk->get_aio_context()->file_offset); + _buffer_size = boost::lexical_cast(tsk->get_aio_context()->buffer_size); } void event_on_aio_enqueue::internal_to_string(std::ostream &oss) const