From 4c4df206a8dd2e059d5ae9b21a67fbda2137bf16 Mon Sep 17 00:00:00 2001
From: qhu
Date: Wed, 14 Aug 2024 00:42:38 +0000
Subject: [PATCH] [BACKPORT 2.18][#23047] docdb: Fix cotable ids in flushed frontier at restore

Summary:
Original commit: 550458d8f874d9363a1ccd6ee326dd9228b75e36 / D36041

When restoring a snapshot of a colocated tablet to a new database/table group, all tables are
re-created in the new database, so the cotable ids are different from those in the snapshot.
At restore, the cotable ids in the flushed frontiers should be updated to the ids of the newly
created tables; otherwise we can hit the following issue after restore:

```
1. We have 3 SST files after restore:
   1.sst (smallest:old_id=0, largest:old_id=0)
   2.sst (smallest:old_id=0, largest:old_id=0)
   3.sst (smallest:old_id=0, largest:old_id=0)
2. Compact 1.sst and 2.sst, generating 4.sst:
   3.sst (smallest:old_id=0, largest:old_id=0)
   4.sst (smallest:new_id=1, largest:new_id=1)
   After this compaction, the schema packing with version 0 for new_id can be dropped,
   because from the frontiers we can only find new_id=1.
3. When compacting 3.sst and 4.sst, there are still rows with version 0 for old_id, but
   schema packing version 0 was already GCed in step 2.
```

Jira: DB-11979

Test Plan:
PackedRows/YBBackupTestWithPackedRowsAndColocation.*/1
CrossColocationTests/YBBackupCrossColocation.TestYSQLRestoreWithInvalidIndex/1
TableRewriteTests/YBBackupTestWithTableRewrite.TestYSQLBackupAndRestoreAfterRewrite/1
TableRewriteTests/YBBackupTestWithTableRewrite.TestYSQLBackupAndRestoreAfterFailedRewrite/1

Reviewers: sergei, zdrudi, mhaddad

Reviewed By: zdrudi

Subscribers: qhu, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36932
---
 src/yb/docdb/consensus_frontier.cc | 14 +++++++
 src/yb/docdb/consensus_frontier.h  |  4 ++
 src/yb/docdb/docdb_rocksdb_util.cc | 16 ++++++--
 src/yb/docdb/docdb_rocksdb_util.h  |  7 +++-
 src/yb/tablet/tablet_metadata.cc   | 21 ++++++++++-
 src/yb/tablet/tablet_metadata.h    |  2 +
 src/yb/tablet/tablet_snapshots.cc  | 60 ++++++++++++++++++++++++------
 src/yb/tablet/tablet_snapshots.h   |  5 ++-
 src/yb/tools/data-patcher.cc       |  2 +-
 9 files changed, 112 insertions(+), 19 deletions(-)

diff --git a/src/yb/docdb/consensus_frontier.cc b/src/yb/docdb/consensus_frontier.cc
index 75985e29125c..e0e701095a54 100644
--- a/src/yb/docdb/consensus_frontier.cc
+++ b/src/yb/docdb/consensus_frontier.cc
@@ -343,6 +343,20 @@ void ConsensusFrontier::ResetSchemaVersion() {
   cotable_schema_versions_.clear();
 }
 
+bool ConsensusFrontier::UpdateCoTableId(const Uuid& cotable_id, const Uuid& new_cotable_id) {
+  if (cotable_id == new_cotable_id) {
+    return false;
+  }
+  auto it = cotable_schema_versions_.find(cotable_id);
+  if (it == cotable_schema_versions_.end()) {
+    return false;
+  }
+  auto schema_version = it->second;
+  cotable_schema_versions_.erase(it);
+  cotable_schema_versions_[new_cotable_id] = schema_version;
+  return true;
+}
+
 void ConsensusFrontier::MakeExternalSchemaVersionsAtMost(
     std::unordered_map<Uuid, SchemaVersion>* min_schema_versions) const {
   if (primary_schema_version_) {
diff --git a/src/yb/docdb/consensus_frontier.h b/src/yb/docdb/consensus_frontier.h
index 6a4747f0026c..26a742c441f4 100644
--- a/src/yb/docdb/consensus_frontier.h
+++ b/src/yb/docdb/consensus_frontier.h
@@ -120,6 +120,10 @@ class ConsensusFrontier : public rocksdb::UserFrontier {
   void AddSchemaVersion(const Uuid& table_id, SchemaVersion version);
   void ResetSchemaVersion();
 
+  // Update cotable_id to new_cotable_id in the current frontier's cotable_schema_versions_ map.
+  // Returns true if the map was modified, false otherwise.
+  bool UpdateCoTableId(const Uuid& cotable_id, const Uuid& new_cotable_id);
+
   // Merge current frontier with provided map, preferring min values.
   void MakeExternalSchemaVersionsAtMost(
       std::unordered_map<Uuid, SchemaVersion>* min_schema_versions) const;
diff --git a/src/yb/docdb/docdb_rocksdb_util.cc b/src/yb/docdb/docdb_rocksdb_util.cc
index 7ea82c8f1a0a..c221b2b53490 100644
--- a/src/yb/docdb/docdb_rocksdb_util.cc
+++ b/src/yb/docdb/docdb_rocksdb_util.cc
@@ -1011,7 +1011,8 @@ class RocksDBPatcher::Impl {
     return helper.Apply(options_, imm_cf_options_);
   }
 
-  Status ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
+  Status ModifyFlushedFrontier(
+      const ConsensusFrontier& frontier, const CotableIdsMap& cotable_ids_map) {
     RocksDBPatcherHelper helper(&version_set_);
 
     docdb::ConsensusFrontier final_frontier = frontier;
@@ -1031,7 +1032,8 @@ class RocksDBPatcher::Impl {
     helper.Edit().ModifyFlushedFrontier(
         final_frontier.Clone(), rocksdb::FrontierModificationMode::kForce);
 
-    helper.IterateFiles([&helper, &frontier](int level, rocksdb::FileMetaData fmd) {
+    helper.IterateFiles([&helper, &frontier, &cotable_ids_map](
+        int level, rocksdb::FileMetaData fmd) {
       bool modified = false;
       for (auto* user_frontier : {&fmd.smallest.user_frontier, &fmd.largest.user_frontier}) {
         if (!*user_frontier) {
@@ -1046,6 +1048,11 @@ class RocksDBPatcher::Impl {
           consensus_frontier.set_history_cutoff_information(frontier.history_cutoff());
           modified = true;
         }
+        for (const auto& [table_id, new_table_id] : cotable_ids_map) {
+          if (consensus_frontier.UpdateCoTableId(table_id, new_table_id)) {
+            modified = true;
+          }
+        }
       }
       if (modified) {
         helper.ModifyFile(level, fmd);
@@ -1122,8 +1129,9 @@ Status RocksDBPatcher::SetHybridTimeFilter(std::optional<uint32_t> db_oid, Hybri
   return impl_->SetHybridTimeFilter(db_oid, value);
 }
 
-Status RocksDBPatcher::ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
-  return impl_->ModifyFlushedFrontier(frontier);
+Status RocksDBPatcher::ModifyFlushedFrontier(
+    const ConsensusFrontier& frontier, const CotableIdsMap& cotable_ids_map) {
+  return impl_->ModifyFlushedFrontier(frontier, cotable_ids_map);
 }
 
 Status RocksDBPatcher::UpdateFileSizes() {
diff --git a/src/yb/docdb/docdb_rocksdb_util.h b/src/yb/docdb/docdb_rocksdb_util.h
index 4b9b331b7673..6a7b1c8573fa 100644
--- a/src/yb/docdb/docdb_rocksdb_util.h
+++ b/src/yb/docdb/docdb_rocksdb_util.h
@@ -31,6 +31,10 @@ namespace yb {
 namespace docdb {
 
+// Map from old cotable id to new cotable id. Used when restoring a snapshot to a new
+// database/tablegroup, to update the cotable ids in the frontiers.
+using CotableIdsMap = std::unordered_map<Uuid, Uuid>;
+
 const int kDefaultGroupNo = 0;
 
 class IntentAwareIterator;
@@ -167,7 +171,8 @@ class RocksDBPatcher {
   Status SetHybridTimeFilter(std::optional<uint32_t> db_oid, HybridTime value);
 
   // Modify flushed frontier and clean up smallest/largest op id in per-SST file metadata.
-  Status ModifyFlushedFrontier(const ConsensusFrontier& frontier);
+  Status ModifyFlushedFrontier(
+      const ConsensusFrontier& frontier, const CotableIdsMap& cotable_ids_map);
 
   // Update file sizes in manifest if actual file size was changed because of direct manipulation
   // with .sst files.
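
To make the semantics of `UpdateCoTableId` and `CotableIdsMap` concrete, here is a minimal, self-contained C++ sketch of the key-move the new frontier method performs. It models the behavior only: `std::string` stands in for yb's `Uuid` and `uint32_t` for `SchemaVersion`; the type and function names below are illustrative, not the actual yb API.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

// Models ConsensusFrontier::cotable_schema_versions_: cotable id -> schema version.
// std::string stands in for yb's Uuid, uint32_t for SchemaVersion (assumption).
using SchemaVersions = std::unordered_map<std::string, uint32_t>;

// Moves the schema-version entry keyed by cotable_id to new_cotable_id.
// Returns true only if the map was actually modified, mirroring the contract
// of the patch's ConsensusFrontier::UpdateCoTableId.
bool UpdateCoTableId(SchemaVersions& versions,
                     const std::string& cotable_id,
                     const std::string& new_cotable_id) {
  if (cotable_id == new_cotable_id) {
    return false;  // Nothing to do; the id did not change.
  }
  auto it = versions.find(cotable_id);
  if (it == versions.end()) {
    return false;  // This frontier does not mention the old cotable id.
  }
  uint32_t schema_version = it->second;
  versions.erase(it);
  versions[new_cotable_id] = schema_version;  // Re-key under the new table's id.
  return true;
}

int main() {
  SchemaVersions frontier{{"old-cotable-id", 0}};
  // After restore, the table was re-created, so its cotable id changed.
  bool modified = UpdateCoTableId(frontier, "old-cotable-id", "new-cotable-id");
  std::cout << "modified=" << modified
            << " version=" << frontier.at("new-cotable-id") << "\n";  // modified=1 version=0
  return 0;
}
```

The boolean return matters: the file-iteration loop in `ModifyFlushedFrontier` rewrites an SST file's metadata only when one of its boundary frontiers actually changed.
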
diff --git a/src/yb/tablet/tablet_metadata.cc b/src/yb/tablet/tablet_metadata.cc
index eca2e1670f7d..747d5abf2805 100644
--- a/src/yb/tablet/tablet_metadata.cc
+++ b/src/yb/tablet/tablet_metadata.cc
@@ -605,6 +605,13 @@ Result<RaftGroupMetadataPtr> RaftGroupMetadata::Load(
   return ret;
 }
 
+Result<RaftGroupMetadataPtr> RaftGroupMetadata::LoadFromPath(
+    FsManager* fs_manager, const std::string& path) {
+  RaftGroupMetadataPtr ret(new RaftGroupMetadata(fs_manager, ""));
+  RETURN_NOT_OK(ret->LoadFromDisk(path));
+  return ret;
+}
+
 Result<RaftGroupMetadataPtr> RaftGroupMetadata::TEST_LoadOrCreate(
     const RaftGroupMetadataData& data) {
   if (data.fs_manager->LookupTablet(data.raft_group_id)) {
@@ -684,6 +691,18 @@ Result<TableInfoPtr> RaftGroupMetadata::GetTableInfoUnlocked(
   return iter->second;
 }
 
+std::vector<TableInfoPtr> RaftGroupMetadata::GetColocatedTableInfos() const {
+  std::vector<TableInfoPtr> table_infos;
+  {
+    std::lock_guard lock(data_mutex_);
+    for (const auto& [_, table_info] : kv_store_.colocation_to_table) {
+      DCHECK(table_info->schema().has_colocation_id());
+      table_infos.push_back(table_info);
+    }
+  }
+  return table_infos;
+}
+
 Status RaftGroupMetadata::DeleteTabletData(TabletDataState delete_type,
                                            const OpId& last_logged_opid) {
   CHECK(delete_type == TABLET_DATA_DELETED ||
@@ -859,7 +878,7 @@ Status RaftGroupMetadata::LoadFromSuperBlock(const RaftGroupReplicaSuperBlockPB&
   std::lock_guard lock(data_mutex_);
 
   // Verify that the Raft group id matches with the one in the protobuf.
-  if (superblock.raft_group_id() != raft_group_id_) {
+  if (!raft_group_id_.empty() && superblock.raft_group_id() != raft_group_id_) {
     return STATUS(Corruption, "Expected id=" + raft_group_id_ + " found " +
                               superblock.raft_group_id(),
                   superblock.DebugString());
diff --git a/src/yb/tablet/tablet_metadata.h b/src/yb/tablet/tablet_metadata.h
index da8c20ac80a6..0a392d2df7e6 100644
--- a/src/yb/tablet/tablet_metadata.h
+++ b/src/yb/tablet/tablet_metadata.h
@@ -289,6 +289,8 @@ class RaftGroupMetadata : public RefCountedThreadSafe<RaftGroupMetadata>,
       const TableId& table_id,
       const ColocationId& colocation_id = kColocationIdNotSet) const REQUIRES(data_mutex_);
 
+  std::vector<TableInfoPtr> GetColocatedTableInfos() const;
+
   const RaftGroupId& raft_group_id() const {
     DCHECK_NE(state_, kNotLoadedYet);
     return raft_group_id_;
diff --git a/src/yb/tablet/tablet_snapshots.cc b/src/yb/tablet/tablet_snapshots.cc
index f12d7bbf7735..fed1f2d8fc43 100644
--- a/src/yb/tablet/tablet_snapshots.cc
+++ b/src/yb/tablet/tablet_snapshots.cc
@@ -218,6 +218,10 @@ Env& TabletSnapshots::env() {
   return *metadata().fs_manager()->env();
 }
 
+FsManager* TabletSnapshots::fs_manager() {
+  return metadata().fs_manager();
+}
+
 Status TabletSnapshots::CleanupSnapshotDir(const std::string& dir) {
   auto& env = this->env();
   if (!env.FileExists(dir)) {
@@ -347,21 +351,54 @@ Result TabletSnapshots::GenerateRestoreWriteBatch(
   }
 }
 
+// Get the map of snapshot cotable ids to the current cotable ids.
+// The restored flushed frontiers can have cotable ids that are different from current cotable
+// ids. This map is used to update the cotable ids in the restored flushed frontiers.
+Result<docdb::CotableIdsMap> TabletSnapshots::GetCotableIdsMap(const std::string& snapshot_dir) {
+  docdb::CotableIdsMap cotable_ids_map;
+  if (snapshot_dir.empty() || !metadata().colocated()) {
+    return cotable_ids_map;
+  }
+  auto snapshot_metadata_file = TabletMetadataFile(snapshot_dir);
+  if (!env().FileExists(snapshot_metadata_file)) {
+    return cotable_ids_map;
+  }
+  auto snapshot_metadata =
+      VERIFY_RESULT(RaftGroupMetadata::LoadFromPath(fs_manager(), snapshot_metadata_file));
+  for (const auto& snapshot_table_info : snapshot_metadata->GetColocatedTableInfos()) {
+    auto current_table_info = metadata().GetTableInfo(
+        std::string(), snapshot_table_info->schema().colocation_id());
+    if (!current_table_info.ok()) {
+      if (!current_table_info.status().IsNotFound()) {
+        return current_table_info.status();
+      }
+      LOG_WITH_PREFIX(WARNING) << "Table " << snapshot_table_info->table_id
+                               << " not found: " << current_table_info.status();
+    } else if ((*current_table_info)->cotable_id != snapshot_table_info->cotable_id) {
+      cotable_ids_map[snapshot_table_info->cotable_id] = (*current_table_info)->cotable_id;
+    }
+  }
+  if (!cotable_ids_map.empty()) {
+    LOG_WITH_PREFIX(INFO) << "Cotable ids map: " << yb::ToString(cotable_ids_map);
+  }
+  return cotable_ids_map;
+}
+
 Status TabletSnapshots::RestoreCheckpoint(
-    const std::string& dir, HybridTime restore_at, const RestoreMetadata& restore_metadata,
+    const std::string& snapshot_dir, HybridTime restore_at, const RestoreMetadata& restore_metadata,
     const docdb::ConsensusFrontier& frontier, bool is_pitr_restore, const OpId& op_id) {
   LongOperationTracker long_operation_tracker("Restore checkpoint", 5s);
 
   // The following two lines can't just be changed to RETURN_NOT_OK(PauseReadWriteOperations()):
   // op_pause has to stay in scope until the end of the function.
-  auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(!dir.empty()));
+  auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(!snapshot_dir.empty()));
 
   std::lock_guard lock(create_checkpoint_lock());
 
   const string db_dir = regular_db().GetName();
   const std::string intents_db_dir = has_intents_db() ? intents_db().GetName() : std::string();
 
-  if (dir.empty()) {
+  if (snapshot_dir.empty()) {
     // Just change rocksdb hybrid time limit, because it should be in retention interval.
     // TODO(pitr) apply transactions and reset intents.
     CompleteShutdownRocksDBs(op_pauses);
@@ -371,7 +408,7 @@ Status TabletSnapshots::RestoreCheckpoint(
   RETURN_NOT_OK(DeleteRocksDBs(CompleteShutdownRocksDBs(op_pauses)));
 
   auto s = CopyDirectory(
-      &rocksdb_env(), dir, db_dir, UseHardLinks::kTrue, CreateIfMissing::kTrue);
+      &rocksdb_env(), snapshot_dir, db_dir, UseHardLinks::kTrue, CreateIfMissing::kTrue);
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX(WARNING) << "Copy checkpoint files status: " << s;
     return STATUS(IllegalState, "Unable to copy checkpoint files", s.ToString());
@@ -388,7 +425,8 @@ Status TabletSnapshots::RestoreCheckpoint(
 
   docdb::RocksDBPatcher patcher(db_dir, rocksdb_options);
 
   RETURN_NOT_OK(patcher.Load());
-  RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
+  RETURN_NOT_OK(patcher.ModifyFlushedFrontier(
+      frontier, VERIFY_RESULT(GetCotableIdsMap(snapshot_dir))));
   if (restore_at) {
     RETURN_NOT_OK(patcher.SetHybridTimeFilter(std::nullopt, restore_at));
   }
@@ -420,14 +458,14 @@ Status TabletSnapshots::RestoreCheckpoint(
     need_flush = true;
   }
 
-  if (!dir.empty()) {
-    auto tablet_metadata_file = TabletMetadataFile(dir);
+  if (!snapshot_dir.empty()) {
+    auto snapshot_metadata_file = TabletMetadataFile(snapshot_dir);
     // Old snapshots could lack tablet metadata, so just do nothing in this case.
-    if (env().FileExists(tablet_metadata_file)) {
-      LOG_WITH_PREFIX(INFO) << "Merging metadata with restored: " << tablet_metadata_file
+    if (env().FileExists(snapshot_metadata_file)) {
+      LOG_WITH_PREFIX(INFO) << "Merging metadata with restored: " << snapshot_metadata_file
                             << " , force overwrite of schema packing " << !is_pitr_restore;
       RETURN_NOT_OK(tablet().metadata()->MergeWithRestored(
-          tablet_metadata_file,
+          snapshot_metadata_file,
          is_pitr_restore ? docdb::OverwriteSchemaPacking::kFalse
                          : docdb::OverwriteSchemaPacking::kTrue));
       need_flush = true;
@@ -447,7 +485,7 @@ Status TabletSnapshots::RestoreCheckpoint(
     return s;
   }
-  LOG_WITH_PREFIX(INFO) << "Checkpoint restored from " << dir;
+  LOG_WITH_PREFIX(INFO) << "Checkpoint restored from " << snapshot_dir;
 
   LOG_WITH_PREFIX(INFO) << "Re-enabling compactions";
   s = tablet().EnableCompactions(&op_pauses.non_abortable);
   if (!s.ok()) {
diff --git a/src/yb/tablet/tablet_snapshots.h b/src/yb/tablet/tablet_snapshots.h
index 7b1d82bef590..a38be151d1c0 100644
--- a/src/yb/tablet/tablet_snapshots.h
+++ b/src/yb/tablet/tablet_snapshots.h
@@ -101,7 +101,7 @@ class TabletSnapshots : public TabletComponent {
   // Restore the RocksDB checkpoint from the provided directory.
   // Only used when table_type_ == YQL_TABLE_TYPE.
   Status RestoreCheckpoint(
-      const std::string& dir, HybridTime restore_at, const RestoreMetadata& metadata,
+      const std::string& snapshot_dir, HybridTime restore_at, const RestoreMetadata& metadata,
       const docdb::ConsensusFrontier& frontier, bool is_pitr_restore, const OpId& op_id);
 
   // Applies specified snapshot operation.
@@ -109,12 +109,15 @@
   Status CleanupSnapshotDir(const std::string& dir);
 
   Env& env();
+  FsManager* fs_manager();
 
   Status RestorePartialRows(SnapshotOperation* operation);
 
   Result GenerateRestoreWriteBatch(
       const tserver::TabletSnapshotOpRequestPB& request, docdb::DocWriteBatch* write_batch);
 
+  Result<docdb::CotableIdsMap> GetCotableIdsMap(const std::string& snapshot_dir);
+
   std::string TEST_last_rocksdb_checkpoint_dir_;
 };
 
diff --git a/src/yb/tools/data-patcher.cc b/src/yb/tools/data-patcher.cc
index 0b2cea7ec2dd..20ee643ba43c 100644
--- a/src/yb/tools/data-patcher.cc
+++ b/src/yb/tools/data-patcher.cc
@@ -928,7 +928,7 @@ class ApplyPatch {
       frontier.set_history_cutoff_information(
           { HybridTime::FromMicros(kYugaByteMicrosecondEpoch),
             HybridTime::FromMicros(kYugaByteMicrosecondEpoch) });
-      RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
+      RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier, docdb::CotableIdsMap()));
     } else {
       LOG(INFO) << "We did not see RocksDB CURRENT or MANIFEST-... files in " << dir
                 << ", skipping applying " << patched_path;
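
Putting the pieces together, the sketch below models the restore flow these hunks implement: build an old-to-new cotable id map by matching colocation ids between the snapshot metadata and the live metadata (the analogue of `GetCotableIdsMap`), then rewrite both boundary frontiers of every SST file (the analogue of the `ModifyFlushedFrontier` file loop). This is a simplified model, not the yb API: `std::string` replaces `Uuid`, and the `TableInfo`/`FileMetaData` structs and function names are illustrative stand-ins.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Simplified stand-ins for yb types (assumptions for illustration only).
struct TableInfo {
  uint32_t colocation_id;   // Stable across backup/restore.
  std::string cotable_id;   // Changes when the table is re-created at restore.
};
using SchemaVersions = std::unordered_map<std::string, uint32_t>;
struct FileMetaData {
  SchemaVersions smallest;  // Frontier at the file's smallest key.
  SchemaVersions largest;   // Frontier at the file's largest key.
};
using CotableIdsMap = std::unordered_map<std::string, std::string>;

// Analogue of GetCotableIdsMap: colocation ids identify the same table across
// the restore, so they key the old-id -> new-id mapping.
CotableIdsMap BuildCotableIdsMap(const std::vector<TableInfo>& snapshot,
                                 const std::vector<TableInfo>& current) {
  std::unordered_map<uint32_t, std::string> current_by_colocation;
  for (const auto& t : current) {
    current_by_colocation[t.colocation_id] = t.cotable_id;
  }
  CotableIdsMap map;
  for (const auto& t : snapshot) {
    auto it = current_by_colocation.find(t.colocation_id);
    if (it != current_by_colocation.end() && it->second != t.cotable_id) {
      map[t.cotable_id] = it->second;  // Only record ids that actually changed.
    }
  }
  return map;
}

// Analogue of the ModifyFlushedFrontier file loop: re-key every frontier entry
// that still refers to an old cotable id.
void PatchFiles(std::vector<FileMetaData>& files, const CotableIdsMap& map) {
  for (auto& file : files) {
    for (auto* frontier : {&file.smallest, &file.largest}) {
      for (const auto& [old_id, new_id] : map) {
        auto it = frontier->find(old_id);
        if (it != frontier->end()) {
          uint32_t version = it->second;
          frontier->erase(it);
          (*frontier)[new_id] = version;  // Preserve the schema version.
        }
      }
    }
  }
}

int main() {
  std::vector<TableInfo> snapshot{{42, "old-id"}};
  std::vector<TableInfo> current{{42, "new-id"}};
  FileMetaData file{{{"old-id", 0}}, {{"old-id", 0}}};
  std::vector<FileMetaData> files{file};
  PatchFiles(files, BuildCotableIdsMap(snapshot, current));
  std::cout << files[0].smallest.count("new-id") << "\n";  // 1: frontier now uses the new id
  return 0;
}
```

With the frontiers re-keyed this way, schema-packing garbage collection sees the restored files under the new cotable ids, so the version-0 packing in the summary's example is no longer dropped prematurely.
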