Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

perf: optimizing write latency using independent IO queues in place of libaio #633

Merged
merged 74 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
02c608d
refactor: implement thread-pool-based asynchronous IO
neverchanje Jul 17, 2020
24cedc7
merge update
foreverneverer Sep 8, 2020
4b31613
add tracer
foreverneverer Sep 9, 2020
18ad5e9
update tracer
foreverneverer Sep 9, 2020
caa7535
update tracer
foreverneverer Sep 9, 2020
bb14fe4
update tracer
foreverneverer Sep 9, 2020
b9776e5
update tracer
foreverneverer Sep 9, 2020
147b595
update tracer
foreverneverer Sep 9, 2020
fd7cf38
update tracer
foreverneverer Sep 10, 2020
298942b
merge
foreverneverer Sep 10, 2020
1bc4dd7
add comment
foreverneverer Sep 10, 2020
31c30f9
add comment
foreverneverer Sep 10, 2020
9db31ba
Merge branch 'fix-tracer' into newaio-with-tracer
foreverneverer Sep 10, 2020
32fcef1
add comment
foreverneverer Sep 10, 2020
c1f33f1
add comment
foreverneverer Sep 11, 2020
5656d64
add comment
foreverneverer Sep 11, 2020
fb55339
add comment
foreverneverer Sep 11, 2020
676bd85
add comment
foreverneverer Sep 11, 2020
aabab6a
add comment
foreverneverer Sep 11, 2020
6f6d221
add comment
foreverneverer Sep 11, 2020
7d32244
add comment
foreverneverer Sep 11, 2020
9e7d3fc
add perfcounter
foreverneverer Sep 11, 2020
1718381
add perfcounter
foreverneverer Sep 11, 2020
eec5dcc
add perfcounter
foreverneverer Sep 11, 2020
6297287
add max
foreverneverer Sep 12, 2020
e384c78
update
foreverneverer Sep 12, 2020
c1fe66c
add debug info
foreverneverer Sep 14, 2020
b804b72
add debug info
foreverneverer Sep 14, 2020
c88a1be
add debug info
foreverneverer Sep 14, 2020
026301f
add debug info
foreverneverer Sep 14, 2020
db66a2c
add debug info
foreverneverer Sep 14, 2020
eb30ebf
split all aio task
foreverneverer Sep 14, 2020
03934e6
dequeue one
foreverneverer Sep 16, 2020
ee78661
init
foreverneverer Sep 23, 2020
72dcbe6
init
foreverneverer Sep 23, 2020
b388192
init
foreverneverer Sep 23, 2020
72ae740
init
foreverneverer Sep 23, 2020
a417ff4
init
foreverneverer Sep 23, 2020
542238e
init
foreverneverer Sep 23, 2020
5abfec5
init
foreverneverer Sep 23, 2020
6e37a6d
init but not compile
foreverneverer Sep 23, 2020
1b2568d
ci ok
foreverneverer Sep 23, 2020
8b937dc
ci ok
foreverneverer Sep 23, 2020
24a960a
add_tracer
foreverneverer Sep 24, 2020
5b07031
refactor io queue
foreverneverer Sep 27, 2020
30e5460
refactor io queue
foreverneverer Sep 27, 2020
b5bd3f8
refactor io queue
foreverneverer Sep 27, 2020
a0cbabe
refactor io queue
foreverneverer Sep 27, 2020
65b68c1
refactor io queue
foreverneverer Sep 27, 2020
572d7da
merge master
foreverneverer Sep 27, 2020
1adcb92
delete tracer
foreverneverer Sep 27, 2020
27b58a1
delete tracer
foreverneverer Sep 27, 2020
cc9456b
delete tracer
foreverneverer Sep 27, 2020
da40281
delete tracer
foreverneverer Sep 27, 2020
b0a1f3e
Merge branch 'master' into newaio-with-tracer
foreverneverer Sep 28, 2020
9800ce8
Merge branch 'newaio-with-tracer' of github.com:Shuo-Jia/rdsn into ne…
foreverneverer Sep 28, 2020
dde5c0a
delete tracer
foreverneverer Sep 28, 2020
c936764
fix compile error
foreverneverer Sep 28, 2020
0e7aabc
fix compile error
foreverneverer Sep 28, 2020
1805766
fix compile error
foreverneverer Sep 29, 2020
1c5570a
fix call back exec in replication pool
foreverneverer Oct 10, 2020
2d8e420
move plog pool into rep_long
foreverneverer Oct 10, 2020
0d818d6
merge task queue
foreverneverer Oct 12, 2020
366f18f
merge task queue
foreverneverer Oct 12, 2020
f6d3fcf
merge task queue
foreverneverer Oct 12, 2020
5ec2054
merge task queue
foreverneverer Oct 12, 2020
acb4363
merge task queue
foreverneverer Oct 12, 2020
a677af4
merge task queue
foreverneverer Oct 12, 2020
4caad55
Merge branch 'master' into newaio-with-tracer
foreverneverer Oct 12, 2020
e7977ac
Merge branch 'master' into newaio-with-tracer
hycdong Oct 13, 2020
e46d657
Merge branch 'master' into newaio-with-tracer
levy5307 Oct 15, 2020
d0bc147
review
foreverneverer Oct 19, 2020
1e18d05
Merge branch 'newaio-with-tracer' of github.com:Shuo-Jia/rdsn into ne…
foreverneverer Oct 19, 2020
5cf7373
review
foreverneverer Oct 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ MAKE_EVENT_CODE_RPC(RPC_TEST_AGENT_WRITE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_TEST_AGENT_READ, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_AIO_TEST, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_AIO_IMMEDIATE_CALLBACK, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_COMMON, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
Expand Down Expand Up @@ -183,6 +182,7 @@ MAKE_EVENT_CODE(LPC_BACKGROUND_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_SLOG
Expand Down
6 changes: 3 additions & 3 deletions src/aio/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;
virtual error_code write(aio_context *aio_ctx, uint32_t *processed_bytes) = 0;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
virtual error_code read(aio_context *aio_ctx, uint32_t *processed_bytes) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
Expand All @@ -69,9 +71,7 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

protected:
DSN_API void
complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);
void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);

