Skip to content

Commit

Permalink
refactor(function_test): Generate bulk load files internally
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Dec 13, 2023
1 parent 5ea2a8d commit 5a30b25
Show file tree
Hide file tree
Showing 15 changed files with 378 additions and 235 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ scripts/py_utils/*.pyc
cmake-build-debug
packages

src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/
config-shell.ini.*
*.tar.gz
pegasus-server*
Expand Down
9 changes: 0 additions & 9 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,6 @@ function run_test()
fi
echo "test_modules=$test_modules"

# download bulk load test data
if [[ "$test_modules" =~ "bulk_load_test" && ! -d "$ROOT/src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files" ]]; then
echo "Start to download files used for bulk load function test"
wget "https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-bulk-load-function-test-files.zip"
unzip "pegasus-bulk-load-function-test-files.zip" -d "$ROOT/src/test/function_test/bulk_load"
rm "pegasus-bulk-load-function-test-files.zip"
echo "Prepare files used for bulk load function test succeed"
fi

for module in `echo $test_modules | sed 's/,/ /g'`; do
echo "====================== run $module =========================="
# restart onebox when test pegasus
Expand Down
65 changes: 36 additions & 29 deletions src/block_service/local/local_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,44 +264,51 @@ uint64_t local_file_object::get_size() { return _size; }

error_code local_file_object::load_metadata()
{
if (_has_meta_synced)
if (_has_meta_synced) {
return ERR_OK;

std::string metadata_path = local_service::get_metafile(file_name());
std::string data;
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), metadata_path, &data);
if (!s.ok()) {
LOG_WARNING("read file '{}' failed, err = {}", metadata_path, s.ToString());
return ERR_FS_INTERNAL;
}

file_metadata meta;
bool ans = file_metadata_from_json(data, meta);
if (!ans) {
LOG_WARNING("decode metadata '{}' file content failed", metadata_path);
file_metadata fmd;
std::string filepath = local_service::get_metafile(file_name());
auto ec = fmd.load_from_file(filepath);
if (ec != ERR_OK) {
LOG_WARNING("load metadata file '{}' failed", filepath);
return ERR_FS_INTERNAL;
}
_size = meta.size;
_md5_value = meta.md5;
_size = fmd.size;
_md5_value = fmd.md5;
_has_meta_synced = true;
return ERR_OK;
}

error_code local_file_object::store_metadata()
error_code file_metadata::write_to_file(std::string filepath) const
{
file_metadata meta;
meta.md5 = _md5_value;
meta.size = _size;
std::string data = nlohmann::json(meta).dump();
std::string metadata_path = local_service::get_metafile(file_name());
std::string data = nlohmann::json(*this).dump();
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(data),
metadata_path,
filepath,
/* should_sync */ true);
if (!s.ok()) {
LOG_WARNING("store to metadata file {} failed, err={}", metadata_path, s.ToString());
LOG_WARNING("write to metadata file '{}' failed, err={}", filepath, s.ToString());
return ERR_FS_INTERNAL;
}

return ERR_OK;
}

error_code file_metadata::load_from_file(std::string filepath)
{
std::string data;
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), filepath, &data);
if (!s.ok()) {
LOG_WARNING("load from metadata file '{}' failed, err = {}", filepath, s.ToString());
return ERR_FS_INTERNAL;
}

if (!file_metadata_from_json(data, *this)) {
LOG_WARNING("decode metadata file '{}' failed", filepath);
return ERR_FS_INTERNAL;
}

Expand Down Expand Up @@ -359,11 +366,10 @@ dsn::task_ptr local_file_object::write(const write_request &req,
// a lot, but it is somewhat not correct.
_size = resp.written_size;
_md5_value = utils::string_md5(req.buffer.data(), req.buffer.length());
// TODO(yingchun): make store_metadata as a local function, do not depend on the
// member variables (i.e. _size and _md5_value).
auto err = store_metadata();
auto err = file_metadata(_size, _md5_value)
.write_to_file(local_service::get_metafile(file_name()));
if (err != ERR_OK) {
LOG_WARNING("store_metadata failed");
LOG_ERROR("file_metadata write failed");
resp.err = ERR_FS_INTERNAL;
break;
}
Expand Down Expand Up @@ -502,9 +508,10 @@ dsn::task_ptr local_file_object::upload(const upload_request &req,
break;
}

auto err = store_metadata();
auto err = file_metadata(_size, _md5_value)
.write_to_file(local_service::get_metafile(file_name()));
if (err != ERR_OK) {
LOG_ERROR("store_metadata failed");
LOG_ERROR("file_metadata write failed");
resp.err = ERR_FS_INTERNAL;
break;
}
Expand Down
5 changes: 4 additions & 1 deletion src/block_service/local/local_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ struct file_metadata
{
int64_t size = 0;
std::string md5;

file_metadata(int64_t s = 0, std::string m = "") : size(s), md5(std::move(m)) {}
error_code write_to_file(std::string filepath) const;
error_code load_from_file(std::string filepath);
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5)

Expand Down Expand Up @@ -102,7 +106,6 @@ class local_file_object : public block_file
dsn::task_tracker *tracker = nullptr) override;

