diff --git a/include/dsn/tool-api/logging_provider.h b/include/dsn/tool-api/logging_provider.h
index f2a0a95f37..b203f64f1b 100644
--- a/include/dsn/tool-api/logging_provider.h
+++ b/include/dsn/tool-api/logging_provider.h
@@ -39,7 +39,6 @@
 #include
 
 namespace dsn {
-
 /*!
 @addtogroup tool-api-providers
 @{
@@ -87,6 +86,6 @@ class logging_provider
     static logging_provider *create_default_instance();
 };
 
-/*@}*/
-// ----------------------- inline implementation ---------------------------------------
+void set_log_prefixed_message_func(std::function<std::string(void)> func);
+extern std::function<std::string(void)> log_prefixed_message_func;
 } // namespace dsn
diff --git a/src/core/core/logging.cpp b/src/core/core/logging.cpp
index 6a64b38085..a39d18d05d 100644
--- a/src/core/core/logging.cpp
+++ b/src/core/core/logging.cpp
@@ -39,13 +39,35 @@ DSN_DEFINE_string("core",
 DSN_DEFINE_bool("core", logging_flush_on_exit, true, "flush log when exit system");
 
+namespace dsn {
+std::function<std::string(void)> log_prefixed_message_func = []() {
+    static thread_local std::string prefixed_message;
+
+    static thread_local std::once_flag flag;
+    std::call_once(flag, [&]() {
+        prefixed_message.resize(23);
+        int tid = dsn::utils::get_current_tid();
+        sprintf(const_cast<char *>(prefixed_message.c_str()), "unknown.io-thrd.%05d: ", tid);
+    });
+
+    return prefixed_message;
+};
+
+void set_log_prefixed_message_func(std::function<std::string(void)> func)
+{
+    log_prefixed_message_func = func;
+}
+} // namespace dsn
+
 static void log_on_sys_exit(::dsn::sys_exit_type)
 {
     dsn::logging_provider *logger = dsn::logging_provider::instance();
     logger->flush();
 }
 
-void dsn_log_init(const std::string &logging_factory_name, const std::string &dir_log)
+void dsn_log_init(const std::string &logging_factory_name,
+                  const std::string &dir_log,
+                  std::function<std::string(void)> dsn_log_prefixed_message_func)
 {
     dsn_log_start_level =
         enum_from_string(FLAGS_logging_start_level, dsn_log_level_t::LOG_LEVEL_INVALID);
@@ -92,6 +114,8 @@ void dsn_log_init(const std::string &logging_factory_name, const std::string &dir_log)
             dsn_log_set_start_level(start_level);
             return std::string("OK, current level is ") + enum_to_string(start_level);
         });
+
+    dsn::set_log_prefixed_message_func(dsn_log_prefixed_message_func);
 }
 
 DSN_API dsn_log_level_t dsn_log_get_start_level() { return dsn_log_start_level; }
diff --git a/src/core/core/service_api_c.cpp b/src/core/core/service_api_c.cpp
index 37fcb225be..4740c7c51f 100644
--- a/src/core/core/service_api_c.cpp
+++ b/src/core/core/service_api_c.cpp
@@ -32,6 +32,8 @@
 #include
 #include
 #include
+#include
+
 #ifdef DSN_ENABLE_GPERF
 #include
 #endif
@@ -275,7 +277,9 @@ tool_app *get_current_tool() { return dsn_all.tool.get(); }
 } // namespace tools
 } // namespace dsn
 
-extern void dsn_log_init(const std::string &logging_factory_name, const std::string &dir_log);
+extern void dsn_log_init(const std::string &logging_factory_name,
+                         const std::string &dir_log,
+                         std::function<std::string(void)> dsn_log_prefixed_message_func);
 extern void dsn_core_init();
 
 inline void dsn_global_init()
@@ -287,6 +291,49 @@ inline void dsn_global_init()
     dsn::service_engine::instance();
 }
 
