Skip to content

Commit

Permalink
[#21741] docdb: Filter intents by min running hybrid time during tabl…
Browse files Browse the repository at this point in the history
…et bootstrap

Summary:
When CDC streams are lagging, there may be a large number of intent SST files whose contents have all been applied already, but must be maintained for CDC purposes. We work around the performance implications of having a large number of these files by filtering out SST files by min running hybrid time (D33131 / 97536b4), but this approach does not work as is for bootstrap, since min running hybrid time is not currently determined until bootstrap has finished.

This change adds the saving of min running hybrid time periodically with retryable requests state, and then loads this min running hybrid time into the transaction participant early in bootstrap, to allow the SST file filter used in D33131 / 97536b4 to be used at bootstrap time as well.

To avoid reintroducing the issue caused by D34389 / 2458c08, this diff also drops the requirement that min running hybrid time must not be set before bootstrap, and moves that requirement onto `transactions_loaded_` instead.

**Upgrade/Rollback safety:**

This change is not guarded by a gflag or autoflag. If the newly added min running hybrid time field is missing (upgrade), we do not apply a filter (the current behavior), and the presence of the optional protobuf field when downgrading is entirely ignored (the old behavior is to unconditionally not apply a filter). There are no correctness issues involved with either applying or not applying the filter, as it is entirely a performance optimization.
Jira: DB-10615

Test Plan: Jenkins

Reviewers: yyan, qhu

Reviewed By: yyan, qhu

Subscribers: rthallam, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D35639
  • Loading branch information
es1024 committed Jun 20, 2024
1 parent 2e2c4ea commit 8b23a4e
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/yb/consensus/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ message ClientReplicatedRetryableRequestRangesPB {
// Tablet bootstrap state that TabletBootstrapStateManager saves to and loads from disk.
message TabletBootstrapStatePB {
// Op id the saved state is valid up to (logged as "Saving bootstrap state up to ...").
optional OpIdPB last_op_id = 1;
// Replicated retryable request ranges, one entry per client.
repeated ClientReplicatedRetryableRequestRangesPB client_requests = 2;
// Min running hybrid time encoded as a uint64. When the field is absent the loader
// treats the value as HybridTime::kInvalid, i.e. no intent SST filter is applied.
optional fixed64 min_running_ht = 3;
}

// A Raft implementation.
Expand Down
35 changes: 24 additions & 11 deletions src/yb/tablet/tablet_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,21 @@ class TabletBootstrap {
VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString();
}

const bool has_blocks = VERIFY_RESULT(OpenTablet());
std::optional<consensus::TabletBootstrapStatePB> bootstrap_state_pb = std::nullopt;
HybridTime min_running_ht = HybridTime::kInvalid;
if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && data_.bootstrap_state_manager) {
auto result = data_.bootstrap_state_manager->LoadFromDisk();
if (result.ok()) {
bootstrap_state_pb = std::move(*result);

const auto& bootstrap_state = data_.bootstrap_state_manager->bootstrap_state();
min_running_ht = bootstrap_state.GetMinRunningHybridTime();
} else if (!result.status().IsNotFound()) {
return result.status();
}
}

const bool has_blocks = VERIFY_RESULT(OpenTablet(min_running_ht));

if (data_.retryable_requests) {
const auto retryable_request_timeout_secs = meta_->IsSysCatalog()
Expand All @@ -513,15 +527,8 @@ class TabletBootstrap {
}

// Load retryable requests after metrics entity has been instantiated.
if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) &&
data_.bootstrap_retryable_requests && data_.retryable_requests &&
data_.bootstrap_state_manager) {
auto result = data_.bootstrap_state_manager->LoadFromDisk();
if (result.ok()) {
data_.retryable_requests->FromPB(*result);
} else if (!result.status().IsNotFound()) {
return result.status();
}
if (bootstrap_state_pb && data_.bootstrap_retryable_requests && data_.retryable_requests) {
data_.retryable_requests->FromPB(*bootstrap_state_pb);
}

if (FLAGS_TEST_dump_docdb_before_tablet_bootstrap) {
Expand Down Expand Up @@ -611,7 +618,7 @@ class TabletBootstrap {
}

// Sets result to true if there was any data on disk for this tablet.
Result<bool> OpenTablet() {
Result<bool> OpenTablet(HybridTime min_running_ht) {
CleanupSnapshots();
// Use operator new instead of make_shared for creating the shared_ptr. That way, we would have
// the shared_ptr's control block hold a raw pointer to the Tablet object as opposed to the
Expand All @@ -622,6 +629,12 @@ class TabletBootstrap {
// reference count drops to 0. With make_shared, there's a risk of a leaked weak_ptr holding up
// the object's memory even after all shared_ptrs go out of scope.
std::shared_ptr<Tablet> tablet(new Tablet(data_.tablet_init_data));

auto participant = tablet->transaction_participant();
if (participant) {
participant->SetMinRunningHybridTimeLowerBound(min_running_ht);
}

// Doing nothing for now except opening a tablet locally.
LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") {
RETURN_NOT_OK(tablet->Open());
Expand Down
27 changes: 27 additions & 0 deletions src/yb/tablet/tablet_bootstrap_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,29 @@

namespace yb::tablet {

// Copy constructor: initializes this object with the value currently held by rhs.
// The atomic member is not copyable, so the value is read out and re-wrapped.
TabletBootstrapState::TabletBootstrapState(const TabletBootstrapState& rhs)
    : min_running_ht_(rhs.GetMinRunningHybridTime()) {}

// Move constructor: there is nothing to steal from an atomic, so this reads the value
// held by rhs and leaves rhs untouched.
TabletBootstrapState::TabletBootstrapState(TabletBootstrapState&& rhs)
    : min_running_ht_(rhs.GetMinRunningHybridTime()) {}

// Move assignment: overwrites the stored min running hybrid time with the value read
// from rhs. rhs itself is only read, never modified.
void TabletBootstrapState::operator=(TabletBootstrapState&& rhs) {
  const HybridTime incoming_ht = rhs.min_running_ht_.load();
  min_running_ht_.store(incoming_ht);
}

void TabletBootstrapState::CopyFrom(const TabletBootstrapState& rhs) {
min_running_ht_.store(rhs.min_running_ht_.load());
}

// Writes the current min running hybrid time into pb as its uint64 representation.
// The field is always set, whatever value is currently stored.
void TabletBootstrapState::ToPB(consensus::TabletBootstrapStatePB* pb) const {
  const HybridTime current_ht = min_running_ht_.load();
  pb->set_min_running_ht(current_ht.ToUint64());
}

// Restores the min running hybrid time from pb. When the optional field is absent
// the stored value becomes HybridTime::kInvalid.
void TabletBootstrapState::FromPB(const consensus::TabletBootstrapStatePB& pb) {
  HybridTime loaded_ht = HybridTime::kInvalid;
  if (pb.has_min_running_ht()) {
    loaded_ht = HybridTime(pb.min_running_ht());
  }
  min_running_ht_.store(loaded_ht);
}

TabletBootstrapStateManager::TabletBootstrapStateManager() { }

TabletBootstrapStateManager::TabletBootstrapStateManager(
Expand Down Expand Up @@ -55,8 +78,11 @@ Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_co
return Status::OK();
}

TabletBootstrapState bootstrap_state(bootstrap_state_);

consensus::TabletBootstrapStatePB pb;
retryable_requests->ToPB(&pb);
bootstrap_state.ToPB(&pb);

auto path = NewFilePath();
LOG(INFO) << "Saving bootstrap state up to " << pb.last_op_id() << " to " << path;
Expand Down Expand Up @@ -88,6 +114,7 @@ Result<consensus::TabletBootstrapStatePB> TabletBootstrapStateManager::LoadFromD
RETURN_NOT_OK_PREPEND(
pb_util::ReadPBContainerFromPath(fs_manager()->env(), path, &pb),
Format("Could not load bootstrap state from $0", path));
bootstrap_state_.FromPB(pb);
LOG(INFO) << Format("Loaded tablet ($0) bootstrap state "
"(max_replicated_op_id_=$1) from $2",
tablet_id_, pb.last_op_id(), path);
Expand Down
25 changes: 25 additions & 0 deletions src/yb/tablet/tablet_bootstrap_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@

namespace yb::tablet {

// Holds the bootstrap-related state that is serialized into TabletBootstrapStatePB.
// At present that is only the min running hybrid time, stored in a std::atomic.
class TabletBootstrapState {
 public:
  TabletBootstrapState() = default;
  TabletBootstrapState(const TabletBootstrapState& rhs);
  TabletBootstrapState(TabletBootstrapState&& rhs);
  ~TabletBootstrapState() = default;

  // Move assignment; intentionally returns void.
  void operator=(TabletBootstrapState&& rhs);

  // Overwrites this object's value with the one held by rhs.
  void CopyFrom(const TabletBootstrapState& rhs);

  // Returns the currently stored min running hybrid time (kInvalid until one is set).
  HybridTime GetMinRunningHybridTime() const {
    return min_running_ht_.load();
  }

  // Replaces the stored min running hybrid time.
  void SetMinRunningHybridTime(HybridTime min_running_ht) {
    min_running_ht_.store(min_running_ht);
  }

  // Serialization to / from the on-disk protobuf.
  void ToPB(consensus::TabletBootstrapStatePB* pb) const;
  void FromPB(const consensus::TabletBootstrapStatePB& pb);

 private:
  std::atomic<HybridTime> min_running_ht_{HybridTime::kInvalid};
};

class TabletBootstrapStateManager {
public:
TabletBootstrapStateManager();
Expand All @@ -40,6 +62,8 @@ class TabletBootstrapStateManager {
Status Init();

FsManager* fs_manager() const { return fs_manager_; }
const TabletBootstrapState& bootstrap_state() const { return bootstrap_state_; }
TabletBootstrapState& bootstrap_state() { return bootstrap_state_; }

static std::string FilePath(const std::string& path) {
return JoinPathSegments(path, FileName());
Expand Down Expand Up @@ -84,6 +108,7 @@ class TabletBootstrapStateManager {
bool has_file_on_disk_ = false;
FsManager* fs_manager_ = nullptr;
std::string dir_;
TabletBootstrapState bootstrap_state_;

static constexpr char kSuffixNew[] = ".NEW";
static constexpr char kTabletBootstrapStateFileName[] = "retryable_requests";
Expand Down
13 changes: 13 additions & 0 deletions src/yb/tablet/tablet_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ Status TabletPeer::InitTabletPeer(

prepare_thread_ = std::make_unique<Preparer>(consensus_.get(), tablet_prepare_pool);

auto txn_participant = tablet_->transaction_participant();
if (txn_participant) {
txn_participant->SetMinRunningHybridTimeUpdateCallback(
std::bind_front(&TabletPeer::MinRunningHybridTimeUpdated, this));
}

// "Publish" the tablet object right before releasing the lock.
tablet_obj_state_.store(TabletObjectState::kAvailable, std::memory_order_release);
}
Expand Down Expand Up @@ -1816,6 +1822,13 @@ TabletBootstrapFlushState TabletPeer::TEST_TabletBootstrapStateFlusherState() co
: TabletBootstrapFlushState::kFlushIdle;
}

// Records a new min running hybrid time in the bootstrap state held by
// bootstrap_state_manager_. Values that convert to false, and HybridTime::kMax,
// are ignored, so the previously recorded value is kept in those cases.
void TabletPeer::MinRunningHybridTimeUpdated(HybridTime min_running_ht) {
  if (!min_running_ht || min_running_ht == HybridTime::kMax) {
    return;
  }

  VLOG_WITH_PREFIX(2) << "Min running hybrid time updated: " << min_running_ht;
  auto& bootstrap_state = bootstrap_state_manager_->bootstrap_state();
  bootstrap_state.SetMinRunningHybridTime(min_running_ht);
}

Preparer* TabletPeer::DEBUG_GetPreparer() { return prepare_thread_.get(); }

bool TabletPeer::HasSufficientDiskSpaceForWrite() {
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tablet/tablet_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,8 @@ class TabletPeer : public std::enable_shared_from_this<TabletPeer>,

bool FlushBootstrapStateEnabled() const;

void MinRunningHybridTimeUpdated(HybridTime min_running_ht);

MetricRegistry* metric_registry_;

bool IsLeader() override {
Expand Down
13 changes: 9 additions & 4 deletions src/yb/tablet/transaction_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "yb/dockv/intent.h"

#include "yb/docdb/bounded_rocksdb_iterator.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/iter_util.h"

Expand Down Expand Up @@ -51,11 +52,12 @@ namespace tablet {

namespace {

docdb::BoundedRocksDbIterator CreateFullScanIterator(rocksdb::DB* db) {
docdb::BoundedRocksDbIterator CreateFullScanIterator(
rocksdb::DB* db, std::shared_ptr<rocksdb::ReadFileFilter> filter) {
return docdb::BoundedRocksDbIterator(docdb::CreateRocksDBIterator(
db, &docdb::KeyBounds::kNoBounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
/* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId));
/* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId, filter));
}

} // namespace
Expand All @@ -75,8 +77,11 @@ class TransactionLoader::Executor {
if (!scoped_pending_operation_.ok()) {
return false;
}
regular_iterator_ = CreateFullScanIterator(db.regular);
intents_iterator_ = CreateFullScanIterator(db.intents);
auto min_running_ht = context().MinRunningHybridTime();
VLOG_WITH_PREFIX(1) << "TransactionLoader min_running_ht: " << min_running_ht;
regular_iterator_ = CreateFullScanIterator(db.regular, nullptr /* filter */);
intents_iterator_ = CreateFullScanIterator(db.intents,
docdb::CreateIntentHybridTimeFileFilter(min_running_ht));
loader_.state_.store(TransactionLoaderState::kLoading, std::memory_order_release);
CHECK_OK(yb::Thread::Create(
"transaction_loader", "loader", &Executor::Execute, this, &loader_.load_thread_))
Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/transaction_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TransactionLoaderContext {
OneWayBitmap&& replicated_batches,
const ApplyStateWithCommitHt* pending_apply) = 0;
virtual void LoadFinished(Status load_status) = 0;
virtual HybridTime MinRunningHybridTime() = 0;
};

YB_DEFINE_ENUM(TransactionLoaderState, (kNotStarted)(kLoading)(kCompleted)(kFailed));
Expand Down
46 changes: 40 additions & 6 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1073,9 +1073,21 @@ class TransactionParticipant::Impl
return &participant_context_;
}

HybridTime MinRunningHybridTime() {
// Raises min_running_ht_ to at least lower_bound. kMax and kInvalid bounds are ignored.
// The stored value is replaced only while it is unset or strictly below lower_bound;
// the compare-exchange is retried if another writer changes the value concurrently.
void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) {
  if (lower_bound == HybridTime::kInvalid || lower_bound == HybridTime::kMax) {
    return;
  }

  HybridTime current_ht = min_running_ht_.load(std::memory_order_acquire);
  while (!current_ht || current_ht < lower_bound) {
    // On failure compare_exchange_weak refreshes current_ht, so the loop condition
    // is re-evaluated against the latest stored value.
    if (min_running_ht_.compare_exchange_weak(current_ht, lower_bound)) {
      break;
    }
  }

  VLOG_WITH_PREFIX(1) << "Updated min running hybrid time to at least " << lower_bound
                      << ", was " << current_ht;
}

HybridTime MinRunningHybridTime() override {
auto result = min_running_ht_.load(std::memory_order_acquire);
if (result == HybridTime::kMax || result == HybridTime::kInvalid) {
if (result == HybridTime::kMax || result == HybridTime::kInvalid
|| !transactions_loaded_.load()) {
return result;
}
auto now = CoarseMonoClock::now();
Expand Down Expand Up @@ -1238,6 +1250,11 @@ class TransactionParticipant::Impl
return transactions_.size();
}

// Installs the callback stored in min_running_ht_callback_, replacing any previous one.
// mutex_ is held while the callback is swapped in because the member is GUARDED_BY it.
void SetMinRunningHybridTimeUpdateCallback(std::function<void(HybridTime)> callback) {
std::lock_guard lock(mutex_);
min_running_ht_callback_ = std::move(callback);
}

OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) {
std::lock_guard lock(mutex_);
auto it = transactions_.find(id);
Expand Down Expand Up @@ -1467,7 +1484,7 @@ class TransactionParticipant::Impl
++idx;
}
}
transactions_loaded_ = true;
transactions_loaded_.store(true);
TransactionsModifiedUnlocked(&min_running_notifier);
}

Expand All @@ -1488,21 +1505,28 @@ class TransactionParticipant::Impl
}
}

// Publishes a new min running hybrid time and, if a callback is registered, notifies it
// with the same value. Callers must hold mutex_, so the callback runs under that lock.
void SetMinRunningHybridTime(HybridTime min_running_ht) REQUIRES(mutex_) {
  min_running_ht_.store(min_running_ht, std::memory_order_release);
  if (!min_running_ht_callback_) {
    return;
  }
  min_running_ht_callback_(min_running_ht);
}

void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
metric_transactions_running_->set_value(transactions_.size());
if (!transactions_loaded_) {
return;
}

if (transactions_.empty()) {
min_running_ht_.store(HybridTime::kMax, std::memory_order_release);
SetMinRunningHybridTime(HybridTime::kMax);
CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier);
return;
}

auto& first_txn = **transactions_.get<StartTimeTag>().begin();
if (first_txn.start_ht() != min_running_ht_.load(std::memory_order_relaxed)) {
min_running_ht_.store(first_txn.start_ht(), std::memory_order_release);
SetMinRunningHybridTime(first_txn.start_ht());
next_check_min_running_.store(
CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms,
std::memory_order_release);
Expand Down Expand Up @@ -2157,6 +2181,7 @@ class TransactionParticipant::Impl
std::atomic<CoarseTimePoint> next_check_min_running_{CoarseTimePoint()};
HybridTime waiting_for_min_running_ht_ = HybridTime::kMax;
std::atomic<bool> shutdown_done_{false};
std::function<void(HybridTime)> min_running_ht_callback_ GUARDED_BY(mutex_);

LRUCache<TransactionId> cleanup_cache_{FLAGS_transactions_cleanup_cache_size};

Expand All @@ -2178,7 +2203,7 @@ class TransactionParticipant::Impl

std::shared_ptr<MemTracker> mem_tracker_ GUARDED_BY(mutex_);

bool transactions_loaded_ GUARDED_BY(mutex_) = false;
std::atomic<bool> transactions_loaded_{false};

bool pending_applied_notified_ = false;
std::mutex pending_applies_mutex_;
Expand Down Expand Up @@ -2303,6 +2328,10 @@ TransactionParticipantContext* TransactionParticipant::context() const {
return impl_->participant_context();
}

// Public entry point: forwards the lower bound to the implementation object.
void TransactionParticipant::SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) {
impl_->SetMinRunningHybridTimeLowerBound(lower_bound);
}

HybridTime TransactionParticipant::MinRunningHybridTime() const {
return impl_->MinRunningHybridTime();
}
Expand Down Expand Up @@ -2412,5 +2441,10 @@ void TransactionParticipant::RecordConflictResolutionScanLatency(MonoDelta laten
impl_->RecordConflictResolutionScanLatency(latency);
}

// Public entry point: hands the callback over to the implementation object.
// The callback is taken by value and moved on, so no extra copy is made here.
void TransactionParticipant::SetMinRunningHybridTimeUpdateCallback(
std::function<void(HybridTime)> callback) {
impl_->SetMinRunningHybridTimeUpdateCallback(std::move(callback));
}

} // namespace tablet
} // namespace yb
4 changes: 4 additions & 0 deletions src/yb/tablet/transaction_participant.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ class TransactionParticipant : public TransactionStatusManager {

TransactionParticipantContext* context() const;

void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound);

HybridTime MinRunningHybridTime() const override;

Result<HybridTime> WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) override;
Expand Down Expand Up @@ -238,6 +240,8 @@ class TransactionParticipant : public TransactionStatusManager {

size_t GetNumRunningTransactions() const;

void SetMinRunningHybridTimeUpdateCallback(std::function<void(HybridTime)> callback);

struct CountIntentsResult {
size_t num_intents;
size_t num_transactions;
Expand Down

0 comments on commit 8b23a4e

Please sign in to comment.