Skip to content

Commit

Permalink
refactor(json_file): Refactor the load and dump functions of JSON obj…
Browse files Browse the repository at this point in the history
…ects
  • Loading branch information
acelyc111 committed Dec 27, 2023
1 parent cbc1604 commit ce73232
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 265 deletions.
50 changes: 7 additions & 43 deletions src/block_service/local/local_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
// under the License.

#include <rocksdb/env.h>
#include <initializer_list>
#include <memory>
#include <set>
#include <type_traits>
#include <utility>

#include "absl/strings/string_view.h"
#include "local_service.h"
#include "nlohmann/json.hpp"
#include "nlohmann/json_fwd.hpp"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "runtime/task/async_calls.h"
Expand All @@ -36,7 +35,7 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "absl/strings/string_view.h"
#include "utils/load_dump_object.h"
#include "utils/strings.h"

DSN_DECLARE_bool(enable_direct_io);
Expand All @@ -53,41 +52,6 @@ namespace block_service {

DEFINE_TASK_CODE(LPC_LOCAL_SERVICE_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_BLOCK_SERVICE)

error_code file_metadata::dump_to_file(const std::string &file_path) const
{
std::string data = nlohmann::json(*this).dump();
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(data),
file_path,
/* should_sync */ true);
if (!s.ok()) {
LOG_WARNING("write to metadata file '{}' failed, err = {}", file_path, s.ToString());
return ERR_FS_INTERNAL;
}

return ERR_OK;
}

error_code file_metadata::load_from_file(const std::string &file_path)
{
std::string data;
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), file_path, &data);
if (!s.ok()) {
LOG_WARNING("load from metadata file '{}' failed, err = {}", file_path, s.ToString());
return ERR_FS_INTERNAL;
}

try {
nlohmann::json::parse(data).get_to(*this);
return ERR_OK;
} catch (nlohmann::json::exception &exp) {
LOG_WARNING("decode metadata from json failed: {}, data = [{}]", exp.what(), data);
return ERR_FS_INTERNAL;
}
}

