Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): meta handle bulk_load_response #463

Merged
merged 6 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,10 @@ const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS(
// String values of two `replica_envs` keys (presumably per-table environment keys -- confirm
// against the declaration of `replica_envs`).
const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS{
    "replica.rocksdb_iteration_threshold_time_ms"};
const std::string replica_envs::BUSINESS_INFO{"business.info"};

// Definitions of the static members declared in `bulk_load_constant`.
const std::string bulk_load_constant::BULK_LOAD_INFO{"bulk_load_info"};
// Both intervals are in seconds: the meta server wraps them in `std::chrono::seconds` when it
// schedules the next bulk load request.
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL{10};
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL{5};

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class bulk_load_constant
{
public:
// Defined as "bulk_load_info" in replication_common.cpp.
// NOTE(review): presumably the name under which bulk load metadata is stored -- confirm usage.
static const std::string BULK_LOAD_INFO;
// Default delay, in seconds, before the meta server resends a bulk load request for a partition.
static const int32_t BULK_LOAD_REQUEST_INTERVAL;
// Shorter delay, in seconds, used when the app's bulk load status is downloaded or ingesting.
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
// TODO(heyuchen): add more constant in further pr
};

Expand Down
176 changes: 176 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,184 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
// Handles the reply to a bulk_load_request that was sent for partition `request.pid`.
//
// - err:      error of receiving the response itself (distinct from `response.err`, which is the
//             error carried inside the response)
// - request:  the request this reply belongs to; app name, pid, primary address and ballot are
//             read from it
// - response: the response whose `err` selects the handling below
//
// Handling, in order:
//   1. `err` != ERR_OK, or `response.err` is ERR_OBJECT_NOT_FOUND / ERR_INVALID_STATE:
//      try to roll back to downloading, then resend.
//   2. `response.err` == ERR_BUSY: just resend.
//   3. any other `response.err` != ERR_OK: mark bulk load failed, then resend.
//   4. `response.err` == ERR_OK: check that the app is available and that the request ballot is
//      not older than the current one, then dispatch on the app's bulk load status.
// Every path ends with try_resend_bulk_load_request() except "app unavailable" and
// "app is paused", which return without resending.
void bulk_load_service::on_partition_bulk_load_reply(error_code err,
                                                     const bulk_load_request &request,
                                                     const bulk_load_response &response)
{
    const std::string &app_name = request.app_name;
    const gpid &pid = request.pid;
    const rpc_address &primary_addr = request.primary_addr;
    // delay in seconds before the next request; shortened below for downloaded/ingesting apps
    int32_t interval = bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL;

    if (err != ERR_OK) {
        derror_f("app({}), partition({}) failed to receive bulk load response, error = {}",
                 app_name,
                 pid,
                 err.to_string());
        try_rollback_to_downloading(app_name, pid);
        try_resend_bulk_load_request(app_name, pid, interval);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to deal with a nonrecoverable error, e.g. files in remote storage are corrupt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On L353 ~ L363, the app and partitions' bulk load status will be set as bulk_load_failed (will be implemented in function handle_bulk_load_failed), and resend request to replica server to cleanup bulk load states and context.

        return;
    }

    if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) {
        derror_f(
            "app({}), partition({}) doesn't exist or has invalid state on node({}), error = {}",
            app_name,
hycdong marked this conversation as resolved.
Show resolved Hide resolved
            pid,
            primary_addr.to_string(),
            response.err.to_string());
        try_rollback_to_downloading(app_name, pid);
        try_resend_bulk_load_request(app_name, pid, interval);
        return;
    }

    if (response.err == ERR_BUSY) {
        dwarn_f("node({}) has enough replicas downloading, wait for next round to send bulk load "
                "request for app({}), partition({})",
                primary_addr.to_string(),
                app_name,
                pid);
        try_resend_bulk_load_request(app_name, pid, interval);
        return;
    }

    if (response.err != ERR_OK) {
        derror_f("app({}), partition({}) handle bulk load response failed, error = {}, primary "
                 "status = {}",
                 app_name,
                 pid,
                 response.err.to_string(),
                 dsn::enum_to_string(response.primary_bulk_load_status));
        handle_bulk_load_failed(pid.get_app_id());
        try_resend_bulk_load_request(app_name, pid, interval);
        return;
    }

    // response.err = ERR_OK
    ballot current_ballot;
    {
        // the app lock is held only while reading the app state and the partition ballot
        zauto_read_lock l(app_lock());
        std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id());
        if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
            // this branch covers both a missing app and an app that is not AS_AVAILABLE
            dwarn_f("app(name={}, id={}) does not exist or is not available, set bulk load failed",
                    app_name,
                    pid.get_app_id());
            handle_app_unavailable(pid.get_app_id(), app_name);
            return;
        }
        current_ballot = app->partitions[pid.get_partition_index()].ballot;
    }
    if (request.ballot < current_ballot) {
        // the ballot moved on after the request was sent, so this response is stale
        dwarn_f("receive out-date response, app({}), partition({}), request ballot = {}, "
                "current ballot= {}",
                app_name,
                pid,
                request.ballot,
                current_ballot);
        try_rollback_to_downloading(app_name, pid);
        try_resend_bulk_load_request(app_name, pid, interval);
        return;
    }

    // handle bulk load states reported from primary replica
    bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id());
    switch (app_status) {
    case bulk_load_status::BLS_DOWNLOADING:
        handle_app_downloading(response, primary_addr);
        break;
    case bulk_load_status::BLS_DOWNLOADED:
        handle_app_downloaded(response);
        // when app status is downloaded or ingesting, send request frequently
        interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL;
        break;
    case bulk_load_status::BLS_INGESTING:
        handle_app_ingestion(response, primary_addr);
        interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL;
        break;
    case bulk_load_status::BLS_SUCCEED:
    case bulk_load_status::BLS_FAILED:
    case bulk_load_status::BLS_CANCELED:
        handle_bulk_load_finish(response, primary_addr);
        break;
    case bulk_load_status::BLS_PAUSING:
        handle_app_pausing(response, primary_addr);
        break;
    case bulk_load_status::BLS_PAUSED:
        // paused not send request to replica servers
        return;
    default:
        // do nothing in other status
        break;
    }

    try_resend_bulk_load_request(app_name, pid, interval);
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name,
const gpid &pid,
const int32_t interval)
{
FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](dsn::string_view) {});
zauto_read_lock l(_lock);
if (is_app_bulk_loading_unlock(pid.get_app_id())) {
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
0,
std::chrono::seconds(interval));
}
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` for a response with ERR_OK while the app's bulk
// load status is BLS_DOWNLOADING.
// - response: the bulk load response received for the partition
// - primary_addr: `primary_addr` taken from the corresponding request
// Currently an empty stub.
void bulk_load_service::handle_app_downloading(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is downloading
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` for a response with ERR_OK while the app's bulk
// load status is BLS_DOWNLOADED. The caller switches to the short resend interval afterwards.
// - response: the bulk load response received for the partition
// Currently an empty stub.
void bulk_load_service::handle_app_downloaded(const bulk_load_response &response)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is downloaded
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` for a response with ERR_OK while the app's bulk
// load status is BLS_INGESTING. The caller switches to the short resend interval afterwards.
// - response: the bulk load response received for the partition
// - primary_addr: `primary_addr` taken from the corresponding request
// Currently an empty stub.
void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is ingesting
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` for a response with ERR_OK while the app's bulk
// load status is BLS_SUCCEED, BLS_FAILED or BLS_CANCELED.
// - response: the bulk load response received for the partition
// - primary_addr: `primary_addr` taken from the corresponding request
// Currently an empty stub.
void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` for a response with ERR_OK while the app's bulk
// load status is BLS_PAUSING.
// - response: the bulk load response received for the partition
// - primary_addr: `primary_addr` taken from the corresponding request
// Currently an empty stub.
void bulk_load_service::handle_app_pausing(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is pausing
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` before resending a request when the response
// could not be received, when the replica reported ERR_OBJECT_NOT_FOUND / ERR_INVALID_STATE, or
// when the request ballot is older than the partition's current ballot.
// - app_name, pid: the app and partition the request was sent for
// Currently an empty stub.
void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, const gpid &pid)
{
// TODO(heyuchen): TBD
// replica meets error during bulk load, rollback to downloading to retry bulk load
}

