Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix(bulk_load): fix remove_local_bulk_load_dir() #823

Merged
merged 6 commits into from
Apr 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/block_service/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ set(MY_PROJ_LIBS
set(MY_BINPLACES
config-test.ini
run.sh
clear.sh
)

dsn_add_test()
2 changes: 1 addition & 1 deletion src/block_service/test/clear.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
##############################################################################

rm -rf log.* *.log data dsn_block_service_test.xml randomfile*
rm -rf log.* *.log data dsn_block_service_test.xml randomfile* rename_dir* test_dir
41 changes: 41 additions & 0 deletions src/block_service/test/hdfs_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,22 @@ class HDFSClientTest : public testing::Test
virtual void SetUp() override;
virtual void TearDown() override;
void generate_test_file(const char *filename);
void write_test_files_async();
std::string name_node;
std::string backup_path;
std::string local_test_dir;
std::string test_data_str;
};

// Per-test initialisation: picks up the name node and backup path from the test flags, fixes the
// local scratch directory name, and rebuilds the payload string as "test" repeated
// FLAGS_num_test_file_lines times.
void HDFSClientTest::SetUp()
{
    local_test_dir = "test_dir";
    name_node = FLAGS_test_name_node;
    backup_path = FLAGS_test_backup_path;

    // Start from an empty payload so repeated SetUp() calls never accumulate data.
    test_data_str.clear();
    for (int appended = 0; appended < FLAGS_num_test_file_lines; appended++) {
        test_data_str.append("test");
    }
}

void HDFSClientTest::TearDown() {}
Expand All @@ -75,6 +83,22 @@ void HDFSClientTest::generate_test_file(const char *filename)
fclose(fp);
}

// Creates `local_test_dir` and enqueues 100 asynchronous tasks, each of which writes
// `test_data_str` into `<local_test_dir>/test_file_<i>`. This mocks the writing process in
// hdfs_file_object::download().
//
// The tasks are enqueued with a null tracker, so nothing waits for or cancels them: they may
// still be queued or running after this function, and the calling test, have returned.
void HDFSClientTest::write_test_files_async()
{
    dsn::utils::filesystem::create_directory(local_test_dir);

    // Snapshot everything the tasks need. Capturing `this` would leave the lambdas reading
    // members of a fixture that may already have been destroyed when they finally run.
    const std::string dir = local_test_dir;
    const std::string data = test_data_str;

    for (int i = 0; i < 100; ++i) {
        tasking::enqueue(LPC_TEST_HDFS, nullptr, [dir, data, i]() {
            const std::string test_file_name = dir + "/test_file_" + std::to_string(i);
            std::ofstream out(test_file_name, std::ios::binary | std::ios::out | std::ios::trunc);
            // Opening fails once the directory has been renamed or removed; that is expected
            // by the callers, so a failed open is silently skipped.
            if (out.is_open()) {
                out.write(data.c_str(), data.length());
            }
            // `out` is flushed and closed by its destructor.
        });
    }
}

TEST_F(HDFSClientTest, test_basic_operation)
{
if (name_node == example_name_node || backup_path == example_backup_path) {
Expand Down Expand Up @@ -341,3 +365,20 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download)
utils::filesystem::remove_path(downloaded_file_names[i]);
}
}

// Renames the scratch directory shortly after the asynchronous writers have been started.
TEST_F(HDFSClientTest, test_rename_path_while_writing)
{
    write_test_files_async();
    // Give the writer tasks a brief head start before touching the directory.
    usleep(100);

    // A timestamp suffix keeps the target name unique across runs.
    const auto now_ms = dsn_now_ms();
    const std::string target_dir = std::string("rename_dir.") + std::to_string(now_ms);

    // The rename itself is expected to succeed, while the in-flight writes fail.
    const bool renamed = dsn::utils::filesystem::rename_path(local_test_dir, target_dir);
    ASSERT_TRUE(renamed);
}

// Tries to remove the scratch directory shortly after the asynchronous writers have been started.
TEST_F(HDFSClientTest, test_remove_path_while_writing)
{
    write_test_files_async();
    // Give the writer tasks a brief head start before touching the directory.
    usleep(100);

    // The directory cannot be removed while files are still being written into it.
    const bool removed = dsn::utils::filesystem::remove_path(local_test_dir);
    ASSERT_FALSE(removed);
}
41 changes: 24 additions & 17 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

#include "replica_bulk_loader.h"

#include <dsn/dist/block_service.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>

#include "replica_bulk_loader.h"
#include "replica/disk_cleaner.h"

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -592,27 +593,33 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta
// remove local bulk load dir
std::string bulk_load_dir = utils::filesystem::path_combine(
_replica->_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR);
error_code err = remove_local_bulk_load_dir(bulk_load_dir);
if (err != ERR_OK) {
tasking::enqueue(
LPC_REPLICATION_COMMON,
&_replica->_tracker,
std::bind(&replica_bulk_loader::remove_local_bulk_load_dir, this, bulk_load_dir),
get_gpid().thread_hash());
}

remove_local_bulk_load_dir(bulk_load_dir);
clear_bulk_load_states();
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica_bulk_loader::remove_local_bulk_load_dir(const std::string &bulk_load_dir)
void replica_bulk_loader::remove_local_bulk_load_dir(const std::string &bulk_load_dir)
{
if (!utils::filesystem::directory_exists(bulk_load_dir) ||
!utils::filesystem::remove_path(bulk_load_dir)) {
derror_replica("remove bulk_load dir({}) failed", bulk_load_dir);
return ERR_FILE_OPERATION_FAILED;
if (!utils::filesystem::directory_exists(bulk_load_dir)) {
return;
}
// Rename bulk_load_dir to ${replica_dir}.bulk_load.timestamp.gar before removing it,
// because sst files are downloaded asynchronously and a directory cannot be removed while
// files are still being written into it.
std::string garbage_dir = fmt::format("{}.{}.{}.{}",
_replica->_dir,
bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR,
std::to_string(dsn_now_ms()),
kFolderSuffixGar);
if (!utils::filesystem::rename_path(bulk_load_dir, garbage_dir)) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
derror_replica("rename bulk_load dir({}) failed.", bulk_load_dir);
return;
}
if (!utils::filesystem::remove_path(garbage_dir)) {
derror_replica(
"remove bulk_load gar dir({}) failed, disk cleaner would retry to remove it.",
garbage_dir);
}
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
2 changes: 1 addition & 1 deletion src/replica/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class replica_bulk_loader : replica_base
void handle_bulk_load_finish(bulk_load_status::type new_status);
void pause_bulk_load();

error_code remove_local_bulk_load_dir(const std::string &bulk_load_dir);
void remove_local_bulk_load_dir(const std::string &bulk_load_dir);
void cleanup_download_task();
void clear_bulk_load_states();
bool is_cleaned_up();
Expand Down