+static std::string dsn_log_prefixed_message_func()
+{
+    std::string res;
+    res.resize(100);
+    char *prefixed_message = const_cast<char *>(res.c_str());
+
+    int tid = dsn::utils::get_current_tid();
+    auto t = dsn::task::get_current_task_id();
+    if (t) {
+        if (nullptr != dsn::task::get_current_worker2()) {
+            sprintf(prefixed_message,
+                    "%6s.%7s%d.%016" PRIx64 ": ",
+                    dsn::task::get_current_node_name(),
+                    dsn::task::get_current_worker2()->pool_spec().name.c_str(),
+                    dsn::task::get_current_worker2()->index(),
+                    t);
+        } else {
+            sprintf(prefixed_message,
+                    "%6s.%7s.%05d.%016" PRIx64 ": ",
+                    dsn::task::get_current_node_name(),
+                    "io-thrd",
+                    tid,
+                    t);
+        }
+    } else {
+        if (nullptr != dsn::task::get_current_worker2()) {
+            sprintf(prefixed_message,
+                    "%6s.%7s%u: ",
+                    dsn::task::get_current_node_name(),
+                    dsn::task::get_current_worker2()->pool_spec().name.c_str(),
+                    dsn::task::get_current_worker2()->index());
+        } else {
+            sprintf(prefixed_message,
+                    "%6s.%7s.%05d: ",
+                    dsn::task::get_current_node_name(),
+                    "io-thrd",
+                    tid);
+        }
+    }
+
+    return res;
+}
+
 bool run(const char *config_file,
          const char *config_arguments,
          bool sleep_after_init,
@@ -386,7 +433,7 @@ bool run(const char *config_file,
 #endif
 
     // init logging
-    dsn_log_init(spec.logging_factory_name, spec.dir_log);
+    dsn_log_init(spec.logging_factory_name, spec.dir_log, dsn_log_prefixed_message_func);
 
     // prepare minimum necessary
     ::dsn::service_engine::instance().init_before_toollets(spec);
diff --git a/src/core/tools/common/simple_logger.cpp b/src/core/tools/common/simple_logger.cpp
index 9b1c77c0f3..20ff274a23 100644
--- a/src/core/tools/common/simple_logger.cpp
+++ b/src/core/tools/common/simple_logger.cpp
@@ -57,45 +57,18 @@
 static void print_header(FILE *fp, dsn_log_level_t log_level)
 {
     static char s_level_char[] = "IDWEF";
-    uint64_t ts = 0;
-    if (::dsn::tools::is_engine_ready())
-        ts = dsn_now_ns();
-
+    uint64_t ts = dsn_now_ns();
     char str[24];
-    ::dsn::utils::time_ms_to_string(ts / 1000000, str);
-
-    int tid = ::dsn::utils::get_current_tid();
-
-    fprintf(fp, "%c%s (%" PRIu64 " %04x) ", s_level_char[log_level], str, ts, tid);
-
-    auto t = task::get_current_task_id();
-    if (t) {
-        if (nullptr != task::get_current_worker2()) {
-            fprintf(fp,
-                    "%6s.%7s%d.%016" PRIx64 ": ",
-                    task::get_current_node_name(),
-                    task::get_current_worker2()->pool_spec().name.c_str(),
-                    task::get_current_worker2()->index(),
-                    t);
-        } else {
-            fprintf(fp,
-                    "%6s.%7s.%05d.%016" PRIx64 ": ",
-                    task::get_current_node_name(),
-                    "io-thrd",
-                    tid,
-                    t);
-        }
-    } else {
-        if (nullptr != task::get_current_worker2()) {
-            fprintf(fp,
-                    "%6s.%7s%u: ",
-                    task::get_current_node_name(),
-                    task::get_current_worker2()->pool_spec().name.c_str(),
-                    task::get_current_worker2()->index());
-        } else {
-            fprintf(fp, "%6s.%7s.%05d: ", task::get_current_node_name(), "io-thrd", tid);
-        }
-    }
+    dsn::utils::time_ms_to_string(ts / 1000000, str);
+
+    int tid = dsn::utils::get_current_tid();
+    fprintf(fp,
+            "%c%s (%" PRIu64 " %04x) %s",
+            s_level_char[log_level],
+            str,
+            ts,
+            tid,
+            log_prefixed_message_func().c_str());
 }
 
 screen_logger::screen_logger(bool short_header) : logging_provider("./")
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
index c7bf61fa0f..e361cc0fb6 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
@@ -233,10 +233,13 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
     case bulk_load_status::BLS_PAUSING:
         pause_bulk_load();
         break;
-    case bulk_load_status::BLS_CANCELED: {
+    case bulk_load_status::BLS_CANCELED:
         handle_bulk_load_finish(bulk_load_status::BLS_CANCELED);
-    } break;
-    // TODO(heyuchen): add other bulk load status
+        break;
+    case bulk_load_status::BLS_FAILED:
+        handle_bulk_load_finish(bulk_load_status::BLS_FAILED);
+        // TODO(heyuchen): add perf-counter here
+        break;
     default:
         break;
     }
@@ -604,12 +607,12 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
         break;
     case bulk_load_status::BLS_SUCCEED:
     case bulk_load_status::BLS_CANCELED:
+    case bulk_load_status::BLS_FAILED:
         report_group_cleaned_up(response);
         break;
     case bulk_load_status::BLS_PAUSING:
         report_group_is_paused(response);
         break;
-    // TODO(heyuchen): add other status
     default:
         break;
     }
