Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Sep 20, 2023
1 parent 0d0a8bd commit 98180d6
Showing 1 changed file with 70 additions and 65 deletions.
135 changes: 70 additions & 65 deletions src/test/function_test/bulk_load/test_bulk_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ using std::string;
class bulk_load_test : public test_util
{
protected:
bulk_load_test() : test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}}))
bulk_load_test()
: test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}})),
kBulkLoadLocalAppRoot(fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_))
{
TRICKY_CODE_TO_AVOID_LINK_ERROR;
BULK_LOAD_LOCAL_APP_ROOT =
fmt::format("{}/{}/{}", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_);
}

void SetUp() override
Expand All @@ -91,7 +91,7 @@ class bulk_load_test : public test_util
void TearDown() override
{
ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
NO_FATALS(run_cmd_from_project_root("rm -rf " + LOCAL_BULK_LOAD_ROOT));
NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
}

// Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'.
Expand All @@ -114,7 +114,7 @@ class bulk_load_test : public test_util
ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, fm.md5));
std::string value = nlohmann::json(fm).dump();
string bulk_load_info_meta_path =
fmt::format("{}/{}/{}/.bulk_load_info.meta", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_);
fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, kCluster, app_name_);
auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice(value),
bulk_load_info_meta_path,
Expand All @@ -128,13 +128,13 @@ class bulk_load_test : public test_util
// Prepare bulk load files.
// The source data has 8 partitions.
ASSERT_EQ(8, partition_count_);
NO_FATALS(run_cmd_from_project_root("mkdir -p " + LOCAL_BULK_LOAD_ROOT));
NO_FATALS(run_cmd_from_project_root("mkdir -p " + kLocalBulkLoadRoot));
NO_FATALS(run_cmd_from_project_root(
fmt::format("cp -r {}/{} {}", SOURCE_FILES_ROOT, BULK_LOAD, LOCAL_SERVICE_ROOT)));
fmt::format("cp -r {}/{} {}", kSourceFilesRoot, kBulkLoad, kLocalServiceRoot)));

// Generate 'bulk_load_info'.
string bulk_load_info_path =
fmt::format("{}/{}/{}/bulk_load_info", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_);
fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, kCluster, app_name_);
NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_, partition_count_),
bulk_load_info_path));

Expand All @@ -144,7 +144,8 @@ class bulk_load_test : public test_util

error_code start_bulk_load(bool ingest_behind = false)
{
return ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, BULK_LOAD, ingest_behind)
return ddl_client_
->start_bulk_load(app_name_, kCluster, kProvider, kBulkLoad, ingest_behind)
.get_value()
.err;
}
Expand Down Expand Up @@ -187,51 +188,51 @@ class bulk_load_test : public test_util

void verify_bulk_load_data()
{
NO_FATALS(verify_data("hashkey", "sortkey"));
NO_FATALS(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX));
NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, kBulkLoadSortKeyPrefix1));
NO_FATALS(verify_data(kBulkLoadHashKeyPrefix2, kBulkLoadSortKeyPrefix2));
}

void verify_data(const string &hashkey_prefix, const string &sortkey_prefix)
{
for (int i = 0; i < COUNT; ++i) {
for (int i = 0; i < kBulkLoadItemCount; ++i) {
string hash_key = hashkey_prefix + std::to_string(i);
for (int j = 0; j < COUNT; ++j) {
for (int j = 0; j < kBulkLoadItemCount; ++j) {
string sort_key = sortkey_prefix + std::to_string(j);
string act_value;
ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, act_value)) << hash_key << ","
<< sort_key;
ASSERT_EQ(VALUE, act_value) << hash_key << "," << sort_key;
string actual_value;
ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, actual_value))
<< hash_key << "," << sort_key;
ASSERT_EQ(kBulkLoadValue, actual_value) << hash_key << "," << sort_key;
}
}
}

