diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 172d33d7fd..513c8c7f74 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -116,13 +116,13 @@ extern __thread struct __tls_dsn__ tls_dsn; /// /// So the running thread of callback will be determined hierarchically: /// -/// |<---determined by "hash" +/// |<---determined by "node" /// | |<-----determined by "code" -/// | | |--------determined by "node" -/// | | | -/// |------|--------|------------V----------------------| |--------------| -/// | |----|--------V-------------| |-------------| | | | -/// | | |--V-----| |--------| | | | | | | +/// | | |-------------determined by "hash" +/// | | | +/// |------V--------|-------|---------------------------| |--------------| +/// | |-------------V-------|-----| |-------------| | | | +/// | | |--------| |----V---| | | | | | | /// | | | thread | ... | thread | | | | | | | /// | | |--------| |--------| | | | | | | /// | | thread pool | ... | thread pool | | | | @@ -586,7 +586,7 @@ class aio_task : public task aio_task(dsn::task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr); ~aio_task(); - void enqueue_aio(error_code err, size_t transferred_size); + void enqueue(error_code err, size_t transferred_size); size_t get_transferred_size() const { return _transferred_size; } disk_aio *aio() { return _aio; } diff --git a/src/apps/nfs/nfs_client_impl.cpp b/src/apps/nfs/nfs_client_impl.cpp index 3fd24706e6..5770b04df7 100644 --- a/src/apps/nfs/nfs_client_impl.cpp +++ b/src/apps/nfs/nfs_client_impl.cpp @@ -101,7 +101,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err, ureq->file_size_req.source.to_string(), ureq->file_size_req.source_dir.c_str(), err.to_string()); - ureq->nfs_task->enqueue_aio(err, 0); + ureq->nfs_task->enqueue(err, 0); return; } @@ -111,7 +111,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err, ureq->file_size_req.source.to_string(), ureq->file_size_req.source_dir.c_str(), err.to_string()); - ureq->nfs_task->enqueue_aio(err, 0); + ureq->nfs_task->enqueue(err, 0); return; } @@ -501,7 +501,7 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code req->file_contexts.clear(); // notify aio_task - req->nfs_task->enqueue_aio(err, err == ERR_OK ? total_size : 0); + req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0); } } } diff --git a/src/core/core/disk_engine.cpp b/src/core/core/disk_engine.cpp index aff7a7162c..e563711a44 100644 --- a/src/core/core/disk_engine.cpp +++ b/src/core/core/disk_engine.cpp @@ -111,7 +111,7 @@ aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t size { dassert(wk->next == nullptr, ""); auto ret = _read_queue.on_work_completed(wk, nullptr); - wk->enqueue_aio(err, size); + wk->enqueue(err, size); wk->release_ref(); // added in above read return ret; @@ -132,10 +132,10 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, (int)size, (int)this_size); - wk->enqueue_aio(err, this_size); + wk->enqueue(err, this_size); size -= this_size; } else { - wk->enqueue_aio(err, size); + wk->enqueue(err, size); } wk->release_ref(); // added in above write @@ -214,12 +214,12 @@ error_code disk_engine::flush(dsn_handle_t fh) void disk_engine::read(aio_task *aio) { if (!_is_running) { - aio->enqueue_aio(ERR_SERVICE_NOT_FOUND, 0); + aio->enqueue(ERR_SERVICE_NOT_FOUND, 0); return; } if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) { - aio->enqueue_aio(ERR_FILE_OPERATION_FAILED, 0); + aio->enqueue(ERR_FILE_OPERATION_FAILED, 0); return; } @@ -263,12 +263,12 @@ class batch_write_io_task : public aio_task void disk_engine::write(aio_task *aio) { if (!_is_running) { - aio->enqueue_aio(ERR_SERVICE_NOT_FOUND, 0); + aio->enqueue(ERR_SERVICE_NOT_FOUND, 0); return; } if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) { - aio->enqueue_aio(ERR_FILE_OPERATION_FAILED, 0); + aio->enqueue(ERR_FILE_OPERATION_FAILED, 0); return; } @@ -340,7 +340,7 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int // batching if (aio->code() == LPC_AIO_BATCH_WRITE) { - aio->enqueue_aio(err, (size_t)bytes); + aio->enqueue(err, (size_t)bytes); aio->release_ref(); // added in process_write } diff --git a/src/core/core/task.cpp b/src/core/core/task.cpp index 5507c16e94..2beba721f9 100644 --- a/src/core/core/task.cpp +++ b/src/core/core/task.cpp @@ -593,7 +593,7 @@ aio_task::~aio_task() _aio = nullptr; } -void aio_task::enqueue_aio(error_code err, size_t transferred_size) +void aio_task::enqueue(error_code err, size_t transferred_size) { set_error_code(err); _transferred_size = transferred_size; diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index efc34041ae..c5745dd246 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -191,7 +191,7 @@ void mutation_log_shared::write_pending_mutations(bool release_lock_required) // notify the callbacks // ATTENTION: callback may be called before this code block executed done. for (auto &c : *callbacks) { - c->enqueue_aio(err, sz); + c->enqueue(err, sz); } // start to write next if possible