Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

meta_backup_service: fix incorrect usage of std::chrono::seconds #5

Merged
merged 1 commit into from
Apr 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 86 additions & 97 deletions src/dist/replication/meta_server/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id)
derror("%s: create file %s failed, restart this backup later",
_backup_sig.c_str(),
create_file_req.file_name.c_str());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this, app_id]() {
zauto_lock l(_lock);
start_backup_app_meta_unlocked(app_id);
},
0,
std::chrono::milliseconds(_backup_service->backup_option().block_retry_delay_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this, app_id]() {
zauto_lock l(_lock);
start_backup_app_meta_unlocked(app_id);
},
0,
_backup_service->backup_option().block_retry_delay_ms);
return;
}
dassert(remote_file != nullptr,
Expand Down Expand Up @@ -113,8 +112,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id)
start_backup_app_meta_unlocked(app_id);
},
0,
std::chrono::milliseconds(
_backup_service->backup_option().block_retry_delay_ms));
_backup_service->backup_option().block_retry_delay_ms);
}
},
nullptr);
Expand Down Expand Up @@ -175,15 +173,14 @@ void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id,
derror("%s: create file %s failed, restart this backup later",
_backup_sig.c_str(),
create_file_req.file_name.c_str());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this, app_id, write_callback]() {
zauto_lock l(_lock);
write_backup_app_finish_flag_unlocked(app_id, write_callback);
},
0,
std::chrono::milliseconds(_backup_service->backup_option().block_retry_delay_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this, app_id, write_callback]() {
zauto_lock l(_lock);
write_backup_app_finish_flag_unlocked(app_id, write_callback);
},
0,
_backup_service->backup_option().block_retry_delay_ms);
return;
}

Expand Down Expand Up @@ -225,8 +222,7 @@ void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id,
write_backup_app_finish_flag_unlocked(app_id, write_callback);
},
0,
std::chrono::milliseconds(
_backup_service->backup_option().block_retry_delay_ms));
_backup_service->backup_option().block_retry_delay_ms);
}
});
}
Expand Down Expand Up @@ -289,15 +285,14 @@ void policy_context::write_backup_info_unlocked(const backup_info &b_info,
derror("%s: create file %s failed, restart this backup later",
_backup_sig.c_str(),
create_file_req.file_name.c_str());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this, b_info, write_callback]() {
zauto_lock l(_lock);
write_backup_info_unlocked(b_info, write_callback);
},
0,
std::chrono::milliseconds(_backup_service->backup_option().block_retry_delay_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this, b_info, write_callback]() {
zauto_lock l(_lock);
write_backup_info_unlocked(b_info, write_callback);
},
0,
_backup_service->backup_option().block_retry_delay_ms);
return;
}

Expand Down Expand Up @@ -330,8 +325,7 @@ void policy_context::write_backup_info_unlocked(const backup_info &b_info,
write_backup_info_unlocked(b_info, write_callback);
},
0,
std::chrono::milliseconds(
_backup_service->backup_option().block_retry_delay_ms));
_backup_service->backup_option().block_retry_delay_ms);
}
});
}
Expand Down Expand Up @@ -433,8 +427,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
start_backup_partition_unlocked(pid);
},
0,
std::chrono::milliseconds(
_backup_service->backup_option().reconfiguration_retry_delay_ms));
_backup_service->backup_option().reconfiguration_retry_delay_ms);
} else {
backup_request req;
req.pid = pid;
Expand Down Expand Up @@ -519,15 +512,14 @@ void policy_context::on_backup_reply(error_code err,
}

// start another turn of backup no matter we encounter error or not finished
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this, pid]() {
zauto_lock l(_lock);
start_backup_partition_unlocked(pid);
},
0,
std::chrono::milliseconds(_backup_service->backup_option().request_backup_period_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this, pid]() {
zauto_lock l(_lock);
start_backup_partition_unlocked(pid);
},
0,
_backup_service->backup_option().request_backup_period_ms);
}

