Skip to content

Commit

Permalink
refactor: some minor refactors without functional changes (apache#1629)
Browse files Browse the repository at this point in the history
acelyc111 authored Oct 10, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 70d2467 commit b551e26
Showing 10 changed files with 104 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .licenserc.yaml
Original file line number Diff line number Diff line change
@@ -312,7 +312,7 @@ header:
- 'src/nfs/nfs_client_impl.h'
- 'src/nfs/nfs_code_definition.h'
- 'src/nfs/nfs_node.cpp'
- 'src/nfs/nfs_node_impl.cpp'
- 'src/nfs/nfs_node_simple.cpp'
- 'src/nfs/nfs_node_simple.h'
- 'src/nfs/nfs_server_impl.cpp'
- 'src/nfs/nfs_server_impl.h'
6 changes: 4 additions & 2 deletions src/block_service/block_service.h
Original file line number Diff line number Diff line change
@@ -238,8 +238,8 @@ struct upload_request
*/
struct upload_response
{
dsn::error_code err;
uint64_t uploaded_size;
dsn::error_code err = ERR_OK;
uint64_t uploaded_size = 0;
};
typedef std::function<void(const upload_response &)> upload_callback;
typedef future_task<upload_response> upload_future;
@@ -378,6 +378,8 @@ class block_file : public dsn::ref_counter
const write_callback &cb,
dsn::task_tracker *tracker = nullptr) = 0;

// TODO(yingchun): it seems every read() will read the whole file, consider to read the whole
// file directly.
/**
* @brief read
* @param req, ref {@link #read_request}
6 changes: 4 additions & 2 deletions src/geo/bench/bench.cpp
Original file line number Diff line number Diff line change
@@ -76,10 +76,12 @@ int main(int argc, char **argv)
}
}

// TODO(yingchun): the benchmark can not exit normally, we need to fix it later.
pegasus::geo::geo_client my_geo(
"config.ini", cluster_name.c_str(), app_name.c_str(), geo_app_name.c_str());
if (!my_geo.set_max_level(max_level).is_ok()) {
std::cerr << "set_max_level failed" << std::endl;
auto err = my_geo.set_max_level(max_level);
if (!err.is_ok()) {
std::cerr << "set_max_level failed, err: " << err << std::endl;
return -1;
}

6 changes: 5 additions & 1 deletion src/meta/test/main.cpp
Original file line number Diff line number Diff line change
@@ -63,7 +63,11 @@ TEST(meta, state_sync) { g_app->state_sync_test(); }

TEST(meta, update_configuration) { g_app->update_configuration_test(); }

TEST(meta, balancer_validator) { g_app->balancer_validator(); }
TEST(meta, balancer_validator)
{
// TODO(yingchun): this test last too long time, optimize it!
g_app->balancer_validator();
}

TEST(meta, apply_balancer) { g_app->apply_balancer_test(); }

13 changes: 9 additions & 4 deletions src/nfs/nfs_node_impl.cpp → src/nfs/nfs_node_simple.cpp
Original file line number Diff line number Diff line change
@@ -80,12 +80,17 @@ void nfs_node_simple::register_async_rpc_handler_for_test()

error_code nfs_node_simple::stop()
{
delete _server;
_server = nullptr;
if (_server != nullptr) {
_server->close_service();

delete _client;
_client = nullptr;
delete _server;
_server = nullptr;
}

if (_client != nullptr) {
delete _client;
_client = nullptr;
}
return ERR_OK;
}

14 changes: 12 additions & 2 deletions src/nfs/nfs_node_simple.h
Original file line number Diff line number Diff line change
@@ -34,14 +34,24 @@
*/
#pragma once

#include "runtime/tool_api.h"
#include <memory>

#include "nfs/nfs_node.h"
#include "utils/error_code.h"

