Skip to content

Commit

Permalink
refactor: simplify logging in pegasus_server_impl (#503)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored and neverchanje committed Mar 31, 2020
1 parent 99f1087 commit 0f456bb
Showing 1 changed file with 82 additions and 112 deletions.
194 changes: 82 additions & 112 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1772,57 +1772,51 @@ ::dsn::error_code pegasus_server_impl::sync_checkpoint()

int64_t last_durable = last_durable_decree();
int64_t last_commit = last_committed_decree();
dassert(last_durable <= last_commit, "%" PRId64 " VS %" PRId64, last_durable, last_commit);
dcheck_le_replica(last_durable, last_commit);

// case 1: last_durable == last_commit
// no need to do checkpoint
if (last_durable == last_commit) {
ddebug("%s: no need to checkpoint because "
"last_durable_decree = last_committed_decree = %" PRId64,
replica_name(),
last_durable);
ddebug_replica(
"no need to do checkpoint because last_durable_decree = last_committed_decree = {}",
last_durable);
return ::dsn::ERR_OK;
}

// case 2: last_durable < last_commit
// need to do checkpoint
rocksdb::Checkpoint *chkpt_raw = nullptr;
auto status = rocksdb::Checkpoint::Create(_db, &chkpt_raw);
if (!status.ok()) {
derror("%s: create Checkpoint object failed, error = %s",
replica_name(),
status.ToString().c_str());
derror_replica("create Checkpoint object failed, error = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::unique_ptr<rocksdb::Checkpoint> chkpt(chkpt_raw);

auto dir = chkpt_get_dir_name(last_commit);
auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), dir);
if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
ddebug("%s: checkpoint directory %s already exist, remove it first",
replica_name(),
chkpt_dir.c_str());
if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
derror(
"%s: remove old checkpoint directory %s failed", replica_name(), chkpt_dir.c_str());
auto checkpoint_dir = ::dsn::utils::filesystem::path_combine(data_dir(), dir);
if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
ddebug_replica("checkpoint directory {} is already existed, remove it first",
checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}

// CreateCheckpoint() will always flush memtable firstly.
status = chkpt->CreateCheckpoint(chkpt_dir, 0);
// log_size_for_flush = 0 means always flush memtable before recording the live files
status = chkpt->CreateCheckpoint(checkpoint_dir, 0 /* log_size_for_flush */);
if (!status.ok()) {
// sometimes checkpoint may fail, and try again will succeed
derror("%s: create checkpoint failed, error = %s, try again",
replica_name(),
status.ToString().c_str());
status = chkpt->CreateCheckpoint(chkpt_dir, 0);
derror_replica("CreateCheckpoint failed, error = {}, try again", status.ToString());
// TODO(yingchun): fail and return
status = chkpt->CreateCheckpoint(checkpoint_dir, 0);
}

if (!status.ok()) {
derror(
"%s: create checkpoint failed, error = %s", replica_name(), status.ToString().c_str());
::dsn::utils::filesystem::remove_path(chkpt_dir);
if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
derror("%s: remove damaged checkpoint directory %s failed",
replica_name(),
chkpt_dir.c_str());
derror_replica("CreateCheckpoint failed, error = {}", status.ToString());
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
}
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
Expand All @@ -1839,9 +1833,8 @@ ::dsn::error_code pegasus_server_impl::sync_checkpoint()
set_last_durable_decree(_checkpoints.back());
}

ddebug("%s: sync create checkpoint succeed, last_durable_decree = %" PRId64 "",
replica_name(),
last_durable_decree());
ddebug_replica("sync create checkpoint succeed, last_durable_decree = {}",
last_durable_decree());

gc_checkpoints();

Expand All @@ -1859,113 +1852,98 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)
int64_t last_flushed = static_cast<int64_t>(_db->GetLastFlushedDecree());
int64_t last_commit = last_committed_decree();

dassert(last_durable <= last_flushed, "%" PRId64 " VS %" PRId64, last_durable, last_flushed);
dassert(last_flushed <= last_commit, "%" PRId64 " VS %" PRId64, last_flushed, last_commit);
dcheck_le_replica(last_durable, last_flushed);
dcheck_le_replica(last_flushed, last_commit);

// case 1: last_durable == last_flushed == last_commit
// no need to do checkpoint
if (last_durable == last_commit) {
ddebug("%s: no need to checkpoint because "
"last_durable_decree = last_committed_decree = %" PRId64,
replica_name(),
last_durable);
dcheck_eq_replica(last_durable, last_flushed);
dcheck_eq_replica(last_flushed, last_commit);
ddebug_replica(
"no need to checkpoint because last_durable_decree = last_committed_decree = {}",
last_durable);
return ::dsn::ERR_OK;
}

