BlockBasedTableReader: automatically adjust tail prefetch size #4156

Closed · wants to merge 3 commits
88 changes: 85 additions & 3 deletions db/db_test2.cc
@@ -2321,9 +2321,9 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
// Include the explicit prefetch of the footer in direct I/O case.
size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
ASSERT_GE(rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files +
direct_io_extra));
ASSERT_GE(
rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
ASSERT_LT(
rate_limited_bytes,
static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
@@ -2547,6 +2547,88 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
#endif
}

TEST_F(DBTest2, TestBBTTailPrefetch) {
std::atomic<bool> called(false);
size_t expected_lower_bound = 512 * 1024;
size_t expected_higher_bound = 512 * 1024;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
size_t* prefetch_size = static_cast<size_t*>(arg);
EXPECT_LE(expected_lower_bound, *prefetch_size);
EXPECT_GE(expected_higher_bound, *prefetch_size);
called = true;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

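// With no history yet and index/filter preloading on by default, the
// first open is expected to prefetch the full 512KB.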
Put("1", "1");
Put("9", "1");
Flush();

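// After the first open, the recorded history should suggest a much
// smaller tail prefetch for subsequent opens.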
expected_lower_bound = 0;
expected_higher_bound = 8 * 1024;

Put("1", "1");
Put("9", "1");
Flush();

Put("1", "1");
Put("9", "1");
Flush();

// Full compaction, so that no L0 file is left by the time the DB is
// reopened.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

ASSERT_TRUE(called.load());
called = false;

rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

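// The Reopen() below installs a new table factory, so the prefetch
// history starts empty again: the first open falls back to the static
// 4KB default, and later opens must not exceed it.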
std::atomic<bool> first_call(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
size_t* prefetch_size = static_cast<size_t*>(arg);
if (first_call) {
EXPECT_EQ(4 * 1024, *prefetch_size);
first_call = false;
} else {
EXPECT_GE(4 * 1024, *prefetch_size);
}
called = true;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

Options options = CurrentOptions();
options.max_file_opening_threads = 1; // one thread
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.max_open_files = -1;
Reopen(options);

Put("1", "1");
Put("9", "1");
Flush();

Put("1", "1");
Put("9", "1");
Flush();

ASSERT_TRUE(called.load());
called = false;

// Load SST files in parallel
options.max_file_opening_threads = 16;
Reopen(options);

ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

ASSERT_TRUE(called.load());

rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace rocksdb

int main(int argc, char** argv) {
134 changes: 133 additions & 1 deletion table/block_based_table_factory.cc
@@ -27,10 +27,141 @@
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "table/format.h"
#include "util/mutexlock.h"
#include "util/string_util.h"

namespace rocksdb {

void TailPrefetchStats::RecordEffectiveSize(size_t len) {
MutexLock l(&mutex_);
if (num_records_ < kNumTracked) {
num_records_++;
}
records_[next_++] = len;
if (next_ == kNumTracked) {
next_ = 0;
}
}

size_t TailPrefetchStats::GetSuggestedPrefetchSize() {
std::vector<size_t> sorted;
{
MutexLock l(&mutex_);

if (num_records_ == 0) {
return 0;
}
sorted.assign(records_, records_ + num_records_);
}

// Among the historic sizes, find the largest one that satisfies the
// condition that, if we always prefetched that much, less than 1/8 of
// the bytes read would be wasted.
std::sort(sorted.begin(), sorted.end());

// Assume we have 5 data points that, after sorting, look like this:
//
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// +---+ | | | |
// | | | | | |
// +---+ | | | | | |
// | | | | | | | |
// +---+ | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// We use each of the values as a candidate and estimate how much would
// be wasted compared to how much is read. For example, when we use the
// 3rd record as the candidate, this area is what we read:
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// *** *** *** ***+ *** *** *** *** **
// * | | | | | |
// +---+ | | | | | *
// * | | | | | | | |
// +---+ | | | | | | | *
// * | | | | X | | | | |
// | | | | | | | | | *
// * | | | | | | | | |
// | | | | | | | | | *
// * | | | | | | | | |
// *** *** ***-*** ***--*** ***--*** +****
// which is (size of the record) X (number of records).
//
// while the wasted part is this area:
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// *** *** *** ****---+ | | | |
// * * | | | | |
// * *-*** *** | | | | |
// * * | | | | | | |
// *--** *** | | | | | | |
// | | | | | X | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// This can be calculated iteratively. The difference in waste between
// using the 4th record and the 3rd record is the following area:
// +---+
// +--+ +-+ ++ +-+ +-+ +---+ | |
// + xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// + xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// | xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// +-+ +-+ +-+ ++ +---+ +--+ | | |
// | | | | | | |
// +---+ ++ | | | | | |
// | | | | | | X | | |
// +---+ ++ | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// which is the size difference between the 4th and the 3rd record,
// times 3, the number of records before the 4th.
// Here we assume that all data within the prefetch range will be useful.
// In reality, that may not be the case when a partial block falls inside
// the range, or when some data in the middle is never read. We ignore
// those cases for simplicity.
assert(!sorted.empty());
size_t prev_size = sorted[0];
size_t max_qualified_size = sorted[0];
size_t wasted = 0;
for (size_t i = 1; i < sorted.size(); i++) {
size_t read = sorted[i] * sorted.size();
wasted += (sorted[i] - prev_size) * i;
if (wasted <= read / 8) {
max_qualified_size = sorted[i];
}
prev_size = sorted[i];
}
Contributor: Maybe it's just me, but this for loop was somewhat complicated for me to understand.

My thought process was: for read, the value is multiplied by the array size, but for wasted, you multiply only by the current index ... why not multiply by the array size here too, as with read? Ultimately I realized that the wasted space for all subsequent records should be counted as 0, and hence the algorithm works. (You did mention this in the comments, but somehow it wasn't immediately obvious even after reading them.)

const size_t kMaxPrefetchSize = 512 * 1024; // Never exceed 512KB
return std::min(kMaxPrefetchSize, max_qualified_size);
}
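RecordEffectiveSize() keeps the last 32 observed tail sizes in a ring buffer, and GetSuggestedPrefetchSize() sorts a snapshot of them before scanning for the largest qualifying candidate. To make the loop's accounting concrete, here is a minimal standalone sketch (not part of the PR; the five sample sizes are invented) that replays the same computation:

```cpp
#include <cstdio>
#include <vector>

int main() {
  // Five hypothetical effective tail-read sizes, in bytes, pre-sorted.
  std::vector<size_t> sorted = {4096, 8192, 16384, 65536, 524288};
  size_t prev_size = sorted[0];
  size_t max_qualified_size = sorted[0];
  size_t wasted = 0;
  for (size_t i = 1; i < sorted.size(); i++) {
    // If every open prefetched sorted[i], the total bytes read would be
    // sorted[i] times the number of records.
    size_t read = sorted[i] * sorted.size();
    // Only the i records smaller than sorted[i] add waste; records at or
    // above it waste nothing, hence the multiplier is i, not size().
    wasted += (sorted[i] - prev_size) * i;
    if (wasted <= read / 8) {
      max_qualified_size = sorted[i];
    }
    prev_size = sorted[i];
  }
  printf("suggested prefetch size: %zu bytes\n", max_qualified_size);
  return 0;
}
```

With these samples the answer is 8192: stepping up to 16384 would already waste 20480 of the 81920 bytes read, above the 1/8 bound.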

BlockBasedTableFactory::BlockBasedTableFactory(
const BlockBasedTableOptions& _table_options)
: table_options_(_table_options) {
@@ -71,7 +202,8 @@ Status BlockBasedTableFactory::NewTableReader(
table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal);
table_reader_options.level, table_reader_options.immortal,
&tail_prefetch_stats_);
}

TableBuilder* BlockBasedTableFactory::NewTableBuilder(
17 changes: 17 additions & 0 deletions table/block_based_table_factory.h
@@ -26,6 +26,22 @@ struct EnvOptions;
using std::unique_ptr;
class BlockBasedTableBuilder;

// A class used to track the actual bytes read from the tail of recently
// opened SST files, and to suggest a tail prefetch size for later opens.
class TailPrefetchStats {
public:
void RecordEffectiveSize(size_t len);
// Returns 0 when there is no history to base a suggestion on.
size_t GetSuggestedPrefetchSize();

private:
const static size_t kNumTracked = 32;
size_t records_[kNumTracked];
port::Mutex mutex_;
size_t next_ = 0;
size_t num_records_ = 0;
};

class BlockBasedTableFactory : public TableFactory {
public:
explicit BlockBasedTableFactory(
Expand Down Expand Up @@ -64,6 +80,7 @@ class BlockBasedTableFactory : public TableFactory {

private:
BlockBasedTableOptions table_options_;
mutable TailPrefetchStats tail_prefetch_stats_;
Contributor: I think it might be useful to export these stats and see a distribution (for later; not immediately needed).

};

extern const std::string kHashIndexPrefixesBlock;
44 changes: 31 additions & 13 deletions table/block_based_table_reader.cc
@@ -731,7 +731,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache,
const bool skip_filters, const int level,
const bool immortal_table) {
const bool immortal_table,
TailPrefetchStats* tail_prefetch_stats) {
table_reader->reset();

Footer footer;
@@ -741,29 +742,40 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
const bool preload_all = !table_options.cache_index_and_filter_blocks;
// Before read footer, readahead backwards to prefetch data. Do more readahead
// if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled. That's
// because we need to issue readahead before we read the properties, at which
// point we don't yet know the index type.
const size_t kTailPrefetchSize =
prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;

size_t tail_prefetch_size = 0;
if (tail_prefetch_stats != nullptr) {
// Multiple threads may get 0 (no history) when running in parallel, but
// the history is populated as soon as the first of them finishes.
tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
}
if (tail_prefetch_size == 0) {
// Before reading the footer, readahead backwards to prefetch data. Do
// more readahead if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled.
// That's because we need to issue readahead before we read the properties,
// at which point we don't yet know the index type.
tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
}
size_t prefetch_off;
size_t prefetch_len;
if (file_size < kTailPrefetchSize) {
if (file_size < tail_prefetch_size) {
prefetch_off = 0;
prefetch_len = static_cast<size_t>(file_size);
} else {
prefetch_off = static_cast<size_t>(file_size - kTailPrefetchSize);
prefetch_len = kTailPrefetchSize;
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
Status s;
// TODO should not have this special logic in the future.
if (!file->use_direct_io()) {
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true));
s = file->Prefetch(prefetch_off, prefetch_len);
} else {
prefetch_buffer.reset(new FilePrefetchBuffer());
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
}
s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer,
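Condensed, the size-selection logic in this hunk amounts to the following sketch (a paraphrase under invented names, not the PR's literal code):

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>

// Returns {prefetch_off, prefetch_len} for a file of file_size bytes,
// where suggested == 0 means the history has nothing to offer yet.
std::pair<size_t, size_t> ChooseTailPrefetch(uint64_t file_size,
                                             size_t suggested,
                                             bool prefetch_all,
                                             bool preload_all) {
  // Prefer the learned size; otherwise fall back to the static defaults.
  size_t len = suggested != 0
                   ? suggested
                   : (prefetch_all || preload_all ? 512 * 1024 : 4 * 1024);
  if (file_size < len) {
    // The file is smaller than the prefetch size: read all of it.
    return {0, static_cast<size_t>(file_size)};
  }
  return {static_cast<size_t>(file_size - len), len};
}
```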
@@ -1060,6 +1072,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
}

if (s.ok()) {
assert(prefetch_buffer.get() != nullptr);
if (tail_prefetch_stats != nullptr) {
assert(prefetch_buffer->min_offset_read() < file_size);
tail_prefetch_stats->RecordEffectiveSize(
file_size - prefetch_buffer->min_offset_read());
}
*table_reader = std::move(new_table);
}

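For intuition about the feedback signal: min_offset_read() is the lowest offset actually consumed from the prefetch buffer during the open, so the recorded sample is simply its distance from the end of the file. A tiny hypothetical example (numbers invented):

```cpp
#include <cstdint>
#include <cstdio>

int main() {
  // Hypothetical open: a 10 MB SST whose lowest tail offset actually
  // read during Open() sat 96 KB from the end of the file.
  const uint64_t file_size = 10ull * 1024 * 1024;
  const uint64_t min_offset_read = file_size - 96 * 1024;
  // This difference is the sample RecordEffectiveSize() receives; with
  // enough such samples, future opens prefetch ~96 KB instead of 512 KB.
  printf("effective tail size: %llu bytes\n",
         static_cast<unsigned long long>(file_size - min_offset_read));
  return 0;
}
```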
4 changes: 3 additions & 1 deletion table/block_based_table_reader.h
@@ -23,6 +23,7 @@
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
@@ -93,7 +94,8 @@ class BlockBasedTable : public TableReader {
const SliceTransform* prefix_extractor = nullptr,
bool prefetch_index_and_filter_in_cache = true,
bool skip_filters = false, int level = -1,
const bool immortal_table = false);
const bool immortal_table = false,
TailPrefetchStats* tail_prefetch_stats = nullptr);

bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options,