std::string local_service::get_metafile(const std::string &filepath)
{
std::string dir_part = utils::filesystem::remove_file_name(filepath);
Expand Down Expand Up @@ -294,7 +258,7 @@ error_code local_file_object::load_metadata()

file_metadata fmd;
std::string filepath = local_service::get_metafile(file_name());
auto ec = fmd.load_from_file(filepath);
auto ec = dsn::utils::load_njobj_from_file(filepath, &fmd);
if (ec != ERR_OK) {
LOG_WARNING("load metadata file '{}' failed", filepath);
return ERR_FS_INTERNAL;
Expand Down Expand Up @@ -356,8 +320,8 @@ dsn::task_ptr local_file_object::write(const write_request &req,
// a lot, but it is somewhat not correct.
_size = resp.written_size;
_md5_value = utils::string_md5(req.buffer.data(), req.buffer.length());
auto err = file_metadata(_size, _md5_value)
.dump_to_file(local_service::get_metafile(file_name()));
auto err = dsn::utils::dump_njobj_to_file(file_metadata(_size, _md5_value),
local_service::get_metafile(file_name()));
if (err != ERR_OK) {
LOG_ERROR("file_metadata write failed");
resp.err = ERR_FS_INTERNAL;
Expand Down Expand Up @@ -498,8 +462,8 @@ dsn::task_ptr local_file_object::upload(const upload_request &req,
break;
}

auto err = file_metadata(_size, _md5_value)
.dump_to_file(local_service::get_metafile(file_name()));
auto err = dsn::utils::dump_njobj_to_file(file_metadata(_size, _md5_value),
local_service::get_metafile(file_name()));
if (err != ERR_OK) {
LOG_ERROR("file_metadata write failed");
resp.err = ERR_FS_INTERNAL;
Expand Down
8 changes: 1 addition & 7 deletions src/block_service/local/local_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,8 @@ struct file_metadata
std::string md5;

file_metadata(int64_t s = 0, const std::string &m = "") : size(s), md5(m) {}

// Dump the object to a file in JSON format.
error_code dump_to_file(const std::string &file_path) const;

// Load the object from a file in JSON format.
error_code load_from_file(const std::string &file_path);
};
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5)
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5);

class local_service : public block_filesystem
{
Expand Down
8 changes: 5 additions & 3 deletions src/block_service/test/local_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "test_util/test_util.h"
#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/load_dump_object.h"

namespace dsn {
namespace dist {
Expand All @@ -49,11 +50,11 @@ TEST_P(local_service_test, file_metadata)
const int64_t kSize = 12345;
const std::string kMD5 = "0123456789abcdef0123456789abcdef";
auto meta_file_path = local_service::get_metafile("a.txt");
ASSERT_EQ(ERR_OK, file_metadata(kSize, kMD5).dump_to_file(meta_file_path));
ASSERT_EQ(ERR_OK, dsn::utils::dump_njobj_to_file(file_metadata(kSize, kMD5), meta_file_path));
ASSERT_TRUE(boost::filesystem::exists(meta_file_path));

file_metadata fm;
fm.load_from_file(meta_file_path);
ASSERT_EQ(ERR_OK, dsn::utils::load_njobj_from_file(meta_file_path, &fm));
ASSERT_EQ(kSize, fm.size);
ASSERT_EQ(kMD5, fm.md5);
}
Expand All @@ -64,7 +65,8 @@ TEST_P(local_service_test, load_metadata)
auto meta_file_path = local_service::get_metafile(file.file_name());

{
ASSERT_EQ(ERR_OK, file_metadata(5, "abcde").dump_to_file(meta_file_path));
ASSERT_EQ(ERR_OK,
dsn::utils::dump_njobj_to_file(file_metadata(5, "abcde"), meta_file_path));
ASSERT_EQ(ERR_OK, file.load_metadata());
ASSERT_EQ("abcde", file.get_md5sum());
ASSERT_EQ(5, file.get_size());
Expand Down
24 changes: 6 additions & 18 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@
// under the License.

#include <fmt/core.h>
#include <rocksdb/env.h>
#include <rocksdb/status.h>
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "block_service/block_service_manager.h"
#include "common/bulk_load_common.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
#include "common/replication_enums.h"
Expand All @@ -42,14 +40,12 @@
#include "runtime/rpc/rpc_holder.h"
#include "runtime/task/async_calls.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/env.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "absl/strings/string_view.h"
#include "utils/load_dump_object.h"
#include "utils/thread_access_checker.h"

METRIC_DEFINE_counter(replica,
Expand Down Expand Up @@ -606,18 +602,10 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
// need to acquire write lock while calling it
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname)
{
std::string buf;
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), fname, &buf);
if (dsn_unlikely(!s.ok())) {
LOG_ERROR_PREFIX("read file {} failed, error = {}", fname, s.ToString());
return ERR_FILE_OPERATION_FAILED;
}

blob bb = blob::create_from_bytes(std::move(buf));
if (!json::json_forwarder<bulk_load_metadata>::decode(bb, _metadata)) {
LOG_ERROR_PREFIX("file({}) is damaged", fname);
return ERR_CORRUPTION;
auto ec = dsn::utils::load_rjobj_from_file(fname, &_metadata);
if (ec != ERR_OK) {
LOG_ERROR_PREFIX("load bulk_load_metadata from file {} failed", fname);
return ec;
}

if (_metadata.file_total_size <= 0) {
Expand Down
3 changes: 2 additions & 1 deletion src/replica/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class replica_bulk_loader : replica_base
int32_t file_index,
dist::block_service::block_filesystem *fs);

// \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
// \return ERR_PATH_NOT_FOUND: file not exist
// \return ERR_FILE_OPERATION_FAILED: get size failed, open file failed
// \return ERR_CORRUPTION: parse failed
// need to acquire write lock while calling it
error_code parse_bulk_load_metadata(const std::string &fname);
Expand Down
25 changes: 5 additions & 20 deletions src/replica/bulk_load/test/replica_bulk_loader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,22 @@

#include "replica/bulk_load/replica_bulk_loader.h"

#include <fmt/core.h>
#include <rocksdb/env.h>
#include <rocksdb/slice.h>
#include <rocksdb/status.h>
#include <fstream> // IWYU pragma: keep
#include <memory>
#include <vector>

#include "common/bulk_load_common.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
#include "replica/test/mock_utils.h"
#include "replica/test/replica_test_base.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/task/task_tracker.h"
#include "test_util/test_util.h"
#include "utils/blob.h"
#include "utils/env.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/load_dump_object.h"
#include "utils/test_macros.h"

namespace dsn {
Expand Down Expand Up @@ -258,14 +252,7 @@ class replica_bulk_loader_test : public replica_test_base
_metadata.files.emplace_back(_file_meta);
_metadata.file_total_size = _file_meta.size;
std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
blob bb = json::json_forwarder<bulk_load_metadata>::encode(_metadata);
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(bb.data(), bb.length()),
whole_name,
/* should_sync */ true);
ASSERT_TRUE(s.ok()) << fmt::format(
"write file {} failed, err = {}", whole_name, s.ToString());
ASSERT_EQ(ERR_OK, utils::dump_rjobj_to_file(_metadata, whole_name));
}

bool validate_metadata()
Expand Down Expand Up @@ -525,7 +512,7 @@ TEST_P(replica_bulk_loader_test, rollback_to_downloading_test)
// parse_bulk_load_metadata unit tests
TEST_P(replica_bulk_loader_test, bulk_load_metadata_not_exist)
{
ASSERT_EQ(test_parse_bulk_load_metadata("path_not_exist"), ERR_FILE_OPERATION_FAILED);
ASSERT_EQ(ERR_PATH_NOT_FOUND, test_parse_bulk_load_metadata("path_not_exist"));
}

TEST_P(replica_bulk_loader_test, bulk_load_metadata_corrupt)
Expand All @@ -535,8 +522,7 @@ TEST_P(replica_bulk_loader_test, bulk_load_metadata_corrupt)
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR, METADATA),
&_file_meta));
std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
error_code ec = test_parse_bulk_load_metadata(metadata_file_name);
ASSERT_EQ(ERR_CORRUPTION, ec);
ASSERT_EQ(ERR_CORRUPTION, test_parse_bulk_load_metadata(metadata_file_name));
utils::filesystem::remove_path(LOCAL_DIR);
}

