Skip to content

Commit

Permalink
fix: use fsync() to prevent .app-info/.init-info lost risk on XFS after power outage (apache#1017)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored Jan 12, 2022
1 parent 2987691 commit ac8f8de
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 53 deletions.
8 changes: 4 additions & 4 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class replica_init_info
std::string to_string();

private:
error_code load_json(const char *file);
error_code store_json(const char *file);
error_code load_json(const std::string &file);
error_code store_json(const std::string &file);
};

class replica_app_info
Expand All @@ -74,8 +74,8 @@ class replica_app_info

public:
replica_app_info(app_info *app) { _app = app; }
error_code load(const char *file);
error_code store(const char *file);
error_code load(const std::string &file);
error_code store(const std::string &file);
};

/// The store engine interface of Pegasus.
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ error_code replica::store_app_info(app_info &info, const std::string &path)
{
replica_app_info new_info((app_info *)&info);
const auto &info_path = path.empty() ? utils::filesystem::path_combine(_dir, kAppInfo) : path;
auto err = new_info.store(info_path.c_str());
auto err = new_info.store(info_path);
if (dsn_unlikely(err != ERR_OK)) {
derror_replica("failed to save app_info to {}, error = {}", info_path, err);
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ error_code replica::initialize_on_load()
dsn::app_info info;
replica_app_info info2(&info);
std::string path = utils::filesystem::path_combine(dir, kAppInfo);
auto err = info2.load(path.c_str());
auto err = info2.load(path);
if (ERR_OK != err) {
derror("load app-info from %s failed, err = %s", path.c_str(), err.to_string());
return nullptr;
Expand Down
99 changes: 52 additions & 47 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <dsn/utils/latency_tracer.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/factory_store.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/crc.h>
Expand All @@ -45,12 +46,53 @@ namespace replication {

const std::string replica_init_info::kInitInfo = ".init-info";

DEFINE_TASK_CODE_AIO(LPC_AIO_INFO_WRITE, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)

namespace {
// Writes `data` to `file` by way of a temporary file.
//
// The content is written to "<file>.tmp", flushed, closed, and only then renamed
// to `file`, so `file` is replaced only after the new content has been completely
// written and flushed.
//
// Returns:
//  - ERR_OK on success;
//  - ERR_FILE_OPERATION_FAILED if the temporary file cannot be opened or renamed;
//  - otherwise the error reported by the write callback or by file::flush().
error_code write_blob_to_file(const std::string &file, const blob &data)
{
    std::string tmp_file = file + ".tmp";
    disk_file *hfile = file::open(tmp_file.c_str(), O_WRONLY | O_CREAT | O_BINARY | O_TRUNC, 0666);
    ERR_LOG_AND_RETURN_NOT_TRUE(hfile, ERR_FILE_OPERATION_FAILED, "open file {} failed", tmp_file);
    // Remove the temporary file on every exit path. After a successful rename it
    // should no longer exist, so the removal only matters on the failure paths.
    auto cleanup = defer([tmp_file]() { utils::filesystem::remove_path(tmp_file); });

    error_code err;
    size_t sz = 0;
    task_tracker tracker;
    aio_task_ptr tsk = file::write(hfile,
                                   data.data(),
                                   data.length(),
                                   0,
                                   LPC_AIO_INFO_WRITE,
                                   &tracker,
                                   [&err, &sz](error_code e, size_t s) {
                                       err = e;
                                       sz = s;
                                   },
                                   0);
    dassert_f(tsk, "create file::write task failed");
    // Block until the write callback has run, so `err` and `sz` are populated.
    tracker.wait_outstanding_tasks();
    // Keep the flush result instead of discarding it: if the data could not be
    // flushed, the temporary file must not be renamed over the existing `file`.
    const error_code flush_err = file::flush(hfile);
    // Always close the handle before any early return below.
    file::close(hfile);
    ERR_LOG_AND_RETURN_NOT_OK(err, "write file {} failed", tmp_file);
    dcheck_eq(data.length(), sz);
    ERR_LOG_AND_RETURN_NOT_OK(flush_err, "flush file {} failed", tmp_file);
    // TODO(yingchun): need fsync too?
    // NOTE(review): presumably this refers to syncing the parent directory after the
    // rename - confirm with the author.
    ERR_LOG_AND_RETURN_NOT_TRUE(utils::filesystem::rename_path(tmp_file, file),
                                ERR_FILE_OPERATION_FAILED,
                                "move file from {} to {} failed",
                                tmp_file,
                                file);

    return ERR_OK;
}
} // namespace

error_code replica_init_info::load(const std::string &dir)
{
std::string info_path = utils::filesystem::path_combine(dir, kInitInfo);
dassert_f(utils::filesystem::path_exists(info_path), "file({}) not exist", info_path);
ERR_LOG_AND_RETURN_NOT_OK(
load_json(info_path.c_str()), "load replica_init_info from {} failed", info_path);
load_json(info_path), "load replica_init_info from {} failed", info_path);
ddebug_f("load replica_init_info from {} succeed: {}", info_path, to_string());
return ERR_OK;
}
Expand All @@ -59,7 +101,7 @@ error_code replica_init_info::store(const std::string &dir)
{
uint64_t start = dsn_now_ns();
std::string info_path = utils::filesystem::path_combine(dir, kInitInfo);
ERR_LOG_AND_RETURN_NOT_OK(store_json(info_path.c_str()),
ERR_LOG_AND_RETURN_NOT_OK(store_json(info_path),
"store replica_init_info to {} failed, time_used_ns = {}",
info_path,
dsn_now_ns() - start);
Expand All @@ -70,7 +112,7 @@ error_code replica_init_info::store(const std::string &dir)
return ERR_OK;
}

error_code replica_init_info::load_json(const char *file)
error_code replica_init_info::load_json(const std::string &file)
{
std::ifstream is(file, std::ios::binary);
ERR_LOG_AND_RETURN_NOT_TRUE(
Expand All @@ -96,41 +138,22 @@ error_code replica_init_info::load_json(const char *file)
return ERR_OK;
}

error_code replica_init_info::store_json(const char *file)
error_code replica_init_info::store_json(const std::string &file)
{
std::string ffile(file);
std::string tmp_file = ffile + ".tmp";

std::ofstream os(tmp_file.c_str(),
(std::ofstream::out | std::ios::binary | std::ofstream::trunc));
ERR_LOG_AND_RETURN_NOT_TRUE(
os.is_open(), ERR_FILE_OPERATION_FAILED, "open file {} failed", tmp_file);

dsn::blob bb = dsn::json::json_forwarder<replica_init_info>::encode(*this);
os.write((const char *)bb.data(), (std::streamsize)bb.length());
ERR_LOG_AND_RETURN_NOT_TRUE(
!os.bad(), ERR_FILE_OPERATION_FAILED, "write file {} failed", tmp_file);
os.close();

ERR_LOG_AND_RETURN_NOT_TRUE(utils::filesystem::rename_path(tmp_file, ffile),
ERR_FILE_OPERATION_FAILED,
"move file from {} to {} failed",
tmp_file,
ffile);

return ERR_OK;
return write_blob_to_file(file, json::json_forwarder<replica_init_info>::encode(*this));
}

// Renders the init_* fields as a single line of "name = value" pairs separated by ", ".
std::string replica_init_info::to_string()
{
    // TODO(yingchun): use fmt instead
    std::ostringstream out;
    out << "init_ballot = " << init_ballot;
    out << ", init_durable_decree = " << init_durable_decree;
    out << ", init_offset_in_shared_log = " << init_offset_in_shared_log;
    out << ", init_offset_in_private_log = " << init_offset_in_private_log;
    return out.str();
}

error_code replica_app_info::load(const char *file)
error_code replica_app_info::load(const std::string &file)
{
std::ifstream is(file, std::ios::binary);
ERR_LOG_AND_RETURN_NOT_TRUE(
Expand All @@ -157,7 +180,7 @@ error_code replica_app_info::load(const char *file)
return ERR_OK;
}

error_code replica_app_info::store(const char *file)
error_code replica_app_info::store(const std::string &file)
{
binary_writer writer;
int magic = 0xdeadbeef;
Expand All @@ -177,24 +200,7 @@ error_code replica_app_info::store(const char *file)
marshall(writer, tmp, DSF_THRIFT_JSON);
}

std::string ffile = std::string(file);
std::string tmp_file = ffile + ".tmp";

std::ofstream os(tmp_file.c_str(),
(std::ofstream::out | std::ios::binary | std::ofstream::trunc));
ERR_LOG_AND_RETURN_NOT_TRUE(
os.is_open(), ERR_FILE_OPERATION_FAILED, "open file {} failed", tmp_file);

auto data = writer.get_buffer();
os.write((const char *)data.data(), (std::streamsize)data.length());
os.close();

ERR_LOG_AND_RETURN_NOT_TRUE(utils::filesystem::rename_path(tmp_file, ffile),
ERR_FILE_OPERATION_FAILED,
"move file from {} to {} failed",
tmp_file,
ffile);
return ERR_OK;
return write_blob_to_file(file, writer.get_buffer());
}

/*static*/
Expand Down Expand Up @@ -268,12 +274,10 @@ error_code replication_app_base::open_new_internal(replica *r,
_dir_data);

ERR_LOG_AND_RETURN_NOT_OK(open(), "[{}]: open replica app failed", r->name());

_last_committed_decree = last_durable_decree();
ERR_LOG_AND_RETURN_NOT_OK(update_init_info(_replica, shared_log_start, private_log_start, 0),
"[{}]: open replica app failed",
r->name());

return ERR_OK;
}

Expand Down Expand Up @@ -334,6 +338,7 @@ int replication_app_base::on_batched_write_requests(int64_t decree,
{
int storage_error = 0;
for (int i = 0; i < request_length; ++i) {
// TODO(yingchun): better to return error_code
int e = on_request(requests[i]);
if (e != 0) {
derror_replica("got storage error when handler request({})",
Expand Down

0 comments on commit ac8f8de

Please sign in to comment.