Skip to content

Commit

Permalink
refactor: some minor refactors without functional changes (#1629)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored Oct 10, 2023
1 parent 70d2467 commit b551e26
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ header:
- 'src/nfs/nfs_client_impl.h'
- 'src/nfs/nfs_code_definition.h'
- 'src/nfs/nfs_node.cpp'
- 'src/nfs/nfs_node_impl.cpp'
- 'src/nfs/nfs_node_simple.cpp'
- 'src/nfs/nfs_node_simple.h'
- 'src/nfs/nfs_server_impl.cpp'
- 'src/nfs/nfs_server_impl.h'
Expand Down
6 changes: 4 additions & 2 deletions src/block_service/block_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ struct upload_request
*/
struct upload_response
{
dsn::error_code err;
uint64_t uploaded_size;
dsn::error_code err = ERR_OK;
uint64_t uploaded_size = 0;
};
typedef std::function<void(const upload_response &)> upload_callback;
typedef future_task<upload_response> upload_future;
Expand Down Expand Up @@ -378,6 +378,8 @@ class block_file : public dsn::ref_counter
const write_callback &cb,
dsn::task_tracker *tracker = nullptr) = 0;

// TODO(yingchun): it seems every read() will read the whole file, consider to read the whole
// file directly.
/**
* @brief read
* @param req, ref {@link #read_request}
Expand Down
6 changes: 4 additions & 2 deletions src/geo/bench/bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ int main(int argc, char **argv)
}
}

// TODO(yingchun): the benchmark can not exit normally, we need to fix it later.
pegasus::geo::geo_client my_geo(
"config.ini", cluster_name.c_str(), app_name.c_str(), geo_app_name.c_str());
if (!my_geo.set_max_level(max_level).is_ok()) {
std::cerr << "set_max_level failed" << std::endl;
auto err = my_geo.set_max_level(max_level);
if (!err.is_ok()) {
std::cerr << "set_max_level failed, err: " << err << std::endl;
return -1;
}

Expand Down
6 changes: 5 additions & 1 deletion src/meta/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ TEST(meta, state_sync) { g_app->state_sync_test(); }

TEST(meta, update_configuration) { g_app->update_configuration_test(); }

TEST(meta, balancer_validator) { g_app->balancer_validator(); }
TEST(meta, balancer_validator)
{
// TODO(yingchun): this test last too long time, optimize it!
g_app->balancer_validator();
}

TEST(meta, apply_balancer) { g_app->apply_balancer_test(); }

Expand Down
13 changes: 9 additions & 4 deletions src/nfs/nfs_node_impl.cpp → src/nfs/nfs_node_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,17 @@ void nfs_node_simple::register_async_rpc_handler_for_test()

error_code nfs_node_simple::stop()
{
delete _server;
_server = nullptr;
if (_server != nullptr) {
_server->close_service();

delete _client;
_client = nullptr;
delete _server;
_server = nullptr;
}

if (_client != nullptr) {
delete _client;
_client = nullptr;
}
return ERR_OK;
}

Expand Down
14 changes: 12 additions & 2 deletions src/nfs/nfs_node_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@
*/
#pragma once

#include "runtime/tool_api.h"
#include <memory>

#include "nfs/nfs_node.h"
#include "utils/error_code.h"

