Skip to content

Commit

Permalink
#3288: Prevent callback from detached rocksdb instance; prevent race …
Browse files Browse the repository at this point in the history
…condition between insert and truncate; disable rocksdb flush on truncate

Summary:
- Prevent rocksdb instance from calling `ListenFilesChanged` callback after being detached from `Tablet`

Full stack available at - #3288 (comment)

Follow Up Work: #3476

- Prevent race between `Tablet::AcquireLocksAndPerformDocOperations() -> Tablet::StartDocWriteOperation()` and `Tablet::Truncate()` by incrementing `pending_op_counter_`

Full stack available at -  #3288 (comment)

- Do not flush rocksdb memtable when user truncates table to prevent following crash on flush -

```
#0  0x000056371c706b00 in ?? ()
#1  0x00007f5229291099 in std::__invoke_impl<void, void (yb::tablet::Tablet::*&)(), yb::tablet::Tablet*&> (__t=<optimized out>, __f=<optimized out>) at /usr/include/c++/7/bits/invoke.h:73
#2  std::__invoke<void (yb::tablet::Tablet::*&)(), yb::tablet::Tablet*&> (__fn=<optimized out>) at /usr/include/c++/7/bits/invoke.h:95
#3  std::_Bind<void (yb::tablet::Tablet::*(yb::tablet::Tablet*))()>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (__args=..., this=<optimized out>) at /usr/include/c++/7/functional:467
#4  std::_Bind<void (yb::tablet::Tablet::*(yb::tablet::Tablet*))()>::operator()<, void>() (this=<optimized out>) at /usr/include/c++/7/functional:551
#5  std::_Function_handler<void (), std::_Bind<void (yb::tablet::Tablet::*(yb::tablet::Tablet*))()> >::_M_invoke(std::_Any_data const&) (__functor=...) at /usr/include/c++/7/bits/std_function.h:316
#6  0x00007f5225aee92c in std::function<void ()>::operator()() const (this=0x56371c5534f0) at /usr/include/c++/7/bits/std_function.h:706
#7  rocksdb::DBImpl::FilesChanged (this=this@entry=0x56371c552b00) at ../../src/yb/rocksdb/db/db_impl.cc:4359
#8  0x00007f5225b14b1b in rocksdb::DBImpl::BackgroundCallFlush (this=this@entry=0x56371c552b00, cfd=cfd@entry=0x0) at ../../src/yb/rocksdb/db/db_impl.cc:3285
```

Full stack available at - #3288 (comment)