// ThreadPool: THREAD_POOL_META_STATE
// Hook invoked by `on_partition_bulk_load_reply` when the response carries an error other than
// ERR_OBJECT_NOT_FOUND, ERR_INVALID_STATE or ERR_BUSY.
// - app_id: id of the app whose bulk load should be marked as failed
// Currently an empty stub.
void bulk_load_service::handle_bulk_load_failed(int32_t app_id)
{
// TODO(heyuchen): TBD
// replica meets serious error during bulk load, such as file on remote storage is damaged
// should stop bulk load process, set bulk load failed
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down
44 changes: 44 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,32 @@ class bulk_load_service
const bulk_load_request &request,
const bulk_load_response &response);

// if app is still in bulk load, resend bulk_load_request to primary after interval seconds
void try_resend_bulk_load_request(const std::string &app_name,
const gpid &pid,
const int32_t interval);

// called from on_partition_bulk_load_reply when app status is `downloading`
void handle_app_downloading(const bulk_load_response &response,
const rpc_address &primary_addr);

// called from on_partition_bulk_load_reply when app status is `downloaded`
void handle_app_downloaded(const bulk_load_response &response);

// called from on_partition_bulk_load_reply when app status is `ingesting`
void handle_app_ingestion(const bulk_load_response &response, const rpc_address &primary_addr);

