diff --git a/src/block_service/test/CMakeLists.txt b/src/block_service/test/CMakeLists.txt index 092bcb9466..36eaf0e24e 100644 --- a/src/block_service/test/CMakeLists.txt +++ b/src/block_service/test/CMakeLists.txt @@ -40,6 +40,7 @@ set(MY_PROJ_LIBS set(MY_BINPLACES config-test.ini run.sh + clear.sh ) dsn_add_test() diff --git a/src/block_service/test/clear.sh b/src/block_service/test/clear.sh index 466b357864..ef2373641a 100755 --- a/src/block_service/test/clear.sh +++ b/src/block_service/test/clear.sh @@ -18,4 +18,4 @@ # under the License. ############################################################################## -rm -rf log.* *.log data dsn_block_service_test.xml randomfile* +rm -rf log.* *.log data dsn_block_service_test.xml randomfile* rename_dir* test_dir diff --git a/src/block_service/test/hdfs_service_test.cpp b/src/block_service/test/hdfs_service_test.cpp index eb4d2bfe35..01fb148146 100644 --- a/src/block_service/test/hdfs_service_test.cpp +++ b/src/block_service/test/hdfs_service_test.cpp @@ -52,14 +52,22 @@ class HDFSClientTest : public testing::Test virtual void SetUp() override; virtual void TearDown() override; void generate_test_file(const char *filename); + void write_test_files_async(); std::string name_node; std::string backup_path; + std::string local_test_dir; + std::string test_data_str; }; void HDFSClientTest::SetUp() { name_node = FLAGS_test_name_node; backup_path = FLAGS_test_backup_path; + local_test_dir = "test_dir"; + test_data_str = ""; + for (int i = 0; i < FLAGS_num_test_file_lines; ++i) { + test_data_str += "test"; + } } void HDFSClientTest::TearDown() {} @@ -75,6 +83,22 @@ void HDFSClientTest::generate_test_file(const char *filename) fclose(fp); } +void HDFSClientTest::write_test_files_async() +{ + dsn::utils::filesystem::create_directory(local_test_dir); + for (int i = 0; i < 100; ++i) { + tasking::enqueue(LPC_TEST_HDFS, nullptr, [this, i]() { + // mock the writing process in hdfs_file_object::download(). + std::string test_file_name = local_test_dir + "/test_file_" + std::to_string(i); + std::ofstream out(test_file_name, std::ios::binary | std::ios::out | std::ios::trunc); + if (out.is_open()) { + out.write(test_data_str.c_str(), test_data_str.length()); + } + out.close(); + }); + } +} + TEST_F(HDFSClientTest, test_basic_operation) { if (name_node == example_name_node || backup_path == example_backup_path) { @@ -341,3 +365,20 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download) utils::filesystem::remove_path(downloaded_file_names[i]); } } + +TEST_F(HDFSClientTest, test_rename_path_while_writing) +{ + write_test_files_async(); + usleep(100); + std::string rename_dir = "rename_dir." + std::to_string(dsn_now_ms()); + // rename succeed but writing failed. + ASSERT_TRUE(dsn::utils::filesystem::rename_path(local_test_dir, rename_dir)); +} + +TEST_F(HDFSClientTest, test_remove_path_while_writing) +{ + write_test_files_async(); + usleep(100); + // couldn't remove the directory while writing files in it. + ASSERT_FALSE(dsn::utils::filesystem::remove_path(local_test_dir)); +} diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 2cce213740..321808e5f3 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include "replica_bulk_loader.h" - #include #include #include #include #include +#include "replica_bulk_loader.h" +#include "replica/disk_cleaner.h" + namespace dsn { namespace replication { @@ -592,27 +593,33 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta // remove local bulk load dir std::string bulk_load_dir = utils::filesystem::path_combine( _replica->_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR); - error_code err = remove_local_bulk_load_dir(bulk_load_dir); - if (err != ERR_OK) { - tasking::enqueue( - LPC_REPLICATION_COMMON, - &_replica->_tracker, - std::bind(&replica_bulk_loader::remove_local_bulk_load_dir, this, bulk_load_dir), - get_gpid().thread_hash()); - } - + remove_local_bulk_load_dir(bulk_load_dir); clear_bulk_load_states(); } // ThreadPool: THREAD_POOL_REPLICATION -error_code replica_bulk_loader::remove_local_bulk_load_dir(const std::string &bulk_load_dir) +void replica_bulk_loader::remove_local_bulk_load_dir(const std::string &bulk_load_dir) { - if (!utils::filesystem::directory_exists(bulk_load_dir) || - !utils::filesystem::remove_path(bulk_load_dir)) { - derror_replica("remove bulk_load dir({}) failed", bulk_load_dir); - return ERR_FILE_OPERATION_FAILED; + if (!utils::filesystem::directory_exists(bulk_load_dir)) { + return; + } + // Rename bulk_load_dir to ${replica_dir}.bulk_load.timestamp.gar before remove it. + // Because we download sst files asynchronously and couldn't remove a directory while writing + // files in it. + std::string garbage_dir = fmt::format("{}.{}.{}.{}", + _replica->_dir, + bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR, + std::to_string(dsn_now_ms()), + kFolderSuffixGar); + if (!utils::filesystem::rename_path(bulk_load_dir, garbage_dir)) { + derror_replica("rename bulk_load dir({}) failed.", bulk_load_dir); + return; + } + if (!utils::filesystem::remove_path(garbage_dir)) { + derror_replica( + "remove bulk_load gar dir({}) failed, disk cleaner would retry to remove it.", + garbage_dir); } - return ERR_OK; } // ThreadPool: THREAD_POOL_REPLICATION diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 592b1cefa4..a327a1ea04 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -82,7 +82,7 @@ class replica_bulk_loader : replica_base void handle_bulk_load_finish(bulk_load_status::type new_status); void pause_bulk_load(); - error_code remove_local_bulk_load_dir(const std::string &bulk_load_dir); + void remove_local_bulk_load_dir(const std::string &bulk_load_dir); void cleanup_download_task(); void clear_bulk_load_states(); bool is_cleaned_up();