@@ -795,12 +798,12 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
         break;
     case bulk_load_status::BLS_SUCCEED:
     case bulk_load_status::BLS_CANCELED:
+    case bulk_load_status::BLS_FAILED:
         bulk_load_state.__set_is_cleaned_up(is_cleaned_up());
         break;
     case bulk_load_status::BLS_PAUSING:
         bulk_load_state.__set_is_paused(local_status == bulk_load_status::BLS_PAUSED);
         break;
-    // TODO(heyuchen): add other status
     default:
         break;
     }
diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
index 9692ddbe67..1fb7558bb1 100644
--- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
@@ -519,10 +519,13 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
     // Test cases
     // - bulk load succeed
     // - double bulk load finish
-    // TODO(heyuchen): add following cases
-    // istatus, is_ingestion, create_dir will be used in further tests
+    // - cancel during downloaded
+    // - cancel during ingestion
+    // - cancel during succeed
     // - failed during downloading
    // - failed during ingestion
+    // Tip: bulk load dir will be removed if bulk load finished, so we should create dir before some
+    // cases
     struct test_struct
     {
         bulk_load_status::type local_status;
@@ -560,6 +563,18 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
          ingestion_status::IS_INVALID,
          false,
          bulk_load_status::BLS_CANCELED,
+         true},
+        {bulk_load_status::BLS_DOWNLOADING,
+         10,
+         ingestion_status::IS_INVALID,
+         false,
+         bulk_load_status::BLS_FAILED,
+         true},
+        {bulk_load_status::BLS_INGESTING,
+         100,
+         ingestion_status::type::IS_FAILED,
+         false,
+         bulk_load_status::BLS_FAILED,
          true}};
 
     for (auto test : tests) {
diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
index 2b04987ff4..5a5c4bb95b 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
@@ -712,15 +712,21 @@ void bulk_load_service::try_rollback_to_downloading(const std::string &app_name,
 // ThreadPool: THREAD_POOL_META_STATE
 void bulk_load_service::handle_bulk_load_failed(int32_t app_id)
 {
-    // TODO(heyuchen): TBD
-    // replica meets serious error during bulk load, such as file on remote storage is damaged
-    // should stop bulk load process, set bulk load failed
+    zauto_write_lock l(_lock);
+    if (!_apps_cleaning_up[app_id]) {
+        _apps_cleaning_up[app_id] = true;
+        update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED);
+    }
 }
 
 // ThreadPool: THREAD_POOL_META_STATE
 void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string &app_name)
 {
-    // TODO(heyuchen): TBD
+    zauto_write_lock l(_lock);
+    if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) {
+        _apps_cleaning_up[app_id] = true;
+        remove_bulk_load_dir_on_remote_storage(app_id, app_name);
+    }
 }
 
 // ThreadPool: THREAD_POOL_META_STATE
@@ -799,7 +805,6 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
             dsn::enum_to_string(old_status),
             dsn::enum_to_string(new_status));
 
-    // TODO(heyuchen): add other status
     switch (new_status) {
     case bulk_load_status::BLS_DOWNLOADED:
     case bulk_load_status::BLS_INGESTING:
@@ -822,6 +827,7 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
         }
         break;
     default:
+        // do nothing in other status
         break;
     }
 }
@@ -912,10 +918,10 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
         }
     }
 
