Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: rename disk_aio to aio_context #311

Merged
merged 8 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions include/dsn/tool-api/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,14 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
// to wait until it completes.
// TODO(wutao1): Call it aio_submit().
virtual void aio(aio_task *aio) = 0;
virtual disk_aio *prepare_aio_context(aio_task *) = 0;

virtual aio_context *prepare_aio_context(aio_task *) = 0;

virtual void start() = 0;

Expand All @@ -94,4 +100,4 @@ class aio_provider
};

/*@}*/
} // end namespace
} // namespace dsn
15 changes: 8 additions & 7 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ enum aio_type
};

class disk_engine;
class disk_aio
class aio_context : public ref_counter
{
public:
// filled by apps
Expand All @@ -557,9 +557,9 @@ class disk_aio
// filled by frameworks
aio_type type;
disk_engine *engine;
void *file_object;
void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it from `file`

disk_aio()
aio_context()
: file(nullptr),
buffer(nullptr),
support_write_vec(false),
Expand All @@ -578,15 +578,16 @@ class aio_task : public task
public:
aio_task(task_code code, const aio_handler &cb, int hash = 0, service_node *node = nullptr);
aio_task(task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr);
~aio_task();

// tell the compiler that we want both the enqueue from base task and ours
// to prevent the compiler complaining -Werror,-Woverloaded-virtual.
using task::enqueue;
void enqueue(error_code err, size_t transferred_size);

size_t get_transferred_size() const { return _transferred_size; }
disk_aio *aio() { return _aio; }

// The ownership of `aio_context` is held by `aio_task`.
aio_context *get_aio_context() { return _aio_ctx.get(); }

// merge buffers in _unmerged_write_buffers to a single merged buffer.
// and store it in _merged_write_buffer_holder.
Expand All @@ -606,8 +607,8 @@ class aio_task : public task
protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

protected:
disk_aio *_aio;
private:
dsn::ref_ptr<aio_context> _aio_ctx;
size_t _transferred_size;
aio_handler _cb;
};
Expand Down
26 changes: 13 additions & 13 deletions src/core/core/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength)

aio_task *first = _hdr._first, *current = first, *last = first;
while (nullptr != current) {
auto io = current->aio();
auto io = current->get_aio_context();
if (sz == 0) {
sz = io->buffer_size;
next_offset = io->file_offset + sz;
Expand Down Expand Up @@ -78,7 +78,7 @@ disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {}

aio_task *disk_file::read(aio_task *tsk)
{
tsk->add_ref(); // release on completion
tsk->add_ref(); // release on completion, see `on_read_completed`.
return _read_queue.add_work(tsk, nullptr);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,
wk->next = nullptr;

if (err == ERR_OK) {
size_t this_size = (size_t)wk->aio()->buffer_size;
size_t this_size = (size_t)wk->get_aio_context()->buffer_size;
dassert(size >= this_size,
"written buffer size does not equal to input buffer's size: %d vs %d",
(int)size,
Expand Down Expand Up @@ -195,7 +195,7 @@ void disk_engine::read(aio_task *aio)
return;
}

auto dio = aio->aio();
auto dio = aio->get_aio_context();
auto df = (disk_file *)dio->file;
dio->file = df->native_handle();
dio->file_object = df;
Expand All @@ -218,12 +218,12 @@ class batch_write_io_task : public aio_task

virtual void exec() override
{
auto df = (disk_file *)_tasks->aio()->file_object;
auto df = (disk_file *)_tasks->get_aio_context()->file_object;
uint32_t sz;

auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), _transferred_size);
auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size());
if (wk) {
wk->aio()->engine->process_write(wk, sz);
wk->get_aio_context()->engine->process_write(wk, sz);
}
}

Expand All @@ -243,7 +243,7 @@ void disk_engine::write(aio_task *aio)
return;
}

auto dio = aio->aio();
auto dio = aio->get_aio_context();
auto df = (disk_file *)dio->file;
dio->file = df->native_handle();
dio->file_object = df;
Expand All @@ -259,7 +259,7 @@ void disk_engine::write(aio_task *aio)

