Fix read when FAP is enabled (#7099)
ref #6827
lidezhu authored Mar 20, 2023
1 parent b9d373b commit 1e415c0
Showing 22 changed files with 151 additions and 53 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -129,7 +129,8 @@ namespace DB
F(type_syncing_data_freshness, {{"type", "data_freshness"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \
M(tiflash_storage_command_count, "Total number of storage's command, such as delete range / shutdown /startup", Counter, \
F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"})) \
F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"}), \
F(type_ingest_checkpoint, {"type", "ingest_check_point"})) \
M(tiflash_storage_subtask_count, "Total number of storage's sub task", Counter, \
F(type_delta_merge_bg, {"type", "delta_merge_bg"}), \
F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \
4 changes: 3 additions & 1 deletion dbms/src/Flash/Disaggregated/MockS3LockClient.h
@@ -15,6 +15,7 @@
#pragma once

#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>
#include <aws/s3/S3Client.h>
@@ -39,7 +40,8 @@ class MockS3LockClient : public IS3LockClient
{
// If the data file exist and no delmark exist, then create a lock file on `data_file_key`
auto view = S3FilenameView::fromKey(data_file_key);
if (!objectExists(*s3_client, data_file_key))
auto object_key = view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFile::metav2FileName()) : data_file_key;
if (!objectExists(*s3_client, object_key))
{
return {false, ""};
}
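
On S3 a DMFile is stored as a group of objects under a shared key prefix rather than as a single object, so probing the bare `data_file_key` never matches anything; the mock lock client therefore probes a concrete object inside the DMFile prefix. A minimal standalone sketch of the key selection, using a literal "meta" as a stand-in for whatever DM::DMFile::metav2FileName() returns:

#include <string>

// Illustrative only; "meta" stands in for DM::DMFile::metav2FileName().
std::string objectKeyToProbe(const std::string & data_file_key, bool is_dmfile)
{
    // A DMFile is a directory-like prefix on S3, so check one of its member
    // objects; a plain data file key can be checked directly.
    return is_dmfile ? data_file_key + "/meta" : data_file_key;
}
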
11 changes: 10 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
@@ -190,10 +190,19 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
readIntBinary(bytes, buf);
auto new_cf_id = context.storage_pool->newLogPageId();
auto remote_page_id = UniversalPageIdFormat::toFullPageId(UniversalPageIdFormat::toFullPrefix(StorageType::Log, ns_id), data_page_id);
// The `data_file_id` in temp_ps is the lock key; convert it to the data key before writing to the local page storage
auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id);
RUNTIME_CHECK(remote_data_location.has_value());
auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id);
RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile());
auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey();
PS::V3::CheckpointLocation new_remote_data_location{
.data_file_id = std::make_shared<String>(remote_data_file_key),
.offset_in_file = remote_data_location->offset_in_file,
.size_in_file = remote_data_location->size_in_file};
auto entry = temp_ps->getEntry(remote_page_id);
wbs.log.putRemotePage(new_cf_id, 0, *remote_data_location, std::move(entry.field_offsets));
LOG_DEBUG(Logger::get(), "Write remote page[page_id={} remote_location={}] using local page id {}", remote_page_id, new_remote_data_location.toDebugString(), new_cf_id);
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));

auto column_file_schema = std::make_shared<ColumnFileSchema>(*schema);
return {std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id), std::move(schema)};
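
The page locations recorded in temp_ps point at the S3 lock key that was created when the data file was uploaded, while the local PageStorage needs the underlying data key, hence the S3FilenameView round trip above before putRemotePage is called (which now also receives entry.size; see the UniversalWriteBatch change further down). A rough standalone sketch of the idea, assuming purely for illustration that a lock key is the data key with a ".lock_s<store>_<seq>" suffix appended:

#include <string>

// Illustrative only; the real conversion is S3FilenameView::fromKey(key)
// .asDataFile().toFullKey(), and the suffix layout assumed here may not
// match the actual key format.
std::string lockKeyToDataKey(const std::string & lock_key)
{
    const auto pos = lock_key.rfind(".lock_");
    return pos == std::string::npos ? lock_key : lock_key.substr(0, pos);
}
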
@@ -88,12 +88,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
{
already_initialize_write_ps = true;
}
auto kvstore = db_context->getTMTContext().getKVStore();
{
auto meta_store = metapb::Store{};
meta_store.set_id(store_id);
kvstore->setStore(meta_store);
}
resetStoreId(current_store_id);
global_context.getSharedContextDisagg()->initFastAddPeerContext();
}

@@ -111,10 +106,22 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
}
}

void resetStoreId(UInt64 store_id)
{
auto kvstore = db_context->getTMTContext().getKVStore();
{
auto meta_store = metapb::Store{};
meta_store.set_id(store_id);
kvstore->setStore(meta_store);
}
}

DeltaMergeStorePtr
reload(const ColumnDefinesPtr & pre_define_columns = {}, bool is_common_handle = false, size_t rowkey_column_size = 1)
{
TiFlashStorageTestBasic::reload();
auto kvstore = db_context->getTMTContext().getKVStore();
auto store_id = kvstore->getStoreID();
if (auto ps = DB::tests::TiFlashTestEnv::getGlobalContext().getWriteNodePageStorage(); ps)
{
auto mock_s3lock_client = std::make_shared<DB::S3::MockS3LockClient>(DB::S3::ClientFactory::instance().sharedTiFlashClient());
@@ -166,7 +173,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
return {handle_range, {external_file}}; // There are some duplicated info. This is to minimize the change to our test code.
}

void dumpCheckpoint()
void dumpCheckpoint(UInt64 store_id)
{
auto temp_dir = getTemporaryPath() + "/";
auto page_storage = db_context->getWriteNodePageStorage();
@@ -227,7 +234,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic

protected:
DeltaMergeStorePtr store;
UInt64 store_id = 100;
UInt64 current_store_id = 100;
UInt64 upload_sequence = 1000;
bool already_initialize_data_store = false;
bool already_initialize_write_ps = false;
@@ -239,6 +246,8 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
TEST_F(DeltaMergeStoreTestFastAddPeer, SimpleWriteReadAfterRestoreFromCheckPoint)
try
{
UInt64 write_store_id = current_store_id + 1;
resetStoreId(write_store_id);
{
auto table_column_defines = DMTestEnv::getDefaultColumns();

@@ -289,7 +298,7 @@ try
remote_store->putDMFile(
dm_file,
S3::DMFileOID{
.store_id = store_id,
.store_id = write_store_id,
.table_id = store->physical_table_id,
.file_id = file_id.id,
},
@@ -300,20 +309,54 @@
store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
}

dumpCheckpoint();
dumpCheckpoint(write_store_id);

clearData();

verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0);

const auto manifest_key = S3::S3Filename::newCheckpointManifest(store_id, upload_sequence).toFullKey();
const auto manifest_key = S3::S3Filename::newCheckpointManifest(write_store_id, upload_sequence).toFullKey();
auto checkpoint_info = std::make_shared<CheckpointInfo>();
checkpoint_info->remote_store_id = store_id;
checkpoint_info->remote_store_id = write_store_id;
checkpoint_info->region_id = 1000;
checkpoint_info->checkpoint_data_holder = buildParsedCheckpointData(*db_context, manifest_key, /*dir_seq*/ 100);
checkpoint_info->temp_ps = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage();
resetStoreId(current_store_id);
{
auto table_column_defines = DMTestEnv::getDefaultColumns();

store = reload(table_column_defines);
}
store->ingestSegmentsFromCheckpointInfo(*db_context, db_context->getSettingsRef(), RowKeyRange::newAll(false, 1), checkpoint_info);

// check data file lock exists
{
const auto data_key = S3::S3Filename::newCheckpointData(write_store_id, upload_sequence, 0).toFullKey();
const auto data_key_view = S3::S3FilenameView::fromKey(data_key);
const auto lock_prefix = data_key_view.getLockPrefix();
auto client = S3::ClientFactory::instance().sharedTiFlashClient();
std::set<String> lock_keys;
S3::listPrefix(*client, lock_prefix, [&](const Aws::S3::Model::Object & object) {
const auto & lock_key = object.GetKey();
// also store the object.GetLastModified() for removing
// outdated manifest objects
lock_keys.emplace(lock_key);
return DB::S3::PageResult{.num_keys = 1, .more = true};
});
// 2 lock files, 1 from write store, 1 from current store
ASSERT_EQ(lock_keys.size(), 2);
bool current_store_lock_exist = false;
for (const auto & lock_key : lock_keys)
{
auto lock_key_view = S3::S3FilenameView::fromKey(lock_key);
ASSERT_TRUE(lock_key_view.isLockFile());
auto lock_info = lock_key_view.getLockInfo();
if (lock_info.store_id == current_store_id)
current_store_lock_exist = true;
}
ASSERT_TRUE(current_store_lock_exist);
}
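
Two locks are expected here: the checkpoint data file is first locked by the store that uploaded it (write_store_id), and ingestSegmentsFromCheckpointInfo then adds a lock owned by the ingesting store (current_store_id), so the uploader cannot garbage-collect the file while this store still references it.
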

verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), num_rows_write / 2 + 2 * num_rows_write);

reload();
@@ -373,15 +416,16 @@ try
}
store->mergeDeltaAll(*db_context);

dumpCheckpoint();
UInt64 write_store_id = current_store_id + 1;
dumpCheckpoint(write_store_id);

clearData();

verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0);

const auto manifest_key = S3::S3Filename::newCheckpointManifest(store_id, upload_sequence).toFullKey();
const auto manifest_key = S3::S3Filename::newCheckpointManifest(write_store_id, upload_sequence).toFullKey();
auto checkpoint_info = std::make_shared<CheckpointInfo>();
checkpoint_info->remote_store_id = store_id;
checkpoint_info->remote_store_id = write_store_id;
checkpoint_info->region_id = 1000;
checkpoint_info->checkpoint_data_holder = buildParsedCheckpointData(*db_context, manifest_key, /*dir_seq*/ 100);
checkpoint_info->temp_ps = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage();
8 changes: 8 additions & 0 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -236,6 +236,7 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL
{
PageEntryV3 entry;
entry.file_id = INVALID_BLOBFILE_ID;
entry.size = write.size;
entry.tag = write.tag;
entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = *write.data_location,
@@ -303,6 +304,7 @@ BlobStore<Trait>::write(typename Trait::WriteBatch && wb, const WriteLimiterPtr
{
PageEntryV3 entry;
entry.file_id = INVALID_BLOBFILE_ID;
entry.size = write.size;
entry.tag = write.tag;
entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = *write.data_location,
@@ -438,6 +440,7 @@ BlobStore<Trait>::write(typename Trait::WriteBatch && wb, const WriteLimiterPtr
{
PageEntryV3 entry;
entry.file_id = INVALID_BLOBFILE_ID;
entry.size = write.size;
entry.tag = write.tag;
entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = *write.data_location,
@@ -518,6 +521,11 @@ void BlobStore<Trait>::remove(const PageEntries & del_entries)
std::set<BlobFileId> blob_updated;
for (const auto & entry : del_entries)
{
if (entry.file_id == INVALID_BLOBFILE_ID)
{
RUNTIME_CHECK(entry.checkpoint_info.has_value() && entry.checkpoint_info.is_local_data_reclaimed);
continue;
}
blob_updated.insert(entry.file_id);
// External page size is 0
if (entry.size == 0)
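
With FAP, a page ingested from a checkpoint may have no local blob data at all: its entry carries INVALID_BLOBFILE_ID together with a checkpoint location whose is_local_data_reclaimed flag is set. remove() now skips such entries instead of trying to update a blob file that does not exist locally.
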
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
@@ -121,10 +121,11 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
page.data.begin(),
page.data.size());
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = data_location,
.is_valid = true,
.is_local_data_reclaimed = false,
.is_local_data_reclaimed = is_local_data_reclaimed,
};
locked_files.emplace(*data_location.data_file_id);
if (is_rewrite)
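
Previously the writer unconditionally reset is_local_data_reclaimed to false when stamping the new data location, which would make an entry whose local data had already been reclaimed look locally readable again; the flag is now carried over from the existing checkpoint info.
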
@@ -19,7 +19,14 @@ namespace DB::PS::V3

Page CPWriteDataSourceBlobStore::read(const BlobStore<universal::BlobStoreTrait>::PageIdAndEntry & page_id_and_entry)
{
return blob_store.read(page_id_and_entry);
if (page_id_and_entry.second.checkpoint_info.has_value() && page_id_and_entry.second.checkpoint_info.is_local_data_reclaimed)
{
return remote_reader->read(page_id_and_entry);
}
else
{
return blob_store.read(page_id_and_entry);
}
}

Page CPWriteDataSourceFixture::read(const BlobStore<universal::BlobStoreTrait>::PageIdAndEntry & id_and_entry)
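
This is the heart of the fix: when dumping a checkpoint, pages that were ingested through FAP may only exist remotely, so the checkpoint writer's data source has to fall back to reading the bytes from S3 instead of from the local blob store. A minimal sketch of the dispatch with the two readers abstracted behind std::function (the real types are BlobStore and S3PageReader):

#include <functional>
#include <string>

struct PageSketch { std::string data; };

// Illustrative only; mirrors the branch above with stand-in types.
PageSketch readForCheckpoint(
    bool has_checkpoint_info,
    bool is_local_data_reclaimed,
    const std::function<PageSketch()> & read_local,
    const std::function<PageSketch()> & read_remote)
{
    if (has_checkpoint_info && is_local_data_reclaimed)
        return read_remote(); // local copy was reclaimed, fetch from S3
    return read_local();      // normal path: local blob store
}
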
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h
@@ -15,6 +15,7 @@
#pragma once

#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/Universal/S3PageReader.h>

namespace DB::PS::V3
{
@@ -45,6 +46,7 @@ class CPWriteDataSourceBlobStore : public CPWriteDataSource
*/
explicit CPWriteDataSourceBlobStore(BlobStore<universal::BlobStoreTrait> & blob_store_)
: blob_store(blob_store_)
, remote_reader(std::make_unique<S3PageReader>())
{}

static CPWriteDataSourcePtr create(BlobStore<universal::BlobStoreTrait> & blob_store_)
@@ -56,6 +58,7 @@

private:
BlobStore<universal::BlobStoreTrait> & blob_store;
S3PageReaderPtr remote_reader;
};

/**
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V3/PageEntry.h
@@ -133,6 +133,7 @@ struct fmt::formatter<DB::PS::V3::PageEntryV3>
fb.fmtAppend("{}", offset_checksum.first);
},
",");
return format_to(ctx.out(), "PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets: [{}]}}", entry.file_id, entry.offset, entry.size, entry.checksum, entry.tag, fmt_buf.toString());

return format_to(ctx.out(), "PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets: [{}], checkpoint_info: {}}}", entry.file_id, entry.offset, entry.size, entry.checksum, entry.tag, fmt_buf.toString(), entry.checkpoint_info.toDebugString());
}
};
18 changes: 18 additions & 0 deletions dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.h
@@ -16,6 +16,7 @@

#include <Storages/Page/V3/CheckpointFile/Proto/manifest_file.pb.h>
#include <common/defines.h>
#include <fmt/format.h>

namespace DB::PS::V3
{
@@ -45,6 +46,11 @@ struct CheckpointLocation
static CheckpointLocation fromProto(
const CheckpointProto::EntryDataLocation & proto_rec,
CheckpointProto::StringsInternMap & strings_map);

std::string toDebugString() const
{
return fmt::format("{{data_file_id: {}, offset_in_file: {}, size_in_file: {}}}", *data_file_id, offset_in_file, size_in_file);
}
};

// A more memory compact struct compared to std::optional<CheckpointInfo>
@@ -65,6 +71,18 @@ struct OptionalCheckpointInfo
*/
bool is_local_data_reclaimed = false;

std::string toDebugString() const
{
if (is_valid)
{
return fmt::format("{{local_data_reclaimed: {}, data_location: {}}}", is_local_data_reclaimed, data_location.toDebugString());
}
else
{
return "invalid";
}
}

public:
ALWAYS_INLINE bool has_value() const { return is_valid; } // NOLINT(readability-identifier-naming)
};
5 changes: 3 additions & 2 deletions dbms/src/Storages/Page/V3/Universal/RaftDataReader.cpp
@@ -49,7 +49,7 @@ void RaftDataReader::traverse(const UniversalPageId & start, const UniversalPage
}
}

void RaftDataReader::traverseRemoteRaftLogForRegion(UInt64 region_id, const std::function<void(const UniversalPageId & page_id, const PS::V3::CheckpointLocation & location)> & acceptor)
void RaftDataReader::traverseRemoteRaftLogForRegion(UInt64 region_id, const std::function<void(const UniversalPageId & page_id, PageSize size, const PS::V3::CheckpointLocation & location)> & acceptor)
{
auto start = UniversalPageIdFormat::toFullRaftLogPrefix(region_id);
auto end = UniversalPageIdFormat::toFullRaftLogScanEnd(region_id);
@@ -62,7 +62,8 @@ void RaftDataReader::traverseRemoteRaftLogForRegion(UInt64 region_id, const std:
RUNTIME_CHECK(page_id.size() == 20, page_id.size());
auto maybe_location = uni_ps.getCheckpointLocation(page_id, snapshot);
RUNTIME_CHECK(maybe_location.has_value());
acceptor(page_id, *maybe_location);
auto entry = uni_ps.getEntry(page_id, snapshot);
acceptor(page_id, entry.size, *maybe_location);
}
}

2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/Universal/RaftDataReader.h
@@ -32,7 +32,7 @@ class RaftDataReader final
void traverse(const UniversalPageId & start, const UniversalPageId & end, const std::function<void(const UniversalPageId & page_id, DB::Page page)> & acceptor);

// Only used to get raft log data from remote checkpoint data
void traverseRemoteRaftLogForRegion(UInt64 region_id, const std::function<void(const UniversalPageId & page_id, const PS::V3::CheckpointLocation & location)> & acceptor);
void traverseRemoteRaftLogForRegion(UInt64 region_id, const std::function<void(const UniversalPageId & page_id, PageSize size, const PS::V3::CheckpointLocation & location)> & acceptor);

// return the first id not less than `page_id`
std::optional<UniversalPageId> getLowerBound(const UniversalPageId & page_id);
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp
@@ -47,6 +47,7 @@ Page S3PageReader::read(const UniversalPageIdAndEntry & page_id_and_entry)

buf.seek(location.offset_in_file, SEEK_SET);
auto buf_size = location.size_in_file;
RUNTIME_CHECK(buf_size != 0, page_id_and_entry);
char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) {
free(p, buf_size);
8 changes: 4 additions & 4 deletions dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h
@@ -58,9 +58,9 @@ class UniversalWriteBatch : private boost::noncopyable
putPage(UniversalPageIdFormat::toFullPageId(prefix, page_id), tag, read_buffer, size, data_sizes);
}

void putRemotePage(PageIdU64 page_id, UInt64 tag, const PS::V3::CheckpointLocation & data_location, PageFieldOffsetChecksums && offset_and_checksums)
void putRemotePage(PageIdU64 page_id, UInt64 tag, PageSize size, const PS::V3::CheckpointLocation & data_location, PageFieldOffsetChecksums && offset_and_checksums)
{
putRemotePage(UniversalPageIdFormat::toFullPageId(prefix, page_id), tag, data_location, std::move(offset_and_checksums));
putRemotePage(UniversalPageIdFormat::toFullPageId(prefix, page_id), tag, size, data_location, std::move(offset_and_checksums));
}

void putExternal(PageIdU64 page_id, UInt64 tag)
@@ -114,9 +114,9 @@ class UniversalWriteBatch : private boost::noncopyable
putPage(page_id, tag, buffer_ptr, data.size(), data_sizes);
}

void putRemotePage(const UniversalPageId & page_id, UInt64 tag, const PS::V3::CheckpointLocation & data_location, PageFieldOffsetChecksums && offset_and_checksums)
void putRemotePage(const UniversalPageId & page_id, UInt64 tag, PageSize size, const PS::V3::CheckpointLocation & data_location, PageFieldOffsetChecksums && offset_and_checksums)
{
Write w{WriteBatchWriteType::PUT_REMOTE, page_id, tag, nullptr, /* size */ 0, "", std::move(offset_and_checksums), data_location};
Write w{WriteBatchWriteType::PUT_REMOTE, page_id, tag, nullptr, size, "", std::move(offset_and_checksums), data_location};
writes.emplace_back(std::move(w));
has_writes_from_remote = true;
}
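
Threading size through putRemotePage is what ultimately makes the remote pages readable: the PageEntryV3 built for a PUT_REMOTE write now records the real payload size instead of 0 (see the entry.size = write.size lines added in BlobStore.cpp), and S3PageReader::read can then allocate a correctly sized buffer, with its new RUNTIME_CHECK rejecting the old zero-size entries. A compressed sketch of that flow with stand-in types:

#include <cstdint>

struct WriteSketch { uint64_t size = 0; };
struct EntrySketch { uint64_t size = 0; };

// Illustrative only; the real types are UniversalWriteBatch::Write and PageEntryV3.
EntrySketch buildRemoteEntry(const WriteSketch & w)
{
    EntrySketch e;
    e.size = w.size; // previously left at 0 for remote puts, breaking later reads
    return e;
}
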