// when app status is `succeed`, `failed`, `canceled`, meta and replica should cleanup bulk load
// states
void handle_bulk_load_finish(const bulk_load_response &response,
const rpc_address &primary_addr);

// called from on_partition_bulk_load_reply when app status is `pausing`
void handle_app_pausing(const bulk_load_response &response, const rpc_address &primary_addr);

// app not existed or not available during bulk load
void handle_app_unavailable(int32_t app_id, const std::string &app_name);

// called before resending a request after a recoverable problem (response not received,
// replica missing or in invalid state, stale ballot)
void try_rollback_to_downloading(const std::string &app_name, const gpid &pid);

// called when the response carries an error that is not handled as recoverable
void handle_bulk_load_failed(int32_t app_id);

///
/// update bulk load states to remote storage functions
///
Expand Down Expand Up @@ -202,6 +225,27 @@ class bulk_load_service
}
}

// Locking variant of get_app_bulk_load_status_unlock(): holds a read lock on `_lock` for the
// duration of the lookup.
inline bulk_load_status::type get_app_bulk_load_status(int32_t app_id)
{
    zauto_read_lock guard(_lock);
    const bulk_load_status::type status = get_app_bulk_load_status_unlock(app_id);
    return status;
}

// Returns the bulk load status recorded for `app_id` in `_app_bulk_load_info`, or
// BLS_INVALID when there is no entry for that app. Takes no lock itself.
inline bulk_load_status::type get_app_bulk_load_status_unlock(int32_t app_id) const
{
    const auto entry = _app_bulk_load_info.find(app_id);
    if (entry == _app_bulk_load_info.end()) {
        return bulk_load_status::BLS_INVALID;
    }
    return entry->second.status;
}

// True when `app_id` is present in `_bulk_load_app_id`. Takes no lock itself.
inline bool is_app_bulk_loading_unlock(int32_t app_id) const
{
    return _bulk_load_app_id.count(app_id) > 0;
}

private:
friend class bulk_load_service_test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,8 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed)

fail::teardown();
}

// TODO(heyuchen): add unit tests for on_partition_bulk_load_reply

} // namespace replication
} // namespace dsn