void disk_engine::process_write(aio_task *aio, uint32_t sz)
{
disk_aio *dio = aio->aio();
aio_context *dio = aio->get_aio_context();

// no batching
if (dio->buffer_size == sz) {
Expand All @@ -278,7 +278,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)
else {
// setup io task
auto new_task = new batch_write_io_task(aio);
auto new_dio = new_task->aio();
auto new_dio = new_task->get_aio_context();
new_dio->buffer_size = sz;
new_dio->file_offset = dio->file_offset;
new_dio->file = dio->file;
Expand All @@ -288,7 +288,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)

auto cur_task = aio;
do {
auto cur_dio = cur_task->aio();
auto cur_dio = cur_task->get_aio_context();
if (cur_dio->buffer) {
dsn_file_buffer_t buf;
buf.buffer = cur_dio->buffer;
Expand Down Expand Up @@ -324,8 +324,8 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int

// no batching
else {
auto df = (disk_file *)(aio->aio()->file_object);
if (aio->aio()->type == AIO_Read) {
auto df = (disk_file *)(aio->get_aio_context()->file_object);
if (aio->get_aio_context()->type == AIO_Read) {
auto wk = df->on_read_completed(aio, err, (size_t)bytes);
if (wk) {
_provider->aio(wk);
Expand Down
3 changes: 2 additions & 1 deletion src/core/core/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class disk_engine
void read(aio_task *aio);
void write(aio_task *aio);

disk_aio *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }
aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }

service_node *node() const { return _node; }

private:
Expand Down
28 changes: 14 additions & 14 deletions src/core/core/file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->buffer = buffer;
cb->aio()->buffer_size = count;
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Read;
cb->get_aio_context()->buffer = buffer;
cb->get_aio_context()->buffer_size = count;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Read;

task::get_current_disk()->read(cb);
return cb;
Expand All @@ -70,11 +70,11 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->buffer = (char *)buffer;
cb->aio()->buffer_size = count;
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Write;
cb->get_aio_context()->buffer = (char *)buffer;
cb->get_aio_context()->buffer_size = count;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;

task::get_current_disk()->write(cb);
return cb;
Expand All @@ -90,13 +90,13 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Write;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;
for (int i = 0; i < buffer_count; i++) {
if (buffers[i].size > 0) {
cb->_unmerged_write_buffers.push_back(buffers[i]);
cb->aio()->buffer_size += buffers[i].size;
cb->get_aio_context()->buffer_size += buffers[i].size;
}
}

Expand Down
22 changes: 8 additions & 14 deletions src/core/core/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,34 +601,28 @@ aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node
spec().name.c_str());
set_error_code(ERR_IO_PENDING);

auto disk = get_current_disk();
_aio = disk->prepare_aio_context(this);
disk_engine *disk = task::get_current_disk();
_aio_ctx = disk->prepare_aio_context(this);
}

void aio_task::collapse()
{
if (!_unmerged_write_buffers.empty()) {
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio->buffer_size));
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio_ctx->buffer_size));
char *dest = buffer.get();
for (const dsn_file_buffer_t &b : _unmerged_write_buffers) {
::memcpy(dest, b.buffer, b.size);
dest += b.size;
}
dassert(dest - buffer.get() == _aio->buffer_size,
dassert(dest - buffer.get() == _aio_ctx->buffer_size,
"%u VS %u",
dest - buffer.get(),
_aio->buffer_size);
_aio->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio->buffer_size);
_aio_ctx->buffer_size);
_aio_ctx->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size);
}
}

aio_task::~aio_task()
{
delete _aio;
_aio = nullptr;
}

void aio_task::enqueue(error_code err, size_t transferred_size)
{
set_error_code(err);
Expand All @@ -639,4 +633,4 @@ void aio_task::enqueue(error_code err, size_t transferred_size)
task::enqueue(node()->computation()->get_pool(spec().pool_code));
}

} // end namespace
} // namespace dsn
7 changes: 3 additions & 4 deletions src/core/tests/task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ class task_test : public ::testing::Test
{
disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0);

// this aio task is enqueued into read_queue of disk_engine
// this aio task is enqueued into read-queue of disk_engine
char buffer[128];
// in simulator environment this task will be executed immediately,
// so we excluded config-test-sim.ini for this test.
auto t = file::read(fp, buffer, 128, 0, LPC_TASK_TEST, nullptr, nullptr);

t->wait(10000);
ASSERT_TRUE(t->_wait_event.load() != nullptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个为什么去掉?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

会导致单测失败

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个不是本次修改引入的单测失败吧?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的 @acelyc111 但是晚修不如早修

ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED);

// signal a finished task won't cause failure
ASSERT_TRUE(t->signal_waiters());
ASSERT_TRUE(t->signal_waiters());
t->signal_waiters(); // signal_waiters may return false
t->signal_waiters();
}
};

Expand Down
64 changes: 64 additions & 0 deletions src/core/tools/common/empty_aio_provider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "empty_aio_provider.h"

namespace dsn {
namespace tools {

empty_aio_provider::empty_aio_provider(disk_engine *disk, aio_provider *inner_provider)
: aio_provider(disk, inner_provider)
{
}

empty_aio_provider::~empty_aio_provider() {}

dsn_handle_t empty_aio_provider::open(const char *file_name, int flag, int pmode)
{
return (dsn_handle_t)(size_t)(1);
}

error_code empty_aio_provider::close(dsn_handle_t fh) { return ERR_OK; }

error_code empty_aio_provider::flush(dsn_handle_t fh) { return ERR_OK; }

void empty_aio_provider::aio(aio_task *aio)
{
complete_io(aio, ERR_OK, aio->get_aio_context()->buffer_size, 0);
}

aio_context *empty_aio_provider::prepare_aio_context(aio_task *tsk) { return new aio_context(); }
}
}
Loading