Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
task: fix based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shengofsun committed May 15, 2018
1 parent 0273bd1 commit 7aaf8d5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 20 deletions.
14 changes: 7 additions & 7 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ extern __thread struct __tls_dsn__ tls_dsn;
///
/// So the running thread of callback will be determined hierarchically:
///
/// |<---determined by "hash"
/// |<---determined by "node"
/// | |<-----determined by "code"
/// | | |--------determined by "node"
/// | | |
/// |------|--------|------------V----------------------| |--------------|
/// | |----|--------V-------------| |-------------| | | |
/// | | |--V-----| |--------| | | | | | |
/// | | |-------------determined by "hash"
/// | | |
/// |------V--------|-------|---------------------------| |--------------|
/// | |-------------V-------|-----| |-------------| | | |
/// | | |--------| |----V---| | | | | | |
/// | | | thread | ... | thread | | | | | | |
/// | | |--------| |--------| | | | | | |
/// | | thread pool | ... | thread pool | | | |
Expand Down Expand Up @@ -586,7 +586,7 @@ class aio_task : public task
aio_task(dsn::task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr);
~aio_task();

void enqueue_aio(error_code err, size_t transferred_size);
void enqueue(error_code err, size_t transferred_size);
size_t get_transferred_size() const { return _transferred_size; }
disk_aio *aio() { return _aio; }

Expand Down
6 changes: 3 additions & 3 deletions src/apps/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err,
ureq->file_size_req.source.to_string(),
ureq->file_size_req.source_dir.c_str(),
err.to_string());
ureq->nfs_task->enqueue_aio(err, 0);
ureq->nfs_task->enqueue(err, 0);
return;
}

Expand All @@ -111,7 +111,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err,
ureq->file_size_req.source.to_string(),
ureq->file_size_req.source_dir.c_str(),
err.to_string());
ureq->nfs_task->enqueue_aio(err, 0);
ureq->nfs_task->enqueue(err, 0);
return;
}

Expand Down Expand Up @@ -501,7 +501,7 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code
req->file_contexts.clear();

// notify aio_task
req->nfs_task->enqueue_aio(err, err == ERR_OK ? total_size : 0);
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}
}
}
16 changes: 8 additions & 8 deletions src/core/core/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t size
{
dassert(wk->next == nullptr, "");
auto ret = _read_queue.on_work_completed(wk, nullptr);
wk->enqueue_aio(err, size);
wk->enqueue(err, size);
wk->release_ref(); // added in above read

return ret;
Expand All @@ -132,10 +132,10 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,
(int)size,
(int)this_size);

wk->enqueue_aio(err, this_size);
wk->enqueue(err, this_size);
size -= this_size;
} else {
wk->enqueue_aio(err, size);
wk->enqueue(err, size);
}

wk->release_ref(); // added in above write
Expand Down Expand Up @@ -214,12 +214,12 @@ error_code disk_engine::flush(dsn_handle_t fh)
void disk_engine::read(aio_task *aio)
{
if (!_is_running) {
aio->enqueue_aio(ERR_SERVICE_NOT_FOUND, 0);
aio->enqueue(ERR_SERVICE_NOT_FOUND, 0);
return;
}

if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) {
aio->enqueue_aio(ERR_FILE_OPERATION_FAILED, 0);
aio->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return;
}

Expand Down Expand Up @@ -263,12 +263,12 @@ class batch_write_io_task : public aio_task
void disk_engine::write(aio_task *aio)
{
if (!_is_running) {
aio->enqueue_aio(ERR_SERVICE_NOT_FOUND, 0);
aio->enqueue(ERR_SERVICE_NOT_FOUND, 0);
return;
}

if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) {
aio->enqueue_aio(ERR_FILE_OPERATION_FAILED, 0);
aio->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return;
}

Expand Down Expand Up @@ -340,7 +340,7 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int

// batching
if (aio->code() == LPC_AIO_BATCH_WRITE) {
aio->enqueue_aio(err, (size_t)bytes);
aio->enqueue(err, (size_t)bytes);
aio->release_ref(); // added in process_write
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/core/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ aio_task::~aio_task()
_aio = nullptr;
}

void aio_task::enqueue_aio(error_code err, size_t transferred_size)
void aio_task::enqueue(error_code err, size_t transferred_size)
{
set_error_code(err);
_transferred_size = transferred_size;
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void mutation_log_shared::write_pending_mutations(bool release_lock_required)
// notify the callbacks
// ATTENTION: callback may be called before this code block executed done.
for (auto &c : *callbacks) {
c->enqueue_aio(err, sz);
c->enqueue(err, sz);
}

// start to write next if possible
Expand Down

0 comments on commit 7aaf8d5

Please sign in to comment.