Skip to content

Commit

Permalink
Support remote read on uni ps (#6930)
Browse files Browse the repository at this point in the history
ref #6882
  • Loading branch information
lidezhu authored Mar 6, 2023
1 parent 9ba4947 commit 9fb5cc1
Show file tree
Hide file tree
Showing 28 changed files with 1,098 additions and 73 deletions.
12 changes: 12 additions & 0 deletions dbms/src/Storages/Page/V2/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //
case WriteBatchWriteType::PUT_EXTERNAL:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Should not serialize with {}", magic_enum::enum_name(write.type));
break;
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(write.type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}

Expand Down Expand Up @@ -234,6 +237,9 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //

edit.ref(write.page_id, write.ori_page_id);
break;
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(write.type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}

Expand Down Expand Up @@ -439,6 +445,9 @@ bool PageFile::LinkingMetaAdapter::linkToNewSequenceNext(WriteBatch::SequenceID
pos += sizeof(PageId);
break;
}
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(write_type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}

Expand Down Expand Up @@ -686,6 +695,9 @@ void PageFile::MetaMergingReader::moveNext(PageFormat::Version * v)
curr_edit.ref(ref_id, page_id);
break;
}
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(write_type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}
// move `pos` over the checksum of WriteBatch
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V2/VersionSet/PageEntriesBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ void PageEntriesBuilder::apply(const PageEntriesEdit & edit)
case WriteBatchWriteType::UPSERT:
current_version->upsertPage(rec.page_id, rec.entry);
break;
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(rec.type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ void DeltaVersionEditAcceptor::apply(PageEntriesEdit & edit)
case WriteBatchWriteType::UPSERT:
throw Exception(ErrorCodes::LOGICAL_ERROR, "DeltaVersionEditAcceptor::apply with invalid type {}", magic_enum::enum_name(rec.type));
break;
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(rec.type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}
}
Expand Down Expand Up @@ -595,6 +598,9 @@ void DeltaVersionEditAcceptor::applyInplace(const String & name,
case WriteBatchWriteType::UPSERT:
current->upsertPage(rec.page_id, rec.entry);
break;
default:
throw Exception(fmt::format("Unknown write {}", static_cast<Int32>(rec.type)), ErrorCodes::LOGICAL_ERROR);
break;
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/Page/V3/Blob/BlobStat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,16 @@ BlobStats::BlobStats(LoggerPtr log_, PSDiskDelegatorPtr delegator_, BlobConfig &

// Restore the blob space occupied by `entry` into the stat of its blob file.
//
// An entry whose file id is INVALID_BLOBFILE_ID has no blob file to look up, so
// no stat is touched for it. Such an entry is only accepted when its checkpoint
// info is present and marks the local data as reclaimed; otherwise the
// RUNTIME_CHECK below fails.
void BlobStats::restoreByEntry(const PageEntryV3 & entry)
{
    if (entry.file_id == INVALID_BLOBFILE_ID)
    {
        // It must be an entry pointing to a remote data location.
        // Check before any stat lookup: an invalid id must never reach blobIdToStat.
        RUNTIME_CHECK(entry.checkpoint_info.has_value() && entry.checkpoint_info->is_local_data_reclaimed);
        return;
    }

    // Valid blob file: restore the range [offset, offset + total size) exactly once.
    auto stat = blobIdToStat(entry.file_id);
    stat->restoreSpaceMap(entry.offset, entry.getTotalSize());
}

std::pair<BlobFileId, String> BlobStats::getBlobIdFromName(String blob_name)
Expand Down
71 changes: 69 additions & 2 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL
switch (write.type)
{
case WriteBatchWriteType::PUT:
case WriteBatchWriteType::UPDATE_DATA_FROM_REMOTE:
{
ChecksumClass digest;
PageEntryV3 entry;
Expand Down Expand Up @@ -220,7 +221,30 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL
LOG_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] write failed.", blob_id, offset_in_file, write.size);
throw e;
}
if (write.type == WriteBatchWriteType::PUT)
{
edit.put(wb.getFullPageId(write.page_id), entry);
}
else
{
edit.updateRemote(wb.getFullPageId(write.page_id), entry);
}

break;
}
case WriteBatchWriteType::PUT_REMOTE:
{
PageEntryV3 entry;
entry.file_id = INVALID_BLOBFILE_ID;
entry.tag = write.tag;
entry.checkpoint_info = CheckpointInfo{
.data_location = *write.data_location,
.is_local_data_reclaimed = true,
};
if (!write.offsets.empty())
{
entry.field_offsets.swap(write.offsets);
}
edit.put(wb.getFullPageId(write.page_id), entry);
break;
}
Expand All @@ -239,6 +263,7 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL
break;
case WriteBatchWriteType::UPSERT:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown write type: {}", magic_enum::enum_name(write.type));
break;
}
}

Expand All @@ -247,7 +272,7 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL

