Skip to content

Commit

Permalink
Optimize Commit pipeline performance (facebook#286)
Browse files Browse the repository at this point in the history
* Optimize Commit pipeline performance

Signed-off-by: Wenbo Zhang <[email protected]>

* Remove safe queue, iterate the wbs directly

Signed-off-by: Wenbo Zhang <[email protected]>

* Refactored some code
  - Move atomic ops into functions
  - Unified replacement of PebbleWrite with MultiBatchWrite
  - Add a few comments

Signed-off-by: Wenbo Zhang <[email protected]>

* Reset pending_wb_cnt before wakeup writers when write WAL failed

Signed-off-by: Wenbo Zhang <[email protected]>

* Update db/db_impl/db_impl_write.cc

Co-authored-by: Xinye Tao <[email protected]>
Signed-off-by: Wenbo Zhang <[email protected]>

* Update db/write_thread.cc

Co-authored-by: Xinye Tao <[email protected]>
Signed-off-by: Wenbo Zhang <[email protected]>

* Refactor some code

Signed-off-by: Wenbo Zhang <[email protected]>

* Fix two bugs about write memtable

Signed-off-by: Wenbo Zhang <[email protected]>

Co-authored-by: Xinye Tao <[email protected]>
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
ethercflow and tabokie committed Jun 28, 2022
1 parent 1726df0 commit 19db40b
Show file tree
Hide file tree
Showing 22 changed files with 489 additions and 125 deletions.
2 changes: 1 addition & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ Status ColumnFamilyData::ValidateOptions(
"FIFO compaction only supported with max_open_files = -1.");
}

if (db_options.enable_pipelined_commit &&
if (db_options.enable_multi_batch_write &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
Expand Down
18 changes: 11 additions & 7 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -1320,12 +1324,12 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status PebbleWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void PebbleWriteCommit(CommitRequest* request);
Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void MultiBatchWriteCommit(CommitRequest* request);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
Expand Down Expand Up @@ -1732,7 +1736,7 @@ class DBImpl : public DB {
}

if (!immutable_db_options_.unordered_write &&
!immutable_db_options_.enable_pipelined_commit) {
!immutable_db_options_.enable_multi_batch_write) {
// Then the writes are finished before the next write group starts
return;
}
Expand Down
6 changes: 3 additions & 3 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,

// multi thread write do not support two-write-que or write in 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_pipelined_commit = false;
result.enable_multi_batch_write = false;
}

if (result.enable_pipelined_commit) {
if (result.enable_multi_batch_write) {
result.enable_pipelined_write = false;
result.allow_concurrent_memtable_write = true;
}
Expand Down Expand Up @@ -1070,7 +1070,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
&trim_history_scheduler_, true, wal_number, 0, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Status DBImplSecondary::RecoverLogFiles(
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(),
nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
true, log_number, this, false /* concurrent_memtable_writes */,
true, log_number, 0, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
}
// If column family was not found, it might mean that the WAL write
Expand Down
Loading

0 comments on commit 19db40b

Please sign in to comment.