enum operation
enum class operation
{
GET,
SET,
DEL,
NO_VALUE
};
void operate_data(bulk_load_test::operation op, const string &value, int count)
void operate_data(operation op, const string &value, int count)
{
for (int i = 0; i < count; ++i) {
string hash_key = HASHKEY_PREFIX + std::to_string(i);
string sort_key = SORTKEY_PREFIX + std::to_string(i);
string hash_key = fmt::format("{}{}", kBulkLoadHashKeyPrefix2, i);
string sort_key = fmt::format("{}{}", kBulkLoadSortKeyPrefix2, i);
switch (op) {
case bulk_load_test::operation::GET: {
string act_value;
ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, act_value));
ASSERT_EQ(value, act_value);
case operation::GET: {
string actual_value;
ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, actual_value));
ASSERT_EQ(value, actual_value);
} break;
case bulk_load_test::operation::DEL: {
case operation::DEL: {
ASSERT_EQ(PERR_OK, client_->del(hash_key, sort_key));
} break;
case bulk_load_test::operation::SET: {
case operation::SET: {
ASSERT_EQ(PERR_OK, client_->set(hash_key, sort_key, value));
} break;
case bulk_load_test::operation::NO_VALUE: {
string act_value;
ASSERT_EQ(PERR_NOT_FOUND, client_->get(hash_key, sort_key, act_value));
case operation::NO_VALUE: {
string actual_value;
ASSERT_EQ(PERR_NOT_FOUND, client_->get(hash_key, sort_key, actual_value));
} break;
default:
ASSERT_TRUE(false);
Expand All @@ -244,56 +245,63 @@ class bulk_load_test : public test_util
const std::string &value_before_bulk_load,
const std::string &value_after_bulk_load)
{
// write old data
// Write some data before bulk load.
NO_FATALS(operate_data(operation::SET, value_before_bulk_load, 10));
NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10));

// Start bulk load and wait until it complete.
ASSERT_EQ(ERR_OK, start_bulk_load(ingest_behind));
ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
if (!ingest_behind) {
std::cout << "Start to verify data..." << std::endl;
NO_FATALS(verify_bulk_load_data());
}
// values have been overwritten by bulk_load_data
const std::string &expect_value = ingest_behind ? value_before_bulk_load : VALUE;
NO_FATALS(operate_data(operation::GET, VALUE, 10));

std::cout << "Start to verify data..." << std::endl;
if (ingest_behind) {
NO_FATALS(verify_data("hashkey", "sortkey"));
// Values have NOT been overwritten by the bulk load data.
NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10));
NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, kBulkLoadSortKeyPrefix1));
} else {
// Values have been overwritten by the bulk load data.
NO_FATALS(operate_data(operation::GET, kBulkLoadValue, 10));
NO_FATALS(verify_bulk_load_data());
}

// write new data succeed after bulk load
// Write new data succeed after bulk load.
NO_FATALS(operate_data(operation::SET, value_after_bulk_load, 20));
NO_FATALS(operate_data(operation::GET, value_after_bulk_load, 20));

// delete data succeed after bulk load
// Delete data succeed after bulk load.
NO_FATALS(operate_data(operation::DEL, "", 15));
NO_FATALS(operate_data(operation::NO_VALUE, "", 15));
}

protected:
string BULK_LOAD_LOCAL_APP_ROOT;
const string SOURCE_FILES_ROOT =
"src/test/function_test/bulk_load_test/pegasus-bulk-load-function-test-files";
const string LOCAL_SERVICE_ROOT = "onebox/block_service/local_service";
const string LOCAL_BULK_LOAD_ROOT = "onebox/block_service/local_service/bulk_load_root";
const string BULK_LOAD = "bulk_load_root";
const string CLUSTER = "cluster";
const string PROVIDER = "local_service";
const string HASHKEY_PREFIX = "hash";
const string SORTKEY_PREFIX = "sort";
const string VALUE = "newValue";
const int32_t COUNT = 100;
string kBulkLoadLocalAppRoot;
const string kSourceFilesRoot =
"src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files";
const string kLocalServiceRoot = "onebox/block_service/local_service";
const string kLocalBulkLoadRoot = "onebox/block_service/local_service/bulk_load_root";
const string kBulkLoad = "bulk_load_root";
const string kCluster = "cluster";
const string kProvider = "local_service";

