From d6bbf59db8b502f790fc160ca9eef72cf5791b7b Mon Sep 17 00:00:00 2001 From: Eric Sheng Date: Mon, 16 Sep 2024 16:03:11 -0700 Subject: [PATCH] [#23890] docdb: Add filtering for bootstrap intent iterators based on min_replay_txn_start_ht Summary: This diff adds filtering for intent SST files during the transaction loading step of tablet bootstrap, using the newly introduced `min_replay_txn_start_ht`. When CDC is enabled, we persist intent SST files longer than they are otherwise needed, until CDC has streamed all transactions in the SST file and moved the retention barrier far enough ahead. This can lead to a large buildup of intent SST files which are not actually needed at bootstrap time. 8b23a4eef1a835bd144ab134f31b5e75189801fd / D35639 added changes to save `min_running_ht` periodically, and add intent SST file filtering during bootstrap time based on periodically saved values of `min_running_ht`. This can lead to data loss, if there is a transaction T such that the following is true: - T has been applied (APPLIED written to WALs) - T has intents that have been flushed to disk (this is rare but possible when CDC is disabled since in the ideal non-CDC case we never flush intents) - Changes made by T on regulardb have not been flushed - The metadata record for T is on an intent SST file whose max HT is less than min_running_ht after T apply (i.e. intentsdb flush happened between T writes and apply) - Tablet bootstrap state has been saved after T has committed These conditions will result in a `min_running_ht > T.start_time` being written to disk, and loaded during tablet bootstrap. Since regulardb changes have not been flushed, WAL replay will start from a point that includes T. However, transaction loader will not load T, because its metadata record has been excluded due to the SST file filter. This results in changes made by T being dropped, even though it has successfully committed. This change introduces a new `min_replay_txn_start_ht` and changes the intent SST file filter to be based off of periodically saved values of this new `min_replay_txn_start_ht`. `min_replay_txn_start_ht` is defined as the minimum of: - `min_running_ht` - `start_ht` of any transaction which may be read during WAL replay WAL replay begins at `bootstrap_start_op_id = min(intentsdb flushed_op_id, rocksdb flushed_op_id, retryable requests last_flushed_op_id)`. We calculate `min_replay_txn_start_ht` by maintaining a set of `(applied_op_id, start_ht)` for recently applied transactions. Transactions are added into this set when they are applied and cleaned from memory (removed from `transactions_`) and are removed when `bootstrap_start_op_id` is increased past `applied_op_id`. `min_replay_txn_start_ht` is then the minimum of `start_ht` of this set and `min_running_ht`. Since `replay_start_op_id` is only updated after flushes to disk, this ensures that any transaction whose metadata record is filtered out by the intent SST file filter will not be incorrectly loaded during WAL replay, since such a transaction would have `apply_op_id < replay_start_op_id` (the `replay_start_op_id` calculated at bootstrap time), so none of its records are read by WAL replay. **Upgrade/Rollback safety:** The `min_running_ht` field in `TabletBootstrapStatePB` was retired and a new `min_replay_txn_start_ht` field was added. There are no autoflags added because `min_replay_txn_start_ht` is only used for an optimization (intent SST file filtering) so the lack of its presence post-upgrade does not change correctness, and its presence post-rollback is simply ignored. `min_running_ht` was only used for a incorrect implementation of the optimization which was off by default, so the lack of its presence post-rollback does not harm correctness (and actually improves it if optimization was turned on) and its presence after upgrade is ignored. A different field was used for this change to ensure that values of `min_running_ht` set before upgrade are not used, since it is unsafe to use it. Jira: DB-12794 Test Plan: Added test case to reproduce the data loss scenario when filter was using `min_running_ht`: ``` ./yb_build.sh --cxx_test pgwrapper_pg_mini-test --gtest_filter PgMiniTestSingleNode.TestBootstrapOnAppliedTransactionWithIntents ``` Also confirmed that CDC stress tests stop failing after these changes. Reviewers: sergei, qhu Reviewed By: sergei Subscribers: rthallam, ybase, yql Differential Revision: https://phorge.dev.yugabyte.com/D37792 --- src/yb/common/opid.h | 15 ++ src/yb/consensus/consensus.proto | 5 +- src/yb/tablet/tablet.cc | 47 +++- src/yb/tablet/tablet.h | 1 + src/yb/tablet/tablet_bootstrap.cc | 14 +- .../tablet/tablet_bootstrap_state_flusher.cc | 2 +- .../tablet/tablet_bootstrap_state_flusher.h | 3 + .../tablet/tablet_bootstrap_state_manager.cc | 49 +++- .../tablet/tablet_bootstrap_state_manager.h | 11 +- src/yb/tablet/tablet_peer.cc | 19 +- src/yb/tablet/tablet_peer.h | 4 +- src/yb/tablet/transaction_loader.cc | 6 +- src/yb/tablet/transaction_loader.h | 2 +- src/yb/tablet/transaction_participant.cc | 220 ++++++++++++++++-- src/yb/tablet/transaction_participant.h | 13 +- .../tablet/transaction_participant_context.h | 2 + src/yb/util/algorithm_util.h | 12 + src/yb/yql/pgwrapper/pg_mini-test.cc | 52 +++++ 18 files changed, 409 insertions(+), 68 deletions(-) diff --git a/src/yb/common/opid.h b/src/yb/common/opid.h index e2513f09ed30..9b375c91795c 100644 --- a/src/yb/common/opid.h +++ b/src/yb/common/opid.h @@ -84,6 +84,9 @@ struct OpId { std::string ToString() const; + static OpId MinValid(const OpId& lhs, const OpId& rhs); + static OpId MaxValid(const OpId& lhs, const OpId& rhs); + // Parse OpId from TERM.INDEX string. static Result FromString(Slice input); }; @@ -112,6 +115,18 @@ inline bool operator>=(const OpId& lhs, const OpId& rhs) { return !(lhs < rhs); } +inline OpId OpId::MinValid(const OpId& lhs, const OpId& rhs) { + if (!lhs.valid()) return rhs; + if (!rhs.valid()) return lhs; + return std::min(lhs, rhs); +} + +inline OpId OpId::MaxValid(const OpId& lhs, const OpId& rhs) { + if (!lhs.valid()) return rhs; + if (!rhs.valid()) return lhs; + return std::max(lhs, rhs); +} + std::ostream& operator<<(std::ostream& out, const OpId& op_id); size_t hash_value(const OpId& op_id) noexcept; diff --git a/src/yb/consensus/consensus.proto b/src/yb/consensus/consensus.proto index 5db17252fda8..ceb605627345 100644 --- a/src/yb/consensus/consensus.proto +++ b/src/yb/consensus/consensus.proto @@ -613,7 +613,10 @@ message ClientReplicatedRetryableRequestRangesPB { message TabletBootstrapStatePB { optional OpIdPB last_op_id = 1; repeated ClientReplicatedRetryableRequestRangesPB client_requests = 2; - optional fixed64 min_running_ht = 3; + + reserved 3; + + optional fixed64 min_replay_txn_start_ht = 4; } // A Raft implementation. diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index cb49192e65e7..51ecb5716666 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -491,11 +491,36 @@ Result CheckSafeTime(HybridTime time, HybridTime min_allowed) { } // namespace -class Tablet::RegularRocksDbListener : public rocksdb::EventListener { +class Tablet::RocksDbListener : public rocksdb::EventListener { public: - RegularRocksDbListener(Tablet* tablet, const std::string& log_prefix) - : tablet_(*CHECK_NOTNULL(tablet)), - log_prefix_(log_prefix) {} + RocksDbListener(Tablet& tablet, const std::string& log_prefix) + : tablet_(tablet), log_prefix_(log_prefix) {} + + void OnFlushCompleted(rocksdb::DB*, const rocksdb::FlushJobInfo&) override { + if (auto* participant = tablet_.transaction_participant()) { + VLOG_WITH_PREFIX_AND_FUNC(2) + << "RocksDB flush completed, triggering cleanup of recently applied transactions"; + auto status = participant->ProcessRecentlyAppliedTransactions(); + if (!status.ok() && !tablet_.shutdown_requested_.load(std::memory_order_acquire)) { + LOG_WITH_PREFIX_AND_FUNC(DFATAL) + << "Failed to clean up recently applied transactions: " << status; + } + } + } + + protected: + const std::string& LogPrefix() const { + return log_prefix_; + } + + Tablet& tablet_; + const std::string log_prefix_; +}; + +class Tablet::RegularRocksDbListener : public Tablet::RocksDbListener { + public: + RegularRocksDbListener(Tablet& tablet, const std::string& log_prefix) + : RocksDbListener(tablet, log_prefix) {} void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override { auto& metadata = *CHECK_NOTNULL(tablet_.metadata()); @@ -528,7 +553,7 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener { { auto scoped_read_operation = tablet_.CreateScopedRWOperationNotBlockingRocksDbShutdownStart(); if (!scoped_read_operation.ok()) { - VLOG_WITH_FUNC(4) << "Skip"; + VLOG_WITH_PREFIX_AND_FUNC(4) << "Skip"; return; } @@ -552,7 +577,7 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener { if(!tablet_.metadata()->colocated()) { auto schema_version = tablet_.get_min_xcluster_schema_version_(primary_table_id, kColocationIdNotSet); - VLOG_WITH_FUNC(4) << + VLOG_WITH_PREFIX_AND_FUNC(4) << Format("MinNonXClusterSchemaVersion, MinXClusterSchemaVersion for $0:$1,$2", primary_table_id, min_schema_versions[Uuid::Nil()], schema_version); if (schema_version < min_schema_versions[Uuid::Nil()]) { @@ -566,7 +591,7 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener { ColocationId colocation_id = colocated_tables[table_id.ToHexString()]; auto xcluster_min_schema_version = tablet_.get_min_xcluster_schema_version_(primary_table_id, colocation_id); - VLOG_WITH_FUNC(4) << + VLOG_WITH_PREFIX_AND_FUNC(4) << Format("MinNonXClusterSchemaVersion, MinXClusterSchemaVersion for $0,$1:$2,$3", primary_table_id, colocation_id, min_schema_versions[table_id], xcluster_min_schema_version); @@ -595,9 +620,6 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener { smallest.MakeExternalSchemaVersionsAtMost(table_id_to_min_schema_version); } } - - Tablet& tablet_; - const std::string log_prefix_; }; Tablet::Tablet(const TabletInitData& data) @@ -972,7 +994,7 @@ Status Tablet::OpenKeyValueTablet() { rocksdb::Options regular_rocksdb_options(rocksdb_options); regular_rocksdb_options.listeners.push_back( - std::make_shared(this, regular_rocksdb_options.log_prefix)); + std::make_shared(*this, regular_rocksdb_options.log_prefix)); const string db_dir = metadata()->rocksdb_dir(); RETURN_NOT_OK(CreateTabletDirectories(db_dir, metadata()->fs_manager())); @@ -1020,6 +1042,9 @@ Status Tablet::OpenKeyValueTablet() { } intents_rocksdb_options.statistics = intentsdb_statistics_; + intents_rocksdb_options.listeners.push_back( + std::make_shared(*this, intents_rocksdb_options.log_prefix)); + rocksdb::DB* intents_db = nullptr; RETURN_NOT_OK( rocksdb::DB::Open(intents_rocksdb_options, db_dir + kIntentsDBSuffix, &intents_db)); diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 82d42335f3b8..65f4a23d80e7 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -978,6 +978,7 @@ class Tablet : public AbstractTablet, friend class ScopedReadOperation; friend class TabletComponent; + class RocksDbListener; class RegularRocksDbListener; FRIEND_TEST(TestTablet, TestGetLogRetentionSizeForIndex); diff --git a/src/yb/tablet/tablet_bootstrap.cc b/src/yb/tablet/tablet_bootstrap.cc index 7cb76d3094b7..ad0c7bcfa9a9 100644 --- a/src/yb/tablet/tablet_bootstrap.cc +++ b/src/yb/tablet/tablet_bootstrap.cc @@ -136,8 +136,8 @@ DEFINE_RUNTIME_bool(skip_flushed_entries_in_first_replayed_segment, true, "If applicable, only replay entries that are not flushed to RocksDB or necessary " "to bootstrap retryable requests in the first replayed wal segment."); -DEFINE_RUNTIME_bool(use_bootstrap_intent_ht_filter, false, - "Use min running hybrid time filter for bootstrap."); +DEFINE_RUNTIME_bool(use_bootstrap_intent_ht_filter, true, + "Use min replay txn start time filter for bootstrap."); DECLARE_int32(retryable_request_timeout_secs); @@ -506,7 +506,7 @@ class TabletBootstrap { } std::optional bootstrap_state_pb = std::nullopt; - HybridTime min_running_ht = HybridTime::kInvalid; + HybridTime min_replay_txn_start_ht = HybridTime::kInvalid; if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && data_.bootstrap_state_manager) { auto result = data_.bootstrap_state_manager->LoadFromDisk(); if (result.ok()) { @@ -514,14 +514,14 @@ class TabletBootstrap { if (GetAtomicFlag(&FLAGS_use_bootstrap_intent_ht_filter)) { const auto& bootstrap_state = data_.bootstrap_state_manager->bootstrap_state(); - min_running_ht = bootstrap_state.GetMinRunningHybridTime(); + min_replay_txn_start_ht = bootstrap_state.GetMinReplayTxnStartTime(); } } else if (!result.status().IsNotFound()) { return result.status(); } } - const bool has_blocks = VERIFY_RESULT(OpenTablet(min_running_ht)); + const bool has_blocks = VERIFY_RESULT(OpenTablet(min_replay_txn_start_ht)); if (data_.retryable_requests) { const auto retryable_request_timeout_secs = meta_->IsSysCatalog() @@ -623,7 +623,7 @@ class TabletBootstrap { } // Sets result to true if there was any data on disk for this tablet. - Result OpenTablet(HybridTime min_running_ht) { + Result OpenTablet(HybridTime min_replay_txn_start_ht) { CleanupSnapshots(); // Use operator new instead of make_shared for creating the shared_ptr. That way, we would have // the shared_ptr's control block hold a raw pointer to the Tablet object as opposed to the @@ -637,7 +637,7 @@ class TabletBootstrap { auto participant = tablet->transaction_participant(); if (participant) { - participant->SetMinRunningHybridTimeLowerBound(min_running_ht); + participant->SetMinReplayTxnStartTimeLowerBound(min_replay_txn_start_ht); } // Doing nothing for now except opening a tablet locally. diff --git a/src/yb/tablet/tablet_bootstrap_state_flusher.cc b/src/yb/tablet/tablet_bootstrap_state_flusher.cc index d739eb5d9742..969aff0dbe0e 100644 --- a/src/yb/tablet/tablet_bootstrap_state_flusher.cc +++ b/src/yb/tablet/tablet_bootstrap_state_flusher.cc @@ -125,7 +125,7 @@ Status TabletBootstrapStateFlusher::FlushBootstrapState(TabletBootstrapFlushStat SetIdleAndNotifyAll(); }); TEST_PAUSE_IF_FLAG(TEST_pause_before_flushing_bootstrap_state); - return bootstrap_state_manager_->SaveToDisk(*raft_consensus_); + return bootstrap_state_manager_->SaveToDisk(tablet_, *raft_consensus_); } Status TabletBootstrapStateFlusher::SubmitFlushBootstrapStateTask() { diff --git a/src/yb/tablet/tablet_bootstrap_state_flusher.h b/src/yb/tablet/tablet_bootstrap_state_flusher.h index 189c5cabb9f6..0bcaf7460d73 100644 --- a/src/yb/tablet/tablet_bootstrap_state_flusher.h +++ b/src/yb/tablet/tablet_bootstrap_state_flusher.h @@ -40,10 +40,12 @@ class TabletBootstrapStateFlusher : public: TabletBootstrapStateFlusher( const std::string& tablet_id, + TabletWeakPtr tablet, std::shared_ptr raft_consensus, std::shared_ptr bootstrap_state_manager, std::unique_ptr flush_bootstrap_state_pool_token) : tablet_id_(tablet_id), + tablet_(std::move(tablet)), raft_consensus_(raft_consensus), bootstrap_state_manager_(bootstrap_state_manager), flush_bootstrap_state_pool_token_(std::move(flush_bootstrap_state_pool_token)) {} @@ -77,6 +79,7 @@ class TabletBootstrapStateFlusher : mutable std::condition_variable flush_cond_; std::atomic flush_state_{TabletBootstrapFlushState::kFlushIdle}; TabletId tablet_id_; + TabletWeakPtr tablet_; std::shared_ptr raft_consensus_; std::shared_ptr bootstrap_state_manager_; std::unique_ptr flush_bootstrap_state_pool_token_; diff --git a/src/yb/tablet/tablet_bootstrap_state_manager.cc b/src/yb/tablet/tablet_bootstrap_state_manager.cc index ef0573e868a1..3f739223abc0 100644 --- a/src/yb/tablet/tablet_bootstrap_state_manager.cc +++ b/src/yb/tablet/tablet_bootstrap_state_manager.cc @@ -21,32 +21,36 @@ #include "yb/consensus/retryable_requests.h" #include "yb/consensus/opid_util.h" +#include "yb/tablet/tablet.h" +#include "yb/tablet/transaction_participant.h" + #include "yb/util/debug-util.h" #include "yb/util/env_util.h" namespace yb::tablet { TabletBootstrapState::TabletBootstrapState(const TabletBootstrapState& rhs): - min_running_ht_(rhs.min_running_ht_.load()) {} + min_replay_txn_start_ht_(rhs.min_replay_txn_start_ht_.load()) {} TabletBootstrapState::TabletBootstrapState(TabletBootstrapState&& rhs): - min_running_ht_(rhs.min_running_ht_.load()) {} + min_replay_txn_start_ht_(rhs.min_replay_txn_start_ht_.load()) {} void TabletBootstrapState::operator=(TabletBootstrapState&& rhs) { - min_running_ht_.store(rhs.min_running_ht_.load()); + min_replay_txn_start_ht_.store(rhs.min_replay_txn_start_ht_.load()); } void TabletBootstrapState::CopyFrom(const TabletBootstrapState& rhs) { - min_running_ht_.store(rhs.min_running_ht_.load()); + min_replay_txn_start_ht_.store(rhs.min_replay_txn_start_ht_.load()); } void TabletBootstrapState::ToPB(consensus::TabletBootstrapStatePB* pb) const { - pb->set_min_running_ht(min_running_ht_.load().ToUint64()); + pb->set_min_replay_txn_start_ht(min_replay_txn_start_ht_.load().ToUint64()); } void TabletBootstrapState::FromPB(const consensus::TabletBootstrapStatePB& pb) { - min_running_ht_.store( - pb.has_min_running_ht() ? HybridTime(pb.min_running_ht()) : HybridTime::kInvalid); + min_replay_txn_start_ht_.store( + pb.has_min_replay_txn_start_ht() ? HybridTime(pb.min_replay_txn_start_ht()) + : HybridTime::kInvalid); } TabletBootstrapStateManager::TabletBootstrapStateManager() { } @@ -71,15 +75,32 @@ Status TabletBootstrapStateManager::Init() { return Status::OK(); } -Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_consensus) { +Status TabletBootstrapStateManager::SaveToDisk( + const TabletWeakPtr& tablet_ptr, consensus::RaftConsensus& raft_consensus) { auto retryable_requests = VERIFY_RESULT(raft_consensus.TakeSnapshotOfRetryableRequests()); if (!retryable_requests) { LOG(INFO) << "Nothing to save"; return Status::OK(); } + auto max_replicated_op_id = retryable_requests->GetMaxReplicatedOpId(); + TabletBootstrapState bootstrap_state(bootstrap_state_); + // Set min replay txn start time to what it will be after this flush succeeds - this is safe + // because if the flush succeeds, replay start op id will be calculated from the new value. + auto tablet = tablet_ptr.lock(); + TransactionParticipant* participant = nullptr; + if (tablet) { + participant = tablet->transaction_participant(); + if (participant) { + auto start_ht = VERIFY_RESULT(participant->SimulateProcessRecentlyAppliedTransactions( + max_replicated_op_id)); + VLOG(1) << "Using min_replay_txn_start_ht = " << start_ht; + bootstrap_state.SetMinReplayTxnStartTime(start_ht); + } + } + consensus::TabletBootstrapStatePB pb; retryable_requests->ToPB(&pb); bootstrap_state.ToPB(&pb); @@ -101,8 +122,16 @@ Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_co has_file_on_disk_ = true; RETURN_NOT_OK(env->SyncDir(dir_)); - auto max_replicated_op_id = retryable_requests->GetMaxReplicatedOpId(); - return raft_consensus.SetLastFlushedOpIdInRetryableRequests(max_replicated_op_id); + RETURN_NOT_OK(raft_consensus.SetLastFlushedOpIdInRetryableRequests(max_replicated_op_id)); + + if (participant) { + VLOG(1) + << "Bootstrap state saved to disk, triggering cleanup of recently applied transactions"; + participant->SetRetryableRequestsFlushedOpId(max_replicated_op_id); + return participant->ProcessRecentlyAppliedTransactions(); + } + + return Status::OK(); } Result TabletBootstrapStateManager::LoadFromDisk() { diff --git a/src/yb/tablet/tablet_bootstrap_state_manager.h b/src/yb/tablet/tablet_bootstrap_state_manager.h index dd7c60dd2be6..17f011c3d9cf 100644 --- a/src/yb/tablet/tablet_bootstrap_state_manager.h +++ b/src/yb/tablet/tablet_bootstrap_state_manager.h @@ -42,14 +42,17 @@ class TabletBootstrapState { void CopyFrom(const TabletBootstrapState& rhs); - void SetMinRunningHybridTime(HybridTime min_running_ht) { min_running_ht_.store(min_running_ht); } - HybridTime GetMinRunningHybridTime() const { return min_running_ht_.load(); } + void SetMinReplayTxnStartTime(HybridTime min_replay_txn_start_ht) { + min_replay_txn_start_ht_.store(min_replay_txn_start_ht); + } + + HybridTime GetMinReplayTxnStartTime() const { return min_replay_txn_start_ht_.load(); } void ToPB(consensus::TabletBootstrapStatePB* pb) const; void FromPB(const consensus::TabletBootstrapStatePB& pb); private: - std::atomic min_running_ht_{HybridTime::kInvalid}; + std::atomic min_replay_txn_start_ht_{HybridTime::kInvalid}; }; class TabletBootstrapStateManager { @@ -74,7 +77,7 @@ class TabletBootstrapStateManager { } // Flush the pb as the latest version. - Status SaveToDisk(consensus::RaftConsensus& raft_consensus); + Status SaveToDisk(const TabletWeakPtr& tablet_ptr, consensus::RaftConsensus& raft_consensus); // Load the latest version from disk if any. Result LoadFromDisk(); diff --git a/src/yb/tablet/tablet_peer.cc b/src/yb/tablet/tablet_peer.cc index 28942c1baa18..103b638419c7 100644 --- a/src/yb/tablet/tablet_peer.cc +++ b/src/yb/tablet/tablet_peer.cc @@ -320,7 +320,7 @@ Status TabletPeer::InitTabletPeer( auto flush_bootstrap_state_pool_token = flush_bootstrap_state_pool ? flush_bootstrap_state_pool->NewToken(ThreadPool::ExecutionMode::SERIAL) : nullptr; bootstrap_state_flusher_ = std::make_shared( - tablet_id_, consensus_, bootstrap_state_manager_, + tablet_id_, tablet_weak_, consensus_, bootstrap_state_manager_, std::move(flush_bootstrap_state_pool_token)); tablet_->SetHybridTimeLeaseProvider(std::bind(&TabletPeer::HybridTimeLease, this, _1, _2)); @@ -331,8 +331,8 @@ Status TabletPeer::InitTabletPeer( auto txn_participant = tablet_->transaction_participant(); if (txn_participant) { - txn_participant->SetMinRunningHybridTimeUpdateCallback( - std::bind_front(&TabletPeer::MinRunningHybridTimeUpdated, this)); + txn_participant->SetMinReplayTxnStartTimeUpdateCallback( + std::bind_front(&TabletPeer::MinReplayTxnStartTimeUpdated, this)); } // "Publish" the tablet object right before releasing the lock. @@ -942,6 +942,11 @@ void TabletPeer::GetInFlightOperations(Operation::TraceType trace_type, } } +Result TabletPeer::MaxPersistentOpId() const { + auto flush_op_ids = VERIFY_RESULT(tablet_->MaxPersistentOpId()); + return OpId::MinValid(flush_op_ids.intents, flush_op_ids.regular); +} + Result TabletPeer::GetEarliestNeededLogIndex(std::string* details) const { if (PREDICT_FALSE(!log_)) { auto status = STATUS(Uninitialized, "Log not ready (tablet peer not yet initialized?)"); @@ -1822,10 +1827,10 @@ TabletBootstrapFlushState TabletPeer::TEST_TabletBootstrapStateFlusherState() co : TabletBootstrapFlushState::kFlushIdle; } -void TabletPeer::MinRunningHybridTimeUpdated(HybridTime min_running_ht) { - if (min_running_ht && min_running_ht != HybridTime::kMax) { - VLOG_WITH_PREFIX(2) << "Min running hybrid time updated: " << min_running_ht; - bootstrap_state_manager_->bootstrap_state().SetMinRunningHybridTime(min_running_ht); +void TabletPeer::MinReplayTxnStartTimeUpdated(HybridTime start_ht) { + if (start_ht && start_ht != HybridTime::kMax) { + VLOG_WITH_PREFIX(2) << "min_replay_txn_start_ht updated: " << start_ht; + bootstrap_state_manager_->bootstrap_state().SetMinReplayTxnStartTime(start_ht); } } diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h index 2c6fbb1e54db..50abce111683 100644 --- a/src/yb/tablet/tablet_peer.h +++ b/src/yb/tablet/tablet_peer.h @@ -320,6 +320,8 @@ class TabletPeer : public std::enable_shared_from_this, // to it. Result GetEarliestNeededLogIndex(std::string* details = nullptr) const; + Result MaxPersistentOpId() const override; + // Returns the the minimum log index for transaction tables and latest log index for other tables. // Returns the bootstrap_time which is safe_time higher than the time of the returned OpId. // If FLAGS_abort_active_txns_during_cdc_bootstrap is set then all active transactions are @@ -600,7 +602,7 @@ class TabletPeer : public std::enable_shared_from_this, bool FlushBootstrapStateEnabled() const; - void MinRunningHybridTimeUpdated(HybridTime min_running_ht); + void MinReplayTxnStartTimeUpdated(HybridTime start_ht); MetricRegistry* metric_registry_; diff --git a/src/yb/tablet/transaction_loader.cc b/src/yb/tablet/transaction_loader.cc index a7be08d4865c..cd2c8295040e 100644 --- a/src/yb/tablet/transaction_loader.cc +++ b/src/yb/tablet/transaction_loader.cc @@ -77,11 +77,11 @@ class TransactionLoader::Executor { if (!scoped_pending_operation_.ok()) { return false; } - auto min_running_ht = context().MinRunningHybridTime(); - VLOG_WITH_PREFIX(1) << "TransactionLoader min_running_ht: " << min_running_ht; + auto min_replay_txn_start_ht = context().MinReplayTxnStartTime(); + VLOG_WITH_PREFIX(1) << "TransactionLoader min_replay_txn_start_ht: " << min_replay_txn_start_ht; regular_iterator_ = CreateFullScanIterator(db.regular, nullptr /* filter */); intents_iterator_ = CreateFullScanIterator(db.intents, - docdb::CreateIntentHybridTimeFileFilter(min_running_ht)); + docdb::CreateIntentHybridTimeFileFilter(min_replay_txn_start_ht)); loader_.state_.store(TransactionLoaderState::kLoading, std::memory_order_release); CHECK_OK(yb::Thread::Create( "transaction_loader", "loader", &Executor::Execute, this, &loader_.load_thread_)) diff --git a/src/yb/tablet/transaction_loader.h b/src/yb/tablet/transaction_loader.h index e317cbca9c8e..3ea8d00d750e 100644 --- a/src/yb/tablet/transaction_loader.h +++ b/src/yb/tablet/transaction_loader.h @@ -58,7 +58,7 @@ class TransactionLoaderContext { OneWayBitmap&& replicated_batches, const ApplyStateWithCommitHt* pending_apply) = 0; virtual void LoadFinished(Status load_status) = 0; - virtual HybridTime MinRunningHybridTime() = 0; + virtual HybridTime MinReplayTxnStartTime() = 0; }; YB_DEFINE_ENUM(TransactionLoaderState, (kNotStarted)(kLoading)(kCompleted)(kFailed)); diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc index 78ba97bcf340..d961f600f369 100644 --- a/src/yb/tablet/transaction_participant.cc +++ b/src/yb/tablet/transaction_participant.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include "yb/client/transaction_rpc.h" @@ -49,6 +50,7 @@ #include "yb/tserver/tserver_service.pb.h" +#include "yb/util/algorithm_util.h" #include "yb/util/async_util.h" #include "yb/util/callsite_profiling.h" #include "yb/util/countdown_latch.h" @@ -113,6 +115,9 @@ DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, true, DEFINE_test_flag(int32, stopactivetxns_sleep_in_abort_cb_ms, 0, "Delays the abort callback in StopActiveTxns to repro GitHub #23399."); +DEFINE_test_flag(bool, no_schedule_remove_intents, false, + "Don't schedule remove intents when transaction is cleaned from memory."); + DECLARE_int64(transaction_abort_check_timeout_ms); DECLARE_int64(cdc_intent_retention_ms); @@ -130,6 +135,10 @@ METRIC_DEFINE_simple_gauge_uint64( tablet, aborted_transactions_pending_cleanup, "Total number of aborted transactions running in participant", yb::MetricUnit::kTransactions); +METRIC_DEFINE_simple_gauge_uint64( + tablet, wal_replayable_applied_transactions, + "Total number of recently applied transactions that may be found during WAL replay", + yb::MetricUnit::kTransactions); METRIC_DEFINE_event_stats(tablet, conflict_resolution_latency, "Conflict Resolution Latency", yb::MetricUnit::kMicroseconds, "Microseconds spent on conflict resolution across all " @@ -197,6 +206,8 @@ class TransactionParticipant::Impl metric_transaction_not_found_ = METRIC_transaction_not_found.Instantiate(entity); metric_aborted_transactions_pending_cleanup_ = METRIC_aborted_transactions_pending_cleanup.Instantiate(entity, 0); + metric_wal_replayable_applied_transactions_ = + METRIC_wal_replayable_applied_transactions.Instantiate(entity, 0); metric_conflict_resolution_latency_ = METRIC_conflict_resolution_latency.Instantiate(entity); metric_conflict_resolution_num_keys_scanned_ = @@ -624,7 +635,9 @@ class TransactionParticipant::Impl OpId op_id = (**it).GetApplyOpId(); if (op_id <= checkpoint_op_id) { - (**it).ScheduleRemoveIntents(*it, front.reason); + if (PREDICT_TRUE(!GetAtomicFlag(&FLAGS_TEST_no_schedule_remove_intents))) { + (**it).ScheduleRemoveIntents(*it, front.reason); + } } else { if (!GetAtomicFlag(&FLAGS_cdc_write_post_apply_metadata) || !GetAtomicFlag(&FLAGS_cdc_immediate_transaction_cleanup)) { @@ -1075,18 +1088,22 @@ class TransactionParticipant::Impl return &participant_context_; } - void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) { - if (lower_bound == HybridTime::kMax || lower_bound == HybridTime::kInvalid) { + void SetMinReplayTxnStartTimeLowerBound(HybridTime start_ht) { + if (start_ht == HybridTime::kMax || start_ht == HybridTime::kInvalid) { return; } - HybridTime current_ht = min_running_ht_.load(std::memory_order_acquire); - while ((!current_ht || current_ht < lower_bound) - && !min_running_ht_.compare_exchange_weak(current_ht, lower_bound)) {} - VLOG_WITH_PREFIX(1) << "Updated min running hybrid time to at least " << lower_bound + HybridTime current_ht = min_replay_txn_start_ht_.load(std::memory_order_acquire); + while ((!current_ht || current_ht < start_ht) + && !min_replay_txn_start_ht_.compare_exchange_weak(current_ht, start_ht)) {} + VLOG_WITH_PREFIX(1) << "Set min replay txn start time to at least " << start_ht << ", was " << current_ht; } - HybridTime MinRunningHybridTime() override { + HybridTime MinReplayTxnStartTime() override { + return min_replay_txn_start_ht_.load(std::memory_order_acquire); + } + + HybridTime MinRunningHybridTime() { auto result = min_running_ht_.load(std::memory_order_acquire); if (result == HybridTime::kMax || result == HybridTime::kInvalid || !transactions_loaded_.load()) { @@ -1252,9 +1269,9 @@ class TransactionParticipant::Impl return transactions_.size(); } - void SetMinRunningHybridTimeUpdateCallback(std::function callback) { + void SetMinReplayTxnStartTimeUpdateCallback(std::function callback) { std::lock_guard lock(mutex_); - min_running_ht_callback_ = std::move(callback); + min_replay_txn_start_ht_callback_ = std::move(callback); } OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) { @@ -1406,9 +1423,28 @@ class TransactionParticipant::Impl metric_conflict_resolution_latency_->Increment(latency.ToMilliseconds()); } + Result SimulateProcessRecentlyAppliedTransactions( + const OpId& retryable_requests_flushed_op_id) EXCLUDES(mutex_) { + std::lock_guard lock(mutex_); + return DoProcessRecentlyAppliedTransactions( + retryable_requests_flushed_op_id, false /* persist */); + } + + void SetRetryableRequestsFlushedOpId(const OpId& flushed_op_id) EXCLUDES(mutex_) { + std::lock_guard lock(mutex_); + retryable_requests_flushed_op_id_ = flushed_op_id; + } + + Status ProcessRecentlyAppliedTransactions() EXCLUDES(mutex_) { + std::lock_guard lock(mutex_); + return ResultToStatus(DoProcessRecentlyAppliedTransactions( + retryable_requests_flushed_op_id_, true /* persist */)); + } + private: class AbortCheckTimeTag; class StartTimeTag; + class ApplyOpIdTag; typedef boost::multi_index_container > Transactions; + struct AppliedTransactionState { + OpId apply_op_id; + HybridTime start_ht; + }; + + using RecentlyAppliedTransactions = boost::multi_index_container, + boost::multi_index::member < + AppliedTransactionState, OpId, &AppliedTransactionState::apply_op_id> + >, + boost::multi_index::ordered_non_unique < + boost::multi_index::tag, + boost::multi_index::member < + AppliedTransactionState, HybridTime, &AppliedTransactionState::start_ht> + > + > + >; + void LoadFinished(Status load_status) EXCLUDES(status_resolvers_mutex_) override { // The start_latch will be hit either from a CountDown from Start, or from Shutdown, so make // sure that at the end of Load, we unblock shutdown. @@ -1509,10 +1565,8 @@ class TransactionParticipant::Impl } void SetMinRunningHybridTime(HybridTime min_running_ht) REQUIRES(mutex_) { - min_running_ht_.store(min_running_ht, std::memory_order_release); - if (min_running_ht_callback_) { - min_running_ht_callback_(min_running_ht); - } + min_running_ht_.store(min_running_ht); + UpdateMinReplayTxnStartTimeIfNeeded(); } void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { @@ -1632,7 +1686,9 @@ class TransactionParticipant::Impl bool remove_transaction = true; if (op_id < checkpoint_op_id) { - (**it).ScheduleRemoveIntents(*it, reason); + if (PREDICT_TRUE(!GetAtomicFlag(&FLAGS_TEST_no_schedule_remove_intents))) { + (**it).ScheduleRemoveIntents(*it, reason); + } } else { if (!GetAtomicFlag(&FLAGS_cdc_write_post_apply_metadata) || !GetAtomicFlag(&FLAGS_cdc_immediate_transaction_cleanup)) { @@ -1811,6 +1867,7 @@ class TransactionParticipant::Impl recently_removed_transactions_cleanup_queue_.push_back({transaction.id(), now + 15s}); LOG_IF_WITH_PREFIX(DFATAL, !recently_removed_transactions_.insert(transaction.id()).second) << "Transaction removed twice: " << transaction.id(); + AddRecentlyAppliedTransaction(transaction.start_ht(), transaction.GetApplyOpId()); transactions_.erase(it); mem_tracker_->Release(kRunningTransactionSize); TransactionsModifiedUnlocked(min_running_notifier); @@ -2087,6 +2144,100 @@ class TransactionParticipant::Impl participant_context_.StrandEnqueue(write_metadata_task.get()); } + void AddRecentlyAppliedTransaction(HybridTime start_ht, const OpId& apply_op_id) + REQUIRES(mutex_) { + // We only care about the min start_ht, while cleaning out all entries with apply_op_id less + // than progressively higher boundaries, so entries with apply_op_id lower and higher start_ht + // than the entry with the lowest start_ht are irrelevant. Likewise, if apply_op_id is higher + // and start_ht is lower than the lowest start_ht entry, the lowest start_ht entry is now + // irrelevant and can be cleaned up. + + int64_t cleaned = 0; + if (!recently_applied_.empty()) { + auto& index = recently_applied_.get(); + + auto itr = index.begin(); + if (start_ht >= itr->start_ht && apply_op_id <= itr->apply_op_id) { + VLOG_WITH_PREFIX(2) + << "Not adding recently applied transaction: " + << "start_ht=" << start_ht << " (min=" << itr->start_ht << "), " + << "apply_op_id=" << apply_op_id << " (min=" << itr->apply_op_id << ")"; + return; + } + + cleaned = EraseElementsUntil( + index, + [start_ht, &apply_op_id](const AppliedTransactionState& state) { + return start_ht > state.start_ht || apply_op_id < state.apply_op_id; + }); + } + + VLOG_WITH_PREFIX(2) + << "Adding recently applied transaction: " + << "start_ht=" << start_ht << " apply_op_id=" << apply_op_id + << " (cleaned " << cleaned << ")"; + recently_applied_.insert(AppliedTransactionState{apply_op_id, start_ht}); + metric_wal_replayable_applied_transactions_->IncrementBy(1 - static_cast(cleaned)); + UpdateMinReplayTxnStartTimeIfNeeded(); + } + + Result DoProcessRecentlyAppliedTransactions( + const OpId& retryable_requests_flushed_op_id, bool persist) REQUIRES(mutex_) { + auto threshold = VERIFY_RESULT(participant_context_.MaxPersistentOpId()); + threshold = OpId::MinValid(threshold, retryable_requests_flushed_op_id); + + if (!threshold.valid()) { + return min_replay_txn_start_ht_.load(std::memory_order_acquire); + } + + auto recently_applied_copy = + persist ? RecentlyAppliedTransactions() : RecentlyAppliedTransactions(recently_applied_); + auto& recently_applied = persist ? recently_applied_ : recently_applied_copy; + + auto cleaned = CleanRecentlyAppliedTransactions(recently_applied, threshold); + if (persist && cleaned > 0) { + metric_wal_replayable_applied_transactions_->DecrementBy(cleaned); + VLOG_WITH_PREFIX(1) << "Cleaned recently applied transactions with threshold: " << threshold + << ", cleaned " << cleaned + << ", remaining " << recently_applied_.size(); + UpdateMinReplayTxnStartTimeIfNeeded(); + } + + return GetMinReplayTxnStartTime(recently_applied); + } + + int64_t CleanRecentlyAppliedTransactions( + RecentlyAppliedTransactions& recently_applied, const OpId& threshold) { + if (!threshold.valid() || recently_applied.empty()) { + return 0; + } + + return EraseElementsUntil( + recently_applied.get(), + [&threshold](const AppliedTransactionState& state) { + return state.apply_op_id >= threshold; + }); + } + + HybridTime GetMinReplayTxnStartTime(RecentlyAppliedTransactions& recently_applied) { + auto min_running_ht = min_running_ht_.load(std::memory_order_acquire); + auto applied_min_ht = recently_applied.empty() + ? HybridTime::kMax + : (*recently_applied.get().begin()).start_ht; + + applied_min_ht.MakeAtMost(min_running_ht); + return applied_min_ht; + } + + void UpdateMinReplayTxnStartTimeIfNeeded() REQUIRES(mutex_) { + if (min_replay_txn_start_ht_callback_) { + auto ht = GetMinReplayTxnStartTime(recently_applied_); + if (min_replay_txn_start_ht_.exchange(ht, std::memory_order_acq_rel) != ht) { + min_replay_txn_start_ht_callback_(ht); + } + } + } + struct ImmediateCleanupQueueEntry { int64_t request_id; TransactionId transaction_id; @@ -2136,6 +2287,16 @@ class TransactionParticipant::Impl std::deque immediate_cleanup_queue_ GUARDED_BY(mutex_); std::deque graceful_cleanup_queue_ GUARDED_BY(mutex_); + // Information about recently applied transactions that are still needed at bootstrap time, used + // to calculate min_replay_txn_start_ht (lowest start_ht of any transaction which may be + // read during bootstrap log replay). + RecentlyAppliedTransactions recently_applied_ GUARDED_BY(mutex_); + + // Retryable requests flushed_op_id, used to calculate bootstrap_start_op_id. A copy is held + // here instead of querying participant_context_ to avoid grabbing TabletPeer lock and causing + // a deadlock between flush listener and thread waiting for sync flush. + OpId retryable_requests_flushed_op_id_ GUARDED_BY(mutex_) = OpId::Invalid(); + // Remove queue maintains transactions that could be cleaned when safe time for follower reaches // appropriate time for an entry. // Since we add entries with increasing time, this queue is ordered by time. @@ -2171,6 +2332,7 @@ class TransactionParticipant::Impl scoped_refptr> metric_transactions_running_; scoped_refptr> metric_aborted_transactions_pending_cleanup_; + scoped_refptr> metric_wal_replayable_applied_transactions_; scoped_refptr metric_transaction_not_found_; scoped_refptr metric_conflict_resolution_latency_; scoped_refptr metric_conflict_resolution_num_keys_scanned_; @@ -2181,10 +2343,11 @@ class TransactionParticipant::Impl CountDownLatch shutdown_latch_{1}; std::atomic min_running_ht_{HybridTime::kInvalid}; + std::atomic min_replay_txn_start_ht_{HybridTime::kInvalid}; std::atomic next_check_min_running_{CoarseTimePoint()}; HybridTime waiting_for_min_running_ht_ = HybridTime::kMax; std::atomic shutdown_done_{false}; - std::function min_running_ht_callback_ GUARDED_BY(mutex_); + std::function min_replay_txn_start_ht_callback_ GUARDED_BY(mutex_); LRUCache cleanup_cache_{FLAGS_transactions_cleanup_cache_size}; @@ -2331,8 +2494,12 @@ TransactionParticipantContext* TransactionParticipant::context() const { return impl_->participant_context(); } -void TransactionParticipant::SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) { - impl_->SetMinRunningHybridTimeLowerBound(lower_bound); +void TransactionParticipant::SetMinReplayTxnStartTimeLowerBound(HybridTime start_ht) { + impl_->SetMinReplayTxnStartTimeLowerBound(start_ht); +} + +HybridTime TransactionParticipant::MinReplayTxnStartTime() const { + return impl_->MinReplayTxnStartTime(); } HybridTime TransactionParticipant::MinRunningHybridTime() const { @@ -2444,9 +2611,22 @@ void TransactionParticipant::RecordConflictResolutionScanLatency(MonoDelta laten impl_->RecordConflictResolutionScanLatency(latency); } -void TransactionParticipant::SetMinRunningHybridTimeUpdateCallback( +void TransactionParticipant::SetMinReplayTxnStartTimeUpdateCallback( std::function callback) { - impl_->SetMinRunningHybridTimeUpdateCallback(std::move(callback)); + impl_->SetMinReplayTxnStartTimeUpdateCallback(std::move(callback)); +} + +Result TransactionParticipant::SimulateProcessRecentlyAppliedTransactions( + const OpId& retryable_requests_flushed_op_id) { + return impl_->SimulateProcessRecentlyAppliedTransactions(retryable_requests_flushed_op_id); +} + +void TransactionParticipant::SetRetryableRequestsFlushedOpId(const OpId& flushed_op_id) { + return impl_->SetRetryableRequestsFlushedOpId(flushed_op_id); +} + +Status TransactionParticipant::ProcessRecentlyAppliedTransactions() { + return impl_->ProcessRecentlyAppliedTransactions(); } } // namespace tablet diff --git a/src/yb/tablet/transaction_participant.h b/src/yb/tablet/transaction_participant.h index 5079ae79702c..b9e291153ef1 100644 --- a/src/yb/tablet/transaction_participant.h +++ b/src/yb/tablet/transaction_participant.h @@ -185,7 +185,9 @@ class TransactionParticipant : public TransactionStatusManager { TransactionParticipantContext* context() const; - void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound); + void SetMinReplayTxnStartTimeLowerBound(HybridTime start_ht); + + HybridTime MinReplayTxnStartTime() const; HybridTime MinRunningHybridTime() const override; @@ -240,7 +242,7 @@ class TransactionParticipant : public TransactionStatusManager { size_t GetNumRunningTransactions() const; - void SetMinRunningHybridTimeUpdateCallback(std::function callback); + void SetMinReplayTxnStartTimeUpdateCallback(std::function callback); struct CountIntentsResult { size_t num_intents; @@ -253,6 +255,13 @@ class TransactionParticipant : public TransactionStatusManager { OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) const; + Result SimulateProcessRecentlyAppliedTransactions( + const OpId& retryable_requests_flushed_op_id); + + void SetRetryableRequestsFlushedOpId(const OpId& flushed_op_id); + + Status ProcessRecentlyAppliedTransactions(); + private: Result RegisterRequest() override; void UnregisterRequest(int64_t request) override; diff --git a/src/yb/tablet/transaction_participant_context.h b/src/yb/tablet/transaction_participant_context.h index ca3d130bb06e..a8b0f5f6d0dd 100644 --- a/src/yb/tablet/transaction_participant_context.h +++ b/src/yb/tablet/transaction_participant_context.h @@ -48,6 +48,8 @@ class TransactionParticipantContext { // Returns hybrid time that lower than any future transaction apply record. virtual HybridTime SafeTimeForTransactionParticipant() = 0; + virtual Result MaxPersistentOpId() const = 0; + virtual Result WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) = 0; std::string LogPrefix() const; diff --git a/src/yb/util/algorithm_util.h b/src/yb/util/algorithm_util.h index e2f9e06bc086..4da60d3af8fa 100644 --- a/src/yb/util/algorithm_util.h +++ b/src/yb/util/algorithm_util.h @@ -98,4 +98,16 @@ auto StableSorted(const Col& collection, const Extractor& extractor) { return order; } +// Erases elements from container until predicate is satisfied. +template +size_t EraseElementsUntil(Container& container, const Predicate& predicate) { + size_t erased = 0; + auto itr = container.begin(); + while (itr != container.end() && !predicate(*itr)) { + itr = container.erase(itr); + ++erased; + } + return erased; +} + }; // namespace yb diff --git a/src/yb/yql/pgwrapper/pg_mini-test.cc b/src/yb/yql/pgwrapper/pg_mini-test.cc index 90667f97f0a0..a216b6503bd1 100644 --- a/src/yb/yql/pgwrapper/pg_mini-test.cc +++ b/src/yb/yql/pgwrapper/pg_mini-test.cc @@ -82,6 +82,10 @@ DECLARE_bool(enable_wait_queues); DECLARE_bool(pg_client_use_shared_memory); DECLARE_bool(ysql_yb_enable_replica_identity); DECLARE_bool(TEST_enable_pg_client_mock); +DECLARE_bool(delete_intents_sst_files); +DECLARE_bool(use_bootstrap_intent_ht_filter); +DECLARE_bool(TEST_no_schedule_remove_intents); +DECLARE_bool(TEST_disable_flush_on_shutdown); DECLARE_double(TEST_respond_write_failed_probability); DECLARE_double(TEST_transaction_ignore_applying_probability); @@ -2395,6 +2399,54 @@ TEST_F_EX(PgMiniTest, RegexPushdown, PgMiniTestSingleNode) { } } +TEST_F(PgMiniTestSingleNode, TestBootstrapOnAppliedTransactionWithIntents) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_delete_intents_sst_files) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_bootstrap_intent_ht_filter) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_no_schedule_remove_intents) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_flush_on_shutdown) = true; + + auto conn1 = ASSERT_RESULT(Connect()); + auto conn2 = ASSERT_RESULT(Connect()); + + LOG(INFO) << "Creating table"; + ASSERT_OK(conn1.Execute("CREATE TABLE test(a int) SPLIT INTO 1 TABLETS")); + + const auto& peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); + tablet::TabletPeerPtr tablet_peer = nullptr; + for (auto peer : peers) { + if (peer->shared_tablet()->regular_db()) { + tablet_peer = peer; + break; + } + } + ASSERT_NE(tablet_peer, nullptr); + + LOG(INFO) << "T1 - BEGIN/INSERT"; + ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); + ASSERT_OK(conn1.Execute("INSERT INTO test(a) VALUES (0)")); + + LOG(INFO) << "Flush"; + ASSERT_OK(tablet_peer->shared_tablet()->Flush(tablet::FlushMode::kSync)); + + LOG(INFO) << "T2 - BEGIN/INSERT"; + ASSERT_OK(conn2.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); + ASSERT_OK(conn2.Execute("INSERT INTO test(a) VALUES (1)")); + + LOG(INFO) << "Flush"; + ASSERT_OK(tablet_peer->shared_tablet()->Flush(tablet::FlushMode::kSync)); + + LOG(INFO) << "T1 - Commit"; + ASSERT_OK(conn1.CommitTransaction()); + + ASSERT_OK(tablet_peer->FlushBootstrapState()); + + LOG(INFO) << "Restarting cluster"; + ASSERT_OK(RestartCluster()); + + conn1 = ASSERT_RESULT(Connect()); + auto res = ASSERT_RESULT(conn1.FetchRow("SELECT COUNT(*) FROM test")); + ASSERT_EQ(res, 1); +} Status MockAbortFailure( const yb::tserver::PgFinishTransactionRequestPB* req,