Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UniPS: Provide more information when meets exception in updateLocalCacheForRemotePage #9311

Merged
merged 13 commits into from
Aug 12, 2024
23 changes: 12 additions & 11 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,18 @@ namespace DB
M(force_agg_two_level_hash_table_before_merge) \
M(force_thread_0_no_agg_spill)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task) \
M(pause_before_make_non_root_mpp_task_active)
#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task) \
M(pause_before_make_non_root_mpp_task_active) \
M(pause_before_page_dir_update_local_cache)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
Expand Down
81 changes: 56 additions & 25 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace FailPoints
{
extern const char random_slow_page_storage_remove_expired_snapshots[];
extern const char pause_before_full_gc_prepare[];
extern const char pause_before_page_dir_update_local_cache[];
} // namespace FailPoints

namespace ErrorCodes
Expand All @@ -86,7 +87,7 @@ namespace PS::V3
template <typename Trait>
PageLock VersionedPageEntries<Trait>::acquireLock() const NO_THREAD_SAFETY_ANALYSIS
{
return std::lock_guard(m);
return std::lock_guard{m};
}

template <typename Trait>
Expand Down Expand Up @@ -337,13 +338,24 @@ bool VersionedPageEntries<Trait>::updateLocalCacheForRemotePage(const PageVersio
if (type == EditRecordType::VAR_ENTRY)
{
auto last_iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1, 0));
RUNTIME_CHECK_MSG(last_iter != entries.end() && last_iter->second.isEntry(), "{}", toDebugString());
RUNTIME_CHECK_MSG(
last_iter != entries.end() && last_iter->second.isEntry(),
"this={}, entries={}, ver={}, entry={}",
toDebugString(),
entries,
ver,
entry);

auto & ori_entry = last_iter->second.entry.value();
RUNTIME_CHECK_MSG(ori_entry.checkpoint_info.has_value(), "{}", toDebugString());

// maybe another thread has in-place update the local blob location, ignored
if (!ori_entry.checkpoint_info.is_local_data_reclaimed)
{
return false;
}

