Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor: rename disk_aio to aio_context (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Sep 16, 2019
1 parent f4947c4 commit 4049f32
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 69 deletions.
10 changes: 8 additions & 2 deletions include/dsn/tool-api/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,14 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
// to wait until it completes.
// TODO(wutao1): Call it aio_submit().
virtual void aio(aio_task *aio) = 0;
virtual disk_aio *prepare_aio_context(aio_task *) = 0;

virtual aio_context *prepare_aio_context(aio_task *) = 0;

virtual void start() = 0;

Expand All @@ -94,4 +100,4 @@ class aio_provider
};

/*@}*/
} // end namespace
} // namespace dsn
15 changes: 8 additions & 7 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ enum aio_type
};

class disk_engine;
class disk_aio
class aio_context : public ref_counter
{
public:
// filled by apps
Expand All @@ -557,9 +557,9 @@ class disk_aio
// filled by frameworks
aio_type type;
disk_engine *engine;
void *file_object;
void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it from `file`

disk_aio()
aio_context()
: file(nullptr),
buffer(nullptr),
support_write_vec(false),
Expand All @@ -578,15 +578,16 @@ class aio_task : public task
public:
aio_task(task_code code, const aio_handler &cb, int hash = 0, service_node *node = nullptr);
aio_task(task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr);
~aio_task();

// tell the compiler that we want both the enqueue from base task and ours
// to prevent the compiler complaining -Werror,-Woverloaded-virtual.
using task::enqueue;
void enqueue(error_code err, size_t transferred_size);

size_t get_transferred_size() const { return _transferred_size; }
disk_aio *aio() { return _aio; }

// The ownership of `aio_context` is held by `aio_task`.
aio_context *get_aio_context() { return _aio_ctx.get(); }

// merge buffers in _unmerged_write_buffers to a single merged buffer.
// and store it in _merged_write_buffer_holder.
Expand All @@ -606,8 +607,8 @@ class aio_task : public task
protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

protected:
disk_aio *_aio;
private:
dsn::ref_ptr<aio_context> _aio_ctx;
size_t _transferred_size;
aio_handler _cb;
};
Expand Down
26 changes: 13 additions & 13 deletions src/core/core/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength)

aio_task *first = _hdr._first, *current = first, *last = first;
while (nullptr != current) {
auto io = current->aio();
auto io = current->get_aio_context();
if (sz == 0) {
sz = io->buffer_size;
next_offset = io->file_offset + sz;
Expand Down Expand Up @@ -78,7 +78,7 @@ disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {}

aio_task *disk_file::read(aio_task *tsk)
{
tsk->add_ref(); // release on completion
tsk->add_ref(); // release on completion, see `on_read_completed`.
return _read_queue.add_work(tsk, nullptr);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,
wk->next = nullptr;

if (err == ERR_OK) {
size_t this_size = (size_t)wk->aio()->buffer_size;
size_t this_size = (size_t)wk->get_aio_context()->buffer_size;
dassert(size >= this_size,
"written buffer size does not equal to input buffer's size: %d vs %d",
(int)size,
Expand Down Expand Up @@ -195,7 +195,7 @@ void disk_engine::read(aio_task *aio)
return;
}

auto dio = aio->aio();
auto dio = aio->get_aio_context();
auto df = (disk_file *)dio->file;
dio->file = df->native_handle();
dio->file_object = df;
Expand All @@ -218,12 +218,12 @@ class batch_write_io_task : public aio_task

virtual void exec() override
{
auto df = (disk_file *)_tasks->aio()->file_object;
auto df = (disk_file *)_tasks->get_aio_context()->file_object;
uint32_t sz;

auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), _transferred_size);
auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size());
if (wk) {
wk->aio()->engine->process_write(wk, sz);
wk->get_aio_context()->engine->process_write(wk, sz);
}
}

Expand All @@ -243,7 +243,7 @@ void disk_engine::write(aio_task *aio)
return;
}

auto dio = aio->aio();
auto dio = aio->get_aio_context();
auto df = (disk_file *)dio->file;
dio->file = df->native_handle();
dio->file_object = df;
Expand All @@ -259,7 +259,7 @@ void disk_engine::write(aio_task *aio)