-    // TODO(heyuchen): add other status
     if (new_status == bulk_load_status::BLS_PAUSING ||
         new_status == bulk_load_status::BLS_DOWNLOADING ||
-        new_status == bulk_load_status::BLS_CANCELED) {
+        new_status == bulk_load_status::BLS_CANCELED ||
+        new_status == bulk_load_status::BLS_FAILED) {
         for (int i = 0; i < ainfo.partition_count; ++i) {
             update_partition_status_on_remote_storage(
                 ainfo.app_name, gpid(app_id, i), new_status, should_send_request);
@@ -1088,7 +1094,7 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::
     erase_map_elem_by_id(app_id, _partition_bulk_load_info);
     erase_map_elem_by_id(app_id, _partitions_total_download_progress);
     erase_map_elem_by_id(app_id, _partitions_cleaned_up);
-    // TODO(heyuchen): add other varieties
+    _apps_cleaning_up.erase(app_id);
     _bulk_load_app_id.erase(app_id);
     ddebug_f("reset local app({}) bulk load context", app_name);
 }
diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h
index ef3e4fc1cd..55b3c17a4c 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.h
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.h
@@ -334,6 +334,8 @@ class bulk_load_service
         _partitions_bulk_load_state;
     std::unordered_map _partitions_cleaned_up;
+    // Used for bulk load failed and app unavailable to avoid duplicated clean up
+    std::unordered_map<int32_t, bool> _apps_cleaning_up;
 };
 } // namespace replication
diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
index 17b9876fe3..9f38a896f6 100644
--- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
+++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
@@ -350,9 +350,12 @@ class bulk_load_process_test : public bulk_load_service_test
 
 /// on_partition_bulk_load_reply unit tests
 
-// TODO(heyuchen):
-// add `downloading_fs_error` unit tests after implement function `handle_bulk_load_failed`
-// add `downloading_corrupt` unit tests after implement function `handle_bulk_load_failed`
+TEST_F(bulk_load_process_test, downloading_fs_error)
+{
+    test_on_partition_bulk_load_reply(
+        _partition_count, bulk_load_status::BLS_DOWNLOADING, ERR_FS_INTERNAL);
+    ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
+}
 
 TEST_F(bulk_load_process_test, downloading_busy)
 {
@@ -361,6 +364,13 @@ TEST_F(bulk_load_process_test, downloading_busy)
     ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING);
 }
 
+TEST_F(bulk_load_process_test, downloading_corrupt)
+{
+    mock_response_progress(ERR_CORRUPTION, false);
+    test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_DOWNLOADING);
+    ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
+}
+
 TEST_F(bulk_load_process_test, downloading_report_metadata)
 {
     mock_response_bulk_load_metadata();
@@ -399,7 +409,12 @@ TEST_F(bulk_load_process_test, ingestion_running)
     ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
 }
 
-// TODO(heyuchen): add ingestion_error unit tests after implement function `handle_app_failed`
+TEST_F(bulk_load_process_test, ingestion_error)
+{
+    mock_response_ingestion_status(ingestion_status::IS_FAILED);
+    test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_INGESTING);
+    ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
+}
 
 TEST_F(bulk_load_process_test, normal_succeed)
 {
@@ -436,7 +451,19 @@ TEST_F(bulk_load_process_test, cancel_all_finished)
     ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
 }
 
-// TODO(heyuchen): add half cleanup test while failed
+TEST_F(bulk_load_process_test, failed_not_all_finished)
+{
+    mock_response_cleaned_up_flag(false, bulk_load_status::BLS_FAILED);
+    test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED);
+    ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
+}
+
+TEST_F(bulk_load_process_test, failed_all_finished)
+{
+    mock_response_cleaned_up_flag(true, bulk_load_status::BLS_FAILED);
+    test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_FAILED);
+    ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
+}
 
 TEST_F(bulk_load_process_test, pausing)
 {
@@ -457,7 +484,6 @@ TEST_F(bulk_load_process_test, pause_succeed)
 /// on_partition_ingestion_reply unit tests
 // TODO(heyuchen):
 // add ingest_rpc_error unit tests after implement function `rollback_downloading`
-// add ingest_wrong unit tests after implement function `handle_app_failed`
 
 TEST_F(bulk_load_process_test, ingest_empty_write_error)
 {
@@ -467,6 +493,14 @@ TEST_F(bulk_load_process_test, ingest_empty_write_error)
     ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
 }
 
+TEST_F(bulk_load_process_test, ingest_wrong)
+{
+    mock_ingestion_context(ERR_OK, 1, _partition_count);
+    test_on_partition_ingestion_reply(_ingestion_resp, gpid(_app_id, _pidx));
+    wait_all();
+    ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
+}
+
 TEST_F(bulk_load_process_test, ingest_succeed)
 {
     mock_ingestion_context(ERR_OK, 0, 1);