namespace dsn {
class aio_task;
template <typename TResponse>
class rpc_replier;

namespace service {

class nfs_service_impl;
class copy_request;
class copy_response;
class get_file_size_request;
class get_file_size_response;
class nfs_client_impl;
class nfs_service_impl;

class nfs_node_simple : public nfs_node
{
Expand Down
102 changes: 47 additions & 55 deletions src/nfs/nfs_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@

#include "nfs/nfs_server_impl.h"

#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <type_traits>
#include <vector>

#include "nfs/nfs_code_definition.h"
#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/task/async_calls.h"
#include "utils/TokenBucket.h"
#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/ports.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_conv.h"
#include "utils/utils.h"

Expand Down Expand Up @@ -90,38 +89,35 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request,
dsn::utils::filesystem::path_combine(request.source_dir, request.file_name);
disk_file *dfile = nullptr;

{
do {
zauto_lock l(_handles_map_lock);
auto it = _handles_map.find(file_path); // find file handle cache first

if (it == _handles_map.end()) {
dfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
if (dfile != nullptr) {
auto fh = std::make_shared<file_handle_info_on_server>();
fh->file_handle = dfile;
fh->file_access_count = 1;
fh->last_access_time = dsn_now_ms();
_handles_map.insert(std::make_pair(file_path, std::move(fh)));
if (dfile == nullptr) {
LOG_ERROR("[nfs_service] open file {} failed", file_path);
::dsn::service::copy_response resp;
resp.error = ERR_OBJECT_NOT_FOUND;
reply(resp);
return;
}
} else {
dfile = it->second->file_handle;
it->second->file_access_count++;
it->second->last_access_time = dsn_now_ms();
}
}

LOG_DEBUG(
"nfs: copy file {} [{}, {}]", file_path, request.offset, request.offset + request.size);

if (dfile == nullptr) {
LOG_ERROR("[nfs_service] open file {} failed", file_path);
::dsn::service::copy_response resp;
resp.error = ERR_OBJECT_NOT_FOUND;
reply(resp);
return;
}

std::shared_ptr<callback_para> cp = std::make_shared<callback_para>(std::move(reply));
auto fh = std::make_shared<file_handle_info_on_server>();
fh->file_handle = dfile;
it = _handles_map.insert(std::make_pair(file_path, std::move(fh))).first;
}
dfile = it->second->file_handle;
it->second->file_access_count++;
it->second->last_access_time = dsn_now_ms();
} while (false);

CHECK_NOTNULL(dfile, "");
LOG_DEBUG("nfs: copy from file {} [{}, {}]",
file_path,
request.offset,
request.offset + request.size);

auto cp = std::make_shared<callback_para>(std::move(reply));
cp->bb = blob(dsn::utils::make_shared_array<char>(request.size), request.size);
cp->dst_dir = request.dst_dir;
cp->source_disk_tag = request.source_disk_tag;
Expand Down Expand Up @@ -182,58 +178,53 @@ void nfs_service_impl::on_get_file_size(
{
get_file_size_response resp;
error_code err = ERR_OK;
std::vector<std::string> file_list;
std::string folder = request.source_dir;
// TODO(yingchun): refactor the following code!
if (request.file_list.size() == 0) // return all file size in the destination file folder
{
if (!dsn::utils::filesystem::directory_exists(folder)) {
LOG_ERROR("[nfs_service] directory {} not exist", folder);
err = ERR_OBJECT_NOT_FOUND;
} else {
std::vector<std::string> file_list;
if (!dsn::utils::filesystem::get_subfiles(folder, file_list, true)) {
LOG_ERROR("[nfs_service] get subfiles of directory {} failed", folder);
err = ERR_FILE_OPERATION_FAILED;
} else {
for (auto &fpath : file_list) {
// TODO: using uint64 instead as file ma
// Done
for (const auto &fpath : file_list) {
int64_t sz;
if (!dsn::utils::filesystem::file_size(fpath, sz)) {
// TODO(yingchun): check if there are any files that are not sensitive (not
// encrypted).
if (!dsn::utils::filesystem::file_size(
fpath, dsn::utils::FileDataType::kSensitive, sz)) {
LOG_ERROR("[nfs_service] get size of file {} failed", fpath);
err = ERR_FILE_OPERATION_FAILED;
break;
}

resp.size_list.push_back((uint64_t)sz);
resp.size_list.push_back(sz);
resp.file_list.push_back(
fpath.substr(request.source_dir.length(), fpath.length() - 1));
}
file_list.clear();
}
}
} else // return file size in the request file folder
{
for (size_t i = 0; i < request.file_list.size(); i++) {
std::string file_path =
dsn::utils::filesystem::path_combine(folder, request.file_list[i]);

struct stat st;
if (0 != ::stat(file_path.c_str(), &st)) {
LOG_ERROR("[nfs_service] get stat of file {} failed, err = {}",
file_path,
dsn::utils::safe_strerror(errno));
err = ERR_OBJECT_NOT_FOUND;
for (const auto &file_name : request.file_list) {
std::string file_path = dsn::utils::filesystem::path_combine(folder, file_name);
int64_t sz;
// TODO(yingchun): check if there are any files that are not sensitive (not encrypted).
if (!dsn::utils::filesystem::file_size(
file_path, dsn::utils::FileDataType::kSensitive, sz)) {
LOG_ERROR("[nfs_service] get size of file {} failed", file_path);
err = ERR_FILE_OPERATION_FAILED;
break;
}

// TODO: using int64 instead as file may exceed the size of 32bit
// Done
uint64_t size = st.st_size;

resp.size_list.push_back(size);
resp.file_list.push_back((folder + request.file_list[i])
.substr(request.source_dir.length(),
(folder + request.file_list[i]).length() - 1));
resp.size_list.push_back(sz);
resp.file_list.push_back(
(folder + file_name)
.substr(request.source_dir.length(), (folder + file_name).length() - 1));
}
}

Expand All @@ -253,8 +244,9 @@ void nfs_service_impl::close_file() // release out-of-date file handle
dsn_now_ms() - fptr->last_access_time > (uint64_t)FLAGS_file_close_expire_time_ms) {
LOG_DEBUG("nfs: close file handle {}", it->first);
it = _handles_map.erase(it);
} else
} else {
it++;
}
}
}

Expand Down
12 changes: 3 additions & 9 deletions src/nfs/nfs_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class nfs_service_impl : public ::dsn::serverlet<nfs_service_impl>

void register_cli_commands();

// TODO(yingchun): seems nobody call it, can be removed?
void close_service()
{
unregister_rpc_handler(RPC_NFS_COPY);
Expand Down Expand Up @@ -107,14 +106,9 @@ class nfs_service_impl : public ::dsn::serverlet<nfs_service_impl>

struct file_handle_info_on_server
{
disk_file *file_handle;
int32_t file_access_count; // concurrent r/w count
uint64_t last_access_time; // last touch time

file_handle_info_on_server()
: file_handle(nullptr), file_access_count(0), last_access_time(0)
{
}
disk_file *file_handle = nullptr;
int32_t file_access_count = 0; // concurrent r/w count
uint64_t last_access_time = 0; // last touch time

~file_handle_info_on_server()
{
Expand Down
25 changes: 13 additions & 12 deletions src/server/test/pegasus_server_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fmt/core.h>
#include <gmock/gmock-actions.h>
#include <gmock/gmock-spec-builders.h>
// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -124,10 +125,10 @@ class pegasus_server_impl_test : public pegasus_server_test_base
}
}

start(all_test_envs);
ASSERT_EQ(dsn::ERR_OK, start(all_test_envs));
if (is_restart) {
_server->stop(false);
start();
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_EQ(dsn::ERR_OK, start());
}

std::map<std::string, std::string> query_envs;
Expand All @@ -145,20 +146,20 @@ class pegasus_server_impl_test : public pegasus_server_test_base

TEST_F(pegasus_server_impl_test, test_table_level_slow_query)
{
start();
ASSERT_EQ(dsn::ERR_OK, start());
test_table_level_slow_query();
}

TEST_F(pegasus_server_impl_test, default_data_version)
{
start();
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_EQ(_server->_pegasus_data_version, 1);
}

TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
{
// open a new db with no app env.
start();
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario);
// set bulk_load scenario for the db.
ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
Expand All @@ -167,8 +168,8 @@ TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
ASSERT_EQ(true, opts.disable_auto_compactions);
// reopen the db.
_server->stop(false);
start();
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
ASSERT_EQ(opts.level0_file_num_compaction_trigger,
_server->_db->GetOptions().level0_file_num_compaction_trigger);
Expand All @@ -179,7 +180,7 @@ TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs)
{
std::map<std::string, std::string> envs;
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
start(envs);
ASSERT_EQ(dsn::ERR_OK, start(envs));
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
}

Expand All @@ -197,16 +198,16 @@ TEST_F(pegasus_server_impl_test, test_restart_db_with_rocksdb_envs)

TEST_F(pegasus_server_impl_test, test_stop_db_twice)
{
start();
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_TRUE(_server->_is_open);
ASSERT_TRUE(_server->_db != nullptr);

_server->stop(false);
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_FALSE(_server->_is_open);
ASSERT_TRUE(_server->_db == nullptr);

// stop again
_server->stop(false);
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_FALSE(_server->_is_open);
ASSERT_TRUE(_server->_db == nullptr);
}
Expand Down
Loading

0 comments on commit b551e26

Please sign in to comment.