namespace dsn {
class aio_task;
template <typename TResponse>
class rpc_replier;

namespace service {

class nfs_service_impl;
class copy_request;
class copy_response;
class get_file_size_request;
class get_file_size_response;
class nfs_client_impl;
class nfs_service_impl;

class nfs_node_simple : public nfs_node
{
102 changes: 47 additions & 55 deletions src/nfs/nfs_server_impl.cpp
Original file line number Diff line number Diff line change
@@ -26,23 +26,22 @@

#include "nfs/nfs_server_impl.h"

#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <type_traits>
#include <vector>

#include "nfs/nfs_code_definition.h"
#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/task/async_calls.h"
#include "utils/TokenBucket.h"
#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/ports.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_conv.h"
#include "utils/utils.h"

@@ -90,38 +89,35 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request,
dsn::utils::filesystem::path_combine(request.source_dir, request.file_name);
disk_file *dfile = nullptr;

{
do {
zauto_lock l(_handles_map_lock);
auto it = _handles_map.find(file_path); // find file handle cache first

if (it == _handles_map.end()) {
dfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
if (dfile != nullptr) {
auto fh = std::make_shared<file_handle_info_on_server>();
fh->file_handle = dfile;
fh->file_access_count = 1;
fh->last_access_time = dsn_now_ms();
_handles_map.insert(std::make_pair(file_path, std::move(fh)));
if (dfile == nullptr) {
LOG_ERROR("[nfs_service] open file {} failed", file_path);
::dsn::service::copy_response resp;
resp.error = ERR_OBJECT_NOT_FOUND;
reply(resp);
return;
}
} else {
dfile = it->second->file_handle;
it->second->file_access_count++;
it->second->last_access_time = dsn_now_ms();
}
}

LOG_DEBUG(
"nfs: copy file {} [{}, {}]", file_path, request.offset, request.offset + request.size);

if (dfile == nullptr) {
LOG_ERROR("[nfs_service] open file {} failed", file_path);
::dsn::service::copy_response resp;
resp.error = ERR_OBJECT_NOT_FOUND;
reply(resp);
return;
}

std::shared_ptr<callback_para> cp = std::make_shared<callback_para>(std::move(reply));
auto fh = std::make_shared<file_handle_info_on_server>();
fh->file_handle = dfile;
it = _handles_map.insert(std::make_pair(file_path, std::move(fh))).first;
}
dfile = it->second->file_handle;
it->second->file_access_count++;
it->second->last_access_time = dsn_now_ms();
} while (false);

CHECK_NOTNULL(dfile, "");
LOG_DEBUG("nfs: copy from file {} [{}, {}]",
file_path,
request.offset,
request.offset + request.size);

auto cp = std::make_shared<callback_para>(std::move(reply));
cp->bb = blob(dsn::utils::make_shared_array<char>(request.size), request.size);
cp->dst_dir = request.dst_dir;
cp->source_disk_tag = request.source_disk_tag;
@@ -182,58 +178,53 @@ void nfs_service_impl::on_get_file_size(
{
get_file_size_response resp;
error_code err = ERR_OK;
std::vector<std::string> file_list;
std::string folder = request.source_dir;
// TODO(yingchun): refactor the following code!
if (request.file_list.size() == 0) // return all file size in the destination file folder
{
if (!dsn::utils::filesystem::directory_exists(folder)) {
LOG_ERROR("[nfs_service] directory {} not exist", folder);
err = ERR_OBJECT_NOT_FOUND;
} else {
std::vector<std::string> file_list;
if (!dsn::utils::filesystem::get_subfiles(folder, file_list, true)) {
LOG_ERROR("[nfs_service] get subfiles of directory {} failed", folder);
err = ERR_FILE_OPERATION_FAILED;
} else {
for (auto &fpath : file_list) {
// TODO: using uint64 instead as file ma
// Done
for (const auto &fpath : file_list) {
int64_t sz;
if (!dsn::utils::filesystem::file_size(fpath, sz)) {
// TODO(yingchun): check if there are any files that are not sensitive (not
// encrypted).
if (!dsn::utils::filesystem::file_size(
fpath, dsn::utils::FileDataType::kSensitive, sz)) {
LOG_ERROR("[nfs_service] get size of file {} failed", fpath);
err = ERR_FILE_OPERATION_FAILED;
break;
}

resp.size_list.push_back((uint64_t)sz);
resp.size_list.push_back(sz);
resp.file_list.push_back(
fpath.substr(request.source_dir.length(), fpath.length() - 1));
}
file_list.clear();
}
}
} else // return file size in the request file folder
{
for (size_t i = 0; i < request.file_list.size(); i++) {
std::string file_path =
dsn::utils::filesystem::path_combine(folder, request.file_list[i]);

struct stat st;
if (0 != ::stat(file_path.c_str(), &st)) {
LOG_ERROR("[nfs_service] get stat of file {} failed, err = {}",
file_path,
dsn::utils::safe_strerror(errno));
err = ERR_OBJECT_NOT_FOUND;
for (const auto &file_name : request.file_list) {
std::string file_path = dsn::utils::filesystem::path_combine(folder, file_name);
int64_t sz;
// TODO(yingchun): check if there are any files that are not sensitive (not encrypted).
if (!dsn::utils::filesystem::file_size(
file_path, dsn::utils::FileDataType::kSensitive, sz)) {
LOG_ERROR("[nfs_service] get size of file {} failed", file_path);
err = ERR_FILE_OPERATION_FAILED;
break;
}

// TODO: using int64 instead as file may exceed the size of 32bit
// Done
uint64_t size = st.st_size;

resp.size_list.push_back(size);
resp.file_list.push_back((folder + request.file_list[i])
.substr(request.source_dir.length(),
(folder + request.file_list[i]).length() - 1));
resp.size_list.push_back(sz);
resp.file_list.push_back(
(folder + file_name)
.substr(request.source_dir.length(), (folder + file_name).length() - 1));
}
}

@@ -253,8 +244,9 @@ void nfs_service_impl::close_file() // release out-of-date file handle
dsn_now_ms() - fptr->last_access_time > (uint64_t)FLAGS_file_close_expire_time_ms) {
LOG_DEBUG("nfs: close file handle {}", it->first);
it = _handles_map.erase(it);
} else
} else {
it++;
}
}
}

12 changes: 3 additions & 9 deletions src/nfs/nfs_server_impl.h
Original file line number Diff line number Diff line change
@@ -66,7 +66,6 @@ class nfs_service_impl : public ::dsn::serverlet<nfs_service_impl>

void register_cli_commands();

// TODO(yingchun): seems nobody call it, can be removed?
void close_service()
{
unregister_rpc_handler(RPC_NFS_COPY);
@@ -107,14 +106,9 @@ class nfs_service_impl : public ::dsn::serverlet<nfs_service_impl>

struct file_handle_info_on_server
{
disk_file *file_handle;
int32_t file_access_count; // concurrent r/w count
uint64_t last_access_time; // last touch time

file_handle_info_on_server()
: file_handle(nullptr), file_access_count(0), last_access_time(0)
{
}
disk_file *file_handle = nullptr;
int32_t file_access_count = 0; // concurrent r/w count
uint64_t last_access_time = 0; // last touch time

~file_handle_info_on_server()
{
25 changes: 13 additions & 12 deletions src/server/test/pegasus_server_impl_test.cpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
#include <fmt/core.h>
#include <gmock/gmock-actions.h>
#include <gmock/gmock-spec-builders.h>
// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
@@ -124,10 +125,10 @@ class pegasus_server_impl_test : public pegasus_server_test_base
}
}

start(all_test_envs);
ASSERT_EQ(dsn::ERR_OK, start(all_test_envs));
if (is_restart) {
_server->stop(false);
start();
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_EQ(dsn::ERR_OK, start());
}

std::map<std::string, std::string> query_envs;
@@ -145,20 +146,20 @@ class pegasus_server_impl_test : public pegasus_server_test_base

TEST_F(pegasus_server_impl_test, test_table_level_slow_query)
{
start();
ASSERT_EQ(dsn::ERR_OK, start());
test_table_level_slow_query();
}

TEST_F(pegasus_server_impl_test, default_data_version)
{
start();
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_EQ(_server->_pegasus_data_version, 1);
}

TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
{
// open a new db with no app env.
start();
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario);
// set bulk_load scenario for the db.
ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
@@ -167,8 +168,8 @@ TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
ASSERT_EQ(true, opts.disable_auto_compactions);
// reopen the db.
_server->stop(false);
start();
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
ASSERT_EQ(opts.level0_file_num_compaction_trigger,
_server->_db->GetOptions().level0_file_num_compaction_trigger);
@@ -179,7 +180,7 @@ TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs)
{
std::map<std::string, std::string> envs;
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
start(envs);
ASSERT_EQ(dsn::ERR_OK, start(envs));
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
}

@@ -197,16 +198,16 @@ TEST_F(pegasus_server_impl_test, test_restart_db_with_rocksdb_envs)

TEST_F(pegasus_server_impl_test, test_stop_db_twice)
{
start();
ASSERT_EQ(dsn::ERR_OK, start());
ASSERT_TRUE(_server->_is_open);
ASSERT_TRUE(_server->_db != nullptr);

_server->stop(false);
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_FALSE(_server->_is_open);
ASSERT_TRUE(_server->_db == nullptr);

// stop again
_server->stop(false);
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_FALSE(_server->_is_open);
ASSERT_TRUE(_server->_db == nullptr);
}
12 changes: 6 additions & 6 deletions src/test/function_test/base_api/test_copy.cpp
Original file line number Diff line number Diff line change
@@ -98,9 +98,9 @@ class copy_data_test : public test_util
ASSERT_EQ(dsn::ERR_OK,
ddl_client_->create_app(
destination_app_name, "pegasus", default_partitions, 3, {}, false));
srouce_client_ =
source_client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), source_app_name.c_str());
ASSERT_NE(nullptr, srouce_client_);
ASSERT_NE(nullptr, source_client_);
destination_client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), destination_app_name.c_str());
ASSERT_NE(nullptr, destination_client_);
@@ -132,7 +132,7 @@ class copy_data_test : public test_util
while (expect_data_[empty_hash_key].size() < 1000) {
sort_key = random_string();
value = random_string();
ASSERT_EQ(PERR_OK, srouce_client_->set(empty_hash_key, sort_key, value))
ASSERT_EQ(PERR_OK, source_client_->set(empty_hash_key, sort_key, value))
<< "hash_key=" << hash_key << ", sort_key=" << sort_key;
expect_data_[empty_hash_key][sort_key] = value;
}
@@ -142,7 +142,7 @@ class copy_data_test : public test_util
while (expect_data_[hash_key].size() < 10) {
sort_key = random_string();
value = random_string();
ASSERT_EQ(PERR_OK, srouce_client_->set(hash_key, sort_key, value))
ASSERT_EQ(PERR_OK, source_client_->set(hash_key, sort_key, value))
<< "hash_key=" << hash_key << ", sort_key=" << sort_key;
expect_data_[hash_key][sort_key] = value;
}
@@ -163,7 +163,7 @@ class copy_data_test : public test_util
char buffer_[256];
map<string, map<string, string>> expect_data_;

pegasus_client *srouce_client_;
pegasus_client *source_client_;
pegasus_client *destination_client_;
};
const char copy_data_test::CCH[] =
@@ -176,7 +176,7 @@ TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
pegasus_client::scan_options options;
options.return_expire_ts = true;
vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
ASSERT_EQ(PERR_OK, srouce_client_->get_unordered_scanners(INT_MAX, options, raw_scanners));
ASSERT_EQ(PERR_OK, source_client_->get_unordered_scanners(INT_MAX, options, raw_scanners));

LOG_INFO("open source app scanner succeed, partition_count = {}", raw_scanners.size());

0 comments on commit b551e26

Please sign in to comment.