error_code load_metadata();
error_code store_metadata();

private:
std::string compute_md5();
Expand Down
6 changes: 3 additions & 3 deletions src/block_service/test/local_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ INSTANTIATE_TEST_CASE_P(, local_service_test, ::testing::Values(false, true));
TEST_P(local_service_test, store_metadata)
{
local_file_object file("a.txt");
error_code ec = file.store_metadata();
ASSERT_EQ(ERR_OK, ec);

auto meta_file_path = local_service::get_metafile(file.file_name());

ASSERT_EQ(ERR_OK, file_metadata(0, "").write_to_file(meta_file_path));

ASSERT_TRUE(boost::filesystem::exists(meta_file_path));

std::string data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class backup_restore_test : public test_util
{
NO_FATALS(wait_table_healthy(table_name_));
NO_FATALS(write_data(s_num_of_rows));
NO_FATALS(verify_data(table_name_, s_num_of_rows));
NO_FATALS(verify_data(s_num_of_rows));

auto resp =
ddl_client_->backup_app(table_id_, s_provider_type, user_specified_path).get_value();
Expand All @@ -70,7 +70,7 @@ class backup_restore_test : public test_util
NO_FATALS(wait_backup_complete(backup_id));
ASSERT_EQ(ERR_OK,
ddl_client_->do_restore(s_provider_type,
cluster_name_,
kClusterName_,
/* policy_name */ "",
backup_id,
table_name_,
Expand Down
6 changes: 3 additions & 3 deletions src/test/function_test/base_api/integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ TEST_F(integration_test, write_corrupt_db)
// cause timeout.
// Force to fetch the latest route table.
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
pegasus_client_factory::get_client(kClusterName_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
} else {
ASSERT_TRUE(false) << ret;
Expand All @@ -87,7 +87,7 @@ TEST_F(integration_test, write_corrupt_db)
}
ASSERT_EQ(PERR_NOT_FOUND, ret);
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
pegasus_client_factory::get_client(kClusterName_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);

ret = client_->get(hkey, skey, got_value);
Expand Down Expand Up @@ -180,7 +180,7 @@ TEST_F(integration_test, read_corrupt_db)
// a new read operation on the primary replica it ever held will cause timeout.
// Force to fetch the latest route table.
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
pegasus_client_factory::get_client(kClusterName_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
} else {
ASSERT_TRUE(false) << ret;
Expand Down
2 changes: 1 addition & 1 deletion src/test/function_test/base_api/test_batch_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class batch_get : public test_util
TEST_F(batch_get, set_and_then_batch_get)
{
auto rrdb_client =
new ::dsn::apps::rrdb_client(cluster_name_.c_str(), meta_list_, table_name_.c_str());
new ::dsn::apps::rrdb_client(kClusterName_.c_str(), meta_list_, table_name_.c_str());

int test_data_count = 100;
int test_timeout_milliseconds = 3000;
Expand Down
4 changes: 2 additions & 2 deletions src/test/function_test/base_api/test_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ class copy_data_test : public test_util
ddl_client_->create_app(
destination_app_name, "pegasus", default_partitions, 3, {}, false));
source_client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), source_app_name.c_str());
pegasus_client_factory::get_client(kClusterName_.c_str(), source_app_name.c_str());
ASSERT_NE(nullptr, source_client_);
destination_client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), destination_app_name.c_str());
pegasus_client_factory::get_client(kClusterName_.c_str(), destination_app_name.c_str());
ASSERT_NE(nullptr, destination_client_);
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/function_test/base_api/test_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class scan_test : public test_util
test_util::SetUp();
ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0));
ASSERT_EQ(dsn::ERR_OK, ddl_client_->create_app(table_name_, "pegasus", 8, 3, {}, false));
client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
client_ = pegasus_client_factory::get_client(kClusterName_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
ASSERT_NO_FATAL_FAILURE(fill_database());
}
Expand Down
1 change: 1 addition & 0 deletions src/test/function_test/bulk_load/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(MY_PROJ_LIBS
dsn_client
dsn_replication_common
dsn_utils
dsn.block_service.local
pegasus_client_static
gtest
sasl2
Expand Down
Loading

0 comments on commit 5a30b25

Please sign in to comment.