Skip to content

Commit

Permalink
Add UT to test bg read qps behavior during upgrade for pr11406
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Jun 8, 2023
1 parent 2b2994c commit 87644d5
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 26 deletions.
190 changes: 165 additions & 25 deletions file/prefetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ class MockFS;
class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
bool support_prefetch, std::atomic_int& prefetch_count)
bool support_prefetch, std::atomic_int& prefetch_count,
bool small_buffer_alignment = false)
: FSRandomAccessFileOwnerWrapper(std::move(file)),
support_prefetch_(support_prefetch),
prefetch_count_(prefetch_count) {}
prefetch_count_(prefetch_count),
small_buffer_alignment_(small_buffer_alignment) {}

IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
Expand All @@ -40,16 +42,25 @@ class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
}
}

// Reports a buffer alignment of 1 when this mock was constructed with
// `small_buffer_alignment == true`; otherwise defers to the wrapped file.
size_t GetRequiredBufferAlignment() const override {
  if (small_buffer_alignment_) {
    return 1;
  }
  return FSRandomAccessFileOwnerWrapper::GetRequiredBufferAlignment();
}

private:
const bool support_prefetch_;
std::atomic_int& prefetch_count_;
const bool small_buffer_alignment_;
};

class MockFS : public FileSystemWrapper {
public:
explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
bool support_prefetch)
: FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {}
bool support_prefetch, bool small_buffer_alignment = true)
: FileSystemWrapper(wrapped),
support_prefetch_(support_prefetch),
small_buffer_alignment_(small_buffer_alignment) {}

