Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): meta handle bulk_load_response #463

Merged
merged 6 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,10 @@ const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS(
const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS(
"replica.rocksdb_iteration_threshold_time_ms");
const std::string replica_envs::BUSINESS_INFO("business.info");

// Definitions of the bulk load constants declared in class bulk_load_constant.
// NOTE(review): consumers of BULK_LOAD_INFO are not visible in this hunk.
const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
// Default delay, in seconds, before the meta server resends a bulk load request
// (it is passed to std::chrono::seconds in try_resend_bulk_load_request).
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
// Shorter delay, in seconds, chosen by on_partition_bulk_load_reply while the app
// bulk load status is downloaded or ingesting.
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class bulk_load_constant
{
public:
// name string used by bulk load; NOTE(review): its consumers are not visible here
static const std::string BULK_LOAD_INFO;
// default delay, in seconds, before a bulk load request is resent
static const int32_t BULK_LOAD_REQUEST_INTERVAL;
// shorter resend delay, in seconds, used while app status is downloaded or ingesting
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
// TODO(heyuchen): add more constant in further pr
};

Expand Down
176 changes: 176 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,184 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
// Handles the reply to a bulk_load_request that was sent for one partition.
//
// Parameters:
//   err      - error reported for the request itself; checked before anything else
//   request  - the request that was sent (provides app_name, pid, primary_addr, ballot)
//   response - the reply payload; response.err is only examined when err is ERR_OK
//
// Flow, in order:
//   1. err != ERR_OK: log, try_rollback_to_downloading, resend after the default interval.
//   2. response.err is ERR_OBJECT_NOT_FOUND or ERR_INVALID_STATE: log, rollback, resend.
//   3. response.err is ERR_BUSY: log a warning and resend, without rollback.
//   4. any other non-OK response.err: handle_bulk_load_failed for the app, then resend.
//   5. response.err is ERR_OK: read the app under the app_lock read lock. If the app is
//      missing or not AS_AVAILABLE, call handle_app_unavailable and return without
//      resending. Otherwise read the current ballot of the partition.
//   6. request.ballot < current ballot: the response is outdated; rollback and resend.
//   7. dispatch on the app bulk load status to the matching handle_* function.
//      BLS_DOWNLOADED and BLS_INGESTING switch to BULK_LOAD_REQUEST_SHORT_INTERVAL,
//      BLS_PAUSED returns without resending, every other status falls through.
//   8. resend the request after the selected interval.
void bulk_load_service::on_partition_bulk_load_reply(error_code err,
const bulk_load_request &request,
const bulk_load_response &response)
{
// local aliases for the request fields used by every branch below
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
const rpc_address &primary_addr = request.primary_addr;
// resend delay in seconds; only the downloaded and ingesting cases shorten it
int32_t interval = bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL;

// step 1: the request itself failed, so response is not inspected
if (err != ERR_OK) {
derror_f("app({}), partition({}) failed to recevie bulk load response, error = {}",
pid.get_app_id(),
pid,
err.to_string());
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid, interval);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to deal with a nonrecoverable error, e.g. files in remote storage is conrrupt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On L353 ~ L363, the app and partitions' bulk load status will be set as bulk_load_failed (will be implemented in functionhandle_bulk_load_failed), and resend request to replica server to cleanup bulk load states and context.

return;
}

if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) {
derror_f(
"app({}), partition({}) doesn't exist or has invalid state on node({}), error = {}",
app_name,
hycdong marked this conversation as resolved.
Show resolved Hide resolved
pid,
primary_addr.to_string(),
response.err.to_string());
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid, interval);
return;
}

if (response.err == ERR_BUSY) {
dwarn_f("node({}) has enough replicas downloading, wait to next round to send bulk load "
hycdong marked this conversation as resolved.
Show resolved Hide resolved
"request for app({}), partition({})",
primary_addr.to_string(),
app_name,
pid);
try_resend_bulk_load_request(app_name, pid, interval);
return;
}

if (response.err != ERR_OK) {
derror_f("app({}), partition({}) handle bulk load response failed, error = {}, primary "
"status = {}",
app_name,
pid,
response.err.to_string(),
dsn::enum_to_string(response.primary_bulk_load_status));
handle_bulk_load_failed(pid.get_app_id());
try_resend_bulk_load_request(app_name, pid, interval);
return;
}

// response.err = ERR_OK
ballot current_ballot;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
app_name,
pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
current_ballot = app->partitions[pid.get_partition_index()].ballot;
}
if (request.ballot < current_ballot) {
dwarn_f("receive out-date response, app({}), partition({}), request ballot = {}, "
"current ballot= {}",
app_name,
pid,
request.ballot,
current_ballot);
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid, interval);
return;
}

// handle bulk load states reported from primary replica
bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id());
switch (app_status) {
case bulk_load_status::BLS_DOWNLOADING:
handle_app_downloading(response, primary_addr);
break;
case bulk_load_status::BLS_DOWNLOADED:
handle_app_downloaded(response);
// when app status is downloaded or ingesting, send request frequently
interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL;
break;
case bulk_load_status::BLS_INGESTING:
handle_app_ingestion(response, primary_addr);
interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL;
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_FAILED:
case bulk_load_status::BLS_CANCELED:
handle_bulk_load_finish(response, primary_addr);
break;
case bulk_load_status::BLS_PAUSING:
handle_app_pausing(response, primary_addr);
break;
case bulk_load_status::BLS_PAUSED:
// paused not send request to replica servers
return;
default:
// do nothing in other status
break;
}

try_resend_bulk_load_request(app_name, pid, interval);
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name,
const gpid &pid,
const int32_t interval)
{
FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](dsn::string_view) {});
zauto_read_lock l(_lock);
if (is_app_bulk_loading_unlock(pid.get_app_id())) {
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
0,
std::chrono::seconds(interval));
}
}

