Skip to content

Commit

Permalink
BlockBasedTableReader: automatically adjust tail prefetch size
Browse files Browse the repository at this point in the history
Summary: Right now we use one hard-coded prefetch size to prefetch data from the tail of the SST files. However, this may introduce a waste for some use cases, while not efficient for others.
Introduce a way to adjust this prefetch size by tracking 32 recent times, and pick a value with which the wasted read is less than 10%

Test Plan: Add some unit tests for functionality correctnes. Run strace against db_bench to verify it works end to end.

Reviewers:

fix

Fix a bug

Add comments and fix the test
  • Loading branch information
siying committed Jul 20, 2018
1 parent b561322 commit c73c1e5
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 21 deletions.
87 changes: 84 additions & 3 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2321,9 +2321,9 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
// Include the explicit prefetch of the footer in direct I/O case.
size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
ASSERT_GE(rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files +
direct_io_extra));
ASSERT_GE(
rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
ASSERT_LT(
rate_limited_bytes,
static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
Expand Down Expand Up @@ -2547,6 +2547,87 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
#endif
}

TEST_F(DBTest2, TestBBTTailPrefetch) {
std::atomic<bool> called(false);
size_t expected_lower_bound = 512 * 1024;
size_t expected_higher_bound = 512 * 1024;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
size_t* prefetch_size = static_cast<size_t*>(arg);
EXPECT_LE(expected_lower_bound, *prefetch_size);
EXPECT_GE(expected_higher_bound, *prefetch_size);
called = true;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

Put("1", "1");
Put("9", "1");
Flush();

expected_lower_bound = 0;
expected_higher_bound = 8 * 1024;

Put("1", "1");
Put("9", "1");
Flush();

Put("1", "1");
Put("9", "1");
Flush();

ASSERT_TRUE(called.load());
called = false;

rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

std::atomic<bool> first_call(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
size_t* prefetch_size = static_cast<size_t*>(arg);
if (first_call) {
EXPECT_EQ(8 * 1024, *prefetch_size);
first_call = false;
} else {
EXPECT_GE(4 * 1024, *prefetch_size);
}
called = true;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

Options options = CurrentOptions();
options.max_file_opening_threads = 1; // one thread
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.max_open_files = -1;
Reopen(options);

ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

Put("1", "1");
Put("9", "1");
Flush();

Put("1", "1");
Put("9", "1");
Flush();

ASSERT_TRUE(called.load());
called = false;

// Parallel loading SST files
options.max_file_opening_threads = 16;
Reopen(options);

ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

ASSERT_TRUE(called.load());

rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
133 changes: 132 additions & 1 deletion table/block_based_table_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,140 @@
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "table/format.h"
#include "util/mutexlock.h"
#include "util/string_util.h"

namespace rocksdb {

void TailPrefetchStats::RecordEffectiveSize(size_t len) {
MutexLock l(&mutex_);
if (num_records_ < kNumTracked) {
num_records_++;
}
records_[next_++] = len;
if (next_ == kNumTracked) {
next_ = 0;
}
}

size_t TailPrefetchStats::GetSuggestedPrefetchSize() {
std::vector<size_t> sorted;
{
MutexLock l(&mutex_);

if (num_records_ == 0) {
return 0;
}
sorted.assign(records_, records_ + num_records_);
}

// Of the historic size, we find the maximum one that satisifis the condtiion
// that if prefetching all, less than 1/8 will be wasted.
std::sort(sorted.begin(), sorted.end());

// Assuming we have 5 data points, and after sorting it looks like this:
//
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// +---+ | | | |
// | | | | | |
// +---+ | | | | | |
// | | | | | | | |
// +---+ | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// and we use every of the value as a candidate, and estimate how much we
// wasted, compared to read. For example, when we use the 3rd record
// as candiate. This area is what we read:
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// *** *** *** ***+ *** *** *** *** **
// * | | | | | |
// +---+ | | | | | *
// * | | | | | | | |
// +---+ | | | | | | | *
// * | | | | X | | | | |
// | | | | | | | | | *
// * | | | | | | | | |
// | | | | | | | | | *
// * | | | | | | | | |
// *** *** ***-*** ***--*** ***--*** +****
// which is (size of the record) X (number of records).
//
// While wasted is this area:
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// *** *** *** ****---+ | | | |
// * * | | | | |
// * *-*** *** | | | | |
// * * | | | | | | |
// *--** *** | | | | | | |
// | | | | | X | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// Which can be calculated iteratively.
// The difference between wasted using 4st and 3rd record, will
// be following area:
// +---+
// +--+ +-+ ++ +-+ +-+ +---+ | |
// + xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// + xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// | xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// +-+ +-+ +-+ ++ +---+ +--+ | | |
// | | | | | | |
// +---+ ++ | | | | | |
// | | | | | | X | | |
// +---+ ++ | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// which will be the size difference between 4st and 3rd record,
// times 3, which is number of records before the 4st.
// Here we assume that all data within the prefetch range will be useful. In
// reality, it may not be the case when a partial block is inside the range,
// or there are data in the middle that is not read. We ignore those cases
// for simplicity.
size_t prev_size = sorted[0];
size_t max_qualified_size = sorted[0];
size_t wasted = 0;
for (size_t i = 1; i < sorted.size(); i++) {
size_t read = sorted[i] * sorted.size();
wasted += (sorted[i] - prev_size) * i;
if (wasted <= read / 8) {
max_qualified_size = sorted[i];
}
prev_size = sorted[i];
}
const size_t kMaxPrefetchSize = 512 * 1024; // Never exceed 512KB
return std::min(kMaxPrefetchSize, max_qualified_size);
}

BlockBasedTableFactory::BlockBasedTableFactory(
const BlockBasedTableOptions& _table_options)
: table_options_(_table_options) {
Expand Down Expand Up @@ -71,7 +201,8 @@ Status BlockBasedTableFactory::NewTableReader(
table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal);
table_reader_options.level, table_reader_options.immortal,
&tail_prefetch_stats_);
}

TableBuilder* BlockBasedTableFactory::NewTableBuilder(
Expand Down
17 changes: 17 additions & 0 deletions table/block_based_table_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ struct EnvOptions;
using std::unique_ptr;
class BlockBasedTableBuilder;

// A class used to track actual bytes written from the tail in the recent SST
// file opens, and provide a suggestion for following open.
class TailPrefetchStats {
public:
void RecordEffectiveSize(size_t len);
// 0 indicates no information to determine.
size_t GetSuggestedPrefetchSize();

private:
const static size_t kNumTracked = 32;
size_t records_[kNumTracked];
port::Mutex mutex_;
size_t next_ = 0;
size_t num_records_ = 0;
};

class BlockBasedTableFactory : public TableFactory {
public:
explicit BlockBasedTableFactory(
Expand Down Expand Up @@ -64,6 +80,7 @@ class BlockBasedTableFactory : public TableFactory {

private:
BlockBasedTableOptions table_options_;
mutable TailPrefetchStats tail_prefetch_stats_;
};

extern const std::string kHashIndexPrefixesBlock;
Expand Down
44 changes: 31 additions & 13 deletions table/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache,
const bool skip_filters, const int level,
const bool immortal_table) {
const bool immortal_table,
TailPrefetchStats* tail_prefetch_stats) {
table_reader->reset();

Footer footer;
Expand All @@ -741,29 +742,40 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
const bool preload_all = !table_options.cache_index_and_filter_blocks;
// Before read footer, readahead backwards to prefetch data. Do more readahead
// if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled. That's
// because we need to issue readahead before we read the properties, at which
// point we don't yet know the index type.
const size_t kTailPrefetchSize =
prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;

size_t tail_prefetch_size = 0;
if (tail_prefetch_stats != nullptr) {
// Multiple threads may get a 0 (no history) when running in parallel,
// but it will get cleared after the first of them finishes.
tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
}
if (tail_prefetch_size == 0) {
// Before read footer, readahead backwards to prefetch data. Do more readahead
// if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled. That's
// because we need to issue readahead before we read the properties, at which
// point we don't yet know the index type.
tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
}
size_t prefetch_off;
size_t prefetch_len;
if (file_size < kTailPrefetchSize) {
if (file_size < tail_prefetch_size) {
prefetch_off = 0;
prefetch_len = static_cast<size_t>(file_size);
} else {
prefetch_off = static_cast<size_t>(file_size - kTailPrefetchSize);
prefetch_len = kTailPrefetchSize;
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
Status s;
// TODO should not have this special logic in the future.
if (!file->use_direct_io()) {
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true));
s = file->Prefetch(prefetch_off, prefetch_len);
} else {
prefetch_buffer.reset(new FilePrefetchBuffer());
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
}
s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer,
Expand Down Expand Up @@ -1060,6 +1072,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
}

if (s.ok()) {
assert(prefetch_buffer.get() != nullptr);
if (tail_prefetch_stats != nullptr) {
assert(prefetch_buffer->min_offset_read() < file_size);
tail_prefetch_stats->RecordEffectiveSize(
file_size - prefetch_buffer->min_offset_read());
}
*table_reader = std::move(new_table);
}

Expand Down
4 changes: 3 additions & 1 deletion table/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
Expand Down Expand Up @@ -93,7 +94,8 @@ class BlockBasedTable : public TableReader {
const SliceTransform* prefix_extractor = nullptr,
bool prefetch_index_and_filter_in_cache = true,
bool skip_filters = false, int level = -1,
const bool immortal_table = false);
const bool immortal_table = false,
TailPrefetchStats* tail_prefetch_stats = nullptr);

bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options,
Expand Down
Loading

0 comments on commit c73c1e5

Please sign in to comment.