template <typename Trait>
typename BlobStore<Trait>::PageEntriesEdit
BlobStore<Trait>::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
BlobStore<Trait>::write(typename Trait::WriteBatch && wb, const WriteLimiterPtr & write_limiter)
{
ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount());

Expand All @@ -258,10 +283,26 @@ BlobStore<Trait>::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr &
if (all_page_data_size == 0)
{
// Shortcut for a WriteBatch that doesn't need to persist any blob data.
for (auto & write : wb.getWrites())
for (auto & write : wb.getMutWrites())
{
switch (write.type)
{
case WriteBatchWriteType::PUT_REMOTE:
{
PageEntryV3 entry;
entry.file_id = INVALID_BLOBFILE_ID;
entry.tag = write.tag;
entry.checkpoint_info = CheckpointInfo{
.data_location = *write.data_location,
.is_local_data_reclaimed = true,
};
if (!write.offsets.empty())
{
entry.field_offsets.swap(write.offsets);
}
edit.put(wb.getFullPageId(write.page_id), entry);
break;
}
case WriteBatchWriteType::DEL:
{
edit.del(wb.getFullPageId(write.page_id));
Expand All @@ -280,7 +321,9 @@ BlobStore<Trait>::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr &
}
case WriteBatchWriteType::PUT:
case WriteBatchWriteType::UPSERT:
case WriteBatchWriteType::UPDATE_DATA_FROM_REMOTE:
throw Exception(ErrorCodes::LOGICAL_ERROR, "write batch have a invalid total size == 0 while this kind of entry exist, write_type={}", magic_enum::enum_name(write.type));
break;
}
}
return edit;
Expand Down Expand Up @@ -319,6 +362,7 @@ BlobStore<Trait>::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr &
switch (write.type)
{
case WriteBatchWriteType::PUT:
case WriteBatchWriteType::UPDATE_DATA_FROM_REMOTE:
{
ChecksumClass digest;
PageEntryV3 entry;
Expand Down Expand Up @@ -359,6 +403,29 @@ BlobStore<Trait>::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr &
}

buffer_pos += write.size;
if (write.type == WriteBatchWriteType::PUT)
{
edit.put(wb.getFullPageId(write.page_id), entry);
}
else
{
edit.updateRemote(wb.getFullPageId(write.page_id), entry);
}
break;
}
case WriteBatchWriteType::PUT_REMOTE:
{
PageEntryV3 entry;
entry.file_id = INVALID_BLOBFILE_ID;
entry.tag = write.tag;
entry.checkpoint_info = CheckpointInfo{
.data_location = *write.data_location,
.is_local_data_reclaimed = true,
};
if (!write.offsets.empty())
{
entry.field_offsets.swap(write.offsets);
}
edit.put(wb.getFullPageId(write.page_id), entry);
break;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class BlobStore : private Allocator<false>
const WriteLimiterPtr & write_limiter = nullptr,
const ReadLimiterPtr & read_limiter = nullptr);

PageEntriesEdit write(typename Trait::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);
PageEntriesEdit write(typename Trait::WriteBatch && wb, const WriteLimiterPtr & write_limiter = nullptr);

void remove(const PageEntries & del_entries);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ try
wb.putPage("page_foo", 0, "The flower carriage rocked", {4, 10, 12});
wb.delPage("id_bar");
wb.putPage("page_abc", 0, "Dreamed of the day that she was born");
auto blob_store_edits = blob_store.write(wb, nullptr);
auto blob_store_edits = blob_store.write(std::move(wb), nullptr);

ASSERT_EQ(blob_store_edits.size(), 3);

Expand Down
69 changes: 67 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,34 @@ void VersionedPageEntries<Trait>::createDelete(const PageVersion & ver)
toDebugString()));
}

// Record the local blob location of a page whose data was previously only
// available remotely.
//
// Only valid when this version list is of type VAR_ENTRY; any other type throws.
// The target record is looked up with an upper bound of (ver.sequence + 1, 0)
// and must be an entry that carries checkpoint info.
//
// Returns false, leaving the record untouched, when the record is not marked as
// "local data reclaimed". Otherwise copies file_id / size / offset / checksum
// from `entry`, clears the reclaimed flag and returns true.
template <typename Trait>
bool VersionedPageEntries<Trait>::updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry)
{
    auto page_lock = acquireLock();

    if (type != EditRecordType::VAR_ENTRY)
    {
        throw Exception(fmt::format(
            "try to update remote page with invalid state "
            "[ver={}] [state={}]",
            ver,
            toDebugString()));
    }

    auto target = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1, 0));
    RUNTIME_CHECK(target != entries.end() && target->second.isEntry());

    auto & local_entry = target->second.entry;
    RUNTIME_CHECK(local_entry.checkpoint_info.has_value());

    const bool needs_local_data = local_entry.checkpoint_info->is_local_data_reclaimed;
    if (!needs_local_data)
    {
        // Nothing to update; the caller gets `false` for this entry.
        return false;
    }

    // Point the existing record at the freshly written local blob data.
    local_entry.file_id = entry.file_id;
    local_entry.size = entry.size;
    local_entry.offset = entry.offset;
    local_entry.checksum = entry.checksum;
    local_entry.checkpoint_info->is_local_data_reclaimed = false;
    return true;
}

