From e7a1d0b2b9028e81202fe2801914017180201911 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 4 Jan 2024 15:13:57 +0800 Subject: [PATCH 1/8] chore(github): add contributor limowang into apache collaborators to use github actions (#1828) --- .asf.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.asf.yaml b/.asf.yaml index 507676fca1..6eecde77af 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -67,6 +67,7 @@ github: - ruojieranyishen - ninsmiracle - Samunroyu + - limowang notifications: commits: commits@pegasus.apache.org From 74631839294075d6639bef653c7420c1e18129ce Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Thu, 4 Jan 2024 15:46:13 +0800 Subject: [PATCH 2/8] chore: Update .gitignore (#1823) --- .gitignore | 210 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 142 insertions(+), 68 deletions(-) diff --git a/.gitignore b/.gitignore index 18b818638c..0c8b005409 100644 --- a/.gitignore +++ b/.gitignore @@ -24,7 +24,6 @@ src/runtime/dsn.layer2_types.cpp onebox/ .zk_install/ *.data/ -config-*.ini .kill_test.shell.* pegasus_kill_test.log kill_history.txt @@ -142,121 +141,196 @@ java-client/src/main/java/org/apache/pegasus/apps/* java-client/src/main/java/org/apache/pegasus/replication/* java-client/.flattened-pom.xml +# ============ # +# scala-client # +# ============ # +scala-client/project/project/ +scala-client/project/target/ + +# From https://github.com/github/gitignore/blob/main/Scala.gitignore +scala-client/**/*.class +scala-client/**/*.log + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +scala-client/**/hs_err_pid* + # Created by .ignore support plugin (hsz.mobi) ### SBT template # Simple Build Tool # http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control - scala-client/target/ -### Scala template -scala-client/*.class -scala-client/*.log - scala-client/.idea/ scala-client/.iml scala-client/rolling_log/ +# ============= # +# python-client # +# 
============= # +python-client/pypegasus/replication/ +python-client/pypegasus/rrdb/ +python-client/tests/_trial_temp/ + +# From https://github.com/github/gitignore/blob/main/Python.gitignore # Byte-compiled / optimized / DLL files -python-client/__pycache__/ -python-client/*.py[cod] -python-client/*$py.class +python-client/**/__pycache__/ +python-client/**/*.py[cod] +python-client/**/*$py.class # C extensions -python-client/*.so +python-client/**/*.so # Distribution / packaging -python-client/.Python -python-client/env/ -python-client/build/ -python-client/develop-eggs/ -python-client/dist/ -python-client/downloads/ -python-client/eggs/ -python-client/.eggs/ -python-client/lib/ -python-client/lib64/ -python-client/parts/ -python-client/sdist/ -python-client/var/ -python-client/wheels/ -python-client/*.egg-info/ -python-client/.installed.cfg -python-client/*.egg +python-client/**/.Python +python-client/**/build/ +python-client/**/develop-eggs/ +python-client/**/dist/ +python-client/**/downloads/ +python-client/**/eggs/ +python-client/**/.eggs/ +python-client/**/lib/ +python-client/**/lib64/ +python-client/**/parts/ +python-client/**/sdist/ +python-client/**/var/ +python-client/**/wheels/ +python-client/**/share/python-wheels/ +python-client/**/*.egg-info/ +python-client/**/.installed.cfg +python-client/**/*.egg +python-client/**/MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. 
-python-client/*.manifest -python-client/*.spec +python-client/**/*.manifest +python-client/**/*.spec # Installer logs -python-client/pip-log.txt -python-client/pip-delete-this-directory.txt +python-client/**/pip-log.txt +python-client/**/pip-delete-this-directory.txt # Unit test / coverage reports -python-client/htmlcov/ -python-client/.tox/ -python-client/.coverage -python-client/.coverage.* -python-client/.cache -python-client/nosetests.xml -python-client/coverage.xml -python-client/*.cover -python-client/.hypothesis/ +python-client/**/htmlcov/ +python-client/**/.tox/ +python-client/**/.nox/ +python-client/**/.coverage +python-client/**/.coverage.* +python-client/**/.cache +python-client/**/nosetests.xml +python-client/**/coverage.xml +python-client/**/*.cover +python-client/**/*.py,cover +python-client/**/.hypothesis/ +python-client/**/.pytest_cache/ +python-client/**/cover/ # Translations -python-client/*.mo -python-client/*.pot +python-client/**/*.mo +python-client/**/*.pot # Django stuff: -python-client/*.log -python-client/local_settings.py +python-client/**/*.log +python-client/**/local_settings.py +python-client/**/db.sqlite3 +python-client/**/db.sqlite3-journal # Flask stuff: -python-client/instance/ -python-client/.webassets-cache +python-client/**/instance/ +python-client/**/.webassets-cache -python-client/# Scrapy stuff: -python-client/.scrapy +# Scrapy stuff: +python-client/**/.scrapy # Sphinx documentation -python-client/docs/_build/ +python-client/**/docs/_build/ # PyBuilder -python-client/target/ +python-client/**/.pybuilder/ +python-client/**/target/ # Jupyter Notebook -python-client/.ipynb_checkpoints +python-client/**/.ipynb_checkpoints -# pyenv -python-client/.python-version +# IPython +python-client/**/profile_default/ +python-client/**/ipython_config.py -# celery beat schedule file -python-client/celerybeat-schedule +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple 
environments; otherwise, check them in: +python-client/**/.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +python-client/**/Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +python-client/**/poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +python-client/**/pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +python-client/**/.pdm.toml + +# PEP 582; used by e.g. 
+python-client/**/site
Mute errors like the following; the Pegasus codebase will be checked by clang-format, so it's safe to enable the CMake option (-Wno-misleading-indentation).
This patch mainly refactors the load and dump functions of objects. There are 2 types of objects: one uses the DEFINE_JSON_SERIALIZATION macro so that the object can be converted to/from a JSON string (corresponding to RapidJson), the other uses the NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE macro (corresponding to NlohmannJson). So there are 2 types of functions (i.e. load_rjobj_from_file/dump_rjobj_to_file and load_njobj_from_file/dump_njobj_to_file).
This patch refactors the code to use the newly added functions to do the operations, and also adds some unit tests.
#include -#include #include #include #include #include +#include "absl/strings/string_view.h" #include "local_service.h" #include "nlohmann/json.hpp" -#include "nlohmann/json_fwd.hpp" #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "runtime/task/async_calls.h" @@ -36,7 +35,7 @@ #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/fmt_logging.h" -#include "absl/strings/string_view.h" +#include "utils/load_dump_object.h" #include "utils/strings.h" DSN_DECLARE_bool(enable_direct_io); @@ -53,41 +52,6 @@ namespace block_service { DEFINE_TASK_CODE(LPC_LOCAL_SERVICE_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_BLOCK_SERVICE) -error_code file_metadata::dump_to_file(const std::string &file_path) const -{ - std::string data = nlohmann::json(*this).dump(); - auto s = - rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), - rocksdb::Slice(data), - file_path, - /* should_sync */ true); - if (!s.ok()) { - LOG_WARNING("write to metadata file '{}' failed, err = {}", file_path, s.ToString()); - return ERR_FS_INTERNAL; - } - - return ERR_OK; -} - -error_code file_metadata::load_from_file(const std::string &file_path) -{ - std::string data; - auto s = rocksdb::ReadFileToString( - dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), file_path, &data); - if (!s.ok()) { - LOG_WARNING("load from metadata file '{}' failed, err = {}", file_path, s.ToString()); - return ERR_FS_INTERNAL; - } - - try { - nlohmann::json::parse(data).get_to(*this); - return ERR_OK; - } catch (nlohmann::json::exception &exp) { - LOG_WARNING("decode metadata from json failed: {}, data = [{}]", exp.what(), data); - return ERR_FS_INTERNAL; - } -} - std::string local_service::get_metafile(const std::string &filepath) { std::string dir_part = utils::filesystem::remove_file_name(filepath); @@ -294,7 +258,7 @@ error_code local_file_object::load_metadata() file_metadata fmd; std::string filepath = local_service::get_metafile(file_name()); - auto 
ec = fmd.load_from_file(filepath); + auto ec = dsn::utils::load_njobj_from_file(filepath, &fmd); if (ec != ERR_OK) { LOG_WARNING("load metadata file '{}' failed", filepath); return ERR_FS_INTERNAL; @@ -356,8 +320,8 @@ dsn::task_ptr local_file_object::write(const write_request &req, // a lot, but it is somewhat not correct. _size = resp.written_size; _md5_value = utils::string_md5(req.buffer.data(), req.buffer.length()); - auto err = file_metadata(_size, _md5_value) - .dump_to_file(local_service::get_metafile(file_name())); + auto err = dsn::utils::dump_njobj_to_file(file_metadata(_size, _md5_value), + local_service::get_metafile(file_name())); if (err != ERR_OK) { LOG_ERROR("file_metadata write failed"); resp.err = ERR_FS_INTERNAL; @@ -498,8 +462,8 @@ dsn::task_ptr local_file_object::upload(const upload_request &req, break; } - auto err = file_metadata(_size, _md5_value) - .dump_to_file(local_service::get_metafile(file_name())); + auto err = dsn::utils::dump_njobj_to_file(file_metadata(_size, _md5_value), + local_service::get_metafile(file_name())); if (err != ERR_OK) { LOG_ERROR("file_metadata write failed"); resp.err = ERR_FS_INTERNAL; diff --git a/src/block_service/local/local_service.h b/src/block_service/local/local_service.h index 0c1a5ee883..c67b9913e3 100644 --- a/src/block_service/local/local_service.h +++ b/src/block_service/local/local_service.h @@ -41,14 +41,8 @@ struct file_metadata std::string md5; file_metadata(int64_t s = 0, const std::string &m = "") : size(s), md5(m) {} - - // Dump the object to a file in JSON format. - error_code dump_to_file(const std::string &file_path) const; - - // Load the object from a file in JSON format. 
- error_code load_from_file(const std::string &file_path); }; -NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5) +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5); class local_service : public block_filesystem { diff --git a/src/block_service/test/local_service_test.cpp b/src/block_service/test/local_service_test.cpp index 5eeb8dd0fe..9cbb7eeda6 100644 --- a/src/block_service/test/local_service_test.cpp +++ b/src/block_service/test/local_service_test.cpp @@ -32,6 +32,7 @@ #include "test_util/test_util.h" #include "utils/env.h" #include "utils/error_code.h" +#include "utils/load_dump_object.h" namespace dsn { namespace dist { @@ -49,11 +50,11 @@ TEST_P(local_service_test, file_metadata) const int64_t kSize = 12345; const std::string kMD5 = "0123456789abcdef0123456789abcdef"; auto meta_file_path = local_service::get_metafile("a.txt"); - ASSERT_EQ(ERR_OK, file_metadata(kSize, kMD5).dump_to_file(meta_file_path)); + ASSERT_EQ(ERR_OK, dsn::utils::dump_njobj_to_file(file_metadata(kSize, kMD5), meta_file_path)); ASSERT_TRUE(boost::filesystem::exists(meta_file_path)); file_metadata fm; - fm.load_from_file(meta_file_path); + ASSERT_EQ(ERR_OK, dsn::utils::load_njobj_from_file(meta_file_path, &fm)); ASSERT_EQ(kSize, fm.size); ASSERT_EQ(kMD5, fm.md5); } @@ -64,7 +65,8 @@ TEST_P(local_service_test, load_metadata) auto meta_file_path = local_service::get_metafile(file.file_name()); { - ASSERT_EQ(ERR_OK, file_metadata(5, "abcde").dump_to_file(meta_file_path)); + ASSERT_EQ(ERR_OK, + dsn::utils::dump_njobj_to_file(file_metadata(5, "abcde"), meta_file_path)); ASSERT_EQ(ERR_OK, file.load_metadata()); ASSERT_EQ("abcde", file.get_md5sum()); ASSERT_EQ(5, file.get_size()); diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 6672fdceb3..b92a98885e 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -16,18 +16,16 @@ // under the License. 
#include -#include -#include #include #include #include #include #include +#include "absl/strings/string_view.h" #include "block_service/block_service_manager.h" #include "common/bulk_load_common.h" #include "common/gpid.h" -#include "common/json_helper.h" #include "common/replication.codes.h" #include "common/replication_common.h" #include "common/replication_enums.h" @@ -42,14 +40,12 @@ #include "runtime/rpc/rpc_holder.h" #include "runtime/task/async_calls.h" #include "utils/autoref_ptr.h" -#include "utils/blob.h" #include "utils/chrono_literals.h" #include "utils/env.h" #include "utils/fail_point.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" -#include "absl/strings/string_view.h" +#include "utils/load_dump_object.h" #include "utils/thread_access_checker.h" METRIC_DEFINE_counter(replica, @@ -606,18 +602,10 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, // need to acquire write lock while calling it error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname) { - std::string buf; - auto s = rocksdb::ReadFileToString( - dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), fname, &buf); - if (dsn_unlikely(!s.ok())) { - LOG_ERROR_PREFIX("read file {} failed, error = {}", fname, s.ToString()); - return ERR_FILE_OPERATION_FAILED; - } - - blob bb = blob::create_from_bytes(std::move(buf)); - if (!json::json_forwarder::decode(bb, _metadata)) { - LOG_ERROR_PREFIX("file({}) is damaged", fname); - return ERR_CORRUPTION; + auto ec = dsn::utils::load_rjobj_from_file(fname, &_metadata); + if (ec != ERR_OK) { + LOG_ERROR_PREFIX("load bulk_load_metadata from file {} failed", fname); + return ec; } if (_metadata.file_total_size <= 0) { diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index fcfcca589c..5559f6d4a0 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ 
-92,7 +92,8 @@ class replica_bulk_loader : replica_base int32_t file_index, dist::block_service::block_filesystem *fs); - // \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed + // \return ERR_PATH_NOT_FOUND: file not exist + // \return ERR_FILE_OPERATION_FAILED: get size failed, open file failed // \return ERR_CORRUPTION: parse failed // need to acquire write lock while calling it error_code parse_bulk_load_metadata(const std::string &fname); diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp index 59a00cd28b..bb874d5007 100644 --- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp @@ -17,17 +17,12 @@ #include "replica/bulk_load/replica_bulk_loader.h" -#include -#include -#include -#include #include // IWYU pragma: keep #include #include #include "common/bulk_load_common.h" #include "common/gpid.h" -#include "common/json_helper.h" #include "dsn.layer2_types.h" #include "gtest/gtest.h" #include "replica/test/mock_utils.h" @@ -35,10 +30,9 @@ #include "runtime/rpc/rpc_address.h" #include "runtime/task/task_tracker.h" #include "test_util/test_util.h" -#include "utils/blob.h" -#include "utils/env.h" #include "utils/fail_point.h" #include "utils/filesystem.h" +#include "utils/load_dump_object.h" #include "utils/test_macros.h" namespace dsn { @@ -258,14 +252,7 @@ class replica_bulk_loader_test : public replica_test_base _metadata.files.emplace_back(_file_meta); _metadata.file_total_size = _file_meta.size; std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA); - blob bb = json::json_forwarder::encode(_metadata); - auto s = - rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), - rocksdb::Slice(bb.data(), bb.length()), - whole_name, - /* should_sync */ true); - ASSERT_TRUE(s.ok()) << fmt::format( - "write file {} failed, err = {}", 
whole_name, s.ToString()); + ASSERT_EQ(ERR_OK, utils::dump_rjobj_to_file(_metadata, whole_name)); } bool validate_metadata() @@ -525,7 +512,7 @@ TEST_P(replica_bulk_loader_test, rollback_to_downloading_test) // parse_bulk_load_metadata unit tests TEST_P(replica_bulk_loader_test, bulk_load_metadata_not_exist) { - ASSERT_EQ(test_parse_bulk_load_metadata("path_not_exist"), ERR_FILE_OPERATION_FAILED); + ASSERT_EQ(ERR_PATH_NOT_FOUND, test_parse_bulk_load_metadata("path_not_exist")); } TEST_P(replica_bulk_loader_test, bulk_load_metadata_corrupt) @@ -535,8 +522,7 @@ TEST_P(replica_bulk_loader_test, bulk_load_metadata_corrupt) NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR, METADATA), &_file_meta)); std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA); - error_code ec = test_parse_bulk_load_metadata(metadata_file_name); - ASSERT_EQ(ERR_CORRUPTION, ec); + ASSERT_EQ(ERR_CORRUPTION, test_parse_bulk_load_metadata(metadata_file_name)); utils::filesystem::remove_path(LOCAL_DIR); } @@ -546,8 +532,7 @@ TEST_P(replica_bulk_loader_test, bulk_load_metadata_parse_succeed) NO_FATALS(create_local_metadata_file()); std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA); - auto ec = test_parse_bulk_load_metadata(metadata_file_name); - ASSERT_EQ(ec, ERR_OK); + ASSERT_EQ(ERR_OK, test_parse_bulk_load_metadata(metadata_file_name)); ASSERT_TRUE(validate_metadata()); utils::filesystem::remove_path(LOCAL_DIR); } diff --git a/src/replica/replica_disk_migrator.cpp b/src/replica/replica_disk_migrator.cpp index 355421503a..4ad9cb98c1 100644 --- a/src/replica/replica_disk_migrator.cpp +++ b/src/replica/replica_disk_migrator.cpp @@ -36,6 +36,7 @@ #include "utils/filesystem.h" #include "utils/fmt_logging.h" #include "utils/thread_access_checker.h" +#include "utils/load_dump_object.h" namespace dsn { namespace replication { @@ -251,7 +252,9 @@ bool replica_disk_migrator::migrate_replica_app_info(const 
replica_disk_migrate_ return false; }); replica_init_info init_info = _replica->get_app()->init_info(); - const auto &store_init_info_err = init_info.store(_target_replica_dir); + const auto &store_init_info_err = utils::dump_rjobj_to_file( + init_info, + utils::filesystem::path_combine(_target_replica_dir, replica_init_info::kInitInfo)); if (store_init_info_err != ERR_OK) { LOG_ERROR_PREFIX("disk migration(origin={}, target={}) stores app init info failed({})", req.origin_disk, diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp index 9c255df0b4..47ef20d2be 100644 --- a/src/replica/replica_restore.cpp +++ b/src/replica/replica_restore.cpp @@ -17,8 +17,6 @@ #include #include -#include -#include #include #include #include @@ -34,7 +32,6 @@ #include "block_service/block_service_manager.h" #include "common/backup_common.h" #include "common/gpid.h" -#include "common/json_helper.h" #include "common/replication.codes.h" #include "dsn.layer2_types.h" #include "failure_detector/failure_detector_multimaster.h" @@ -55,6 +52,7 @@ #include "utils/error_code.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" +#include "utils/load_dump_object.h" using namespace dsn::dist::block_service; @@ -95,31 +93,6 @@ bool replica::remove_useless_file_under_chkpt(const std::string &chkpt_dir, return true; } -bool replica::read_cold_backup_metadata(const std::string &fname, - cold_backup_metadata &backup_metadata) -{ - if (!::dsn::utils::filesystem::file_exists(fname)) { - LOG_ERROR_PREFIX( - "checkpoint on remote storage media is damaged, coz file({}) doesn't exist", fname); - return false; - } - - std::string data; - auto s = rocksdb::ReadFileToString( - dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), fname, &data); - if (!s.ok()) { - LOG_ERROR_PREFIX("read file '{}' failed, err = {}", fname, s.ToString()); - return false; - } - - if (!::dsn::json::json_forwarder::decode( - blob::create_from_bytes(std::move(data)), backup_metadata)) { - 
LOG_ERROR_PREFIX("file({}) under checkpoint is damaged", fname); - return false; - } - return true; -} - error_code replica::download_checkpoint(const configuration_restore_request &req, const std::string &remote_chkpt_dir, const std::string &local_chkpt_dir) @@ -206,13 +179,14 @@ error_code replica::get_backup_metadata(block_filesystem *fs, return err; } - // parse cold_backup_meta from metadata file + // Load cold_backup_metadata from metadata file. const std::string local_backup_metada_file = utils::filesystem::path_combine(local_chkpt_dir, cold_backup_constant::BACKUP_METADATA); - if (!read_cold_backup_metadata(local_backup_metada_file, backup_metadata)) { - LOG_ERROR_PREFIX("read cold_backup_metadata from file({}) failed", + auto ec = dsn::utils::load_rjobj_from_file(local_backup_metada_file, &backup_metadata); + if (ec != ERR_OK) { + LOG_ERROR_PREFIX("load cold_backup_metadata from file({}) failed", local_backup_metada_file); - return ERR_FILE_OPERATION_FAILED; + return ec; } _chkpt_total_size = backup_metadata.checkpoint_total_size; diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index 9e3f984cb6..bc25925d1b 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -25,9 +25,9 @@ */ #include +#include #include #include -#include #include #include #include @@ -43,7 +43,6 @@ #include "mutation.h" #include "replica.h" #include "replica/replication_app_base.h" -#include "runtime/api_layer1.h" #include "runtime/rpc/rpc_message.h" #include "runtime/rpc/serialization.h" #include "runtime/task/task_code.h" @@ -52,10 +51,13 @@ #include "utils/binary_reader.h" #include "utils/binary_writer.h" #include "utils/blob.h" +#include "utils/env.h" #include "utils/factory_store.h" #include "utils/fail_point.h" +#include "utils/filesystem.h" #include "utils/fmt_logging.h" #include "utils/latency_tracer.h" +#include "utils/load_dump_object.h" METRIC_DEFINE_counter(replica, committed_requests, @@ 
-68,65 +70,13 @@ namespace replication { const std::string replica_init_info::kInitInfo = ".init-info"; -error_code replica_init_info::load(const std::string &dir) -{ - std::string info_path = utils::filesystem::path_combine(dir, kInitInfo); - LOG_AND_RETURN_NOT_TRUE(ERROR, - utils::filesystem::path_exists(info_path), - ERR_PATH_NOT_FOUND, - "file({}) not exist", - info_path); - LOG_AND_RETURN_NOT_OK( - ERROR, load_json(info_path), "load replica_init_info from {} failed", info_path); - LOG_INFO("load replica_init_info from {} succeed: {}", info_path, to_string()); - return ERR_OK; -} - -error_code replica_init_info::store(const std::string &dir) -{ - uint64_t start = dsn_now_ns(); - std::string info_path = utils::filesystem::path_combine(dir, kInitInfo); - LOG_AND_RETURN_NOT_OK(ERROR, - store_json(info_path), - "store replica_init_info to {} failed, time_used_ns = {}", - info_path, - dsn_now_ns() - start); - LOG_INFO("store replica_init_info to {} succeed, time_used_ns = {}: {}", - info_path, - dsn_now_ns() - start, - to_string()); - return ERR_OK; -} - -error_code replica_init_info::load_json(const std::string &fname) -{ - std::string data; - auto s = rocksdb::ReadFileToString( - dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), fname, &data); - LOG_AND_RETURN_NOT_TRUE(ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "read file {} failed", fname); - LOG_AND_RETURN_NOT_TRUE(ERROR, - json::json_forwarder::decode( - blob::create_from_bytes(std::move(data)), *this), - ERR_FILE_OPERATION_FAILED, - "decode json from file {} failed", - fname); - return ERR_OK; -} - -error_code replica_init_info::store_json(const std::string &fname) -{ - return write_blob_to_file(fname, - json::json_forwarder::encode(*this), - dsn::utils::FileDataType::kSensitive); -} - std::string replica_init_info::to_string() { - // TODO(yingchun): use fmt instead - std::ostringstream oss; - oss << "init_ballot = " << init_ballot << ", init_durable_decree = " << init_durable_decree - << ", 
init_offset_in_private_log = " << init_offset_in_private_log; - return oss.str(); + return fmt::format( + "init_ballot = {}, init_durable_decree = {}, init_offset_in_private_log = {}", + init_ballot, + init_durable_decree, + init_offset_in_private_log); } error_code replica_app_info::load(const std::string &fname) @@ -164,7 +114,8 @@ error_code replica_app_info::store(const std::string &fname) marshall(writer, tmp, DSF_THRIFT_JSON); } - return write_blob_to_file(fname, writer.get_buffer(), dsn::utils::FileDataType::kSensitive); + return dsn::utils::write_data_to_file( + fname, writer.get_buffer(), dsn::utils::FileDataType::kSensitive); } /*static*/ @@ -223,7 +174,8 @@ error_code replication_app_base::open_internal(replica *r) _last_committed_decree = last_durable_decree(); - auto err = _info.load(r->dir()); + auto err = dsn::utils::load_rjobj_from_file( + utils::filesystem::path_combine(r->dir(), replica_init_info::kInitInfo), &_info); LOG_AND_RETURN_NOT_OK(ERROR_PREFIX, err, "load replica_init_info failed"); LOG_AND_RETURN_NOT_TRUE(ERROR_PREFIX, @@ -435,7 +387,11 @@ error_code replication_app_base::update_init_info(replica *r, _info.init_durable_decree = durable_decree; _info.init_offset_in_private_log = private_log_offset; - LOG_AND_RETURN_NOT_OK(ERROR_PREFIX, _info.store(r->dir()), "store replica_init_info failed"); + LOG_AND_RETURN_NOT_OK( + ERROR_PREFIX, + utils::dump_rjobj_to_file( + _info, utils::filesystem::path_combine(r->dir(), replica_init_info::kInitInfo)), + "store replica_init_info failed"); return ERR_OK; } diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h index db3f44c1cf..b8014003e0 100644 --- a/src/replica/replication_app_base.h +++ b/src/replica/replication_app_base.h @@ -26,12 +26,8 @@ #pragma once -#include -#include -#include -#include -#include #include +#include #include #include @@ -41,11 +37,7 @@ #include "metadata_types.h" #include "replica/replica_base.h" #include "replica_admin_types.h" -#include 
"utils/defer.h" -#include "utils/env.h" #include "utils/error_code.h" -#include "utils/filesystem.h" -#include "utils/fmt_logging.h" #include "utils/fmt_utils.h" #include "utils/metrics.h" #include "utils/ports.h" @@ -56,46 +48,21 @@ class blob; class message_ex; namespace replication { - class learn_state; class mutation; class replica; -namespace { -template -error_code write_blob_to_file(const std::string &fname, - const T &data, - const dsn::utils::FileDataType &fileDataType) -{ - std::string tmp_fname = fname + ".tmp"; - auto cleanup = defer([tmp_fname]() { utils::filesystem::remove_path(tmp_fname); }); - auto s = rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(fileDataType), - rocksdb::Slice(data.data(), data.length()), - tmp_fname, - /* should_sync */ true); - LOG_AND_RETURN_NOT_TRUE( - ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "write file {} failed", tmp_fname); - LOG_AND_RETURN_NOT_TRUE(ERROR, - utils::filesystem::rename_path(tmp_fname, fname), - ERR_FILE_OPERATION_FAILED, - "move file from {} to {} failed", - tmp_fname, - fname); - return ERR_OK; -} -} // namespace - class replica_init_info { public: - int32_t magic; - int32_t crc; - ballot init_ballot; - decree init_durable_decree; - int64_t init_offset_in_shared_log; // Deprecated since Pegasus 2.6.0, but leave it to keep - // compatible readability to read replica_init_info from - // older Pegasus version. - int64_t init_offset_in_private_log; + int32_t magic = 0; + int32_t crc = 0; + ballot init_ballot = 0; + decree init_durable_decree = 0; + int64_t init_offset_in_shared_log = 0; // Deprecated since Pegasus 2.6.0, but leave it to keep + // compatible readability to read replica_init_info from + // older Pegasus version. 
+ int64_t init_offset_in_private_log = 0; DEFINE_JSON_SERIALIZATION(init_ballot, init_durable_decree, init_offset_in_shared_log, @@ -104,14 +71,7 @@ class replica_init_info static const std::string kInitInfo; public: - replica_init_info() { memset((void *)this, 0, sizeof(*this)); } - error_code load(const std::string &dir) WARN_UNUSED_RESULT; - error_code store(const std::string &dir); std::string to_string(); - -private: - error_code load_json(const std::string &fname); - error_code store_json(const std::string &fname); }; class replica_app_info diff --git a/src/test/function_test/bulk_load/CMakeLists.txt b/src/test/function_test/bulk_load/CMakeLists.txt index ae1fd559b6..03f55fac24 100644 --- a/src/test/function_test/bulk_load/CMakeLists.txt +++ b/src/test/function_test/bulk_load/CMakeLists.txt @@ -39,6 +39,7 @@ set(MY_PROJ_LIBS krb5 function_test_utils test_utils + dsn_utils rocksdb lz4 zstd diff --git a/src/test/function_test/bulk_load/test_bulk_load.cpp b/src/test/function_test/bulk_load/test_bulk_load.cpp index 502553f03a..85db39c7b3 100644 --- a/src/test/function_test/bulk_load/test_bulk_load.cpp +++ b/src/test/function_test/bulk_load/test_bulk_load.cpp @@ -39,7 +39,6 @@ #include "client/partition_resolver.h" #include "client/replication_ddl_client.h" #include "common/bulk_load_common.h" -#include "common/json_helper.h" #include "gtest/gtest.h" #include "include/pegasus/client.h" // IWYU pragma: keep #include "meta/meta_bulk_load_service.h" @@ -52,6 +51,7 @@ #include "utils/errors.h" #include "utils/filesystem.h" #include "utils/flags.h" +#include "utils/load_dump_object.h" #include "utils/test_macros.h" #include "utils/utils.h" @@ -89,18 +89,6 @@ class bulk_load_test : public test_util NO_FATALS(run_cmd_from_project_root("rm -rf " + bulk_load_local_app_root_)); } - // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'. 
- void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path) - { - auto value = dsn::json::json_forwarder::encode(bli); - auto s = - rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), - rocksdb::Slice(value.data(), value.length()), - bulk_load_info_path, - /* should_sync */ true); - ASSERT_TRUE(s.ok()) << s.ToString(); - } - // Generate the '.xxx.meta' file according to the 'xxx' file // in path 'file_path'. void generate_metadata_file(const std::string &file_path) @@ -110,7 +98,7 @@ class bulk_load_test : public test_util file_path, dsn::utils::FileDataType::kSensitive, fm.size)); ASSERT_EQ(ERR_OK, dsn::utils::filesystem::md5sum(file_path, fm.md5)); auto metadata_file_path = dist::block_service::local_service::get_metafile(file_path); - ASSERT_EQ(ERR_OK, fm.dump_to_file(metadata_file_path)); + ASSERT_EQ(ERR_OK, dsn::utils::dump_njobj_to_file(fm, metadata_file_path)); } void generate_bulk_load_files() @@ -192,15 +180,9 @@ class bulk_load_test : public test_util } // Generate 'bulk_load_metadata' file for each partition. - blob bb = json::json_forwarder::encode(blm); std::string blm_path = dsn::utils::filesystem::path_combine( partition_path, bulk_load_constant::BULK_LOAD_METADATA); - auto s = rocksdb::WriteStringToFile( - dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), - rocksdb::Slice(bb.data(), bb.length()), - blm_path, - /* should_sync */ true); - ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(ERR_OK, dsn::utils::dump_rjobj_to_file(blm, blm_path)); // Generate '.bulk_load_metadata.meta' file of 'bulk_load_metadata'. NO_FATALS(generate_metadata_file(blm_path)); @@ -208,8 +190,10 @@ class bulk_load_test : public test_util // Generate 'bulk_load_info' file for this table. 
auto bulk_load_info_path = fmt::format("{}/bulk_load_info", bulk_load_local_app_root_); - NO_FATALS(generate_bulk_load_info(bulk_load_info(table_id_, table_name_, partition_count_), - bulk_load_info_path)); + ASSERT_EQ( + ERR_OK, + dsn::utils::dump_rjobj_to_file(bulk_load_info(table_id_, table_name_, partition_count_), + bulk_load_info_path)); // Generate '.bulk_load_info.meta' file of 'bulk_load_info'. NO_FATALS(generate_metadata_file(bulk_load_info_path)); @@ -334,10 +318,10 @@ TEST_F(bulk_load_test, inconsistent_bulk_load_info) // kind of inconsistencies. bulk_load_info tests[] = {{table_id_ + 1, table_name_, partition_count_}, {table_id_, table_name_, partition_count_ * 2}}; + auto bulk_load_info_path = fmt::format("{}/bulk_load_info", bulk_load_local_app_root_); for (const auto &test : tests) { // Generate inconsistent 'bulk_load_info'. - auto bulk_load_info_path = fmt::format("{}/bulk_load_info", bulk_load_local_app_root_); - NO_FATALS(generate_bulk_load_info(test, bulk_load_info_path)); + ASSERT_EQ(ERR_OK, dsn::utils::dump_rjobj_to_file(test, bulk_load_info_path)); // Generate '.bulk_load_info.meta'. NO_FATALS(generate_metadata_file(bulk_load_info_path)); diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp index d0d4396640..ec414767cc 100644 --- a/src/utils/filesystem.cpp +++ b/src/utils/filesystem.cpp @@ -41,6 +41,7 @@ #include #include +#include "absl/strings/string_view.h" #include "utils/defer.h" #include "utils/env.h" #include "utils/fail_point.h" @@ -48,7 +49,6 @@ #include "utils/fmt_logging.h" #include "utils/ports.h" #include "utils/safe_strerror_posix.h" -#include "absl/strings/string_view.h" #define getcwd_ getcwd #define rmdir_ rmdir diff --git a/src/utils/fmt_logging.h b/src/utils/fmt_logging.h index 2e40c82790..62043a2d08 100644 --- a/src/utils/fmt_logging.h +++ b/src/utils/fmt_logging.h @@ -275,16 +275,26 @@ inline const char *null_str_printer(const char *s) { return s == nullptr ? 
"(nul LOG_AND_RETURN_NOT_TRUE(level, _err == ::dsn::ERR_OK, _err, __VA_ARGS__); \ } while (0) -// Return the given rocksdb::Status 's' if it is not OK. -#define LOG_AND_RETURN_NOT_RDB_OK(level, s, ...) \ +// Return the given rocksdb::Status of 'exp' if it is not OK. +#define LOG_AND_RETURN_NOT_RDB_OK(level, exp, ...) \ do { \ - const auto &_s = (s); \ + const auto &_s = (exp); \ if (dsn_unlikely(!_s.ok())) { \ LOG_##level("{}: {}", _s.ToString(), fmt::format(__VA_ARGS__)); \ return _s; \ } \ } while (0) +// Return the given 'err' code if 'exp' is not OK. +#define LOG_AND_RETURN_CODE_NOT_RDB_OK(level, exp, err, ...) \ + do { \ + const auto &_s = (exp); \ + if (dsn_unlikely(!_s.ok())) { \ + LOG_##level("{}: {}", _s.ToString(), fmt::format(__VA_ARGS__)); \ + return err; \ + } \ + } while (0) + #ifndef NDEBUG #define DCHECK CHECK #define DCHECK_NOTNULL CHECK_NOTNULL diff --git a/src/utils/load_dump_object.h b/src/utils/load_dump_object.h new file mode 100644 index 0000000000..a2ee031670 --- /dev/null +++ b/src/utils/load_dump_object.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "common/json_helper.h"
+#include "utils/blob.h"
+#include "utils/defer.h"
+#include "utils/env.h"
+#include "utils/error_code.h"
+#include "utils/filesystem.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+namespace utils {
+
+// Write 'data' to path 'file_path', while 'type' decides whether to encrypt the data.
+template 
+error_code write_data_to_file(const std::string &file_path, const T &data, const FileDataType &type)
+{
+    const std::string tmp_file_path = fmt::format("{}.tmp", file_path);
+    auto cleanup = defer([tmp_file_path]() { filesystem::remove_path(tmp_file_path); });
+    LOG_AND_RETURN_CODE_NOT_RDB_OK(
+        ERROR,
+        rocksdb::WriteStringToFile(PegasusEnv(type),
+                                   rocksdb::Slice(data.data(), data.length()),
+                                   tmp_file_path,
+                                   /* should_sync */ true),
+        ERR_FILE_OPERATION_FAILED,
+        "write file '{}' failed",
+        tmp_file_path);
+    LOG_AND_RETURN_NOT_TRUE(ERROR,
+                            filesystem::rename_path(tmp_file_path, file_path),
+                            ERR_FILE_OPERATION_FAILED,
+                            "move file from '{}' to '{}' failed",
+                            tmp_file_path,
+                            file_path);
+    return ERR_OK;
+}
+
+// Load an object from the file in 'file_path' to 'obj', the file content must be in JSON format.
+// The object T must use the macro DEFINE_JSON_SERIALIZATION(...) when defining, which implements
+// the RapidJson APIs.
+template 
+error_code load_rjobj_from_file(const std::string &file_path, T *obj)
+{
+    LOG_AND_RETURN_NOT_TRUE(ERROR,
+                            filesystem::path_exists(file_path),
+                            ERR_PATH_NOT_FOUND,
+                            "file '{}' not exist",
+                            file_path);
+    std::string data;
+    LOG_AND_RETURN_CODE_NOT_RDB_OK(
+        ERROR,
+        rocksdb::ReadFileToString(PegasusEnv(FileDataType::kSensitive), file_path, &data),
+        ERR_FILE_OPERATION_FAILED,
+        "read file '{}' failed",
+        file_path);
+    LOG_AND_RETURN_NOT_TRUE(
+        ERROR,
+        json::json_forwarder::decode(blob::create_from_bytes(std::move(data)), *obj),
+        ERR_CORRUPTION,
+        "decode JSON from file '{}' failed",
+        file_path);
+    return ERR_OK;
+}
+
+// Dump the object to the file in 'file_path' according to 'obj', the file content will be in JSON
+// format.
+// The object T must use the macro DEFINE_JSON_SERIALIZATION(...) when defining, which implements
+// the RapidJson APIs.
+template 
+error_code dump_rjobj_to_file(const T &obj, const std::string &file_path)
+{
+    const auto data = json::json_forwarder::encode(obj);
+    LOG_AND_RETURN_NOT_OK(ERROR,
+                          write_data_to_file(file_path, data, FileDataType::kSensitive),
+                          "dump content to '{}' failed",
+                          file_path);
+    return ERR_OK;
+}
+
+// Similar to load_rjobj_from_file, but the object T must use the
+// macro NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(...) when defining,
+// which implements the NlohmannJson APIs.
+template +error_code load_njobj_from_file(const std::string &file_path, T *obj) +{ + LOG_AND_RETURN_NOT_TRUE(ERROR, + filesystem::path_exists(file_path), + ERR_PATH_NOT_FOUND, + "file '{}' not exist", + file_path); + std::string data; + LOG_AND_RETURN_CODE_NOT_RDB_OK( + ERROR, + rocksdb::ReadFileToString(PegasusEnv(FileDataType::kSensitive), file_path, &data), + ERR_FILE_OPERATION_FAILED, + "read file '{}' failed", + file_path); + try { + nlohmann::json::parse(data).get_to(*obj); + } catch (nlohmann::json::exception &exp) { + LOG_WARNING("decode JSON from file '{}' failed, exception = {}, data = [{}]", + file_path, + exp.what(), + data); + return ERR_CORRUPTION; + } + return ERR_OK; +} + +// Similar to dump_rjobj_to_file, but the object T must use the +// marco NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(...) when defining, +// which implement the NlohmannJson APIs. +template +error_code dump_njobj_to_file(const T &obj, const std::string &file_path) +{ + const auto data = nlohmann::json(obj).dump(); + LOG_AND_RETURN_NOT_OK(ERROR, + write_data_to_file(file_path, data, FileDataType::kSensitive), + "dump content to '{}' failed", + file_path); + return ERR_OK; +} + +} // namespace utils +} // namespace dsn diff --git a/src/utils/test/load_dump_object_test.cpp b/src/utils/test/load_dump_object_test.cpp new file mode 100644 index 0000000000..4b84de33c3 --- /dev/null +++ b/src/utils/test/load_dump_object_test.cpp @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "utils/load_dump_object.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" + +namespace dsn { +namespace utils { +struct nlohmann_json_struct; +struct rapid_json_struct; + +#define STRUCT_CONTENT(T) \ + int64_t a; \ + std::string b; \ + std::vector c; \ + bool operator==(const T &other) const { return a == other.a && b == other.b && c == other.c; } + +struct nlohmann_json_struct +{ + STRUCT_CONTENT(nlohmann_json_struct); +}; +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(nlohmann_json_struct, a, b, c); + +TEST(load_dump_object, nlohmann_json_struct_normal_test) +{ + const std::string path("nlohmann_json_struct_test"); + nlohmann_json_struct obj; + obj.a = 123; + obj.b = "hello world"; + obj.c = std::vector({1, 3, 5, 2, 4}); + ASSERT_EQ(ERR_OK, dump_njobj_to_file(obj, path)); + nlohmann_json_struct obj2; + ASSERT_EQ(ERR_OK, load_njobj_from_file(path, &obj2)); + ASSERT_EQ(obj, obj2); +} + +TEST(load_dump_object, nlohmann_json_struct_load_failed_test) +{ + const std::string path("nlohmann_json_struct_test_bad"); + ASSERT_TRUE(filesystem::remove_path(path)); + + nlohmann_json_struct obj; + ASSERT_EQ(ERR_PATH_NOT_FOUND, load_njobj_from_file(path, &obj)); + + auto s = + rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), + rocksdb::Slice("invalid data"), + path, + /* should_sync */ true); + ASSERT_TRUE(s.ok()) << s.ToString(); + + ASSERT_EQ(ERR_CORRUPTION, load_njobj_from_file(path, &obj)); +} + +struct rapid_json_struct +{ + STRUCT_CONTENT(rapid_json_struct); + DEFINE_JSON_SERIALIZATION(a, 
b, c); +}; + +TEST(load_dump_object, rapid_json_struct_test) +{ + const std::string path("rapid_json_struct_test"); + rapid_json_struct obj; + obj.a = 123; + obj.b = "hello world"; + obj.c = std::vector({1, 3, 5, 2, 4}); + ASSERT_EQ(ERR_OK, dump_rjobj_to_file(obj, path)); + rapid_json_struct obj2; + ASSERT_EQ(ERR_OK, load_rjobj_from_file(path, &obj2)); + ASSERT_EQ(obj, obj2); +} + +TEST(load_dump_object, rapid_json_struct_load_failed_test) +{ + const std::string path("rapid_json_struct_test_bad"); + ASSERT_TRUE(filesystem::remove_path(path)); + + rapid_json_struct obj; + ASSERT_EQ(ERR_PATH_NOT_FOUND, load_rjobj_from_file(path, &obj)); + + auto s = + rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), + rocksdb::Slice("invalid data"), + path, + /* should_sync */ true); + ASSERT_TRUE(s.ok()) << s.ToString(); + + ASSERT_EQ(ERR_CORRUPTION, load_rjobj_from_file(path, &obj)); +} + +} // namespace utils +} // namespace dsn From 4093b4f27ec21dbe5c57ebea8bf0f308dda15c77 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Fri, 5 Jan 2024 14:50:20 +0800 Subject: [PATCH 6/8] chore(collector): add fmt job for collector CI (#1833) - Update collector README - Update the url from github.com/pegasus-kv/collector to github.com/apache/incubator-pegasus/collector - Add fmt job for collector CI --- .github/workflows/lint_and_test_collector.yml | 19 +++++++++++++++++++ collector/README.md | 16 ++++++++++++++++ collector/go.mod | 2 +- collector/main.go | 6 +++--- collector/sink/falcon_sink.go | 2 +- collector/sink/prometheus_sink.go | 2 +- collector/sink/sink.go | 2 +- collector/usage/usage_recorder.go | 2 +- collector/webui/index.go | 2 +- 9 files changed, 44 insertions(+), 9 deletions(-) diff --git a/.github/workflows/lint_and_test_collector.yml b/.github/workflows/lint_and_test_collector.yml index e257dbc93d..e21ede1a87 100644 --- a/.github/workflows/lint_and_test_collector.yml +++ b/.github/workflows/lint_and_test_collector.yml @@ -31,14 +31,32 @@ 
on: paths: - .github/workflows/lint_and_test_collector.yml - collector/** + - go-client/** # for manually triggering workflow workflow_dispatch: # workflow tasks jobs: + format: + name: Format + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: 1.18 + - name: Format + working-directory: ./go-client + run: | + gofmt -d . + test -z "$(gofmt -d .)" + lint: name: Lint + needs: format runs-on: ubuntu-20.04 steps: - name: Checkout @@ -58,6 +76,7 @@ jobs: build: name: Build + needs: lint runs-on: ubuntu-20.04 steps: - name: Checkout diff --git a/collector/README.md b/collector/README.md index 7f8b09ac2d..d05cf8c73a 100644 --- a/collector/README.md +++ b/collector/README.md @@ -26,3 +26,19 @@ Collector is a part of the Pegasus ecosystem that serves as: 1. the service availability detector 2. the hotkey detector 3. the capacity units recorder + +## Requirement + +- Go1.18+ + +## Development + +Build the collector: +```bash +make build +``` + +Format the code: +```bash +make fmt +``` diff --git a/collector/go.mod b/collector/go.mod index 9ba2888dd8..de9e22a9ee 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-module github.com/pegasus-kv/collector +module github.com/apache/incubator-pegasus/collector go 1.18 diff --git a/collector/main.go b/collector/main.go index 71dd5abbf9..e42881adfc 100644 --- a/collector/main.go +++ b/collector/main.go @@ -26,9 +26,9 @@ import ( "strings" "syscall" - "github.com/pegasus-kv/collector/avail" - "github.com/pegasus-kv/collector/metrics" - "github.com/pegasus-kv/collector/webui" + "github.com/apache/incubator-pegasus/collector/avail" + "github.com/apache/incubator-pegasus/collector/metrics" + "github.com/apache/incubator-pegasus/collector/webui" log "github.com/sirupsen/logrus" "github.com/spf13/viper" "gopkg.in/natefinch/lumberjack.v2" diff --git a/collector/sink/falcon_sink.go b/collector/sink/falcon_sink.go index 906374b92d..3f27c7632d 100644 --- a/collector/sink/falcon_sink.go +++ b/collector/sink/falcon_sink.go @@ -25,7 +25,7 @@ import ( "net/http" "time" - "github.com/pegasus-kv/collector/aggregate" + "github.com/apache/incubator-pegasus/collector/aggregate" log "github.com/sirupsen/logrus" "github.com/spf13/viper" ) diff --git a/collector/sink/prometheus_sink.go b/collector/sink/prometheus_sink.go index 34d5e18841..7b62543894 100644 --- a/collector/sink/prometheus_sink.go +++ b/collector/sink/prometheus_sink.go @@ -20,7 +20,7 @@ package sink import ( "sync" - "github.com/pegasus-kv/collector/aggregate" + "github.com/apache/incubator-pegasus/collector/aggregate" "github.com/prometheus/client_golang/prometheus" ) diff --git a/collector/sink/sink.go b/collector/sink/sink.go index 361c73303e..9389e2249e 100644 --- a/collector/sink/sink.go +++ b/collector/sink/sink.go @@ -18,7 +18,7 @@ package sink import ( - "github.com/pegasus-kv/collector/aggregate" + "github.com/apache/incubator-pegasus/collector/aggregate" log "github.com/sirupsen/logrus" "github.com/spf13/viper" ) diff --git a/collector/usage/usage_recorder.go b/collector/usage/usage_recorder.go index 6a6784d39f..d031e5f745 100644 --- a/collector/usage/usage_recorder.go +++ 
b/collector/usage/usage_recorder.go @@ -22,8 +22,8 @@ import ( "fmt" "time" + "github.com/apache/incubator-pegasus/collector/aggregate" "github.com/apache/incubator-pegasus/go-client/pegasus" - "github.com/pegasus-kv/collector/aggregate" log "github.com/sirupsen/logrus" "github.com/spf13/viper" "gopkg.in/tomb.v2" diff --git a/collector/webui/index.go b/collector/webui/index.go index 0c0d26ff4b..948dd563ce 100644 --- a/collector/webui/index.go +++ b/collector/webui/index.go @@ -18,8 +18,8 @@ package webui import ( + "github.com/apache/incubator-pegasus/collector/aggregate" "github.com/kataras/iris/v12" - "github.com/pegasus-kv/collector/aggregate" ) var indexPageClusterStats = []string{ From 43f2e3ba7165a54c837545911762dc553353b6ef Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 5 Jan 2024 15:13:53 +0800 Subject: [PATCH 7/8] chore(version): update the version of pegasus server and client of each language to 2.6.0-SNAPSHOT (#1831) --- java-client/pom.xml | 2 +- nodejs-client/package.json | 2 +- python-client/pypegasus/__init__.py | 2 +- scala-client/build.sbt | 4 ++-- src/include/pegasus/version.h | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/java-client/pom.xml b/java-client/pom.xml index 4d92e15792..1d83237fc5 100644 --- a/java-client/pom.xml +++ b/java-client/pom.xml @@ -29,7 +29,7 @@ org.apache.pegasus pegasus-client - 2.5.0-SNAPSHOT + 2.6.0-SNAPSHOT jar Pegasus Java Client diff --git a/nodejs-client/package.json b/nodejs-client/package.json index eba987f600..f8e1a9ccc8 100644 --- a/nodejs-client/package.json +++ b/nodejs-client/package.json @@ -1,6 +1,6 @@ { "name": "pegasus-nodejs-client", - "version": "2.5.0", + "version": "2.6.0", "description": "offical pegasus nodejs client", "main": "index.js", "scripts": { diff --git a/python-client/pypegasus/__init__.py b/python-client/pypegasus/__init__.py index 9a8e0626c5..d991184b27 100644 --- a/python-client/pypegasus/__init__.py +++ b/python-client/pypegasus/__init__.py @@ -18,4 +18,4 @@ # 
specific language governing permissions and limitations # under the License. -__version__ = '2.5.0' +__version__ = '2.6.0' diff --git a/scala-client/build.sbt b/scala-client/build.sbt index 38313ad51b..f161665446 100644 --- a/scala-client/build.sbt +++ b/scala-client/build.sbt @@ -17,7 +17,7 @@ * under the License. */ -version := "2.5.0-SNAPSHOT" +version := "2.6.0-SNAPSHOT" organization := "org.apache" @@ -54,6 +54,6 @@ credentials += Credentials( libraryDependencies ++= Seq( "com.google.guava" % "guava" % "21.0", - "org.apache.pegasus" % "pegasus-client" % "2.5.0-SNAPSHOT", + "org.apache.pegasus" % "pegasus-client" % "2.6.0-SNAPSHOT", "org.scalatest" %% "scalatest" % "3.0.3" % Test ) diff --git a/src/include/pegasus/version.h b/src/include/pegasus/version.h index 37995f9f9f..79e682a193 100644 --- a/src/include/pegasus/version.h +++ b/src/include/pegasus/version.h @@ -18,4 +18,4 @@ */ #pragma once -#define PEGASUS_VERSION "2.5.0-SNAPSHOT" +#define PEGASUS_VERSION "2.6.0-SNAPSHOT" From 363d7898828bebbb562c6de2ccabf982b98e4fe6 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 5 Jan 2024 17:31:22 +0800 Subject: [PATCH 8/8] feat(new_metrics): add some basic fields to the response to metrics query (#1827) https://github.com/apache/incubator-pegasus/issues/1820 Labels are needed by metric models like Prometheus Data Model. Therefore, cluster name, role name, host name and port should be provided in the response to metrics query as the labels. All of these fields would be added in the form of name/value pairs of json object. Original entity array would be the value of a json object with name "entities". 
--- src/runtime/task/task.h | 6 +++ src/utils/CMakeLists.txt | 3 +- src/utils/metrics.cpp | 94 +++++++++++++++++++++++++++++---- src/utils/metrics.h | 8 +++ src/utils/test/metrics_test.cpp | 30 +++++++++-- 5 files changed, 124 insertions(+), 17 deletions(-) diff --git a/src/runtime/task/task.h b/src/runtime/task/task.h index 4e2f70ed6e..9fbe022dd8 100644 --- a/src/runtime/task/task.h +++ b/src/runtime/task/task.h @@ -243,6 +243,7 @@ class task : public ref_counter, public extensible_object static int get_current_worker_index(); static const char *get_current_node_name(); static rpc_engine *get_current_rpc(); + static rpc_engine *get_current_rpc2(); static env_provider *get_current_env(); static void set_tls_dsn_context( @@ -594,6 +595,11 @@ __inline /*static*/ rpc_engine *task::get_current_rpc() return tls_dsn.rpc; } +__inline /*static*/ rpc_engine *task::get_current_rpc2() +{ + return tls_dsn.magic == 0xdeadbeef ? tls_dsn.rpc : nullptr; +} + __inline /*static*/ env_provider *task::get_current_env() { check_tls_dsn(); diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index 38390f0ed6..1c1c7d0c4f 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -43,9 +43,10 @@ set(MY_BINPLACES "") if (APPLE) dsn_add_static_library() - target_link_libraries(${MY_PROJ_NAME} PRIVATE dsn_http) + target_link_libraries(${MY_PROJ_NAME} PRIVATE dsn_http dsn_replication_common) else() dsn_add_shared_library() + target_link_libraries(${MY_PROJ_NAME} PRIVATE dsn_replication_common) endif() add_subdirectory(long_adder_bench) diff --git a/src/utils/metrics.cpp b/src/utils/metrics.cpp index 3d2ad00b80..2cd8e70d01 100644 --- a/src/utils/metrics.cpp +++ b/src/utils/metrics.cpp @@ -22,11 +22,17 @@ #include #include #include +#include #include #include "http/http_method.h" #include "http/http_status_code.h" #include "runtime/api_layer1.h" +#include "runtime/rpc/rpc_address.h" +#include "runtime/rpc/rpc_engine.h" +#include "runtime/service_app.h" +#include 
"runtime/service_engine.h" +#include "runtime/task/task.h" #include "utils/flags.h" #include "utils/rand.h" #include "utils/shared_io_service.h" @@ -410,17 +416,6 @@ metric_registry::entity_map metric_registry::entities() const return _entities; } -void metric_registry::take_snapshot(metric_json_writer &writer, const metric_filters &filters) const -{ - utils::auto_read_lock l(_lock); - - writer.StartArray(); - for (const auto &entity : _entities) { - entity.second->take_snapshot(writer, filters); - } - writer.EndArray(); -} - metric_entity_ptr metric_registry::find_or_create_entity(const metric_entity_prototype *prototype, const std::string &id, const metric_entity::attr_map &attrs) @@ -449,6 +444,83 @@ metric_entity_ptr metric_registry::find_or_create_entity(const metric_entity_pro return entity; } +DSN_DECLARE_string(cluster_name); + +namespace { + +#define ENCODE_OBJ_VAL(cond, val) \ + do { \ + if (dsn_likely(cond)) { \ + dsn::json::json_encode(writer, val); \ + } else { \ + dsn::json::json_encode(writer, "unknown"); \ + } \ + } while (0) + +void encode_cluster(dsn::metric_json_writer &writer) +{ + writer.Key(dsn::kMetricClusterField.c_str()); + + ENCODE_OBJ_VAL(!utils::is_empty(dsn::FLAGS_cluster_name), dsn::FLAGS_cluster_name); +} + +void encode_role(dsn::metric_json_writer &writer) +{ + writer.Key(dsn::kMetricRoleField.c_str()); + + const auto *const node = dsn::task::get_current_node2(); + ENCODE_OBJ_VAL(node != nullptr, node->get_service_app_info().full_name); +} + +void encode_host(dsn::metric_json_writer &writer) +{ + writer.Key(dsn::kMetricHostField.c_str()); + + char hostname[1024]; + ENCODE_OBJ_VAL(gethostname(hostname, sizeof(hostname)) == 0, hostname); +} + +void encode_port(dsn::metric_json_writer &writer) +{ + writer.Key(dsn::kMetricPortField.c_str()); + + const auto *const rpc = dsn::task::get_current_rpc2(); + ENCODE_OBJ_VAL(rpc != nullptr, rpc->primary_address().port()); +} + +#undef ENCODE_OBJ_VAL + +} // anonymous namespace + +void 
metric_registry::encode_entities(metric_json_writer &writer, + const metric_filters &filters) const +{ + writer.Key(dsn::kMetricEntitiesField.c_str()); + + writer.StartArray(); + + { + utils::auto_read_lock l(_lock); + + for (const auto &entity : _entities) { + entity.second->take_snapshot(writer, filters); + } + } + + writer.EndArray(); +} + +void metric_registry::take_snapshot(metric_json_writer &writer, const metric_filters &filters) const +{ + writer.StartObject(); + encode_cluster(writer); + encode_role(writer); + encode_host(writer); + encode_port(writer); + encode_entities(writer, filters); + writer.EndObject(); +} + metric_registry::collected_entities_info metric_registry::collect_stale_entities() const { collected_entities_info collected_info; diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 93ed703e7d..abd7d5ef23 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -348,6 +348,12 @@ const std::string kMetricEntityIdField = "id"; const std::string kMetricEntityAttrsField = "attributes"; const std::string kMetricEntityMetricsField = "metrics"; +const std::string kMetricClusterField = "cluster"; +const std::string kMetricRoleField = "role"; +const std::string kMetricHostField = "host"; +const std::string kMetricPortField = "port"; +const std::string kMetricEntitiesField = "entities"; + class metric_entity : public ref_counter { public: @@ -687,6 +693,8 @@ class metric_registry : public utils::singleton const std::string &id, const metric_entity::attr_map &attrs); + void encode_entities(metric_json_writer &writer, const metric_filters &filters) const; + // These functions are used to retire stale entities. // // Since retirement is infrequent, there tend to be no entity that should be retired. 
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index 9e6b51066f..144466b4ff 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -2267,22 +2267,35 @@ TEST(metrics_test, take_snapshot_entity) } } +const std::unordered_set kAllMetricQueryFields = {kMetricClusterField, + kMetricRoleField, + kMetricHostField, + kMetricPortField, + kMetricEntitiesField}; + void check_entity_ids_from_json_string(const std::string &json_string, const std::unordered_set &expected_entity_ids) { - // Even if there is not any entity selected, `json_string` should be "[]". + // Even if there is not any entity selected, `json_string` should not be empty. ASSERT_FALSE(json_string.empty()); rapidjson::Document doc; rapidjson::ParseResult result = doc.Parse(json_string.c_str()); ASSERT_FALSE(result.IsError()); + // The root struct should be an object. + ASSERT_TRUE(doc.IsObject()); + for (const auto &field : kAllMetricQueryFields) { + ASSERT_TRUE(doc.HasMember(field.c_str())); + } + // Actual entity ids parsed from json string. std::unordered_set actual_entity_ids; // The json format for entities should be an array. - ASSERT_TRUE(doc.IsArray()); - for (const auto &entity : doc.GetArray()) { + const auto &entities = doc.FindMember(kMetricEntitiesField.c_str())->value; + ASSERT_TRUE(entities.IsArray()); + for (const auto &entity : entities.GetArray()) { // The json format for each entity should be an object. ASSERT_TRUE(entity.IsObject()); @@ -2429,12 +2442,19 @@ void check_entities_from_json_string(const std::string &json_string, return; } + // The successful response should be an object. + ASSERT_TRUE(doc.IsObject()); + for (const auto &field : kAllMetricQueryFields) { + ASSERT_TRUE(doc.HasMember(field.c_str())); + } + // Actual entities parsed from json string. entity_container actual_entities; // The json format for entities should be an array. 
- ASSERT_TRUE(doc.IsArray()); - for (const auto &entity : doc.GetArray()) { + const auto &entities = doc.FindMember(kMetricEntitiesField.c_str())->value; + ASSERT_TRUE(entities.IsArray()); + for (const auto &entity : entities.GetArray()) { // The json format for each entity should be an object. ASSERT_TRUE(entity.IsObject());