[ResourceManagement] Account for dictionary-building buffer in global memory limit #8428

Closed
wants to merge 13 commits
2 changes: 2 additions & 0 deletions cache/cache_entry_roles.cc
@@ -19,6 +19,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
"IndexBlock",
"OtherBlock",
"WriteBuffer",
"CompressionDictionaryBuildingBuffer",
"Misc",
}};

@@ -30,6 +31,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
"index-block",
"other-block",
"write-buffer",
"compression-dictionary-building-buffer",
"misc",
}};

5 changes: 5 additions & 0 deletions cache/cache_entry_roles.h
@@ -14,6 +14,8 @@
namespace ROCKSDB_NAMESPACE {

// Classifications of block cache entries, for reporting statistics
// Adding a new enum value to this class requires corresponding updates to
// kCacheEntryRoleToCamelString and kCacheEntryRoleToHyphenString
enum class CacheEntryRole {
// Block-based table data block
kDataBlock,
@@ -29,6 +31,9 @@ enum class CacheEntryRole {
kOtherBlock,
// WriteBufferManager reservations to account for memtable usage
kWriteBuffer,
// BlockBasedTableBuilder reservations to account for
// compression dictionary building buffer's memory usage
kCompressionDictionaryBuildingBuffer,
// Default bucket, for miscellaneous cache entries. Do not use for
// entries that could potentially add up to large usage.
kMisc,
3 changes: 3 additions & 0 deletions cache/cache_reservation_manager.cc
@@ -69,6 +69,9 @@ Status CacheReservationManager::UpdateCacheReservation(
// This makes it possible to keep the template definitions in the .cc file.
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used);
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
std::size_t new_mem_used);
// For cache reservation manager unit tests
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kMisc>(std::size_t new_mem_used);
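For context on the idiom the comment above refers to (keeping the template definition in the .cc file and explicitly instantiating it for the types callers need), here is a minimal standalone sketch. The names are hypothetical and not part of RocksDB.

```cpp
// square.h would expose only the declaration:
//   template <typename T> T Square(T value);

// square.cc keeps the definition private; the explicit instantiations below
// emit object code for exactly the types other translation units may use,
// mirroring the UpdateCacheReservation<> instantiations added above.
template <typename T>
T Square(T value) {
  return value * value;
}

template int Square<int>(int value);
template double Square<double>(double value);
```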
38 changes: 33 additions & 5 deletions table/block_based/block_based_table_builder.cc
@@ -21,6 +21,8 @@
#include <unordered_map>
#include <utility>

#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "memory/memory_allocator.h"
@@ -312,7 +314,7 @@ struct BlockBasedTableBuilder::Rep {
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
uint64_t buffer_limit;

std::unique_ptr<CacheReservationManager> cache_rev_mng;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
@@ -444,6 +446,12 @@ struct BlockBasedTableBuilder::Rep {
buffer_limit = std::min(tbo.target_file_size,
compression_opts.max_dict_buffer_bytes);
}
if (table_options.no_block_cache) {
cache_rev_mng.reset(nullptr);
} else {
cache_rev_mng.reset(
new CacheReservationManager(table_options.block_cache));
}
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
}
@@ -896,10 +904,24 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(!r->data_block.empty());
r->first_key_in_next_block = &key;
Flush();
if (r->state == Rep::State::kBuffered) {
bool exceeds_buffer_limit =
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
bool is_cache_full = false;

// Increase cache reservation for the last buffered data block
// only if the block is not going to be unbuffered immediately
// and there exists a cache reservation manager
if (!exceeds_buffer_limit && r->cache_rev_mng != nullptr) {
Status s = r->cache_rev_mng->UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
r->data_begin_offset);
is_cache_full = s.IsIncomplete();
}

hx235 (Contributor Author) commented on Sep 4, 2021:

[Question]
It seems that there isn't any synchronization inside BlockBasedTableBuilder for data structures like data_block_buffers, nor did I see multi-threaded use of a single BlockBasedTableBuilder object in the tests other than parallel compression, which seems irrelevant to our data buffering (although I am having a bit of difficulty fully understanding how parallel compression plays its part in BlockBasedTableBuilder). I'd also imagine it would be pretty hard to guarantee key ordering when doing multi-threaded Add() on one builder.

I wonder if we can skip external synchronization of cache_rev_mng inside BlockBasedTableBuilder (which I feel we can).

Contributor:

BlockBasedTableBuilder is generally single-threaded (except the experimental parallel compression parts, probably not relevant here), so having a separate cache_rev_mng per builder should suffice. 👍

hx235 (Contributor Author):

Thanks for jumping in and answering my question!

Contributor:

Right, the background threads will not be involved in BlockBasedTableBuilder::Add(). They only do compressing/writing, and they only start doing work once we EnterUnbuffered().

-  if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
-      r->data_begin_offset > r->buffer_limit) {
-    EnterUnbuffered();
+    if (exceeds_buffer_limit || is_cache_full) {
+      EnterUnbuffered();
+    }
}
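Pulling the scattered hunks together, the following is a condensed sketch (not the literal RocksDB code) of the reservation lifecycle this change implements. It only uses calls that appear in the diff; the wrapper struct name is hypothetical, and Status is assumed to come in via the included headers.

```cpp
#include <cstddef>
#include <memory>

#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical wrapper illustrating how BlockBasedTableBuilder uses its
// per-builder CacheReservationManager. No extra locking is needed because the
// builder's Add()/Flush() path is single-threaded (see the thread above).
struct DictBufferAccounting {
  std::unique_ptr<CacheReservationManager> cache_rev_mng;

  // After a data block is buffered: mirror the total buffered bytes into the
  // block cache and report whether the cache could not absorb the reservation.
  bool UpdateForBufferedBytes(std::size_t buffered_bytes) {
    Status s = cache_rev_mng->UpdateCacheReservation<
        CacheEntryRole::kCompressionDictionaryBuildingBuffer>(buffered_bytes);
    return s.IsIncomplete();  // true means cache full; caller unbuffers early
  }

  // From EnterUnbuffered(): the buffers are freed, so shrink the reservation
  // back to zero and ignore the best-effort status.
  void Release() {
    Status s = cache_rev_mng->UpdateCacheReservation<
        CacheEntryRole::kCompressionDictionaryBuildingBuffer>(0);
    s.PermitUncheckedError();
  }
};

}  // namespace ROCKSDB_NAMESPACE
```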

// Add item to index block.
@@ -1910,10 +1932,16 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
r->pending_handle);
}
}

std::swap(iter, next_block_iter);
}
r->data_block_buffers.clear();
r->data_begin_offset = 0;
if (r->cache_rev_mng != nullptr) {
Status s = r->cache_rev_mng->UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
r->data_begin_offset);
s.PermitUncheckedError();
}
}

Status BlockBasedTableBuilder::Finish() {
234 changes: 234 additions & 0 deletions table/table_test.cc
@@ -7,6 +7,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <stddef.h>
#include <stdio.h>

#include <algorithm>
@@ -4746,6 +4747,239 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
}

TEST_P(
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBuilderFinish) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
constexpr std::size_t kMaxDictBytes = 1024;
constexpr std::size_t kMaxDictBufferBytes = 1024;

BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
LRUCacheOptions lo;
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
table_options.block_cache = cache;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();

Options options;
options.compression = kSnappyCompression;
options.compression_opts.max_dict_bytes = kMaxDictBytes;
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "test_file_name", FileOptions()));

ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;

std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kSnappyCompression,
options.compression_opts, kUnknownColumnFamily,
"test_cf", -1 /* level */),
file_writer.get()));

std::string key1 = "key1";
std::string value1 = "val1";
InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy and
// therefore won't trigger buffering of any data block
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);

std::string key2 = "key2";
std::string value2 = "val2";
InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the one
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of that data block.

Contributor commented:

Great work finding test utils like FlushBlockEveryKeyPolicy to write clean/clear/simple unit tests.
builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for the last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// that buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
1 * kSizeDummyEntry + kMetaDataChargeOverhead);

ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
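The bounds above rely on the reservation being charged in whole dummy entries of kSizeDummyEntry (256 KiB). A small hypothetical helper, not RocksDB code, showing the round-up behavior the test constants appear to assume:

```cpp
#include <cstddef>
#include <iostream>

// Hypothetical helper: reservations appear to be made in whole 256 KiB
// "dummy entry" increments, rounded up from the requested byte count.
std::size_t ExpectedDummyEntries(std::size_t reserved_bytes,
                                 std::size_t dummy_entry_size = 256 * 1024) {
  return (reserved_bytes + dummy_entry_size - 1) / dummy_entry_size;
}

int main() {
  // The buffered block holding "key1"/"val1" is only a few dozen bytes, yet
  // it still pins one full dummy entry, which is why the test checks
  // GetPinnedUsage() >= 1 * kSizeDummyEntry.
  std::cout << ExpectedDummyEntries(64) << "\n";                // prints 1
  // Once keys and overhead push the buffered size past 256 KiB, two dummy
  // entries are pinned, as in the buffer-limit test below.
  std::cout << ExpectedDummyEntries(256 * 1024 + 100) << "\n";  // prints 2
  return 0;
}
```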

TEST_P(
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBufferLimitExceed) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
constexpr std::size_t kMaxDictBytes = 1024;
constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry;

BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
LRUCacheOptions lo;
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
table_options.block_cache = cache;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();

Options options;
options.compression = kSnappyCompression;
options.compression_opts.max_dict_bytes = kMaxDictBytes;
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "test_file_name", FileOptions()));

ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;

std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kSnappyCompression,
options.compression_opts, kUnknownColumnFamily,
"test_cf", -1 /* level */),
file_writer.get()));

std::string key1 = "key1";
std::string value1(kSizeDummyEntry, '0');
InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy and
// therefore won't trigger buffering of any data block
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);

std::string key2 = "key2";
std::string value2(kSizeDummyEntry, '0');
InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the one
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for the last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// the buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
2 * kSizeDummyEntry + kMetaDataChargeOverhead);

std::string key3 = "key3";
std::string value3 = "val3";
InternalKey ik3(key3, 2 /* sequence number */, kTypeValue);
// Adding the third key will trigger a flush of the last data block (the one
// containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik3.Encode(), value3);
// Cache reservation will decrease since the buffer limit is now exceeded
// after the last buffering and EnterUnbuffered() is triggered
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);

ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}

TEST_P(
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnCacheFull) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
// A small kCacheCapacity is chosen so that increasing the cache reservation
// for buffering two data blocks (one containing key1/value1, the other
// containing key2 and a big value2) will cause the cache to become full
constexpr std::size_t kCacheCapacity =
1 * kSizeDummyEntry + kSizeDummyEntry / 2;
constexpr std::size_t kMaxDictBytes = 1024;
// A big kMaxDictBufferBytes is chosen so that adding a big key value pair
// (key2, value2) won't exceed the buffer limit
constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024;

BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
LRUCacheOptions lo;
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
table_options.block_cache = cache;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();

Options options;
options.compression = kSnappyCompression;
options.compression_opts.max_dict_bytes = kMaxDictBytes;
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "test_file_name", FileOptions()));

ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;

std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kSnappyCompression,
options.compression_opts, kUnknownColumnFamily,
"test_cf", -1 /* level */),
file_writer.get()));

std::string key1 = "key1";
std::string value1 = "val1";
InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy and
// therefore won't trigger buffering of any data block
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);

std::string key2 = "key2";
std::string value2(kSizeDummyEntry, '0');
InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the one
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for the last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// the buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
1 * kSizeDummyEntry + kMetaDataChargeOverhead);

std::string key3 = "key3";
std::string value3 = "value3";
InternalKey ik3(key3, 2 /* sequence number */, kTypeValue);
// Adding the third key will trigger a flush of the last data block (the one
// containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik3.Encode(), value3);
// Cache reservation will decrease since the cache is now full after
// increasing reservation for the last buffered block and EnterUnbuffered() is
// triggered
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);

ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
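A hypothetical back-of-the-envelope check (not part of the test) of why the cache-full path fires here, assuming the same 256 KiB round-up as above:

```cpp
#include <cstddef>

// Mirrors the constants of the cache-full test above.
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kCacheCapacity =
    1 * kSizeDummyEntry + kSizeDummyEntry / 2;  // 384 KiB, strict limit

// Buffering the first block ("key1"/"val1", a few dozen bytes) rounds up to
// one dummy entry, which still fits under the strict capacity limit.
static_assert(1 * kSizeDummyEntry <= kCacheCapacity,
              "first reservation fits in the cache");

// Buffering the second block adds a ~256 KiB value, so the reservation rounds
// up to two dummy entries and exceeds the capacity; UpdateCacheReservation is
// then expected to return an Incomplete status and the builder unbuffers.
static_assert(2 * kSizeDummyEntry > kCacheCapacity,
              "second reservation exceeds the strict capacity limit");
```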

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {