Skip to content

Commit

Permalink
Decrease ps write latency (#7154) (#7737)
Browse files Browse the repository at this point in the history
ref #6827, close #7735
  • Loading branch information
ti-chi-bot authored Jul 4, 2023
1 parent e88b2f8 commit 5b97981
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 6 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ namespace DB
F(type_search_pos, {{"type", "search_pos"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_blob_write, {{"type", "blob_write"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_latch, {{"type", "latch"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_wait_in_group, {{"type", "wait_in_group"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_wal, {{"type", "wal"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_commit, {{"type", "commit"}}, ExpBuckets{0.00005, 1.8, 26})) \
M(tiflash_storage_logical_throughput_bytes, "The logical throughput of read tasks of storage in bytes", Histogram, \
Expand Down Expand Up @@ -261,7 +262,7 @@ namespace DB
M(tiflash_storage_read_thread_seconds, "Bucketed histogram of read thread", Histogram, \
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
F(type_mpp_query_count, {"type", "mpp_query_count"}))

// clang-format on

Expand Down
80 changes: 76 additions & 4 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1245,22 +1245,91 @@ void PageDirectory::applyRefEditRecord(
SYNC_FOR("after_PageDirectory::applyRefEditRecord_incr_ref_count");
}

/// Folds the edits of every writer queued behind `first` into `first`'s edit.
///
/// `first` must be the head of `writers`; both this and the queue being non-empty are
/// enforced with RUNTIME_CHECK. The unnamed lock parameter is not touched here; it only
/// makes the caller pass the `std::unique_lock` it holds.
///
/// Each follower's edit is handed to `merge` as an rvalue, so the followers' records end
/// up appended to `first->edit` in queue order.
///
/// @return the last writer of the group (`first` itself when nobody else is queued).
PageDirectory::Writer * PageDirectory::buildWriteGroup(Writer * first, std::unique_lock<std::mutex> & /*lock*/)
{
    RUNTIME_CHECK(!writers.empty());
    RUNTIME_CHECK(first == writers.front());

    Writer * tail = first;
    auto follower_it = writers.begin();
    // Skip the head (the leader itself) and absorb everyone behind it.
    for (++follower_it; follower_it != writers.end(); ++follower_it)
    {
        Writer * follower = *follower_it;
        first->edit->merge(std::move(*follower->edit));
        tail = follower;
    }
    return tail;
}

void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter)
{
// We need to make sure there is only one apply thread to write wal and then increase `sequence`.
// Note that, as read threads use current `sequence` as read_seq, we cannot increase `sequence`
// before applying edit to `mvcc_table_directory`.
//
// TODO: It is totally serialized by only 1 thread with IO waiting. Make this process a
// pipeline so that we can batch the incoming edit when doing IO.

Stopwatch watch;
Writer w;
w.edit = &edit;

Stopwatch watch;
std::unique_lock apply_lock(apply_mutex);

GET_METRIC(tiflash_storage_page_write_duration_seconds, type_latch).Observe(watch.elapsedSeconds());
watch.restart();

writers.push_back(&w);
SYNC_FOR("after_PageDirectory::enter_write_group");
w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); });
GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds());
watch.restart();
if (w.done)
{
if (unlikely(!w.success))
{
if (w.exception)
{
w.exception->rethrow();
}
else
{
throw Exception("Unknown exception");
}
}
return;
}

auto * last_writer = buildWriteGroup(&w, apply_lock);
apply_lock.unlock();
SYNC_FOR("before_PageDirectory::leader_apply");

// `true` means the write process has completed without exception
bool success = false;
std::unique_ptr<DB::Exception> exception = nullptr;

SCOPE_EXIT({
apply_lock.lock();
while (true)
{
auto * ready = writers.front();
writers.pop_front();
if (ready != &w)
{
ready->done = true;
ready->success = success;
if (exception != nullptr)
{
ready->exception = std::unique_ptr<DB::Exception>(exception->clone());
}
ready->cv.notify_one();
}
if (ready == last_writer)
break;
}
if (!writers.empty())
{
writers.front()->cv.notify_one();
}
});

UInt64 max_sequence = sequence.load();
const auto edit_size = edit.size();

Expand Down Expand Up @@ -1329,13 +1398,16 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write
catch (DB::Exception & e)
{
e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}] [edit_size={}]", magic_enum::enum_name(r.type), r.page_id, r.version, edit_size));
exception.reset(e.clone());
e.rethrow();
}
}

// stage 3, the edit committed, incr the sequence number to publish changes for `createSnapshot`
sequence.fetch_add(edit_size);
}

success = true;
}

void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter)
Expand Down
36 changes: 35 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ class PageDirectory
return u;
}

