Skip to content

Commit

Permalink
[#21877] docdb: Revert "[#21580] docdb: Filter intent SST files only …
Browse files Browse the repository at this point in the history
…retained for CDC"

Summary:
D33131 introduced a segmentation fault which was  identified in multiple tests.
```
* thread #1, name = 'yb-tserver', stop reason = signal SIGSEGV
  * frame #0: 0x00007f4d2b6f3a84 libpthread.so.0`__pthread_mutex_lock + 4
    frame #1: 0x000055d6d1e1190b yb-tserver`yb::tablet::MvccManager::SafeTimeForFollower(yb::HybridTime, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l>>>) const [inlined] std::__1::unique_lock<std::__1::mutex>::unique_lock[abi:v170002](this=0x00007f4ccb6feaa0, __m=0x0000000000000110) at unique_lock.h:41:11
    frame #2: 0x000055d6d1e118f5 yb-tserver`yb::tablet::MvccManager::SafeTimeForFollower(this=0x00000000000000f0, min_allowed=<unavailable>, deadline=yb::CoarseTimePoint @ 0x00007f4ccb6feb08) const at mvcc.cc:500:32
    frame #3: 0x000055d6d1ef58e3 yb-tserver`yb::tablet::TransactionParticipant::Impl::ProcessRemoveQueueUnlocked(this=0x000037e27d26fb00, min_running_notifier=0x00007f4ccb6fef28) at transaction_participant.cc:1537:45
    frame #4: 0x000055d6d1efc11a yb-tserver`yb::tablet::TransactionParticipant::Impl::EnqueueRemoveUnlocked(this=0x000037e27d26fb00, id=<unavailable>, reason=<unavailable>, min_running_notifier=0x00007f4ccb6fef28, expected_deadlock_status=<unavailable>) at transaction_participant.cc:1516:5
    frame #5: 0x000055d6d1e3afbe yb-tserver`yb::tablet::RunningTransaction::DoStatusReceived(this=0x000037e2679b5218, status_tablet="d5922c26c9704f298d6812aff8f615f6", status=<unavailable>, response=<unavailable>, serial_no=56986, shared_self=std::__1::shared_ptr<yb::tablet::RunningTransaction>::element_type @ 0x000037e2679b5218) at running_transaction.cc:424:16
    frame #6: 0x000055d6d0d7db5f yb-tserver`yb::client::(anonymous namespace)::TransactionRpcBase::Finished(this=0x000037e29c80b420, status=<unavailable>) at transaction_rpc.cc:67:7
```
This diff reverts the change to unblock the tests.

The proper fix for this problem is WIP
Jira: DB-10780, DB-10466

Test Plan: Jenkins: urgent

Reviewers: rthallam

Reviewed By: rthallam

Subscribers: ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34245
  • Loading branch information
yusong-yan committed Apr 18, 2024
1 parent c3f43c0 commit 731a61d
Show file tree
Hide file tree
Showing 16 changed files with 58 additions and 115 deletions.
6 changes: 3 additions & 3 deletions src/yb/ash/wait_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ DEFINE_RUNTIME_PG_PREVIEW_FLAG(bool, yb_enable_ash, false,
"and various background activities. This does nothing if "
"ysql_yb_enable_ash_infra is disabled.");

DEFINE_test_flag(bool, export_wait_state_names, yb::kIsDebug,
DEFINE_test_flag(bool, export_wait_state_names, yb::IsDebug(),
"Exports wait-state name as a human understandable string.");
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::kIsDebug,
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::IsDebug(),
"Add a trace line whenever the wait state code is updated.");
DEFINE_test_flag(uint32, yb_ash_sleep_at_wait_state_ms, 0,
"How long to sleep/delay when entering a particular wait state.");
DEFINE_test_flag(uint32, yb_ash_wait_code_to_sleep_at, 0,
"If enabled, add a sleep/delay when we enter the specified wait state.");
DEFINE_test_flag(bool, export_ash_uuids_as_hex_strings, yb::kIsDebug,
DEFINE_test_flag(bool, export_ash_uuids_as_hex_strings, yb::IsDebug(),
"Exports wait-state name as a human understandable string.");
DEFINE_test_flag(bool, ash_debug_aux, false, "Set ASH aux_info to the first 16 characters"
" of the method tserver is running");
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/ql-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/debug.h"
#include "yb/util/debug-util.h"
#include "yb/util/format.h"
#include "yb/util/metrics.h"
#include "yb/util/random_util.h"
Expand Down Expand Up @@ -830,7 +830,7 @@ void QLStressTest::AddWriter(
}

void QLStressTest::TestWriteRejection() {
constexpr int kWriters = kIsDebug ? 10 : 20;
constexpr int kWriters = IsDebug() ? 10 : 20;
constexpr int kKeyBase = 10000;

std::array<std::atomic<int>, kWriters> keys;
Expand Down
21 changes: 11 additions & 10 deletions src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {

// Reads conflicts for specified intent from DB.
Status ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix) {
if (!CreateIntentIteratorIfNecessary()) {
return Status::OK();
}
EnsureIntentIteratorCreated();

const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()];

Expand Down Expand Up @@ -305,12 +303,17 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
return intent_iter_.status();
}

bool CreateIntentIteratorIfNecessary() {
void EnsureIntentIteratorCreated() {
if (!intent_iter_.Initialized()) {
intent_iter_ = CreateIntentsIteratorWithHybridTimeFilter(
doc_db_.intents, &status_manager(), doc_db_.key_bounds, &intent_key_upperbound_);
intent_iter_ = CreateRocksDBIterator(
doc_db_.intents,
doc_db_.key_bounds,
BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none /* user_key_for_filter */,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&intent_key_upperbound_);
}
return intent_iter_.Initialized();
}

Result<IntentTypesContainer> GetLockStatusInfo() {
Expand Down Expand Up @@ -1144,9 +1147,7 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase {
// This is to prevent the case when we create an iterator on the regular DB where a
// provisional record has not yet been applied, and then create an iterator the intents
// DB where the provisional record has already been removed.
// Even in the case where there are no intents to iterate over, the following loop must be
// run, so we cannot return early if the following call returns false.
resolver->CreateIntentIteratorIfNecessary();
resolver->EnsureIntentIteratorCreated();

for (const auto& i : container) {
const Slice intent_key = i.first.AsSlice();
Expand Down
12 changes: 0 additions & 12 deletions src/yb/docdb/doc_ql_filefilter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@

#include "yb/rocksdb/db/compaction.h"

#include "yb/util/debug.h"

DEFINE_RUNTIME_bool(docdb_ht_filter_intents, yb::kIsDebug,
"Use hybrid time SST filter when scanning intents.");

namespace yb::docdb {

rocksdb::UserBoundaryTag TagForRangeComponent(size_t index);
Expand Down Expand Up @@ -152,11 +147,4 @@ std::shared_ptr<rocksdb::ReadFileFilter> CreateHybridTimeFileFilter(HybridTime m
return std::make_shared<HybridTimeFileFilter>(min_hybrid_time);
}

std::shared_ptr<rocksdb::ReadFileFilter> CreateIntentHybridTimeFileFilter(
HybridTime min_running_ht) {
return GetAtomicFlag(&FLAGS_docdb_ht_filter_intents) && min_running_ht != HybridTime::kMin
? std::make_shared<HybridTimeFileFilter>(min_running_ht)
: nullptr;
}

} // namespace yb::docdb
5 changes: 0 additions & 5 deletions src/yb/docdb/doc_ql_filefilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#pragma once

#include "yb/common/hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/qlexpr/qlexpr_fwd.h"

Expand All @@ -26,9 +25,5 @@ namespace yb::docdb {

std::shared_ptr<rocksdb::ReadFileFilter> CreateFileFilter(const qlexpr::YQLScanSpec& scan_spec);
std::shared_ptr<rocksdb::ReadFileFilter> CreateHybridTimeFileFilter(HybridTime min_hybrid_Time);
// Create a file filter for intentsdb using the given min running hybrid time. Filtering is done
// based on intent hybrid time stored in the intent key, not commit time of the transaction.
std::shared_ptr<rocksdb::ReadFileFilter> CreateIntentHybridTimeFileFilter(
HybridTime min_running_ht);

} // namespace yb::docdb
22 changes: 0 additions & 22 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,28 +316,6 @@ unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
statistics ? statistics->IntentsDBStatistics() : nullptr);
}

BoundedRocksDbIterator CreateIntentsIteratorWithHybridTimeFilter(
rocksdb::DB* intentsdb,
const TransactionStatusManager* status_manager,
const KeyBounds* docdb_key_bounds,
const Slice* iterate_upper_bound,
rocksdb::Statistics* statistics) {
auto min_running_ht = status_manager->MinRunningHybridTime();
if (min_running_ht == HybridTime::kMax) {
VLOG(4) << "No transactions running";
return {};
}
return CreateRocksDBIterator(
intentsdb,
docdb_key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none /* user_key_for_filter */,
rocksdb::kDefaultQueryId,
CreateIntentHybridTimeFileFilter(min_running_ht),
iterate_upper_bound,
statistics);
}

namespace {

std::mutex rocksdb_flags_mutex;
Expand Down
7 changes: 0 additions & 7 deletions src/yb/docdb/docdb_rocksdb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ std::unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
const Slice* iterate_upper_bound = nullptr,
const DocDBStatistics* statistics = nullptr);

BoundedRocksDbIterator CreateIntentsIteratorWithHybridTimeFilter(
rocksdb::DB* intentsdb,
const TransactionStatusManager* status_manager,
const KeyBounds* docdb_key_bounds,
const Slice* iterate_upper_bound = nullptr,
rocksdb::Statistics* statistics = nullptr);

std::shared_ptr<rocksdb::RocksDBPriorityThreadPoolMetrics> CreateRocksDBPriorityThreadPoolMetrics(
scoped_refptr<yb::MetricEntity> entity);

Expand Down
16 changes: 12 additions & 4 deletions src/yb/docdb/intent_aware_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "yb/common/hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb_fwd.h"
#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/docdb-internal.h"
Expand Down Expand Up @@ -140,9 +139,18 @@ IntentAwareIterator::IntentAwareIterator(
<< ", txn_op_context: " << txn_op_context_;

if (txn_op_context) {
intent_iter_ = docdb::CreateIntentsIteratorWithHybridTimeFilter(
doc_db.intents, txn_op_context.txn_status_manager, doc_db.key_bounds, &intent_upperbound_,
intentsdb_statistics);
if (txn_op_context.txn_status_manager->MinRunningHybridTime() != HybridTime::kMax) {
intent_iter_ = docdb::CreateRocksDBIterator(doc_db.intents,
doc_db.key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&intent_upperbound_,
intentsdb_statistics);
} else {
VLOG(4) << "No transactions running";
}
}
// WARNING: Is is important for regular DB iterator to be created after intents DB iterator,
// otherwise consistency could break, for example in following scenario:
Expand Down
15 changes: 12 additions & 3 deletions src/yb/docdb/intent_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "yb/docdb/intent_iterator.h"

#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb-internal.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/iter_util.h"
Expand Down Expand Up @@ -75,8 +74,18 @@ IntentIterator::IntentIterator(
VLOG(4) << "IntentIterator, read_time: " << read_time << ", txn_op_context: " << txn_op_context_;

if (txn_op_context) {
intent_iter_ = docdb::CreateIntentsIteratorWithHybridTimeFilter(
intents_db, txn_op_context.txn_status_manager, docdb_key_bounds, &upperbound_);
if (txn_op_context.txn_status_manager->MinRunningHybridTime() != HybridTime::kMax) {
intent_iter_ = docdb::CreateRocksDBIterator(
intents_db,
docdb_key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&upperbound_);
} else {
VLOG(4) << "No transactions running";
}
}
VTRACE(2, "Created intent iterator - initialized? - $0", intent_iter_.Initialized());
}
Expand Down
10 changes: 2 additions & 8 deletions src/yb/docdb/rocksdb_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "yb/common/row_mark.h"

#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb.messages.h"
#include "yb/docdb/docdb_compaction_context.h"
#include "yb/docdb/docdb_rocksdb_util.h"
Expand Down Expand Up @@ -438,18 +437,15 @@ IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id)
}

IntentsWriter::IntentsWriter(const Slice& start_key,
HybridTime file_filter_ht,
rocksdb::DB* intents_db,
IntentsWriterContext* context)
: start_key_(start_key), intents_db_(intents_db), context_(*context) {
AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix_);
txn_reverse_index_prefix_.AppendKeyEntryType(dockv::KeyEntryType::kMaxByte);
reverse_index_upperbound_ = txn_reverse_index_prefix_.AsSlice();

reverse_index_iter_ = CreateRocksDBIterator(
intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId, CreateIntentHybridTimeFileFilter(file_filter_ht),
&reverse_index_upperbound_);
rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound_);
}

Status IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) {
Expand Down Expand Up @@ -506,7 +502,6 @@ ApplyIntentsContext::ApplyIntentsContext(
const SubtxnSet& aborted,
HybridTime commit_ht,
HybridTime log_ht,
HybridTime file_filter_ht,
const KeyBounds* key_bounds,
SchemaPackingProvider* schema_packing_provider,
rocksdb::DB* intents_db)
Expand All @@ -524,8 +519,7 @@ ApplyIntentsContext::ApplyIntentsContext(
key_bounds_(key_bounds),
intent_iter_(CreateRocksDBIterator(
intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId,
CreateIntentHybridTimeFileFilter(file_filter_ht))) {
rocksdb::kDefaultQueryId)) {
}

Result<bool> ApplyIntentsContext::StoreApplyState(
Expand Down
2 changes: 0 additions & 2 deletions src/yb/docdb/rocksdb_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ class IntentsWriterContext {
class IntentsWriter : public rocksdb::DirectWriter {
public:
IntentsWriter(const Slice& start_key,
HybridTime file_filter_ht,
rocksdb::DB* intents_db,
IntentsWriterContext* context);

Expand Down Expand Up @@ -220,7 +219,6 @@ class ApplyIntentsContext : public IntentsWriterContext, public FrontierSchemaVe
const SubtxnSet& aborted,
HybridTime commit_ht,
HybridTime log_ht,
HybridTime file_filter_ht,
const KeyBounds* key_bounds,
SchemaPackingProvider* schema_packing_provider,
rocksdb::DB* intents_db);
Expand Down
11 changes: 3 additions & 8 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2000,8 +2000,6 @@ Status Tablet::ImportData(const std::string& source_dir) {
Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) {
VLOG_WITH_PREFIX(4) << __func__ << ": " << data.transaction_id;

HybridTime min_running_ht = transaction_participant_->MinRunningHybridTime();

// This flag enables tests to induce a situation where a transaction has committed but its intents
// haven't yet moved to regular db for a sufficiently long period. For example, it can help a test
// to reliably assert that conflict resolution/ concurrency control with a conflicting committed
Expand All @@ -2010,10 +2008,9 @@ Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApply
AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms);
docdb::ApplyIntentsContext context(
data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht,
min_running_ht, &key_bounds_, metadata_.get(), intents_db_.get());
&key_bounds_, metadata_.get(), intents_db_.get());
docdb::IntentsWriter intents_writer(
data.apply_state ? data.apply_state->key : Slice(), min_running_ht,
intents_db_.get(), &context);
data.apply_state ? data.apply_state->key : Slice(), intents_db_.get(), &context);
rocksdb::WriteBatch regular_write_batch;
regular_write_batch.SetDirectWriter(&intents_writer);
// data.hybrid_time contains transaction commit time.
Expand All @@ -2032,14 +2029,12 @@ Status Tablet::RemoveIntentsImpl(
RETURN_NOT_OK(scoped_read_operation);

rocksdb::WriteBatch intents_write_batch;
HybridTime min_running_ht = CHECK_NOTNULL(transaction_participant_)->MinRunningHybridTime();
for (const auto& id : ids) {
boost::optional<docdb::ApplyTransactionState> apply_state;
for (;;) {
docdb::RemoveIntentsContext context(id, static_cast<uint8_t>(reason));
docdb::IntentsWriter writer(
apply_state ? apply_state->key : Slice(), min_running_ht,
intents_db_.get(), &context);
apply_state ? apply_state->key : Slice(), intents_db_.get(), &context);
intents_write_batch.SetDirectWriter(&writer);
docdb::ConsensusFrontiers frontiers;
auto frontiers_ptr = InitFrontiers(data, &frontiers);
Expand Down
6 changes: 3 additions & 3 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#include "yb/util/async_util.h"
#include "yb/util/callsite_profiling.h"
#include "yb/util/countdown_latch.h"
#include "yb/util/debug.h"
#include "yb/util/debug-util.h"
#include "yb/util/flags.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -105,11 +105,11 @@ DEFINE_NON_RUNTIME_int32(wait_queue_poll_interval_ms, 100,
"active blockers.");

// TODO: this should be turned into an autoflag.
DEFINE_RUNTIME_bool(cdc_write_post_apply_metadata, yb::kIsDebug,
DEFINE_RUNTIME_bool(cdc_write_post_apply_metadata, yb::IsDebug(),
"Write post-apply transaction metadata to intentsdb for transaction that have been applied but "
" have not yet been streamed by CDC.");

DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, yb::kIsDebug,
DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, yb::IsDebug(),
"Clean up transactions from memory after apply, even if its changes have not yet been "
"streamed by CDC.");

Expand Down
4 changes: 2 additions & 2 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
#include "yb/tserver/tserver_service.pb.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/debug.h"
#include "yb/util/debug-util.h"
#include "yb/util/flags.h"
#include "yb/util/flags/flag_tags.h"
#include "yb/util/logging.h"
Expand All @@ -86,7 +86,7 @@ using namespace std::literals;
DEFINE_UNKNOWN_uint64(pg_client_session_expiration_ms, 60000,
"Pg client session expiration time in milliseconds.");

DEFINE_RUNTIME_bool(pg_client_use_shared_memory, yb::kIsDebug,
DEFINE_RUNTIME_bool(pg_client_use_shared_memory, yb::IsDebug(),
"Use shared memory for executing read and write pg client queries");

DEFINE_RUNTIME_int32(get_locks_status_max_retry_attempts, 2,
Expand Down
9 changes: 8 additions & 1 deletion src/yb/util/debug-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

#include "yb/gutil/strings/fastmem.h"

#include "yb/util/debug.h"
#include "yb/util/enums.h"
#include "yb/util/slice.h"
#include "yb/util/stack_trace.h"
Expand Down Expand Up @@ -104,6 +103,14 @@ std::string GetLogFormatStackTraceHex();
// may invoke the dynamic loader.
void HexStackTraceToString(char* buf, size_t size);

constexpr bool IsDebug() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}

class NODISCARD_CLASS ScopeLogger {
public:
ScopeLogger(const std::string& msg, std::function<void()> on_scope_bounds);
Expand Down
Loading

0 comments on commit 731a61d

Please sign in to comment.