Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Sep 21, 2023
1 parent f1ef432 commit 73af6d0
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 133 deletions.
1 change: 1 addition & 0 deletions src/block_service/test/block_service_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class block_service_manager_test : public pegasus::encrypt_data_test_base
std::string FILE_NAME = "test_file";
};

// TODO(yingchun): ENCRYPTION: add enable encryption test.
INSTANTIATE_TEST_CASE_P(, block_service_manager_test, ::testing::Values(false));

TEST_P(block_service_manager_test, remote_file_not_exist)
Expand Down
94 changes: 49 additions & 45 deletions src/replica/bulk_load/test/replica_bulk_loader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

#include "replica/bulk_load/replica_bulk_loader.h"

// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <rocksdb/env.h>
#include <rocksdb/slice.h>
#include <rocksdb/status.h>
#include <fstream> // IWYU pragma: keep
#include <memory>
#include <vector>
Expand All @@ -33,6 +37,7 @@
#include "runtime/rpc/rpc_address.h"
#include "runtime/task/task_tracker.h"
#include "utils/blob.h"
#include "utils/env.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -250,15 +255,16 @@ class replica_bulk_loader_test : public replica_test_base
void create_local_file(const std::string &file_name)
{
std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
utils::filesystem::create_file(whole_name);
std::ofstream test_file;
test_file.open(whole_name);
test_file << "write some data.\n";
test_file.close();

auto s =
rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice("write some data.\n"),
whole_name,
/* should_sync */ true);
ASSERT_TRUE(s.ok()) << s.ToString();
_file_meta.name = whole_name;
utils::filesystem::md5sum(whole_name, _file_meta.md5);
utils::filesystem::file_size(whole_name, _file_meta.size);
utils::filesystem::file_size(
whole_name, dsn::utils::FileDataType::kSensitive, _file_meta.size);
}

error_code create_local_metadata_file()
Expand All @@ -268,21 +274,16 @@ class replica_bulk_loader_test : public replica_test_base
_metadata.file_total_size = _file_meta.size;

std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
utils::filesystem::create_file(whole_name);
std::ofstream os(whole_name.c_str(),
(std::ofstream::out | std::ios::binary | std::ofstream::trunc));
if (!os.is_open()) {
LOG_ERROR("open file {} failed", whole_name);
return ERR_FILE_OPERATION_FAILED;
}

blob bb = json::json_forwarder<bulk_load_metadata>::encode(_metadata);
os.write((const char *)bb.data(), (std::streamsize)bb.length());
if (os.bad()) {
auto s =
rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice(bb.data(), bb.length()),
whole_name,
/* should_sync */ true);
if (!s.ok()) {
LOG_ERROR("write file {} failed", whole_name);
return ERR_FILE_OPERATION_FAILED;
}
os.close();

return ERR_OK;
}
Expand Down Expand Up @@ -451,22 +452,25 @@ class replica_bulk_loader_test : public replica_test_base
std::string FILE_NAME = "test_sst_file";
};

// TODO(yingchun): ENCRYPTION: add enable encryption test.
INSTANTIATE_TEST_CASE_P(, replica_bulk_loader_test, ::testing::Values(false));

// on_bulk_load unit tests
TEST_F(replica_bulk_loader_test, on_bulk_load_not_primary)
TEST_P(replica_bulk_loader_test, on_bulk_load_not_primary)
{
create_bulk_load_request(bulk_load_status::BLS_DOWNLOADING);
ASSERT_EQ(test_on_bulk_load(), ERR_INVALID_STATE);
}

TEST_F(replica_bulk_loader_test, on_bulk_load_ballot_change)
TEST_P(replica_bulk_loader_test, on_bulk_load_ballot_change)
{
create_bulk_load_request(bulk_load_status::BLS_DOWNLOADING, BALLOT + 1);
mock_primary_states();
ASSERT_EQ(test_on_bulk_load(), ERR_INVALID_STATE);
}