// Test-only helper: reports how many writers are currently sitting in `writers`.
// `writers` is meant to be accessed under the protection of apply_mutex, and this
// function reads it without taking that mutex, so don't use it in production code.
size_t getWritersQueueSizeForTest()
{
    const auto queued_writers = writers.size();
    return queued_writers;
}

// No copying and no moving
DISALLOW_COPY_AND_MOVE(PageDirectory);

Expand Down Expand Up @@ -397,12 +404,39 @@ class PageDirectory
return std::static_pointer_cast<PageDirectorySnapshot>(ptr);
}

// One pending `apply` call waiting in the `writers` queue.
// A Writer is either processed by itself (when it reaches the head of the queue and
// becomes the leader) or by the leader of the group it was merged into.
struct Writer
{
    // The edit this writer wants to apply. Not owned.
    // Initialized to nullptr so a default-constructed Writer never carries an indeterminate pointer.
    PageEntriesEdit * edit = nullptr;
    bool done = false; // The work has been performed by other thread
    bool success = false; // The work complete successfully
    // Copy of the exception raised while the leader processed the group, if any.
    std::unique_ptr<DB::Exception> exception;
    // Signalled when this writer is marked `done` or when it becomes the head of the queue.
    std::condition_variable cv;
};

// return the last writer in the group
Writer * buildWriteGroup(Writer * first, std::unique_lock<std::mutex> & /*lock*/);

private:
PageId max_page_id;
std::atomic<UInt64> sequence;

// Used to avoid concurrently applying edits to the WAL and mvcc_table_directory.
mutable std::shared_mutex apply_mutex;
mutable std::mutex apply_mutex;
// This is a queue of Writers to PageDirectory and is protected by apply_mutex.
// Every writer enqueues itself into this queue before writing.
// The head writer of the queue becomes the leader and is responsible for writing and syncing the WAL.
// The write process of the leader:
//   1. scan the queue to find all available writers and merge their edits into the leader's edit;
//   2. unlock the apply_mutex;
//   3. write the edits to the WAL and sync it;
//   4. apply the edit to mvcc_table_directory;
//   5. lock the apply_mutex;
//   6. dequeue the writers found in step 1 and notify them that their write work has completed;
//   7. if the writer queue is not empty, notify the head writer to become the leader of the next write.
// The other writers in the queue just wait for the leader to wake them up, at which point one of
// these two conditions must be true:
//   1. its work has been finished by the leader, so it can just return;
//   2. it has become the head of the queue, so it continues as the leader and performs the write process above.
std::deque<Writer *> writers;

// Used to protect mvcc_table_directory between apply threads and read threads
mutable std::shared_mutex table_rw_mutex;
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/Page/V3/PageEntriesEdit.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ class PageEntriesEdit
records.emplace_back(rec);
}

/// Appends all records of `other` to the end of this edit, moving them instead of
/// copying, and leaves `other` with no records.
///
/// Merging an edit into itself is a no-op: inserting a range taken from the container
/// into that same container is not allowed, and the trailing `clear()` would otherwise
/// drop every record.
void merge(PageEntriesEdit && other)
{
    if (this == &other)
        return;

    records.insert(
        records.end(),
        std::make_move_iterator(other.records.begin()),
        std::make_move_iterator(other.records.end()));
    // The elements were moved-from; drop them so `other` is observably empty.
    other.records.clear();
}

EditRecords & getMutRecords() { return records; }
const EditRecords & getRecords() const { return records; }

Expand Down
93 changes: 93 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ try
edit.put(buildV3Id(TEST_NAMESPACE_ID, 2), entry_updated2);
ASSERT_ANY_THROW(dir->apply(std::move(edit)));
}

// Write a new entry to make sure apply works normally after exception throw
{
PageEntriesEdit edit;
edit.put(buildV3Id(TEST_NAMESPACE_ID, 10), entry1);
dir->apply(std::move(edit));
}
auto snap2 = dir->createSnapshot();
EXPECT_ENTRY_EQ(entry1, dir, 10, snap2);
}
CATCH

Expand Down Expand Up @@ -539,6 +548,90 @@ TEST_F(PageDirectoryTest, RefWontDeadLock)
dir->apply(std::move(edit2));
}

