From 6efd32a590c3da5749a0578a40e47a735cfe5fab Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Wed, 1 Jun 2022 10:52:26 -0700 Subject: [PATCH] Persist the new MANIFEST after successfully syncing the new WAL during recovery (#9922) Summary: In case of non-TransactionDB and avoid_flush_during_recovery = true, RocksDB won't flush the data from WAL to L0 for all column families if possible. As a result, not all column families can increase their log_numbers, and min_log_number_to_keep won't change. For transaction DB (.allow_2pc), even with the flush, there may be old WAL files that it must not delete because they can contain data of uncommitted transactions and min_log_number_to_keep won't change. If we persist a new MANIFEST with advanced log_numbers for some column families, then during a second crash after persisting the MANIFEST, RocksDB will see some column families' log_numbers larger than the corrupted wal, and the "column family inconsistency" error will be hit, causing recovery to fail. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error. Currently, RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. This PR buffers the edits in a structure and writes to a new MANIFEST after recovery is successful Pull Request resolved: https://github.com/facebook/rocksdb/pull/9922 Test Plan: 1. Update unit tests to fail without this change 2. make crast_test -j Branch with unit test and no fix https://github.com/facebook/rocksdb/pull/9942 to keep track of unit test (without fix) Reviewed By: riversand963 Differential Revision: D36043701 Pulled By: akankshamahajan15 fbshipit-source-id: 5760970db0a0920fb73d3c054a4155733500acd9 --- db/corruption_test.cc | 77 +++++++-- db/db_impl/db_impl.h | 60 ++++++- db/db_impl/db_impl_files.cc | 36 ++-- db/db_impl/db_impl_open.cc | 157 ++++++++++-------- db/db_impl/db_impl_secondary.cc | 3 +- db/db_impl/db_impl_secondary.h | 4 +- .../test/java/org/rocksdb/RocksDBTest.java | 2 +- monitoring/stats_history_test.cc | 10 +- 8 files changed, 236 insertions(+), 113 deletions(-) diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 8a8a2770d85..4d90cfaed5e 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -1065,7 +1065,7 @@ INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest, // The combination of corrupting a WAL and injecting an error during subsequent // re-open exposes the bug of prematurely persisting a new MANIFEST with // advanced ColumnFamilyData::log_number. -TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) { +TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) { CloseDb(); Options options; options.track_and_verify_wals_in_manifest = @@ -1107,7 +1107,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) { // number. TEST_SwitchMemtable makes sure WALs are not synced and test can // corrupt un-sync WAL. for (int i = 0; i < 2; ++i) { - ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value")); + ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), + "value" + std::to_string(i))); ASSERT_OK(dbimpl->TEST_SwitchMemtable()); } @@ -1188,6 +1189,23 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) { { options.avoid_flush_during_recovery = avoid_flush_during_recovery_; ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_)); + + // Verify that data is not lost. + { + std::string v; + ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v)); + ASSERT_EQ("dontcare", v); + + v.clear(); + ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v)); + ASSERT_EQ("value" + std::to_string(0), v); + + // Since it's corrupting second last wal, below key is not found. + v.clear(); + ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v), + Status::NotFound()); + } + for (auto* h : handles) { delete h; } @@ -1219,8 +1237,7 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) { // The combination of corrupting a WAL and injecting an error during subsequent // re-open exposes the bug of prematurely persisting a new MANIFEST with // advanced ColumnFamilyData::log_number. -TEST_P(CrashDuringRecoveryWithCorruptionTest, - DISABLED_TxnDbCrashDuringRecovery) { +TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) { CloseDb(); Options options; options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; @@ -1271,13 +1288,14 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, // Put and flush cf0 for (int i = 0; i < 2; ++i) { - ASSERT_OK(txn_db->Put(WriteOptions(), "dontcare", "value")); + ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i), + "value" + std::to_string(i))); ASSERT_OK(dbimpl->TEST_SwitchMemtable()); } // Put cf1 txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions()); - ASSERT_OK(txn->Put(handles[1], "foo1", "value")); + ASSERT_OK(txn->Put(handles[1], "foo1", "value1")); ASSERT_OK(txn->Commit()); delete txn; @@ -1337,7 +1355,6 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, std::vector file_nums; GetSortedWalFiles(file_nums); size_t size = file_nums.size(); - assert(size >= 2); uint64_t log_num = file_nums[size - 1]; CorruptFileWithTruncation(FileType::kWalFile, log_num); } @@ -1354,6 +1371,27 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, { ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, &handles, &txn_db)); + + // Verify that data is not lost. + { + std::string v; + // Key not visible since it's not committed. + ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v), + Status::NotFound()); + + v.clear(); + ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v)); + ASSERT_EQ("value" + std::to_string(0), v); + + // Last WAL is corrupted which contains two keys below. + v.clear(); + ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v), + Status::NotFound()); + v.clear(); + ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v), + Status::NotFound()); + } + for (auto* h : handles) { delete h; } @@ -1396,8 +1434,7 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, // The combination of corrupting a WAL and injecting an error during subsequent // re-open exposes the bug of prematurely persisting a new MANIFEST with // advanced ColumnFamilyData::log_number. -TEST_P(CrashDuringRecoveryWithCorruptionTest, - DISABLED_CrashDuringRecoveryWithFlush) { +TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) { CloseDb(); Options options; options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; @@ -1430,7 +1467,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, // Write to default_cf and flush this cf several times to advance wal // number. for (int i = 0; i < 2; ++i) { - ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value")); + ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), + "value" + std::to_string(i))); ASSERT_OK(db_->Flush(FlushOptions())); } @@ -1483,6 +1521,25 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, { options.avoid_flush_during_recovery = avoid_flush_during_recovery_; ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_)); + + // Verify that data is not lost. + { + std::string v; + ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v)); + ASSERT_EQ("dontcare", v); + + for (int i = 0; i < 2; ++i) { + v.clear(); + ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v)); + ASSERT_EQ("value" + std::to_string(i), v); + } + + // Since it's corrupting last wal after Flush, below key is not found. + v.clear(); + ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v), + Status::NotFound()); + } + for (auto* h : handles) { delete h; } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6ebb3207e27..cf02e6a359f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1240,6 +1240,39 @@ class DBImpl : public DB { std::atomic shutting_down_; + // RecoveryContext struct stores the context about version edits along + // with corresponding column_family_data and column_family_options. + class RecoveryContext { + public: + ~RecoveryContext() { + for (auto& edit_list : edit_lists_) { + for (auto* edit : edit_list) { + delete edit; + } + } + } + + void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) { + assert(cfd != nullptr); + if (map_.find(cfd->GetID()) == map_.end()) { + uint32_t size = static_cast(map_.size()); + map_.emplace(cfd->GetID(), size); + cfds_.emplace_back(cfd); + mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions()); + edit_lists_.emplace_back(autovector()); + } + uint32_t i = map_[cfd->GetID()]; + edit_lists_[i].emplace_back(new VersionEdit(edit)); + } + + std::unordered_map map_; // cf_id to index; + autovector cfds_; + autovector mutable_cf_opts_; + autovector> edit_lists_; + // files_to_delete_ contains sst files + std::unordered_set files_to_delete_; + }; + // Except in DB::Open(), WriteOptionsFile can only be called when: // Persist options to options file. // If need_mutex_lock = false, the method will lock DB mutex. @@ -1356,16 +1389,19 @@ class DBImpl : public DB { // be made to the descriptor are added to *edit. // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is // skipped. + // recovery_ctx stores the context about version edits and all those + // edits are persisted to new Manifest after successfully syncing the new WAL. virtual Status Recover( const std::vector& column_families, bool read_only = false, bool error_if_wal_file_exists = false, bool error_if_data_exists_in_wals = false, - uint64_t* recovered_seq = nullptr); + uint64_t* recovered_seq = nullptr, + RecoveryContext* recovery_ctx = nullptr); virtual bool OwnTablesAndLogs() const { return true; } // Set DB identity file, and write DB ID to manifest if necessary. - Status SetDBId(bool read_only); + Status SetDBId(bool read_only, RecoveryContext* recovery_ctx); // REQUIRES: db mutex held when calling this function, but the db mutex can // be released and re-acquired. Db mutex will be held when the function @@ -1374,12 +1410,15 @@ class DBImpl : public DB { // not referenced in the MANIFEST (e.g. // 1. It's best effort recovery; // 2. The VersionEdits referencing the SST files are appended to - // MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are + // RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are // still not synced to MANIFEST during recovery.) - // We delete these SST files. In the + // It stores the SST files to be deleted in RecoveryContext. In the // meantime, we find out the largest file number present in the paths, and // bump up the version set's next_file_number_ to be 1 + largest_file_number. - Status DeleteUnreferencedSstFiles(); + // recovery_ctx stores the context about version edits and files to be + // deleted. All those edits are persisted to new Manifest after successfully + // syncing the new WAL. + Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx); // SetDbSessionId() should be called in the constuctor DBImpl() // to ensure that db_session_id_ gets updated every time the DB is opened @@ -1389,6 +1428,14 @@ class DBImpl : public DB { Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family, const Slice& ts) const; + // recovery_ctx stores the context about version edits and + // LogAndApplyForRecovery persist all those edits to new Manifest after + // successfully syncing new WAL. + // LogAndApplyForRecovery should be called only once during recovery and it + // should be called when RocksDB writes to a first new MANIFEST since this + // recovery. + Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx); + private: friend class DB; friend class ErrorHandler; @@ -1645,7 +1692,8 @@ class DBImpl : public DB { // corrupted_log_found is set to true if we recover from a corrupted log file. Status RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_log_found); + bool* corrupted_log_found, + RecoveryContext* recovery_ctx); // The following two methods are used to flush a memtable to // storage. The first one is used at database RecoveryTime (when the diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 86a7808b28a..189ab810ea7 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( return min_log_number_to_keep; } -Status DBImpl::SetDBId(bool read_only) { +Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) { Status s; // Happens when immutable_db_options_.write_dbid_to_manifest is set to true // the very first time. @@ -890,14 +890,14 @@ Status DBImpl::SetDBId(bool read_only) { } s = GetDbIdentityFromIdentityFile(&db_id_); if (immutable_db_options_.write_dbid_to_manifest && s.ok()) { + assert(!read_only); + assert(recovery_ctx != nullptr); + assert(versions_->GetColumnFamilySet() != nullptr); VersionEdit edit; edit.SetDBId(db_id_); - Options options; - MutableCFOptions mutable_cf_options(options); versions_->db_id_ = db_id_; - s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options, &edit, &mutex_, nullptr, - /* new_descriptor_log */ false); + recovery_ctx->UpdateVersionEdits( + versions_->GetColumnFamilySet()->GetDefault(), edit); } } else if (!read_only) { s = SetIdentityFile(env_, dbname_, db_id_); @@ -905,7 +905,7 @@ Status DBImpl::SetDBId(bool read_only) { return s; } -Status DBImpl::DeleteUnreferencedSstFiles() { +Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) { mutex_.AssertHeld(); std::vector paths; paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator))); @@ -925,7 +925,6 @@ Status DBImpl::DeleteUnreferencedSstFiles() { uint64_t next_file_number = versions_->current_next_file_number(); uint64_t largest_file_number = next_file_number; - std::set files_to_delete; Status s; for (const auto& path : paths) { std::vector files; @@ -943,8 +942,9 @@ Status DBImpl::DeleteUnreferencedSstFiles() { const std::string normalized_fpath = path + fname; largest_file_number = std::max(largest_file_number, number); if (type == kTableFile && number >= next_file_number && - files_to_delete.find(normalized_fpath) == files_to_delete.end()) { - files_to_delete.insert(normalized_fpath); + recovery_ctx->files_to_delete_.find(normalized_fpath) == + recovery_ctx->files_to_delete_.end()) { + recovery_ctx->files_to_delete_.emplace(normalized_fpath); } } } @@ -961,21 +961,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() { assert(versions_->GetColumnFamilySet()); ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault(); assert(default_cfd); - s = versions_->LogAndApply( - default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_, - directories_.GetDbDir(), /*new_descriptor_log*/ false); - if (!s.ok()) { - return s; - } - - mutex_.Unlock(); - for (const auto& fname : files_to_delete) { - s = env_->DeleteFile(fname); - if (!s.ok()) { - break; - } - } - mutex_.Lock(); + recovery_ctx->UpdateVersionEdits(default_cfd, edit); return s; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 808459b77f1..4f0055c2d37 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -399,7 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname, Status DBImpl::Recover( const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, bool error_if_data_exists_in_wals, - uint64_t* recovered_seq) { + uint64_t* recovered_seq, RecoveryContext* recovery_ctx) { mutex_.AssertHeld(); bool is_new_db = false; @@ -524,9 +524,9 @@ Status DBImpl::Recover( return s; } } - s = SetDBId(read_only); + s = SetDBId(read_only, recovery_ctx); if (s.ok() && !read_only) { - s = DeleteUnreferencedSstFiles(); + s = DeleteUnreferencedSstFiles(recovery_ctx); } if (immutable_db_options_.paranoid_checks && s.ok()) { @@ -541,10 +541,6 @@ Status DBImpl::Recover( } } } - // DB mutex is already held - if (s.ok() && immutable_db_options_.persist_stats_to_disk) { - s = InitPersistStatsColumnFamily(); - } std::vector files_in_wal_dir; if (s.ok()) { @@ -614,7 +610,10 @@ Status DBImpl::Recover( WalNumber max_wal_number = versions_->GetWalSet().GetWals().rbegin()->first; edit.DeleteWalsBefore(max_wal_number + 1); - s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_); + assert(recovery_ctx != nullptr); + assert(versions_->GetColumnFamilySet() != nullptr); + recovery_ctx->UpdateVersionEdits( + versions_->GetColumnFamilySet()->GetDefault(), edit); } if (!s.ok()) { return s; @@ -650,8 +649,8 @@ Status DBImpl::Recover( std::sort(wals.begin(), wals.end()); bool corrupted_wal_found = false; - s = RecoverLogFiles(wals, &next_sequence, read_only, - &corrupted_wal_found); + s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found, + recovery_ctx); if (corrupted_wal_found && recovered_seq != nullptr) { *recovered_seq = next_sequence; } @@ -830,10 +829,30 @@ Status DBImpl::InitPersistStatsColumnFamily() { return s; } +Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) { + mutex_.AssertHeld(); + assert(versions_->descriptor_log_ == nullptr); + Status s = versions_->LogAndApply( + recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_, + recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir()); + if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) { + mutex_.Unlock(); + for (const auto& fname : recovery_ctx.files_to_delete_) { + s = env_->DeleteFile(fname); + if (!s.ok()) { + break; + } + } + mutex_.Lock(); + } + return s; +} + // REQUIRES: wal_numbers are sorted in ascending order Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_wal_found) { + bool* corrupted_wal_found, + RecoveryContext* recovery_ctx) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -1291,44 +1310,36 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // VersionSet::next_file_number_ always to be strictly greater than any // log number versions_->MarkFileNumberUsed(max_wal_number + 1); + assert(recovery_ctx != nullptr); - autovector cfds; - autovector cf_opts; - autovector> edit_lists; for (auto* cfd : *versions_->GetColumnFamilySet()) { - cfds.push_back(cfd); - cf_opts.push_back(cfd->GetLatestMutableCFOptions()); auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); - edit_lists.push_back({&iter->second}); + recovery_ctx->UpdateVersionEdits(cfd, iter->second); } - std::unique_ptr wal_deletion; if (flushed) { - wal_deletion = std::make_unique(); + VersionEdit wal_deletion; if (immutable_db_options_.track_and_verify_wals_in_manifest) { - wal_deletion->DeleteWalsBefore(max_wal_number + 1); + wal_deletion.DeleteWalsBefore(max_wal_number + 1); } if (!allow_2pc()) { // In non-2pc mode, flushing the memtables of the column families // means we can advance min_log_number_to_keep. - wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1); + wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1); } - edit_lists.back().push_back(wal_deletion.get()); + assert(versions_->GetColumnFamilySet() != nullptr); + recovery_ctx->UpdateVersionEdits( + versions_->GetColumnFamilySet()->GetDefault(), wal_deletion); } - - // write MANIFEST with update - status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_, - directories_.GetDbDir(), - /*new_descriptor_log=*/true); } } if (status.ok()) { if (data_seen && !flushed) { status = RestoreAliveLogFiles(wal_numbers); - } else { - // If there's no data in the WAL, or we flushed all the data, still + } else if (!wal_numbers.empty()) { // If there's no data in the WAL, or we + // flushed all the data, still // truncate the log file. If the process goes into a crash loop before // the file is deleted, the preallocated space will never get freed. const bool truncate = !read_only; @@ -1724,6 +1735,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } *dbptr = nullptr; + assert(handles); handles->clear(); size_t max_write_buffer_size = 0; @@ -1766,11 +1778,13 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); - + RecoveryContext recovery_ctx; impl->mutex_.Lock(); + // Handles create_if_missing, error_if_exists uint64_t recovered_seq(kMaxSequenceNumber); - s = impl->Recover(column_families, false, false, false, &recovered_seq); + s = impl->Recover(column_families, false, false, false, &recovered_seq, + &recovery_ctx); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); log::Writer* new_log = nullptr; @@ -1787,40 +1801,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { - // set column family handles - for (auto cf : column_families) { - auto cfd = - impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); - if (cfd != nullptr) { - handles->push_back( - new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); - impl->NewThreadStatusCfInfo(cfd); - } else { - if (db_options.create_missing_column_families) { - // missing column family, create it - ColumnFamilyHandle* handle; - impl->mutex_.Unlock(); - s = impl->CreateColumnFamily(cf.options, cf.name, &handle); - impl->mutex_.Lock(); - if (s.ok()) { - handles->push_back(handle); - } else { - break; - } - } else { - s = Status::InvalidArgument("Column family not found", cf.name); - break; - } - } - } - } - if (s.ok()) { - SuperVersionContext sv_context(/* create_superversion */ true); - for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - impl->InstallSuperVersionAndScheduleWork( - cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); - } - sv_context.Clean(); if (impl->two_write_queues_) { impl->log_write_mutex_.Lock(); } @@ -1860,6 +1840,53 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } } } + if (s.ok()) { + s = impl->LogAndApplyForRecovery(recovery_ctx); + } + + if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { + impl->mutex_.AssertHeld(); + s = impl->InitPersistStatsColumnFamily(); + } + + if (s.ok()) { + // set column family handles + for (auto cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (cfd != nullptr) { + handles->push_back( + new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + impl->NewThreadStatusCfInfo(cfd); + } else { + if (db_options.create_missing_column_families) { + // missing column family, create it + ColumnFamilyHandle* handle = nullptr; + impl->mutex_.Unlock(); + s = impl->CreateColumnFamily(cf.options, cf.name, &handle); + impl->mutex_.Lock(); + if (s.ok()) { + handles->push_back(handle); + } else { + break; + } + } else { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + } + } + } + + if (s.ok()) { + SuperVersionContext sv_context(/* create_superversion */ true); + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + impl->InstallSuperVersionAndScheduleWork( + cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); + } + sv_context.Clean(); + } + if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { // try to read format version s = impl->PersistentStatsProcessFormatVersion(); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index b31c508a06e..d008292f9a5 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -33,7 +33,8 @@ DBImplSecondary::~DBImplSecondary() {} Status DBImplSecondary::Recover( const std::vector& column_families, bool /*readonly*/, bool /*error_if_wal_file_exists*/, - bool /*error_if_data_exists_in_wals*/, uint64_t*) { + bool /*error_if_data_exists_in_wals*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/) { mutex_.AssertHeld(); JobContext job_context(0); diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index d3a7940b5ba..fcc86cc879c 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -81,8 +81,8 @@ class DBImplSecondary : public DBImpl { // and log_readers_ to facilitate future operations. Status Recover(const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, - bool error_if_data_exists_in_wals, - uint64_t* = nullptr) override; + bool error_if_data_exists_in_wals, uint64_t* = nullptr, + RecoveryContext* recovery_ctx = nullptr) override; // Implementations of the DB interface using DB::Get; diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 4193bcb4456..422bed40c6d 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -1428,7 +1428,7 @@ public void getLiveFiles() throws RocksDBException { assertThat(livefiles.manifestFileSize).isEqualTo(59); assertThat(livefiles.files.size()).isEqualTo(3); assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT"); - assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004"); + assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000005"); assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007"); } } diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index 59e7be3d96f..1fe5503cbe7 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -604,10 +604,14 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { dbfull()->TEST_WaitForStatsDumpRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flush default cf - // LogNumbers: default: 14, stats: 4, pikachu: 4 + // LogNumbers: default: 16, stats: 10, pikachu: 5 + // Since in recovery process, cfd_stats column is created after WAL is + // created, synced and MANIFEST is persisted, its log number which depends on + // logfile_number_ will be different. Since "pikachu" is never flushed, thus + // its log_number should be the smallest of the three. ASSERT_OK(Flush()); - ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); - ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + ASSERT_LT(cfd_test->GetLogNumber(), cfd_stats->GetLogNumber()); + ASSERT_LT(cfd_test->GetLogNumber(), cfd_default->GetLogNumber()); ASSERT_OK(Put("foo1", "v1")); ASSERT_OK(Put("bar1", "v1"));