Skip to content

Commit

Permalink
feat(bulk-load): bulk load ingestion part5 - replica handle bulk load…
Browse files Browse the repository at this point in the history
… request during ingestion (#496)
  • Loading branch information
hycdong authored Jun 15, 2020
1 parent 5522d9b commit f0efbd2
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 30 deletions.
18 changes: 16 additions & 2 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/dist/replication/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 120 additions & 3 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,53 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
bulk_load_status::type meta_status,
const std::string &cluster_name,
const std::string &provider_name)
{
if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY) {
return ERR_INVALID_STATE;
}

bulk_load_status::type local_status = _status;
error_code ec = validate_bulk_load_status(meta_status, local_status);
if (ec != ERR_OK) {
derror_replica("invalid bulk load status, remote = {}, local = {}",
enum_to_string(meta_status),
enum_to_string(local_status));
return ec;
}

switch (meta_status) {
case bulk_load_status::BLS_DOWNLOADING:
// TODO(heyuchen): add restart downloading status check
if (local_status == bulk_load_status::BLS_INVALID) {
ec = start_download(app_name, cluster_name, provider_name);
}
break;
case bulk_load_status::BLS_INGESTING:
if (local_status == bulk_load_status::BLS_DOWNLOADED) {
start_ingestion();
} else if (local_status == bulk_load_status::BLS_INGESTING &&
status() == partition_status::PS_PRIMARY) {
check_ingestion_finish();
}
break;
// TODO(heyuchen): add other bulk load status
default:
break;
}
return ec;
}

// Compares the bulk load status sent by meta with the status kept locally by this replica.
// The comparison itself is not implemented yet: both arguments are currently ignored and
// ERR_OK is returned for every combination.
// \param meta_status bulk load status carried by the request
// \param local_status bulk load status recorded on this replica
// \return ERR_OK (always, until the check is implemented)
error_code replica_bulk_loader::validate_bulk_load_status(bulk_load_status::type meta_status,
                                                          bulk_load_status::type local_status)
{
    // TODO(heyuchen): TBD
    return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica_bulk_loader::bulk_load_start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name)
error_code replica_bulk_loader::start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name)
{
if (_stub->_bulk_load_downloading_count.load() >=
_stub->_max_concurrent_bulk_load_downloading_count) {
Expand Down Expand Up @@ -380,6 +418,35 @@ void replica_bulk_loader::check_download_finish()
}
}