// Create a new reference version with version=`ver` and `ori_page_id_`.
// If create success, then return true, otherwise return false.
template <typename Trait>
Expand Down Expand Up @@ -1473,6 +1501,7 @@ void PageDirectory<Trait>::apply(PageEntriesEdit && edit, const WriteLimiterPtr
case EditRecordType::VAR_ENTRY:
case EditRecordType::VAR_EXTERNAL:
case EditRecordType::VAR_REF:
case EditRecordType::UPDATE_DATA_FROM_REMOTE:
throw Exception(fmt::format("should not handle edit with invalid type [type={}]", magic_enum::enum_name(r.type)));
}
}
Expand All @@ -1488,6 +1517,34 @@ void PageDirectory<Trait>::apply(PageEntriesEdit && edit, const WriteLimiterPtr
}
}

// Apply an edit that records local blob locations for pages whose data lives remotely.
//
// Every record in `edit` is stamped with version (snapshot sequence, 0), the edit
// is serialized and applied to the WAL, and then each record is applied to the
// version list of its page id under the table lock.
//
// Returns the entries that `VersionedPageEntries::updateLocalCacheForRemotePage`
// refused (returned false for).
//
// Each record's page id must exist in the directory; a missing id fails the
// RUNTIME_CHECK instead of silently updating another page.
template <typename Trait>
typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCacheForRemotePages(PageEntriesEdit && edit, const DB::PageStorageSnapshotPtr & snap_, const WriteLimiterPtr & write_limiter)
{
    std::unique_lock apply_lock(apply_mutex);

    const auto seq = toConcreteSnapshot(snap_)->sequence;
    const PageVersion update_version(seq, 0);
    for (auto & r : edit.getMutRecords())
    {
        r.version = update_version;
    }
    wal->apply(Trait::Serializer::serializeTo(edit), write_limiter);

    typename PageDirectory<Trait>::PageEntries ignored_entries;
    {
        std::unique_lock table_lock(table_rw_mutex);

        for (const auto & r : edit.getRecords())
        {
            // Exact lookup: `lower_bound` would return the next greater page id when
            // `r.page_id` is missing, and a plain `assert` vanishes in release builds.
            auto iter = mvcc_table_directory.find(r.page_id);
            RUNTIME_CHECK(iter != mvcc_table_directory.end());
            auto & version_list = iter->second;
            if (!version_list->updateLocalCacheForRemotePage(update_version, r.entry))
            {
                ignored_entries.push_back(r.entry);
            }
        }
    }
    return ignored_entries;
}

template <typename Trait>
void PageDirectory<Trait>::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter)
{
Expand Down Expand Up @@ -1668,8 +1725,16 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter,
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
files_snap.num_records = edit_from_disk.size();
files_snap.read_elapsed_ms = watch.elapsedMilliseconds();
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit_from_disk), write_limiter);
return done_any_io;
if constexpr (std::is_same_v<Trait, u128::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit_from_disk), write_limiter);
return done_any_io;
}
else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit_from_disk), write_limiter);
return done_any_io;
}
}

template <typename Trait>
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ class VersionedPageEntries

void createDelete(const PageVersion & ver);

// Update the local cache info for a remote page.
// The caller must hold a snapshot to prevent the page from being deleted.
bool updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry);

std::shared_ptr<PageId> fromRestored(const typename PageEntriesEdit::EditRecord & rec);

std::tuple<ResolveResult, PageId, PageVersion>
Expand Down Expand Up @@ -346,6 +350,9 @@ class PageDirectory

void apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter = nullptr);

// Returns the ignored entries; the corresponding space in the BlobFile should be reclaimed.
PageEntries updateLocalCacheForRemotePages(PageEntriesEdit && edit, const DB::PageStorageSnapshotPtr & snap_, const WriteLimiterPtr & write_limiter = nullptr);

std::pair<GcEntriesMap, PageSize>
getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) const;

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ void PageDirectoryFactory<Trait>::applyRecord(
case EditRecordType::PUT:
version_list->createNewEntry(restored_version, r.entry);
break;
case EditRecordType::UPDATE_DATA_FROM_REMOTE:
version_list->updateLocalCacheForRemotePage(restored_version, r.entry);
break;
case EditRecordType::DEL:
case EditRecordType::VAR_DELETE: // nothing different from `DEL`
version_list->createDelete(restored_version);
Expand Down
Loading

0 comments on commit 9fb5cc1

Please sign in to comment.