- WORKAROUND for race condition (#3288 (comment)) by using `ScopedPendingOperation` in `Tablet::ShouldApplyWrite()` to prevent it from seeing a null `regular_db_` due to a concurrent `Tablet::Truncate()`

Full stack available at - #3288 (comment)

Follow Up Work: #3477

Test Plan:
./yb_build.sh debug --cxx-test pg_libpq-test --gtest_filter PgLibPqTest.ConcurrentInsertTruncateForeignKey
./yb_build.sh debug --cxx-test snapshot-txn-test --gtest_filter SnapshotTxnTest.MultiWriteWithRestart

Reviewers: bogdan, sergei, mikhail

Reviewed By: mikhail

Subscribers: kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7808
  • Loading branch information
rajukumaryb committed Feb 7, 2020
1 parent 94747c4 commit 89c3c39
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
32 changes: 26 additions & 6 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,17 @@ bool Tablet::StartShutdown() {
return true;
}

void Tablet::PreventCallbacksFromRocksDBs(bool disable_flush_on_shutdown) {
if (intents_db_) {
intents_db_->ListenFilesChanged(nullptr);
intents_db_->SetDisableFlushOnShutdown(disable_flush_on_shutdown);
}

if (regular_db_) {
regular_db_->SetDisableFlushOnShutdown(disable_flush_on_shutdown);
}
}

void Tablet::CompleteShutdown(IsDropTable is_drop_table) {
StartShutdown();

Expand All @@ -813,12 +824,8 @@ void Tablet::CompleteShutdown(IsDropTable is_drop_table) {
}

std::lock_guard<rw_spinlock> lock(component_lock_);
if (intents_db_) {
intents_db_->SetDisableFlushOnShutdown(is_drop_table);
}
if (regular_db_) {
regular_db_->SetDisableFlushOnShutdown(is_drop_table);
}

PreventCallbacksFromRocksDBs(is_drop_table);

// Shutdown the RocksDB instance for this table, if present.
// Destroy intents and regular DBs in reverse order to their creation.
Expand Down Expand Up @@ -1556,6 +1563,12 @@ void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr<WriteOperation>
if (key_value_write_request->has_write_batch()) {
Status status;
if (!key_value_write_request->write_batch().read_pairs().empty()) {
ScopedPendingOperation scoped_operation(&pending_op_counter_);
if (!scoped_operation.ok()) {
operation->state()->CompleteWithStatus(MoveStatus(scoped_operation));
return;
}

status = StartDocWriteOperation(operation.get());
} else {
DCHECK(key_value_write_request->has_external_hybrid_time());
Expand Down Expand Up @@ -2035,6 +2048,8 @@ Status Tablet::Truncate(TruncateOperationState *state) {
return STATUS(IllegalState, "Tablet was shut down");
}

PreventCallbacksFromRocksDBs(true);

const rocksdb::SequenceNumber sequence_number = regular_db_->GetLatestSequenceNumber();
const string db_dir = regular_db_->GetName();

Expand Down Expand Up @@ -2603,6 +2618,11 @@ Status Tablet::CreateReadIntents(
}

bool Tablet::ShouldApplyWrite() {
ScopedPendingOperation scoped_read_operation(&pending_op_counter_);
if (!scoped_read_operation.ok()) {
return false;
}

return !regular_db_->NeedsDelay();
}

Expand Down
2 changes: 2 additions & 0 deletions src/yb/tablet/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,8 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {

CHECKED_STATUS DoEnableCompactions();

void PreventCallbacksFromRocksDBs(bool disable_flush_on_shutdown);

std::string LogPrefix() const;

std::string LogPrefix(docdb::StorageDbType db_type) const;
Expand Down
59 changes: 59 additions & 0 deletions src/yb/yql/pgwrapper/pg_libpq-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,65 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ReadRestart)) {
ASSERT_GE(last_written.load(std::memory_order_acquire), 100);
}

// Concurrently insert records into tables with foreign key relationship while truncating both.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentInsertTruncateForeignKey)) {
auto conn = ASSERT_RESULT(Connect());

ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t2"));
ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t1"));
ASSERT_OK(conn.Execute("CREATE TABLE t1 (k int primary key, v int)"));
ASSERT_OK(conn.Execute(
"CREATE TABLE t2 (k int primary key, t1_k int, FOREIGN KEY (t1_k) REFERENCES t1 (k))"));

std::atomic<bool> stop(false);

const int kMaxKeys = 1 << 20;

constexpr auto kWriteThreads = 4;
std::vector<std::thread> write_threads;
while (write_threads.size() != kWriteThreads) {
write_threads.emplace_back([this, &stop] {
auto write_conn = ASSERT_RESULT(Connect());
while (!stop.load(std::memory_order_acquire)) {
int t1_k = RandomUniformInt(0, kMaxKeys - 1);
int t1_v = RandomUniformInt(0, kMaxKeys - 1);
WARN_NOT_OK(write_conn.ExecuteFormat(
"INSERT INTO t1 VALUES ($0, $1)", t1_k, t1_v), "Ignore");
int t2_k = RandomUniformInt(0, kMaxKeys - 1);
WARN_NOT_OK(write_conn.ExecuteFormat(
"INSERT INTO t2 VALUES ($0, $1)", t2_k, t1_k), "Ignore");
}
});
}

constexpr auto kTruncateThreads = 2;
std::vector<std::thread> truncate_threads;
while (truncate_threads.size() != kTruncateThreads) {
truncate_threads.emplace_back([this, &stop] {
auto truncate_conn = ASSERT_RESULT(Connect());
int idx = 0;
while (!stop.load(std::memory_order_acquire)) {
WARN_NOT_OK(truncate_conn.Execute(
"TRUNCATE TABLE t1, t2 CASCADE"), "Ignore");
++idx;
std::this_thread::sleep_for(100ms);
}
});
}

auto se = ScopeExit([&stop, &write_threads, &truncate_threads] {
stop.store(true, std::memory_order_release);
for (auto& thread : write_threads) {
thread.join();
}
for (auto& thread : truncate_threads) {
thread.join();
}
});

std::this_thread::sleep_for(30s);
}

// Concurrently insert records to table with index.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentIndexInsert)) {
auto conn = ASSERT_RESULT(Connect());
Expand Down

0 comments on commit 89c3c39

Please sign in to comment.