From 1a95d071bfe4d6941f533cbbd9ad5d7f7e39c501 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Thu, 21 Sep 2023 20:26:20 +0800 Subject: [PATCH] refactor(file_system): use rocksdb API to read/write file --- src/replica/bulk_load/replica_bulk_loader.cpp | 7 +- src/replica/replica_restore.cpp | 4 +- src/server/pegasus_write_service_impl.h | 4 +- src/utils/filesystem.cpp | 154 ++++++++++-------- src/utils/filesystem.h | 13 +- src/utils/test/file_system_test.cpp | 55 ++++++- src/utils/test/file_utils.cpp | 9 +- 7 files changed, 156 insertions(+), 90 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 60f3e5a043..5261e4183c 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -44,6 +44,7 @@ #include "utils/autoref_ptr.h" #include "utils/blob.h" #include "utils/chrono_literals.h" +#include "utils/env.h" #include "utils/fail_point.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" @@ -497,7 +498,8 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, // We are not sure if the file was cached by system. 
And we couldn't // afford the io overhead which is cased by reading file in verify_file(), // so if file exist we just verify file size - if (utils::filesystem::verify_file_size(file_name, f_meta.size)) { + if (utils::filesystem::verify_file_size( + file_name, utils::FileDataType::kSensitive, f_meta.size)) { // local file exist and is verified ec = ERR_OK; f_size = f_meta.size; @@ -520,7 +522,8 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, if (ec == ERR_OK && !verified) { if (!f_meta.md5.empty() && f_md5 != f_meta.md5) { ec = ERR_CORRUPTION; - } else if (!utils::filesystem::verify_file_size(file_name, f_meta.size)) { + } else if (!utils::filesystem::verify_file_size( + file_name, utils::FileDataType::kSensitive, f_meta.size)) { ec = ERR_CORRUPTION; } } diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp index 1a37479777..63255aeccb 100644 --- a/src/replica/replica_restore.cpp +++ b/src/replica/replica_restore.cpp @@ -49,6 +49,7 @@ #include "runtime/task/task_tracker.h" #include "utils/autoref_ptr.h" #include "utils/blob.h" +#include "utils/env.h" #include "utils/error_code.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" @@ -160,7 +161,8 @@ error_code replica::download_checkpoint(const configuration_restore_request &req const std::string file_name = utils::filesystem::path_combine(local_chkpt_dir, f_meta.name); if (download_err == ERR_OK || download_err == ERR_PATH_ALREADY_EXIST) { - if (!utils::filesystem::verify_file(file_name, f_meta.md5, f_meta.size)) { + if (!utils::filesystem::verify_file( + file_name, utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) { download_err = ERR_CORRUPTION; } else if (download_err == ERR_PATH_ALREADY_EXIST) { download_err = ERR_OK; diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index ff9f2fe837..69b71c6ba4 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h 
@@ -29,6 +29,7 @@ #include "pegasus_write_service.h" #include "rocksdb_wrapper.h" #include "utils/defer.h" +#include "utils/env.h" #include "utils/filesystem.h" #include "utils/string_conv.h" #include "utils/strings.h" @@ -76,7 +77,8 @@ inline dsn::error_code get_external_files_path(const std::string &bulk_load_dir, for (const auto &f_meta : metadata.files) { const auto &file_name = dsn::utils::filesystem::path_combine(bulk_load_dir, f_meta.name); if (verify_before_ingest && - !dsn::utils::filesystem::verify_file(file_name, f_meta.md5, f_meta.size)) { + !dsn::utils::filesystem::verify_file( + file_name, dsn::utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) { break; } files_path.emplace_back(file_name); diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp index f2fa59ef87..8c7ad427be 100644 --- a/src/utils/filesystem.cpp +++ b/src/utils/filesystem.cpp @@ -33,24 +33,23 @@ * xxxx-xx-xx, author, fix bug about xxx */ +#include +#include #include -#include +#include #include #include #include +#include +#include +#include #include #include #include // IWYU pragma: no_include #include - #include - -#include -#include -#include -#include -#include +#include #include "utils/defer.h" #include "utils/env.h" @@ -60,7 +59,6 @@ #include "utils/ports.h" #include "utils/safe_strerror_posix.h" #include "utils/string_view.h" -#include "utils/strings.h" #define getcwd_ getcwd #define rmdir_ rmdir @@ -417,7 +415,6 @@ bool deprecated_file_size(const std::string &path, int64_t &sz) } sz = st.st_size; - return true; } @@ -517,53 +514,28 @@ bool create_directory(const std::string &path) bool create_file(const std::string &path) { - size_t pos; std::string npath; - int fd; - int mode; - int err; - - if (path.empty()) { - return false; - } - - if (_FS_ISSEP(path.back())) { - return false; - } - - err = get_normalized_path(path, npath); + int err = get_normalized_path(path, npath); if (err != 0) { return false; } - if (dsn::utils::filesystem::path_exists_internal(npath, 
FTW_F)) { - return true; - } - - if (dsn::utils::filesystem::path_exists_internal(npath, FTW_D)) { - return false; - } - - pos = npath.find_last_of("\\/"); + auto pos = npath.find_last_of("\\/"); if ((pos != std::string::npos) && (pos > 0)) { auto ppath = npath.substr(0, pos); if (!dsn::utils::filesystem::create_directory(ppath)) { + LOG_WARNING("fail to create directory {}", ppath); return false; } } - mode = 0775; - fd = ::creat(npath.c_str(), mode); - if (fd == -1) { - err = errno; - LOG_WARNING("create_file {} failed, err = {}", path, safe_strerror(err)); + std::unique_ptr<rocksdb::WritableFile> wfile; + auto s = rocksdb::Env::Default()->ReopenWritableFile(path, &wfile, rocksdb::EnvOptions()); + if (dsn_unlikely(!s.ok())) { + LOG_WARNING("fail to create file {}, err={}", path, s.ToString()); return false; } - if (::close_(fd) != 0) { - LOG_WARNING("create_file {}, failed to close the file handle.", path); - } - return true; } @@ -745,6 +717,54 @@ bool link_file(const std::string &src, const std::string &target) } error_code md5sum(const std::string &file_path, /*out*/ std::string &result) +{ + result.clear(); + if (!::dsn::utils::filesystem::file_exists(file_path)) { + LOG_ERROR("md5sum error: file {} not exist", file_path); + return ERR_OBJECT_NOT_FOUND; + } + + std::unique_ptr<rocksdb::SequentialFile> sfile; + auto s = rocksdb::Env::Default()->NewSequentialFile(file_path, &sfile, rocksdb::EnvOptions()); + if (!sfile) { + LOG_ERROR("md5sum error: open file {} failed, err={}", file_path, s.ToString()); + return ERR_FILE_OPERATION_FAILED; + } + + const int64_t kBufferSize = 4096; + char buf[kBufferSize]; + unsigned char out[MD5_DIGEST_LENGTH] = {0}; + MD5_CTX c; + CHECK_EQ(1, MD5_Init(&c)); + while (true) { + rocksdb::Slice res; + s = sfile->Read(kBufferSize, &res, buf); + if (!s.ok()) { + MD5_Final(out, &c); + LOG_ERROR("md5sum error: read file {} failed, err = {}", file_path, s.ToString()); + return ERR_FILE_OPERATION_FAILED; + } + if (res.empty()) { + break; + } + CHECK_EQ(1, MD5_Update(&c, res.data(), 
res.size())); + if (res.size() < kBufferSize) { + break; + } + } + CHECK_EQ(1, MD5_Final(out, &c)); + + char str[MD5_DIGEST_LENGTH * 2 + 1]; + str[MD5_DIGEST_LENGTH * 2] = 0; + for (int n = 0; n < MD5_DIGEST_LENGTH; n++) { + sprintf(str + n + n, "%02x", out[n]); + } + result.assign(str); + + return ERR_OK; +} + +error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string &result) { result.clear(); // if file not exist, we return ERR_OBJECT_NOT_FOUND @@ -841,6 +861,7 @@ error_code read_file(const std::string &fname, std::string &buf) } bool verify_file(const std::string &fname, + FileDataType type, const std::string &expected_md5, const int64_t &expected_fsize) { @@ -849,7 +870,7 @@ bool verify_file(const std::string &fname, return false; } int64_t f_size = 0; - if (!file_size(fname, f_size)) { + if (!file_size(fname, type, f_size)) { LOG_ERROR("verify file({}) failed, becaused failed to get file size", fname); return false; } @@ -870,14 +891,14 @@ bool verify_file(const std::string &fname, return true; } -bool verify_file_size(const std::string &fname, const int64_t &expected_fsize) +bool verify_file_size(const std::string &fname, FileDataType type, const int64_t &expected_fsize) { if (!file_exists(fname)) { LOG_ERROR("file({}) is not existed", fname); return false; } int64_t f_size = 0; - if (!file_size(fname, f_size)) { + if (!file_size(fname, type, f_size)) { LOG_ERROR("verify file({}) size failed, becaused failed to get file size", fname); return false; } @@ -891,22 +912,6 @@ bool verify_file_size(const std::string &fname, const int64_t &expected_fsize) return true; } -bool verify_data_md5(const std::string &fname, - const char *data, - const size_t data_size, - const std::string &expected_md5) -{ - std::string md5 = string_md5(data, data_size); - if (md5 != expected_md5) { - LOG_ERROR("verify data({}) failed, because data damaged, size: md5: {} VS {}", - fname, - md5, - expected_md5); - return false; - } - return true; -} - bool 
create_directory(const std::string &path, std::string &absolute_path, std::string &err_msg) { FAIL_POINT_INJECT_F("filesystem_create_directory", [path](string_view str) { @@ -952,23 +957,28 @@ bool check_dir_rw(const std::string &path, std::string &err_msg) path.find(broken_disk_dir) == std::string::npos; }); - std::string fname = "read_write_test_file"; - std::string fpath = path_combine(path, fname); - if (!create_file(fpath)) { - err_msg = fmt::format("Fail to create test file {}.", fpath); + static const std::string kTestValue = "test_value"; + static const std::string kFname = "read_write_test_file"; + std::string fpath = path_combine(path, kFname); + auto cleanup = defer([&fpath]() { remove_path(fpath); }); + auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(), + rocksdb::Slice(kTestValue), + fpath, + /* should_sync */ true); + if (dsn_unlikely(!s.ok())) { + err_msg = fmt::format("fail to write file {}, err={}", fpath, s.ToString()); return false; } - auto cleanup = defer([&fpath]() { remove_path(fpath); }); - std::string value = "test_value"; - if (!write_file(fpath, value)) { - err_msg = fmt::format("Fail to write file {}.", fpath); + std::string read_data; + s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fpath, &read_data); + if (dsn_unlikely(!s.ok())) { + err_msg = fmt::format("fail to read file {}, err={}", fpath, s.ToString()); return false; } - std::string buf; - if (read_file(fpath, buf) != ERR_OK || buf != value) { - err_msg = fmt::format("Fail to read file {} or get wrong value({}).", fpath, buf); + if (dsn_unlikely(read_data != kTestValue)) { + err_msg = fmt::format("get wrong value '{}' from file {}", read_data, fpath); return false; } diff --git a/src/utils/filesystem.h b/src/utils/filesystem.h index 9c5d3efea9..440128a97d 100644 --- a/src/utils/filesystem.h +++ b/src/utils/filesystem.h @@ -65,6 +65,8 @@ enum class FileDataType; namespace filesystem { +// TODO(yingchun): Consider using rocksdb APIs to rewrite the following functions. 
+ int get_normalized_path(const std::string &path, std::string &npath); bool get_absolute_path(const std::string &path1, std::string &path2); @@ -136,6 +138,7 @@ bool get_disk_space_info(const std::string &path, disk_space_info &info); bool link_file(const std::string &src, const std::string &target); error_code md5sum(const std::string &file_path, /*out*/ std::string &result); +error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string &result); // return value: // - : @@ -143,25 +146,23 @@ error_code md5sum(const std::string &file_path, /*out*/ std::string &result); // B is represent wheter the directory is empty, true means empty, otherwise false std::pair is_directory_empty(const std::string &dirname); +// TODO(yingchun): remove it! error_code read_file(const std::string &fname, /*out*/ std::string &buf); // compare file metadata calculated by fname with expected md5 and file_size bool verify_file(const std::string &fname, + FileDataType type, const std::string &expected_md5, const int64_t &expected_fsize); -bool verify_file_size(const std::string &fname, const int64_t &expected_fsize); - -bool verify_data_md5(const std::string &fname, - const char *data, - const size_t data_size, - const std::string &expected_md5); +bool verify_file_size(const std::string &fname, FileDataType type, const int64_t &expected_fsize); // create driectory and get absolute path bool create_directory(const std::string &path, /*out*/ std::string &absolute_path, /*out*/ std::string &err_msg); +// TODO(yingchun): remove it! bool write_file(const std::string &fname, std::string &buf); // check if directory is readable and writable diff --git a/src/utils/test/file_system_test.cpp b/src/utils/test/file_system_test.cpp index ccfc2da2f9..f497a908b3 100644 --- a/src/utils/test/file_system_test.cpp +++ b/src/utils/test/file_system_test.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+// IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include #include @@ -26,6 +27,7 @@ #include #include "utils/env.h" +#include "utils/error_code.h" #include "utils/filesystem.h" #include "utils/flags.h" @@ -115,18 +117,63 @@ TEST(filesystem_test_p, encrypted_file_size) ASSERT_EQ(kFileContentSize + kEncryptionHeaderkSize, actual_file_size); } +// The old filesystem API doesn't support sensitive files, so skip testing +// FLAGS_encrypt_data_at_rest=true. +TEST(filesystem_test, check_new_md5sum) +{ + FLAGS_encrypt_data_at_rest = false; + + struct file_info + { + int64_t size; + } tests[]{{4095}, {4096}, {4097}}; + + for (const auto &test : tests) { + std::string fname = "test_file"; + // deprecated_md5sum doesn't support kSensitive files, so use kNonSensitive here. + auto s = rocksdb::WriteStringToFile( + dsn::utils::PegasusEnv(dsn::utils::FileDataType::kNonSensitive), + rocksdb::Slice(std::string(test.size, 'a')), + fname, + /* should_sync */ true); + ASSERT_TRUE(s.ok()) << s.ToString(); + // Check the file size. + int64_t file_fsize; + ASSERT_TRUE(file_size(fname, FileDataType::kNonSensitive, file_fsize)); + ASSERT_EQ(test.size, file_fsize); + + // Get the md5sum. + std::string md5sum1; + ASSERT_EQ(ERR_OK, md5sum(fname, md5sum1)); + ASSERT_FALSE(md5sum1.empty()); + + // Check the md5sum is repeatable. + std::string md5sum2; + ASSERT_EQ(ERR_OK, md5sum(fname, md5sum2)); + ASSERT_EQ(md5sum1, md5sum2); + + // Check the md5sum is the same to deprecated_md5sum. 
+ ASSERT_EQ(ERR_OK, deprecated_md5sum(fname, md5sum2)); + ASSERT_EQ(md5sum1, md5sum2); + + utils::filesystem::remove_path(fname); + } +} + TEST(filesystem_test, verify_file_test) { + FLAGS_encrypt_data_at_rest = false; + const std::string &fname = "test_file"; std::string expected_md5; int64_t expected_fsize; create_file(fname); md5sum(fname, expected_md5); - file_size(fname, expected_fsize); + ASSERT_TRUE(file_size(fname, FileDataType::kNonSensitive, expected_fsize)); - ASSERT_TRUE(verify_file(fname, expected_md5, expected_fsize)); - ASSERT_FALSE(verify_file(fname, "wrong_md5", 10086)); - ASSERT_FALSE(verify_file("file_not_exists", "wrong_md5", 10086)); + ASSERT_TRUE(verify_file(fname, FileDataType::kNonSensitive, expected_md5, expected_fsize)); + ASSERT_FALSE(verify_file(fname, FileDataType::kNonSensitive, "wrong_md5", 10086)); + ASSERT_FALSE(verify_file("file_not_exists", FileDataType::kNonSensitive, "wrong_md5", 10086)); remove_path(fname); } diff --git a/src/utils/test/file_utils.cpp b/src/utils/test/file_utils.cpp index 004e228b4b..0cc2173e8e 100644 --- a/src/utils/test/file_utils.cpp +++ b/src/utils/test/file_utils.cpp @@ -33,6 +33,7 @@ #include #include +#include "utils/env.h" #include "utils/error_code.h" #include "utils/filesystem.h" @@ -683,12 +684,12 @@ static void file_utils_test_file_size() bool ret; path = "./file_utils_temp.txt"; - ret = dsn::utils::filesystem::file_size(path, sz); - EXPECT_TRUE(ret); - EXPECT_TRUE(sz == 12); + ret = dsn::utils::filesystem::file_size(path, dsn::utils::FileDataType::kNonSensitive, sz); + ASSERT_TRUE(ret); + ASSERT_EQ(12, sz); path = "./file_utils_temp2.txt"; - ret = dsn::utils::filesystem::file_size(path, sz); + ret = dsn::utils::filesystem::file_size(path, dsn::utils::FileDataType::kNonSensitive, sz); EXPECT_FALSE(ret); }