Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add insert hints for each writebatch #5728

Closed
wants to merge 15 commits into from
10 changes: 6 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
batch_per_txn_);
batch_per_txn_, write_options.hint_per_batch);

PERF_TIMER_START(write_pre_and_post_process_time);
}
Expand Down Expand Up @@ -393,7 +393,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt, batch_per_txn_);
w.batch_cnt, batch_per_txn_, write_options.hint_per_batch);
}
}
if (seq_used != nullptr) {
Expand Down Expand Up @@ -560,7 +560,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
true /*concurrent_memtable_writes*/, false /*seq_per_batch*/,
0 /*batch_cnt*/, true /*batch_per_txn*/, write_options.hint_per_batch);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
Expand Down Expand Up @@ -598,7 +599,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt);
true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt,
true /*batch_per_txn*/, write_options.hint_per_batch);

WriteStatusCheck(w.status);
if (write_options.disableWAL) {
Expand Down
6 changes: 4 additions & 2 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
MemTablePostProcessInfo* post_process_info, void** hint) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
Expand Down Expand Up @@ -544,7 +544,9 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
assert(post_process_info == nullptr);
UpdateFlushState();
} else {
bool res = table->InsertKeyConcurrently(handle);
bool res = (hint == nullptr)
? table->InsertKeyConcurrently(handle)
: table->InsertKeyWithHintConcurrently(handle, hint);
if (UNLIKELY(!res)) {
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ class MemTable {
// the <key, seq> already exists.
bool Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr);
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);

// Used to Get value associated with key or Get Merge Operands associated
// with key.
Expand Down
39 changes: 33 additions & 6 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
Expand Down Expand Up @@ -1222,6 +1223,22 @@ class MemTableInserter : public WriteBatch::Handler {
DupDetector duplicate_detector_;
bool dup_dectector_on_;

bool hint_per_batch_;
bool hint_created_;
// Hints for this batch
using HintMap = std::unordered_map<MemTable*, void*>;
using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
HintMapType hint_;

HintMap& GetHintMap() {
assert(hint_per_batch_);
if (!hint_created_) {
new (&hint_) HintMap();
hint_created_ = true;
}
return *reinterpret_cast<HintMap*>(&hint_);
}

MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
if(!post_info_created_) {
Expand Down Expand Up @@ -1254,7 +1271,7 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr, bool seq_per_batch = false,
bool batch_per_txn = true)
bool batch_per_txn = true, bool hint_per_batch = false)
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
Expand All @@ -1277,7 +1294,9 @@ class MemTableInserter : public WriteBatch::Handler {
write_before_prepare_(!batch_per_txn),
unprepared_batch_(false),
duplicate_detector_(),
dup_dectector_on_(false) {
dup_dectector_on_(false),
hint_per_batch_(hint_per_batch),
hint_created_(false) {
assert(cf_mems_);
}

Expand All @@ -1290,6 +1309,12 @@ class MemTableInserter : public WriteBatch::Handler {
reinterpret_cast<MemPostInfoMap*>
(&mem_post_info_map_)->~MemPostInfoMap();
}
if (hint_created_) {
for (auto iter : GetHintMap()) {
delete[] reinterpret_cast<char*>(iter.second);
}
reinterpret_cast<HintMap*>(&hint_)->~HintMap();
}
delete rebuilding_trx_;
}

Expand Down Expand Up @@ -1399,7 +1424,8 @@ class MemTableInserter : public WriteBatch::Handler {
if (!moptions->inplace_update_support) {
bool mem_res =
mem->Add(sequence_, value_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
Expand Down Expand Up @@ -1482,7 +1508,8 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
bool mem_res =
mem->Add(sequence_, delete_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'm not knowledgable enough to C++. If hint map doesn't have "mem", does the value inserted guarantees to be nullptr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the value will be value-initialized, so pointer will be initialized to nullptr.

if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
Expand Down Expand Up @@ -1941,15 +1968,15 @@ Status WriteBatchInternal::InsertInto(
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
bool batch_per_txn) {
bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
(void)batch_cnt;
#endif
assert(writer->ShouldWriteToMemtable());
MemTableInserter inserter(
sequence, memtables, flush_scheduler, ignore_missing_column_families,
log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
seq_per_batch, batch_per_txn);
seq_per_batch, batch_per_txn, hint_per_batch);
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
Expand Down
3 changes: 2 additions & 1 deletion db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ class WriteBatchInternal {
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
bool seq_per_batch = false, size_t batch_cnt = 0,
bool batch_per_txn = true);
bool batch_per_txn = true,
bool hint_per_batch = false);

static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);
Expand Down
14 changes: 14 additions & 0 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ class MemTableRep {
return true;
}

// Same as ::InsertWithHint, but allow concurrnet write
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to document the ownership of hint. In my understanding, the caller will own the object hint, correct?

Or, if that is the case, an even better idea is to change the argument to std::unique_ptr<void>*, so that is ownership is clear.

virtual void InsertWithHintConcurrently(KeyHandle handle, void** /*hint*/) {
// Ignore the hint by default.
InsertConcurrently(handle);
}

// Same as ::InsertWithHintConcurrently
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
virtual bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as here.

InsertWithHintConcurrently(handle, hint);
return true;
}

// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles.
//
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,11 @@ struct WriteOptions {
// Default: false
bool low_pri;

// If true, this writebatch will use its own insert hints in concurrent write
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be a better idea to improve the comments.

//
// Default: false
bool hint_per_batch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which of memtable_insert_with_hint_prefix_extractor and hint_per_batch should take precedence? It seems to me it should be hint_per_batch because its a per batch option which is more specific. And we should document the behavior in the inline comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we by default enable it if memtable_insert_with_hint_prefix_extractor is set?
Extra option always makes it harder to maintain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two options are not compatible with each other. memtable_insert_with_hint_prefix_extractor preserve the hint across different write batches, and hint_per_batch has nothing to do with prefixes.

But I'm wondering whether hint_per_batch can always enable. Like for each write batch we keep the splice for the first key, then detect if its a sequential insert. If so, reuse the hint, otherwise discard the hint and start over.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I only enable hint_per_batch when concurrent_memtable_writes is set to true, because in non-concurrent write either memtable_insert_with_hint_prefix_extractor or seq_splice_ will be used, which i think will make no much difference in performance compared to hint_per_batch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some discussion we think having an extra hint_per_batch write option is more flexible and better fit our needs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should call it memtable_insert_hint_per_batch or something like that. Hint is too general for WriteOptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change it to memtable_insert_hint_per_batch .


// Timestamp of write operation, e.g. Put. All timestamps of the same
// database must share the same length and format. The user is also
// responsible for providing a customized compare function via Comparator to
Expand All @@ -1355,6 +1360,7 @@ struct WriteOptions {
ignore_missing_column_families(false),
no_slowdown(false),
low_pri(false),
hint_per_batch(false),
timestamp(nullptr) {}
};

Expand Down
24 changes: 24 additions & 0 deletions memtable/inlineskiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class InlineSkipList {
// REQUIRES: no concurrent calls to any of inserts.
bool InsertWithHint(const char* key, void** hint);

// Like InsertConcurrently, but with a hint
//
// REQUIRES: nothing that compares equal to key is currently in the list.
// REQUIRES: no concurrent calls that use same hint
bool InsertWithHintConcurrently(const char* key, void** hint);

// Like Insert, but external synchronization is not required.
bool InsertConcurrently(const char* key);

Expand Down Expand Up @@ -669,6 +675,24 @@ bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
return Insert<false>(key, splice, true);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
void** hint) {
assert(hint != nullptr);
Splice* splice = reinterpret_cast<Splice*>(*hint);
if (splice == nullptr) {
size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract the logic to a AllocateSpliceOnHeap method?

char* raw = new char[sizeof(Splice) + array_size * 2];
splice = reinterpret_cast<Splice*>(raw);
splice->height_ = 0;
splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);

*hint = reinterpret_cast<void*>(splice);
}
return Insert<true>(key, splice, true);
}

template <class Comparator>
template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
Expand Down
26 changes: 22 additions & 4 deletions memtable/inlineskiplist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,18 @@ class ConcurrentTest {
}

// REQUIRES: No concurrent calls for the same k
void ConcurrentWriteStep(uint32_t k) {
void ConcurrentWriteStep(uint32_t k, bool use_hint = false) {
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
list_.InsertConcurrently(buf);
if (use_hint) {
void* hint = nullptr;
list_.InsertWithHintConcurrently(buf, &hint);
delete[] reinterpret_cast<char*>(hint);
} else {
list_.InsertConcurrently(buf);
}
ASSERT_EQ(g, current_.Get(k) + 1);
current_.Set(k, g);
}
Expand Down Expand Up @@ -508,6 +514,7 @@ TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
class TestState {
public:
ConcurrentTest t_;
bool use_hint_;
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
Expand Down Expand Up @@ -575,7 +582,7 @@ static void ConcurrentReader(void* arg) {
static void ConcurrentWriter(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
uint32_t k = state->next_writer_++ % ConcurrentTest::K;
state->t_.ConcurrentWriteStep(k);
state->t_.ConcurrentWriteStep(k, state->use_hint_);
state->AdjustPendingWriters(-1);
}

Expand All @@ -600,7 +607,8 @@ static void RunConcurrentRead(int run) {
}
}

static void RunConcurrentInsert(int run, int write_parallelism = 4) {
static void RunConcurrentInsert(int run, bool use_hint = false,
int write_parallelism = 4) {
Env::Default()->SetBackgroundThreads(1 + write_parallelism,
Env::Priority::LOW);
const int seed = test::RandomSeed() + (run * 100);
Expand All @@ -612,6 +620,7 @@ static void RunConcurrentInsert(int run, int write_parallelism = 4) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
state.use_hint_ = use_hint;
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k += write_parallelism) {
Expand All @@ -635,6 +644,15 @@ TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
TEST_F(InlineSkipTest, ConcurrentInsertWithHint1) {
RunConcurrentInsert(1, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint2) {
RunConcurrentInsert(2, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint3) {
RunConcurrentInsert(3, true);
}

#endif // ROCKSDB_VALGRIND_RUN
} // namespace rocksdb
Expand Down
9 changes: 9 additions & 0 deletions memtable/skiplistrep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ class SkipListRep : public MemTableRep {
return skip_list_.InsertWithHint(static_cast<char*>(handle), hint);
}

void InsertWithHintConcurrently(KeyHandle handle, void** hint) override {
skip_list_.InsertWithHintConcurrently(static_cast<char*>(handle), hint);
}

bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) override {
return skip_list_.InsertWithHintConcurrently(static_cast<char*>(handle),
hint);
}

void InsertConcurrently(KeyHandle handle) override {
skip_list_.InsertConcurrently(static_cast<char*>(handle));
}
Expand Down