Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into aio
Browse files Browse the repository at this point in the history
levy5307 authored Jul 8, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 0b5693e + 89247b7 commit c24c19d
Showing 9 changed files with 168 additions and 65 deletions.
5 changes: 2 additions & 3 deletions include/dsn/tool-api/logging_provider.h
Original file line number Diff line number Diff line change
@@ -39,7 +39,6 @@
#include <stdarg.h>

namespace dsn {

/*!
@addtogroup tool-api-providers
@{
@@ -87,6 +86,6 @@ class logging_provider
static logging_provider *create_default_instance();
};

/*@}*/
// ----------------------- inline implementation ---------------------------------------
void set_log_prefixed_message_func(std::function<std::string()> func);
extern std::function<std::string()> log_prefixed_message_func;
} // namespace dsn
26 changes: 25 additions & 1 deletion src/core/core/logging.cpp
Original file line number Diff line number Diff line change
@@ -39,13 +39,35 @@ DSN_DEFINE_string("core",

DSN_DEFINE_bool("core", logging_flush_on_exit, true, "flush log when exit system");

namespace dsn {
std::function<std::string()> log_prefixed_message_func = []() {
static thread_local std::string prefixed_message;

static thread_local std::once_flag flag;
std::call_once(flag, [&]() {
prefixed_message.resize(23);
int tid = dsn::utils::get_current_tid();
sprintf(const_cast<char *>(prefixed_message.c_str()), "unknown.io-thrd.%05d: ", tid);
});

return prefixed_message;
};

void set_log_prefixed_message_func(std::function<std::string()> func)
{
log_prefixed_message_func = func;
}
} // namespace dsn

static void log_on_sys_exit(::dsn::sys_exit_type)
{
dsn::logging_provider *logger = dsn::logging_provider::instance();
logger->flush();
}

void dsn_log_init(const std::string &logging_factory_name, const std::string &dir_log)
void dsn_log_init(const std::string &logging_factory_name,
const std::string &dir_log,
std::function<std::string()> dsn_log_prefixed_message_func)
{
dsn_log_start_level =
enum_from_string(FLAGS_logging_start_level, dsn_log_level_t::LOG_LEVEL_INVALID);
@@ -92,6 +114,8 @@ void dsn_log_init(const std::string &logging_factory_name, const std::string &di
dsn_log_set_start_level(start_level);
return std::string("OK, current level is ") + enum_to_string(start_level);
});

dsn::set_log_prefixed_message_func(dsn_log_prefixed_message_func);
}

DSN_API dsn_log_level_t dsn_log_get_start_level() { return dsn_log_start_level; }
51 changes: 49 additions & 2 deletions src/core/core/service_api_c.cpp
Original file line number Diff line number Diff line change
@@ -32,6 +32,8 @@
#include <dsn/utility/flags.h>
#include <dsn/tool-api/command_manager.h>
#include <fstream>
#include <dsn/utility/time_utils.h>

#ifdef DSN_ENABLE_GPERF
#include <gperftools/malloc_extension.h>
#endif
@@ -275,7 +277,9 @@ tool_app *get_current_tool() { return dsn_all.tool.get(); }
} // namespace tools
} // namespace dsn

extern void dsn_log_init(const std::string &logging_factory_name, const std::string &dir_log);
extern void dsn_log_init(const std::string &logging_factory_name,
const std::string &dir_log,
std::function<std::string()> dsn_log_prefixed_message_func);
extern void dsn_core_init();

inline void dsn_global_init()
@@ -287,6 +291,49 @@ inline void dsn_global_init()
dsn::service_engine::instance();
}

static std::string dsn_log_prefixed_message_func()
{
std::string res;
res.resize(100);
char *prefixed_message = const_cast<char *>(res.c_str());

int tid = dsn::utils::get_current_tid();
auto t = dsn::task::get_current_task_id();
if (t) {
if (nullptr != dsn::task::get_current_worker2()) {
sprintf(prefixed_message,
"%6s.%7s%d.%016" PRIx64 ": ",
dsn::task::get_current_node_name(),
dsn::task::get_current_worker2()->pool_spec().name.c_str(),
dsn::task::get_current_worker2()->index(),
t);
} else {
sprintf(prefixed_message,
"%6s.%7s.%05d.%016" PRIx64 ": ",
dsn::task::get_current_node_name(),
"io-thrd",
tid,
t);
}
} else {
if (nullptr != dsn::task::get_current_worker2()) {
sprintf(prefixed_message,
"%6s.%7s%u: ",
dsn::task::get_current_node_name(),
dsn::task::get_current_worker2()->pool_spec().name.c_str(),
dsn::task::get_current_worker2()->index());
} else {
sprintf(prefixed_message,
"%6s.%7s.%05d: ",
dsn::task::get_current_node_name(),
"io-thrd",
tid);
}
}

return res;
}

