Skip to content

Commit

Permalink
[yugabyte#21580] docdb: Filter intent SST files only retained for CDC
Browse files Browse the repository at this point in the history
Summary:
When CDC is lagging behind, intentsdb may accumulate many SST files that contain only already-applied transactions but cannot yet be deleted, because CDC has not streamed their changes. These SST files slow down reads from intentsdb, even though in most cases we do not care about their contents (every change in them has already been applied).

This diff adds a hybrid time filter on intent iterators for the read path, conflict resolution, and intent apply, so that all SST files before the min running hybrid time are skipped. The filter is gated behind the newly added `docdb_ht_filter_intents` gflag (default on in debug builds). The flag will be switched to default on once CDC stress tests have also been run with the D31900 / 02dfc29 changes enabled.

Jira: DB-10466

Test Plan: Jenkins.

Reviewers: sergei, mbautin

Reviewed By: sergei, mbautin

Subscribers: yql, ybase, bogdan, rthallam

Differential Revision: https://phorge.dev.yugabyte.com/D33131
  • Loading branch information
es1024 committed Apr 5, 2024
1 parent 932a6e0 commit 97536b4
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 58 deletions.
6 changes: 3 additions & 3 deletions src/yb/ash/wait_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ DEFINE_RUNTIME_PG_PREVIEW_FLAG(bool, yb_enable_ash, false,
"and various background activities. This does nothing if "
"ysql_yb_enable_ash_infra is disabled.");

DEFINE_test_flag(bool, export_wait_state_names, yb::IsDebug(),
DEFINE_test_flag(bool, export_wait_state_names, yb::kIsDebug,
"Exports wait-state name as a human understandable string.");
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::IsDebug(),
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::kIsDebug,
"Add a trace line whenever the wait state code is updated.");
DEFINE_test_flag(uint32, yb_ash_sleep_at_wait_state_ms, 0,
"How long to sleep/delay when entering a particular wait state.");
DEFINE_test_flag(uint32, yb_ash_wait_code_to_sleep_at, 0,
"If enabled, add a sleep/delay when we enter the specified wait state.");
DEFINE_test_flag(bool, export_ash_uuids_as_hex_strings, yb::IsDebug(),
DEFINE_test_flag(bool, export_ash_uuids_as_hex_strings, yb::kIsDebug,
"Exports wait-state name as a human understandable string.");
DEFINE_test_flag(bool, ash_debug_aux, false, "Set ASH aux_info to the first 16 characters"
" of the method tserver is running");
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/ql-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/debug-util.h"
#include "yb/util/debug.h"
#include "yb/util/format.h"
#include "yb/util/metrics.h"
#include "yb/util/random_util.h"
Expand Down Expand Up @@ -830,7 +830,7 @@ void QLStressTest::AddWriter(
}

void QLStressTest::TestWriteRejection() {
constexpr int kWriters = IsDebug() ? 10 : 20;
constexpr int kWriters = kIsDebug ? 10 : 20;
constexpr int kKeyBase = 10000;

std::array<std::atomic<int>, kWriters> keys;
Expand Down
21 changes: 10 additions & 11 deletions src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {

// Reads conflicts for specified intent from DB.
Status ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix) {
EnsureIntentIteratorCreated();
if (!CreateIntentIteratorIfNecessary()) {
return Status::OK();
}

const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()];

Expand Down Expand Up @@ -303,17 +305,12 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
return intent_iter_.status();
}

void EnsureIntentIteratorCreated() {
bool CreateIntentIteratorIfNecessary() {
if (!intent_iter_.Initialized()) {
intent_iter_ = CreateRocksDBIterator(
doc_db_.intents,
doc_db_.key_bounds,
BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none /* user_key_for_filter */,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&intent_key_upperbound_);
intent_iter_ = CreateIntentsIteratorWithHybridTimeFilter(
doc_db_.intents, &status_manager(), doc_db_.key_bounds, &intent_key_upperbound_);
}
return intent_iter_.Initialized();
}

Result<IntentTypesContainer> GetLockStatusInfo() {
Expand Down Expand Up @@ -1147,7 +1144,9 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase {
// This is to prevent the case when we create an iterator on the regular DB where a
// provisional record has not yet been applied, and then create an iterator on the intents
// DB where the provisional record has already been removed.
resolver->EnsureIntentIteratorCreated();
// Even in the case where there are no intents to iterate over, the following loop must be
// run, so we cannot return early if the following call returns false.
resolver->CreateIntentIteratorIfNecessary();

for (const auto& i : container) {
const Slice intent_key = i.first.AsSlice();
Expand Down
12 changes: 12 additions & 0 deletions src/yb/docdb/doc_ql_filefilter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

#include "yb/rocksdb/db/compaction.h"

#include "yb/util/debug.h"

DEFINE_RUNTIME_bool(docdb_ht_filter_intents, yb::kIsDebug,
"Use hybrid time SST filter when scanning intents.");

namespace yb::docdb {

rocksdb::UserBoundaryTag TagForRangeComponent(size_t index);
Expand Down Expand Up @@ -147,4 +152,11 @@ std::shared_ptr<rocksdb::ReadFileFilter> CreateHybridTimeFileFilter(HybridTime m
return std::make_shared<HybridTimeFileFilter>(min_hybrid_time);
}

// Builds the hybrid time file filter used when iterating over intentsdb.
//
// A HybridTimeFileFilter constructed from min_running_ht is returned only when the
// docdb_ht_filter_intents flag is enabled and min_running_ht is not HybridTime::kMin.
// In every other case nullptr is returned.
std::shared_ptr<rocksdb::ReadFileFilter> CreateIntentHybridTimeFileFilter(
    HybridTime min_running_ht) {
  // The flag is read first so that a disabled flag short-circuits the comparison.
  if (GetAtomicFlag(&FLAGS_docdb_ht_filter_intents) && min_running_ht != HybridTime::kMin) {
    return std::make_shared<HybridTimeFileFilter>(min_running_ht);
  }
  return nullptr;
}

} // namespace yb::docdb
5 changes: 5 additions & 0 deletions src/yb/docdb/doc_ql_filefilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "yb/common/hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/qlexpr/qlexpr_fwd.h"

Expand All @@ -25,5 +26,9 @@ namespace yb::docdb {

std::shared_ptr<rocksdb::ReadFileFilter> CreateFileFilter(const qlexpr::YQLScanSpec& scan_spec);
std::shared_ptr<rocksdb::ReadFileFilter> CreateHybridTimeFileFilter(HybridTime min_hybrid_Time);
// Create a file filter for intentsdb using the given min running hybrid time. Filtering is done
// based on intent hybrid time stored in the intent key, not commit time of the transaction.
std::shared_ptr<rocksdb::ReadFileFilter> CreateIntentHybridTimeFileFilter(
HybridTime min_running_ht);

} // namespace yb::docdb
22 changes: 22 additions & 0 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,28 @@ unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
statistics ? statistics->IntentsDBStatistics() : nullptr);
}

// Creates an iterator over intentsdb, with the file filter argument supplied by
// CreateIntentHybridTimeFileFilter for the status manager's min running hybrid time.
//
// When the status manager reports HybridTime::kMax as the min running hybrid time, no
// iterator is created and a default-constructed BoundedRocksDbIterator is returned;
// the callers in this change test for that case via Initialized().
// Bloom filters are not used for the created iterator.
BoundedRocksDbIterator CreateIntentsIteratorWithHybridTimeFilter(
    rocksdb::DB* intentsdb,
    const TransactionStatusManager* status_manager,
    const KeyBounds* docdb_key_bounds,
    const Slice* iterate_upper_bound,
    rocksdb::Statistics* statistics) {
  const auto oldest_running_ht = status_manager->MinRunningHybridTime();
  if (oldest_running_ht != HybridTime::kMax) {
    return CreateRocksDBIterator(
        intentsdb,
        docdb_key_bounds,
        docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
        boost::none /* user_key_for_filter */,
        rocksdb::kDefaultQueryId,
        CreateIntentHybridTimeFileFilter(oldest_running_ht),
        iterate_upper_bound,
        statistics);
  }

  // kMax is treated as "nothing is running", so the iterator is left uninitialized.
  VLOG(4) << "No transactions running";
  return {};
}

namespace {

std::mutex rocksdb_flags_mutex;
Expand Down
7 changes: 7 additions & 0 deletions src/yb/docdb/docdb_rocksdb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ std::unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
const Slice* iterate_upper_bound = nullptr,
const DocDBStatistics* statistics = nullptr);

BoundedRocksDbIterator CreateIntentsIteratorWithHybridTimeFilter(
rocksdb::DB* intentsdb,
const TransactionStatusManager* status_manager,
const KeyBounds* docdb_key_bounds,
const Slice* iterate_upper_bound = nullptr,
rocksdb::Statistics* statistics = nullptr);

std::shared_ptr<rocksdb::RocksDBPriorityThreadPoolMetrics> CreateRocksDBPriorityThreadPoolMetrics(
scoped_refptr<yb::MetricEntity> entity);

Expand Down
16 changes: 4 additions & 12 deletions src/yb/docdb/intent_aware_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "yb/common/hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb_fwd.h"
#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/docdb-internal.h"
Expand Down Expand Up @@ -139,18 +140,9 @@ IntentAwareIterator::IntentAwareIterator(
<< ", txn_op_context: " << txn_op_context_;

if (txn_op_context) {
if (txn_op_context.txn_status_manager->MinRunningHybridTime() != HybridTime::kMax) {
intent_iter_ = docdb::CreateRocksDBIterator(doc_db.intents,
doc_db.key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&intent_upperbound_,
intentsdb_statistics);
} else {
VLOG(4) << "No transactions running";
}
intent_iter_ = docdb::CreateIntentsIteratorWithHybridTimeFilter(
doc_db.intents, txn_op_context.txn_status_manager, doc_db.key_bounds, &intent_upperbound_,
intentsdb_statistics);
}
// WARNING: It is important for regular DB iterator to be created after intents DB iterator,
// otherwise consistency could break, for example in following scenario:
Expand Down
15 changes: 3 additions & 12 deletions src/yb/docdb/intent_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "yb/docdb/intent_iterator.h"

#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb-internal.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/iter_util.h"
Expand Down Expand Up @@ -74,18 +75,8 @@ IntentIterator::IntentIterator(
VLOG(4) << "IntentIterator, read_time: " << read_time << ", txn_op_context: " << txn_op_context_;

if (txn_op_context) {
if (txn_op_context.txn_status_manager->MinRunningHybridTime() != HybridTime::kMax) {
intent_iter_ = docdb::CreateRocksDBIterator(
intents_db,
docdb_key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&upperbound_);
} else {
VLOG(4) << "No transactions running";
}
intent_iter_ = docdb::CreateIntentsIteratorWithHybridTimeFilter(
intents_db, txn_op_context.txn_status_manager, docdb_key_bounds, &upperbound_);
}
VTRACE(2, "Created intent iterator - initialized? - $0", intent_iter_.Initialized());
}
Expand Down
10 changes: 8 additions & 2 deletions src/yb/docdb/rocksdb_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "yb/common/row_mark.h"

#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb.messages.h"
#include "yb/docdb/docdb_compaction_context.h"
#include "yb/docdb/docdb_rocksdb_util.h"
Expand Down Expand Up @@ -437,15 +438,18 @@ IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id)
}

IntentsWriter::IntentsWriter(const Slice& start_key,
HybridTime file_filter_ht,
rocksdb::DB* intents_db,
IntentsWriterContext* context)
: start_key_(start_key), intents_db_(intents_db), context_(*context) {
AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix_);
txn_reverse_index_prefix_.AppendKeyEntryType(dockv::KeyEntryType::kMaxByte);
reverse_index_upperbound_ = txn_reverse_index_prefix_.AsSlice();

reverse_index_iter_ = CreateRocksDBIterator(
intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound_);
rocksdb::kDefaultQueryId, CreateIntentHybridTimeFileFilter(file_filter_ht),
&reverse_index_upperbound_);
}

Status IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) {
Expand Down Expand Up @@ -502,6 +506,7 @@ ApplyIntentsContext::ApplyIntentsContext(
const SubtxnSet& aborted,
HybridTime commit_ht,
HybridTime log_ht,
HybridTime file_filter_ht,
const KeyBounds* key_bounds,
SchemaPackingProvider* schema_packing_provider,
rocksdb::DB* intents_db)
Expand All @@ -519,7 +524,8 @@ ApplyIntentsContext::ApplyIntentsContext(
key_bounds_(key_bounds),
intent_iter_(CreateRocksDBIterator(
intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId)) {
rocksdb::kDefaultQueryId,
CreateIntentHybridTimeFileFilter(file_filter_ht))) {
}

Result<bool> ApplyIntentsContext::StoreApplyState(
Expand Down
2 changes: 2 additions & 0 deletions src/yb/docdb/rocksdb_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class IntentsWriterContext {
class IntentsWriter : public rocksdb::DirectWriter {
public:
IntentsWriter(const Slice& start_key,
HybridTime file_filter_ht,
rocksdb::DB* intents_db,
IntentsWriterContext* context);

Expand Down Expand Up @@ -219,6 +220,7 @@ class ApplyIntentsContext : public IntentsWriterContext, public FrontierSchemaVe
const SubtxnSet& aborted,
HybridTime commit_ht,
HybridTime log_ht,
HybridTime file_filter_ht,
const KeyBounds* key_bounds,
SchemaPackingProvider* schema_packing_provider,
rocksdb::DB* intents_db);
Expand Down
11 changes: 8 additions & 3 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,8 @@ Status Tablet::ImportData(const std::string& source_dir) {
Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) {
VLOG_WITH_PREFIX(4) << __func__ << ": " << data.transaction_id;

HybridTime min_running_ht = transaction_participant_->MinRunningHybridTime();

// This flag enables tests to induce a situation where a transaction has committed but its intents
// haven't yet moved to regular db for a sufficiently long period. For example, it can help a test
// to reliably assert that conflict resolution/ concurrency control with a conflicting committed
Expand All @@ -2008,9 +2010,10 @@ Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApply
AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms);
docdb::ApplyIntentsContext context(
data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht,
&key_bounds_, metadata_.get(), intents_db_.get());
min_running_ht, &key_bounds_, metadata_.get(), intents_db_.get());
docdb::IntentsWriter intents_writer(
data.apply_state ? data.apply_state->key : Slice(), intents_db_.get(), &context);
data.apply_state ? data.apply_state->key : Slice(), min_running_ht,
intents_db_.get(), &context);
rocksdb::WriteBatch regular_write_batch;
regular_write_batch.SetDirectWriter(&intents_writer);
// data.hybrid_time contains transaction commit time.
Expand All @@ -2029,12 +2032,14 @@ Status Tablet::RemoveIntentsImpl(
RETURN_NOT_OK(scoped_read_operation);

rocksdb::WriteBatch intents_write_batch;
HybridTime min_running_ht = CHECK_NOTNULL(transaction_participant_)->MinRunningHybridTime();
for (const auto& id : ids) {
boost::optional<docdb::ApplyTransactionState> apply_state;
for (;;) {
docdb::RemoveIntentsContext context(id, static_cast<uint8_t>(reason));
docdb::IntentsWriter writer(
apply_state ? apply_state->key : Slice(), intents_db_.get(), &context);
apply_state ? apply_state->key : Slice(), min_running_ht,
intents_db_.get(), &context);
intents_write_batch.SetDirectWriter(&writer);
docdb::ConsensusFrontiers frontiers;
auto frontiers_ptr = InitFrontiers(data, &frontiers);
Expand Down
6 changes: 3 additions & 3 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#include "yb/util/async_util.h"
#include "yb/util/callsite_profiling.h"
#include "yb/util/countdown_latch.h"
#include "yb/util/debug-util.h"
#include "yb/util/debug.h"
#include "yb/util/flags.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -105,11 +105,11 @@ DEFINE_NON_RUNTIME_int32(wait_queue_poll_interval_ms, 100,
"active blockers.");

// TODO: this should be turned into an autoflag.
DEFINE_RUNTIME_bool(cdc_write_post_apply_metadata, yb::IsDebug(),
DEFINE_RUNTIME_bool(cdc_write_post_apply_metadata, yb::kIsDebug,
"Write post-apply transaction metadata to intentsdb for transaction that have been applied but "
" have not yet been streamed by CDC.");

DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, yb::IsDebug(),
DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, yb::kIsDebug,
"Clean up transactions from memory after apply, even if its changes have not yet been "
"streamed by CDC.");

Expand Down
4 changes: 2 additions & 2 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
#include "yb/tserver/tserver_service.pb.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/debug-util.h"
#include "yb/util/debug.h"
#include "yb/util/flags.h"
#include "yb/util/flags/flag_tags.h"
#include "yb/util/logging.h"
Expand All @@ -86,7 +86,7 @@ using namespace std::literals;
DEFINE_UNKNOWN_uint64(pg_client_session_expiration_ms, 60000,
"Pg client session expiration time in milliseconds.");

DEFINE_RUNTIME_bool(pg_client_use_shared_memory, yb::IsDebug(),
DEFINE_RUNTIME_bool(pg_client_use_shared_memory, yb::kIsDebug,
"Use shared memory for executing read and write pg client queries");

DEFINE_RUNTIME_int32(get_locks_status_max_retry_attempts, 2,
Expand Down
9 changes: 1 addition & 8 deletions src/yb/util/debug-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "yb/gutil/strings/fastmem.h"

#include "yb/util/debug.h"
#include "yb/util/enums.h"
#include "yb/util/slice.h"
#include "yb/util/stack_trace.h"
Expand Down Expand Up @@ -103,14 +104,6 @@ std::string GetLogFormatStackTraceHex();
// may invoke the dynamic loader.
void HexStackTraceToString(char* buf, size_t size);

// Compile-time build-mode query: evaluates to true when NDEBUG is not defined and to
// false when it is defined.
constexpr bool IsDebug() {
#ifndef NDEBUG
  return true;
#else
  return false;
#endif
}

class NODISCARD_CLASS ScopeLogger {
public:
ScopeLogger(const std::string& msg, std::function<void()> on_scope_bounds);
Expand Down
Loading

0 comments on commit 97536b4

Please sign in to comment.