// update the entry in-place
ori_entry.file_id = entry.file_id;
ori_entry.size = entry.size;
ori_entry.offset = entry.offset;
Expand Down Expand Up @@ -1784,6 +1796,7 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCach
const DB::PageStorageSnapshotPtr & snap_,
const WriteLimiterPtr & write_limiter)
{
FAIL_POINT_PAUSE(FailPoints::pause_before_page_dir_update_local_cache);
std::unique_lock apply_lock(apply_mutex);
auto seq = toConcreteSnapshot(snap_)->sequence;
for (auto & r : edit.getMutRecords())
Expand All @@ -1797,35 +1810,53 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCach

for (const auto & r : edit.getRecords())
{
auto id_to_resolve = r.page_id;
auto sequence_to_resolve = seq;
while (true)
try
{
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
auto id_to_resolve = r.page_id;
auto sequence_to_resolve = seq;
while (true)
{
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
{
ignored_entries.push_back(r.entry);
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
{
// The entry is not valid for updating the version_list.
// Caller should notice these part of "ignored_entries" and release
// the space allocated for these invalid entries.
// For the information persisted in WAL, it should be ignored when
// restoring from disk.
ignored_entries.push_back(r.entry);
}
break;
}
else if (resolve_state == ResolveResult::TO_REF)
{
id_to_resolve = next_id_to_resolve;
sequence_to_resolve = next_ver_to_resolve.sequence;
}
else
{
RUNTIME_CHECK(false);
}
break;
}
else if (resolve_state == ResolveResult::TO_REF)
{
id_to_resolve = next_id_to_resolve;
sequence_to_resolve = next_ver_to_resolve.sequence;
}
else
{
RUNTIME_CHECK(false);
}
}
catch (DB::Exception & e)
{
    // Attach the identity of the record being processed so that the failure
    // can be traced back to a concrete page and version.
    e.addMessage(fmt::format(
        " type={}, page_id={}, ver={}, seq={}",
        magic_enum::enum_name(r.type),
        r.page_id,
        r.version,
        seq));
    // Re-raise the exception object that is currently being handled.
    // `throw e;` would copy-construct a fresh DB::Exception from `e`, which
    // slices off any more-derived exception type before it reaches the caller.
    throw;
}
}
}
return ignored_entries;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,14 @@ void UniversalPageStorage::unregisterUniversalExternalPagesCallbacks(const Strin

/// Write the page data carried by `wb` into the local blob store, then ask the
/// page directory to point the matching entries at the new local location.
/// Entries the directory does not accept are handed back to the blob store.
void UniversalPageStorage::tryUpdateLocalCacheForRemotePages(UniversalWriteBatch & wb, SnapshotPtr snapshot) const
{
    // Persist the downloaded page data locally; the returned edit describes
    // where each page was written.
    auto local_edit = blob_store->write(std::move(wb));

    // Let the page directory adopt the BlobFile locations. It returns the
    // entries that were not valid for updating the directory.
    auto rejected_entries = page_directory->updateLocalCacheForRemotePages(std::move(local_edit), snapshot);
    if (rejected_entries.empty())
        return;

    // The rejected entries were still written above, so the blob store has to
    // release the space that was allocated for them.
    blob_store->removeEntries(rejected_entries);
}
Expand Down
185 changes: 185 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/SyncPoint/Ctl.h>
#include <Flash/Disaggregated/MockS3LockClient.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <IO/BaseFile/PosixRandomAccessFile.h>
Expand All @@ -38,6 +39,10 @@

#include <memory>

namespace DB::FailPoints
{
extern const char pause_before_page_dir_update_local_cache[];
} // namespace DB::FailPoints

namespace DB::PS::universal::tests
{
Expand Down Expand Up @@ -279,6 +284,185 @@ try
}
CATCH

// Scenario: a remote page is registered locally, a snapshot is taken, the page
// is deleted, and the page is then read through the older snapshot. After a
// simulated restart the page must be reported as deleted.
TEST_P(UniPageStorageRemoteReadTest, WriteReadGCWithRestart)
try
{
    const String remote_page_id = "aaabbb";
    const String expected_content = "nahida opened her eyes";

    /// Build the checkpoint files (data + manifest) and upload them to the remote store
    auto cp_writer = PS::V3::CPFilesWriter::create({
        .data_file_path_pattern = data_file_path_pattern,
        .data_file_id_pattern = data_file_id_pattern,
        .manifest_file_path = manifest_file_path,
        .manifest_file_id = manifest_file_id,
        .data_source = PS::V3::CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}),
    });

    cp_writer->writePrefix({
        .writer = {},
        .sequence = 5,
        .last_sequence = 3,
    });
    {
        auto cp_edits = PS::V3::universal::PageEntriesEdit{};
        cp_edits.appendRecord(
            {.type = PS::V3::EditRecordType::VAR_ENTRY,
             .page_id = remote_page_id,
             .entry = {.size = 22, .offset = 10}});
        cp_writer->writeEditsAndApplyCheckpointInfo(cp_edits);
    }
    auto uploaded_data_paths = cp_writer->writeSuffix();
    cp_writer.reset();
    for (const auto & path : uploaded_data_paths)
    {
        uploadFile(path);
    }
    uploadFile(manifest_file_path);

    /// Read the manifest back and register its single record as a remote page
    auto manifest_plain_file = PosixRandomAccessFile::create(manifest_file_path);
    auto cp_manifest_reader = PS::V3::CPManifestFileReader::create({
        .plain_file = manifest_plain_file,
    });
    cp_manifest_reader->readPrefix();
    PS::V3::CheckpointProto::StringsInternMap strings_map;
    {
        auto manifest_edits = cp_manifest_reader->readEdits(strings_map);
        auto records = manifest_edits->getRecords();
        ASSERT_EQ(1, records.size());

        UniversalWriteBatch put_batch;
        put_batch.disableRemoteLock();
        put_batch.putRemotePage(
            records[0].page_id,
            0,
            records[0].entry.size,
            records[0].entry.checkpoint_info.data_location,
            std::move(records[0].entry.field_offsets));
        page_storage->write(std::move(put_batch));
    }

    // Take the snapshot while the page still exists
    auto snap0 = page_storage->getSnapshot("read");

    // Remove the page after the snapshot was taken but before it is read
    {
        UniversalWriteBatch del_batch;
        del_batch.disableRemoteLock();
        del_batch.delPage(remote_page_id);
        page_storage->write(std::move(del_batch));
    }

    // The older snapshot must still be able to read the page content
    {
        auto snap_page = page_storage->read(remote_page_id, nullptr, snap0);
        ASSERT_TRUE(snap_page.isValid());
        ASSERT_EQ(expected_content, String(snap_page.data.begin(), snap_page.data.size()));
    }

    // Simulate a restart
    reload();

    {
        // After the restart the page must be gone
        auto reloaded_page = page_storage->read(remote_page_id, nullptr, nullptr, false);
        ASSERT_FALSE(reloaded_page.isValid());
    }
}
CATCH