// ThreadPool: THREAD_POOL_REPLICATION
// Moves the local bulk load status to BLS_INGESTING.
// On a primary replica it also clears `ingestion_is_empty_prepare_sent`, the flag that
// check_ingestion_finish() consults before sending its empty prepare.
void replica_bulk_loader::start_ingestion()
{
    _status = bulk_load_status::BLS_INGESTING;
    // TODO(heyuchen): add perf-counter

    // only the primary keeps track of the empty prepare
    if (status() != partition_status::PS_PRIMARY) {
        return;
    }
    _replica->_primary_states.ingestion_is_empty_prepare_sent = false;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::check_ingestion_finish()
{
if (_replica->_app->get_ingestion_status() == ingestion_status::IS_SUCCEED &&
!_replica->_primary_states.ingestion_is_empty_prepare_sent) {
// send an empty prepare to gurantee secondary commit ingestion request, and set
// `pop_all_committed_mutations` as true
// ingestion is a special write request, replay this mutation can not learn data from
// external files, so when ingestion succeed, we should create a checkpoint
// if learn is evoked after ingestion, we should gurantee that learner should learn from
// checkpoint, to gurantee the condition above, we should pop all committed mutations in
// prepare list to gurantee learn type is LT_APP
mutation_ptr mu = _replica->new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
_replica->init_prepare(mu, false, true);
_replica->_primary_states.ingestion_is_empty_prepare_sent = true;
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::cleanup_download_task()
{
Expand Down Expand Up @@ -414,6 +481,9 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
case bulk_load_status::BLS_DOWNLOADED:
report_group_download_progress(response);
break;
case bulk_load_status::BLS_INGESTING:
report_group_ingestion_status(response);
break;
// TODO(heyuchen): add other status
default:
break;
Expand Down Expand Up @@ -463,6 +533,50 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
response.__set_total_download_progress(total_progress);
}

// ThreadPool: THREAD_POOL_REPLICATION
// Fills `response` with the ingestion status of the primary and of every secondary in the
// current membership.
// - If this replica is not primary, sets response.err to ERR_INVALID_STATE and returns.
// - `is_group_ingestion_finished` is set to true only when the primary and all secondaries
//   report IS_SUCCEED and the group has `max_replica_count` members.
// - When every current member reports IS_SUCCEED, `_is_bulk_load_ingestion` is cleared
//   so that write is recovered.
void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_response &response)
{
    if (status() != partition_status::PS_PRIMARY) {
        dwarn_replica("replica status={}, should be {}",
                      enum_to_string(status()),
                      enum_to_string(partition_status::PS_PRIMARY));
        response.err = ERR_INVALID_STATE;
        return;
    }

    const auto &membership = _replica->_primary_states.membership;

    // primary: status comes straight from the local app
    partition_bulk_load_state primary_state;
    primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
    response.group_bulk_load_state[membership.primary] = primary_state;
    ddebug_replica("primary = {}, ingestion status = {}",
                   membership.primary.to_string(),
                   enum_to_string(primary_state.ingest_status));

    // secondaries: status comes from the states cached on the primary; a secondary whose
    // ingest_status is not set is treated as IS_INVALID
    bool all_members_succeed = (primary_state.ingest_status == ingestion_status::IS_SUCCEED);
    for (const auto &secondary_address : membership.secondaries) {
        const auto &cached_state =
            _replica->_primary_states.secondary_bulk_load_states[secondary_address];

        ingestion_status::type secondary_ingest_status = ingestion_status::IS_INVALID;
        if (cached_state.__isset.ingest_status) {
            secondary_ingest_status = cached_state.ingest_status;
        }

        ddebug_replica("secondary = {}, ingestion status={}",
                       secondary_address.to_string(),
                       enum_to_string(secondary_ingest_status));
        response.group_bulk_load_state[secondary_address] = cached_state;

        if (secondary_ingest_status != ingestion_status::IS_SUCCEED) {
            all_members_succeed = false;
        }
    }

    // the group only counts as finished for meta when it also has a full replica set
    const bool has_full_replica_set =
        (membership.secondaries.size() + 1 == membership.max_replica_count);
    response.__set_is_group_ingestion_finished(all_members_succeed && has_full_replica_set);

    // if group ingestion finish, recover write immediately
    if (all_members_succeed) {
        ddebug_replica("finish ingestion, recover write");
        _replica->_is_bulk_load_ingestion = false;
        // TODO(heyuchen): reset perf-counter
    }
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::report_bulk_load_states_to_primary(
bulk_load_status::type remote_status,
Expand All @@ -481,6 +595,9 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
bulk_load_state.__set_download_progress(_download_progress.load());
bulk_load_state.__set_download_status(_download_status.load());
break;
case bulk_load_status::BLS_INGESTING:
bulk_load_state.__set_ingest_status(_replica->_app->get_ingestion_status());
break;
// TODO(heyuchen): add other status
default:
break;
Expand Down
16 changes: 13 additions & 3 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@ class replica_bulk_loader : replica_base
const std::string &cluster_name,
const std::string &provider_name);

// compare meta bulk load status with local bulk load status
// \return ERR_INVALID_STATE if local status is not valid for the given meta status
// for example, if meta status is ingesting, replica local status can only be downloaded or
// ingesting; any other local status will return ERR_INVALID_STATE
error_code validate_bulk_load_status(bulk_load_status::type meta_status,
bulk_load_status::type local_status);

// replica start or restart download sst files from remote provider
// \return ERR_BUSY if node has already had enough replica executing downloading
// \return download errors by function `download_sst_files`
error_code bulk_load_start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name);
error_code start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name);

// download metadata and sst files from remote provider
// metadata and sst files will be downloaded in {_dir}/.bulk_load directory
Expand All @@ -58,6 +65,8 @@ class replica_bulk_loader : replica_base

void try_decrease_bulk_load_download_count();
void check_download_finish();
void start_ingestion();
void check_ingestion_finish();

void cleanup_download_task();
void clear_bulk_load_states();
Expand All @@ -66,6 +75,7 @@ class replica_bulk_loader : replica_base
bool report_metadata,
/*out*/ bulk_load_response &response);
void report_group_download_progress(/*out*/ bulk_load_response &response);
void report_group_ingestion_status(/*out*/ bulk_load_response &response);

void report_bulk_load_states_to_primary(bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response);
Expand Down
Loading

0 comments on commit f0efbd2

Please sign in to comment.