void policy_context::initialize_backup_progress_unlocked()
Expand Down Expand Up @@ -601,16 +593,15 @@ void policy_context::sync_backup_to_remote_storage_unlocked(const backup_info &b
derror("%s: sync backup info(" PRId64 ") to remote storage got timeout, retry it later",
_policy.policy_name.c_str(),
b_info.backup_id);
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this, b_info, sync_callback, create_new_node]() {
zauto_lock l(_lock);
sync_backup_to_remote_storage_unlocked(
std::move(b_info), std::move(sync_callback), create_new_node);
},
0,
std::chrono::milliseconds(_backup_service->backup_option().meta_retry_delay_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this, b_info, sync_callback, create_new_node]() {
zauto_lock l(_lock);
sync_backup_to_remote_storage_unlocked(
std::move(b_info), std::move(sync_callback), create_new_node);
},
0,
_backup_service->backup_option().meta_retry_delay_ms);
} else {
dassert(false,
"%s: we can't handle this right now, error(%s)",
Expand Down Expand Up @@ -705,31 +696,29 @@ void policy_context::issue_new_backup_unlocked()
if (_policy.is_disable) {
ddebug("%s: policy is disable, just ignore backup, try it later",
_policy.policy_name.c_str());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
std::chrono::milliseconds(_backup_service->backup_option().issue_backup_interval_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
_backup_service->backup_option().issue_backup_interval_ms);
return;
}

if (!should_start_backup_unlocked()) {
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
std::chrono::milliseconds(_backup_service->backup_option().issue_backup_interval_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
_backup_service->backup_option().issue_backup_interval_ms);
ddebug("%s: start issue new backup %" PRId64 "ms later",
_policy.policy_name.c_str(),
_backup_service->backup_option().issue_backup_interval_ms);
_backup_service->backup_option().issue_backup_interval_ms.count());
return;
}

Expand All @@ -739,15 +728,14 @@ void policy_context::issue_new_backup_unlocked()
// TODO: just ignore this backup and wait next backup
dwarn("%s: all apps have been dropped, ignore this backup and retry it later",
_backup_sig.c_str());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
std::chrono::milliseconds(_backup_service->backup_option().issue_backup_interval_ms));
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
_backup_service->backup_option().issue_backup_interval_ms);
} else {
task_ptr continue_to_backup = tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() {
zauto_lock l(_lock);
Expand Down Expand Up @@ -989,7 +977,7 @@ void policy_context::sync_remove_backup_info(const backup_info &info, dsn::task_
nullptr,
[this, info, sync_callback]() { sync_remove_backup_info(info, sync_callback); },
0,
std::chrono::milliseconds(_backup_service->backup_option().meta_retry_delay_ms));
_backup_service->backup_option().meta_retry_delay_ms);
} else {
dassert(false,
"%s: we can't handle this right now, error(%s)",
Expand All @@ -1013,12 +1001,12 @@ backup_service::backup_service(meta_service *meta_svc,
{
_state = _meta_svc->get_server_state();

_opt.meta_retry_delay_ms = 10000;
_opt.block_retry_delay_ms = 60000;
_opt.app_dropped_retry_delay_ms = 600000;
_opt.reconfiguration_retry_delay_ms = 15000;
_opt.request_backup_period_ms = 10000;
_opt.issue_backup_interval_ms = 300000;
_opt.meta_retry_delay_ms = 10000_ms;
_opt.block_retry_delay_ms = 60000_ms;
_opt.app_dropped_retry_delay_ms = 600000_ms;
_opt.reconfiguration_retry_delay_ms = 15000_ms;
_opt.request_backup_period_ms = 10000_ms;
_opt.issue_backup_interval_ms = 300000_ms;

_in_initialize.store(true);
}
Expand All @@ -1041,7 +1029,7 @@ void backup_service::start_create_policy_meta_root(dsn::task_ptr callback)
nullptr,
std::bind(&backup_service::start_create_policy_meta_root, this, callback),
0,
std::chrono::milliseconds(_opt.meta_retry_delay_ms));
_opt.meta_retry_delay_ms);
} else {
dassert(false, "we can't handle this error(%s) right now", err.to_string());
}
Expand Down Expand Up @@ -1069,7 +1057,7 @@ void backup_service::start_sync_policies()
nullptr,
std::bind(&backup_service::start_sync_policies, this),
0,
std::chrono::milliseconds(_opt.meta_retry_delay_ms));
_opt.meta_retry_delay_ms);
} else {
dassert(false,
"sync policies from remote storage encounter error(%s), we can't handle "
Expand Down Expand Up @@ -1310,13 +1298,14 @@ void backup_service::do_add_policy(dsn_message_t req,
}
p->start();
} else if (err == ERR_TIMEOUT) {
derror("create backup policy on remote storage timeout, retry after %d(ms)",
_opt.meta_retry_delay_ms);
derror("create backup policy on remote storage timeout, retry after %" PRId64
"(ms)",
_opt.meta_retry_delay_ms.count());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::do_add_policy, this, req, p, hint_msg),
0,
std::chrono::seconds(_opt.meta_retry_delay_ms));
_opt.meta_retry_delay_ms);
return;
} else {
dassert(false,
Expand Down Expand Up @@ -1344,9 +1333,9 @@ void backup_service::do_update_policy_to_remote_storage(
dsn_msg_release_ref(req);
} else if (err == ERR_TIMEOUT) {
derror("update backup policy to remote storage failed, policy_name = %s, retry "
"after %d(ms)",
"after %" PRId64 "(ms)",
p.policy_name.c_str(),
_opt.meta_retry_delay_ms);
_opt.meta_retry_delay_ms.count());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::do_update_policy_to_remote_storage,
Expand All @@ -1355,7 +1344,7 @@ void backup_service::do_update_policy_to_remote_storage(
p,
p_context_ptr),
0,
std::chrono::seconds(_opt.meta_retry_delay_ms));
_opt.meta_retry_delay_ms);
} else {
dassert(false,
"we can't handle this when create backup policy, err(%s)",
Expand Down
12 changes: 6 additions & 6 deletions src/dist/replication/meta_server/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,12 @@ class backup_service
public:
struct backup_opt
{
uint64_t meta_retry_delay_ms;
uint64_t block_retry_delay_ms;
uint64_t app_dropped_retry_delay_ms;
uint64_t reconfiguration_retry_delay_ms;
uint64_t request_backup_period_ms; // period that meta send backup command to replica
uint64_t issue_backup_interval_ms; // interval that meta try to issue a new backup
std::chrono::milliseconds meta_retry_delay_ms;
std::chrono::milliseconds block_retry_delay_ms;
std::chrono::milliseconds app_dropped_retry_delay_ms;
std::chrono::milliseconds reconfiguration_retry_delay_ms;
std::chrono::milliseconds request_backup_period_ms; // period that meta send backup command to replica
std::chrono::milliseconds issue_backup_interval_ms; // interval that meta try to issue a new backup
};

typedef std::function<std::shared_ptr<policy_context>(backup_service *)> policy_factory;
Expand Down
6 changes: 3 additions & 3 deletions src/dist/replication/test/meta_test/unit_test/backup_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ void meta_service_test_app::policy_context_test()
server_state *state = s->_state.get();
s->_started = true;
s->_backup_handler = std::make_shared<backup_service>(s.get(), policy_root, ".", nullptr);
s->_backup_handler->backup_option().app_dropped_retry_delay_ms = 500;
s->_backup_handler->backup_option().request_backup_period_ms = 20;
s->_backup_handler->backup_option().issue_backup_interval_ms = 1000;
s->_backup_handler->backup_option().app_dropped_retry_delay_ms = 500_ms;
s->_backup_handler->backup_option().request_backup_period_ms = 20_ms;
s->_backup_handler->backup_option().issue_backup_interval_ms = 1000_ms;
s->_storage
->create_node(
policy_root, dsn::TASK_CODE_EXEC_INLINED, [&ec](dsn::error_code err) { ec = err; })
Expand Down