// on_group_bulk_load unit tests
TEST_F(replica_bulk_loader_test, on_group_bulk_load_test)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_test)
{
struct test_struct
{
Expand All @@ -493,7 +497,7 @@ TEST_F(replica_bulk_loader_test, on_group_bulk_load_test)
}

// start_downloading unit tests
TEST_F(replica_bulk_loader_test, start_downloading_test)
TEST_P(replica_bulk_loader_test, start_downloading_test)
{
// Test cases:
// - stub concurrent downloading count excceed
Expand All @@ -520,7 +524,7 @@ TEST_F(replica_bulk_loader_test, start_downloading_test)
}

// start_downloading unit tests
TEST_F(replica_bulk_loader_test, rollback_to_downloading_test)
TEST_P(replica_bulk_loader_test, rollback_to_downloading_test)
{
fail::cfg("replica_bulk_loader_download_files", "return()");
struct test_struct
Expand All @@ -540,23 +544,23 @@ TEST_F(replica_bulk_loader_test, rollback_to_downloading_test)
}

// parse_bulk_load_metadata unit tests
TEST_F(replica_bulk_loader_test, bulk_load_metadata_not_exist)
TEST_P(replica_bulk_loader_test, bulk_load_metadata_not_exist)
{
ASSERT_EQ(test_parse_bulk_load_metadata("path_not_exist"), ERR_FILE_OPERATION_FAILED);
}

TEST_F(replica_bulk_loader_test, bulk_load_metadata_corrupt)
TEST_P(replica_bulk_loader_test, bulk_load_metadata_corrupt)
{
// create file can not parse as bulk_load_metadata structure
utils::filesystem::create_directory(LOCAL_DIR);
create_local_file(METADATA);
std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
error_code ec = test_parse_bulk_load_metadata(metadata_file_name);
ASSERT_EQ(ec, ERR_CORRUPTION);
ASSERT_EQ(ERR_CORRUPTION, ec);
utils::filesystem::remove_path(LOCAL_DIR);
}

TEST_F(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
TEST_P(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
{
utils::filesystem::create_directory(LOCAL_DIR);
error_code ec = create_local_metadata_file();
Expand All @@ -570,7 +574,7 @@ TEST_F(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
}

// finish download test
TEST_F(replica_bulk_loader_test, finish_download_test)
TEST_P(replica_bulk_loader_test, finish_download_test)
{
mock_downloading_progress(100, 50, 50);
stub->set_bulk_load_downloading_count(3);
Expand All @@ -581,15 +585,15 @@ TEST_F(replica_bulk_loader_test, finish_download_test)
}

// start ingestion test
TEST_F(replica_bulk_loader_test, start_ingestion_test)
TEST_P(replica_bulk_loader_test, start_ingestion_test)
{
mock_group_progress(bulk_load_status::BLS_DOWNLOADED);
test_start_ingestion();
ASSERT_EQ(get_bulk_load_status(), bulk_load_status::BLS_INGESTING);
}

// handle_bulk_load_finish unit tests
TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
TEST_P(replica_bulk_loader_test, bulk_load_finish_test)
{
// Test cases
// - bulk load succeed
Expand Down Expand Up @@ -672,7 +676,7 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
}

// pause_bulk_load unit tests
TEST_F(replica_bulk_loader_test, pause_bulk_load_test)
TEST_P(replica_bulk_loader_test, pause_bulk_load_test)
{
const int32_t stub_downloading_count = 3;
// Test cases:
Expand Down Expand Up @@ -703,7 +707,7 @@ TEST_F(replica_bulk_loader_test, pause_bulk_load_test)
}

// report_group_download_progress unit tests
TEST_F(replica_bulk_loader_test, report_group_download_progress_test)
TEST_P(replica_bulk_loader_test, report_group_download_progress_test)
{
struct test_struct
{
Expand All @@ -728,7 +732,7 @@ TEST_F(replica_bulk_loader_test, report_group_download_progress_test)
}

// report_group_ingestion_status unit tests
TEST_F(replica_bulk_loader_test, report_group_ingestion_status_test)
TEST_P(replica_bulk_loader_test, report_group_ingestion_status_test)
{

struct ingestion_struct
Expand Down Expand Up @@ -802,27 +806,27 @@ TEST_F(replica_bulk_loader_test, report_group_ingestion_status_test)
}

// report_group_context_clean_flag unit tests
TEST_F(replica_bulk_loader_test, report_group_cleanup_flag_in_unhealthy_state)
TEST_P(replica_bulk_loader_test, report_group_cleanup_flag_in_unhealthy_state)
{
// _primary_states.membership.secondaries is empty
mock_replica_config(partition_status::PS_PRIMARY);
ASSERT_FALSE(test_report_group_cleaned_up());
}

TEST_F(replica_bulk_loader_test, report_group_cleanup_flag_not_cleaned_up)
TEST_P(replica_bulk_loader_test, report_group_cleanup_flag_not_cleaned_up)
{
mock_group_cleanup_flag(bulk_load_status::BLS_SUCCEED, true, false);
ASSERT_FALSE(test_report_group_cleaned_up());
}

TEST_F(replica_bulk_loader_test, report_group_cleanup_flag_all_cleaned_up)
TEST_P(replica_bulk_loader_test, report_group_cleanup_flag_all_cleaned_up)
{
mock_group_cleanup_flag(bulk_load_status::BLS_INVALID, true, true);
ASSERT_TRUE(test_report_group_cleaned_up());
}

// report_group_is_paused unit tests
TEST_F(replica_bulk_loader_test, report_group_is_paused_test)
TEST_P(replica_bulk_loader_test, report_group_is_paused_test)
{
struct test_struct
{
Expand All @@ -836,60 +840,60 @@ TEST_F(replica_bulk_loader_test, report_group_is_paused_test)
}

// on_group_bulk_load_reply unit tests
TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_downloading_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_downloading_error)
{
mock_group_progress(bulk_load_status::BLS_DOWNLOADING, 30, 30, 60);
test_on_group_bulk_load_reply(bulk_load_status::BLS_DOWNLOADING, BALLOT, ERR_BUSY);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_downloaded_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_downloaded_error)
{
mock_group_progress(bulk_load_status::BLS_DOWNLOADED);
test_on_group_bulk_load_reply(bulk_load_status::BLS_DOWNLOADED, BALLOT, ERR_INVALID_STATE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_ingestion_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_ingestion_error)
{
mock_group_ingestion_states(ingestion_status::IS_RUNNING, ingestion_status::IS_SUCCEED);
test_on_group_bulk_load_reply(
bulk_load_status::BLS_INGESTING, BALLOT - 1, ERR_OK, ERR_INVALID_STATE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_succeed_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_succeed_error)
{
mock_group_cleanup_flag(bulk_load_status::BLS_SUCCEED);
test_on_group_bulk_load_reply(
bulk_load_status::BLS_SUCCEED, BALLOT - 1, ERR_OK, ERR_INVALID_STATE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_failed_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_failed_error)
{
mock_group_ingestion_states(ingestion_status::IS_RUNNING, ingestion_status::IS_SUCCEED);
test_on_group_bulk_load_reply(bulk_load_status::BLS_FAILED, BALLOT, ERR_OK, ERR_TIMEOUT);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_pausing_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_pausing_error)
{
mock_group_progress(bulk_load_status::BLS_PAUSED, 100, 50, 10);
test_on_group_bulk_load_reply(
bulk_load_status::BLS_PAUSING, BALLOT, ERR_OK, ERR_NETWORK_FAILURE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_rpc_error)
TEST_P(replica_bulk_loader_test, on_group_bulk_load_reply_rpc_error)
{
mock_group_cleanup_flag(bulk_load_status::BLS_INVALID, true, false);
test_on_group_bulk_load_reply(bulk_load_status::BLS_CANCELED, BALLOT, ERR_OBJECT_NOT_FOUND);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

// validate_status unit test
TEST_F(replica_bulk_loader_test, validate_status_test)
TEST_P(replica_bulk_loader_test, validate_status_test)
{
struct validate_struct
{
Expand Down
Loading

0 comments on commit 73af6d0

Please sign in to comment.