Skip to content

Commit

Permalink
Add insert hints for each writebatch (#5728)
Browse files Browse the repository at this point in the history
Summary:
Add insert hints for each writebatch so that they can be used in concurrent write, and add write option to enable it.

Bench result (qps):

`./db_bench --benchmarks=fillseq -allow_concurrent_memtable_write=true -num=4000000 -batch-size=1 -threads=1 -db=/data3/ylj/tmp -write_buffer_size=536870912 -num_column_families=4`

master:

| batch size \ thread num | 1       | 2       | 4       | 8       |
| ----------------------- | ------- | ------- | ------- | ------- |
| 1                       | 387883  | 220790  | 308294  | 490998  |
| 10                      | 1397208 | 978911  | 1275684 | 1733395 |
| 100                     | 2045414 | 1589927 | 1798782 | 2681039 |
| 1000                    | 2228038 | 1698252 | 1839877 | 2863490 |

fillseq with writebatch hint:

| batch size \ thread num | 1       | 2       | 4       | 8       |
| ----------------------- | ------- | ------- | ------- | ------- |
| 1                       | 286005  | 223570  | 300024  | 466981  |
| 10                      | 970374  | 813308  | 1399299 | 1753588 |
| 100                     | 1962768 | 1983023 | 2676577 | 3086426 |
| 1000                    | 2195853 | 2676782 | 3231048 | 3638143 |
Pull Request resolved: #5728

Differential Revision: D17297240

fbshipit-source-id: b053590a6d77871f1ef2f911a7bd013b3899b26c
  • Loading branch information
Jing118 authored and facebook-github-bot committed Sep 13, 2019
1 parent a378a4c commit 1a928c2
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 20 deletions.
5 changes: 5 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3224,6 +3224,11 @@ void rocksdb_writeoptions_set_low_pri(
opt->rep.low_pri = v;
}

void rocksdb_writeoptions_set_memtable_insert_hint_per_batch(
rocksdb_writeoptions_t* opt, unsigned char v) {
opt->rep.memtable_insert_hint_per_batch = v;
}

rocksdb_compactoptions_t* rocksdb_compactoptions_create() {
return new rocksdb_compactoptions_t;
}
Expand Down
12 changes: 8 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
batch_per_txn_);
batch_per_txn_, write_options.memtable_insert_hint_per_batch);

PERF_TIMER_START(write_pre_and_post_process_time);
}
Expand Down Expand Up @@ -397,7 +397,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt, batch_per_txn_);
w.batch_cnt, batch_per_txn_,
write_options.memtable_insert_hint_per_batch);
}
}
if (seq_used != nullptr) {
Expand Down Expand Up @@ -564,7 +565,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, true /*concurrent_memtable_writes*/);
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
Expand Down Expand Up @@ -603,7 +606,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
seq_per_batch_, sub_batch_cnt);
seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);