private:
disk_engine *_engine;
Expand Down
1 change: 1 addition & 0 deletions src/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "aio_provider.h"

#include <dsn/tool_api.h>
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
#include <dsn/utility/synchronize.h>
#include <dsn/utility/work_queue.h>

Expand Down
184 changes: 45 additions & 139 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,13 @@

#include "native_linux_aio_provider.h"

#include <fcntl.h>
#include <cstdlib>
#include <dsn/tool-api/async_calls.h>

namespace dsn {

native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk)
{
memset(&_ctx, 0, sizeof(_ctx));
auto ret = io_setup(128, &_ctx); // 128 concurrent events
dassert(ret == 0, "io_setup error, ret = %d", ret);

_is_running = true;
_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);
get_event();
});
}

native_linux_aio_provider::~native_linux_aio_provider()
{
if (!_is_running) {
return;
}
_is_running = false;

auto ret = io_destroy(_ctx);
dassert(ret == 0, "io_destroy error, ret = %d", ret);
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk) {}

_worker.join();
}
native_linux_aio_provider::~native_linux_aio_provider() {}

dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag, int pmode)
{
Expand Down Expand Up @@ -86,144 +63,73 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}
}

aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
error_code native_linux_aio_provider::write(aio_context *aio_ctx, uint32_t *processed_bytes)
{
return new linux_disk_aio_context(tsk);
ssize_t ret = pwrite(static_cast<int>((ssize_t)aio_ctx->file),
aio_ctx->buffer,
aio_ctx->buffer_size,
aio_ctx->file_offset);
if (ret < 0) {
return ERR_FILE_OPERATION_FAILED;
}
*processed_bytes = static_cast<uint32_t>(ret);
return ERR_OK;
}

void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk) { aio_internal(aio_tsk, true); }

