From 327417ce626a1dffdbc0a849abd230b437442ab4 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Wed, 29 May 2024 13:58:48 +0800 Subject: [PATCH] DMFile: Support modify DMFile meta (#200) Signed-off-by: Wish --- .../Flash/Disaggregated/MockS3LockClient.h | 5 +- .../src/Flash/Disaggregated/S3LockService.cpp | 5 +- .../tests/gtest_s3_lock_service.cpp | 2 +- dbms/src/Server/DTTool/DTToolInspect.cpp | 8 +- dbms/src/Server/DTTool/DTToolMigrate.cpp | 8 +- dbms/src/Server/tests/gtest_dttool.cpp | 3 +- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 7 +- dbms/src/Storages/DeltaMerge/DMContext_fwd.h | 2 +- .../DeltaMerge/Delta/DeltaValueSpace.cpp | 3 +- .../DeltaMerge/DeltaMergeStore_Ingest.cpp | 14 +- .../DeltaMerge/DeltaMergeStore_InternalBg.cpp | 3 +- .../src/Storages/DeltaMerge/File/ColumnStat.h | 6 + dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 14 +- dbms/src/Storages/DeltaMerge/File/DMFile.h | 31 +- .../Storages/DeltaMerge/File/DMFileMeta.cpp | 4 +- .../src/Storages/DeltaMerge/File/DMFileMeta.h | 16 +- .../Storages/DeltaMerge/File/DMFileMetaV2.cpp | 6 +- .../Storages/DeltaMerge/File/DMFileMetaV2.h | 34 +- .../File/DMFileV3IncrementWriter.cpp | 208 ++++++ .../DeltaMerge/File/DMFileV3IncrementWriter.h | 128 ++++ .../File/DMFileV3IncrementWriter_fwd.h | 26 + .../Storages/DeltaMerge/File/DMFileWriter.cpp | 3 +- .../src/Storages/DeltaMerge/File/MergedFile.h | 3 +- .../DeltaMerge/File/dtpb/dmfile.proto | 4 + .../DeltaMerge/Remote/DataStore/DataStore.h | 15 +- .../Remote/DataStore/DataStoreMock.cpp | 8 +- .../Remote/DataStore/DataStoreMock.h | 7 +- .../Remote/DataStore/DataStoreS3.cpp | 66 +- .../DeltaMerge/Remote/DataStore/DataStoreS3.h | 14 +- .../DeltaMerge/Remote/Proto/remote.proto | 1 + .../Storages/DeltaMerge/Remote/Serializer.cpp | 6 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 99 ++- dbms/src/Storages/DeltaMerge/Segment.h | 16 + .../Storages/DeltaMerge/StableValueSpace.cpp | 23 +- ...est_dm_delta_merge_store_fast_add_peer.cpp | 3 +- .../DeltaMerge/tests/gtest_dm_file.cpp 
| 16 +- .../tests/gtest_dm_meta_version.cpp | 538 ++++++++++++++ .../DeltaMerge/tests/gtest_dm_segment.cpp | 9 +- .../tests/gtest_dm_vector_index.cpp | 8 +- .../gtest_segment_replace_stable_data.cpp | 657 ++++++++++++++++++ .../tests/gtest_segment_test_basic.cpp | 33 +- .../tests/gtest_segment_test_basic.h | 6 + .../V3/Universal/tests/gtest_checkpoint.cpp | 5 +- .../Universal/tests/gtest_lock_local_mgr.cpp | 2 +- dbms/src/Storages/PathPool.h | 4 + dbms/src/Storages/S3/FileCache.cpp | 18 +- dbms/src/Storages/S3/S3Filename.cpp | 2 +- dbms/src/Storages/S3/tests/gtest_s3file.cpp | 2 +- 48 files changed, 1994 insertions(+), 107 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp diff --git a/dbms/src/Flash/Disaggregated/MockS3LockClient.h b/dbms/src/Flash/Disaggregated/MockS3LockClient.h index a2f754f6582..b5c89ceb72b 100644 --- a/dbms/src/Flash/Disaggregated/MockS3LockClient.h +++ b/dbms/src/Flash/Disaggregated/MockS3LockClient.h @@ -42,8 +42,9 @@ class MockS3LockClient : public IS3LockClient { // If the data file exist and no delmark exist, then create a lock file on `data_file_key` auto view = S3FilenameView::fromKey(data_file_key); - auto object_key - = view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key; + auto object_key = view.isDMFile() + ? 
fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0)) + : data_file_key; if (!objectExists(*s3_client, object_key)) { return {false, ""}; diff --git a/dbms/src/Flash/Disaggregated/S3LockService.cpp b/dbms/src/Flash/Disaggregated/S3LockService.cpp index 4034ce1e1d8..948766afc95 100644 --- a/dbms/src/Flash/Disaggregated/S3LockService.cpp +++ b/dbms/src/Flash/Disaggregated/S3LockService.cpp @@ -205,8 +205,9 @@ bool S3LockService::tryAddLockImpl( auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); // make sure data file exists - auto object_key - = key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key; + auto object_key = key_view.isDMFile() + ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0)) + : data_file_key; if (!DB::S3::objectExists(*s3_client, object_key)) { auto * e = response->mutable_result()->mutable_conflict(); diff --git a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp index 5d220132be2..9f3bb264ece 100644 --- a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp +++ b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp @@ -68,7 +68,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id}); DB::S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName())); + fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))); ++dm_file_id; } } diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp index 812f4f422c7..4ab155b5c95 100644 --- a/dbms/src/Server/DTTool/DTToolInspect.cpp +++ b/dbms/src/Server/DTTool/DTToolInspect.cpp @@ -46,7 +46,13 @@ int inspectServiceMain(DB::Context & 
context, const InspectArgs & args) // Open the DMFile at `workdir/dmf_` auto fp = context.getFileProvider(); - auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all()); + auto dmfile = DB::DM::DMFile::restore( + fp, + args.file_id, + 0, + args.workdir, + DB::DM::DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk()); diff --git a/dbms/src/Server/DTTool/DTToolMigrate.cpp b/dbms/src/Server/DTTool/DTToolMigrate.cpp index 5713d56a135..93fe1d9e7fc 100644 --- a/dbms/src/Server/DTTool/DTToolMigrate.cpp +++ b/dbms/src/Server/DTTool/DTToolMigrate.cpp @@ -40,7 +40,7 @@ bool isRecognizable(const DB::DM::DMFile & file, const std::string & target) { return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target || DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target) - || isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target; + || isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::isMetaFileName(target); } namespace bpo = boost::program_options; @@ -193,7 +193,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) args.file_id, 0, args.workdir, - DB::DM::DMFileMeta::ReadMode::all()); + DB::DM::DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); auto source_version = 0; if (src_file->useMetaV2()) { @@ -270,7 +271,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) args.file_id, 1, keeper.migration_temp_dir.path(), - DB::DM::DMFileMeta::ReadMode::all()); + DB::DM::DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); } } LOG_INFO(logger, "migration finished"); diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 23442999036..4fd78844f6b 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ 
b/dbms/src/Server/tests/gtest_dttool.cpp @@ -319,7 +319,8 @@ TEST_F(DTToolTest, BlockwiseInvariant) 1, 0, getTemporaryPath(), - DB::DM::DMFileMeta::ReadMode::all()); + DB::DM::DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); if (version == 2) { EXPECT_EQ(refreshed_file->getConfiguration()->getChecksumFrameLength(), frame_size); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index cef4729d980..3dae4f74d4c 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -110,7 +110,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)); auto file_oid = lock_key_view.asDataFile().getDMFileOID(); auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id); - dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */); // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); } @@ -124,7 +124,8 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( file_page_id, file_parent_path, DMFileMeta::ReadMode::all(), - dm_context.keyspace_id); + dm_context.keyspace_id, + 0 /* FIXME: Support other meta version */); auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); } @@ -165,7 +166,7 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint( wbs.data.putRemoteExternal(new_local_page_id, loc); auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); - auto 
dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */); wbs.writeLogAndData(); // new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/DMContext_fwd.h b/dbms/src/Storages/DeltaMerge/DMContext_fwd.h index 2a8ebce59c8..5d1ae9c744f 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext_fwd.h +++ b/dbms/src/Storages/DeltaMerge/DMContext_fwd.h @@ -1,4 +1,4 @@ -// Copyright 2023 PingCAP, Inc. +// Copyright 2024 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 3bdc19511cd..c9193c3d790 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -182,7 +182,8 @@ std::vector CloneColumnFilesHelper::clone( /* page_id= */ new_page_id, file_parent_path, DMFileMeta::ReadMode::all(), - dm_context.keyspace_id); + dm_context.keyspace_id, + old_dmfile->metaVersion()); auto new_column_file = f->cloneWith(dm_context, new_file, target_range); cloned.push_back(new_column_file); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index 39a70981226..e2459fbf441 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -122,7 +122,8 @@ void DeltaMergeStore::cleanPreIngestFiles( f.id, file_parent_path, DM::DMFileMeta::ReadMode::memoryAndDiskSize(), - keyspace_id); + keyspace_id, + 0 /* a meta version that must exists */); removePreIngestFile(f.id, false); 
file->remove(file_provider); } @@ -189,7 +190,8 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile( page_id, file_parent_path, DMFileMeta::ReadMode::all(), - keyspace_id); + keyspace_id, + file->metaVersion()); data_files.emplace_back(std::move(ref_file)); wbs.data.putRefPage(page_id, file->pageId()); } @@ -472,7 +474,8 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit( new_page_id, file->parentPath(), DMFileMeta::ReadMode::all(), - keyspace_id); + keyspace_id, + file->metaVersion()); wbs.data.putRefPage(new_page_id, file->pageId()); // We have to commit those file_ids to PageStorage before applying the ingest, because after the write @@ -661,7 +664,8 @@ UInt64 DeltaMergeStore::ingestFiles( external_file.id, file_parent_path, DMFileMeta::ReadMode::memoryAndDiskSize(), - keyspace_id); + keyspace_id, + 0 /* FIXME: Support other meta version */); } else { @@ -671,7 +675,7 @@ UInt64 DeltaMergeStore::ingestFiles( .table_id = dm_context->physical_table_id, .file_id = external_file.id}; file = remote_data_store->prepareDMFile(oid, external_file.id) - ->restore(DMFileMeta::ReadMode::memoryAndDiskSize()); + ->restore(DMFileMeta::ReadMode::memoryAndDiskSize(), 0 /* FIXME: Support other meta version */); } rows += file->getRows(); bytes += file->getBytes(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index c225187bab3..bd7868fe9e5 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -136,7 +136,8 @@ class LocalDMFileGcRemover final /* page_id= */ 0, path, DMFileMeta::ReadMode::none(), - path_pool->getKeyspaceID()); + path_pool->getKeyspaceID(), + 0 /* a meta version that must exist */); if (unlikely(!dmfile)) { // If the dtfile directory is not exist, it means `StoragePathPool::drop` have been diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h 
b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h index f23b743ec77..8097e59892a 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h @@ -43,6 +43,8 @@ struct ColumnStat std::optional vector_index = std::nullopt; + String additional_data_for_test{}; + dtpb::ColumnStat toProto() const { dtpb::ColumnStat stat; @@ -61,6 +63,8 @@ struct ColumnStat if (vector_index.has_value()) stat.mutable_vector_index()->CopyFrom(vector_index.value()); + stat.set_additional_data_for_test(additional_data_for_test); + return stat; } @@ -80,6 +84,8 @@ struct ColumnStat if (proto.has_vector_index()) vector_index = proto.vector_index(); + + additional_data_for_test = proto.additional_data_for_test(); } // @deprecated. New fields should be added via protobuf. Use `toProto` instead diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 9f23c69bcff..abcb9c521b3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -113,7 +113,8 @@ DMFilePtr DMFile::restore( UInt64 page_id, const String & parent_path, const DMFileMeta::ReadMode & read_meta_mode, - KeyspaceID keyspace_id) + KeyspaceID keyspace_id, + UInt32 meta_version) { auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile(); if (!is_s3_file) @@ -137,8 +138,12 @@ DMFilePtr DMFile::restore( /*configuration_*/ std::nullopt, /*version_*/ STORAGE_FORMAT_CURRENT.dm_file, /*keyspace_id_*/ keyspace_id)); - if (is_s3_file || Poco::File(dmfile->metav2Path()).exists()) + if (is_s3_file || Poco::File(dmfile->metav2Path(/* meta_version= */ 0)).exists()) { + // Always use meta_version=0 when checking whether we should treat it as metav2. + // However, when reading actual meta data, we will read according to specified + // meta version. 
+ dmfile->meta = std::make_unique( file_id, parent_path, @@ -147,11 +152,14 @@ DMFilePtr DMFile::restore( 16 * 1024 * 1024, keyspace_id, std::nullopt, - STORAGE_FORMAT_CURRENT.dm_file); + STORAGE_FORMAT_CURRENT.dm_file, + meta_version); dmfile->meta->read(file_provider, read_meta_mode); } else if (!read_meta_mode.isNone()) { + RUNTIME_CHECK_MSG(meta_version == 0, "Only support meta_version=0 for MetaV2, meta_version={}", meta_version); + dmfile->meta = std::make_unique( file_id, parent_path, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 0747e5316f7..825f86d4c0b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -17,7 +17,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -61,7 +63,8 @@ class DMFile : private boost::noncopyable UInt64 page_id, const String & parent_path, const DMFileMeta::ReadMode & read_meta_mode, - KeyspaceID keyspace_id = NullspaceID); + KeyspaceID keyspace_id = NullspaceID, + UInt32 meta_version = 0); struct ListOptions { @@ -89,7 +92,7 @@ class DMFile : private boost::noncopyable // keyspaceID KeyspaceID keyspaceId() const { return meta->keyspace_id; } - DMFileFormat::Version version() const { return meta->version; } + DMFileFormat::Version version() const { return meta->format_version; } String path() const; @@ -128,7 +131,7 @@ class DMFile : private boost::noncopyable const std::unordered_set & getColumnIndices() const { return meta->column_indices; } // only used in gtest - void clearPackProperties() { meta->pack_properties.clear_property(); } + void clearPackProperties() const { meta->pack_properties.clear_property(); } const ColumnStat & getColumnStat(ColId col_id) const { @@ -158,7 +161,7 @@ class DMFile : private boost::noncopyable * Note that only the column id and type is valid. 
* @return All columns */ - ColumnDefines getColumnDefines(bool sort_by_id = true) + ColumnDefines getColumnDefines(bool sort_by_id = true) const { ColumnDefines results{}; results.reserve(this->meta->column_stats.size()); @@ -173,10 +176,12 @@ class DMFile : private boost::noncopyable return results; } - bool useMetaV2() const { return meta->version == DMFileFormat::V3; } + bool useMetaV2() const { return meta->format_version == DMFileFormat::V3; } std::vector listFilesForUpload() const; void switchToRemote(const S3::DMFileOID & oid); + UInt32 metaVersion() const { return meta->metaVersion(); } + private: DMFile( UInt64 file_id_, @@ -201,7 +206,8 @@ class DMFile : private boost::noncopyable merged_file_max_size_, keyspace_id_, configuration_, - version_); + version_, + /* meta_version= */ 0); } else { @@ -218,7 +224,7 @@ class DMFile : private boost::noncopyable // Do not gc me. String ngcPath() const; - String metav2Path() const { return subFilePath(DMFileMetaV2::metaFileName()); } + String metav2Path(UInt32 meta_version) const { return subFilePath(DMFileMetaV2::metaFileName(meta_version)); } UInt64 getReadFileSize(ColId col_id, const String & filename) const { return meta->getReadFileSize(col_id, filename); @@ -270,10 +276,10 @@ class DMFile : private boost::noncopyable return IDataType::getFileNameForStream(DB::toString(col_id), substream); } - void addPack(const DMFileMeta::PackStat & pack_stat) { meta->pack_stats.push_back(pack_stat); } + void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); } DMFileStatus getStatus() const { return meta->status; } - void setStatus(DMFileStatus status_) { meta->status = status_; } + void setStatus(DMFileStatus status_) const { meta->status = status_; } void finalize(); @@ -283,8 +289,15 @@ class DMFile : private boost::noncopyable const UInt64 page_id; LoggerPtr log; + +#ifndef DBMS_PUBLIC_GTEST +private: +#else +public: +#endif DMFileMetaPtr meta; + friend class 
DMFileV3IncrementWriter; friend class DMFileWriter; friend class DMFileWriterRemote; friend class DMFileReader; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp index 717ea21ad18..ec4ca7c0047 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp @@ -183,12 +183,12 @@ void DMFileMeta::readConfiguration(const FileProviderPtr & file_provider) = openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE); auto stream = InputStreamWrapper{buf}; configuration.emplace(stream); - version = DMFileFormat::V2; + format_version = DMFileFormat::V2; } else { configuration.reset(); - version = DMFileFormat::V1; + format_version = DMFileFormat::V1; } } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h index 8aa47d0f779..9a62cdfdc60 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h @@ -38,6 +38,7 @@ class DMFileMetaV2Test; class DMFile; class DMFileWriter; +class DMFileV3IncrementWriter; class DMFileMeta { @@ -48,14 +49,14 @@ class DMFileMeta DMFileStatus status_, KeyspaceID keyspace_id_, DMConfigurationOpt configuration_, - DMFileFormat::Version version_) + DMFileFormat::Version format_version_) : file_id(file_id_) , parent_path(parent_path_) , status(status_) , keyspace_id(keyspace_id_) , configuration(configuration_) , log(Logger::get()) - , version(version_) + , format_version(format_version_) {} virtual ~DMFileMeta() = default; @@ -181,6 +182,12 @@ class DMFileMeta const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); virtual String metaPath() const { return subFilePath(metaFileName()); } + virtual UInt32 metaVersion() const { return 0; } + /** + * @brief metaVersion += 1. Returns the new meta version. + * This is only supported in MetaV2. 
+ */ + virtual UInt32 bumpMetaVersion() { RUNTIME_CHECK_MSG(false, "MetaV1 cannot bump meta version"); } virtual EncryptionPath encryptionMetaPath() const; virtual UInt64 getReadFileSize(ColId col_id, const String & filename) const; @@ -196,8 +203,8 @@ class DMFileMeta const KeyspaceID keyspace_id; DMConfigurationOpt configuration; // configuration - LoggerPtr log; - DMFileFormat::Version version; + const LoggerPtr log; + DMFileFormat::Version format_version; protected: static FileNameBase getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {}) @@ -244,6 +251,7 @@ class DMFileMeta friend class DMFile; friend class DMFileWriter; + friend class DMFileV3IncrementWriter; }; using DMFileMetaPtr = std::unique_ptr; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp index fd6edea7ec6..e37080ce4c4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp @@ -26,7 +26,7 @@ namespace DB::DM EncryptionPath DMFileMetaV2::encryptionMetaPath() const { - return EncryptionPath(encryptionBasePath(), metaFileName(), keyspace_id); + return EncryptionPath(encryptionBasePath(), metaFileName(meta_version), keyspace_id); } EncryptionPath DMFileMetaV2::encryptionMergedPath(UInt32 number) const @@ -67,7 +67,7 @@ void DMFileMetaV2::parse(std::string_view buffer) } ptr = ptr - sizeof(DMFileFormat::Version); - version = *(reinterpret_cast(ptr)); + format_version = *(reinterpret_cast(ptr)); ptr = ptr - sizeof(UInt64); auto meta_block_handle_count = *(reinterpret_cast(ptr)); @@ -185,7 +185,7 @@ void DMFileMetaV2::finalize( }; writePODBinary(meta_block_handles, tmp_buffer); writeIntBinary(static_cast(meta_block_handles.size()), tmp_buffer); - writeIntBinary(version, tmp_buffer); + writeIntBinary(format_version, tmp_buffer); // Write to file and do checksums. 
auto s = tmp_buffer.releaseStr(); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h index 1a5e6e9cdb7..a70bde4c77f 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h @@ -30,12 +30,14 @@ class DMFileMetaV2 : public DMFileMeta UInt64 merged_file_max_size_, KeyspaceID keyspace_id_, DMConfigurationOpt configuration_, - DMFileFormat::Version version_) - : DMFileMeta(file_id_, parent_path_, status_, keyspace_id_, configuration_, version_) + DMFileFormat::Version format_version_, + UInt32 meta_version_) + : DMFileMeta(file_id_, parent_path_, status_, keyspace_id_, configuration_, format_version_) , small_file_size_threshold(small_file_size_threshold_) , merged_file_max_size(merged_file_max_size_) + , meta_version(meta_version_) { - RUNTIME_CHECK(version_ == DMFileFormat::V3); + RUNTIME_CHECK(format_version_ == DMFileFormat::V3); } ~DMFileMetaV2() override = default; @@ -78,16 +80,38 @@ class DMFileMetaV2 : public DMFileMeta void finalize(WriteBuffer & buffer, const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) override; void read(const FileProviderPtr & file_provider, const DMFileMeta::ReadMode & read_meta_mode) override; - static String metaFileName() { return "meta"; } - String metaPath() const override { return subFilePath(metaFileName()); } + static String metaFileName(UInt32 meta_version) + { + if (meta_version == 0) + return "meta"; + else + return fmt::format("v{}.meta", meta_version); + } + + static bool isMetaFileName(std::string_view file_name) + { + return file_name == "meta" || (file_name.starts_with("v") && file_name.ends_with(".meta")); + } + + // Note: metaPath is different when meta_version is changed. 
+ String metaPath() const override { return subFilePath(metaFileName(meta_version)); } + EncryptionPath encryptionMetaPath() const override; UInt64 getReadFileSize(ColId col_id, const String & filename) const override; EncryptionPath encryptionMergedPath(UInt32 number) const; static String mergedFilename(UInt32 number) { return fmt::format("{}.merged", number); } + UInt32 metaVersion() const override { return meta_version; } + UInt32 bumpMetaVersion() override + { + ++meta_version; + return meta_version; + } + UInt64 small_file_size_threshold; UInt64 merged_file_max_size; + UInt32 meta_version = 0; // Note: meta_version affects the output file name. private: UInt64 getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp new file mode 100644 index 00000000000..c498a0708da --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp @@ -0,0 +1,208 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM +{ + +DMFileV3IncrementWriter::DMFileV3IncrementWriter(const Options & options_) + : logger(Logger::get()) + , options(options_) + , dmfile_initial_meta_ver(options.dm_file->metaVersion()) +{ + RUNTIME_CHECK(options.dm_file != nullptr); + RUNTIME_CHECK(options.file_provider != nullptr); + RUNTIME_CHECK(options.path_pool != nullptr); + + // Should never be called from a Compute Node. + + RUNTIME_CHECK(options.dm_file->meta->format_version == DMFileFormat::V3, options.dm_file->meta->format_version); + RUNTIME_CHECK(options.dm_file->meta->status == DMFileStatus::READABLE); + + auto dmfile_path = options.dm_file->path(); + auto dmfile_path_s3_view = S3::S3FilenameView::fromKeyWithPrefix(dmfile_path); + is_s3_dmfile = dmfile_path_s3_view.isDataFile(); + if (is_s3_dmfile) + { + // When giving a remote DMFile, we expect to have a remoteDataStore + // so that our modifications can be uploaded to remote as well. 
+ RUNTIME_CHECK(options.disagg_ctx && options.disagg_ctx->remote_data_store); + dmfile_oid = dmfile_path_s3_view.getDMFileOID(); + } + + if (is_s3_dmfile) + { + auto delegator = options.path_pool->getStableDiskDelegator(); + auto store_path = delegator.choosePath(); + local_path = getPathByStatus(store_path, options.dm_file->fileId(), DMFileStatus::READABLE); + + auto dmfile_directory = Poco::File(local_path); + dmfile_directory.createDirectories(); + } + else + { + local_path = options.dm_file->path(); + } +} + +void DMFileV3IncrementWriter::include(const String & file_name) +{ + RUNTIME_CHECK(!is_finalized); + + auto file_path = local_path + "/" + file_name; + auto file = Poco::File(file_path); + RUNTIME_CHECK(file.exists(), file_path); + RUNTIME_CHECK(file.isFile(), file_path); + + included_file_names.emplace(file_name); +} + +void DMFileV3IncrementWriter::finalize() +{ + // DMFileV3IncrementWriter must be created before making change to DMFile, otherwise + // a directory may not be correctly prepared. Thus, we could safely assert that + // DMFile meta version is bumped. + RUNTIME_CHECK_MSG( + options.dm_file->metaVersion() != dmfile_initial_meta_ver, + "Attempt to write with the same meta version when DMFileV3IncrementWriter is created, meta_version={}", + dmfile_initial_meta_ver); + RUNTIME_CHECK_MSG( + options.dm_file->metaVersion() > dmfile_initial_meta_ver, + "Discovered meta version rollback, old_meta_version={} new_meta_version={}", + dmfile_initial_meta_ver, + options.dm_file->metaVersion()); + + RUNTIME_CHECK(!is_finalized); + + writeAndIncludeMetaFile(); + + LOG_DEBUG( + logger, + "Write incremental update for DMFile, local_path={} dmfile_path={} old_meta_version={} new_meta_version={}", + local_path, + options.dm_file->path(), + dmfile_initial_meta_ver, + options.dm_file->metaVersion()); + + if (is_s3_dmfile) + { + uploadIncludedFiles(); + removeIncludedFiles(); + } + else + { + // If this is a local DMFile, so be it. 
+ // The new meta and files are visible from now. + } + + is_finalized = true; +} + +void DMFileV3IncrementWriter::abandonEverything() +{ + if (is_finalized) + return; + + LOG_DEBUG(logger, "Abandon increment write, local_path={} file_names={}", local_path, included_file_names); + + // TODO: Clean up included files? + + is_finalized = true; +} + +DMFileV3IncrementWriter::~DMFileV3IncrementWriter() +{ + if (!is_finalized) + abandonEverything(); +} + +void DMFileV3IncrementWriter::writeAndIncludeMetaFile() +{ + // We don't check whether new_meta_version file exists. + // Because it may be a broken file left behind by previous failed writes. + + auto meta_file_name = DMFileMetaV2::metaFileName(options.dm_file->metaVersion()); + auto meta_file_path = local_path + "/" + meta_file_name; + // We first write to a temporary file, then rename it to the final name + // to ensure file's integrity. + auto meta_file_path_for_write = meta_file_path + ".tmp"; + + // Just a protection. We don't allow overwriting meta file. 
+ { + auto existing_file = Poco::File(meta_file_path); + RUNTIME_CHECK_MSG( // + !existing_file.exists(), + "Meta file already exists, file={}", + meta_file_path); + } + + auto meta_file = WriteBufferFromWritableFileBuilder::buildPtr( + options.file_provider, + meta_file_path_for_write, // Must not use meta->metaPath(), because DMFile may be a S3 DMFile + EncryptionPath(local_path, meta_file_name), + /*create_new_encryption_info*/ true, + options.write_limiter, + DMFileMetaV2::meta_buffer_size); + + options.dm_file->meta->finalize(*meta_file, options.file_provider, options.write_limiter); + meta_file->sync(); + meta_file.reset(); + + Poco::File(meta_file_path_for_write).renameTo(meta_file_path); + + include(meta_file_name); +} + +void DMFileV3IncrementWriter::uploadIncludedFiles() +{ + if (included_file_names.empty()) + return; + + auto data_store = options.disagg_ctx->remote_data_store; + RUNTIME_CHECK(data_store != nullptr); + + std::vector file_names(included_file_names.begin(), included_file_names.end()); + data_store->putDMFileLocalFiles(local_path, file_names, dmfile_oid); +} + +void DMFileV3IncrementWriter::removeIncludedFiles() +{ + if (included_file_names.empty()) + return; + + for (const auto & file_name : included_file_names) + { + auto file_path = local_path + "/" + file_name; + auto file = Poco::File(file_path); + RUNTIME_CHECK(file.exists(), file_path); + file.remove(); + } + + included_file_names.clear(); + + // TODO: No need to remove from file_provider? + // TODO: Don't remove encryption info? +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h new file mode 100644 index 00000000000..e159095d35a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h @@ -0,0 +1,128 @@ +// Copyright 2024 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +class WriteLimiter; +using WriteLimiterPtr = std::shared_ptr; + +class StoragePathPool; +using StoragePathPoolPtr = std::shared_ptr; +} // namespace DB + +namespace DB::DM +{ + +class DMFile; +using DMFilePtr = std::shared_ptr; + +} // namespace DB::DM + +namespace DB::DM +{ + +class DMFileV3IncrementWriter +{ +public: + struct Options + { + const DMFilePtr dm_file; + + const FileProviderPtr file_provider; + const WriteLimiterPtr write_limiter; + const StoragePathPoolPtr path_pool; + const SharedContextDisaggPtr disagg_ctx; + }; + + /** + * @brief Create a new DMFileV3IncrementWriter for writing new parts for a DMFile. + * + * @param options.dm_file Support both remote or local DMFile. When DMFile is remote, + * a local directory will be re-prepared for holding these new incremental files. + * + * Throws if DMFile is not FormatV3, since other Format Versions cannot update incrementally. + * Throws if DMFile is not readable. Otherwise (e.g. status=WRITING) DMFile metadata + * may be changed by others at any time. + */ + explicit DMFileV3IncrementWriter(const Options & options); + + static DMFileV3IncrementWriterPtr create(const Options & options) + { + return std::make_unique(options); + } + + ~DMFileV3IncrementWriter(); + + /** + * @brief Include a file. 
The file must be placed in `localPath()`. + * The file will be uploaded to S3 with the meta file all at once + * when `finalize()` is called. + * + * In non-disaggregated mode, this function does not take effect. + */ + void include(const String & file_name); + + /** + * @brief The path of the local directory of the DMFile. + * If DMFile is local, it equals dmfile->path(). + * If DMFile is on S3, the local path is a temporary directory for holding new incremental files. + */ + String localPath() const { return local_path; } + + /** + * @brief Persists the current dmfile in-memory meta using the in-memory meta version. + * If this meta version is already persisted before, exception **may** be thrown. + * It is caller's duty to ensure there are no concurrent IncrementWriters for the same dmfile + * to avoid meta version contention. + * + * For a remote DMFile, new meta version file and other files specified via `include()` + * will be uploaded to S3. Local files will be removed after that. + */ + void finalize(); + + void abandonEverything(); + +private: + void writeAndIncludeMetaFile(); + + void uploadIncludedFiles(); + + void removeIncludedFiles(); + +private: + const LoggerPtr logger; + const Options options; + const UInt32 dmfile_initial_meta_ver; + bool is_s3_dmfile = false; + Remote::DMFileOID dmfile_oid; // Valid when is_s3_dmfile == true + String local_path; + + std::unordered_set included_file_names; + + bool is_finalized = false; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h new file mode 100644 index 00000000000..e8a9187dc1f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h @@ -0,0 +1,26 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM +{ + +class DMFileV3IncrementWriter; + +using DMFileV3IncrementWriterPtr = std::unique_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 9b3c5bc36ab..2b012cb27a3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -78,7 +78,7 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile() { return WriteBufferFromWritableFileBuilder::buildPtr( file_provider, - dmfile->metav2Path(), + dmfile->meta->metaPath(), dmfile->meta->encryptionMetaPath(), /*create_new_encryption_info*/ true, write_limiter, @@ -126,7 +126,6 @@ void DMFileWriter::addStreams( type->enumerateStreams(callback, {}); } - void DMFileWriter::write(const Block & block, const BlockProperty & block_property) { #ifndef NDEBUG diff --git a/dbms/src/Storages/DeltaMerge/File/MergedFile.h b/dbms/src/Storages/DeltaMerge/File/MergedFile.h index 4c0822b8396..b19ec12eaa5 100644 --- a/dbms/src/Storages/DeltaMerge/File/MergedFile.h +++ b/dbms/src/Storages/DeltaMerge/File/MergedFile.h @@ -21,6 +21,7 @@ namespace DB::DM { + struct MergedSubFileInfo { String fname; // Sub filemame @@ -55,4 +56,4 @@ struct MergedSubFileInfo return info; } }; -} // namespace DB::DM \ No newline at end of file +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto b/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto index 257522f0529..0d66ba45a56 100644 --- 
a/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto +++ b/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto @@ -65,6 +65,9 @@ message ColumnStat { reserved 101; // used before // TODO(vector-index) Support multiple vector index on the same column optional VectorIndexFileProps vector_index = 102; + + // Only used in tests. Modifying other fields of ColumnStat is hard. + optional string additional_data_for_test = 999; } message ColumnStats { @@ -73,6 +76,7 @@ message ColumnStats { message StableFile { optional uint64 page_id = 1; + optional uint64 meta_version = 2; } message StableLayerMeta { diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h index d081dae7aa4..83f91c36e1e 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h @@ -32,7 +32,7 @@ class IPreparedDMFileToken : boost::noncopyable /** * Restores into a DMFile object. This token will be kept valid when DMFile is valid. */ - virtual DMFilePtr restore(DMFileMeta::ReadMode read_mode) = 0; + virtual DMFilePtr restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) = 0; protected: // These should be the required information for any kind of DataStore. @@ -74,6 +74,19 @@ class IDataStore : boost::noncopyable */ virtual void putDMFile(DMFilePtr local_dm_file, const S3::DMFileOID & oid, bool remove_local) = 0; + /** + * @brief Note: Unlike putDMFile, this function intentionally does not + * remove any local files, because it is only a "put". + * + * @param local_dir The path of the local DMFile + * @param local_files File names to upload + */ + virtual void putDMFileLocalFiles( + const String & local_dir, + const std::vector & local_files, + const S3::DMFileOID & oid) + = 0; + /** * Blocks until a DMFile in the remote data store is successfully prepared in a local cache. * If the DMFile exists in the local cache, it will not be prepared again. 
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp index 95bf697ee48..d26c07b483f 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp @@ -14,6 +14,8 @@ #include +#include "Storages/KVStore/Types.h" + namespace DB::DM::Remote { @@ -35,7 +37,7 @@ static std::tuple parseDMFilePath(const String & path) return std::tuple{parent_path, file_id}; } -DMFilePtr MockPreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode) +DMFilePtr MockPreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) { auto [parent_path, file_id] = parseDMFilePath(path); return DMFile::restore( @@ -43,6 +45,8 @@ DMFilePtr MockPreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode) file_id, /*page_id*/ 0, parent_path, - read_mode); + read_mode, + NullspaceID, + meta_version); } } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h index 6965c918588..dc0ac3467cf 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h @@ -54,6 +54,11 @@ class DataStoreMock final : public IDataStore throw Exception("DataStoreMock::setTaggingsForKeys unsupported"); } + void putDMFileLocalFiles(const String &, const std::vector &, const S3::DMFileOID &) override + { + throw Exception("DataStoreMock::putDMFileLocalFiles unsupported"); + } + private: FileProviderPtr file_provider; }; @@ -68,7 +73,7 @@ class MockPreparedDMFileToken : public IPreparedDMFileToken ~MockPreparedDMFileToken() override = default; - DMFilePtr restore(DMFileMeta::ReadMode read_mode) override; + DMFilePtr restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) override; private: String path; diff --git 
a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp index ed28b54d819..b20f9fbe605 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp @@ -42,29 +42,42 @@ void DataStoreS3::putDMFile(DMFilePtr local_dmfile, const S3::DMFileOID & oid, b const auto local_dir = local_dmfile->path(); const auto local_files = local_dmfile->listFilesForUpload(); auto itr_meta = std::find_if(local_files.cbegin(), local_files.cend(), [](const auto & file_name) { - return file_name == DMFileMetaV2::metaFileName(); + // We always ensure meta v0 exists. + return file_name == DMFileMetaV2::metaFileName(0); }); RUNTIME_CHECK(itr_meta != local_files.cend()); + putDMFileLocalFiles(local_dir, local_files, oid); + + if (remove_local) + local_dmfile->switchToRemote(oid); +} + +void DataStoreS3::putDMFileLocalFiles( + const String & local_dir, + const std::vector & local_files, + const S3::DMFileOID & oid) +{ + Stopwatch sw; + const auto remote_dir = S3::S3Filename::fromDMFileOID(oid).toFullKey(); LOG_DEBUG( log, - "Start upload DMFile, local_dir={} remote_dir={} local_files={}", + "Start upload DMFile local files, local_dir={} remote_dir={} local_files={}", local_dir, remote_dir, local_files); auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + // First, upload non-meta files. std::vector> upload_results; upload_results.reserve(local_files.size() - 1); for (const auto & fname : local_files) { - if (fname == DMFileMetaV2::metaFileName()) - { - // meta file will be upload at last. 
+ if (DMFileMetaV2::isMetaFileName(fname)) continue; - } + auto local_fname = fmt::format("{}/{}", local_dir, fname); auto remote_fname = fmt::format("{}/{}", remote_dir, fname); auto task = std::make_shared>( @@ -73,30 +86,40 @@ void DataStoreS3::putDMFile(DMFilePtr local_dmfile, const S3::DMFileOID & oid, b *s3_client, local_fname, remote_fname, - EncryptionPath(local_dmfile->path(), fname, oid.keyspace_id), + EncryptionPath(local_dir, fname, oid.keyspace_id), file_provider); }); upload_results.push_back(task->get_future()); DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); }); } for (auto & f : upload_results) - { f.get(); - } + // Then, upload meta files. // Only when the meta upload is successful, the dmfile upload can be considered successful. - auto local_meta_fname = fmt::format("{}/{}", local_dir, DMFileMetaV2::metaFileName()); - auto remote_meta_fname = fmt::format("{}/{}", remote_dir, DMFileMetaV2::metaFileName()); - S3::uploadFile( - *s3_client, - local_meta_fname, - remote_meta_fname, - EncryptionPath(local_dmfile->path(), DMFileMetaV2::metaFileName(), oid.keyspace_id), - file_provider); - if (remove_local) + upload_results.clear(); + for (const auto & fname : local_files) { - local_dmfile->switchToRemote(oid); + if (!DMFileMetaV2::isMetaFileName(fname)) + continue; + + auto local_fname = fmt::format("{}/{}", local_dir, fname); + auto remote_fname = fmt::format("{}/{}", remote_dir, fname); + auto task = std::make_shared>( + [&, local_fname = std::move(local_fname), remote_fname = std::move(remote_fname)]() { + S3::uploadFile( + *s3_client, + local_fname, + remote_fname, + EncryptionPath(local_dir, fname, oid.keyspace_id), + file_provider); + }); + upload_results.push_back(task->get_future()); + DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); }); } + for (auto & f : upload_results) + f.get(); + LOG_INFO(log, "Upload DMFile finished, key={}, cost={}ms", remote_dir, sw.elapsedMilliseconds()); } @@ -261,7 +284,7 @@ 
IPreparedDMFileTokenPtr DataStoreS3::prepareDMFileByKey(const String & remote_ke return prepareDMFile(oid, 0); } -DMFilePtr S3PreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode) +DMFilePtr S3PreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) { return DMFile::restore( file_provider, @@ -269,6 +292,7 @@ DMFilePtr S3PreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode) page_id, S3::S3Filename::fromTableID(oid.store_id, oid.keyspace_id, oid.table_id).toFullKeyWithPrefix(), read_mode, - oid.keyspace_id); + oid.keyspace_id, + meta_version); } } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h index 15348e5b8b6..e831a55ea6c 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h @@ -35,6 +35,18 @@ class DataStoreS3 final : public IDataStore */ void putDMFile(DMFilePtr local_dmfile, const S3::DMFileOID & oid, bool remove_local) override; + /** + * @brief Note: Unlike putDMFile, this function intentionally does not + * remove any local files, because it is only a "put". + * + * @param local_dir The path of the local DMFile + * @param local_files File names to upload + */ + void putDMFileLocalFiles( + const String & local_dir, + const std::vector & local_files, + const S3::DMFileOID & oid) override; + /** * Blocks until a DMFile in the remote data store is successfully prepared in a local cache. * If the DMFile exists in the local cache, it will not be prepared again. 
@@ -78,7 +90,7 @@ class S3PreparedDMFileToken : public IPreparedDMFileToken ~S3PreparedDMFileToken() override = default; - DMFilePtr restore(DMFileMeta::ReadMode read_mode) override; + DMFilePtr restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) override; }; } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto index 7cb780bdd2b..ac1bb51c9f0 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto +++ b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto @@ -84,6 +84,7 @@ message ColumnFileTiny { message ColumnFileBig { uint64 page_id = 1; CheckpointInfo checkpoint_info = 2; + uint32 meta_version = 3; // Note: Only Stable cares about meta_version. ColumnFileBig does not care. // TODO: We should better recalculate these fields from local DTFile. uint64 valid_rows = 10; diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index c29518a3ed6..28ccdcfead3 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -98,6 +98,7 @@ RemotePb::RemoteSegment Serializer::serializeSegment( { auto * remote_file = remote.add_stable_pages(); remote_file->set_page_id(dt_file->pageId()); + remote_file->set_meta_version(dt_file->metaVersion()); auto * checkpoint_info = remote_file->mutable_checkpoint_info(); #ifndef DBMS_PUBLIC_GTEST // Don't not check path in unittests. 
RUNTIME_CHECK(startsWith(dt_file->path(), "s3://"), dt_file->path()); @@ -170,7 +171,7 @@ SegmentSnapshotPtr Serializer::deserializeSegment( { auto remote_key = stable_file.checkpoint_info().data_file_id(); auto prepared = data_store->prepareDMFileByKey(remote_key); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), stable_file.meta_version()); RUNTIME_CHECK(dmfile != nullptr, remote_key); dmfiles.emplace_back(std::move(dmfile)); } @@ -405,6 +406,7 @@ RemotePb::ColumnFileRemote Serializer::serializeCFBig(const ColumnFileBig & cf_b auto * checkpoint_info = remote_big->mutable_checkpoint_info(); checkpoint_info->set_data_file_id(cf_big.file->path()); remote_big->set_page_id(cf_big.file->pageId()); + remote_big->set_meta_version(cf_big.file->metaVersion()); remote_big->set_valid_rows(cf_big.valid_rows); remote_big->set_valid_bytes(cf_big.valid_bytes); return ret; @@ -418,7 +420,7 @@ ColumnFileBigPtr Serializer::deserializeCFBig( RUNTIME_CHECK(proto.has_checkpoint_info()); LOG_DEBUG(Logger::get(), "Rebuild local ColumnFileBig from remote, key={}", proto.checkpoint_info().data_file_id()); auto prepared = data_store->prepareDMFileByKey(proto.checkpoint_info().data_file_id()); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), proto.meta_version()); auto * cf_big = new ColumnFileBig(dmfile, proto.valid_rows(), proto.valid_bytes(), segment_range); return std::shared_ptr(cf_big); // The constructor is private, so we cannot use make_shared. 
} diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index ddaa8141da6..69014ed7949 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -36,6 +36,7 @@ #include #include #include +// #include #include #include #include @@ -1351,6 +1352,93 @@ SegmentPtr Segment::replaceData( return new_me; } +SegmentPtr Segment::replaceStableMetaVersion( + const Segment::Lock &, + DMContext & dm_context, + const DMFiles & new_stable_files) +{ + auto current_stable_files_str = [&] { + FmtBuffer fmt_buf; + fmt_buf.append('['); + fmt_buf.joinStr( + stable->getDMFiles().begin(), + stable->getDMFiles().end(), + [](const DMFilePtr & file, FmtBuffer & fb) { + fb.fmtAppend("dmf_{}(v{})", file->fileId(), file->metaVersion()); + }, + ","); + fmt_buf.append(']'); + return fmt_buf.toString(); + }; + + auto new_stable_files_str = [&] { + FmtBuffer fmt_buf; + fmt_buf.append('['); + fmt_buf.joinStr( + new_stable_files.begin(), + new_stable_files.end(), + [](const DMFilePtr & file, FmtBuffer & fb) { + fb.fmtAppend("dmf_{}(v{})", file->fileId(), file->metaVersion()); + }, + ","); + fmt_buf.append(']'); + return fmt_buf.toString(); + }; + + LOG_DEBUG( + log, + "ReplaceStableMetaVersion - Begin, current_stable={} new_stable={}", + current_stable_files_str(), + new_stable_files_str()); + + // Ensure new stable files have the same DMFile ID as the old stable files. + // We only allow changing meta version when calling this function. 
+ + if (new_stable_files.size() != stable->getDMFiles().size()) + { + LOG_WARNING( + log, + "ReplaceStableMetaVersion - Fail, stable files count mismatch, current_stable={} new_stable={}", + current_stable_files_str(), + new_stable_files_str()); + return {}; + } + for (size_t i = 0; i < new_stable_files.size(); i++) + { + if (new_stable_files[i]->fileId() != stable->getDMFiles()[i]->fileId()) + { + LOG_WARNING( + log, + "ReplaceStableMetaVersion - Fail, stable files mismatch, current_stable={} new_stable={}", + current_stable_files_str(), + new_stable_files_str()); + return {}; + } + } + + WriteBatches wbs(*dm_context.storage_pool, dm_context.getWriteLimiter()); + + auto new_stable = std::make_shared(stable->getId()); + new_stable->setFiles(new_stable_files, rowkey_range, &dm_context); + new_stable->saveMeta(wbs.meta); + + auto new_me = std::make_shared( // + parent_log, + epoch + 1, + rowkey_range, + segment_id, + next_segment_id, + delta, // Delta is untouched. Shares the same delta instance. 
+ new_stable); + new_me->serialize(wbs.meta); + + wbs.writeAll(); + + LOG_DEBUG(log, "ReplaceStableMetaVersion - Finish, new_stable_files={}", new_stable_files_str()); + + return new_me; +} + SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( const Segment::Lock &, // DMContext & dm_context, @@ -1375,7 +1463,8 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( new_page_id, data_file->parentPath(), DMFileMeta::ReadMode::all(), - dm_context.keyspace_id); + dm_context.keyspace_id, + data_file->metaVersion()); wbs.data.putRefPage(new_page_id, data_file->pageId()); auto new_stable = std::make_shared(stable->getId()); @@ -1415,7 +1504,7 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; RUNTIME_CHECK(remote_data_store != nullptr); auto prepared = remote_data_store->prepareDMFile(file_oid, new_data_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), b->getFile()->metaVersion()); auto new_column_file = b->cloneWith(dm_context, dmfile, rowkey_range); new_column_file_persisteds.push_back(new_column_file); } @@ -1814,14 +1903,16 @@ Segment::prepareSplitLogical( // /* page_id= */ my_dmfile_page_id, file_parent_path, DMFileMeta::ReadMode::all(), - dm_context.keyspace_id); + dm_context.keyspace_id, + dmfile->metaVersion()); auto other_dmfile = DMFile::restore( dm_context.global_context.getFileProvider(), file_id, /* page_id= */ other_dmfile_page_id, file_parent_path, DMFileMeta::ReadMode::all(), - dm_context.keyspace_id); + dm_context.keyspace_id, + dmfile->metaVersion()); my_stable_files.push_back(my_dmfile); other_stable_files.push_back(other_dmfile); } diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index bcf1b3d4058..3cb0d30ea76 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ 
-487,6 +487,22 @@ class Segment const DMFilePtr & data_file, SegmentSnapshotPtr segment_snap_opt = nullptr) const; + /** + * Replace the stable layer using the DMFile with a new meta version. + * Delta layer is unchanged. + * + * This API can be used to make a newly added index visible. + * + * This API does not have a prepare & apply pair, as it should be quick enough. + * + * @param new_stable_files Must be the same as the current stable DMFiles (except for the meta version). + * Otherwise the replacement fails and nullptr is returned. + */ + [[nodiscard]] SegmentPtr replaceStableMetaVersion( + const Lock &, + DMContext & dm_context, + const DMFiles & new_stable_files); + [[nodiscard]] SegmentPtr dangerouslyReplaceDataFromCheckpoint( const Lock &, DMContext & dm_context, diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 942ed75172c..15fbaea1621 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -95,7 +95,13 @@ UInt64 StableValueSpace::serializeMetaToBuf(WriteBuffer & buf) const writeIntBinary(valid_bytes, buf); writeIntBinary(static_cast(files.size()), buf); for (const auto & f : files) + { + RUNTIME_CHECK_MSG( + f->metaVersion() == 0, + "StableFormat::V1 cannot persist meta_version={}", + f->metaVersion()); writeIntBinary(f->pageId(), buf); + } } else if (STORAGE_FORMAT_CURRENT.stable == StableFormat::V2) { @@ -103,7 +109,11 @@ UInt64 StableValueSpace::serializeMetaToBuf(WriteBuffer & buf) const meta.set_valid_rows(valid_rows); meta.set_valid_bytes(valid_bytes); for (const auto & f : files) - meta.add_files()->set_page_id(f->pageId()); + { + auto * mf = meta.add_files(); + mf->set_page_id(f->pageId()); + mf->set_meta_version(f->metaVersion()); + } auto data = meta.SerializeAsString(); writeStringBinary(data, buf); @@ -184,6 +194,8 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, ReadBuffer for 
(int i = 0; i < metapb.files().size(); ++i) { UInt64 page_id = metapb.files(i).page_id(); + UInt64 meta_version = metapb.files(i).meta_version(); + DMFilePtr dmfile; auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); if (remote_data_store) @@ -203,7 +215,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, ReadBuffer RUNTIME_CHECK(file_oid.keyspace_id == dm_context.keyspace_id); RUNTIME_CHECK(file_oid.table_id == dm_context.physical_table_id); auto prepared = remote_data_store->prepareDMFile(file_oid, page_id); - dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); } @@ -217,7 +229,8 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, ReadBuffer page_id, file_parent_path, DMFileMeta::ReadMode::all(), - dm_context.keyspace_id); + dm_context.keyspace_id, + meta_version); auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); } @@ -251,6 +264,8 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( // for (int i = 0; i < metapb.files().size(); ++i) { UInt64 page_id = metapb.files(i).page_id(); + UInt64 meta_version = metapb.files(i).meta_version(); + auto full_page_id = UniversalPageIdFormat::toFullPageId( UniversalPageIdFormat::toFullPrefix( dm_context.keyspace_id, @@ -270,7 +285,7 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( // }; wbs.data.putRemoteExternal(new_local_page_id, loc); auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); wbs.writeLogAndData(); // 
new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 28fbd8f9331..9cd9a1250f8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -330,7 +330,8 @@ try file_id.id, file_id.id, delegator.getDTFilePath(file_id.id), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); remote_store->putDMFile( dm_file, S3::DMFileOID{ diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index d77c365492b..572b8e0d6c1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -146,7 +146,13 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic auto page_id = dm_file->pageId(); auto parent_path = dm_file->parentPath(); auto file_provider = dbContext().getFileProvider(); - return DMFile::restore(file_provider, file_id, page_id, parent_path, DMFileMeta::ReadMode::all()); + return DMFile::restore( + file_provider, + file_id, + page_id, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); } DMContext & dmContext() { return *dm_context; } @@ -162,7 +168,7 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic static void breakFileMetaV2File(const DMFilePtr & dmfile) { - PosixWritableFile file(dmfile->metav2Path(), false, -1, 0666); + PosixWritableFile file(dmfile->metav2Path(/* meta_version= */ 0), false, -1, 0666); String s = "hello"; auto n = file.pwrite(s.data(), s.size(), 0); ASSERT_EQ(n, s.size()); @@ -552,7 +558,8 @@ try 
dmfile2->fileId(), dmfile2->pageId(), dmfile2->parentPath(), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); LOG_DEBUG(Logger::get(), "check dmfile1 dmfile3"); check_meta(dmfile1, dmfile3); @@ -616,7 +623,8 @@ try dmfile->fileId(), dmfile->pageId(), dmfile->parentPath(), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); FAIL(); // Should not come here. } catch (const DB::Exception & e) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp new file mode 100644 index 00000000000..57cd52fb1ca --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp @@ -0,0 +1,538 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "Storages/KVStore/Types.h" + +namespace DB::DM::tests +{ + +class DMFileMetaVersionTestBase : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + if (enable_encryption) + { + KeyManagerPtr key_manager = std::make_shared(true); + file_provider_maybe_encrypted = std::make_shared(key_manager, true); + } + else + { + file_provider_maybe_encrypted = db_context->getFileProvider(); + } + + parent_path = TiFlashStorageTestBasic::getTemporaryPath(); + path_pool = std::make_shared( + db_context->getPathPool().main_data_paths, + db_context->getPathPool().latest_data_paths, + "test", + "t1", + false, + db_context->getPathPool().global_capacity, + file_provider_maybe_encrypted); + } + +protected: + DMFilePtr prepareDMFile(UInt64 file_id) + { + auto dm_file = DMFile::create( + file_id, + parent_path, + std::make_optional(), + 128 * 1024, + 16 * 1024 * 1024, + DMFileFormat::V3); + + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 3); + + auto writer = DMFileWriter( + dm_file, + *cols, + file_provider_maybe_encrypted, + db_context->getWriteLimiter(), + DMFileWriter::Options()); + writer.write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + writer.finalize(); + + return dm_file; + } + + bool enable_encryption = true; + + const KeyspaceID keyspace_id = NullspaceID; + const TableID table_id = 100; + + std::shared_ptr path_pool{}; + FileProviderPtr file_provider_maybe_encrypted{}; + String parent_path; +}; + +class LocalDMFile + : public DMFileMetaVersionTestBase + , public testing::WithParamInterface +{ +public: + LocalDMFile() { enable_encryption = GetParam(); } +}; + +INSTANTIATE_TEST_CASE_P( // + 
DMFileMetaVersion, + LocalDMFile, + /* enable_encryption */ ::testing::Bool()); + +TEST_P(LocalDMFile, WriteWithOldMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + ASSERT_THROW({ iw->finalize(); }, DB::Exception); +} +CATCH + +TEST_P(LocalDMFile, RestoreInvalidMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + + ASSERT_THROW( + { + DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 1); + }, + DB::Exception); +} +CATCH + +TEST_P(LocalDMFile, RestoreWithMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + // Read out meta version = 0 + dm_file = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 0); + + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + 
ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Read out meta version = 1 + dm_file = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 1); + + ASSERT_EQ(1, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("test", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); +} +CATCH + +TEST_P(LocalDMFile, RestoreWithMultipleMetaVersion) +try +{ + auto dm_file_for_write = prepareDMFile(/* file_id= */ 1); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file_for_write, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file_for_write->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file_for_write->meta->bumpMetaVersion()); + iw->finalize(); + + auto dm_file_for_read_v1 = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 1); + ASSERT_STREQ( + "test", + dm_file_for_read_v1->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + // Write a new meta with a new version = 2 + iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file_for_write, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file_for_write->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test2"; + ASSERT_EQ(2, dm_file_for_write->meta->bumpMetaVersion()); + iw->finalize(); + + // Current DMFile instance does not affect + ASSERT_STREQ( + "test", + 
dm_file_for_read_v1->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + // Read out meta version = 2 + auto dm_file_for_read_v2 = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 2); + ASSERT_STREQ( + "test2", + dm_file_for_read_v2->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); +} +CATCH + +TEST_P(LocalDMFile, OverrideMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + + // Write meta v1. + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + // Overwrite meta v1. + // To overwrite meta v1, we restore a v0 instance, and then bump meta version again. + auto dm_file_2 = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 0); + iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file_2, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file_2->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test_overwrite"; + ASSERT_EQ(1, dm_file_2->meta->bumpMetaVersion()); + ASSERT_THROW({ iw->finalize(); }, DB::Exception); + + // Read out meta v1 again. 
+ auto dm_file_for_read = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + NullspaceID, + /* meta_version= */ 1); + ASSERT_STREQ( + "test", + dm_file_for_read->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); +} +CATCH + +TEST_P(LocalDMFile, FinalizeMultipleTimes) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + dm_file->meta->bumpMetaVersion(); + iw->finalize(); + + ASSERT_THROW({ iw->finalize(); }, DB::Exception); + + dm_file->meta->bumpMetaVersion(); + ASSERT_THROW({ iw->finalize(); }, DB::Exception); +} +CATCH + +class S3DMFile + : public DMFileMetaVersionTestBase + , public testing::WithParamInterface +{ +public: + S3DMFile() { enable_encryption = GetParam(); } + + void SetUp() override + { + DB::tests::TiFlashTestEnv::enableS3Config(); + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); + + DMFileMetaVersionTestBase::SetUp(); + + auto & global_context = db_context->getGlobalContext(); + ASSERT_TRUE(!global_context.getSharedContextDisagg()->remote_data_store); + global_context.getSharedContextDisagg()->initRemoteDataStore( + file_provider_maybe_encrypted, + /* s3_enabled= */ true); + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store); + } + + void 
TearDown() override + { + DMFileMetaVersionTestBase::TearDown(); + + auto & global_context = db_context->getGlobalContext(); + global_context.getSharedContextDisagg()->remote_data_store = nullptr; + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + DB::tests::TiFlashTestEnv::deleteBucket(*s3_client); + DB::tests::TiFlashTestEnv::disableS3Config(); + } + +protected: + Remote::IDataStorePtr dataStore() + { + auto data_store = db_context->getSharedContextDisagg()->remote_data_store; + RUNTIME_CHECK(data_store != nullptr); + return data_store; + } + + DMFilePtr prepareDMFileRemote(UInt64 file_id) + { + auto dm_file = prepareDMFile(file_id); + dataStore()->putDMFile( + dm_file, + S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = dm_file->fileId(), + }, + true); + return dm_file; + } + +protected: + const StoreID store_id = 17; + + // DeltaMergeStorePtr store; + bool already_initialize_data_store = false; + bool already_initialize_write_ps = false; + DB::PageStorageRunMode orig_mode = PageStorageRunMode::ONLY_V3; +}; + +INSTANTIATE_TEST_CASE_P( // + DMFileMetaVersion, + S3DMFile, + /* enable_encryption */ ::testing::Values(false)); + +// In this TiFlash version WN does not support encryption at all. +// See https://github.com/pingcap/tiflash/issues/8351 + +TEST_P(S3DMFile, Basic) +try +{ + // This test case just checks that the S3DMFile fixture is working. 
+ + auto dm_file = prepareDMFileRemote(/* file_id= */ 1); + ASSERT_TRUE(dm_file->path().starts_with("s3://")); + ASSERT_EQ(0, dm_file->metaVersion()); + + auto token = dataStore()->prepareDMFile(S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = 1, + }); + auto cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf->metaVersion()); + + auto cn_dmf_2 = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf_2->metaVersion()); +} +CATCH + +TEST_P(S3DMFile, WriteRemoteDMFile) +try +{ + auto dm_file = prepareDMFileRemote(/* file_id= */ 1); + ASSERT_TRUE(dm_file->path().starts_with("s3://")); + + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + // Read out meta version = 0 + auto token = dataStore()->prepareDMFile(S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = 1, + }); + auto cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf->metaVersion()); + ASSERT_STREQ("", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + // Read out meta version = 1 + cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 1); + ASSERT_EQ(1, cn_dmf->metaVersion()); + ASSERT_STREQ("test", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); 
+} +CATCH + +TEST_P(S3DMFile, WithFileCache) +try +{ + StorageRemoteCacheConfig file_cache_config{ + .dir = fmt::format("{}/fs_cache", getTemporaryPath()), + .capacity = 1 * 1000 * 1000 * 1000, + }; + FileCache::initialize(db_context->getGlobalContext().getPathCapacity(), file_cache_config); + // Register the shutdown right after initialize(): a failed ASSERT_* returns early and would otherwise skip it. + SCOPE_EXIT({ FileCache::shutdown(); }); + + auto dm_file = prepareDMFileRemote(/* file_id= */ 1); + ASSERT_TRUE(dm_file->path().starts_with("s3://")); + + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata (bumps the meta version from 0 to 1) + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + { + auto * file_cache = FileCache::instance(); + ASSERT_TRUE(file_cache->getAll().empty()); + } + + // Read out meta version = 0 (the cache is checked to be empty before and non-empty after this read) + auto token = dataStore()->prepareDMFile(S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = 1, + }); + auto cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf->metaVersion()); + ASSERT_STREQ("", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + } + + // Read out meta version = 1 + cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 1); + ASSERT_EQ(1, cn_dmf->metaVersion()); + ASSERT_STREQ("test", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); +} +CATCH + +} // namespace 
DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a323171836c..8bccb4ddd88 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1385,8 +1385,13 @@ try auto [range, file_ids] = genDMFile(dmContext(), block); auto file_id = file_ids[0]; auto file_parent_path = delegate.getDTFilePath(file_id); - auto file - = DMFile::restore(file_provider, file_id, file_id, file_parent_path, DMFileMeta::ReadMode::all()); + auto file = DMFile::restore( + file_provider, + file_id, + file_id, + file_parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); WriteBatches wbs(*storage_pool); wbs.data.putExternal(file_id, 0); wbs.writeLogAndData(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index 8c0b68014d2..694047b50e9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -152,7 +152,13 @@ class VectorIndexDMFileTest auto file_id = dm_file->fileId(); auto page_id = dm_file->pageId(); auto file_provider = dbContext().getFileProvider(); - return DMFile::restore(file_provider, file_id, page_id, parent_path, DMFileMeta::ReadMode::all()); + return DMFile::restore( + file_provider, + file_id, + page_id, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); } Context & dbContext() { return *db_context; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp new file mode 100644 index 00000000000..878c769cba9 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp @@ -0,0 +1,657 @@ +// Copyright 2023 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfRead; +} // namespace CurrentMetrics + +namespace DB::DM +{ + +extern DMFilePtr writeIntoNewDMFile( + DMContext & dm_context, + const ColumnDefinesPtr & schema_snap, + const BlockInputStreamPtr & input_stream, + UInt64 file_id, + const String & parent_path); + +} + +namespace DB::DM::tests +{ + +class SegmentReplaceStableData + : public SegmentTestBasic + , public testing::WithParamInterface +{ +protected: + void SetUp() override + { + storage_version = STORAGE_FORMAT_CURRENT; + STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V6; + SegmentTestBasic::SetUp(); + } + + void TearDown() override + { + SegmentTestBasic::TearDown(); + STORAGE_FORMAT_CURRENT = storage_version; + } + + void replaceSegmentStableWithNewMetaValue(PageIdU64 segment_id, String pk_additiona_data) + { + // For test purpose, we only replace the additional_data_for_test field + // of the PK, as the change of the new metadata. 
+ + auto [segment, snapshot] = getSegmentForRead(segment_id); + RUNTIME_CHECK(segment != nullptr); + + auto files = snapshot->stable->getDMFiles(); + RUNTIME_CHECK(files.size() == 1); + + DMFiles new_dm_files; + + for (auto & file : files) + { + auto new_dm_file = DMFile::restore( + dm_context->global_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + NullspaceID, + file->metaVersion()); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = new_dm_file, + .file_provider = dm_context->global_context.getFileProvider(), + .write_limiter = dm_context->global_context.getWriteLimiter(), + .path_pool = storage_path_pool, + .disagg_ctx = dm_context->global_context.getSharedContextDisagg(), + }); + auto & column_stats = new_dm_file->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + column_stats[::DB::TiDBPkColumnID].additional_data_for_test = pk_additiona_data; + + new_dm_file->meta->bumpMetaVersion(); + iw->finalize(); + + new_dm_files.emplace_back(new_dm_file); + } + + // TODO: Support multiple DMFiles + auto succeeded = replaceSegmentStableData(segment_id, new_dm_files[0]); + RUNTIME_CHECK(succeeded); + } + + UInt32 getSegmentStableMetaVersion(SegmentPtr segment) + { + auto files = segment->stable->getDMFiles(); + RUNTIME_CHECK(!files.empty()); + + // TODO: Support multiple DMFiles + auto file = files[0]; + + auto meta_version = file->metaVersion(); + + // Read again using a fresh DMFile restore, to ensure that this meta version is + // indeed persisted. 
+ auto file2 = DMFile::restore( + dm_context->global_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + NullspaceID, + meta_version); + RUNTIME_CHECK(file2 != nullptr); + + return meta_version; + } + + UInt32 getSegmentStableMetaVersion(PageIdU64 segment_id) + { + auto [segment, snapshot] = getSegmentForRead(segment_id); + RUNTIME_CHECK(segment != nullptr); + UNUSED(snapshot); + return getSegmentStableMetaVersion(segment); + } + + String getSegmentStableMetaValue(SegmentPtr segment) + { + // For test purposes, we only get the additional_data_for_test field + // of the PK, as proof of the metadata. + + auto files = segment->stable->getDMFiles(); + RUNTIME_CHECK(!files.empty()); + + auto file = files[0]; + auto column_stats = file->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + + auto meta_value = column_stats[::DB::TiDBPkColumnID].additional_data_for_test; + + // Read again using a fresh DMFile restore, to ensure that this value is + // indeed persisted. 
+ auto file2 = DMFile::restore( + dm_context->global_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + file->metaVersion()); + RUNTIME_CHECK(file2 != nullptr); + + column_stats = file2->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + RUNTIME_CHECK(column_stats[::DB::TiDBPkColumnID].additional_data_for_test == meta_value); + + return meta_value; + } + + String getSegmentStableMetaValue(PageIdU64 segment_id) + { + auto [segment, snapshot] = getSegmentForRead(segment_id); + RUNTIME_CHECK(segment != nullptr); + UNUSED(snapshot); + return getSegmentStableMetaValue(segment); + } + + inline void assertPK(PageIdU64 segment_id, std::string_view expected_sequence) + { + auto left_handle = getSegmentHandle(segment_id, {}); + const auto * left_r = toColumnVectorDataPtr(left_handle); + auto expected_left_handle = genSequence(expected_sequence); + ASSERT_EQ(expected_left_handle.size(), left_r->size()); + ASSERT_TRUE(sequenceEqual(expected_left_handle.data(), left_r->data(), left_r->size())); + } + +private: + StorageFormatVersion storage_version = STORAGE_FORMAT_CURRENT; +}; + +INSTANTIATE_TEST_CASE_P( + DMFileMetaVersion, + SegmentReplaceStableData, + /* unused */ testing::Values(false)); + +TEST_P(SegmentReplaceStableData, ReplaceWithAnotherDMFile) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + auto block = prepareWriteBlock(/* from */ 0, /* to */ 10); + auto input_stream = std::make_shared(block); + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto new_dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, delegator.choosePath()); + + ASSERT_FALSE(replaceSegmentStableData(DELTA_MERGE_FIRST_SEGMENT_ID, 
new_dm_file)); +} +CATCH + +TEST_P(SegmentReplaceStableData, Basic) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 10, /* start_at= */ 200); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)"); + + // Initial meta version should be 0 + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "hello"); + // Data in delta does not change + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("hello", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "foo"); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)"); + ASSERT_EQ(2, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("foo", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Write to delta after updating the meta should be fine. 
+ writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 50, /* start_at= */ 500); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)|[500,550)"); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)|[500,550)"); + + // Rewrite stable should result in a fresh meta + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)|[500,550)"); + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, LogicalSplit) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + + // Logical split + auto right_segment_id = splitSegmentAt( // + DELTA_MERGE_FIRST_SEGMENT_ID, + /* split_at= */ 50, + Segment::SplitMode::Logical); + ASSERT_TRUE(right_segment_id.has_value()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,50)"); + assertPK(*right_segment_id, "[50,100)"); + + // The new segment should have the same meta + ASSERT_EQ(1, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(*right_segment_id).c_str()); + + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Rewrite stable + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,50)"); + 
assertPK(*right_segment_id, "[50,100)"); + + ASSERT_EQ(1, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(*right_segment_id).c_str()); + + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, PhysicalSplit) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + + // Physical split + auto right_segment_id = splitSegmentAt( // + DELTA_MERGE_FIRST_SEGMENT_ID, + /* split_at= */ 50, + Segment::SplitMode::Physical); + ASSERT_TRUE(right_segment_id.has_value()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,50)"); + assertPK(*right_segment_id, "[50,100)"); + + // Physical split will rewrite the stable, thus result in a fresh meta + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); + + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, UpdateMetaAfterLogicalSplit) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + auto right_segment_id = splitSegmentAt( // + DELTA_MERGE_FIRST_SEGMENT_ID, + /* split_at= */ 50, + 
Segment::SplitMode::Logical); + ASSERT_TRUE(right_segment_id.has_value()); + + // The left and right segments share the same stable. + // However, we should be able to update their meta independently, + // as long as the meta versions are different. + + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); + + // Updating the left meta does not change the right meta + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); + + // Updating the right meta should fail, because the right segment still holds meta version 0 + // and bumping it would overwrite meta version 1. + ASSERT_THROW({ replaceSegmentStableWithNewMetaValue(*right_segment_id, "foo"); }, DB::Exception); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, RestoreSegment) +try +{ + // TODO: test with different storage format versions. 
+ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "hello"); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("hello", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Restore the segment from PageStorage, meta version should be correct. + SegmentPtr restored_segment = Segment::restoreSegment(Logger::get(), *dm_context, DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_EQ(1, getSegmentStableMetaVersion(restored_segment)); + ASSERT_STREQ("hello", getSegmentStableMetaValue(restored_segment).c_str()); +} +CATCH + +class SegmentReplaceStableDataDisaggregated + : public DB::base::TiFlashStorageTestBasic + , public testing::WithParamInterface +{ +private: + bool enable_file_cache = false; + +public: + SegmentReplaceStableDataDisaggregated() { enable_file_cache = GetParam(); } + +public: + void SetUp() override + { + storage_version = STORAGE_FORMAT_CURRENT; + STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V6; + + DB::tests::TiFlashTestEnv::enableS3Config(); + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); + TiFlashStorageTestBasic::SetUp(); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store == nullptr); + global_context.getSharedContextDisagg()->initRemoteDataStore( + global_context.getFileProvider(), + /*s3_enabled*/ true); + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store != nullptr); + + ASSERT_TRUE(global_context.getWriteNodePageStorage() == nullptr); + 
orig_mode = global_context.getPageStorageRunMode(); + global_context.setPageStorageRunMode(PageStorageRunMode::UNI_PS); + global_context.tryReleaseWriteNodePageStorageForTest(); + global_context.initializeWriteNodePageStorageIfNeed(global_context.getPathPool()); + + auto kvstore = db_context->getTMTContext().getKVStore(); + { + auto meta_store = metapb::Store{}; + meta_store.set_id(100); + kvstore->setStore(meta_store); + } + + TiFlashStorageTestBasic::reload(DB::Settings()); + storage_path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); + page_id_allocator = std::make_shared(); + storage_pool = std::make_shared( + *db_context, + NullspaceID, + ns_id, + *storage_path_pool, + page_id_allocator, + "test.t1"); + storage_pool->restore(); + + if (enable_file_cache) + { + StorageRemoteCacheConfig file_cache_config{ + .dir = fmt::format("{}/fs_cache", getTemporaryPath()), + .capacity = 1 * 1000 * 1000 * 1000, + }; + FileCache::initialize(global_context.getPathCapacity(), file_cache_config); + } + + table_columns = DMTestEnv::getDefaultColumns(); + + wn_dm_context = dmContext(); + wn_segment = Segment::newSegment( + Logger::get(), + *wn_dm_context, + table_columns, + RowKeyRange::newAll(false, 1), + DELTA_MERGE_FIRST_SEGMENT_ID, + 0); + ASSERT_EQ(wn_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); + } + + void TearDown() override + { + if (enable_file_cache) + { + FileCache::shutdown(); + } + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + // global_context.dropVectorIndexCache(); + global_context.getSharedContextDisagg()->remote_data_store = nullptr; + global_context.setPageStorageRunMode(orig_mode); + + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ::DB::tests::TiFlashTestEnv::deleteBucket(*s3_client); + DB::tests::TiFlashTestEnv::disableS3Config(); + + STORAGE_FORMAT_CURRENT = storage_version; + } + + SegmentSnapshotPtr createCNSnapshotFromWN(SegmentPtr wn_segment, const DMContext & 
wn_context) + { + auto snap = wn_segment->createSnapshot(wn_context, false, CurrentMetrics::DT_SnapshotOfRead); + auto snap_proto = Remote::Serializer::serializeSegment( + snap, + wn_segment->segmentId(), + 0, + wn_segment->rowkey_range, + {wn_segment->rowkey_range}, + dummy_mem_tracker, + true); + + auto cn_segment = std::make_shared( + Logger::get(), + /*epoch*/ 0, + wn_segment->getRowKeyRange(), + wn_segment->segmentId(), + /*next_segment_id*/ 0, + nullptr, + nullptr); + + auto read_dm_context = dmContext(); + auto cn_segment_snap = Remote::Serializer::deserializeSegment( + *read_dm_context, + /* store_id */ 100, + 0, + /* table_id */ 100, + snap_proto); + + return cn_segment_snap; + } + +protected: + DMContextPtr dmContext(const ScanContextPtr & scan_context = nullptr) + { + return DMContext::createUnique( + *db_context, + storage_path_pool, + storage_pool, + /*min_version_*/ 0, + NullspaceID, + /*physical_table_id*/ 100, + false, + 1, + db_context->getSettingsRef(), + scan_context); + } + +protected: + /// all these var lives as ref in dm_context + GlobalPageIdAllocatorPtr page_id_allocator; + std::shared_ptr storage_path_pool; + std::shared_ptr storage_pool; + ColumnDefinesPtr table_columns; + DM::DeltaMergeStore::Settings settings; + + NamespaceID ns_id = 100; + + // the segment we are going to test + SegmentPtr wn_segment; + DMContextPtr wn_dm_context; + + DB::PageStorageRunMode orig_mode = PageStorageRunMode::ONLY_V3; + + MemTrackerWrapper dummy_mem_tracker = MemTrackerWrapper(0, root_of_query_mem_trackers.get()); + +private: + StorageFormatVersion storage_version = STORAGE_FORMAT_CURRENT; +}; + +INSTANTIATE_TEST_CASE_P( + DMFileMetaVersion, + SegmentReplaceStableDataDisaggregated, + /* enable_file_cache */ testing::Bool()); + +TEST_P(SegmentReplaceStableDataDisaggregated, Basic) +try +{ + // Prepare a stable data on WN + { + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 100); + wn_segment->write(*wn_dm_context, std::move(block), true); + 
wn_segment = wn_segment->mergeDelta(*wn_dm_context, table_columns); + ASSERT_TRUE(wn_segment != nullptr); + ASSERT_TRUE(wn_segment->stable->getDMFiles()[0]->path().rfind("s3://") == 0); + } + + // Prepare meta version 1 + SegmentPtr wn_segment_v1{}; + { + auto file = wn_segment->stable->getDMFiles()[0]; + auto new_dm_file = DMFile::restore( + wn_dm_context->global_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + file->metaVersion()); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = new_dm_file, + .file_provider = wn_dm_context->global_context.getFileProvider(), + .write_limiter = wn_dm_context->global_context.getWriteLimiter(), + .path_pool = storage_path_pool, + .disagg_ctx = wn_dm_context->global_context.getSharedContextDisagg(), + }); + auto & column_stats = new_dm_file->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + column_stats[::DB::TiDBPkColumnID].additional_data_for_test = "tiflash_foo"; + + new_dm_file->meta->bumpMetaVersion(); + iw->finalize(); + + auto lock = wn_segment->mustGetUpdateLock(); + wn_segment_v1 = wn_segment->replaceStableMetaVersion(lock, *wn_dm_context, {new_dm_file}); + RUNTIME_CHECK(wn_segment_v1 != nullptr); + } + + // Read meta v0 in CN + { + auto snapshot = createCNSnapshotFromWN(wn_segment, *wn_dm_context); + ASSERT_TRUE(snapshot != nullptr); + auto cn_files = snapshot->stable->getDMFiles(); + ASSERT_EQ(1, cn_files.size()); + ASSERT_EQ(0, cn_files[0]->metaVersion()); + ASSERT_STREQ("", cn_files[0]->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + } + + // Read meta v1 in CN + { + auto snapshot = createCNSnapshotFromWN(wn_segment_v1, *wn_dm_context); + ASSERT_TRUE(snapshot != nullptr); + auto cn_files = snapshot->stable->getDMFiles(); + ASSERT_EQ(1, cn_files.size()); + ASSERT_EQ(1, cn_files[0]->metaVersion()); + ASSERT_STREQ( + 
"tiflash_foo", + cn_files[0]->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + } + + // Read meta v0 again in CN + { + auto snapshot = createCNSnapshotFromWN(wn_segment, *wn_dm_context); + ASSERT_TRUE(snapshot != nullptr); + auto cn_files = snapshot->stable->getDMFiles(); + ASSERT_EQ(1, cn_files.size()); + ASSERT_EQ(0, cn_files[0]->metaVersion()); + ASSERT_STREQ("", cn_files[0]->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + } +} +CATCH + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 4381f0a1ae5..2e95535d67b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -531,7 +531,8 @@ void SegmentTestBasic::ingestDTFileIntoDelta( file_id, ref_id, parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); wbs.writeLogAndData(); ASSERT_TRUE(segment->ingestDataToDelta( *dm_context, @@ -590,7 +591,8 @@ void SegmentTestBasic::ingestDTFileByReplace( file_id, ref_id, parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); wbs.writeLogAndData(); auto apply_result = segment->ingestDataForTest(*dm_context, ref_file, clear); @@ -708,6 +710,33 @@ void SegmentTestBasic::replaceSegmentData(PageIdU64 segment_id, const DMFilePtr operation_statistics["replaceData"]++; } +bool SegmentTestBasic::replaceSegmentStableData(PageIdU64 segment_id, const DMFilePtr & file) +{ + LOG_INFO( + logger_op, + "replaceSegmentStableData, segment_id={} file=dmf_{}(v={})", + segment_id, + file->fileId(), + file->metaVersion()); + + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + + bool success = false; + auto segment = segments[segment_id]; + { + auto lock = segment->mustGetUpdateLock(); + auto new_segment = 
segment->replaceStableMetaVersion(lock, *dm_context, {file}); + if (new_segment != nullptr) + { + segments[new_segment->segmentId()] = new_segment; + success = true; + } + } + + operation_statistics["replaceStableData"]++; + return success; +} + bool SegmentTestBasic::areSegmentsSharingStable(const std::vector & segments_id) const { RUNTIME_CHECK(segments_id.size() >= 2); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 46cc093696e..9a7ab721eec 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -95,6 +95,12 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic void replaceSegmentData(PageIdU64 segment_id, const DMFilePtr & file, SegmentSnapshotPtr snapshot = nullptr); void replaceSegmentData(PageIdU64 segment_id, const Block & block, SegmentSnapshotPtr snapshot = nullptr); + /** + * This function does not check rows. + * Returns whether replace is successful. 
+ */ + bool replaceSegmentStableData(PageIdU64 segment_id, const DMFilePtr & file); + Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange( PageIdU64 segment_id, diff --git a/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp b/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp index 2adb2e389df..299083cf2ae 100644 --- a/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp +++ b/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp @@ -1093,7 +1093,10 @@ try S3::uploadEmptyFile(*s3_client, ingest_from_data_file.toFullKey()); S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", ingest_from_dtfile.toFullKey(), DM::DMFileMetaV2::metaFileName())); + fmt::format( + "{}/{}", + ingest_from_dtfile.toFullKey(), + DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))); UniversalWriteBatch batch; diff --git a/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp b/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp index 0c1ade3cf9b..96d82527c30 100644 --- a/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp +++ b/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp @@ -78,7 +78,7 @@ try { S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", s3name_dtfile.toFullKey(), DM::DMFileMetaV2::metaFileName())); + fmt::format("{}/{}", s3name_dtfile.toFullKey(), DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))); PS::V3::CheckpointLocation loc{ .data_file_id = std::make_shared(s3name_dtfile.toFullKey()), .offset_in_file = 0, diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index f0bb4882504..6164b4dc0c2 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -122,7 +122,11 @@ class PathPool friend class PSDiskDelegatorGlobalMulti; friend class PSDiskDelegatorFixedDirectory; +#ifndef DBMS_PUBLIC_GTEST private: +#else +public: +#endif Strings main_data_paths; 
Strings latest_data_paths; Strings kvstore_paths; diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index 88e1dca52f3..a1acd79bc87 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -515,12 +515,9 @@ UInt64 FileCache::getEstimatedSizeOfFileType(FileSegment::FileType file_type) FileType FileCache::getFileType(const String & fname) { std::filesystem::path p(fname); + auto ext = p.extension(); - if (ext.empty()) - { - return p.stem() == DM::DMFileMetaV2::metaFileName() ? FileType::Meta : FileType::Unknow; - } - else if (ext == ".merged") + if (ext == ".merged") { return FileType::Merged; } @@ -536,10 +533,17 @@ FileType FileCache::getFileType(const String & fname) { return getFileTypeOfColData(p.stem()); } - else + else if (ext == ".meta") { - return FileType::Unknow; + // Example: v1.meta + return FileType::Meta; } + else if (ext.empty() && p.stem() == "meta") + { + return FileType::Meta; + } + + return FileType::Unknow; } bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size, UInt64 content_length) diff --git a/dbms/src/Storages/S3/S3Filename.cpp b/dbms/src/Storages/S3/S3Filename.cpp index 6e4040e3df5..77e34cdd070 100644 --- a/dbms/src/Storages/S3/S3Filename.cpp +++ b/dbms/src/Storages/S3/S3Filename.cpp @@ -71,7 +71,7 @@ constexpr static std::string_view fmt_lock_prefix = "lock/"; constexpr static std::string_view fmt_lock_datafile_prefix = "lock/s{store_id}/{subpath}.lock_"; constexpr static std::string_view fmt_lock_file = "lock/s{store_id}/{subpath}.lock_s{lock_store}_{lock_seq}"; -// If you want to read/write S3 object as file throught `FileProvider`, file path must starts with `s3_filename_prefix`. +// If you want to read/write S3 object as file through `FileProvider`, file path must start with `s3_filename_prefix`.
constexpr static std::string_view s3_filename_prefix = "s3://"; // clang-format on diff --git a/dbms/src/Storages/S3/tests/gtest_s3file.cpp b/dbms/src/Storages/S3/tests/gtest_s3file.cpp index 23e36b93ae1..e56677aa9dc 100644 --- a/dbms/src/Storages/S3/tests/gtest_s3file.cpp +++ b/dbms/src/Storages/S3/tests/gtest_s3file.cpp @@ -195,7 +195,7 @@ class S3FileTest DMFilePtr restoreDMFile(const DMFileOID & oid) { - return data_store->prepareDMFile(oid)->restore(DMFileMeta::ReadMode::all()); + return data_store->prepareDMFile(oid)->restore(DMFileMeta::ReadMode::all(), /* meta_version= */ 0); } LoggerPtr log;