bool run(const char *config_file,
const char *config_arguments,
bool sleep_after_init,
@@ -386,7 +433,7 @@ bool run(const char *config_file,
#endif

// init logging
dsn_log_init(spec.logging_factory_name, spec.dir_log);
dsn_log_init(spec.logging_factory_name, spec.dir_log, dsn_log_prefixed_message_func);

// prepare minimum necessary
::dsn::service_engine::instance().init_before_toollets(spec);
49 changes: 11 additions & 38 deletions src/core/tools/common/simple_logger.cpp
Original file line number Diff line number Diff line change
@@ -57,45 +57,18 @@ static void print_header(FILE *fp, dsn_log_level_t log_level)
{
static char s_level_char[] = "IDWEF";

uint64_t ts = 0;
if (::dsn::tools::is_engine_ready())
ts = dsn_now_ns();

uint64_t ts = dsn_now_ns();
char str[24];
::dsn::utils::time_ms_to_string(ts / 1000000, str);

int tid = ::dsn::utils::get_current_tid();

fprintf(fp, "%c%s (%" PRIu64 " %04x) ", s_level_char[log_level], str, ts, tid);

auto t = task::get_current_task_id();
if (t) {
if (nullptr != task::get_current_worker2()) {
fprintf(fp,
"%6s.%7s%d.%016" PRIx64 ": ",
task::get_current_node_name(),
task::get_current_worker2()->pool_spec().name.c_str(),
task::get_current_worker2()->index(),
t);
} else {
fprintf(fp,
"%6s.%7s.%05d.%016" PRIx64 ": ",
task::get_current_node_name(),
"io-thrd",
tid,
t);
}
} else {
if (nullptr != task::get_current_worker2()) {
fprintf(fp,
"%6s.%7s%u: ",
task::get_current_node_name(),
task::get_current_worker2()->pool_spec().name.c_str(),
task::get_current_worker2()->index());
} else {
fprintf(fp, "%6s.%7s.%05d: ", task::get_current_node_name(), "io-thrd", tid);
}
}
dsn::utils::time_ms_to_string(ts / 1000000, str);

int tid = dsn::utils::get_current_tid();
fprintf(fp,
"%c%s (%" PRIu64 " %04x) %s",
s_level_char[log_level],
str,
ts,
tid,
log_prefixed_message_func().c_str());
}

screen_logger::screen_logger(bool short_header) : logging_provider("./")
13 changes: 8 additions & 5 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
@@ -233,10 +233,13 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
case bulk_load_status::BLS_PAUSING:
pause_bulk_load();
break;
case bulk_load_status::BLS_CANCELED: {
case bulk_load_status::BLS_CANCELED:
handle_bulk_load_finish(bulk_load_status::BLS_CANCELED);
} break;
// TODO(heyuchen): add other bulk load status
break;
case bulk_load_status::BLS_FAILED:
handle_bulk_load_finish(bulk_load_status::BLS_FAILED);
// TODO(heyuchen): add perf-counter here
break;
default:
break;
}
@@ -604,12 +607,12 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
case bulk_load_status::BLS_FAILED:
report_group_cleaned_up(response);
break;
case bulk_load_status::BLS_PAUSING:
report_group_is_paused(response);
break;
// TODO(heyuchen): add other status
default:
break;
}
@@ -795,12 +798,12 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
case bulk_load_status::BLS_FAILED:
bulk_load_state.__set_is_cleaned_up(is_cleaned_up());
break;
case bulk_load_status::BLS_PAUSING:
bulk_load_state.__set_is_paused(local_status == bulk_load_status::BLS_PAUSED);
break;
// TODO(heyuchen): add other status
default:
break;
}
Original file line number Diff line number Diff line change
@@ -519,10 +519,13 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
// Test cases
// - bulk load succeed
// - double bulk load finish
// TODO(heyuchen): add following cases
// istatus, is_ingestion, create_dir will be used in further tests
// - cancel during downloaded
// - cancel during ingestion
// - cancel during succeed
// - failed during downloading
// - failed during ingestion
// Tip: bulk load dir will be removed if bulk load finished, so we should create dir before some
// cases
struct test_struct
{
bulk_load_status::type local_status;
@@ -560,6 +563,18 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_CANCELED,
true},
{bulk_load_status::BLS_DOWNLOADING,
10,
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_FAILED,
true},
{bulk_load_status::BLS_INGESTING,
100,
ingestion_status::type::IS_FAILED,
false,
bulk_load_status::BLS_FAILED,
true}};