void native_linux_aio_provider::get_event()
error_code native_linux_aio_provider::read(aio_context *aio_ctx, uint32_t *processed_bytes)
{
struct io_event events[1];
int ret;

task::set_tls_dsn_context(node(), nullptr);

const char *name = ::dsn::tools::get_service_node_name(node());
char buffer[128];
sprintf(buffer, "%s.aio", name);
task_worker::set_name(buffer);

while (true) {
if (dsn_unlikely(!_is_running.load(std::memory_order_relaxed))) {
break;
}
ret = io_getevents(_ctx, 1, 1, events, NULL);
if (ret > 0) // should be 1
{
dassert(ret == 1, "io_getevents returns %d", ret);
struct iocb *io = events[0].obj;
complete_aio(io, static_cast<int>(events[0].res), static_cast<int>(events[0].res2));
} else {
// on error it returns a negated error number (the negative of one of the values listed
// in ERRORS
dwarn("io_getevents returns %d, you probably want to try on another machine:-(", ret);
}
ssize_t ret = pread(static_cast<int>((ssize_t)aio_ctx->file),
aio_ctx->buffer,
aio_ctx->buffer_size,
aio_ctx->file_offset);
if (ret < 0) {
return ERR_FILE_OPERATION_FAILED;
}
if (ret == 0) {
return ERR_HANDLE_EOF;
}
*processed_bytes = static_cast<uint32_t>(ret);
return ERR_OK;
}

void native_linux_aio_provider::complete_aio(struct iocb *io, int bytes, int err)
void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk)
{
linux_disk_aio_context *aio = CONTAINING_RECORD(io, linux_disk_aio_context, cb);
error_code ec;
if (err != 0) {
derror("aio error, err = %s", strerror(err));
ec = ERR_FILE_OPERATION_FAILED;
} else {
ec = bytes > 0 ? ERR_OK : ERR_HANDLE_EOF;
}

if (!aio->evt) {
aio_task *aio_ptr(aio->tsk);
aio->this_->complete_io(aio_ptr, ec, bytes);
} else {
aio->err = ec;
aio->bytes = bytes;
aio->evt->notify();
}
tasking::enqueue(aio_tsk->code(),
aio_tsk->tracker(),
[=]() { aio_internal(aio_tsk, true); },
aio_tsk->hash());
}

error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk,
bool async,
/*out*/ uint32_t *pbytes /*= nullptr*/)
{
struct iocb *cbs[1];
linux_disk_aio_context *aio;
int ret;

aio = (linux_disk_aio_context *)aio_tsk->get_aio_context();

memset(&aio->cb, 0, sizeof(aio->cb));

aio->this_ = this;

switch (aio->type) {
aio_context *aio_ctx = aio_tsk->get_aio_context();
error_code err = ERR_UNKNOWN;
uint32_t processed_bytes = 0;
switch (aio_ctx->type) {
case AIO_Read:
io_prep_pread(&aio->cb,
static_cast<int>((ssize_t)aio->file),
aio->buffer,
aio->buffer_size,
aio->file_offset);
err = read(aio_ctx, &processed_bytes);
break;
case AIO_Write:
if (aio->buffer) {
io_prep_pwrite(&aio->cb,
static_cast<int>((ssize_t)aio->file),
aio->buffer,
aio->buffer_size,
aio->file_offset);
} else {
int iovcnt = aio->write_buffer_vec->size();
struct iovec *iov = (struct iovec *)alloca(sizeof(struct iovec) * iovcnt);
for (int i = 0; i < iovcnt; i++) {
const dsn_file_buffer_t &buf = aio->write_buffer_vec->at(i);
iov[i].iov_base = buf.buffer;
iov[i].iov_len = buf.size;
}
io_prep_pwritev(
&aio->cb, static_cast<int>((ssize_t)aio->file), iov, iovcnt, aio->file_offset);
}
err = write(aio_ctx, &processed_bytes);
break;
default:
derror("unknown aio type %u", static_cast<int>(aio->type));
return err;
}

if (!async) {
aio->evt = new utils::notify_event();
aio->err = ERR_OK;
aio->bytes = 0;
if (pbytes) {
*pbytes = processed_bytes;
}

cbs[0] = &aio->cb;
ret = io_submit(_ctx, 1, cbs);

if (ret != 1) {
if (ret < 0)
derror("io_submit error, ret = %d", ret);
else
derror("could not sumbit IOs, ret = %d", ret);

if (async) {
complete_io(aio_tsk, ERR_FILE_OPERATION_FAILED, 0);
} else {
delete aio->evt;
aio->evt = nullptr;
}
return ERR_FILE_OPERATION_FAILED;
if (async) {
complete_io(aio_tsk, err, processed_bytes);
} else {
if (async) {
return ERR_IO_PENDING;
} else {
aio->evt->wait();
delete aio->evt;
aio->evt = nullptr;
if (pbytes != nullptr) {
*pbytes = aio->bytes;
}
return aio->err;
}
utils::notify_event notify;
notify.notify();
}

return err;
}

} // namespace dsn
38 changes: 4 additions & 34 deletions src/aio/native_linux_aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@

#include "aio_provider.h"

#include <dsn/tool_api.h>
#include <dsn/utility/synchronize.h>
#include <queue>
#include <stdio.h> /* for perror() */
#include <sys/syscall.h> /* for __NR_* definitions */
#include <libaio.h>
#include <fcntl.h> /* O_RDWR */
#include <string.h> /* memset() */
#include <inttypes.h> /* uint64_t */

namespace dsn {

class native_linux_aio_provider : public aio_provider
Expand All @@ -49,34 +39,14 @@ class native_linux_aio_provider : public aio_provider
dsn_handle_t open(const char *file_name, int flag, int pmode) override;
error_code close(dsn_handle_t fh) override;
error_code flush(dsn_handle_t fh) override;
void submit_aio_task(aio_task *aio) override;
aio_context *prepare_aio_context(aio_task *tsk) override;

class linux_disk_aio_context : public aio_context
{
public:
struct iocb cb;
aio_task *tsk;
native_linux_aio_provider *this_;
utils::notify_event *evt;
error_code err;
uint32_t bytes;
error_code write(aio_context *aio_ctx, uint32_t *processed_bytes) override;
error_code read(aio_context *aio_ctx, uint32_t *processed_bytes) override;

explicit linux_disk_aio_context(aio_task *tsk_)
: tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0)
{
}
};
void submit_aio_task(aio_task *aio) override;
aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; }

protected:
error_code aio_internal(aio_task *aio, bool async, /*out*/ uint32_t *pbytes = nullptr);
void complete_aio(struct iocb *io, int bytes, int err);
void get_event();

private:
io_context_t _ctx;
std::atomic<bool> _is_running{false};
std::thread _worker;
};

} // namespace dsn
2 changes: 1 addition & 1 deletion src/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err,
_copy_requests_low.push(std::move(copy_requests));
}