// Three concurrent `apply` calls: the first one becomes the leader and is held at the
// "before_PageDirectory::leader_apply" sync point while the other two enqueue themselves
// behind it. After the first leader finishes, whichever of the remaining two is at the
// head of the queue leads the second write and the other one is expected to be served
// as part of its group. All three entries must be visible afterwards.
TEST_F(PageDirectoryTest, BatchWriteSuccess)
{
    PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567};
    PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567};
    PageEntryV3 entry3{.file_id = 3, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567};

    // The writers must really run on their own threads: with the default launch policy an
    // implementation is allowed to defer the task until `.get()`, in which case the sync
    // point below would never be reached. Request `std::launch::async` explicitly.
    auto sp_before_leader_apply = SyncPointCtl::enableInScope("before_PageDirectory::leader_apply");
    auto th_write1 = std::async(std::launch::async, [&]() {
        PageEntriesEdit edit;
        edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry1);
        dir->apply(std::move(edit));
    });
    sp_before_leader_apply.waitAndPause();

    // form a write group
    auto sp_after_enter_write_group = SyncPointCtl::enableInScope("after_PageDirectory::enter_write_group");
    auto th_write2 = std::async(std::launch::async, [&]() {
        PageEntriesEdit edit;
        edit.put(buildV3Id(TEST_NAMESPACE_ID, 2), entry2);
        dir->apply(std::move(edit));
    });
    auto th_write3 = std::async(std::launch::async, [&]() {
        PageEntriesEdit edit;
        edit.put(buildV3Id(TEST_NAMESPACE_ID, 3), entry3);
        dir->apply(std::move(edit));
    });
    // One pass per follower: each of them has pushed itself into the queue by now.
    sp_after_enter_write_group.waitAndNext();
    sp_after_enter_write_group.waitAndNext();
    ASSERT_EQ(dir->getWritersQueueSizeForTest(), 3); // 3 writers in write group

    sp_before_leader_apply.next(); // continue first leader_apply
    th_write1.get();

    sp_before_leader_apply.waitAndNext(); // continue second leader_apply
    th_write2.get();
    th_write3.get();
    ASSERT_EQ(dir->getWritersQueueSizeForTest(), 0);

    auto snap = dir->createSnapshot();
    EXPECT_ENTRY_EQ(entry1, dir, 1, snap);
    EXPECT_ENTRY_EQ(entry2, dir, 2, snap);
    EXPECT_ENTRY_EQ(entry3, dir, 3, snap);
}

// Same choreography as a successful batched write, but the two queued writers submit
// `ref` edits that point at page 100 (presumably failing because that page is never
// written in this test). Both of their `apply` calls are expected to throw, while the
// first writer's entry, applied by an earlier leader, must still be readable and the
// writer queue must end up empty.
TEST_F(PageDirectoryTest, BatchWriteException)
{
    PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567};

    // The writers must really run on their own threads: with the default launch policy an
    // implementation is allowed to defer the task until `.get()`, in which case the sync
    // point below would never be reached. Request `std::launch::async` explicitly.
    auto sp_before_leader_apply = SyncPointCtl::enableInScope("before_PageDirectory::leader_apply");
    auto th_write1 = std::async(std::launch::async, [&]() {
        PageEntriesEdit edit;
        edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry1);
        dir->apply(std::move(edit));
    });
    sp_before_leader_apply.waitAndPause();

    // form a write group
    auto sp_after_enter_write_group = SyncPointCtl::enableInScope("after_PageDirectory::enter_write_group");
    auto th_write2 = std::async(std::launch::async, [&]() {
        PageEntriesEdit edit;
        edit.ref(buildV3Id(TEST_NAMESPACE_ID, 2), buildV3Id(TEST_NAMESPACE_ID, 100));
        ASSERT_ANY_THROW(dir->apply(std::move(edit)));
    });
    auto th_write3 = std::async(std::launch::async, [&]() {
        PageEntriesEdit edit;
        edit.ref(buildV3Id(TEST_NAMESPACE_ID, 3), buildV3Id(TEST_NAMESPACE_ID, 100));
        ASSERT_ANY_THROW(dir->apply(std::move(edit)));
    });
    // One pass per follower: each of them has pushed itself into the queue by now.
    sp_after_enter_write_group.waitAndNext();
    sp_after_enter_write_group.waitAndNext();
    ASSERT_EQ(dir->getWritersQueueSizeForTest(), 3); // 3 writers in write group

    sp_before_leader_apply.next(); // continue first leader_apply
    th_write1.get();

    sp_before_leader_apply.waitAndNext(); // continue second leader_apply
    th_write2.get();
    th_write3.get();
    ASSERT_EQ(dir->getWritersQueueSizeForTest(), 0);

    auto snap = dir->createSnapshot();
    EXPECT_ENTRY_EQ(entry1, dir, 1, snap);
}

TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned)
{
// Make sure creating ext page after itself and all its reference are clean
Expand Down

0 comments on commit 5b97981

Please sign in to comment.