for (auto test : tests) {
22 changes: 14 additions & 8 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
@@ -712,15 +712,21 @@ void bulk_load_service::try_rollback_to_downloading(const std::string &app_name,
// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::handle_bulk_load_failed(int32_t app_id)
{
// TODO(heyuchen): TBD
// replica meets serious error during bulk load, such as file on remote storage is damaged
// should stop bulk load process, set bulk load failed
zauto_write_lock l(_lock);
if (!_apps_cleaning_up[app_id]) {
_apps_cleaning_up[app_id] = true;
update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED);
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string &app_name)
{
// TODO(heyuchen): TBD
zauto_write_lock l(_lock);
if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) {
_apps_cleaning_up[app_id] = true;
remove_bulk_load_dir_on_remote_storage(app_id, app_name);
}
}

// ThreadPool: THREAD_POOL_META_STATE
@@ -799,7 +805,6 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
dsn::enum_to_string(old_status),
dsn::enum_to_string(new_status));

// TODO(heyuchen): add other status
switch (new_status) {
case bulk_load_status::BLS_DOWNLOADED:
case bulk_load_status::BLS_INGESTING:
@@ -822,6 +827,7 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
}
break;
default:
// do nothing in other status
break;
}
}
@@ -912,10 +918,10 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
}
}

// TODO(heyuchen): add other status
if (new_status == bulk_load_status::BLS_PAUSING ||
new_status == bulk_load_status::BLS_DOWNLOADING ||
new_status == bulk_load_status::BLS_CANCELED) {
new_status == bulk_load_status::BLS_CANCELED ||
new_status == bulk_load_status::BLS_FAILED) {
for (int i = 0; i < ainfo.partition_count; ++i) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, i), new_status, should_send_request);
@@ -1088,7 +1094,7 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::
erase_map_elem_by_id(app_id, _partition_bulk_load_info);
erase_map_elem_by_id(app_id, _partitions_total_download_progress);
erase_map_elem_by_id(app_id, _partitions_cleaned_up);
// TODO(heyuchen): add other varieties
_apps_cleaning_up.erase(app_id);
_bulk_load_app_id.erase(app_id);
ddebug_f("reset local app({}) bulk load context", app_name);
}
2 changes: 2 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
@@ -334,6 +334,8 @@ class bulk_load_service
_partitions_bulk_load_state;

std::unordered_map<gpid, bool> _partitions_cleaned_up;
// Used for bulk load failed and app unavailable to avoid duplicated clean up
std::unordered_map<app_id, bool> _apps_cleaning_up;
};

} // namespace replication
Original file line number Diff line number Diff line change
@@ -350,9 +350,12 @@ class bulk_load_process_test : public bulk_load_service_test

/// on_partition_bulk_load_reply unit tests

// TODO(heyuchen):
// add `downloading_fs_error` unit tests after implement function `handle_bulk_load_failed`
// add `downloading_corrupt` unit tests after implement function `handle_bulk_load_failed`
TEST_F(bulk_load_process_test, downloading_fs_error)
{
test_on_partition_bulk_load_reply(
_partition_count, bulk_load_status::BLS_DOWNLOADING, ERR_FS_INTERNAL);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, downloading_busy)
{
@@ -361,6 +364,13 @@ TEST_F(bulk_load_process_test, downloading_busy)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING);
}

TEST_F(bulk_load_process_test, downloading_corrupt)
{
mock_response_progress(ERR_CORRUPTION, false);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_DOWNLOADING);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, downloading_report_metadata)
{
mock_response_bulk_load_metadata();
@@ -399,7 +409,12 @@ TEST_F(bulk_load_process_test, ingestion_running)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
}

// TODO(heyuchen): add ingestion_error unit tests after implement function `handle_app_failed`
TEST_F(bulk_load_process_test, ingestion_error)
{
mock_response_ingestion_status(ingestion_status::IS_FAILED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_INGESTING);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, normal_succeed)
{
@@ -436,7 +451,19 @@ TEST_F(bulk_load_process_test, cancel_all_finished)
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

// TODO(heyuchen): add half cleanup test while failed
TEST_F(bulk_load_process_test, failed_not_all_finished)
{
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_FAILED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, failed_all_finished)
{
mock_response_cleaned_up_flag(true, bulk_load_status::BLS_FAILED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_FAILED);
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

TEST_F(bulk_load_process_test, pausing)
{
@@ -457,7 +484,6 @@ TEST_F(bulk_load_process_test, pause_succeed)
/// on_partition_ingestion_reply unit tests
// TODO(heyuchen):
// add ingest_rpc_error unit tests after implement function `rollback_downloading`
// add ingest_wrong unit tests after implement function `handle_app_failed`

TEST_F(bulk_load_process_test, ingest_empty_write_error)
{
@@ -467,6 +493,14 @@ TEST_F(bulk_load_process_test, ingest_empty_write_error)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
}

TEST_F(bulk_load_process_test, ingest_wrong)
{
mock_ingestion_context(ERR_OK, 1, _partition_count);
test_on_partition_ingestion_reply(_ingestion_resp, gpid(_app_id, _pidx));
wait_all();
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, ingest_succeed)
{
mock_ingestion_context(ERR_OK, 0, 1);

0 comments on commit c24c19d

Please sign in to comment.