continue_copy();
tasking::enqueue(LPC_NFS_COPY_FILE, nullptr, [this]() { continue_copy(); }, 0);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

void nfs_client_impl::continue_copy()
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ void replica::execute_mutation(mutation_ptr &mu)
name(),
mu->name(),
static_cast<int>(mu->client_requests.size()));
ADD_POINT(mu->tracer);

error_code err = ERR_OK;
decree d = mu->data.header.decree;
Expand All @@ -244,6 +243,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}
break;
case partition_status::PS_PRIMARY: {
ADD_POINT(mu->tracer);
check_state_completeness();
dassert(_app->last_committed_decree() + 1 == d,
"app commit: %" PRId64 ", mutation decree: %" PRId64 "",
Expand Down Expand Up @@ -308,6 +308,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}

if (status() == partition_status::PS_PRIMARY) {
ADD_CUSTOM_POINT(mu->tracer, "completed");
mutation_ptr next = _primary_states.write_queue.check_possible_work(
static_cast<int>(_prepare_list->max_decree() - d));

Expand All @@ -317,7 +318,6 @@ void replica::execute_mutation(mutation_ptr &mu)
}

// update table level latency perf-counters for primary partition
ADD_CUSTOM_POINT(mu->tracer, "completed");
if (partition_status::PS_PRIMARY == status()) {
uint64_t now_ns = dsn_now_ns();
for (auto update : mu->data.updates) {
Expand Down
4 changes: 3 additions & 1 deletion src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ ::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
(int)mu->client_requests.size());
dassert(mu->data.updates.size() > 0, "");

ADD_POINT(mu->tracer);
if (_replica->status() == partition_status::PS_PRIMARY) {
ADD_POINT(mu->tracer);
}

bool has_ingestion_request = false;
int request_count = static_cast<int>(mu->client_requests.size());
Expand Down
1 change: 1 addition & 0 deletions src/replica/test/log_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class log_file_test : public replica_test_base
public:
// Test fixture setup: recreates _log_dir and opens _logf for writing.
void SetUp() override
{
// Remove whatever already exists at _log_dir before creating it again;
// presumably so files left behind by an earlier run cannot interfere -- confirm.
utils::filesystem::remove_path(_log_dir);
utils::filesystem::create_directory(_log_dir);
// NOTE(review): the literal 1 is presumably the log file index and
// _start_offset its starting offset -- confirm against log_file::create_write.
_logf = log_file::create_write(_log_dir.c_str(), 1, _start_offset);
}
Expand Down