static const char* kClassName() { return "MockFS"; }
const char* Name() const override { return kClassName(); }
Expand All @@ -61,8 +72,8 @@ class MockFS : public FileSystemWrapper {
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(
new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
result->reset(new MockRandomAccessFile(
file, support_prefetch_, prefetch_count_, small_buffer_alignment_));
return s;
}

Expand All @@ -76,6 +87,7 @@ class MockFS : public FileSystemWrapper {

private:
const bool support_prefetch_;
const bool small_buffer_alignment_;
std::atomic_int prefetch_count_{0};
};

Expand All @@ -85,7 +97,8 @@ class PrefetchTest
public:
PrefetchTest() : DBTestBase("prefetch_test", true) {}

void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
virtual void SetGenericOptions(Env* env, bool use_direct_io,
Options& options) {
options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
Expand Down Expand Up @@ -236,30 +249,79 @@ TEST_P(PrefetchTest, Basic) {
Close();
}

TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
const bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
// Second param is if directIO is enabled or not
const bool use_direct_io = std::get<1>(GetParam());
const bool use_file_prefetch_buffer = !support_prefetch || use_direct_io;
class PrefetchTailTest : public PrefetchTest {
public:
// True only when the first test parameter requests prefetch support AND
// `test::IsPrefetchSupported()` confirms it for this env's file system and
// DB path. The file-system probe is skipped when the parameter is false.
bool SupportPrefetch() const {
  const bool prefetch_requested = std::get<0>(GetParam());
  if (!prefetch_requested) {
    return false;
  }
  return test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
}

std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
// Second test parameter: whether direct IO is enabled for this run.
bool UseDirectIO() const {
  const bool direct_io_enabled = std::get<1>(GetParam());
  return direct_io_enabled;
}

// True when prefetch is not supported (per SupportPrefetch()) or when direct
// IO is in use. `UseDirectIO()` is only consulted if prefetch is supported.
bool UseFilePrefetchBuffer() const {
  if (!SupportPrefetch()) {
    return true;
  }
  return UseDirectIO();
}

// Builds a new Env that layers a MockFS (configured with SupportPrefetch()
// and `small_buffer_alignment`) on top of `env_`.
//
// The returned object is heap-allocated with `new` and is not tracked by this
// fixture, so the caller must take ownership of it.
Env* GetEnv(bool small_buffer_alignment = false) const {
  auto mock_fs = std::make_shared<MockFS>(
      env_->GetFileSystem(), SupportPrefetch(), small_buffer_alignment);
  Env* wrapped_env = new CompositeEnvWrapper(env_, mock_fs);
  return wrapped_env;
}

// Applies the base fixture's generic options first, then attaches a fresh
// statistics object to `options`. The order matters: the base call runs
// before `options.statistics` is assigned.
void SetGenericOptions(Env* env, bool use_direct_io,
                       Options& options) override {
  this->PrefetchTest::SetGenericOptions(env, use_direct_io, options);

  options.statistics = CreateDBStatistics();
}

// Configures `table_options` for the tail-prefetch tests.
//
// - Always selects the two-level index (kTwoLevelIndexSearch).
// - `partition_filters`: copied into the options; when true, a bloom filter
//   policy (NewBloomFilterPolicy(10, false)) is installed as well.
// - `metadata_block_size`: copied into the options; defaults to the value of
//   a default-constructed BlockBasedTableOptions.
// - `use_small_cache`: when true, replaces the block cache with an LRU cache
//   whose capacity is 1.
void SetBlockBasedTableOptions(
    BlockBasedTableOptions& table_options, bool partition_filters = true,
    uint64_t metadata_block_size =
        BlockBasedTableOptions().metadata_block_size,
    bool use_small_cache = false) {
  table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
  table_options.metadata_block_size = metadata_block_size;

  table_options.partition_filters = partition_filters;
  if (partition_filters) {
    table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
  }

  if (!use_small_cache) {
    return;
  }
  LRUCacheOptions tiny_cache_opts;
  tiny_cache_opts.capacity = 1;
  table_options.block_cache = NewLRUCache(tiny_cache_opts);
}

// Returns the sum of `index_partitions` over the table properties of every
// table reported by `db_->GetPropertiesOfAllTables()`.
//
// The properties lookup is deliberately performed OUTSIDE of assert():
// assert() expands to nothing when NDEBUG is defined, so placing the call
// inside it would skip the lookup entirely in such builds and make this
// function always return 0.
int64_t GetNumIndexPartition() const {
  int64_t index_partition_counts = 0;
  TablePropertiesCollection all_table_props;
  const auto props_status = db_->GetPropertiesOfAllTables(&all_table_props);
  assert(props_status.ok());
  // Avoid an unused-variable warning when assert() is compiled out.
  static_cast<void>(props_status);
  for (const auto& name_and_table_props : all_table_props) {
    const auto& table_props = name_and_table_props.second;
    index_partition_counts += table_props->index_partitions;
  }
  return index_partition_counts;
}
};

// Runs every PrefetchTailTest case over all four combinations of two boolean
// parameters. The fixture reads the first via std::get<0>(GetParam()) in
// SupportPrefetch() and the second via std::get<1>(GetParam()) in
// UseDirectIO().
INSTANTIATE_TEST_CASE_P(PrefetchTailTest, PrefetchTailTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));

TEST_P(PrefetchTailTest, Basic) {
std::unique_ptr<Env> env(GetEnv());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
SetGenericOptions(env.get(), UseDirectIO(), options);

BlockBasedTableOptions bbto;
bbto.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
bbto.partition_filters = true;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
SetBlockBasedTableOptions(bbto);
options.table_factory.reset(NewBlockBasedTableFactory(bbto));

Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
if (UseDirectIO() && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
ROCKSDB_GTEST_BYPASS("Direct IO is not supported");
return;
Expand All @@ -276,7 +338,7 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
HistogramData post_flush_file_read;
options.statistics->histogramData(FILE_READ_FLUSH_MICROS,
&post_flush_file_read);
if (use_file_prefetch_buffer) {
if (UseFilePrefetchBuffer()) {
// `PartitionedFilterBlockReader/PartitionIndexReader::CacheDependencies()`
// should read from the prefetched tail in file prefetch buffer instead of
// initiating extra SST reads. Therefore `BlockBasedTable::PrefetchTail()`
Expand All @@ -300,7 +362,7 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
HistogramData post_compaction_file_read;
options.statistics->histogramData(FILE_READ_COMPACTION_MICROS,
&post_compaction_file_read);
if (use_file_prefetch_buffer) {
if (UseFilePrefetchBuffer()) {
// `PartitionedFilterBlockReader/PartitionIndexReader::CacheDependencies()`
// should read from the prefetched tail in file prefetch buffer instead of
// initiating extra SST reads.
Expand All @@ -323,6 +385,84 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) {
Close();
}

// Verifies DB-open read behavior for a DB that is being upgraded from a
// version that did not record SST tail sizes in the manifest.
//
// Outline:
//   1. Write and flush an SST while a sync point callback forces
//      `FileMetaData::tail_size` to 0 (simulating a pre-upgrade file).
//   2. Reopen the DB while another sync point callback shrinks the tail
//      prefetch to a single byte at the end of the file.
//   3. Check that the number of file reads recorded during DB open is smaller
//      than the number of index partitions.
//
// The test only applies when the RocksDB file prefetch buffer is used and
// direct IO is disabled; other parameterizations are bypassed.
TEST_P(PrefetchTailTest, UpgradeToTailSizeInManifest) {
  if (!UseFilePrefetchBuffer()) {
    ROCKSDB_GTEST_BYPASS(
        "Upgrade to tail size in manifest is only relevant when RocksDB file "
        "prefetch buffer is used.");
    // The bypass is followed by an explicit return (as in the sibling Basic
    // test) so that the rest of the body is not executed.
    return;
  }
  if (UseDirectIO()) {
    ROCKSDB_GTEST_BYPASS(
        "To simplify testing logics with setting file's buffer alignment to be "
        "1, direct IO is required to be disabled.");
    return;
  }

  std::unique_ptr<Env> env(GetEnv(true /* small_buffer_alignment */));
  Options options;
  SetGenericOptions(env.get(), false /* use_direct_io*/, options);
  options.max_open_files = -1;
  options.write_buffer_size = 1024 * 1024;

  BlockBasedTableOptions table_options;
  SetBlockBasedTableOptions(table_options, false /* partition_filters */,
                            1 /* metadata_block_size*/,
                            true /* use_small_cache */);
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  SyncPoint::GetInstance()->EnableProcessing();
  // To simulate a pre-upgrade DB where file tail size is not recorded in
  // manifest
  SyncPoint::GetInstance()->SetCallBack(
      "FileMetaData::FileMetaData", [&](void* arg) {
        FileMetaData* meta = static_cast<FileMetaData*>(arg);
        meta->tail_size = 0;
      });

  ASSERT_OK(TryReopen(options));
  for (int i = 0; i < 10000; ++i) {
    ASSERT_OK(Put("k" + std::to_string(i), "v"));
  }
  ASSERT_OK(Flush());

  SyncPoint::GetInstance()->ClearAllCallBacks();

  // To simulate a DB undergoing the upgrade where tail size to prefetch is
  // inferred to be a small number for files with no tail size recorded in
  // manifest.
  // "1" is chosen to be such number so that with `small_buffer_alignment ==
  // true` and `use_small_cache == true`, it would have caused one file read per
  // index partition during db open if the upgrade is done wrong.
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
        std::pair<size_t*, size_t*>* prefetch_off_len_pair =
            static_cast<std::pair<size_t*, size_t*>*>(arg);
        size_t* prefetch_off = prefetch_off_len_pair->first;
        size_t* tail_size = prefetch_off_len_pair->second;
        // Recover the file size from the original (offset, length) pair so
        // the rewritten 1-byte prefetch still ends at the end of the file.
        const size_t file_size = *prefetch_off + *tail_size;

        *tail_size = 1;
        *prefetch_off = file_size - (*tail_size);
      });

  ASSERT_OK(TryReopen(options));

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();

  HistogramData db_open_file_read;
  options.statistics->histogramData(FILE_READ_DB_OPEN_MICROS,
                                    &db_open_file_read);

  int64_t num_index_partition = GetNumIndexPartition();
  // If the upgrade is done right, db open will prefetch all the index
  // partitions at once, instead of doing one read per partition.
  // With `metadata_block_size == 1` producing many index partitions, the
  // number of file reads during db open must therefore be smaller than the
  // number of index partitions.
  ASSERT_LT(db_open_file_read.count, num_index_partition);

  Close();
}

// This test verifies BlockBasedTableOptions.max_auto_readahead_size is
// configured dynamically.
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
Expand Down
7 changes: 6 additions & 1 deletion table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,13 @@ Status BlockBasedTable::PrefetchTail(
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}

#ifndef NDEBUG
std::pair<size_t*, size_t*> prefetch_off_len_pair = {&prefetch_off,
&prefetch_len};
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
&prefetch_off_len_pair);
#endif // NDEBUG

// Try file system prefetch
if (!file->use_direct_io() && !force_direct_prefetch) {
Expand Down

0 comments on commit 87644d5

Please sign in to comment.