const int32_t kBulkLoadItemCount = 1000;
const string kBulkLoadHashKeyPrefix1 = "hashkey";
const string kBulkLoadSortKeyPrefix1 = "sortkey";
const string kBulkLoadValue = "newValue";

// Real time write operations will use this prefix.
const string kBulkLoadHashKeyPrefix2 = "hash";
const string kBulkLoadSortKeyPrefix2 = "sort";
};

// Test bulk load failed because the 'bulk_load_info' file is missing
TEST_F(bulk_load_test, bulk_load_test_missing_bulk_load_info)
TEST_F(bulk_load_test, missing_bulk_load_info)
{
NO_FATALS(remove_file(BULK_LOAD_LOCAL_APP_ROOT + "/bulk_load_info"));
NO_FATALS(remove_file(kBulkLoadLocalAppRoot + "/bulk_load_info"));
ASSERT_EQ(ERR_OBJECT_NOT_FOUND, start_bulk_load());
}

// Test bulk load failed because the 'bulk_load_info' file is inconsistent with the actual app info.
TEST_F(bulk_load_test, bulk_load_test_inconsistent_bulk_load_info)
TEST_F(bulk_load_test, inconsistent_bulk_load_info)
{
// Only 'app_id' and 'partition_count' will be checked in Pegasus server, so just inject these
// kind of inconsistencies.
Expand All @@ -302,7 +310,7 @@ TEST_F(bulk_load_test, bulk_load_test_inconsistent_bulk_load_info)
for (const auto &test : tests) {
// Generate inconsistent 'bulk_load_info'.
string bulk_load_info_path =
fmt::format("{}/{}/{}/bulk_load_info", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_);
fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, kCluster, app_name_);
NO_FATALS(generate_bulk_load_info(test, bulk_load_info_path));

// Generate '.bulk_load_info.meta'.
Expand All @@ -314,9 +322,9 @@ TEST_F(bulk_load_test, bulk_load_test_inconsistent_bulk_load_info)
}

// Test bulk load failed because partition[0]'s 'bulk_load_metadata' file is missing.
TEST_F(bulk_load_test, bulk_load_test_missing_p0_bulk_load_metadata)
TEST_F(bulk_load_test, missing_p0_bulk_load_metadata)
{
NO_FATALS(remove_file(BULK_LOAD_LOCAL_APP_ROOT + "/0/bulk_load_metadata"));
NO_FATALS(remove_file(kBulkLoadLocalAppRoot + "/0/bulk_load_metadata"));
ASSERT_EQ(ERR_OK, start_bulk_load());
ASSERT_EQ(bulk_load_status::BLS_FAILED, wait_bulk_load_finish(300));
}
Expand All @@ -329,15 +337,12 @@ TEST_F(bulk_load_test, allow_ingest_behind_inconsistent)
}

// Test normal bulk load, old data will be overwritten by bulk load data.
TEST_F(bulk_load_test, bulk_load_tests)
{
check_bulk_load(false, "oldValue", "valueAfterBulkLoad");
}
TEST_F(bulk_load_test, normal) { check_bulk_load(false, "oldValue", "valueAfterBulkLoad"); }

// Test normal bulk load with allow_ingest_behind=true, old data will NOT be overwritten by bulk
// load data.
TEST_F(bulk_load_test, bulk_load_ingest_behind_tests)
TEST_F(bulk_load_test, allow_ingest_behind)
{
NO_FATALS(update_allow_ingest_behind("true"));
check_bulk_load(false, "oldValue", "valueAfterBulkLoad");
check_bulk_load(true, "oldValue", "valueAfterBulkLoad");
}

0 comments on commit 98180d6

Please sign in to comment.