Expand All @@ -546,8 +532,7 @@ TEST_P(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
NO_FATALS(create_local_metadata_file());

std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
auto ec = test_parse_bulk_load_metadata(metadata_file_name);
ASSERT_EQ(ec, ERR_OK);
ASSERT_EQ(ERR_OK, test_parse_bulk_load_metadata(metadata_file_name));
ASSERT_TRUE(validate_metadata());
utils::filesystem::remove_path(LOCAL_DIR);
}
Expand Down
5 changes: 4 additions & 1 deletion src/replica/replica_disk_migrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/thread_access_checker.h"
#include "utils/load_dump_object.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -251,7 +252,9 @@ bool replica_disk_migrator::migrate_replica_app_info(const replica_disk_migrate_
return false;
});
replica_init_info init_info = _replica->get_app()->init_info();
const auto &store_init_info_err = init_info.store(_target_replica_dir);
const auto &store_init_info_err = utils::dump_rjobj_to_file(
init_info,
utils::filesystem::path_combine(_target_replica_dir, replica_init_info::kInitInfo));
if (store_init_info_err != ERR_OK) {
LOG_ERROR_PREFIX("disk migration(origin={}, target={}) stores app init info failed({})",
req.origin_disk,
Expand Down
38 changes: 6 additions & 32 deletions src/replica/replica_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include <boost/cstdint.hpp>
#include <boost/lexical_cast.hpp>
#include <rocksdb/env.h>
#include <rocksdb/status.h>
#include <stdint.h>
#include <atomic>
#include <fstream>
Expand All @@ -34,7 +32,6 @@
#include "block_service/block_service_manager.h"
#include "common/backup_common.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replication.codes.h"
#include "dsn.layer2_types.h"
#include "failure_detector/failure_detector_multimaster.h"
Expand All @@ -55,6 +52,7 @@
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/load_dump_object.h"

using namespace dsn::dist::block_service;

Expand Down Expand Up @@ -95,31 +93,6 @@ bool replica::remove_useless_file_under_chkpt(const std::string &chkpt_dir,
return true;
}

bool replica::read_cold_backup_metadata(const std::string &fname,
cold_backup_metadata &backup_metadata)
{
if (!::dsn::utils::filesystem::file_exists(fname)) {
LOG_ERROR_PREFIX(
"checkpoint on remote storage media is damaged, coz file({}) doesn't exist", fname);
return false;
}

std::string data;
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), fname, &data);
if (!s.ok()) {
LOG_ERROR_PREFIX("read file '{}' failed, err = {}", fname, s.ToString());
return false;
}

if (!::dsn::json::json_forwarder<cold_backup_metadata>::decode(
blob::create_from_bytes(std::move(data)), backup_metadata)) {
LOG_ERROR_PREFIX("file({}) under checkpoint is damaged", fname);
return false;
}
return true;
}

error_code replica::download_checkpoint(const configuration_restore_request &req,
const std::string &remote_chkpt_dir,
const std::string &local_chkpt_dir)
Expand Down Expand Up @@ -206,13 +179,14 @@ error_code replica::get_backup_metadata(block_filesystem *fs,
return err;
}

// parse cold_backup_meta from metadata file
// Load cold_backup_metadata from metadata file.
const std::string local_backup_metada_file =
utils::filesystem::path_combine(local_chkpt_dir, cold_backup_constant::BACKUP_METADATA);
if (!read_cold_backup_metadata(local_backup_metada_file, backup_metadata)) {
LOG_ERROR_PREFIX("read cold_backup_metadata from file({}) failed",
auto ec = dsn::utils::load_rjobj_from_file(local_backup_metada_file, &backup_metadata);
if (ec != ERR_OK) {
LOG_ERROR_PREFIX("load cold_backup_metadata from file({}) failed",
local_backup_metada_file);
return ERR_FILE_OPERATION_FAILED;
return ec;
}

_chkpt_total_size = backup_metadata.checkpoint_total_size;
Expand Down
Loading

0 comments on commit ce73232

Please sign in to comment.