Skip to content

Commit

Permalink
Merge pull request #87 from jbowens/jackson/19.1-rocksdb6973
Browse files Browse the repository at this point in the history
backport of facebook#6973
  • Loading branch information
jbowens authored Jun 22, 2020
2 parents caa3e9e + 364ab2b commit e88116e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 7 deletions.
30 changes: 25 additions & 5 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
" guaranteed to be preserved, try larger iter_start_seqnum opt."));
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
Expand All @@ -1720,10 +1721,11 @@ if (read_options.tailing) {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
result = NewIteratorImpl(read_options, cfd,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
read_callback);
}
return result;
}
Expand All @@ -1736,6 +1738,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");

if (snapshot == kMaxSequenceNumber) {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither data that was
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
// Note that the super version might not contain all the data available
// to this snapshot, but in that case it can see all the data in the
// super version, which is a valid consistent state after the user
// calls NewIterator().
snapshot = versions_->LastSequence();
TEST_SYNC_POINT("DBImpl::NewIterator:3");
TEST_SYNC_POINT("DBImpl::NewIterator:4");
}

// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
// +-------------------------------+
Expand Down
2 changes: 2 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class DBImpl : public DB {
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;

// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
Expand Down
6 changes: 4 additions & 2 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,15 +1543,17 @@ Status ArenaWrappedDBIter::Refresh() {
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();

SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
Expand All @@ -1561,7 +1563,7 @@ Status ArenaWrappedDBIter::Refresh() {
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
db_iter_->set_valid(false);
}
return Status::OK();
Expand Down
88 changes: 88 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,94 @@ TEST_F(DBTest2, OptimizeForPointLookup) {

#endif // ROCKSDB_LITE

// Races a concurrent Put("foo", "v2") + Flush() against iterator creation.
// The sync-point dependencies are set up so that the writer thread is meant
// to run between "DBImpl::NewIterator:1" and "DBImpl::NewIterator:2", i.e.
// inside NewIterator() while the iterator is still being constructed.
// Whatever the interleaving, the iterator must expose "foo" with one of the
// two values that were written.
TEST_F(DBTest2, IterRaceFlush1) {
  ASSERT_OK(Put("foo", "v1"));

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  rocksdb::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    // A failed flush would make the race meaningless; do not ignore it.
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
  });

  // iterator is created after the first Put(), so it should see either
  // "v1" or "v2".
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->Seek("foo");
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ("foo", it->key().ToString());
    // Check the value too: the key alone does not prove the iterator
    // observed a consistent state.
    const std::string value = it->value().ToString();
    ASSERT_TRUE(value == "v1" || value == "v2");
  }

  t1.join();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

// Races a concurrent Put("foo", "v2") + Flush() against iterator creation.
// The sync-point dependencies are set up so that the writer thread is meant
// to run between "DBImpl::NewIterator:3" and "DBImpl::NewIterator:4", a
// later window inside NewIterator() than the one bracketed by points 1/2.
// Whatever the interleaving, the iterator must expose "foo" with one of the
// two values that were written.
TEST_F(DBTest2, IterRaceFlush2) {
  ASSERT_OK(Put("foo", "v1"));

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  rocksdb::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
    ASSERT_OK(Put("foo", "v2"));
    // A failed flush would make the race meaningless; do not ignore it.
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
  });

  // iterator is created after the first Put(), so it should see either
  // "v1" or "v2".
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->Seek("foo");
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ("foo", it->key().ToString());
    // Check the value too: the key alone does not prove the iterator
    // observed a consistent state.
    const std::string value = it->value().ToString();
    ASSERT_TRUE(value == "v1" || value == "v2");
  }

  t1.join();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

// Races a concurrent Put("foo", "v2") + Flush() against Iterator::Refresh().
// The sync-point dependencies are set up so that the writer thread is meant
// to run between "ArenaWrappedDBIter::Refresh:1" and
// "ArenaWrappedDBIter::Refresh:2", i.e. while Refresh() is in progress.
// Whatever the interleaving, the refreshed iterator must expose "foo" with
// one of the two values that were written.
TEST_F(DBTest2, IterRefreshRaceFlush) {
  ASSERT_OK(Put("foo", "v1"));

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
       {"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  rocksdb::port::Thread t1([&] {
    TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
    ASSERT_OK(Put("foo", "v2"));
    // A failed flush would make the race meaningless; do not ignore it.
    ASSERT_OK(Flush());
    TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
  });

  // iterator is created after the first Put(), so it should see either
  // "v1" or "v2".
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    // Refresh() is the operation under test; its Status must be checked.
    ASSERT_OK(it->Refresh());
    it->Seek("foo");
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ("foo", it->key().ToString());
    // Check the value too: the key alone does not prove the iterator
    // observed a consistent state.
    const std::string value = it->value().ToString();
    ASSERT_TRUE(value == "v1" || value == "v2");
  }

  t1.join();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, GetRaceFlush1) {
ASSERT_OK(Put("foo", "v1"));

Expand Down

0 comments on commit e88116e

Please sign in to comment.