diff --git a/.daily_canary b/.daily_canary index cfe6346401fe..32c35a995211 100644 --- a/.daily_canary +++ b/.daily_canary @@ -1 +1 @@ -Remarkable bird, the Norwegian Canary, idn'it, ay? +ping \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 476e6bbcd79c..680aeb765612 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## Unreleased -### Changed +## Changed -- The entry point for creation of C++ apps is now `make_user_endpoints()`. The old entry point `get_rpc_handler()` has been removed. +- The entry point for creation of C++ apps is now `make_user_endpoints()`. The old entry point `get_rpc_handler()` has been removed (#3562). +- Failed recovery procedures no longer block subsequent recoveries: `.recovery` ledger files are now created while the recovery is in progress and ignored or deleted by nodes on startup (#3563). ## [2.0.0-rc1] diff --git a/doc/operations/ledger_snapshot.rst b/doc/operations/ledger_snapshot.rst index 0e297315a424..537304589e66 100644 --- a/doc/operations/ledger_snapshot.rst +++ b/doc/operations/ledger_snapshot.rst @@ -40,7 +40,10 @@ The listing below is an example of what a ledger directory may look like: -rw-rw-r-- 1 user user 1.1M Jan 31 14:00 ledger_92502-97520.committed -rw-rw-r-- 1 user user 553K Jan 31 14:00 ledger_97521 # File still in progress -.. note:: On startup, a CCF node started with existing ledger files may suffix some of the file names with ``.corrupted`` if the ledger file cannot be parsed, depending on the sequence number the node will join from. +.. note:: + + - On startup, a CCF node started with existing ledger files may suffix some of the file names with ``.corrupted`` if the ledger file cannot be parsed, depending on the sequence number the node will join from. + - While the :doc:`/operations/recovery` procedure is in progress, new ledger files are suffixed with ``.recovery``. These files are automatically renamed (i.e. recovery suffix removed) once the recovery procedure is complete. ``.recovery`` files are automatically discarded on node startup so that a failed recovery attempt does not prevent further recoveries. .. note:: A new ledger chunk can also be created by the ``trigger_ledger_chunk`` governance action, which will automatically produce a new chunk at the following signature transaction. diff --git a/include/ccf/ds/nonstd.h b/include/ccf/ds/nonstd.h index cf687d71aecc..890d08f731ed 100644 --- a/include/ccf/ds/nonstd.h +++ b/include/ccf/ds/nonstd.h @@ -160,4 +160,15 @@ namespace nonstd return s; } + + static inline std::string remove_suffix( + const std::string& s, const std::string& suffix) + { + if (ends_with(s, suffix)) + { + return s.substr(0, s.size() - suffix.size()); + } + + return s; + } } \ No newline at end of file diff --git a/python/ccf/ledger.py b/python/ccf/ledger.py index 44d74d04acb1..2c39300dd1c6 100644 --- a/python/ccf/ledger.py +++ b/python/ccf/ledger.py @@ -37,6 +37,7 @@ SERVICE_INFO_TABLE_NAME = "public:ccf.gov.service.info" COMMITTED_FILE_SUFFIX = ".committed" +RECOVERY_FILE_SUFFIX = ".recovery" # Key used by CCF to record single-key tables WELL_KNOWN_SINGLETON_TABLE_KEY = bytes(bytearray(8)) @@ -110,6 +111,7 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]: elements = ( os.path.basename(filename) .replace(COMMITTED_FILE_SUFFIX, "") + .replace(RECOVERY_FILE_SUFFIX, "") .replace("ledger_", "") .split("-") ) @@ -822,6 +824,7 @@ def __init__( self, directories: List[str], committed_only: bool = True, + read_recovery_files: bool = False, insecure_skip_verification: bool = False, ): @@ -830,8 +833,17 @@ def __init__( ledger_files: List[str] = [] for directory in directories: for path in os.listdir(directory): - if committed_only and not path.endswith(COMMITTED_FILE_SUFFIX): + sanitised_path = path + if path.endswith(RECOVERY_FILE_SUFFIX): + sanitised_path = path[: -len(RECOVERY_FILE_SUFFIX)] + if not read_recovery_files: + continue + + if committed_only and not sanitised_path.endswith( + COMMITTED_FILE_SUFFIX + ): continue + chunk = os.path.join(directory, path) # The same ledger file may appear multiple times in different directories # so ignore duplicates diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 9aafffb0a79c..a78fe4f85559 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -345,7 +345,10 @@ namespace aft } void init_as_backup( - Index index, Term term, const std::vector& term_history) override + Index index, + Term term, + const std::vector& term_history, + Index recovery_start_index = 0) override { // This should only be called when the node resumes from a snapshot and // before it has received any append entries. @@ -356,7 +359,7 @@ namespace aft state->view_history.initialise(term_history); - ledger->init(index); + ledger->init(index, recovery_start_index); snapshotter->set_last_snapshot_idx(index); become_aware_of_new_term(term); diff --git a/src/consensus/aft/test/logging_stub.h b/src/consensus/aft/test/logging_stub.h index 604f101dfc88..431ddb5128b4 100644 --- a/src/consensus/aft/test/logging_stub.h +++ b/src/consensus/aft/test/logging_stub.h @@ -23,7 +23,7 @@ namespace aft LedgerStubProxy(const ccf::NodeId& id) : _id(id) {} - virtual void init(Index idx) {} + virtual void init(Index, Index) {} virtual void put_entry( const std::vector& original, diff --git a/src/consensus/ledger_enclave.h b/src/consensus/ledger_enclave.h index c66d9b3f9a27..99dff78e9b11 100644 --- a/src/consensus/ledger_enclave.h +++ b/src/consensus/ledger_enclave.h @@ -121,7 +121,8 @@ namespace consensus */ void truncate(Index idx) { - RINGBUFFER_WRITE_MESSAGE(consensus::ledger_truncate, to_host, idx); + RINGBUFFER_WRITE_MESSAGE( + consensus::ledger_truncate, to_host, idx, false /* no recovery */); } /** @@ -138,10 +139,12 @@ namespace consensus * Initialise ledger at a given index (e.g. after a snapshot) * * @param idx Index to start ledger from + * @param recovery_start_idx Index at which the recovery starts */ - void init(Index idx) + void init(Index idx = 0, Index recovery_start_idx = 0) { - RINGBUFFER_WRITE_MESSAGE(consensus::ledger_init, to_host, idx); + RINGBUFFER_WRITE_MESSAGE( + consensus::ledger_init, to_host, idx, recovery_start_idx); } }; } \ No newline at end of file diff --git a/src/consensus/ledger_enclave_types.h b/src/consensus/ledger_enclave_types.h index a1ce30bd77ee..b33a58795081 100644 --- a/src/consensus/ledger_enclave_types.h +++ b/src/consensus/ledger_enclave_types.h @@ -29,6 +29,7 @@ namespace consensus DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate), DEFINE_RINGBUFFER_MSG_TYPE(ledger_commit), DEFINE_RINGBUFFER_MSG_TYPE(ledger_init), + DEFINE_RINGBUFFER_MSG_TYPE(ledger_open), /// Create and commit a snapshot. Enclave -> Host DEFINE_RINGBUFFER_MSG_TYPE(snapshot), @@ -53,15 +54,19 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( consensus::Index, consensus::LedgerRequestPurpose); -DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_init, consensus::Index); +DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( + consensus::ledger_init, + consensus::Index /* start idx */, + consensus::Index /* recovery start idx */); DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( consensus::ledger_append, bool /* committable */, bool /* force chunk */, std::vector); DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( - consensus::ledger_truncate, consensus::Index); + consensus::ledger_truncate, consensus::Index, bool /* recovery mode */); DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_commit, consensus::Index); +DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(consensus::ledger_open); DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( consensus::snapshot, consensus::Index /* snapshot idx */, diff --git a/src/ds/files.h b/src/ds/files.h index 0028758c7b40..483fdce740b8 100644 --- a/src/ds/files.h +++ b/src/ds/files.h @@ -12,8 +12,13 @@ #include #include +#define FMT_HEADER_ONLY +#include + namespace files { + namespace fs = std::filesystem; + /** * @brief Checks if a path exists * @@ -122,4 +127,15 @@ namespace files { return dump(std::vector(data.begin(), data.end()), file); } + + void rename(const fs::path& src, const fs::path& dst) + { + std::error_code ec; + fs::rename(src, dst, ec); + if (ec) + { + throw std::logic_error(fmt::format( + "Could not rename file {} to {}: {}", src, dst, ec.message())); + } + } } diff --git a/src/host/ledger.h b/src/host/ledger.h index c707122e000e..c910bffcd5d4 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -5,6 +5,7 @@ #include "ccf/ds/logger.h" #include "ccf/ds/nonstd.h" #include "consensus/ledger_enclave_types.h" +#include "ds/files.h" #include "ds/messaging.h" #include "ds/serialized.h" #include "kv/kv_types.h" @@ -32,16 +33,7 @@ namespace asynchost static constexpr auto ledger_start_idx_delimiter = "_"; static constexpr auto ledger_last_idx_delimiter = "-"; static constexpr auto ledger_corrupt_file_suffix = "corrupted"; - - static inline bool is_ledger_file_committed(const std::string& file_name) - { - auto pos = file_name.find("."); - if (pos == std::string::npos) - { - return false; - } - return file_name.substr(pos + 1) == ledger_committed_suffix; - } + static constexpr auto ledger_recovery_file_suffix = "recovery"; static inline size_t get_start_idx_from_file_name( const std::string& file_name) @@ -69,13 +61,29 @@ namespace asynchost return std::stol(file_name.substr(pos + 1)); } + static inline bool is_ledger_file_committed(const std::string& file_name) + { + return nonstd::ends_with(file_name, ledger_committed_suffix); + } + static inline bool is_ledger_file_name_corrupted(const std::string& file_name) { return nonstd::ends_with(file_name, ledger_corrupt_file_suffix); } + static inline bool is_ledger_file_name_recovery(const std::string& file_name) + { + return nonstd::ends_with(file_name, ledger_recovery_file_suffix); + } + + static inline fs::path remove_recovery_suffix(const std::string& file_name) + { + return nonstd::remove_suffix( + file_name, fmt::format(".{}", ledger_recovery_file_suffix)); + } + static std::optional get_file_name_with_idx( - const std::string& dir, size_t idx) + const std::string& dir, size_t idx, bool allow_recovery_files) { std::optional match = std::nullopt; for (auto const& f : fs::directory_iterator(dir)) @@ -84,7 +92,9 @@ namespace asynchost // (i.e. those with a last idx) and non-corrupted files are considered // here. auto f_name = f.path().filename(); - if (is_ledger_file_name_corrupted(f_name)) + if ( + is_ledger_file_name_corrupted(f_name) || + (!allow_recovery_files && is_ledger_file_name_recovery(f_name))) { continue; } @@ -117,8 +127,8 @@ namespace asynchost using positions_offset_header_t = size_t; static constexpr auto file_name_prefix = "ledger"; - const std::string dir; - std::string file_name; + const fs::path dir; + fs::path file_name; // This uses C stdio instead of fstream because an fstream // cannot be truncated. @@ -132,14 +142,23 @@ namespace asynchost bool completed = false; bool committed = false; + bool recovery = false; + public: // Used when creating a new (empty) ledger file - LedgerFile(const std::string& dir, size_t start_idx) : + LedgerFile(const fs::path& dir, size_t start_idx, bool recovery = false) : dir(dir), file_name(fmt::format("{}_{}", file_name_prefix, start_idx)), - start_idx(start_idx) + start_idx(start_idx), + recovery(recovery) { - auto file_path = fs::path(dir) / fs::path(file_name); + if (recovery) + { + file_name = + fmt::format("{}.{}", file_name.string(), ledger_recovery_file_suffix); + } + + auto file_path = dir / file_name; file = fopen(file_path.c_str(), "w+b"); if (!file) { @@ -157,7 +176,7 @@ namespace asynchost dir(dir), file_name(file_name_) { - auto file_path = (fs::path(dir) / fs::path(file_name)); + auto file_path = dir / file_name; file = fopen(file_path.c_str(), "r+b"); if (!file) { @@ -278,6 +297,11 @@ namespace asynchost return completed; } + bool is_recovery() const + { + return recovery; + } + size_t write_entry(const uint8_t* data, size_t size, bool committable) { fseeko(file, total_len, SEEK_SET); @@ -352,7 +376,7 @@ namespace asynchost if (idx == start_idx - 1) { // Truncating everything triggers file deletion - if (!fs::remove(fs::path(dir) / fs::path(file_name))) + if (!fs::remove(dir / file_name)) { throw std::logic_error( fmt::format("Could not remove file {}", file_name)); @@ -428,6 +452,33 @@ namespace asynchost completed = true; } + bool rename(const std::string& new_file_name) + { + auto file_path = dir / file_name; + auto new_file_path = dir / new_file_name; + + try + { + files::rename(file_path, new_file_path); + } + catch (const std::exception& e) + { + // If the file cannot be renamed (e.g. file was removed), report an + // error and continue + LOG_FAIL_FMT("Error renaming ledger file: {}", e.what()); + } + file_name = new_file_name; + return true; + } + + void open() + { + auto new_file_name = remove_recovery_suffix(file_name); + rename(new_file_name); + recovery = false; + LOG_DEBUG_FMT("Open recovery ledger file {}", new_file_name); + } + bool commit(size_t idx) { if (!completed || committed || (idx != get_last_idx())) @@ -442,35 +493,30 @@ namespace asynchost fmt::format("Failed to flush ledger file: {}", strerror(errno))); } - const auto committed_file_name = fmt::format( + auto committed_file_name = fmt::format( "{}_{}-{}.{}", file_name_prefix, start_idx, get_last_idx(), ledger_committed_suffix); - auto file_path = fs::path(dir) / fs::path(file_name); - auto committed_file_path = fs::path(dir) / fs::path(committed_file_name); - - std::error_code ec; - fs::rename(file_path, committed_file_path, ec); - if (ec) + if (recovery) { - // Even if the file cannot be renamed (e.g. file was removed), report an - // error and continue - LOG_FAIL_FMT( - "Could not rename committed ledger file {} to {}", - file_path, - committed_file_path); + committed_file_name = fmt::format( + "{}.{}", committed_file_name, ledger_recovery_file_suffix); } - else + + if (!rename(committed_file_name)) { - file_name = committed_file_name; - committed = true; - LOG_DEBUG_FMT("Committed ledger file {}", file_name); + return false; } - return true; + committed = true; + LOG_DEBUG_FMT("Committed ledger file {}", file_name); + + // Committed recovery files stay in the list of active files until the + // ledger is open + return !recovery; } }; @@ -483,10 +529,10 @@ namespace asynchost ringbuffer::WriterPtr to_enclave; // Main ledger directory (write and read) - const std::string ledger_dir; + const fs::path ledger_dir; // Ledger directories (read-only) - std::vector read_ledger_dirs; + std::vector read_ledger_dirs; // Keep tracks of all ledger files for writing. // Current ledger file is always the last one @@ -506,6 +552,10 @@ namespace asynchost // True if a new file should be created when writing an entry bool require_new_file; + // Set during recovery to mark files as temporary until the recovery is + // complete + std::optional recovery_start_idx = std::nullopt; + auto get_it_contains_idx(size_t idx) const { if (idx == 0) @@ -546,8 +596,10 @@ namespace asynchost // If the file is not in the cache, find the file from the ledger // directories, inspecting the main ledger directory first + // Note: reading recovery chunks from main ledger directory is + // acceptable and in fact required to complete private recovery. std::string ledger_dir_; - auto match = get_file_name_with_idx(ledger_dir, idx); + auto match = get_file_name_with_idx(ledger_dir, idx, true); if (match.has_value()) { ledger_dir_ = ledger_dir; @@ -556,7 +608,7 @@ namespace asynchost { for (auto const& dir : read_ledger_dirs) { - match = get_file_name_with_idx(dir, idx); + match = get_file_name_with_idx(dir, idx, false); if (match.has_value()) { ledger_dir_ = dir; @@ -570,8 +622,8 @@ namespace asynchost return nullptr; } - // Emplace file in the max-sized read cache, replacing the oldest entry if - // the read cache is full + // Emplace file in the max-sized read cache, replacing the oldest entry + // if the read cache is full auto match_file = std::make_shared(ledger_dir_, match.value()); @@ -661,14 +713,13 @@ namespace asynchost public: Ledger( - const std::string& ledger_dir, + const fs::path& ledger_dir, ringbuffer::AbstractWriterFactory& writer_factory, size_t chunk_threshold, size_t max_read_cache_files = ledger_max_read_cache_files_default, - std::vector read_ledger_dirs = {}) : + const std::vector& read_ledger_dirs_ = {}) : to_enclave(writer_factory.create_writer_to_inside()), ledger_dir(ledger_dir), - read_ledger_dirs(read_ledger_dirs), max_read_cache_files(max_read_cache_files), chunk_threshold(chunk_threshold) { @@ -680,15 +731,17 @@ namespace asynchost } // Recover last idx from read-only ledger directories - for (const auto& read_dir : read_ledger_dirs) + for (const auto& read_dir : read_ledger_dirs_) { LOG_INFO_FMT("Recovering read-only ledger directory \"{}\"", read_dir); if (!fs::is_directory(read_dir)) { - throw std::logic_error(fmt::format( - "\"{}\" read-only ledger is not a directory", read_dir)); + throw std::logic_error( + fmt::format("{} read-only ledger is not a directory", read_dir)); } + read_ledger_dirs.emplace_back(read_dir); + for (auto const& f : fs::directory_iterator(read_dir)) { auto last_idx_ = get_last_idx_from_file_name(f.path().filename()); @@ -723,7 +776,7 @@ namespace asynchost if (fs::is_directory(ledger_dir)) { // If the ledger directory exists, recover ledger files from it - LOG_INFO_FMT("Recovering main ledger directory \"{}\"", ledger_dir); + LOG_INFO_FMT("Recovering main ledger directory {}", ledger_dir); std::vector corrupt_files = {}; for (auto const& f : fs::directory_iterator(ledger_dir)) @@ -735,6 +788,15 @@ namespace asynchost continue; } + if (is_ledger_file_name_recovery(file_name)) + { + LOG_INFO_FMT( + "Deleting recovery ledger file {} in main ledger directory", + file_name); + fs::remove(f); + continue; + } + std::shared_ptr ledger_file = nullptr; try { @@ -759,10 +821,10 @@ namespace asynchost { auto new_file_name = fmt::format( "{}.{}", f.filename().string(), ledger_corrupt_file_suffix); - fs::rename(f, fs::path(ledger_dir) / fs::path(new_file_name)); + files::rename(f, ledger_dir / new_file_name); LOG_INFO_FMT( - "Renamed invalid ledger file {} to \"{}\" (file will be ignored)", + "Renamed invalid ledger file {} to {} (file will be ignored)", f.filename(), new_file_name); } @@ -770,7 +832,7 @@ namespace asynchost if (files.empty()) { LOG_INFO_FMT( - "Main ledger directory \"{}\" is empty: no ledger file to " + "Main ledger directory {} is empty: no ledger file to " "recover", ledger_dir); require_new_file = true; @@ -839,7 +901,7 @@ namespace asynchost Ledger(const Ledger& that) = delete; - void init(size_t idx) + void init(size_t idx, size_t recovery_start_idx_ = 0) { TimeBoundLogger log_if_slow(fmt::format("Initing ledger - idx={}", idx)); @@ -871,9 +933,49 @@ namespace asynchost require_new_file = true; } - LOG_INFO_FMT("Setting last known/commit index to {}", idx); last_idx = idx; committed_idx = idx; + if (recovery_start_idx_ > 0) + { + // Do not set recovery idx and create recovery chunks + // if the ledger is initialised from 0 (i.e. genesis) + recovery_start_idx = recovery_start_idx_; + } + + LOG_INFO_FMT( + "Set last known/commit index to {}, recovery idx to {}", + idx, + recovery_start_idx_); + } + + void complete_recovery() + { + // When the recovery is completed (i.e. service is open), temporary + // recovery ledger chunks are renamed as they can now be recovered. + // Note: this operation cannot be rolled back. + LOG_INFO_FMT("Ledger complete recovery"); + + for (auto it = files.begin(); it != files.end();) + { + auto& f = *it; + if (f->is_recovery()) + { + f->open(); + + // Recovery files are kept in the list of active files when committed + // so that they can be renamed in a stable order when the service is + // open. Once this is done, they can be removed from the list of + // active files. + if (f->is_committed()) + { + it = files.erase(it); + continue; + } + } + ++it; + } + + recovery_start_idx.reset(); } size_t get_last_idx() const @@ -881,6 +983,11 @@ namespace asynchost return last_idx; } + void set_recovery_start_idx(size_t idx) + { + recovery_start_idx = idx; + } + std::optional> read_entry(size_t idx) { TimeBoundLogger log_if_slow( @@ -947,7 +1054,12 @@ namespace asynchost if (require_new_file) { - files.push_back(std::make_shared(ledger_dir, last_idx + 1)); + size_t start_idx = last_idx + 1; + bool is_recovery = recovery_start_idx.has_value() && + start_idx > recovery_start_idx.value(); + + files.push_back( + std::make_shared(ledger_dir, last_idx + 1, is_recovery)); require_new_file = false; } @@ -1124,7 +1236,9 @@ namespace asynchost DISPATCHER_SET_MESSAGE_HANDLER( disp, consensus::ledger_init, [this](const uint8_t* data, size_t size) { auto idx = serialized::read(data, size); - init(idx); + auto recovery_start_idx = + serialized::read(data, size); + init(idx, recovery_start_idx); }); DISPATCHER_SET_MESSAGE_HANDLER( @@ -1141,7 +1255,12 @@ namespace asynchost consensus::ledger_truncate, [this](const uint8_t* data, size_t size) { auto idx = serialized::read(data, size); + auto recovery_mode = serialized::read(data, size); truncate(idx); + if (recovery_mode) + { + set_recovery_start_idx(idx); + } }); DISPATCHER_SET_MESSAGE_HANDLER( @@ -1152,6 +1271,11 @@ namespace asynchost commit(idx); }); + DISPATCHER_SET_MESSAGE_HANDLER( + disp, consensus::ledger_open, [this](const uint8_t*, size_t) { + complete_recovery(); + }); + DISPATCHER_SET_MESSAGE_HANDLER( disp, consensus::ledger_get_range, diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index 968faebdf3ed..53bd74308ae2 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -92,12 +92,16 @@ size_t number_of_files_in_ledger_dir() return file_count; } -size_t number_of_committed_files_in_ledger_dir() +size_t number_of_committed_files_in_ledger_dir(bool allow_recovery = false) { size_t committed_file_count = 0; for (auto const& f : fs::directory_iterator(ledger_dir)) { - if (is_ledger_file_committed(f.path().string())) + auto file_name = f.path().string(); + if ( + (allow_recovery && is_ledger_file_name_recovery(file_name) && + file_name.find(ledger_committed_suffix) != std::string::npos) || + is_ledger_file_committed(file_name)) { committed_file_count++; } @@ -106,6 +110,20 @@ size_t number_of_committed_files_in_ledger_dir() return committed_file_count; } +size_t number_of_recovery_files_in_ledger_dir() +{ + size_t recovery_file_count = 0; + for (auto const& f : fs::directory_iterator(ledger_dir)) + { + if (is_ledger_file_name_recovery(f.path())) + { + recovery_file_count++; + } + } + + return recovery_file_count; +} + void verify_framed_entries_range( const std::vector& framed_entries, size_t from, size_t to) { @@ -1450,4 +1468,147 @@ TEST_CASE("Chunking according to entry header flag") // Forcing a new chunk before creating a new chunk to store this entry REQUIRE(number_of_files_in_ledger_dir() == ledger_files_count + 1); } +} + +TEST_CASE("Recovery") +{ + auto dir = AutoDeleteFolder(ledger_dir); + + size_t chunk_threshold = 30; + size_t entries_per_chunk = get_entries_per_chunk(chunk_threshold); + + SUBCASE("Enable and complete recovery") + { + Ledger ledger(ledger_dir, wf, chunk_threshold); + TestEntrySubmitter entry_submitter(ledger); + size_t pre_recovery_last_idx = 0; + + INFO("Write many entries on ledger"); + { + size_t chunk_count = 5; + initialise_ledger(entry_submitter, chunk_threshold, chunk_count); + pre_recovery_last_idx = entry_submitter.get_last_idx(); + ledger.commit(pre_recovery_last_idx); + } + + INFO("Enable recovery"); + { + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + ledger.set_recovery_start_idx(pre_recovery_last_idx); + + entry_submitter.write(true); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); + } + + INFO("Truncation does not affect recovery mode"); + { + entry_submitter.truncate(pre_recovery_last_idx); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + entry_submitter.write(true); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); + } + + INFO("Create and commit more recovery chunks"); + { + for (size_t i = 0; i < entries_per_chunk; i++) + { + entry_submitter.write(true); + } + REQUIRE(number_of_recovery_files_in_ledger_dir() == 2); + + // Reading from uncommitted recovery chunks is OK + read_entries_range_from_ledger(ledger, 1, entry_submitter.get_last_idx()); + + // Committed files are also marked .recovery + auto initial_number_committed_files = + number_of_committed_files_in_ledger_dir(true); + ledger.commit(entry_submitter.get_last_idx()); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 2); + REQUIRE( + number_of_committed_files_in_ledger_dir(true) == + initial_number_committed_files + 1); + + // Reading from committed recovery chunks is OK + read_entries_range_from_ledger(ledger, 1, entry_submitter.get_last_idx()); + } + + INFO("Finally open the ledger"); + { + ledger.complete_recovery(); + + // All recovery chunks are gone + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + + // Further chunks are not marked as recovery + for (size_t i = 0; i < entries_per_chunk; i++) + { + entry_submitter.write(true); + } + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + + // Even ones that are committed + ledger.commit(entry_submitter.get_last_idx()); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + } + } + + SUBCASE("Recover ledger with recovery chunks") + { + Ledger ledger(ledger_dir, wf, chunk_threshold); + TestEntrySubmitter entry_submitter(ledger); + size_t pre_recovery_last_idx = 0; + size_t last_idx = 0; + + INFO("Write many entries on ledger"); + { + size_t chunk_count = 5; + initialise_ledger(entry_submitter, chunk_threshold, chunk_count); + pre_recovery_last_idx = entry_submitter.get_last_idx(); + ledger.commit(pre_recovery_last_idx); + } + + INFO("Enable recovery"); + { + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + ledger.set_recovery_start_idx(pre_recovery_last_idx); + + for (size_t i = 0; i < entries_per_chunk + 1; i++) + { + entry_submitter.write(true); + } + REQUIRE(number_of_recovery_files_in_ledger_dir() == 2); + last_idx = entry_submitter.get_last_idx(); + } + + INFO("New ledger recovery in read-only ledger directory"); + { + auto new_ledger_dir = "new_ledger_dir"; + Ledger new_ledger( + new_ledger_dir, + wf, + chunk_threshold, + ledger_max_read_cache_files_default, + {ledger_dir}); + + // Recovery files in read-only ledger directory are ignored on startup + REQUIRE(number_of_recovery_files_in_ledger_dir() == 2); + REQUIRE_THROWS(read_entries_range_from_ledger(new_ledger, 1, last_idx)); + + // Entries pre-recovery can still be read + read_entries_range_from_ledger(new_ledger, 1, pre_recovery_last_idx); + } + + INFO("New ledger recovery in main ledger directory"); + { + Ledger new_ledger(ledger_dir, wf, chunk_threshold); + + // Recovery files in main ledger directory are automatically deleted on + // ledger creation + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + REQUIRE_THROWS(read_entries_range_from_ledger(new_ledger, 1, last_idx)); + + // Entries pre-recovery can still be read + read_entries_range_from_ledger(new_ledger, 1, pre_recovery_last_idx); + } + } } \ No newline at end of file diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index 76eae7a26720..95b8ddbb550d 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -469,7 +469,7 @@ namespace kv virtual void force_become_primary( ccf::SeqNo, ccf::View, const std::vector&, ccf::SeqNo) = 0; virtual void init_as_backup( - ccf::SeqNo, ccf::View, const std::vector&) = 0; + ccf::SeqNo, ccf::View, const std::vector&, ccf::SeqNo) = 0; virtual bool replicate(const BatchVector& entries, ccf::View view) = 0; virtual std::pair get_committed_txid() = 0; diff --git a/src/kv/test/stub_consensus.h b/src/kv/test/stub_consensus.h index 1d85eacdc403..90f7e33803c4 100644 --- a/src/kv/test/stub_consensus.h +++ b/src/kv/test/stub_consensus.h @@ -82,7 +82,10 @@ namespace kv::test } virtual void init_as_backup( - ccf::SeqNo, ccf::View, const std::vector&) override + ccf::SeqNo, + ccf::View, + const std::vector&, + ccf::SeqNo) override { state = Backup; } diff --git a/src/node/node_state.h b/src/node/node_state.h index 52856d304cc0..69c0c546adda 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -564,7 +564,10 @@ namespace ccf } consensus->init_as_backup( - network.tables->current_version(), view, view_history); + network.tables->current_version(), + view, + view_history, + last_recovered_signed_idx); if (resp.network_info->public_only) { @@ -893,7 +896,7 @@ namespace ccf // Note: KV term must be set before the first Tx is committed network.tables->rollback( {last_recovered_term, last_recovered_signed_idx}, new_term); - ledger_truncate(last_recovered_signed_idx); + ledger_truncate(last_recovered_signed_idx, true); snapshotter->rollback(last_recovered_signed_idx); LOG_INFO_FMT( @@ -922,7 +925,6 @@ namespace ccf kv::Term view = 0; kv::Version global_commit = 0; - // auto ls = g.get_last_signature(); auto ls = tx.ro(network.signatures)->get(); if (ls.has_value()) { @@ -1050,6 +1052,7 @@ namespace ccf // Shares for the new ledger secret can only be issued now, once the // previous ledger secrets have been recovered share_manager.issue_recovery_shares(tx); + GenesisGenerator g(network, tx); if (!g.open_service()) { @@ -1854,8 +1857,15 @@ namespace ccf w->cert.str()); return; } + network.identity->set_certificate(w->cert); - open_user_frontend(); + if (w->status == ServiceStatus::OPEN) + { + open_user_frontend(); + + RINGBUFFER_WRITE_MESSAGE(consensus::ledger_open, to_host); + LOG_INFO_FMT("Service open at seqno {}", hook_version); + } })); } @@ -2071,9 +2081,10 @@ namespace ccf consensus::LedgerRequestPurpose::Recovery); } - void ledger_truncate(consensus::Index idx) + void ledger_truncate(consensus::Index idx, bool recovery_mode = false) { - RINGBUFFER_WRITE_MESSAGE(consensus::ledger_truncate, to_host, idx); + RINGBUFFER_WRITE_MESSAGE( + consensus::ledger_truncate, to_host, idx, recovery_mode); } }; } \ No newline at end of file diff --git a/tests/infra/network.py b/tests/infra/network.py index dc458ab84e99..c2170fa365e3 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -538,7 +538,7 @@ def recover(self, args): def ignore_errors_on_shutdown(self): self.ignoring_shutdown_errors = True - def check_ledger_files_identical(self): + def check_ledger_files_identical(self, read_recovery_ledger_files=False): # Note: Should be called on stopped service # Verify that all ledger files on stopped nodes exist on most up-to-date node # and are identical @@ -553,7 +553,9 @@ def check_ledger_files_identical(self): ) ledger = node.remote.ledger_paths() - last_seqno = Ledger(ledger).get_latest_public_state()[1] + last_seqno = Ledger( + ledger, read_recovery_files=read_recovery_ledger_files + ).get_latest_public_state()[1] nodes_ledger[node.local_node_id] = [ledger, last_seqno] if last_seqno > longest_ledger_seqno: longest_ledger_seqno = last_seqno @@ -583,7 +585,9 @@ def list_files_in_dirs_with_checksums(dirs): f"Verified {len(longest_ledger_files)} ledger files consistency on all {len(self.nodes)} stopped nodes" ) - def stop_all_nodes(self, skip_verification=False, verbose_verification=False): + def stop_all_nodes( + self, skip_verification=False, verbose_verification=False, **kwargs + ): if not skip_verification: if self.txs is not None: LOG.info("Verifying that all committed txs can be read before shutdown") @@ -600,7 +604,7 @@ def stop_all_nodes(self, skip_verification=False, verbose_verification=False): fatal_error_found = True LOG.info("All nodes stopped") - self.check_ledger_files_identical() + self.check_ledger_files_identical(**kwargs) if fatal_error_found: if self.ignoring_shutdown_errors: diff --git a/tests/recovery.py b/tests/recovery.py index 6dae2ab7a25b..678bdcd572da 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -22,6 +22,11 @@ def split_all_ledger_files_in_dir(input_dir, output_dir): # at any one (but not the last one, which would have no effect) at random. for ledger_file in os.listdir(input_dir): sig_seqnos = [] + + if ledger_file.endswith(ccf.ledger.RECOVERY_FILE_SUFFIX): + # Ignore recovery files + continue + ledger_file_path = os.path.join(input_dir, ledger_file) ledger_chunk = ccf.ledger.LedgerChunk(ledger_file_path, ledger_validator=None) for transaction in ledger_chunk: @@ -46,9 +51,9 @@ def split_all_ledger_files_in_dir(input_dir, output_dir): os.remove(ledger_file_path) -@reqs.description("Recovering a network") +@reqs.description("Recovering a service") @reqs.recover(number_txs=2) -def test(network, args, from_snapshot=False, split_ledger=False): +def test_recover_service(network, args, from_snapshot=False, split_ledger=False): old_primary, _ = network.find_primary() snapshots_dir = None @@ -71,7 +76,11 @@ def test(network, args, from_snapshot=False, split_ledger=False): ) recovered_network = infra.network.Network( - args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network + args.nodes, + args.binary_dir, + args.debug_nodes, + args.perf_nodes, + existing_network=network, ) recovered_network.start_in_recovery( args, @@ -79,11 +88,81 @@ def test(network, args, from_snapshot=False, split_ledger=False): committed_ledger_dirs=committed_ledger_dirs, snapshots_dir=snapshots_dir, ) + recovered_network.recover(args) return recovered_network +@reqs.description("Attempt to recover a service but abort before recovery is complete") +def test_recover_service_aborted(network, args, from_snapshot=False): + old_primary, _ = network.find_primary() + + snapshots_dir = None + if from_snapshot: + snapshots_dir = network.get_committed_snapshots(old_primary) + + network.stop_all_nodes() + current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger() + + aborted_network = infra.network.Network( + args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network + ) + aborted_network.start_in_recovery( + args, + ledger_dir=current_ledger_dir, + committed_ledger_dirs=committed_ledger_dirs, + snapshots_dir=snapshots_dir, + ) + + LOG.info("Fill in ledger to trigger new chunks, which should be marked as recovery") + primary, _ = aborted_network.find_primary() + while ( + len( + [ + f + for f in os.listdir(primary.remote.ledger_paths()[0]) + if f.endswith( + f"{ccf.ledger.COMMITTED_FILE_SUFFIX}{ccf.ledger.RECOVERY_FILE_SUFFIX}" + ) + ] + ) + < 2 + ): + # Submit large proposal until at least two recovery ledger chunks are committed + aborted_network.consortium.create_and_withdraw_large_proposal(primary) + + LOG.info( + "Do not complete service recovery on purpose and initiate new recovery from scratch" + ) + + snapshots_dir = None + if from_snapshot: + snapshots_dir = network.get_committed_snapshots(primary) + + # Check that all nodes have the same (recovery) ledger files + aborted_network.stop_all_nodes( + skip_verification=True, read_recovery_ledger_files=True + ) + + current_ledger_dir, committed_ledger_dirs = primary.get_ledger() + recovered_network = infra.network.Network( + args.nodes, + args.binary_dir, + args.debug_nodes, + args.perf_nodes, + existing_network=aborted_network, + ) + recovered_network.start_in_recovery( + args, + ledger_dir=current_ledger_dir, + committed_ledger_dirs=committed_ledger_dirs, + snapshots_dir=snapshots_dir, + ) + recovered_network.recover(args) + return recovered_network + + @reqs.description("Recovering a network, kill one node while submitting shares") @reqs.recover(number_txs=2) def test_share_resilience(network, args, from_snapshot=False): @@ -147,7 +226,7 @@ def test_share_resilience(network, args, from_snapshot=False): return recovered_network -def run(args): +def run(args, recoveries_count): txs = app.LoggingTxs("user0") with infra.network.network( args.nodes, @@ -159,7 +238,7 @@ def run(args): ) as network: network.start_and_open(args) - for i in range(args.recovery): + for i in range(recoveries_count): # Issue transactions which will required historical ledger queries recovery # when the network is shutdown network.txs.issue(network, number_txs=1) @@ -167,18 +246,17 @@ def run(args): # Alternate between recovery with primary change and stable primary-ship, # with and without snapshots - if i % 2 == 0: + if i % recoveries_count == 0: if args.consensus != "BFT": - recovered_network = test_share_resilience( - network, args, from_snapshot=True - ) - else: - recovered_network = network + network = test_share_resilience(network, args, from_snapshot=True) + elif i % recoveries_count == 1: + network = test_recover_service_aborted( + network, args, from_snapshot=False + ) # TODO: Also tests with snapshots else: - recovered_network = test( + network = test_recover_service( network, args, from_snapshot=False, split_ledger=True ) - network = recovered_network for node in network.get_joined_nodes(): node.verify_certificate_validity_period() @@ -211,15 +289,12 @@ def run(args): def add(parser): parser.description = """ -This test executes multiple recoveries (as specified by the "--recovery" arg), +This test_recover_service executes multiple recoveries (as specified by the "--recovery" arg), with a fixed number of messages applied between each network crash (as specified by the "--msgs-per-recovery" arg). After the network is recovered and before applying new transactions, all transactions previously applied are checked. Note that the key for each logging message is unique (per table). """ - parser.add_argument( - "--recovery", help="Number of recoveries to perform", type=int, default=2 - ) parser.add_argument( "--msgs-per-recovery", help="Number of public and private messages between two recoveries", @@ -236,4 +311,4 @@ def add(parser): args.ledger_chunk_bytes = "50KB" args.snapshot_tx_interval = 30 - run(args) + run(args, recoveries_count=3) diff --git a/tests/suite/test_suite.py b/tests/suite/test_suite.py index 21ef0204b97f..9654d6dbeba4 100644 --- a/tests/suite/test_suite.py +++ b/tests/suite/test_suite.py @@ -21,20 +21,20 @@ # This suite tests that rekeying, network configuration changes, recoveries and # curve changes can be interleaved suite_rekey_recovery = [ - recovery.test, + recovery.test_recover_service, reconfiguration.test_add_node, reconfiguration.test_add_node_on_other_curve, e2e_logging.test_rekey, reconfiguration.test_add_node, reconfiguration.test_add_node_on_other_curve, - recovery.test, + recovery.test_recover_service, e2e_logging.test_rekey, reconfiguration.test_add_node, reconfiguration.test_change_curve, reconfiguration.test_add_node, reconfiguration.test_add_node_on_other_curve, e2e_logging.test_rekey, - recovery.test, + recovery.test_recover_service, e2e_logging.test_rekey, ] suites["rekey_recovery"] = suite_rekey_recovery @@ -42,13 +42,13 @@ # This suite tests that membership changes and recoveries can be interleaved suite_membership_recovery = [ membership.test_add_member, - recovery.test, + recovery.test_recover_service, membership.test_remove_member, - recovery.test, + recovery.test_recover_service, membership.test_set_recovery_threshold, - recovery.test, + recovery.test_recover_service, membership.test_update_recovery_shares, - recovery.test, + recovery.test_recover_service, ] suites["membership_recovery"] = suite_membership_recovery @@ -103,7 +103,8 @@ reconfiguration.test_node_certificates_validity_period, reconfiguration.test_retire_primary, # recovery: - recovery.test, + recovery.test_recover_service, + recovery.test_recover_service_aborted, # rekey: e2e_logging.test_rekey, # election: @@ -114,7 +115,7 @@ code_update.test_add_node_with_bad_code, # curve migration: reconfiguration.test_change_curve, - recovery.test, + recovery.test_recover_service, # jwt jwt_test.test_refresh_jwt_issuer, # governance