Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug that causes iterator to return wrong result in a rare data race #6973

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
### Behavior Changes
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
### Bug fixes
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too.
* Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement.

### Public API Change
* `DB::GetDbSessionId(std::string& session_id)` is added. `session_id` stores a unique identifier that gets reset every time the DB is opened. This DB session ID should be unique among all open DB instances on all hosts, and should be unique among re-openings of the same or other DBs. This identifier is recorded in the LOG file on the line starting with "DB Session ID:".
Expand All @@ -25,7 +25,6 @@
* Fix false negative from the VerifyChecksum() API when there is a checksum mismatch in an index partition block in a BlockBasedTable format table file (index_type is kTwoLevelIndexSearch).
* Fix sst_dump to return non-zero exit code if the specified file is not a recognized SST file or fails requested checks.
* Fix incorrect results from batched MultiGet for duplicate keys, when the duplicate key matches the largest key of an SST file and the value type for the key in the file is a merge value.
* Fix "bad block type" error from persistent cache on Windows.

### Public API Change
* Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.
Expand Down
6 changes: 4 additions & 2 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ Status ArenaWrappedDBIter::Refresh() {
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();

SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
Expand All @@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() {
latest_seq, /* allow_unprepared_value */ true);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
db_iter_->set_valid(false);
}
return Status::OK();
Expand Down
30 changes: 25 additions & 5 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2671,7 +2671,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
" guaranteed to be preserved, try larger iter_start_seqnum opt."));
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
Expand All @@ -2692,10 +2693,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
result = NewIteratorImpl(read_options, cfd,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
read_callback);
}
return result;
}
Expand All @@ -2708,6 +2710,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");

if (snapshot == kMaxSequenceNumber) {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither data that was be
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
// Note that the super version might not contain all the data available
// to this snapshot, but in that case it can see all the data in the
// super version, which is a valid consistent state after the user
// calls NewIterator().
snapshot = versions_->LastSequence();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use a couple SyncPoints at the end of the LastSequence() function to enforce the flush happens immediately after sequence number is obtained. RIght now the sync points are in the right places but it's not obvious to maintainers how they should preserve the order in the future (esp. since their names are just 1, 2, 3, 4).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it a try and found that it would make the code much more complicated. The reason is that write also calls versions_->LastSequence(). The sync-point would be complicated if we place inside. I can move the sync point to here anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SyncPoint marker could be useful as the write/flush happen in a different thread. Up to you.

TEST_SYNC_POINT("DBImpl::NewIterator:3");
TEST_SYNC_POINT("DBImpl::NewIterator:4");
}

// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
// +-------------------------------+
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ class DBImpl : public DB {
Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions& get_impl_options);

// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
Expand Down
88 changes: 88 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) {

#endif // ROCKSDB_LITE

TEST_F(DBTest2, IterRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
riversand963 marked this conversation as resolved.
Show resolved Hide resolved
{{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
{"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
});

// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}

t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, IterRaceFlush2) {
ASSERT_OK(Put("foo", "v1"));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
{"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
});

// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}

t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, IterRefreshRaceFlush) {
ASSERT_OK(Put("foo", "v1"));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
{"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ROCKSDB_NAMESPACE::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
});

// iterator is created after the first Put(), so it should see either
// "v1" or "v2".
{
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Refresh();
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key().ToString());
}

t1.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, GetRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));

Expand Down