void disk_engine::process_write(aio_task *aio, uint32_t sz)
{
disk_aio *dio = aio->aio();
aio_context *dio = aio->get_aio_context();

// no batching
if (dio->buffer_size == sz) {
Expand All @@ -278,7 +278,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)
else {
// setup io task
auto new_task = new batch_write_io_task(aio);
auto new_dio = new_task->aio();
auto new_dio = new_task->get_aio_context();
new_dio->buffer_size = sz;
new_dio->file_offset = dio->file_offset;
new_dio->file = dio->file;
Expand All @@ -288,7 +288,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)

auto cur_task = aio;
do {
auto cur_dio = cur_task->aio();
auto cur_dio = cur_task->get_aio_context();
if (cur_dio->buffer) {
dsn_file_buffer_t buf;
buf.buffer = cur_dio->buffer;
Expand Down Expand Up @@ -324,8 +324,8 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int

// no batching
else {
auto df = (disk_file *)(aio->aio()->file_object);
if (aio->aio()->type == AIO_Read) {
auto df = (disk_file *)(aio->get_aio_context()->file_object);
if (aio->get_aio_context()->type == AIO_Read) {
auto wk = df->on_read_completed(aio, err, (size_t)bytes);
if (wk) {
_provider->aio(wk);
Expand Down
3 changes: 2 additions & 1 deletion src/core/core/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class disk_engine
void read(aio_task *aio);
void write(aio_task *aio);

disk_aio *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }
aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }

service_node *node() const { return _node; }

private:
Expand Down
28 changes: 14 additions & 14 deletions src/core/core/file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->buffer = buffer;
cb->aio()->buffer_size = count;
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Read;
cb->get_aio_context()->buffer = buffer;
cb->get_aio_context()->buffer_size = count;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Read;

task::get_current_disk()->read(cb);
return cb;
Expand All @@ -70,11 +70,11 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->buffer = (char *)buffer;
cb->aio()->buffer_size = count;
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Write;
cb->get_aio_context()->buffer = (char *)buffer;
cb->get_aio_context()->buffer_size = count;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;

task::get_current_disk()->write(cb);
return cb;
Expand All @@ -90,13 +90,13 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Write;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;
for (int i = 0; i < buffer_count; i++) {
if (buffers[i].size > 0) {
cb->_unmerged_write_buffers.push_back(buffers[i]);
cb->aio()->buffer_size += buffers[i].size;
cb->get_aio_context()->buffer_size += buffers[i].size;
}
}

Expand Down
22 changes: 8 additions & 14 deletions src/core/core/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,34 +601,28 @@ aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node
spec().name.c_str());
set_error_code(ERR_IO_PENDING);

auto disk = get_current_disk();
_aio = disk->prepare_aio_context(this);
disk_engine *disk = task::get_current_disk();
_aio_ctx = disk->prepare_aio_context(this);
}

void aio_task::collapse()
{
if (!_unmerged_write_buffers.empty()) {
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio->buffer_size));
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio_ctx->buffer_size));
char *dest = buffer.get();
for (const dsn_file_buffer_t &b : _unmerged_write_buffers) {
::memcpy(dest, b.buffer, b.size);
dest += b.size;
}
dassert(dest - buffer.get() == _aio->buffer_size,
dassert(dest - buffer.get() == _aio_ctx->buffer_size,
"%u VS %u",
dest - buffer.get(),
_aio->buffer_size);
_aio->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio->buffer_size);
_aio_ctx->buffer_size);
_aio_ctx->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size);
}
}

aio_task::~aio_task()
{
delete _aio;
_aio = nullptr;
}

void aio_task::enqueue(error_code err, size_t transferred_size)
{
set_error_code(err);
Expand All @@ -639,4 +633,4 @@ void aio_task::enqueue(error_code err, size_t transferred_size)
task::enqueue(node()->computation()->get_pool(spec().pool_code));
}

} // end namespace
} // namespace dsn
7 changes: 3 additions & 4 deletions src/core/tests/task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ class task_test : public ::testing::Test
{
disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0);

// this aio task is enqueued into read_queue of disk_engine
// this aio task is enqueued into read-queue of disk_engine
char buffer[128];
// in simulator environment this task will be executed immediately,
// so we excluded config-test-sim.ini for this test.
auto t = file::read(fp, buffer, 128, 0, LPC_TASK_TEST, nullptr, nullptr);

