Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor: make aio decoupled from dsn_runtime (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhao liwei authored May 22, 2020
1 parent cd53ad6 commit f5ab808
Show file tree
Hide file tree
Showing 36 changed files with 152 additions and 228 deletions.
2 changes: 2 additions & 0 deletions include/dsn/tool-api/file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,7 @@ extern aio_task_ptr write_vector(disk_file *file,
aio_handler &&callback,
int hash = 0);

extern aio_context_ptr prepare_aio_context(aio_task *tsk);

} // namespace file
} // namespace dsn
4 changes: 2 additions & 2 deletions include/dsn/tool-api/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ struct service_spec
bool enable_default_app_mimic;

std::string timer_factory_name;
std::string aio_factory_name;
std::string env_factory_name;
std::string lock_factory_name;
std::string lock_nr_factory_name;
Expand Down Expand Up @@ -189,7 +188,6 @@ CONFIG_FLD(
"; in this case, a [apps.mimic] section must be defined in config files");

CONFIG_FLD_STRING(timer_factory_name, "", "timer service provider")
CONFIG_FLD_STRING(aio_factory_name, "", "asynchonous file system provider")
CONFIG_FLD_STRING(env_factory_name, "", "environment provider")
CONFIG_FLD_STRING(lock_factory_name, "", "recursive exclusive lock provider")
CONFIG_FLD_STRING(lock_nr_factory_name, "", "non-recurisve exclusive lock provider")
Expand All @@ -213,4 +211,6 @@ ENUM_REG(SYS_EXIT_NORMAL)
ENUM_REG(SYS_EXIT_BREAK)
ENUM_REG(SYS_EXIT_EXCEPTION)
ENUM_END(sys_exit_type)

extern const char *FLAGS_aio_factory_name;
} // namespace dsn
11 changes: 2 additions & 9 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct __tls_dsn__
int node_id;

rpc_engine *rpc;
disk_engine *disk;
env_provider *env;

int last_worker_queue_size;
Expand Down Expand Up @@ -248,7 +247,6 @@ class task : public ref_counter, public extensible_object<task, 4>, public trans
static int get_current_worker_index();
static const char *get_current_node_name();
static rpc_engine *get_current_rpc();
static disk_engine *get_current_disk();
static env_provider *get_current_env();

static void set_tls_dsn_context(
Expand Down Expand Up @@ -572,6 +570,7 @@ class aio_context : public ref_counter
{
}
};
typedef dsn::ref_ptr<aio_context> aio_context_ptr;

class aio_task : public task
{
Expand Down Expand Up @@ -608,7 +607,7 @@ class aio_task : public task
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

private:
dsn::ref_ptr<aio_context> _aio_ctx;
aio_context_ptr _aio_ctx;
size_t _transferred_size;
aio_handler _cb;
};
Expand Down Expand Up @@ -677,12 +676,6 @@ __inline /*static*/ rpc_engine *task::get_current_rpc()
return tls_dsn.rpc;
}

__inline /*static*/ disk_engine *task::get_current_disk()
{
check_tls_dsn();
return tls_dsn.disk;
}

__inline /*static*/ env_provider *task::get_current_env()
{
check_tls_dsn();
Expand Down
3 changes: 0 additions & 3 deletions include/dsn/tool_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ Component providers define the interface for the local components (e.g., network
#include <dsn/tool-api/task_worker.h>
#include <dsn/tool-api/admission_controller.h>
#include <dsn/tool-api/network.h>
#include <dsn/tool-api/aio_provider.h>
#include <dsn/tool-api/env_provider.h>
#include <dsn/tool-api/message_parser.h>
#include <dsn/tool-api/logging_provider.h>
Expand Down Expand Up @@ -140,8 +139,6 @@ DSN_API bool register_component_provider(const char *name,
DSN_API bool
register_component_provider(const char *name, network::factory f, ::dsn::provider_type type);
DSN_API bool
register_component_provider(const char *name, aio_provider::factory f, ::dsn::provider_type type);
DSN_API bool
register_component_provider(const char *name, env_provider::factory f, ::dsn::provider_type type);
DSN_API bool register_component_provider(const char *name,
logging_provider::factory f,
Expand Down
1 change: 1 addition & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ add_subdirectory(core)
add_subdirectory(perf_counter)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(aio)

add_library(dsn_runtime STATIC
$<TARGET_OBJECTS:dsn.core>
Expand Down
17 changes: 17 additions & 0 deletions src/core/aio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
set(MY_PROJ_NAME dsn_aio)

#Source files under CURRENT project directory will be automatically included.
#You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC "")

#Search mode for source files under CURRENT project directory ?
#"GLOB_RECURSE" for recursive search
#"GLOB" for non - recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS dsn_runtime)

#Extra files that will be installed
set(MY_BINPLACES "")

dsn_add_static_library()
13 changes: 2 additions & 11 deletions src/core/core/aio_provider.cpp → src/core/aio/aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,7 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <dsn/tool-api/aio_provider.h>
#include "aio_provider.h"
#include "disk_engine.h"

namespace dsn {
Expand All @@ -50,4 +41,4 @@ void aio_provider::complete_io(aio_task *aio,
_engine->complete_io(aio, err, bytes, delay_milliseconds);
}

} // end namespace dsn
} // namespace dsn
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

virtual void start() = 0;

protected:
DSN_API void
complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);
Expand Down
57 changes: 57 additions & 0 deletions src/core/aio/aio_task.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "core/core/task_engine.h"
#include <dsn/tool-api/file_io.h>
#include <dsn/utility/error_code.h>

namespace dsn {

aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service_node *node)
: aio_task(code, aio_handler(cb), hash, node)
{
}

aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node)
: task(code, hash, node), _cb(std::move(cb))
{
_is_null = (_cb == nullptr);

dassert(TASK_TYPE_AIO == spec().type,
"%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code",
spec().name.c_str());
set_error_code(ERR_IO_PENDING);

_aio_ctx = file::prepare_aio_context(this);
}

void aio_task::collapse()
{
if (!_unmerged_write_buffers.empty()) {
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio_ctx->buffer_size));
char *dest = buffer.get();
for (const dsn_file_buffer_t &b : _unmerged_write_buffers) {
::memcpy(dest, b.buffer, b.size);
dest += b.size;
}
dassert(dest - buffer.get() == _aio_ctx->buffer_size,
"%u VS %u",
dest - buffer.get(),
_aio_ctx->buffer_size);
_aio_ctx->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size);
}
}

