Skip to content

Commit

Permalink
refactor(file_system): use rocksdb API to read/write file
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Sep 21, 2023
1 parent c8793d2 commit 1a95d07
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 90 deletions.
7 changes: 5 additions & 2 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/env.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -497,7 +498,8 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
// We are not sure whether the file was cached by the system, and we can't
// afford the IO overhead caused by reading the file in verify_file(),
// so if the file exists we just verify the file size
if (utils::filesystem::verify_file_size(file_name, f_meta.size)) {
if (utils::filesystem::verify_file_size(
file_name, utils::FileDataType::kSensitive, f_meta.size)) {
// local file exist and is verified
ec = ERR_OK;
f_size = f_meta.size;
Expand All @@ -520,7 +522,8 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
if (ec == ERR_OK && !verified) {
if (!f_meta.md5.empty() && f_md5 != f_meta.md5) {
ec = ERR_CORRUPTION;
} else if (!utils::filesystem::verify_file_size(file_name, f_meta.size)) {
} else if (!utils::filesystem::verify_file_size(
file_name, utils::FileDataType::kSensitive, f_meta.size)) {
ec = ERR_CORRUPTION;
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/replica/replica_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "runtime/task/task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -160,7 +161,8 @@ error_code replica::download_checkpoint(const configuration_restore_request &req
const std::string file_name =
utils::filesystem::path_combine(local_chkpt_dir, f_meta.name);
if (download_err == ERR_OK || download_err == ERR_PATH_ALREADY_EXIST) {
if (!utils::filesystem::verify_file(file_name, f_meta.md5, f_meta.size)) {
if (!utils::filesystem::verify_file(
file_name, utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) {
download_err = ERR_CORRUPTION;
} else if (download_err == ERR_PATH_ALREADY_EXIST) {
download_err = ERR_OK;
Expand Down
4 changes: 3 additions & 1 deletion src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "pegasus_write_service.h"
#include "rocksdb_wrapper.h"
#include "utils/defer.h"
#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
Expand Down Expand Up @@ -76,7 +77,8 @@ inline dsn::error_code get_external_files_path(const std::string &bulk_load_dir,
for (const auto &f_meta : metadata.files) {
const auto &file_name = dsn::utils::filesystem::path_combine(bulk_load_dir, f_meta.name);
if (verify_before_ingest &&
!dsn::utils::filesystem::verify_file(file_name, f_meta.md5, f_meta.size)) {
!dsn::utils::filesystem::verify_file(
file_name, dsn::utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) {
break;
}
files_path.emplace_back(file_name);
Expand Down
154 changes: 82 additions & 72 deletions src/utils/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,23 @@
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <boost/filesystem/operations.hpp>
#include <boost/system/error_code.hpp>
#include <errno.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <ftw.h>
#include <limits.h>
#include <openssl/md5.h>
#include <rocksdb/env.h>
#include <rocksdb/slice.h>
#include <rocksdb/status.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
// IWYU pragma: no_include <bits/struct_stat.h>
#include <unistd.h>

#include <istream>

#include <boost/filesystem/operations.hpp>
#include <boost/system/error_code.hpp>
#include <fmt/core.h>
#include <rocksdb/env.h>
#include <rocksdb/status.h>
#include <memory>

#include "utils/defer.h"
#include "utils/env.h"
Expand All @@ -60,7 +59,6 @@
#include "utils/ports.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_view.h"
#include "utils/strings.h"

#define getcwd_ getcwd
#define rmdir_ rmdir
Expand Down Expand Up @@ -417,7 +415,6 @@ bool deprecated_file_size(const std::string &path, int64_t &sz)
}

sz = st.st_size;

return true;
}

Expand Down Expand Up @@ -517,53 +514,28 @@ bool create_directory(const std::string &path)

bool create_file(const std::string &path)
{
size_t pos;
std::string npath;
int fd;
int mode;
int err;

if (path.empty()) {
return false;
}

if (_FS_ISSEP(path.back())) {
return false;
}

err = get_normalized_path(path, npath);
int err = get_normalized_path(path, npath);
if (err != 0) {
return false;
}

if (dsn::utils::filesystem::path_exists_internal(npath, FTW_F)) {
return true;
}

if (dsn::utils::filesystem::path_exists_internal(npath, FTW_D)) {
return false;
}

pos = npath.find_last_of("\\/");
auto pos = npath.find_last_of("\\/");
if ((pos != std::string::npos) && (pos > 0)) {
auto ppath = npath.substr(0, pos);
if (!dsn::utils::filesystem::create_directory(ppath)) {
LOG_WARNING("fail to create directory {}", ppath);
return false;
}
}

mode = 0775;
fd = ::creat(npath.c_str(), mode);
if (fd == -1) {
err = errno;
LOG_WARNING("create_file {} failed, err = {}", path, safe_strerror(err));
std::unique_ptr<rocksdb::WritableFile> wfile;
auto s = rocksdb::Env::Default()->ReopenWritableFile(path, &wfile, rocksdb::EnvOptions());
if (dsn_unlikely(!s.ok())) {
LOG_WARNING("fail to create file {}, err={}", path, s.ToString());
return false;
}

if (::close_(fd) != 0) {
LOG_WARNING("create_file {}, failed to close the file handle.", path);
}

return true;
}

Expand Down Expand Up @@ -745,6 +717,54 @@ bool link_file(const std::string &src, const std::string &target)
}

// Computes the MD5 digest of the file at `file_path`, reading it through
// rocksdb::Env::Default(), and stores it in `result` as a lowercase hex string.
//
// `result` is cleared first, so it is empty whenever a non-OK code is returned.
//
// Returns:
//  - ERR_OK: the whole file was read and `result` holds the digest.
//  - ERR_OBJECT_NOT_FOUND: `file_path` does not exist.
//  - ERR_FILE_OPERATION_FAILED: the file could not be opened or read.
error_code md5sum(const std::string &file_path, /*out*/ std::string &result)
{
    result.clear();
    if (!::dsn::utils::filesystem::file_exists(file_path)) {
        LOG_ERROR("md5sum error: file {} not exist", file_path);
        return ERR_OBJECT_NOT_FOUND;
    }

    std::unique_ptr<rocksdb::SequentialFile> sfile;
    auto s = rocksdb::Env::Default()->NewSequentialFile(file_path, &sfile, rocksdb::EnvOptions());
    // Check the returned status as well as the handle: the status is what carries the
    // failure reason that is logged below.
    if (!s.ok() || !sfile) {
        LOG_ERROR("md5sum error: open file {} failed, err={}", file_path, s.ToString());
        return ERR_FILE_OPERATION_FAILED;
    }

    constexpr size_t kBufferSize = 4096;
    char buf[kBufferSize];
    unsigned char out[MD5_DIGEST_LENGTH] = {0};
    MD5_CTX c;
    CHECK_EQ(1, MD5_Init(&c));
    while (true) {
        rocksdb::Slice res;
        s = sfile->Read(kBufferSize, &res, buf);
        if (!s.ok()) {
            // Finalize the context before bailing out; the partial digest is discarded.
            MD5_Final(out, &c);
            LOG_ERROR("md5sum error: read file {} failed, err = {}", file_path, s.ToString());
            return ERR_FILE_OPERATION_FAILED;
        }
        // An empty result is the only end-of-file signal relied upon here; a short but
        // non-empty read is hashed and followed by another Read().
        if (res.empty()) {
            break;
        }
        // Hash the bytes described by the returned slice rather than `buf` directly:
        // Read() reports the data through `res`, which is not required to alias `buf`.
        CHECK_EQ(1, MD5_Update(&c, res.data(), res.size()));
    }
    CHECK_EQ(1, MD5_Final(out, &c));

    // Two hex characters per digest byte, plus the terminating NUL.
    char str[MD5_DIGEST_LENGTH * 2 + 1] = {0};
    for (int n = 0; n < MD5_DIGEST_LENGTH; n++) {
        // Each call writes exactly 2 hex characters and a NUL (3 bytes), which always
        // fits in the remaining space of `str`.
        snprintf(str + 2 * n, 3, "%02x", out[n]);
    }
    result.assign(str);

    return ERR_OK;
}

error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string &result)
{
result.clear();
// if file not exist, we return ERR_OBJECT_NOT_FOUND
Expand Down Expand Up @@ -841,6 +861,7 @@ error_code read_file(const std::string &fname, std::string &buf)
}

bool verify_file(const std::string &fname,
FileDataType type,
const std::string &expected_md5,
const int64_t &expected_fsize)
{
Expand All @@ -849,7 +870,7 @@ bool verify_file(const std::string &fname,
return false;
}
int64_t f_size = 0;
if (!file_size(fname, f_size)) {
if (!file_size(fname, type, f_size)) {
LOG_ERROR("verify file({}) failed, becaused failed to get file size", fname);
return false;
}
Expand All @@ -870,14 +891,14 @@ bool verify_file(const std::string &fname,
return true;
}

bool verify_file_size(const std::string &fname, const int64_t &expected_fsize)
bool verify_file_size(const std::string &fname, FileDataType type, const int64_t &expected_fsize)
{
if (!file_exists(fname)) {
LOG_ERROR("file({}) is not existed", fname);
return false;
}
int64_t f_size = 0;
if (!file_size(fname, f_size)) {
if (!file_size(fname, type, f_size)) {
LOG_ERROR("verify file({}) size failed, becaused failed to get file size", fname);
return false;
}
Expand All @@ -891,22 +912,6 @@ bool verify_file_size(const std::string &fname, const int64_t &expected_fsize)
return true;
}

// Checks that the MD5 of the in-memory buffer `data[0, data_size)` equals `expected_md5`.
//
// `fname` is used only to identify the data in the error log; no file is read here.
// Returns true when the digests match; otherwise logs both digests and returns false.
bool verify_data_md5(const std::string &fname,
                     const char *data,
                     const size_t data_size,
                     const std::string &expected_md5)
{
    const std::string actual_md5 = string_md5(data, data_size);
    if (actual_md5 == expected_md5) {
        return true;
    }

    // Only the digests are compared, so the message reports just the md5 values
    // (computed VS expected).
    LOG_ERROR("verify data({}) failed, because data damaged, md5: {} VS {}",
              fname,
              actual_md5,
              expected_md5);
    return false;
}

bool create_directory(const std::string &path, std::string &absolute_path, std::string &err_msg)
{
FAIL_POINT_INJECT_F("filesystem_create_directory", [path](string_view str) {
Expand Down Expand Up @@ -952,23 +957,28 @@ bool check_dir_rw(const std::string &path, std::string &err_msg)
path.find(broken_disk_dir) == std::string::npos;
});

std::string fname = "read_write_test_file";
std::string fpath = path_combine(path, fname);
if (!create_file(fpath)) {
err_msg = fmt::format("Fail to create test file {}.", fpath);
static const std::string kTestValue = "test_value";
static const std::string kFname = "read_write_test_file";
std::string fpath = path_combine(path, kFname);
auto cleanup = defer([&fpath]() { remove_path(fpath); });
auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice(kTestValue),
fpath,
/* should_sync */ true);
if (dsn_unlikely(!s.ok())) {
err_msg = fmt::format("fail to write file {}, err={}", fpath, s.ToString());
return false;
}

auto cleanup = defer([&fpath]() { remove_path(fpath); });
std::string value = "test_value";
if (!write_file(fpath, value)) {
err_msg = fmt::format("Fail to write file {}.", fpath);
std::string read_data;
s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fpath, &read_data);
if (dsn_unlikely(!s.ok())) {
err_msg = fmt::format("fail to read file {}, err={}", fpath, s.ToString());
return false;
}

std::string buf;
if (read_file(fpath, buf) != ERR_OK || buf != value) {
err_msg = fmt::format("Fail to read file {} or get wrong value({}).", fpath, buf);
if (dsn_unlikely(read_data != kTestValue)) {
err_msg = fmt::format("get wrong value '{}' from file", read_data, fpath);
return false;
}

Expand Down
13 changes: 7 additions & 6 deletions src/utils/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ enum class FileDataType;

namespace filesystem {

// TODO(yingchun): Consider using rocksdb APIs to rewrite the following functions.

int get_normalized_path(const std::string &path, std::string &npath);

bool get_absolute_path(const std::string &path1, std::string &path2);
Expand Down Expand Up @@ -136,32 +138,31 @@ bool get_disk_space_info(const std::string &path, disk_space_info &info);
bool link_file(const std::string &src, const std::string &target);

error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string &result);

// return value:
// - <A, B>:
// A indicates whether the operation encountered a local error
// B indicates whether the directory is empty: true means empty, otherwise false
std::pair<error_code, bool> is_directory_empty(const std::string &dirname);

// TODO(yingchun): remove it!
error_code read_file(const std::string &fname, /*out*/ std::string &buf);

// compare file metadata calculated by fname with expected md5 and file_size
bool verify_file(const std::string &fname,
FileDataType type,
const std::string &expected_md5,
const int64_t &expected_fsize);

bool verify_file_size(const std::string &fname, const int64_t &expected_fsize);

bool verify_data_md5(const std::string &fname,
const char *data,
const size_t data_size,
const std::string &expected_md5);
bool verify_file_size(const std::string &fname, FileDataType type, const int64_t &expected_fsize);

// create directory and get absolute path
bool create_directory(const std::string &path,
/*out*/ std::string &absolute_path,
/*out*/ std::string &err_msg);

// TODO(yingchun): remove it!
bool write_file(const std::string &fname, std::string &buf);

// check if directory is readable and writable
Expand Down
Loading

0 comments on commit 1a95d07

Please sign in to comment.