t->wait(10000);
ASSERT_TRUE(t->_wait_event.load() != nullptr);
ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED);

// signal a finished task won't cause failure
ASSERT_TRUE(t->signal_waiters());
ASSERT_TRUE(t->signal_waiters());
t->signal_waiters(); // signal_waiters may return false
t->signal_waiters();
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/fault_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static void fault_on_task_cancel_post(task *caller, task *callee, bool succ) {}
// return true means continue, otherwise early terminate with task::set_error_code
static bool fault_on_aio_call(task *caller, aio_task *callee)
{
switch (callee->aio()->type) {
switch (callee->get_aio_context()->type) {
case AIO_Read:
if (rand::next_double01() < s_fj_opts[callee->spec().code].disk_read_fail_ratio) {
ddebug("fault inject %s at %s", callee->spec().name.c_str(), __FUNCTION__);
Expand Down
4 changes: 2 additions & 2 deletions src/core/tools/common/native_aio_provider.linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}
}

disk_aio *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
{
return new linux_disk_aio_context(tsk);
}
Expand Down Expand Up @@ -159,7 +159,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk,
linux_disk_aio_context *aio;
int ret;

aio = (linux_disk_aio_context *)aio_tsk->aio();
aio = (linux_disk_aio_context *)aio_tsk->get_aio_context();

memset(&aio->cb, 0, sizeof(aio->cb));

Expand Down
6 changes: 3 additions & 3 deletions src/core/tools/common/native_aio_provider.linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ class native_linux_aio_provider : public aio_provider
virtual error_code close(dsn_handle_t fh) override;
virtual error_code flush(dsn_handle_t fh) override;
virtual void aio(aio_task *aio) override;
virtual disk_aio *prepare_aio_context(aio_task *tsk) override;
virtual aio_context *prepare_aio_context(aio_task *tsk) override;

virtual void start() override;

class linux_disk_aio_context : public disk_aio
class linux_disk_aio_context : public aio_context
{
public:
struct iocb cb;
Expand All @@ -64,7 +64,7 @@ class native_linux_aio_provider : public aio_provider
uint32_t bytes;

explicit linux_disk_aio_context(aio_task *tsk_)
: disk_aio(), tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0)
: tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0)
{
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/core/tools/common/tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ static void tracer_on_aio_call(task *caller, aio_task *callee)
ddebug("%s AIO.CALL, task_id = %016" PRIx64 ", offset = %" PRIu64 ", size = %d",
callee->spec().name.c_str(),
callee->id(),
callee->aio()->file_offset,
callee->aio()->buffer_size);
callee->get_aio_context()->file_offset,
callee->get_aio_context()->buffer_size);
}

static void tracer_on_aio_enqueue(aio_task *this_)
Expand Down
7 changes: 5 additions & 2 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1816,11 +1816,12 @@ class log_file::file_streamer
// possible error_code:
// ERR_OK result would always size as expected
// ERR_HANDLE_EOF if there are not enough data in file. result would still be
// filled with possible data
// filled with possible data
// ERR_FILE_OPERATION_FAILED filesystem failure
error_code read_next(size_t size, /*out*/ blob &result)
{
binary_writer writer(size);

#define TRY(x) \
do { \
auto _x = (x); \
Expand All @@ -1829,6 +1830,7 @@ class log_file::file_streamer
return _x; \
} \
} while (0)

TRY(_current_buffer->wait_ongoing_task());
if (size < _current_buffer->length()) {
result.assign(_current_buffer->_buffer.get(), _current_buffer->_begin, size);
Expand Down Expand Up @@ -1884,7 +1886,8 @@ class log_file::file_streamer
}

// buffer size, in bytes
static const size_t block_size_bytes = 1024 * 1024;
// TODO(wutao1): call it BLOCK_BYTES_SIZE
static constexpr size_t block_size_bytes = 1024 * 1024; // 1MB
struct buffer_t
{
std::unique_ptr<char[]> _buffer; // with block_size
Expand Down
Loading

0 comments on commit 4049f32

Please sign in to comment.