-
Notifications
You must be signed in to change notification settings - Fork 59
refactor: move primary's learning preparation of cache into another function #368
Changes from 13 commits
738e588
d3937ef
180ad32
59de52d
9c4a51c
02c4a76
7ca8b12
d99ab53
2a623b6
83c1e2d
ce366fb
a353de9
3c38918
0111d51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -437,62 +437,15 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) | |
response.last_committed_decree = local_committed_decree; | ||
response.err = ERR_OK; | ||
|
||
// set prepare_start_decree when to-be-learn state is covered by prepare list, | ||
// note min_decree can be NOT present in prepare list when list.count == 0 | ||
if (learn_start_decree > _prepare_list->min_decree() || | ||
(learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) { | ||
if (learner_state.prepare_start_decree == invalid_decree) { | ||
// start from (last_committed_decree + 1) | ||
learner_state.prepare_start_decree = local_committed_decree + 1; | ||
|
||
cleanup_preparing_mutations(false); | ||
|
||
// the replayed prepare msg needs to be AFTER the learning response msg | ||
// to reduce probability that preparing messages arrive remote early than | ||
// learning response msg. | ||
delayed_replay_prepare_list = true; | ||
|
||
ddebug("%s: on_learn[%016" PRIx64 | ||
"]: learner = %s, set prepare_start_decree = %" PRId64, | ||
name(), | ||
request.signature, | ||
request.learner.to_string(), | ||
local_committed_decree + 1); | ||
} | ||
|
||
response.prepare_start_decree = learner_state.prepare_start_decree; | ||
} else { | ||
learner_state.prepare_start_decree = invalid_decree; | ||
} | ||
|
||
// only learn mutation cache in range of [learn_start_decree, prepare_start_decree), | ||
// in this case, the state on the PS should be contiguous (+ to-be-sent prepare list) | ||
if (response.prepare_start_decree != invalid_decree) { | ||
binary_writer writer; | ||
int count = 0; | ||
for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) { | ||
auto mu = _prepare_list->get_mutation_by_decree(d); | ||
dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d); | ||
mu->write_to(writer, nullptr); | ||
count++; | ||
} | ||
response.type = learn_type::LT_CACHE; | ||
response.state.meta = writer.get_buffer(); | ||
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, " | ||
"learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", " | ||
"learn_mutation_count = %d, learn_data_size = %d", | ||
name(), | ||
request.signature, | ||
request.learner.to_string(), | ||
learn_start_decree, | ||
response.prepare_start_decree, | ||
count, | ||
response.state.meta.length()); | ||
} | ||
|
||
// learn delta state or checkpoint | ||
// in this case, the state on the PS is still incomplete | ||
else { | ||
bool should_learn_cache = prepare_cached_learn_state(request, | ||
learn_start_decree, | ||
local_committed_decree, | ||
learner_state, | ||
response, | ||
delayed_replay_prepare_list); | ||
if (!should_learn_cache) { | ||
if (learn_start_decree > _app->last_durable_decree()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that the comments are for condition
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think "delta state" stands for log and cache. So it's still reasonable to see this comment stands above L442. I don't know what's the initial intention of this comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds reasonable~ I recommend removing comment |
||
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " | ||
"because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 | ||
|
@@ -1004,6 +957,69 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response | |
} | ||
} | ||
|
||
bool replica::prepare_cached_learn_state(const learn_request &request, | ||
decree learn_start_decree, | ||
decree local_committed_decree, | ||
/*out*/ remote_learner_state &learner_state, | ||
/*out*/ learn_response &response, | ||
/*out*/ bool &delayed_replay_prepare_list) | ||
{ | ||
// set prepare_start_decree when to-be-learn state is covered by prepare list, | ||
// note min_decree can be NOT present in prepare list when list.count == 0 | ||
if (learn_start_decree > _prepare_list->min_decree() || | ||
(learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) { | ||
if (learner_state.prepare_start_decree == invalid_decree) { | ||
// start from (last_committed_decree + 1) | ||
learner_state.prepare_start_decree = local_committed_decree + 1; | ||
|
||
cleanup_preparing_mutations(false); | ||
|
||
// the replayed prepare msg needs to be AFTER the learning response msg | ||
// to reduce probability that preparing messages arrive remote early than | ||
// learning response msg. | ||
delayed_replay_prepare_list = true; | ||
|
||
ddebug("%s: on_learn[%016" PRIx64 | ||
"]: learner = %s, set prepare_start_decree = %" PRId64, | ||
name(), | ||
request.signature, | ||
request.learner.to_string(), | ||
local_committed_decree + 1); | ||
} | ||
|
||
response.prepare_start_decree = learner_state.prepare_start_decree; | ||
} else { | ||
learner_state.prepare_start_decree = invalid_decree; | ||
} | ||
|
||
// only learn mutation cache in range of [learn_start_decree, prepare_start_decree), | ||
// in this case, the state on the PS should be contiguous (+ to-be-sent prepare list) | ||
if (response.prepare_start_decree != invalid_decree) { | ||
binary_writer writer; | ||
int count = 0; | ||
for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) { | ||
auto mu = _prepare_list->get_mutation_by_decree(d); | ||
dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d); | ||
mu->write_to(writer, nullptr); | ||
count++; | ||
} | ||
response.type = learn_type::LT_CACHE; | ||
response.state.meta = writer.get_buffer(); | ||
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, " | ||
neverchanje marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", " | ||
"learn_mutation_count = %d, learn_data_size = %d", | ||
name(), | ||
request.signature, | ||
request.learner.to_string(), | ||
learn_start_decree, | ||
response.prepare_start_decree, | ||
count, | ||
response.state.meta.length()); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
void replica::on_copy_remote_state_completed(error_code err, | ||
size_t size, | ||
uint64_t copy_start_time, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seem should put https://github.com/XiaoMi/rdsn/pull/368/files#diff-304228a830e17d7be16721c87b13698c7d767c47e74c3947e2d005fc30f099b5R448