WriteStatusCheck(w.status);
if (write_options.disableWAL) {
Expand Down
6 changes: 4 additions & 2 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
MemTablePostProcessInfo* post_process_info, void** hint) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
Expand Down Expand Up @@ -547,7 +547,9 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
assert(post_process_info == nullptr);
UpdateFlushState();
} else {
bool res = table->InsertKeyConcurrently(handle);
bool res = (hint == nullptr)
? table->InsertKeyConcurrently(handle)
: table->InsertKeyWithHintConcurrently(handle, hint);
if (UNLIKELY(!res)) {
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ class MemTable {
// the <key, seq> already exists.
bool Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr);
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);

// Used to Get value associated with key or Get Merge Operands associated
// with key.
Expand Down
39 changes: 33 additions & 6 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
Expand Down Expand Up @@ -1225,6 +1226,22 @@ class MemTableInserter : public WriteBatch::Handler {
DupDetector duplicate_detector_;
bool dup_dectector_on_;

bool hint_per_batch_;
bool hint_created_;
// Hints for this batch
using HintMap = std::unordered_map<MemTable*, void*>;
using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
HintMapType hint_;

HintMap& GetHintMap() {
assert(hint_per_batch_);
if (!hint_created_) {
new (&hint_) HintMap();
hint_created_ = true;
}
return *reinterpret_cast<HintMap*>(&hint_);
}

MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
if(!post_info_created_) {
Expand Down Expand Up @@ -1258,7 +1275,7 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr, bool seq_per_batch = false,
bool batch_per_txn = true)
bool batch_per_txn = true, bool hint_per_batch = false)
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
Expand All @@ -1282,7 +1299,9 @@ class MemTableInserter : public WriteBatch::Handler {
write_before_prepare_(!batch_per_txn),
unprepared_batch_(false),
duplicate_detector_(),
dup_dectector_on_(false) {
dup_dectector_on_(false),
hint_per_batch_(hint_per_batch),
hint_created_(false) {
assert(cf_mems_);
}

Expand All @@ -1295,6 +1314,12 @@ class MemTableInserter : public WriteBatch::Handler {
reinterpret_cast<MemPostInfoMap*>
(&mem_post_info_map_)->~MemPostInfoMap();
}
if (hint_created_) {
for (auto iter : GetHintMap()) {
delete[] reinterpret_cast<char*>(iter.second);
}
reinterpret_cast<HintMap*>(&hint_)->~HintMap();
}
delete rebuilding_trx_;
}

Expand Down Expand Up @@ -1404,7 +1429,8 @@ class MemTableInserter : public WriteBatch::Handler {
if (!moptions->inplace_update_support) {
bool mem_res =
mem->Add(sequence_, value_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
Expand Down Expand Up @@ -1487,7 +1513,8 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
bool mem_res =
mem->Add(sequence_, delete_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
Expand Down Expand Up @@ -1962,7 +1989,7 @@ Status WriteBatchInternal::InsertInto(
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
bool batch_per_txn) {
bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
(void)batch_cnt;
#endif
Expand All @@ -1971,7 +1998,7 @@ Status WriteBatchInternal::InsertInto(
sequence, memtables, flush_scheduler, trim_history_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
batch_per_txn);
batch_per_txn, hint_per_batch);
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
Expand Down
3 changes: 2 additions & 1 deletion db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ class WriteBatchInternal {
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
bool seq_per_batch = false, size_t batch_cnt = 0,
bool batch_per_txn = true);
bool batch_per_txn = true,
bool hint_per_batch = false);

static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);
Expand Down
3 changes: 3 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_no_slowdown(
rocksdb_writeoptions_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_low_pri(
rocksdb_writeoptions_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_writeoptions_set_memtable_insert_hint_per_batch(rocksdb_writeoptions_t*,
unsigned char);

/* Compact range options */

Expand Down
22 changes: 22 additions & 0 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,28 @@ class MemTableRep {
return true;
}

// Same as ::InsertWithHint, but allow concurrnet write
//
// If hint points to nullptr, a new hint will be allocated on heap, otherwise
// the hint will be updated to reflect the last insert location. The hint is
// owned by the caller and it is the caller's responsibility to delete the
// hint later.
//
// Currently only skip-list based memtable implement the interface. Other
// implementations will fallback to InsertConcurrently() by default.
virtual void InsertWithHintConcurrently(KeyHandle handle, void** /*hint*/) {
// Ignore the hint by default.
InsertConcurrently(handle);
}

// Same as ::InsertWithHintConcurrently
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
virtual bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) {
InsertWithHintConcurrently(handle, hint);
return true;
}

// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles.
//
Expand Down
10 changes: 10 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,15 @@ struct WriteOptions {
// Default: false
bool low_pri;

// If true, this writebatch will maintain the last insert positions of each
// memtable as hints in concurrent write. It can improve write performance
// in concurrent writes if keys in one writebatch are sequential. In
// non-concurrent writes (when concurrent_memtable_writes is false) this
// option will be ignored.
//
// Default: false
bool memtable_insert_hint_per_batch;

// Timestamp of write operation, e.g. Put. All timestamps of the same
// database must share the same length and format. The user is also
// responsible for providing a customized compare function via Comparator to
Expand All @@ -1373,6 +1382,7 @@ struct WriteOptions {
ignore_missing_column_families(false),
no_slowdown(false),
low_pri(false),
memtable_insert_hint_per_batch(false),
timestamp(nullptr) {}
};

Expand Down
33 changes: 33 additions & 0 deletions memtable/inlineskiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class InlineSkipList {
// Allocate a splice using allocator.
Splice* AllocateSplice();

// Allocate a splice on heap.
Splice* AllocateSpliceOnHeap();

// Inserts a key allocated by AllocateKey, after the actual key value
// has been filled in.
//
Expand All @@ -105,6 +108,12 @@ class InlineSkipList {
// REQUIRES: no concurrent calls to any of inserts.
bool InsertWithHint(const char* key, void** hint);

// Like InsertConcurrently, but with a hint
//
// REQUIRES: nothing that compares equal to key is currently in the list.
// REQUIRES: no concurrent calls that use same hint
bool InsertWithHintConcurrently(const char* key, void** hint);

// Like Insert, but external synchronization is not required.
bool InsertConcurrently(const char* key);

Expand Down Expand Up @@ -642,6 +651,18 @@ InlineSkipList<Comparator>::AllocateSplice() {
return splice;
}

template <class Comparator>
typename InlineSkipList<Comparator>::Splice*
InlineSkipList<Comparator>::AllocateSpliceOnHeap() {
size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
char* raw = new char[sizeof(Splice) + array_size * 2];
Splice* splice = reinterpret_cast<Splice*>(raw);
splice->height_ = 0;
splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
return splice;
}

template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
return Insert<false>(key, seq_splice_, false);
Expand All @@ -668,6 +689,18 @@ bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
return Insert<false>(key, splice, true);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
void** hint) {
assert(hint != nullptr);
Splice* splice = reinterpret_cast<Splice*>(*hint);
if (splice == nullptr) {
splice = AllocateSpliceOnHeap();
*hint = reinterpret_cast<void*>(splice);
}
return Insert<true>(key, splice, true);
}

template <class Comparator>
template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
Expand Down
26 changes: 22 additions & 4 deletions memtable/inlineskiplist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,18 @@ class ConcurrentTest {
}

// REQUIRES: No concurrent calls for the same k
void ConcurrentWriteStep(uint32_t k) {
void ConcurrentWriteStep(uint32_t k, bool use_hint = false) {
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
list_.InsertConcurrently(buf);
if (use_hint) {
void* hint = nullptr;
list_.InsertWithHintConcurrently(buf, &hint);
delete[] reinterpret_cast<char*>(hint);
} else {
list_.InsertConcurrently(buf);
}
ASSERT_EQ(g, current_.Get(k) + 1);
current_.Set(k, g);
}
Expand Down Expand Up @@ -508,6 +514,7 @@ TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
class TestState {
public:
ConcurrentTest t_;
bool use_hint_;
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
Expand Down Expand Up @@ -575,7 +582,7 @@ static void ConcurrentReader(void* arg) {
static void ConcurrentWriter(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
uint32_t k = state->next_writer_++ % ConcurrentTest::K;
state->t_.ConcurrentWriteStep(k);
state->t_.ConcurrentWriteStep(k, state->use_hint_);
state->AdjustPendingWriters(-1);
}

Expand All @@ -600,7 +607,8 @@ static void RunConcurrentRead(int run) {
}
}

static void RunConcurrentInsert(int run, int write_parallelism = 4) {
static void RunConcurrentInsert(int run, bool use_hint = false,
int write_parallelism = 4) {
Env::Default()->SetBackgroundThreads(1 + write_parallelism,
Env::Priority::LOW);
const int seed = test::RandomSeed() + (run * 100);
Expand All @@ -612,6 +620,7 @@ static void RunConcurrentInsert(int run, int write_parallelism = 4) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
state.use_hint_ = use_hint;
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k += write_parallelism) {
Expand All @@ -635,6 +644,15 @@ TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
TEST_F(InlineSkipTest, ConcurrentInsertWithHint1) {
RunConcurrentInsert(1, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint2) {
RunConcurrentInsert(2, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint3) {
RunConcurrentInsert(3, true);
}

#endif // ROCKSDB_VALGRIND_RUN
} // namespace rocksdb
Expand Down
Loading

0 comments on commit 1a928c2

Please sign in to comment.