// case 2: last_durable == last_flushed < last_commit
// no need to do checkpoint, but need to flush memtable if required
if (last_durable == last_flushed) {
if (flush_memtable) {
// trigger flushing memtable, but not wait
rocksdb::FlushOptions options;
options.wait = false;
auto status = _db->Flush(options);
if (status.ok()) {
ddebug("%s: trigger flushing memtable succeed", replica_name());
return ::dsn::ERR_TRY_AGAIN;
} else {
derror("%s: trigger flushing memtable failed, error = %s",
replica_name(),
status.ToString().c_str());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
} else {
dcheck_lt_replica(last_flushed, last_commit);
if (!flush_memtable) {
// no flush required
return ::dsn::ERR_OK;
}

// flush required, but not wait
rocksdb::FlushOptions options;
options.wait = false;
auto status = _db->Flush(options);
if (status.ok()) {
ddebug_replica("trigger flushing memtable succeed, status = {}", status.ToString());
return ::dsn::ERR_TRY_AGAIN;
} else {
ddebug_replica("trigger flushing memtable failed, status = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
}

dassert(last_durable < last_flushed, "%" PRId64 " VS %" PRId64, last_durable, last_flushed);
// case 3: last_durable < last_flushed <= last_commit
// need to do checkpoint
dcheck_lt_replica(last_durable, last_flushed);

char buf[256];
sprintf(buf, "checkpoint.tmp.%" PRIu64 "", dsn_now_us());
std::string tmp_dir = ::dsn::utils::filesystem::path_combine(data_dir(), buf);
std::string tmp_dir = ::dsn::utils::filesystem::path_combine(
data_dir(), std::string("checkpoint.tmp.") + std::to_string(dsn_now_us()));
if (::dsn::utils::filesystem::directory_exists(tmp_dir)) {
ddebug("%s: temporary checkpoint directory %s already exist, remove it first",
replica_name(),
tmp_dir.c_str());
ddebug_replica("temporary checkpoint directory {} is already existed, remove it first",
tmp_dir);
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror("%s: remove temporary checkpoint directory %s failed",
replica_name(),
tmp_dir.c_str());
derror_replica("remove temporary checkpoint directory {} failed", tmp_dir);
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}

int64_t checkpoint_decree = 0;
::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree);
if (err != ::dsn::ERR_OK) {
derror("%s: call copy_checkpoint_to_dir_unsafe failed with err = %s",
replica_name(),
err.to_string());
derror_replica("copy_checkpoint_to_dir_unsafe failed with err = {}", err.to_string());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}

auto chkpt_dir =
auto checkpoint_dir =
::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(checkpoint_decree));
if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
ddebug("%s: checkpoint directory %s already exist, remove it first",
replica_name(),
chkpt_dir.c_str());
if (!::dsn::utils::filesystem::remove_path(chkpt_dir)) {
derror(
"%s: remove old checkpoint directory %s failed", replica_name(), chkpt_dir.c_str());
if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
ddebug_replica("checkpoint directory {} is already existed, remove it first",
checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove old checkpoint directory {} failed", checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror("%s: remove temporary checkpoint directory %s failed",
replica_name(),
tmp_dir.c_str());
derror_replica("remove temporary checkpoint directory {} failed", tmp_dir);
}
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}

if (!::dsn::utils::filesystem::rename_path(tmp_dir, chkpt_dir)) {
derror("%s: rename checkpoint directory from %s to %s failed",
replica_name(),
tmp_dir.c_str(),
chkpt_dir.c_str());
if (!::dsn::utils::filesystem::rename_path(tmp_dir, checkpoint_dir)) {
derror_replica("rename checkpoint directory from {} to {} failed", tmp_dir, checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror("%s: remove temporary checkpoint directory %s failed",
replica_name(),
tmp_dir.c_str());
derror_replica("remove temporary checkpoint directory {} failed", tmp_dir);
}
return ::dsn::ERR_FILE_OPERATION_FAILED;
}

{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dassert(checkpoint_decree > last_durable_decree(),
"%" PRId64 " VS %" PRId64 "",
checkpoint_decree,
last_durable_decree());
dcheck_gt_replica(checkpoint_decree, last_durable_decree());
if (!_checkpoints.empty()) {
dassert(checkpoint_decree > _checkpoints.back(),
"%" PRId64 " VS %" PRId64 "",
checkpoint_decree,
_checkpoints.back());
dcheck_gt_replica(checkpoint_decree, _checkpoints.back());
}
_checkpoints.push_back(checkpoint_decree);
set_last_durable_decree(_checkpoints.back());
}

ddebug("%s: async create checkpoint succeed, last_durable_decree = %" PRId64 "",
replica_name(),
last_durable_decree());
ddebug_replica("async create checkpoint succeed, last_durable_decree = {}",
last_durable_decree());

gc_checkpoints();

Expand All @@ -1989,41 +1967,33 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char
int64_t *checkpoint_decree)
{
rocksdb::Checkpoint *chkpt_raw = nullptr;
rocksdb::Status status = rocksdb::Checkpoint::Create(_db, &chkpt_raw);
auto status = rocksdb::Checkpoint::Create(_db, &chkpt_raw);
if (!status.ok()) {
derror("%s: create Checkpoint object failed, error = %s",
replica_name(),
status.ToString().c_str());
derror_replica("create Checkpoint object failed, error = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::unique_ptr<rocksdb::Checkpoint> chkpt(chkpt_raw);

if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
ddebug("%s: checkpoint directory %s is already exist, remove it first",
replica_name(),
checkpoint_dir);
ddebug_replica("checkpoint directory {} is already existed, remove it first",
checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror("%s: remove checkpoint directory %s failed", replica_name(), checkpoint_dir);
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}

uint64_t ci = 0;
status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci);
if (!status.ok()) {
derror("%s: async create checkpoint failed, error = %s",
replica_name(),
status.ToString().c_str());
derror_replica("CreateCheckpoint failed, error = {}", status.ToString());
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror("%s: remove checkpoint directory %s failed", replica_name(), checkpoint_dir);
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
}
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
ddebug_replica("copy checkpoint to dir({}) succeed, last_decree = {}", checkpoint_dir, ci);

ddebug("%s: copy checkpoint to dir(%s) succeed, last_decree = %" PRId64 "",
replica_name(),
checkpoint_dir,
ci);
if (checkpoint_decree != nullptr) {
*checkpoint_decree = static_cast<int64_t>(ci);
}
Expand Down

0 comments on commit 0f456bb

Please sign in to comment.