// ThreadPool: THREAD_POOL_META_STATE
// Handler for a bulk load response received while the app bulk load status is
// BLS_DOWNLOADING. Currently an empty placeholder: neither parameter is used yet.
void bulk_load_service::handle_app_downloading(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is downloading
}

// ThreadPool: THREAD_POOL_META_STATE
// Handler for a bulk load response received while the app bulk load status is
// BLS_DOWNLOADED. Currently an empty placeholder: the parameter is not used yet.
void bulk_load_service::handle_app_downloaded(const bulk_load_response &response)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is downloaded
}

// ThreadPool: THREAD_POOL_META_STATE
// Handler for a bulk load response received while the app bulk load status is
// BLS_INGESTING. Currently an empty placeholder: neither parameter is used yet.
void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is ingesting
}

// ThreadPool: THREAD_POOL_META_STATE
// Handler for a bulk load response received while the app bulk load status is
// BLS_SUCCEED, BLS_FAILED or BLS_CANCELED (all three share this handler).
// Currently an empty placeholder: neither parameter is used yet.
void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled
}

// ThreadPool: THREAD_POOL_META_STATE
// Handler for a bulk load response received while the app bulk load status is
// BLS_PAUSING. Currently an empty placeholder: neither parameter is used yet.
void bulk_load_service::handle_app_pausing(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is pausing
}

// ThreadPool: THREAD_POOL_META_STATE
// Intended to roll the bulk load back to the downloading stage so it can be retried.
// on_partition_bulk_load_reply calls it when the request failed, when the response
// error is ERR_OBJECT_NOT_FOUND or ERR_INVALID_STATE, and when the response is outdated.
// Currently an empty placeholder: neither parameter is used yet.
void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, const gpid &pid)
{
// TODO(heyuchen): TBD
// replica meets error during bulk load, rollback to downloading to retry bulk load
}

// ThreadPool: THREAD_POOL_META_STATE
// Intended to stop the bulk load of app `app_id` and mark it as failed.
// on_partition_bulk_load_reply calls it when the response carries an error other than
// ERR_OBJECT_NOT_FOUND, ERR_INVALID_STATE or ERR_BUSY.
// Currently an empty placeholder: the parameter is not used yet.
void bulk_load_service::handle_bulk_load_failed(int32_t app_id)
{
// TODO(heyuchen): TBD
// replica meets serious error during bulk load, such as file on remote storage is damaged
// should stop bulk load process, set bulk load failed
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down
44 changes: 44 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,32 @@ class bulk_load_service
const bulk_load_request &request,
const bulk_load_response &response);

// if app is still in bulk load, resend bulk_load_request to primary after interval seconds
void try_resend_bulk_load_request(const std::string &app_name,
const gpid &pid,
const int32_t interval);

// called by `on_partition_bulk_load_reply` when app status is `downloading`
void handle_app_downloading(const bulk_load_response &response,
const rpc_address &primary_addr);

// called by `on_partition_bulk_load_reply` when app status is `downloaded`
void handle_app_downloaded(const bulk_load_response &response);

// called by `on_partition_bulk_load_reply` when app status is `ingesting`
void handle_app_ingestion(const bulk_load_response &response, const rpc_address &primary_addr);

// when app status is `succeed`, `failed`, `canceled`, meta and replica should cleanup bulk load
// states
void handle_bulk_load_finish(const bulk_load_response &response,
const rpc_address &primary_addr);

// called by `on_partition_bulk_load_reply` when app status is `pausing`
void handle_app_pausing(const bulk_load_response &response, const rpc_address &primary_addr);

// app does not exist or is not available during bulk load
void handle_app_unavailable(int32_t app_id, const std::string &app_name);

// replica meets error during bulk load, rollback to downloading to retry bulk load
void try_rollback_to_downloading(const std::string &app_name, const gpid &pid);

// replica meets serious error during bulk load, stop bulk load process and set bulk load failed
void handle_bulk_load_failed(int32_t app_id);

///
/// update bulk load states to remote storage functions
///
Expand Down Expand Up @@ -202,6 +225,27 @@ class bulk_load_service
}
}

// Locking variant of get_app_bulk_load_status_unlock: takes a read lock on _lock,
// then returns whatever the unlocked lookup yields for `app_id`.
inline bulk_load_status::type get_app_bulk_load_status(int32_t app_id)
{
    zauto_read_lock read_guard(_lock);
    const bulk_load_status::type current_status = get_app_bulk_load_status_unlock(app_id);
    return current_status;
}

// Looks up `app_id` in _app_bulk_load_info without taking any lock itself.
// Returns the stored status, or bulk_load_status::BLS_INVALID when there is no entry.
inline bulk_load_status::type get_app_bulk_load_status_unlock(int32_t app_id) const
{
    const auto entry = _app_bulk_load_info.find(app_id);
    if (entry == _app_bulk_load_info.end()) {
        // no bulk load info recorded for this app
        return bulk_load_status::BLS_INVALID;
    }
    return entry->second.status;
}

// Reports whether `app_id` is present in _bulk_load_app_id.
// Takes no lock itself.
inline bool is_app_bulk_loading_unlock(int32_t app_id) const
{
    const auto position = _bulk_load_app_id.find(app_id);
    const bool is_bulk_loading = (position != _bulk_load_app_id.end());
    return is_bulk_loading;
}

private:
friend class bulk_load_service_test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,8 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed)

fail::teardown();
}

// TODO(heyuchen): add unit tests for on_partition_bulk_load_reply

} // namespace replication
} // namespace dsn