// Scenario: two threads read the same remote page concurrently. Both reads
// must return the page content, and the page must stay readable afterwards,
// including after a simulated restart.
TEST_P(UniPageStorageRemoteReadTest, MultiThreadReadUpdateRmotePage)
try
{
    const String test_page_id = "aaabbb";
    /// Prepare data on remote store
    auto writer = PS::V3::CPFilesWriter::create({
        .data_file_path_pattern = data_file_path_pattern,
        .data_file_id_pattern = data_file_id_pattern,
        .manifest_file_path = manifest_file_path,
        .manifest_file_id = manifest_file_id,
        .data_source = PS::V3::CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}),
    });

    writer->writePrefix({
        .writer = {},
        .sequence = 5,
        .last_sequence = 3,
    });
    {
        auto edits = PS::V3::universal::PageEntriesEdit{};
        edits.appendRecord(
            {.type = PS::V3::EditRecordType::VAR_ENTRY, .page_id = test_page_id, .entry = {.size = 22, .offset = 10}});
        writer->writeEditsAndApplyCheckpointInfo(edits);
    }
    auto data_paths = writer->writeSuffix();
    writer.reset();
    for (const auto & data_path : data_paths)
    {
        uploadFile(data_path);
    }
    uploadFile(manifest_file_path);

    /// Put remote page into local
    auto manifest_file = PosixRandomAccessFile::create(manifest_file_path);
    auto manifest_reader = PS::V3::CPManifestFileReader::create({
        .plain_file = manifest_file,
    });
    manifest_reader->readPrefix();
    PS::V3::CheckpointProto::StringsInternMap im;
    {
        auto edits_r = manifest_reader->readEdits(im);
        auto r = edits_r->getRecords();
        ASSERT_EQ(1, r.size());

        UniversalWriteBatch wb;
        wb.disableRemoteLock();
        wb.putRemotePage(
            r[0].page_id,
            0,
            r[0].entry.size,
            r[0].entry.checkpoint_info.data_location,
            std::move(r[0].entry.field_offsets));
        page_storage->write(std::move(wb));
    }

    // Arm the failpoint from the test thread. The failpoint itself is hit inside
    // PageDirectory::updateLocalCacheForRemotePages, i.e. on the reader threads.
    // Calling FAIL_POINT_PAUSE here would only *hit* the failpoint on the test
    // thread and would never enable it for the readers.
    // NOTE(review): the failpoint is registered in the `_ONCE` list, so presumably
    // only the first reader reaching it is paused -- confirm against FailPointHelper.
    FailPointHelper::enableFailPoint(FailPoints::pause_before_page_dir_update_local_cache);

    // std::launch::async forces each reader onto its own thread. The default
    // launch policy allows deferred execution, which would run the two reads
    // sequentially inside get() and remove the concurrency under test.
    auto th_read0 = std::async(std::launch::async, [&]() {
        auto snap0 = page_storage->getSnapshot("read0");
        auto page = page_storage->read(test_page_id, nullptr, snap0);
        ASSERT_TRUE(page.isValid());
        ASSERT_EQ("nahida opened her eyes", String(page.data.begin(), page.data.size()));
        LOG_DEBUG(log, "th_read0 finished");
    });
    auto th_read1 = std::async(std::launch::async, [&]() {
        auto snap1 = page_storage->getSnapshot("read1");
        auto page = page_storage->read(test_page_id, nullptr, snap1);
        ASSERT_TRUE(page.isValid());
        ASSERT_EQ("nahida opened her eyes", String(page.data.begin(), page.data.size()));
        LOG_DEBUG(log, "th_read1 finished");
    });
    LOG_DEBUG(log, "concurrent read block before update");

    // Release any paused reader and wait for both reads to complete
    FailPointHelper::disableFailPoint(FailPoints::pause_before_page_dir_update_local_cache);
    th_read0.get();
    th_read1.get();
    LOG_DEBUG(log, "there must be one read thread update fail");

    // The page must still be readable without a snapshot
    {
        auto page = page_storage->read(test_page_id);
        ASSERT_TRUE(page.isValid());
        ASSERT_EQ("nahida opened her eyes", String(page.data.begin(), page.data.size()));
    }

    // Mock restart
    reload();

    // The page must still be readable after the restart
    {
        auto page = page_storage->read(test_page_id);
        ASSERT_TRUE(page.isValid());
        ASSERT_EQ("nahida opened her eyes", String(page.data.begin(), page.data.size()));
    }
}
CATCH

TEST_P(UniPageStorageRemoteReadTest, WriteReadWithRef)
try
{
Expand Down Expand Up @@ -613,6 +797,7 @@ CATCH
INSTANTIATE_TEST_CASE_P(
UniPageStorageRemote,
UniPageStorageRemoteReadTest,
// Each test parameter is a pair of flags: <is_encrypted, is_keyspace_encrypted>
testing::Values(std::make_pair(false, false), std::make_pair(true, false), std::make_pair(true, true)));

} // namespace DB::PS::universal::tests
Loading