Skip to content

Commit

Permalink
[2024.1.1][#23048] Revert "[BACKPORT 2024.1][#21741] docdb: Filter in…
Browse files Browse the repository at this point in the history
…tents by min running hybrid time during tablet bootstrap"

Summary:
This reverts commit 717fbc5, which caused CDC tests to
start failing, in order to unblock 2024.1.1 branch.

Test Plan: Jenkins: urgent, rebase: 2024.1.1

Reviewers: rthallam

Reviewed By: rthallam

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36261
  • Loading branch information
es1024 committed Jun 29, 2024
1 parent 94ff887 commit 708f243
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 146 deletions.
1 change: 0 additions & 1 deletion src/yb/consensus/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ message ClientReplicatedRetryableRequestRangesPB {
message TabletBootstrapStatePB {
optional OpIdPB last_op_id = 1;
repeated ClientReplicatedRetryableRequestRangesPB client_requests = 2;
optional fixed64 min_running_ht = 3;
}

// A Raft implementation.
Expand Down
35 changes: 11 additions & 24 deletions src/yb/tablet/tablet_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,21 +502,7 @@ class TabletBootstrap {
VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString();
}

std::optional<consensus::TabletBootstrapStatePB> bootstrap_state_pb = std::nullopt;
HybridTime min_running_ht = HybridTime::kInvalid;
if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && data_.bootstrap_state_manager) {
auto result = data_.bootstrap_state_manager->LoadFromDisk();
if (result.ok()) {
bootstrap_state_pb = std::move(*result);

const auto& bootstrap_state = data_.bootstrap_state_manager->bootstrap_state();
min_running_ht = bootstrap_state.GetMinRunningHybridTime();
} else if (!result.status().IsNotFound()) {
return result.status();
}
}

const bool has_blocks = VERIFY_RESULT(OpenTablet(min_running_ht));
const bool has_blocks = VERIFY_RESULT(OpenTablet());

if (data_.retryable_requests) {
const auto retryable_request_timeout_secs = meta_->IsSysCatalog()
Expand All @@ -527,8 +513,15 @@ class TabletBootstrap {
}

// Load retryable requests after metrics entity has been instantiated.
if (bootstrap_state_pb && data_.bootstrap_retryable_requests && data_.retryable_requests) {
data_.retryable_requests->FromPB(*bootstrap_state_pb);
if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) &&
data_.bootstrap_retryable_requests && data_.retryable_requests &&
data_.bootstrap_state_manager) {
auto result = data_.bootstrap_state_manager->LoadFromDisk();
if (result.ok()) {
data_.retryable_requests->FromPB(*result);
} else if (!result.status().IsNotFound()) {
return result.status();
}
}

if (FLAGS_TEST_dump_docdb_before_tablet_bootstrap) {
Expand Down Expand Up @@ -618,7 +611,7 @@ class TabletBootstrap {
}

// Sets result to true if there was any data on disk for this tablet.
Result<bool> OpenTablet(HybridTime min_running_ht) {
Result<bool> OpenTablet() {
CleanupSnapshots();
// Use operator new instead of make_shared for creating the shared_ptr. That way, we would have
// the shared_ptr's control block hold a raw pointer to the Tablet object as opposed to the
Expand All @@ -629,12 +622,6 @@ class TabletBootstrap {
// reference count drops to 0. With make_shared, there's a risk of a leaked weak_ptr holding up
// the object's memory even after all shared_ptrs go out of scope.
std::shared_ptr<Tablet> tablet(new Tablet(data_.tablet_init_data));

auto participant = tablet->transaction_participant();
if (participant) {
participant->SetMinRunningHybridTimeLowerBound(min_running_ht);
}

// Doing nothing for now except opening a tablet locally.
LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") {
RETURN_NOT_OK(tablet->Open());
Expand Down
27 changes: 0 additions & 27 deletions src/yb/tablet/tablet_bootstrap_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,6 @@

namespace yb::tablet {

TabletBootstrapState::TabletBootstrapState(const TabletBootstrapState& rhs):
min_running_ht_(rhs.min_running_ht_.load()) {}

TabletBootstrapState::TabletBootstrapState(TabletBootstrapState&& rhs):
min_running_ht_(rhs.min_running_ht_.load()) {}

void TabletBootstrapState::operator=(TabletBootstrapState&& rhs) {
min_running_ht_.store(rhs.min_running_ht_.load());
}

void TabletBootstrapState::CopyFrom(const TabletBootstrapState& rhs) {
min_running_ht_.store(rhs.min_running_ht_.load());
}

void TabletBootstrapState::ToPB(consensus::TabletBootstrapStatePB* pb) const {
pb->set_min_running_ht(min_running_ht_.load().ToUint64());
}

void TabletBootstrapState::FromPB(const consensus::TabletBootstrapStatePB& pb) {
min_running_ht_.store(
pb.has_min_running_ht() ? HybridTime(pb.min_running_ht()) : HybridTime::kInvalid);
}

TabletBootstrapStateManager::TabletBootstrapStateManager() { }

TabletBootstrapStateManager::TabletBootstrapStateManager(
Expand Down Expand Up @@ -78,11 +55,8 @@ Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_co
return Status::OK();
}

TabletBootstrapState bootstrap_state(bootstrap_state_);

consensus::TabletBootstrapStatePB pb;
retryable_requests->ToPB(&pb);
bootstrap_state.ToPB(&pb);

auto path = NewFilePath();
LOG(INFO) << "Saving bootstrap state up to " << pb.last_op_id() << " to " << path;
Expand Down Expand Up @@ -114,7 +88,6 @@ Result<consensus::TabletBootstrapStatePB> TabletBootstrapStateManager::LoadFromD
RETURN_NOT_OK_PREPEND(
pb_util::ReadPBContainerFromPath(fs_manager()->env(), path, &pb),
Format("Could not load bootstrap state from $0", path));
bootstrap_state_.FromPB(pb);
LOG(INFO) << Format("Loaded tablet ($0) bootstrap state "
"(max_replicated_op_id_=$1) from $2",
tablet_id_, pb.last_op_id(), path);
Expand Down
25 changes: 0 additions & 25 deletions src/yb/tablet/tablet_bootstrap_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,6 @@

namespace yb::tablet {

class TabletBootstrapState {
public:
TabletBootstrapState() = default;
~TabletBootstrapState() = default;

TabletBootstrapState(const TabletBootstrapState& rhs);

TabletBootstrapState(TabletBootstrapState&& rhs);
void operator=(TabletBootstrapState&& rhs);

void CopyFrom(const TabletBootstrapState& rhs);

void SetMinRunningHybridTime(HybridTime min_running_ht) { min_running_ht_.store(min_running_ht); }
HybridTime GetMinRunningHybridTime() const { return min_running_ht_.load(); }

void ToPB(consensus::TabletBootstrapStatePB* pb) const;
void FromPB(const consensus::TabletBootstrapStatePB& pb);

private:
std::atomic<HybridTime> min_running_ht_{HybridTime::kInvalid};
};

class TabletBootstrapStateManager {
public:
TabletBootstrapStateManager();
Expand All @@ -62,8 +40,6 @@ class TabletBootstrapStateManager {
Status Init();

FsManager* fs_manager() const { return fs_manager_; }
const TabletBootstrapState& bootstrap_state() const { return bootstrap_state_; }
TabletBootstrapState& bootstrap_state() { return bootstrap_state_; }

static std::string FilePath(const std::string& path) {
return JoinPathSegments(path, FileName());
Expand Down Expand Up @@ -108,7 +84,6 @@ class TabletBootstrapStateManager {
bool has_file_on_disk_ = false;
FsManager* fs_manager_ = nullptr;
std::string dir_;
TabletBootstrapState bootstrap_state_;

static constexpr char kSuffixNew[] = ".NEW";
static constexpr char kTabletBootstrapStateFileName[] = "retryable_requests";
Expand Down
13 changes: 0 additions & 13 deletions src/yb/tablet/tablet_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,6 @@ Status TabletPeer::InitTabletPeer(

prepare_thread_ = std::make_unique<Preparer>(consensus_.get(), tablet_prepare_pool);

auto txn_participant = tablet_->transaction_participant();
if (txn_participant) {
txn_participant->SetMinRunningHybridTimeUpdateCallback(
std::bind_front(&TabletPeer::MinRunningHybridTimeUpdated, this));
}

// "Publish" the tablet object right before releasing the lock.
tablet_obj_state_.store(TabletObjectState::kAvailable, std::memory_order_release);
}
Expand Down Expand Up @@ -1822,13 +1816,6 @@ TabletBootstrapFlushState TabletPeer::TEST_TabletBootstrapStateFlusherState() co
: TabletBootstrapFlushState::kFlushIdle;
}

void TabletPeer::MinRunningHybridTimeUpdated(HybridTime min_running_ht) {
if (min_running_ht && min_running_ht != HybridTime::kMax) {
VLOG_WITH_PREFIX(2) << "Min running hybrid time updated: " << min_running_ht;
bootstrap_state_manager_->bootstrap_state().SetMinRunningHybridTime(min_running_ht);
}
}

Preparer* TabletPeer::DEBUG_GetPreparer() { return prepare_thread_.get(); }

} // namespace tablet
Expand Down
2 changes: 0 additions & 2 deletions src/yb/tablet/tablet_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,6 @@ class TabletPeer : public std::enable_shared_from_this<TabletPeer>,

bool FlushBootstrapStateEnabled() const;

void MinRunningHybridTimeUpdated(HybridTime min_running_ht);

MetricRegistry* metric_registry_;

bool IsLeader() override {
Expand Down
13 changes: 4 additions & 9 deletions src/yb/tablet/transaction_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "yb/dockv/intent.h"

#include "yb/docdb/bounded_rocksdb_iterator.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/iter_util.h"

Expand Down Expand Up @@ -52,12 +51,11 @@ namespace tablet {

namespace {

docdb::BoundedRocksDbIterator CreateFullScanIterator(
rocksdb::DB* db, std::shared_ptr<rocksdb::ReadFileFilter> filter) {
docdb::BoundedRocksDbIterator CreateFullScanIterator(rocksdb::DB* db) {
return docdb::BoundedRocksDbIterator(docdb::CreateRocksDBIterator(
db, &docdb::KeyBounds::kNoBounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
/* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId, filter));
/* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId));
}

} // namespace
Expand All @@ -77,11 +75,8 @@ class TransactionLoader::Executor {
if (!scoped_pending_operation_.ok()) {
return false;
}
auto min_running_ht = context().MinRunningHybridTime();
VLOG_WITH_PREFIX(1) << "TransactionLoader min_running_ht: " << min_running_ht;
regular_iterator_ = CreateFullScanIterator(db.regular, nullptr /* filter */);
intents_iterator_ = CreateFullScanIterator(db.intents,
docdb::CreateIntentHybridTimeFileFilter(min_running_ht));
regular_iterator_ = CreateFullScanIterator(db.regular);
intents_iterator_ = CreateFullScanIterator(db.intents);
loader_.state_.store(TransactionLoaderState::kLoading, std::memory_order_release);
CHECK_OK(yb::Thread::Create(
"transaction_loader", "loader", &Executor::Execute, this, &loader_.load_thread_))
Expand Down
1 change: 0 additions & 1 deletion src/yb/tablet/transaction_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class TransactionLoaderContext {
OneWayBitmap&& replicated_batches,
const ApplyStateWithCommitHt* pending_apply) = 0;
virtual void LoadFinished(Status load_status) = 0;
virtual HybridTime MinRunningHybridTime() = 0;
};

YB_DEFINE_ENUM(TransactionLoaderState, (kNotStarted)(kLoading)(kCompleted)(kFailed));
Expand Down
46 changes: 6 additions & 40 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1074,21 +1074,9 @@ class TransactionParticipant::Impl
return &participant_context_;
}

void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) {
if (lower_bound == HybridTime::kMax || lower_bound == HybridTime::kInvalid) {
return;
}
HybridTime current_ht = min_running_ht_.load(std::memory_order_acquire);
while ((!current_ht || current_ht < lower_bound)
&& !min_running_ht_.compare_exchange_weak(current_ht, lower_bound)) {}
VLOG_WITH_PREFIX(1) << "Updated min running hybrid time to at least " << lower_bound
<< ", was " << current_ht;
}

HybridTime MinRunningHybridTime() override {
HybridTime MinRunningHybridTime() {
auto result = min_running_ht_.load(std::memory_order_acquire);
if (result == HybridTime::kMax || result == HybridTime::kInvalid
|| !transactions_loaded_.load()) {
if (result == HybridTime::kMax || result == HybridTime::kInvalid) {
return result;
}
auto now = CoarseMonoClock::now();
Expand Down Expand Up @@ -1251,11 +1239,6 @@ class TransactionParticipant::Impl
return transactions_.size();
}

void SetMinRunningHybridTimeUpdateCallback(std::function<void(HybridTime)> callback) {
std::lock_guard lock(mutex_);
min_running_ht_callback_ = std::move(callback);
}

OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) {
std::lock_guard lock(mutex_);
auto it = transactions_.find(id);
Expand Down Expand Up @@ -1485,7 +1468,7 @@ class TransactionParticipant::Impl
++idx;
}
}
transactions_loaded_.store(true);
transactions_loaded_ = true;
TransactionsModifiedUnlocked(&min_running_notifier);
}

Expand All @@ -1506,28 +1489,21 @@ class TransactionParticipant::Impl
}
}

void SetMinRunningHybridTime(HybridTime min_running_ht) REQUIRES(mutex_) {
min_running_ht_.store(min_running_ht, std::memory_order_release);
if (min_running_ht_callback_) {
min_running_ht_callback_(min_running_ht);
}
}

void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
metric_transactions_running_->set_value(transactions_.size());
if (!transactions_loaded_) {
return;
}

if (transactions_.empty()) {
SetMinRunningHybridTime(HybridTime::kMax);
min_running_ht_.store(HybridTime::kMax, std::memory_order_release);
CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier);
return;
}

auto& first_txn = **transactions_.get<StartTimeTag>().begin();
if (first_txn.start_ht() != min_running_ht_.load(std::memory_order_relaxed)) {
SetMinRunningHybridTime(first_txn.start_ht());
min_running_ht_.store(first_txn.start_ht(), std::memory_order_release);
next_check_min_running_.store(
CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms,
std::memory_order_release);
Expand Down Expand Up @@ -2182,7 +2158,6 @@ class TransactionParticipant::Impl
std::atomic<CoarseTimePoint> next_check_min_running_{CoarseTimePoint()};
HybridTime waiting_for_min_running_ht_ = HybridTime::kMax;
std::atomic<bool> shutdown_done_{false};
std::function<void(HybridTime)> min_running_ht_callback_ GUARDED_BY(mutex_);

LRUCache<TransactionId> cleanup_cache_{FLAGS_transactions_cleanup_cache_size};

Expand All @@ -2204,7 +2179,7 @@ class TransactionParticipant::Impl

std::shared_ptr<MemTracker> mem_tracker_ GUARDED_BY(mutex_);

std::atomic<bool> transactions_loaded_{false};
bool transactions_loaded_ GUARDED_BY(mutex_) = false;

bool pending_applied_notified_ = false;
std::mutex pending_applies_mutex_;
Expand Down Expand Up @@ -2329,10 +2304,6 @@ TransactionParticipantContext* TransactionParticipant::context() const {
return impl_->participant_context();
}

void TransactionParticipant::SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) {
impl_->SetMinRunningHybridTimeLowerBound(lower_bound);
}

HybridTime TransactionParticipant::MinRunningHybridTime() const {
return impl_->MinRunningHybridTime();
}
Expand Down Expand Up @@ -2442,10 +2413,5 @@ void TransactionParticipant::RecordConflictResolutionScanLatency(MonoDelta laten
impl_->RecordConflictResolutionScanLatency(latency);
}

void TransactionParticipant::SetMinRunningHybridTimeUpdateCallback(
std::function<void(HybridTime)> callback) {
impl_->SetMinRunningHybridTimeUpdateCallback(std::move(callback));
}

} // namespace tablet
} // namespace yb
4 changes: 0 additions & 4 deletions src/yb/tablet/transaction_participant.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ class TransactionParticipant : public TransactionStatusManager {

TransactionParticipantContext* context() const;

void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound);

HybridTime MinRunningHybridTime() const override;

Result<HybridTime> WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) override;
Expand Down Expand Up @@ -240,8 +238,6 @@ class TransactionParticipant : public TransactionStatusManager {

size_t GetNumRunningTransactions() const;

void SetMinRunningHybridTimeUpdateCallback(std::function<void(HybridTime)> callback);

struct CountIntentsResult {
size_t num_intents;
size_t num_transactions;
Expand Down

0 comments on commit 708f243

Please sign in to comment.