void aio_task::enqueue(error_code err, size_t transferred_size)
{
set_error_code(err);
_transferred_size = transferred_size;

spec().on_aio_enqueue.execute(this);

task::enqueue(node()->computation()->get_pool(spec().pool_code));
}

} // namespace dsn
36 changes: 11 additions & 25 deletions src/core/core/disk_engine.cpp → src/core/aio/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/

#include "disk_engine.h"
#include "sim_aio_provider.h"
#include "core/core/service_engine.h"

using namespace dsn::utils;

Expand Down Expand Up @@ -132,25 +134,20 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,
}

//----------------- disk_engine ------------------------
disk_engine::disk_engine(service_node *node)
disk_engine::disk_engine()
{
_is_running = false;
_provider = nullptr;
_node = node;
_node = service_engine::instance().get_all_nodes().begin()->second.get();

// use native_linux_aio_provider in default
if (!strcmp(FLAGS_aio_factory_name, "dsn::tools::sim_aio_provider")) {
_provider.reset(new aio::sim_aio_provider(this, nullptr));
} else {
_provider.reset(new native_linux_aio_provider(this, nullptr));
}
}

disk_engine::~disk_engine() {}

void disk_engine::start(aio_provider *provider)
{
if (_is_running)
return;

_provider = provider;
_provider->start();
_is_running = true;
}

disk_file *disk_engine::open(const char *file_name, int flag, int pmode)
{
dsn_handle_t nh = _provider->open(file_name, flag, pmode);
Expand Down Expand Up @@ -185,11 +182,6 @@ error_code disk_engine::flush(disk_file *fh)

void disk_engine::read(aio_task *aio)
{
if (!_is_running) {
aio->enqueue(ERR_SERVICE_NOT_FOUND, 0);
return;
}

if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) {
aio->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return;
Expand Down Expand Up @@ -233,11 +225,6 @@ class batch_write_io_task : public aio_task

void disk_engine::write(aio_task *aio)
{
if (!_is_running) {
aio->enqueue(ERR_SERVICE_NOT_FOUND, 0);
return;
}

if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) {
aio->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return;
Expand Down Expand Up @@ -342,5 +329,4 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int
}
}
}

} // namespace dsn
25 changes: 11 additions & 14 deletions src/core/core/disk_engine.h → src/core/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@

#pragma once

#include "service_engine.h"
#include "aio_provider.h"

#include <dsn/utility/synchronize.h>
#include <dsn/tool-api/aio_provider.h>
#include <dsn/utility/work_queue.h>

namespace dsn {
Expand Down Expand Up @@ -68,14 +67,9 @@ class disk_file
work_queue<aio_task> _read_queue;
};

class disk_engine
class disk_engine : public utils::singleton<disk_engine>
{
public:
disk_engine(service_node *node);
~disk_engine();

void start(aio_provider *provider);

// asynchonous file read/write
disk_file *open(const char *file_name, int flag, int pmode);
error_code close(disk_file *fh);
Expand All @@ -84,19 +78,22 @@ class disk_engine
void write(aio_task *aio);

aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }

service_node *node() const { return _node; }

private:
friend class aio_provider;
friend class batch_write_io_task;
// the object of disk_engine must be created by `singleton::instance`
disk_engine();
~disk_engine();

void process_write(aio_task *wk, uint32_t sz);
void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);

private:
volatile bool _is_running;
aio_provider *_provider;
std::unique_ptr<aio_provider> _provider;
service_node *_node;

friend class aio_provider;
friend class batch_write_io_task;
friend class utils::singleton<disk_engine>;
};

} // namespace dsn
Loading

0 comments on commit f5ab808

Please sign in to comment.