From 1b952e7aca6da54525e58affdaa057799c413225 Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 6 Feb 2023 16:47:57 +0800 Subject: [PATCH 1/2] Fix gtest false negative case (#6737) close pingcap/tiflash#6733 --- .../Flash/Coprocessor/StreamingDAGResponseWriter.cpp | 12 ++++++------ dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp | 6 +++--- dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 6 +++--- dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp | 6 +++--- dbms/src/TestUtils/FunctionTestUtils.h | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index 9fc78cfeb1c..feb50a17ada 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -96,21 +96,20 @@ void StreamingDAGResponseWriter::encodeThenWriteBlocks() if (dag_context.encode_type == tipb::EncodeType::TypeCHBlock) { /// passthrough data to a non-TiFlash node, like sending data to TiSpark - while (!blocks.empty()) + for (auto & block : blocks) { - const auto & block = blocks.back(); chunk_codec_stream->encode(block, 0, block.rows()); - blocks.pop_back(); + block.clear(); response.addChunk(chunk_codec_stream->getString()); chunk_codec_stream->clear(); } + blocks.clear(); } else /// passthrough data to a TiDB node { Int64 current_records_num = 0; - while (!blocks.empty()) + for (auto & block : blocks) { - const auto & block = blocks.back(); size_t rows = block.rows(); for (size_t row_index = 0; row_index < rows;) { @@ -125,8 +124,9 @@ void StreamingDAGResponseWriter::encodeThenWriteBlocks() current_records_num += (upper - row_index); row_index = upper; } - blocks.pop_back(); + block.clear(); } + blocks.clear(); if (current_records_num > 0) { diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index b04851f8e30..f24d59942fd 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -143,17 +143,17 @@ void FineGrainedShuffleWriter::batchWriteFineGrainedShuffleIm assert(fine_grained_shuffle_stream_count <= 1024); HashBaseWriterHelper::materializeBlocks(blocks); - while (!blocks.empty()) + for (auto & block : blocks) { - const auto & block = blocks.back(); if constexpr (version != MPPDataPacketV0) { // check schema assertBlockSchema(expected_types, block, FineGrainedShuffleWriterLabels[MPPDataPacketV1]); } HashBaseWriterHelper::scatterColumnsForFineGrainedShuffle(block, partition_col_ids, collators, partition_key_containers_for_reuse, partition_num, fine_grained_shuffle_stream_count, hash, selector, scattered); - blocks.pop_back(); + block.clear(); } + blocks.clear(); // serialize each partitioned block and write it to its destination size_t part_id = 0; diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index 18e87bdc1c2..a2f5c9c1c2d 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -154,16 +154,15 @@ void HashPartitionWriter::partitionAndWriteBlocksV1() std::vector> dest_columns(partition_num); size_t total_rows = 0; - while (!blocks.empty()) + for (auto & block : blocks) { - const auto & block = blocks.back(); { // check schema assertBlockSchema(expected_types, block, HashPartitionWriterLabels[MPPDataPacketV1]); } auto && dest_tbl_cols = HashBaseWriterHelper::createDestColumns(block, partition_num); HashBaseWriterHelper::scatterColumns(block, partition_col_ids, collators, partition_key_containers, partition_num, dest_tbl_cols); - blocks.pop_back(); + block.clear(); for (size_t part_id = 0; part_id < partition_num; ++part_id) { @@ -175,6 +174,7 @@ void HashPartitionWriter::partitionAndWriteBlocksV1() dest_columns[part_id].emplace_back(std::move(columns)); } } + blocks.clear(); RUNTIME_CHECK(rows_in_blocks, total_rows); for (size_t part_id = 0; part_id < partition_num; ++part_id) diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp index eb65327a7f8..f1bd57a8d4b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp @@ -50,14 +50,14 @@ TrackedMppDataPacketPtr ToPacketV0(Blocks & blocks, const std::vector(MPPDataPacketV0); - while (!blocks.empty()) + for (auto & block : blocks) { - const auto & block = blocks.back(); codec_stream->encode(block, 0, block.rows()); - blocks.pop_back(); + block.clear(); tracked_packet->addChunk(codec_stream->getString()); codec_stream->clear(); } + blocks.clear(); return tracked_packet; } diff --git a/dbms/src/TestUtils/FunctionTestUtils.h b/dbms/src/TestUtils/FunctionTestUtils.h index 37e0f9783ca..508b06ae102 100644 --- a/dbms/src/TestUtils/FunctionTestUtils.h +++ b/dbms/src/TestUtils/FunctionTestUtils.h @@ -835,7 +835,7 @@ class FunctionTest : public ::testing::Test }; #define ASSERT_COLUMN_EQ(expected, actual) ASSERT_TRUE(DB::tests::columnEqual((expected), (actual))) -#define ASSERT_BLOCK_EQ(expected, actual) DB::tests::blockEqual((expected), (actual)) +#define ASSERT_BLOCK_EQ(expected, actual) ASSERT_TRUE(DB::tests::blockEqual((expected), (actual))) /// restrictly checking columns equality, both data set and each row's offset should be the same #define ASSERT_COLUMNS_EQ_R(expected, actual) ASSERT_TRUE(DB::tests::columnsEqual((expected), (actual), true)) From 12b69013bb65c0339d5da0881c159f6e622854ce Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 6 Feb 2023 19:01:57 +0800 Subject: [PATCH 2/2] Remove related code about single file mode in DMFile (#6704) ref pingcap/tiflash#6233, ref pingcap/tiflash#6426 --- dbms/src/Common/FailPoint.cpp | 1 - dbms/src/DataStreams/MarkInCompressedFile.h | 32 +- dbms/src/Debug/MockRaftStoreProxy.cpp | 5 - dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp | 2 - dbms/src/Interpreters/Settings.h | 1 - dbms/src/Server/DTTool/DTToolBench.cpp | 2 +- dbms/src/Server/DTTool/DTToolInspect.cpp | 62 ++-- dbms/src/Server/DTTool/DTToolMigrate.cpp | 2 +- dbms/src/Server/RaftConfigParser.cpp | 35 +- dbms/src/Server/RaftConfigParser.h | 1 - dbms/src/Server/tests/gtest_dttool.cpp | 2 +- .../Storages/DeltaMerge/DMChecksumConfig.cpp | 10 +- .../Storages/DeltaMerge/DMChecksumConfig.h | 2 +- dbms/src/Storages/DeltaMerge/DMContext.h | 4 +- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 275 ++++---------- dbms/src/Storages/DeltaMerge/File/DMFile.h | 47 +-- .../DeltaMerge/File/DMFileBlockOutputStream.h | 8 +- .../DeltaMerge/File/DMFilePackFilter.h | 2 - .../Storages/DeltaMerge/File/DMFileReader.cpp | 122 ++----- .../Storages/DeltaMerge/File/DMFileReader.h | 11 +- .../Storages/DeltaMerge/File/DMFileWriter.cpp | 344 +++++------------- .../Storages/DeltaMerge/File/DMFileWriter.h | 88 +---- .../SSTFilesToDTFilesOutputStream.cpp | 19 +- .../SSTFilesToDTFilesOutputStream.h | 2 - dbms/src/Storages/DeltaMerge/Segment.cpp | 12 +- .../DeltaMerge/tests/gtest_dm_column_file.cpp | 2 +- .../gtest_dm_delta_merge_store_test_basic.h | 9 +- .../tests/gtest_dm_delta_value_space.cpp | 5 +- .../DeltaMerge/tests/gtest_dm_file.cpp | 38 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 8 +- .../tests/gtest_dm_simple_pk_test_basic.cpp | 6 +- .../tests/gtest_segment_replace_data.cpp | 6 +- .../tests/gtest_segment_test_basic.cpp | 12 +- .../tests/gtest_sst_files_stream.cpp | 8 - dbms/src/Storages/StorageDeltaMerge.h | 4 +- .../Storages/Transaction/ApplySnapshot.cpp | 18 - dbms/src/Storages/Transaction/KVStore.cpp | 3 +- dbms/src/Storages/Transaction/KVStore.h | 6 +- dbms/src/Storages/Transaction/ProxyFFI.cpp | 18 +- .../Storages/Transaction/StorageEngineType.h | 22 -- dbms/src/Storages/Transaction/TMTContext.cpp | 2 +- .../Transaction/tests/gtest_kvstore.cpp | 7 - .../Transaction/tests/kvstore_helper.h | 2 +- 43 files changed, 293 insertions(+), 974 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index af93d4c9cd3..9a79fd334d5 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -58,7 +58,6 @@ std::unordered_map> FailPointHelper::f M(exception_during_mpp_root_task_run) \ M(exception_during_write_to_storage) \ M(force_set_sst_to_dtfile_block_size) \ - M(force_set_sst_decode_rand) \ M(exception_before_page_file_write_sync) \ M(force_set_segment_ingest_packs_fail) \ M(segment_merge_after_ingest_packs) \ diff --git a/dbms/src/DataStreams/MarkInCompressedFile.h b/dbms/src/DataStreams/MarkInCompressedFile.h index ee7917dc509..0d1ae8314df 100644 --- a/dbms/src/DataStreams/MarkInCompressedFile.h +++ b/dbms/src/DataStreams/MarkInCompressedFile.h @@ -14,11 +14,11 @@ #pragma once -#include - +#include #include #include -#include + +#include namespace DB @@ -50,28 +50,4 @@ struct MarkInCompressedFile using MarksInCompressedFile = PODArray; using MarksInCompressedFilePtr = std::shared_ptr; - -struct MarkWithSizeInCompressedFile -{ - MarkInCompressedFile mark; - size_t mark_size; - - bool operator==(const MarkWithSizeInCompressedFile & rhs) const - { - return std::tie(mark, mark_size) == std::tie(rhs.mark, rhs.mark_size); - } - bool operator!=(const MarkWithSizeInCompressedFile & rhs) const - { - return !(*this == rhs); - } - - String toString() const - { - return "(" + mark.toString() + "," + DB::toString(mark_size) + ")"; - } -}; - -using MarkWithSizesInCompressedFile = PODArray; -using MarkWithSizesInCompressedFilePtr = std::shared_ptr; - -} +} // namespace DB diff --git a/dbms/src/Debug/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockRaftStoreProxy.cpp index 727b2a2d4c7..17c52d9029e 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockRaftStoreProxy.cpp @@ -623,11 +623,6 @@ void MockRaftStoreProxy::snapshot( // The new entry is committed on Proxy's side. region->updateCommitIndex(index); - auto ori_snapshot_apply_method = kvs.snapshot_apply_method; - kvs.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; - SCOPE_EXIT({ - kvs.snapshot_apply_method = ori_snapshot_apply_method; - }); std::vector ssts; for (auto & cf : cfs) { diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index 2100f31bb60..c0ab09c73ac 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -46,7 +46,6 @@ namespace DB namespace FailPoints { extern const char force_set_sst_to_dtfile_block_size[]; -extern const char force_set_sst_decode_rand[]; extern const char force_set_safepoint_when_decode_block[]; } // namespace FailPoints @@ -422,7 +421,6 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG auto & kvstore = tmt.getKVStore(); auto region = kvstore->getRegion(region_id); - FailPointHelper::enableFailPoint(FailPoints::force_set_sst_decode_rand); // Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData` RegionMockTest mock_test(kvstore.get(), region); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 050bdacda7f..a22f97a4b5e 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -219,7 +219,6 @@ struct Settings M(SettingBool, dt_enable_relevant_place, false, "Enable relevant place or not in DeltaTree Engine.") \ M(SettingBool, dt_enable_skippable_place, true, "Enable skippable place or not in DeltaTree Engine.") \ M(SettingBool, dt_enable_stable_column_cache, true, "Enable column cache for StorageDeltaMerge.") \ - M(SettingBool, dt_enable_single_file_mode_dmfile, false, "Enable write DMFile in single file mode.") \ M(SettingUInt64, dt_open_file_max_idle_seconds, 15, "Max idle time of opening files, 0 means infinite.") \ M(SettingUInt64, dt_page_num_max_expect_legacy_files, 100, "Max number of legacy file expected") \ M(SettingFloat, dt_page_num_max_gc_valid_rate, 1.0, "Max valid rate of deciding a page file can be compact when exising legacy files are more over than " \ diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index 1bec02786ae..130e231010f 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -356,7 +356,7 @@ int benchEntry(const std::vector & opts) for (size_t i = 0; i < repeat; ++i) { using namespace std::chrono; - dmfile = DB::DM::DMFile::create(1, workdir, false, opt); + dmfile = DB::DM::DMFile::create(1, workdir, opt); auto start = high_resolution_clock::now(); { auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines); diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp index c74c0c2994d..d8447a61ede 100644 --- a/dbms/src/Server/DTTool/DTToolInspect.cpp +++ b/dbms/src/Server/DTTool/DTToolInspect.cpp @@ -47,7 +47,6 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args) auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFile::ReadMetaMode::all()); LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk()); - LOG_INFO(logger, "single file: {}", dmfile->isSingleFileMode()); // if the DMFile has a config file, there may be additional debugging information // we also log the content of dmfile checksum config @@ -81,43 +80,38 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args) if (args.check) { // for directory mode file, we can consume each file to check its integrity. - if (!dmfile->isSingleFileMode()) + auto prefix = fmt::format("{}/dmf_{}", args.workdir, args.file_id); + auto file = Poco::File{prefix}; + std::vector sub; + file.list(sub); + for (auto & i : sub) { - auto prefix = args.workdir + "/dmf_" + DB::toString(args.file_id); - auto file = Poco::File{prefix}; - std::vector sub; - file.list(sub); - for (auto & i : sub) + if (endsWith(i, ".mrk") || endsWith(i, ".dat") || endsWith(i, ".idx") || i == "pack") { - if (endsWith(i, ".mrk") || endsWith(i, ".dat") || endsWith(i, ".idx") || i == "pack") + auto full_path = fmt::format("{}/{}", prefix, i); + LOG_INFO(logger, "checking full_path is {}: ", full_path); + if (dmfile->getConfiguration()) { - auto full_path = prefix; - full_path += "/"; - full_path += i; - LOG_INFO(logger, "checking {}: ", i); - if (dmfile->getConfiguration()) - { - consume(*DB::createReadBufferFromFileBaseByFileProvider( - fp, - full_path, - DB::EncryptionPath(full_path, i), - dmfile->getConfiguration()->getChecksumFrameLength(), - nullptr, - dmfile->getConfiguration()->getChecksumAlgorithm(), - dmfile->getConfiguration()->getChecksumFrameLength())); - } - else - { - consume(*DB::createReadBufferFromFileBaseByFileProvider( - fp, - full_path, - DB::EncryptionPath(full_path, i), - DBMS_DEFAULT_BUFFER_SIZE, - 0, - nullptr)); - } - LOG_INFO(logger, "[success]"); + consume(*DB::createReadBufferFromFileBaseByFileProvider( + fp, + full_path, + DB::EncryptionPath(full_path, i), + dmfile->getConfiguration()->getChecksumFrameLength(), + nullptr, + dmfile->getConfiguration()->getChecksumAlgorithm(), + dmfile->getConfiguration()->getChecksumFrameLength())); } + else + { + consume(*DB::createReadBufferFromFileBaseByFileProvider( + fp, + full_path, + DB::EncryptionPath(full_path, i), + DBMS_DEFAULT_BUFFER_SIZE, + 0, + nullptr)); + } + LOG_INFO(logger, "[success]"); } } // for both directory file and single mode file, we can read out all blocks from the file. diff --git a/dbms/src/Server/DTTool/DTToolMigrate.cpp b/dbms/src/Server/DTTool/DTToolMigrate.cpp index 77a628275ef..fabcabaca1f 100644 --- a/dbms/src/Server/DTTool/DTToolMigrate.cpp +++ b/dbms/src/Server/DTTool/DTToolMigrate.cpp @@ -209,7 +209,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) } LOG_INFO(logger, "creating new dtfile"); - auto new_file = DB::DM::DMFile::create(args.file_id, keeper.migration_temp_dir.path(), false, std::move(option)); + auto new_file = DB::DM::DMFile::create(args.file_id, keeper.migration_temp_dir.path(), std::move(option)); LOG_INFO(logger, "creating input stream"); auto input_stream = DB::DM::createSimpleBlockInputStream(context, src_file); diff --git a/dbms/src/Server/RaftConfigParser.cpp b/dbms/src/Server/RaftConfigParser.cpp index b8cfe1ab3bf..4ebade882fc 100644 --- a/dbms/src/Server/RaftConfigParser.cpp +++ b/dbms/src/Server/RaftConfigParser.cpp @@ -88,40 +88,7 @@ TiFlashRaftConfig TiFlashRaftConfig::parseSettings(Poco::Util::AbstractConfigura res.enable_compatible_mode = config.getBool("raft.enable_compatible_mode"); } - if (config.has("raft.snapshot.method")) - { - String snapshot_method = config.getString("raft.snapshot.method"); - std::transform(snapshot_method.begin(), snapshot_method.end(), snapshot_method.begin(), [](char ch) { return std::tolower(ch); }); - if (snapshot_method == "file1") - { - res.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Directory; - } -#if 0 - // Not generally available for this file format - else if (snapshot_method == "file2") - { - res.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; - } -#endif - } - switch (res.snapshot_apply_method) - { - case TiDB::SnapshotApplyMethod::DTFile_Directory: - case TiDB::SnapshotApplyMethod::DTFile_Single: - if (res.engine != TiDB::StorageEngine::DT) - { - throw Exception( - fmt::format("Illegal arguments: can not use DTFile to store snapshot data when the storage engine is not DeltaTree, [engine={}] [snapshot method={}]", - static_cast(res.engine), - applyMethodToString(res.snapshot_apply_method)), - ErrorCodes::INVALID_CONFIG_PARAMETER); - } - break; - default: - break; - } - - LOG_INFO(log, "Default storage engine [type={}] [snapshot.method={}]", static_cast(res.engine), applyMethodToString(res.snapshot_apply_method)); + LOG_INFO(log, "Default storage engine [type={}]", static_cast(res.engine)); return res; } diff --git a/dbms/src/Server/RaftConfigParser.h b/dbms/src/Server/RaftConfigParser.h index 604a2476c44..34900af7e66 100644 --- a/dbms/src/Server/RaftConfigParser.h +++ b/dbms/src/Server/RaftConfigParser.h @@ -47,7 +47,6 @@ struct TiFlashRaftConfig static constexpr TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT; TiDB::StorageEngine engine = DEFAULT_ENGINE; - TiDB::SnapshotApplyMethod snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Directory; public: TiFlashRaftConfig() = default; diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index cfe6a0de071..5ce2cd98686 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -85,7 +85,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic db_context->getSettingsRef()); // Write { - dmfile = DB::DM::DMFile::create(1, getTemporaryPath(), false, std::nullopt); + dmfile = DB::DM::DMFile::create(1, getTemporaryPath(), std::nullopt); { auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines); stream.writePrefix(); diff --git a/dbms/src/Storages/DeltaMerge/DMChecksumConfig.cpp b/dbms/src/Storages/DeltaMerge/DMChecksumConfig.cpp index fdb969c4aa2..68c29493114 100644 --- a/dbms/src/Storages/DeltaMerge/DMChecksumConfig.cpp +++ b/dbms/src/Storages/DeltaMerge/DMChecksumConfig.cpp @@ -25,8 +25,6 @@ namespace DB::DM { DMChecksumConfig::DMChecksumConfig(std::istream & input) - : embedded_checksum() - , debug_info() { dtpb::ChecksumConfig configuration; if (unlikely(!configuration.ParseFromIstream(&input))) @@ -102,7 +100,7 @@ std::ostream & operator<<(std::ostream & output, const DMChecksumConfig & config { digest->update(name.data(), name.length()); digest->update(checksum.data(), checksum.length()); - auto embedded_checksum = configuration.add_embedded_checksum(); + auto * embedded_checksum = configuration.add_embedded_checksum(); embedded_checksum->set_name(name); embedded_checksum->set_checksum(checksum); } @@ -113,7 +111,7 @@ std::ostream & operator<<(std::ostream & output, const DMChecksumConfig & config { for (const auto & [name, content] : config.debug_info) { - auto tmp = configuration.add_debug_info(); + auto * tmp = configuration.add_debug_info(); tmp->set_name(name); tmp->set_content(content); } @@ -127,9 +125,9 @@ std::ostream & operator<<(std::ostream & output, const DMChecksumConfig & config return output; } -std::optional DMChecksumConfig::fromDBContext(const Context & context, bool is_single_file) +std::optional DMChecksumConfig::fromDBContext(const Context & context) { - return !is_single_file && STORAGE_FORMAT_CURRENT.dm_file >= DMFileFormat::V2 + return STORAGE_FORMAT_CURRENT.dm_file >= DMFileFormat::V2 ? std::make_optional(DMChecksumConfig{context}) : std::nullopt; }; diff --git a/dbms/src/Storages/DeltaMerge/DMChecksumConfig.h b/dbms/src/Storages/DeltaMerge/DMChecksumConfig.h index b151c15e3fa..6a968ca9d12 100644 --- a/dbms/src/Storages/DeltaMerge/DMChecksumConfig.h +++ b/dbms/src/Storages/DeltaMerge/DMChecksumConfig.h @@ -88,7 +88,7 @@ class DMChecksumConfig } } - [[maybe_unused]] static std::optional fromDBContext(const DB::Context & context, bool is_single_file); + [[maybe_unused]] static std::optional fromDBContext(const DB::Context & context); private: size_t checksum_frame_length; ///< the length of checksum frame diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index 371fc9ab5d0..cad1e5adc98 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -129,9 +129,9 @@ struct DMContext : private boost::noncopyable WriteLimiterPtr getWriteLimiter() const { return db_context.getWriteLimiter(); } ReadLimiterPtr getReadLimiter() const { return db_context.getReadLimiter(); } - DM::DMConfigurationOpt createChecksumConfig(bool is_single_file) const + DM::DMConfigurationOpt createChecksumConfig() const { - return DMChecksumConfig::fromDBContext(db_context, is_single_file); + return DMChecksumConfig::fromDBContext(db_context); } }; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 674f66c05fb..c3dee70e21a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -55,9 +55,9 @@ inline constexpr static const char * DATA_FILE_SUFFIX = ".dat"; inline constexpr static const char * INDEX_FILE_SUFFIX = ".idx"; inline constexpr static const char * MARK_FILE_SUFFIX = ".mrk"; -inline String getNGCPath(const String & prefix, bool is_single_mode) +inline String getNGCPath(const String & prefix) { - return prefix + (is_single_mode ? "." : "/") + NGC_FILE_NAME; + return prefix + "/" + NGC_FILE_NAME; } } // namespace details @@ -83,9 +83,9 @@ String DMFile::getPathByStatus(const String & parent_path, UInt64 file_id, DMFil return s; } -String DMFile::getNGCPath(const String & parent_path, UInt64 file_id, DMFile::Status status, bool is_single_mode) +String DMFile::getNGCPath(const String & parent_path, UInt64 file_id, DMFile::Status status) { - return details::getNGCPath(getPathByStatus(parent_path, file_id, status), is_single_mode); + return details::getNGCPath(getPathByStatus(parent_path, file_id, status)); } // @@ -97,17 +97,16 @@ String DMFile::path() const String DMFile::ngcPath() const { - return getNGCPath(parent_path, file_id, status, isSingleFileMode()); + return getNGCPath(parent_path, file_id, status); } -DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, bool single_file_mode, DMConfigurationOpt configuration) +DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, DMConfigurationOpt configuration) { Poco::Logger * log = &Poco::Logger::get("DMFile"); // On create, ref_id is the same as file_id. DMFilePtr new_dmfile(new DMFile(file_id, file_id, parent_path, - single_file_mode ? Mode::SINGLE_FILE : Mode::FOLDER, Status::WRITABLE, log, std::move(configuration))); @@ -119,26 +118,13 @@ DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, bool single file.remove(true); LOG_WARNING(log, "Existing dmfile, removed: {}", path); } - if (single_file_mode) - { - Poco::File parent(parent_path); - parent.createDirectories(); - // Create a mark file to stop this dmfile from being removed by GC. - // We should create NGC file before creating the file under single file mode, - // or the file may be removed. - // FIXME : this should not use PageUtils. - PageUtil::touchFile(new_dmfile->ngcPath()); - PageUtil::touchFile(path); - } - else - { - file.createDirectories(); - // Create a mark file to stop this dmfile from being removed by GC. - // We should create NGC file after creating the directory under folder mode - // since the NGC file is a file under the folder. - // FIXME : this should not use PageUtils. - PageUtil::touchFile(new_dmfile->ngcPath()); - } + + file.createDirectories(); + // Create a mark file to stop this dmfile from being removed by GC. + // We should create NGC file after creating the directory under folder mode + // since the NGC file is a file under the folder. + // FIXME : this should not use PageUtils. + PageUtil::touchFile(new_dmfile->ngcPath()); return new_dmfile; } @@ -156,20 +142,15 @@ DMFilePtr DMFile::restore( if (!poco_file.exists()) return nullptr; - bool single_file_mode = poco_file.isFile(); DMFilePtr dmfile(new DMFile( file_id, page_id, parent_path, - single_file_mode ? Mode::SINGLE_FILE : Mode::FOLDER, Status::READABLE, &Poco::Logger::get("DMFile"))); if (!read_meta_mode.isNone()) { - if (!single_file_mode) - { - dmfile->readConfiguration(file_provider); - } + dmfile->readConfiguration(file_provider); dmfile->readMetadata(file_provider, read_meta_mode); } return dmfile; @@ -177,39 +158,17 @@ DMFilePtr DMFile::restore( String DMFile::colIndexCacheKey(const FileNameBase & file_name_base) const { - if (isSingleFileMode()) - { - return path() + "/" + DMFile::colIndexFileName(file_name_base); - } - else - { - return colIndexPath(file_name_base); - } + return colIndexPath(file_name_base); } String DMFile::colMarkCacheKey(const FileNameBase & file_name_base) const { - if (isSingleFileMode()) - { - return path() + "/" + DMFile::colMarkFileName(file_name_base); - } - else - { - return colMarkPath(file_name_base); - } + return colMarkPath(file_name_base); } bool DMFile::isColIndexExist(const ColId & col_id) const { - if (isSingleFileMode()) - { - const auto index_identifier = DMFile::colIndexFileName(DMFile::getFileNameBase(col_id)); - return isSubFileExists(index_identifier); - } - else - { - return column_indices.count(col_id) != 0; - } + return column_indices.count(col_id) != 0; } String DMFile::encryptionBasePath() const @@ -220,37 +179,37 @@ String DMFile::encryptionBasePath() const EncryptionPath DMFile::encryptionDataPath(const FileNameBase & file_name_base) const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : file_name_base + details::DATA_FILE_SUFFIX); + return EncryptionPath(encryptionBasePath(), file_name_base + details::DATA_FILE_SUFFIX); } EncryptionPath DMFile::encryptionIndexPath(const FileNameBase & file_name_base) const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : file_name_base + details::INDEX_FILE_SUFFIX); + return EncryptionPath(encryptionBasePath(), file_name_base + details::INDEX_FILE_SUFFIX); } EncryptionPath DMFile::encryptionMarkPath(const FileNameBase & file_name_base) const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : file_name_base + details::MARK_FILE_SUFFIX); + return EncryptionPath(encryptionBasePath(), file_name_base + details::MARK_FILE_SUFFIX); } EncryptionPath DMFile::encryptionMetaPath() const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : metaFileName()); + return EncryptionPath(encryptionBasePath(), metaFileName()); } EncryptionPath DMFile::encryptionPackStatPath() const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : packStatFileName()); + return EncryptionPath(encryptionBasePath(), packStatFileName()); } EncryptionPath DMFile::encryptionPackPropertyPath() const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : packPropertyFileName()); + return EncryptionPath(encryptionBasePath(), packPropertyFileName()); } EncryptionPath DMFile::encryptionConfigurationPath() const { - return EncryptionPath(encryptionBasePath(), isSingleFileMode() ? "" : configurationFileName()); + return EncryptionPath(encryptionBasePath(), configurationFileName()); } String DMFile::colDataFileName(const FileNameBase & file_name_base) @@ -383,10 +342,6 @@ void DMFile::writeMetadata(const FileProviderPtr & file_provider, const WriteLim void DMFile::upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileFormat::Version ver) { - if (unlikely(mode != Mode::FOLDER)) - { - throw DB::TiFlashException("upgradeMetaIfNeed is only expected to be called when mode is FOLDER.", Errors::DeltaTree::Internal); - } if (unlikely(ver == DMFileFormat::V0)) { // Update ColumnStat.serialized_bytes @@ -450,17 +405,13 @@ void DMFile::readColumnStat(const FileProviderPtr & file_provider, const MetaPac assertString("\n", *buf); readText(column_stats, ver, *buf); - // No need to upgrade meta when mode is Mode::SINGLE_FILE - if (mode == Mode::FOLDER) + // for V2, we do not apply in-place upgrade for now + // but it should not affect the normal read procedure + if (unlikely(ver >= DMFileFormat::V2 && !configuration)) { - // for V2, we do not apply in-place upgrade for now - // but it should not affect the normal read procedure - if (unlikely(ver >= DMFileFormat::V2 && !configuration)) - { - throw TiFlashException("configuration expected but not loaded", Errors::Checksum::Missing); - } - upgradeMetaIfNeed(file_provider, ver); + throw TiFlashException("configuration expected but not loaded", Errors::Checksum::Missing); } + upgradeMetaIfNeed(file_provider, ver); } void DMFile::readPackStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) @@ -543,61 +494,30 @@ void DMFile::readPackProperty(const FileProviderPtr & file_provider, const MetaP void DMFile::readMetadata(const FileProviderPtr & file_provider, const ReadMetaMode & read_meta_mode) { Footer footer; - if (isSingleFileMode()) + + if (read_meta_mode.isAll()) { - // Read the `Footer` part from disk and init `sub_file_stat` - /// TODO: Redesign the file format for single file mode (https://github.com/pingcap/tics/issues/1798) - Poco::File file(path()); - ReadBufferFromFileProvider buf(file_provider, path(), EncryptionPath(encryptionBasePath(), "")); - - buf.seek(file.getSize() - sizeof(Footer), SEEK_SET); - DB::readIntBinary(footer.meta_pack_info.pack_property_offset, buf); - DB::readIntBinary(footer.meta_pack_info.pack_property_size, buf); - DB::readIntBinary(footer.meta_pack_info.column_stat_offset, buf); - DB::readIntBinary(footer.meta_pack_info.column_stat_size, buf); - DB::readIntBinary(footer.meta_pack_info.pack_stat_offset, buf); - DB::readIntBinary(footer.meta_pack_info.pack_stat_size, buf); - DB::readIntBinary(footer.sub_file_stat_offset, buf); - DB::readIntBinary(footer.sub_file_num, buf); - // initialize sub file state - buf.seek(footer.sub_file_stat_offset, SEEK_SET); - SubFileStat sub_file_stat{}; - for (UInt32 i = 0; i < footer.sub_file_num; i++) - { - String name; - DB::readStringBinary(name, buf); - DB::readIntBinary(sub_file_stat.offset, buf); - DB::readIntBinary(sub_file_stat.size, buf); - sub_file_stats.emplace(name, sub_file_stat); - } + initializeIndices(); } - else - { - if (read_meta_mode.isAll()) + if (auto file = Poco::File(packPropertyPath()); file.exists()) + footer.meta_pack_info.pack_property_size = file.getSize(); + + auto recheck = [&](size_t size) { + if (this->configuration) { - initializeSubFileStatsForFolderMode(); - initializeIndices(); + auto total_size = this->configuration->getChecksumFrameLength() + this->configuration->getChecksumHeaderLength(); + auto frame_count = size / total_size + + (0 != size % total_size); + size -= frame_count * this->configuration->getChecksumHeaderLength(); } - if (auto file = Poco::File(packPropertyPath()); file.exists()) - footer.meta_pack_info.pack_property_size = file.getSize(); - - auto recheck = [&](size_t size) { - if (this->configuration) - { - auto total_size = this->configuration->getChecksumFrameLength() + this->configuration->getChecksumHeaderLength(); - auto frame_count = size / total_size - + (0 != size % total_size); - size -= frame_count * this->configuration->getChecksumHeaderLength(); - } - return size; - }; + return size; + }; - if (auto file = Poco::File(packPropertyPath()); file.exists()) - footer.meta_pack_info.pack_property_size = file.getSize(); + if (auto file = Poco::File(packPropertyPath()); file.exists()) + footer.meta_pack_info.pack_property_size = file.getSize(); - footer.meta_pack_info.column_stat_size = Poco::File(metaPath()).getSize(); - footer.meta_pack_info.pack_stat_size = recheck(Poco::File(packStatPath()).getSize()); - } + footer.meta_pack_info.column_stat_size = Poco::File(metaPath()).getSize(); + footer.meta_pack_info.pack_stat_size = recheck(Poco::File(packStatPath()).getSize()); if (read_meta_mode.needPackProperty() && footer.meta_pack_info.pack_property_size != 0) readPackProperty(file_provider, footer.meta_pack_info); @@ -635,52 +555,9 @@ void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const LOG_WARNING(log, "Existing dmfile, removed: {}", deleted_path); } old_file.renameTo(new_path); - initializeSubFileStatsForFolderMode(); initializeIndices(); } -void DMFile::finalizeForSingleFileMode(WriteBuffer & buffer) -{ - Footer footer; - std::tie(footer.meta_pack_info.pack_property_offset, footer.meta_pack_info.pack_property_size) = writePackPropertyToBuffer(buffer); - std::tie(footer.meta_pack_info.column_stat_offset, footer.meta_pack_info.column_stat_size) = writeMetaToBuffer(buffer); - std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePackStatToBuffer(buffer); - - footer.sub_file_stat_offset = buffer.count(); - footer.sub_file_num = sub_file_stats.size(); - for (auto & iter : sub_file_stats) - { - writeStringBinary(iter.first, buffer); - writeIntBinary(iter.second.offset, buffer); - writeIntBinary(iter.second.size, buffer); - } - writeIntBinary(footer.meta_pack_info.pack_property_offset, buffer); - writeIntBinary(footer.meta_pack_info.pack_property_size, buffer); - writeIntBinary(footer.meta_pack_info.column_stat_offset, buffer); - writeIntBinary(footer.meta_pack_info.column_stat_size, buffer); - writeIntBinary(footer.meta_pack_info.pack_stat_offset, buffer); - writeIntBinary(footer.meta_pack_info.pack_stat_size, buffer); - writeIntBinary(footer.sub_file_stat_offset, buffer); - writeIntBinary(footer.sub_file_num, buffer); - writeIntBinary(static_cast>(footer.file_format_version), buffer); - buffer.next(); - if (status != Status::WRITING) - throw Exception(fmt::format("Expected WRITING status, now {}", statusString(status))); - Poco::File old_file(path()); - Poco::File old_ngc_file(ngcPath()); - - setStatus(Status::READABLE); - - auto new_path = path(); - Poco::File file(new_path); - if (file.exists()) - file.remove(); - Poco::File new_ngc_file(ngcPath()); - new_ngc_file.createFile(); - old_file.renameTo(new_path); - old_ngc_file.remove(); -} - std::set DMFile::listAllInPath( const FileProviderPtr & file_provider, const String & parent_path, @@ -752,7 +629,7 @@ std::set DMFile::listAllInPath( // Only return the ID if the file is able to be GC-ed. const auto file_path = parent_path + "/" + name; Poco::File file(file_path); - String ngc_path = details::getNGCPath(file_path, file.isFile()); + String ngc_path = details::getNGCPath(file_path); Poco::File ngc_file(ngc_path); if (!ngc_file.exists()) file_ids.insert(file_id); @@ -779,47 +656,21 @@ void DMFile::enableGC() void DMFile::remove(const FileProviderPtr & file_provider) { - if (isSingleFileMode()) + // If we use `FileProvider::deleteDirectory`, it may left a broken DMFile on disk. + // By renaming DMFile with a prefix first, even if there are broken DMFiles left, + // we can safely clean them when `DMFile::listAllInPath` is called. + const String dir_path = path(); + if (Poco::File dir_file(dir_path); dir_file.exists()) { - file_provider->deleteRegularFile(path(), EncryptionPath(encryptionBasePath(), "")); - } - else - { - // If we use `FileProvider::deleteDirectory`, it may left a broken DMFile on disk. - // By renaming DMFile with a prefix first, even if there are broken DMFiles left, - // we can safely clean them when `DMFile::listAllInPath` is called. - const String dir_path = path(); - if (Poco::File dir_file(dir_path); dir_file.exists()) - { - setStatus(Status::DROPPED); - const String deleted_path = path(); - // Rename the directory first (note that we should do it before deleting encryption info) - dir_file.renameTo(deleted_path); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_dmfile_remove_encryption); - file_provider->deleteEncryptionInfo(EncryptionPath(encryptionBasePath(), "")); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_dmfile_remove_from_disk); - // Then clean the files on disk - dir_file.remove(true); - } - } -} - -void DMFile::initializeSubFileStatsForFolderMode() -{ - if (isSingleFileMode()) - return; - - Poco::File directory{path()}; - std::vector sub_files{}; - directory.list(sub_files); - for (const auto & name : sub_files) - { - if (endsWith(name, details::DATA_FILE_SUFFIX) || endsWith(name, details::INDEX_FILE_SUFFIX) - || endsWith(name, details::MARK_FILE_SUFFIX)) - { - auto size = Poco::File(path() + "/" + name).getSize(); - sub_file_stats.emplace(name, SubFileStat{0, size}); - } + setStatus(Status::DROPPED); + const String deleted_path = path(); + // Rename the directory first (note that we should do it before deleting encryption info) + dir_file.renameTo(deleted_path); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_dmfile_remove_encryption); + file_provider->deleteEncryptionInfo(EncryptionPath(encryptionBasePath(), "")); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_dmfile_remove_from_disk); + // Then clean the files on disk + dir_file.remove(true); } } @@ -840,8 +691,6 @@ void DMFile::initializeIndices() throw DB::Exception(fmt::format("invalid ColId: {} from file: {}", err.what(), data)); } }; - if (isSingleFileMode()) - return; Poco::File directory{path()}; std::vector sub_files{}; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 22a7bd4ab55..06cf1f9502f 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -51,12 +51,6 @@ using DMFiles = std::vector; class DMFile : private boost::noncopyable { public: - enum Mode : int - { - SINGLE_FILE, - FOLDER, - }; - enum Status : int { WRITABLE, @@ -65,11 +59,6 @@ class DMFile : private boost::noncopyable DROPPED, }; - enum DMSingleFileFormatVersion : int - { - SINGLE_FILE_VERSION_BASE = 0, - }; - static String statusString(Status status) { switch (status) @@ -178,12 +167,9 @@ class DMFile : private boost::noncopyable UInt64 sub_file_stat_offset; UInt32 sub_file_num; - DMSingleFileFormatVersion file_format_version; - Footer() : sub_file_stat_offset(0) , sub_file_num(0) - , file_format_version(DMSingleFileFormatVersion::SINGLE_FILE_VERSION_BASE) {} }; @@ -192,7 +178,7 @@ class DMFile : private boost::noncopyable using PackProperties = dtpb::PackProperties; static DMFilePtr - create(UInt64 file_id, const String & parent_path, bool single_file_mode = false, DMConfigurationOpt configuration = std::nullopt); + create(UInt64 file_id, const String & parent_path, DMConfigurationOpt configuration = std::nullopt); static DMFilePtr restore( const FileProviderPtr & file_provider, @@ -212,7 +198,7 @@ class DMFile : private boost::noncopyable // static helper function for getting path static String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Status status); - static String getNGCPath(const String & parent_path, UInt64 file_id, DMFile::Status status, bool is_single_mode); + static String getNGCPath(const String & parent_path, UInt64 file_id, DMFile::Status status); bool canGC(); void enableGC(); @@ -266,7 +252,6 @@ class DMFile : private boost::noncopyable throw Exception("Column [" + DB::toString(col_id) + "] not found in dm file [" + path() + "]"); } bool isColumnExist(ColId col_id) const { return column_stats.find(col_id) != column_stats.end(); } - bool isSingleFileMode() const { return mode == Mode::SINGLE_FILE; } /* * TODO: This function is currently unused. We could use it when: @@ -301,22 +286,18 @@ class DMFile : private boost::noncopyable DMFile(UInt64 file_id_, UInt64 page_id_, String parent_path_, - Mode mode_, Status status_, Poco::Logger * log_, DMConfigurationOpt configuration_ = std::nullopt) : file_id(file_id_) , page_id(page_id_) , parent_path(std::move(parent_path_)) - , mode(mode_) , status(status_) , configuration(std::move(configuration_)) , log(log_) { } - bool isFolderMode() const { return mode == Mode::FOLDER; } - // Do not gc me. String ngcPath() const; String metaPath() const { return subFilePath(metaFileName()); } @@ -325,6 +306,10 @@ class DMFile : private boost::noncopyable String configurationPath() const { return subFilePath(configurationFileName()); } using FileNameBase = String; + size_t colIndexSize(const FileNameBase & file_name_base) { return Poco::File(colIndexPath(file_name_base)).getSize(); } + size_t colMarkSize(const FileNameBase & file_name_base) { return Poco::File(colMarkPath(file_name_base)).getSize(); } + size_t colDataSize(const FileNameBase & file_name_base) { return Poco::File(colDataPath(file_name_base)).getSize(); } + String colDataPath(const FileNameBase & file_name_base) const { return subFilePath(colDataFileName(file_name_base)); } String colIndexPath(const FileNameBase & file_name_base) const { return subFilePath(colIndexFileName(file_name_base)); } String colMarkPath(const FileNameBase & file_name_base) const { return subFilePath(colMarkFileName(file_name_base)); } @@ -332,12 +317,6 @@ class DMFile : private boost::noncopyable String colIndexCacheKey(const FileNameBase & file_name_base) const; String colMarkCacheKey(const FileNameBase & file_name_base) const; - size_t colIndexOffset(const FileNameBase & file_name_base) const { return subFileOffset(colIndexFileName(file_name_base)); } - size_t colMarkOffset(const FileNameBase & file_name_base) const { return subFileOffset(colMarkFileName(file_name_base)); } - size_t colIndexSize(const FileNameBase & file_name_base) const { return subFileSize(colIndexFileName(file_name_base)); } - size_t colMarkSize(const FileNameBase & file_name_base) const { return subFileSize(colMarkFileName(file_name_base)); } - size_t colDataSize(const FileNameBase & file_name_base) const { return subFileSize(colDataFileName(file_name_base)); } - bool isColIndexExist(const ColId & col_id) const; String encryptionBasePath() const; @@ -388,19 +367,8 @@ class DMFile : private boost::noncopyable void setStatus(Status status_) { status = status_; } void finalizeForFolderMode(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); - void finalizeForSingleFileMode(WriteBuffer & buffer); - - void addSubFileStat(const String & name, UInt64 offset, UInt64 size) { sub_file_stats.emplace(name, SubFileStat{offset, size}); } - - bool isSubFileExists(const String & name) const { return sub_file_stats.find(name) != sub_file_stats.end(); } - - String subFilePath(const String & file_name) const { return isSingleFileMode() ? path() : path() + "/" + file_name; } - - size_t subFileOffset(const String & file_name) const { return isSingleFileMode() ? sub_file_stats.at(file_name).offset : 0; } - - size_t subFileSize(const String & file_name) const { return sub_file_stats.at(file_name).size; } - void initializeSubFileStatsForFolderMode(); + String subFilePath(const String & file_name) const { return path() + "/" + file_name; } void initializeIndices(); @@ -416,7 +384,6 @@ class DMFile : private boost::noncopyable ColumnStats column_stats; std::unordered_set column_indices; - Mode mode; Status status; DMConfigurationOpt configuration; // configuration diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h index 3329dda14d4..89b6fe2627b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h @@ -32,12 +32,9 @@ namespace DM class DMFileBlockOutputStream { public: - using Flags = DMFileWriter::Flags; - DMFileBlockOutputStream(const Context & context, const DMFilePtr & dmfile, - const ColumnDefines & write_columns, - const Flags flags = Flags()) + const ColumnDefines & write_columns) : writer( dmfile, write_columns, @@ -46,8 +43,7 @@ class DMFileBlockOutputStream DMFileWriter::Options{ CompressionSettings(context.getSettingsRef().dt_compression_method, context.getSettingsRef().dt_compression_level), context.getSettingsRef().min_compress_block_size, - context.getSettingsRef().max_compress_block_size, - flags}) + context.getSettingsRef().max_compress_block_size}) { } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 25e781cf4d5..6d631a0e6e5 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -246,7 +246,6 @@ class DMFilePackFilter dmfile->encryptionIndexPath(file_name_base), std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), read_limiter); - index_buf.seek(dmfile->colIndexOffset(file_name_base)); return MinMaxIndex::read(*type, index_buf, dmfile->colIndexSize(file_name_base)); } else @@ -258,7 +257,6 @@ class DMFilePackFilter read_limiter, dmfile->configuration->getChecksumAlgorithm(), dmfile->configuration->getChecksumFrameLength()); - index_buf->seek(dmfile->colIndexOffset(file_name_base)); auto header_size = dmfile->configuration->getChecksumHeaderLength(); auto frame_total_size = dmfile->configuration->getChecksumFrameLength() + header_size; auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index b07b8ce199d..5534f260a21 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -51,67 +51,39 @@ DMFileReader::Stream::Stream( size_t max_read_buffer_size, const LoggerPtr & log, const ReadLimiterPtr & read_limiter) - : single_file_mode(reader.single_file_mode) - , avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size) + : avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size) { // load mark data - if (reader.single_file_mode) - { - auto mark_with_size_load = [&]() -> MarkWithSizesInCompressedFilePtr { - auto res = std::make_shared(reader.dmfile->getPacks()); - if (res->empty()) // 0 rows. - return res; - size_t size = sizeof(MarkWithSizeInCompressedFile) * reader.dmfile->getPacks(); + auto mark_load = [&]() -> MarksInCompressedFilePtr { + auto res = std::make_shared(reader.dmfile->getPacks()); + if (res->empty()) // 0 rows. + return res; + size_t size = sizeof(MarkInCompressedFile) * reader.dmfile->getPacks(); + if (reader.dmfile->configuration) + { + auto buffer = createReadBufferFromFileBaseByFileProvider( + reader.file_provider, + reader.dmfile->colMarkPath(file_name_base), + reader.dmfile->encryptionMarkPath(file_name_base), + reader.dmfile->getConfiguration()->getChecksumFrameLength(), + read_limiter, + reader.dmfile->getConfiguration()->getChecksumAlgorithm(), + reader.dmfile->getConfiguration()->getChecksumFrameLength()); + buffer->readBig(reinterpret_cast(res->data()), size); + } + else + { auto file = reader.file_provider->newRandomAccessFile(reader.dmfile->colMarkPath(file_name_base), - reader.dmfile->encryptionMarkPath(file_name_base), - nullptr, - -1); - auto mark_size = reader.dmfile->colMarkSize(file_name_base); - auto mark_offset = reader.dmfile->colMarkOffset(file_name_base); - if (unlikely(mark_size != size)) - { - throw DB::TiFlashException("Bad DMFile format, expected mark file content size: " + std::to_string(size) - + " vs. actual: " + std::to_string(mark_size), - Errors::DeltaTree::Internal); - } - PageUtil::readFile(file, mark_offset, reinterpret_cast(res->data()), size, read_limiter); + reader.dmfile->encryptionMarkPath(file_name_base)); + PageUtil::readFile(file, 0, reinterpret_cast(res->data()), size, read_limiter); + } + return res; + }; - return res; - }; - mark_with_sizes = mark_with_size_load(); - } + if (reader.mark_cache) + marks = reader.mark_cache->getOrSet(reader.dmfile->colMarkCacheKey(file_name_base), mark_load); else - { - auto mark_load = [&]() -> MarksInCompressedFilePtr { - auto res = std::make_shared(reader.dmfile->getPacks()); - if (res->empty()) // 0 rows. - return res; - size_t size = sizeof(MarkInCompressedFile) * reader.dmfile->getPacks(); - if (reader.dmfile->configuration) - { - auto buffer = createReadBufferFromFileBaseByFileProvider( - reader.file_provider, - reader.dmfile->colMarkPath(file_name_base), - reader.dmfile->encryptionMarkPath(file_name_base), - reader.dmfile->getConfiguration()->getChecksumFrameLength(), - read_limiter, - reader.dmfile->getConfiguration()->getChecksumAlgorithm(), - reader.dmfile->getConfiguration()->getChecksumFrameLength()); - buffer->readBig(reinterpret_cast(res->data()), size); - } - else - { - auto file = reader.file_provider->newRandomAccessFile(reader.dmfile->colMarkPath(file_name_base), - reader.dmfile->encryptionMarkPath(file_name_base)); - PageUtil::readFile(file, 0, reinterpret_cast(res->data()), size, read_limiter); - } - return res; - }; - if (reader.mark_cache) - marks = reader.mark_cache->getOrSet(reader.dmfile->colMarkCacheKey(file_name_base), mark_load); - else - marks = mark_load(); - } + marks = mark_load(); const String data_path = reader.dmfile->colDataPath(file_name_base); size_t data_file_size = reader.dmfile->colDataSize(file_name_base); @@ -120,19 +92,7 @@ DMFileReader::Stream::Stream( size_t estimated_size = 0; const auto & use_packs = reader.pack_filter.getUsePacks(); - if (reader.single_file_mode) - { - for (size_t i = 0; i < packs; i++) - { - if (!use_packs[i]) - { - continue; - } - buffer_size = std::max(buffer_size, (*mark_with_sizes)[i].mark_size); - estimated_size += (*mark_with_sizes)[i].mark_size; - } - } - else if (!reader.dmfile->configuration) + if (!reader.dmfile->configuration) { for (size_t i = 0; i < packs;) { @@ -170,10 +130,7 @@ DMFileReader::Stream::Stream( else { auto filename = reader.dmfile->colDataFileName(file_name_base); - auto iterator = reader.dmfile->sub_file_stats.find(filename); - estimated_size = iterator != reader.dmfile->sub_file_stats.end() - ? iterator->second.size - : reader.dmfile->configuration->getChecksumFrameLength(); + estimated_size = Poco::File(reader.dmfile->subFilePath(filename)).getSize(); } buffer_size = std::min(buffer_size, max_read_buffer_size); @@ -237,7 +194,6 @@ DMFileReader::DMFileReader( , read_columns(read_columns_) , is_common_handle(is_common_handle_) , read_one_pack_every_time(read_one_pack_every_time_) - , single_file_mode(dmfile_->isSingleFileMode()) , enable_handle_clean_read(enable_handle_clean_read_) , enable_del_clean_read(enable_del_clean_read_) , is_fast_scan(is_fast_scan_) @@ -332,9 +288,9 @@ Block DMFileReader::read() // Find max continuing rows we can read. size_t start_pack_id = next_pack_id; size_t start_row_offset = next_row_offset; - // When single_file_mode is true, or read_one_pack_every_time is true, we can just read one pack every time. + // When read_one_pack_every_time is true, we can just read one pack every time. // 0 means no limit - size_t read_pack_limit = (single_file_mode || read_one_pack_every_time) ? 1 : 0; + size_t read_pack_limit = read_one_pack_every_time ? 1 : 0; const auto & pack_stats = dmfile->getPackStats(); @@ -377,11 +333,6 @@ Block DMFileReader::read() size_t read_packs = next_pack_id - start_pack_id; - if (single_file_mode && read_packs != 1) - { - throw DB::TiFlashException("read_packs must be one when single_file_mode is true.", Errors::DeltaTree::Internal); - } - scan_context->total_dmfile_scanned_packs += read_packs; scan_context->total_dmfile_scanned_rows += read_rows; @@ -496,7 +447,7 @@ Block DMFileReader::read() rows_count += pack_stats[cursor].rows; } ColumnPtr col; - readColumn(cd, col, range.first, range.second - range.first, rows_count, skip_packs_by_column[i], single_file_mode); + readColumn(cd, col, range.first, range.second - range.first, rows_count, skip_packs_by_column[i]); column->insertRangeFrom(*col, 0, col->size()); skip_packs_by_column[i] = 0; } @@ -520,7 +471,7 @@ Block DMFileReader::read() { auto data_type = dmfile->getColumnStat(cd.id).type; ColumnPtr column; - readColumn(cd, column, start_pack_id, read_packs, read_rows, skip_packs_by_column[i], single_file_mode); + readColumn(cd, column, start_pack_id, read_packs, read_rows, skip_packs_by_column[i]); auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id}); @@ -595,14 +546,13 @@ void DMFileReader::readColumn(ColumnDefine & column_define, size_t start_pack_id, size_t pack_count, size_t read_rows, - size_t skip_packs, - bool force_seek) + size_t skip_packs) { if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column)) { auto data_type = dmfile->getColumnStat(column_define.id).type; auto col = data_type->createColumn(); - readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, force_seek || last_read_from_cache[column_define.id]); + readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]); column = std::move(col); last_read_from_cache[column_define.id] = false; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 5ebde116815..e82749682fa 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -50,19 +50,17 @@ class DMFileReader const LoggerPtr & log, const ReadLimiterPtr & read_limiter); - const bool single_file_mode; double avg_size_hint; MarksInCompressedFilePtr marks; - MarkWithSizesInCompressedFilePtr mark_with_sizes; size_t getOffsetInFile(size_t i) const { - return single_file_mode ? (*mark_with_sizes)[i].mark.offset_in_compressed_file : (*marks)[i].offset_in_compressed_file; + return (*marks)[i].offset_in_compressed_file; } size_t getOffsetInDecompressedBlock(size_t i) const { - return single_file_mode ? (*mark_with_sizes)[i].mark.offset_in_decompressed_block : (*marks)[i].offset_in_decompressed_block; + return (*marks)[i].offset_in_decompressed_block; } std::unique_ptr buf; @@ -127,8 +125,7 @@ class DMFileReader size_t start_pack_id, size_t pack_count, size_t read_rows, - size_t skip_packs, - bool force_seek); + size_t skip_packs); bool getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col); private: @@ -141,8 +138,6 @@ class DMFileReader // read_one_pack_every_time is used to create info for every pack const bool read_one_pack_every_time; - const bool single_file_mode{}; - /// Clean read optimize // In normal mode, if there is no delta for some packs in stable, we can try to do clean read (enable_handle_clean_read is true). // In fast mode, if we don't need handle column, we will try to do clean read on handle_column(enable_handle_clean_read is true). diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 637181c1e91..e5b30a4949c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -34,37 +34,26 @@ DMFileWriter::DMFileWriter(const DMFilePtr & dmfile_, const DMFileWriter::Options & options_) : dmfile(dmfile_) , write_columns(write_columns_) - , options(options_, dmfile) + , options(options_) , // assume pack_stat_file is the first file created inside DMFile // it will create encryption info for the whole DMFile - pack_stat_file( - (options.flags.isSingleFile()) // - ? nullptr - : (dmfile->configuration ? createWriteBufferFromFileBaseByFileProvider( - file_provider_, - dmfile->packStatPath(), - dmfile->encryptionPackStatPath(), - true, - write_limiter_, - dmfile->configuration->getChecksumAlgorithm(), - dmfile->configuration->getChecksumFrameLength()) - : createWriteBufferFromFileBaseByFileProvider(file_provider_, - dmfile->packStatPath(), - dmfile->encryptionPackStatPath(), - true, - write_limiter_, - 0, - 0, - options.max_compress_block_size))) - , single_file_stream((!options.flags.isSingleFile()) - ? nullptr - : new SingleFileStream( - dmfile_, - options.compression_settings, - options.max_compress_block_size, - file_provider_, - write_limiter_)) + pack_stat_file(dmfile->configuration ? createWriteBufferFromFileBaseByFileProvider( + file_provider_, + dmfile->packStatPath(), + dmfile->encryptionPackStatPath(), + true, + write_limiter_, + dmfile->configuration->getChecksumAlgorithm(), + dmfile->configuration->getChecksumFrameLength()) + : createWriteBufferFromFileBaseByFileProvider(file_provider_, + dmfile->packStatPath(), + dmfile->encryptionPackStatPath(), + true, + write_limiter_, + 0, + 0, + options.max_compress_block_size)) , file_provider(file_provider_) , write_limiter(write_limiter_) { @@ -75,25 +64,7 @@ DMFileWriter::DMFileWriter(const DMFilePtr & dmfile_, /// for handle column always generate index auto type = removeNullable(cd.type); bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime(); - if (options.flags.isSingleFile()) - { - if (do_index) - { - const auto column_name = DMFile::getFileNameBase(cd.id, {}); - single_file_stream->minmax_indexs.emplace(column_name, std::make_shared(*cd.type)); - } - - auto callback = [&](const IDataType::SubstreamPath & substream_path) { - const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path); - single_file_stream->column_data_sizes.emplace(stream_name, 0); - single_file_stream->column_mark_with_sizes.emplace(stream_name, SingleFileStream::MarkWithSizes{}); - }; - cd.type->enumerateStreams(callback, {}); - } - else - { - addStreams(cd.id, cd.type, do_index); - } + addStreams(cd.id, cd.type, do_index); dmfile->column_stats.emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0}); } } @@ -141,10 +112,7 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper stat.first_tag = static_cast(col->get64(0)); } - if (!options.flags.isSingleFile()) - { - writePODBinary(stat, *pack_stat_file); - } + writePODBinary(stat, *pack_stat_file); dmfile->addPack(stat); @@ -157,145 +125,62 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper void DMFileWriter::finalize() { - if (!options.flags.isSingleFile()) - { - pack_stat_file->sync(); - } + pack_stat_file->sync(); for (auto & cd : write_columns) { finalizeColumn(cd.id, cd.type); } - if (options.flags.isSingleFile()) - { - dmfile->finalizeForSingleFileMode(single_file_stream->plain_layer); - single_file_stream->flush(); - } - else - { - dmfile->finalizeForFolderMode(file_provider, write_limiter); - } + dmfile->finalizeForFolderMode(file_provider, write_limiter); } void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColumn & column, const ColumnVector * del_mark) { size_t rows = column.size(); - if (options.flags.isSingleFile()) - { - auto callback = [&](const IDataType::SubstreamPath & substream) { - size_t offset_in_compressed_file = single_file_stream->plain_layer.count(); - const auto stream_name = DMFile::getFileNameBase(col_id, substream); - if (unlikely(substream.size() > 1)) - throw DB::TiFlashException("Substream_path shouldn't be more than one.", Errors::DeltaTree::Internal); - - auto & minmax_indexs = single_file_stream->minmax_indexs; - if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end()) + type.enumerateStreams( + [&](const IDataType::SubstreamPath & substream) { + const auto name = DMFile::getFileNameBase(col_id, substream); + auto & stream = column_streams.at(name); + if (stream->minmaxes) { // For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index. // Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row. // For TAG Column, we also ignore del_mark when add minmax index. - iter->second->addPack(column, (col_id == EXTRA_HANDLE_COLUMN_ID || col_id == TAG_COLUMN_ID) ? nullptr : del_mark); - } - - auto offset_in_compressed_block = single_file_stream->original_layer.offset(); - if (unlikely(offset_in_compressed_block != 0)) - throw DB::TiFlashException("Offset in compressed block is always expected to be 0 when single_file_mode is true, now " - + DB::toString(offset_in_compressed_block), - Errors::DeltaTree::Internal); - - // write column data - if (substream.empty()) - { - if (unlikely(type.isNullable())) - throw DB::TiFlashException("Type shouldn't be nullable when substream_path is empty.", Errors::DeltaTree::Internal); - - type.serializeBinaryBulk(column, single_file_stream->original_layer, 0, rows); - } - else if (substream[0].type == IDataType::Substream::NullMap) - { - if (unlikely(!type.isNullable())) - throw DB::TiFlashException( - "Type shouldn be nullable when substream_path's type is NullMap.", - Errors::DeltaTree::Internal); - - const auto & col = static_cast(column); - col.checkConsistency(); - DataTypeUInt8().serializeBinaryBulk(col.getNullMapColumn(), single_file_stream->original_layer, 0, rows); - } - else if (substream[0].type == IDataType::Substream::NullableElements) - { - if (unlikely(!type.isNullable())) - throw DB::TiFlashException( - "Type shouldn be nullable when substream_path's type is NullableElements.", - Errors::DeltaTree::Internal); - - const auto & nullable_type = static_cast(type); - const auto & col = static_cast(column); - nullable_type.getNestedType()->serializeBinaryBulk(col.getNestedColumn(), single_file_stream->original_layer, 0, rows); + stream->minmaxes->addPack(column, (col_id == EXTRA_HANDLE_COLUMN_ID || col_id == TAG_COLUMN_ID) ? nullptr : del_mark); } - else - { - throw DB::TiFlashException( - "Unknown type of substream_path: " + std::to_string(substream[0].type), - Errors::DeltaTree::Internal); - } - single_file_stream->flushCompressedData(); - size_t mark_size_in_file = single_file_stream->plain_layer.count() - offset_in_compressed_file; - single_file_stream->column_mark_with_sizes.at(stream_name) - .push_back(MarkWithSizeInCompressedFile{MarkInCompressedFile{.offset_in_compressed_file = offset_in_compressed_file, - .offset_in_decompressed_block = offset_in_compressed_block}, - mark_size_in_file}); - single_file_stream->column_data_sizes[stream_name] += mark_size_in_file; - }; - type.enumerateStreams(callback, {}); - } - else - { - type.enumerateStreams( - [&](const IDataType::SubstreamPath & substream) { - const auto name = DMFile::getFileNameBase(col_id, substream); - auto & stream = column_streams.at(name); - if (stream->minmaxes) - { - // For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index. - // Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row. - // For TAG Column, we also ignore del_mark when add minmax index. - stream->minmaxes->addPack(column, (col_id == EXTRA_HANDLE_COLUMN_ID || col_id == TAG_COLUMN_ID) ? nullptr : del_mark); - } - - /// There could already be enough data to compress into the new block. - if (stream->compressed_buf->offset() >= options.min_compress_block_size) - stream->compressed_buf->next(); - auto offset_in_compressed_block = stream->compressed_buf->offset(); + /// There could already be enough data to compress into the new block. + if (stream->compressed_buf->offset() >= options.min_compress_block_size) + stream->compressed_buf->next(); - writeIntBinary(stream->plain_file->count(), *stream->mark_file); - writeIntBinary(offset_in_compressed_block, *stream->mark_file); - }, - {}); + auto offset_in_compressed_block = stream->compressed_buf->offset(); - type.serializeBinaryBulkWithMultipleStreams( - column, - [&](const IDataType::SubstreamPath & substream) { - const auto stream_name = DMFile::getFileNameBase(col_id, substream); - auto & stream = column_streams.at(stream_name); - return &(*stream->compressed_buf); - }, - 0, - rows, - true, - {}); + writeIntBinary(stream->plain_file->count(), *stream->mark_file); + writeIntBinary(offset_in_compressed_block, *stream->mark_file); + }, + {}); - type.enumerateStreams( - [&](const IDataType::SubstreamPath & substream) { - const auto name = DMFile::getFileNameBase(col_id, substream); - auto & stream = column_streams.at(name); - stream->compressed_buf->nextIfAtEnd(); - }, - {}); - } + type.serializeBinaryBulkWithMultipleStreams( + column, + [&](const IDataType::SubstreamPath & substream) { + const auto stream_name = DMFile::getFileNameBase(col_id, substream); + auto & stream = column_streams.at(stream_name); + return &(*stream->compressed_buf); + }, + 0, + rows, + true, + {}); + + type.enumerateStreams( + [&](const IDataType::SubstreamPath & substream) { + const auto name = DMFile::getFileNameBase(col_id, substream); + auto & stream = column_streams.at(name); + stream->compressed_buf->nextIfAtEnd(); + }, + {}); auto & avg_size = dmfile->column_stats.at(col_id).avg_size; IDataType::updateAvgValueSizeHint(column, avg_size); @@ -315,91 +200,58 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) } }; #endif - if (options.flags.isSingleFile()) - { - auto callback = [&](const IDataType::SubstreamPath & substream) { - const auto stream_name = DMFile::getFileNameBase(col_id, substream); - - dmfile->addSubFileStat(DMFile::colDataFileName(stream_name), 0, single_file_stream->column_data_sizes.at(stream_name)); - - // write mark - size_t mark_offset_in_file = single_file_stream->plain_layer.count(); - for (const auto & mark_with_size : single_file_stream->column_mark_with_sizes.at(stream_name)) - { - writeIntBinary(mark_with_size.mark.offset_in_compressed_file, single_file_stream->plain_layer); - writeIntBinary(mark_with_size.mark.offset_in_decompressed_block, single_file_stream->plain_layer); - writeIntBinary(mark_with_size.mark_size, single_file_stream->plain_layer); - } - size_t mark_size_in_file = single_file_stream->plain_layer.count() - mark_offset_in_file; - dmfile->addSubFileStat(DMFile::colMarkFileName(stream_name), mark_offset_in_file, mark_size_in_file); - // write minmax - auto & minmax_indexs = single_file_stream->minmax_indexs; - if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end()) - { - size_t minmax_offset_in_file = single_file_stream->plain_layer.count(); - iter->second->write(*type, single_file_stream->plain_layer); - size_t minmax_size_in_file = single_file_stream->plain_layer.count() - minmax_offset_in_file; - bytes_written += minmax_size_in_file; - dmfile->addSubFileStat(DMFile::colIndexFileName(stream_name), minmax_offset_in_file, minmax_size_in_file); - } - }; - type->enumerateStreams(callback, {}); - } - else - { - auto callback = [&](const IDataType::SubstreamPath & substream) { - const auto stream_name = DMFile::getFileNameBase(col_id, substream); - auto & stream = column_streams.at(stream_name); - stream->flush(); + auto callback = [&](const IDataType::SubstreamPath & substream) { + const auto stream_name = DMFile::getFileNameBase(col_id, substream); + auto & stream = column_streams.at(stream_name); + stream->flush(); #ifndef NDEBUG - examine_buffer_size(*stream->mark_file, *this->file_provider); - examine_buffer_size(*stream->plain_file, *this->file_provider); + examine_buffer_size(*stream->mark_file, *this->file_provider); + examine_buffer_size(*stream->plain_file, *this->file_provider); #endif - bytes_written += stream->getWrittenBytes(); + bytes_written += stream->getWrittenBytes(); - if (stream->minmaxes) + if (stream->minmaxes) + { + if (!dmfile->configuration) { - if (!dmfile->configuration) - { - WriteBufferFromFileProvider buf( - file_provider, - dmfile->colIndexPath(stream_name), - dmfile->encryptionIndexPath(stream_name), - false, - write_limiter); - stream->minmaxes->write(*type, buf); - buf.sync(); - // Ignore data written in index file when the dmfile is empty. - // This is ok because the index file in this case is tiny, and we already ignore other small files like meta and pack stat file. - // The motivation to do this is to show a zero `stable_size_on_disk` for empty segments, - // and we cannot change the index file format for empty dmfile because of backward compatibility. - bytes_written += is_empty_file ? 0 : buf.getMaterializedBytes(); - } - else - { - auto buf = createWriteBufferFromFileBaseByFileProvider(file_provider, - dmfile->colIndexPath(stream_name), - dmfile->encryptionIndexPath(stream_name), - false, - write_limiter, - dmfile->configuration->getChecksumAlgorithm(), - dmfile->configuration->getChecksumFrameLength()); - stream->minmaxes->write(*type, *buf); - buf->sync(); - // Ignore data written in index file when the dmfile is empty. - // This is ok because the index file in this case is tiny, and we already ignore other small files like meta and pack stat file. - // The motivation to do this is to show a zero `stable_size_on_disk` for empty segments, - // and we cannot change the index file format for empty dmfile because of backward compatibility. - bytes_written += is_empty_file ? 0 : buf->getMaterializedBytes(); + WriteBufferFromFileProvider buf( + file_provider, + dmfile->colIndexPath(stream_name), + dmfile->encryptionIndexPath(stream_name), + false, + write_limiter); + stream->minmaxes->write(*type, buf); + buf.sync(); + // Ignore data written in index file when the dmfile is empty. + // This is ok because the index file in this case is tiny, and we already ignore other small files like meta and pack stat file. + // The motivation to do this is to show a zero `stable_size_on_disk` for empty segments, + // and we cannot change the index file format for empty dmfile because of backward compatibility. + bytes_written += is_empty_file ? 0 : buf.getMaterializedBytes(); + } + else + { + auto buf = createWriteBufferFromFileBaseByFileProvider(file_provider, + dmfile->colIndexPath(stream_name), + dmfile->encryptionIndexPath(stream_name), + false, + write_limiter, + dmfile->configuration->getChecksumAlgorithm(), + dmfile->configuration->getChecksumFrameLength()); + stream->minmaxes->write(*type, *buf); + buf->sync(); + // Ignore data written in index file when the dmfile is empty. + // This is ok because the index file in this case is tiny, and we already ignore other small files like meta and pack stat file. + // The motivation to do this is to show a zero `stable_size_on_disk` for empty segments, + // and we cannot change the index file format for empty dmfile because of backward compatibility. + bytes_written += is_empty_file ? 0 : buf->getMaterializedBytes(); #ifndef NDEBUG - examine_buffer_size(*buf, *this->file_provider); + examine_buffer_size(*buf, *this->file_provider); #endif - } } - }; - type->enumerateStreams(callback, {}); - } + } + }; + type->enumerateStreams(callback, {}); // Update column's bytes in disk dmfile->column_stats.at(col_id).serialized_bytes = bytes_written; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 95cedad28e3..efcbe58dd7c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -108,59 +108,6 @@ class DMFileWriter using StreamPtr = std::unique_ptr; using ColumnStreams = std::map; - struct SingleFileStream - { - SingleFileStream(const DMFilePtr & dmfile, - CompressionSettings compression_settings, - size_t max_compress_block_size, - const FileProviderPtr & file_provider, - const WriteLimiterPtr & write_limiter_) - : plain_file(createWriteBufferFromFileBaseByFileProvider(file_provider, - dmfile->path(), - EncryptionPath(dmfile->encryptionBasePath(), ""), - true, - write_limiter_, - 0, - 0, - max_compress_block_size)) - , plain_layer(*plain_file) - , compressed_buf(plain_layer, compression_settings) - , original_layer(compressed_buf) - { - } - - void flushCompressedData() - { - original_layer.next(); - compressed_buf.next(); - } - - void flush() - { - plain_layer.next(); - plain_file->next(); - - plain_file->sync(); - } - - using ColumnMinMaxIndexs = std::unordered_map; - ColumnMinMaxIndexs minmax_indexs; - - using ColumnDataSizes = std::unordered_map; - ColumnDataSizes column_data_sizes; - - using MarkWithSizes = std::vector; - using ColumnMarkWithSizes = std::unordered_map; - ColumnMarkWithSizes column_mark_with_sizes; - - /// original_layer -> compressed_buf -> plain_layer -> plain_file - WriteBufferFromFileBasePtr plain_file; - HashingWriteBuffer plain_layer; - CompressedWriteBuffer<> compressed_buf; - HashingWriteBuffer original_layer; - }; - using SingleFileStreamPtr = std::shared_ptr; - struct BlockProperty { size_t not_clean_rows; @@ -169,47 +116,22 @@ class DMFileWriter size_t gc_hint_version; }; - struct Flags - { - private: - static constexpr size_t IS_SINGLE_FILE = 0x01; - - size_t value; - - public: - Flags() - : value(0x0) - {} - - inline void setSingleFile(bool v) { value = (v ? (value | IS_SINGLE_FILE) : (value & ~IS_SINGLE_FILE)); } - inline bool isSingleFile() const { return (value & IS_SINGLE_FILE); } - }; - struct Options { CompressionSettings compression_settings; - size_t min_compress_block_size; - size_t max_compress_block_size; - Flags flags; + size_t min_compress_block_size{}; + size_t max_compress_block_size{}; Options() = default; - Options(CompressionSettings compression_settings_, size_t min_compress_block_size_, size_t max_compress_block_size_, Flags flags_) + Options(CompressionSettings compression_settings_, size_t min_compress_block_size_, size_t max_compress_block_size_) : compression_settings(compression_settings_) , min_compress_block_size(min_compress_block_size_) , max_compress_block_size(max_compress_block_size_) - , flags(flags_) { } - Options(const Options & from, const DMFilePtr & file) - : compression_settings(from.compression_settings) - , min_compress_block_size(from.min_compress_block_size) - , max_compress_block_size(from.max_compress_block_size) - , flags(from.flags) - { - flags.setSingleFile(file->isSingleFileMode()); - } + Options(const Options & from) = default; }; @@ -246,8 +168,6 @@ class DMFileWriter WriteBufferFromFileBasePtr pack_stat_file; - SingleFileStreamPtr single_file_stream; - FileProviderPtr file_provider; WriteLimiterPtr write_limiter; diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index bb4ccfdf049..e7b909b0fd8 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -39,7 +39,6 @@ SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( // ChildStream child_, StorageDeltaMergePtr storage_, DecodingStorageSchemaSnapshotConstPtr schema_snap_, - TiDB::SnapshotApplyMethod method_, FileConvertJobType job_type_, UInt64 split_after_rows_, UInt64 split_after_size_, @@ -47,7 +46,6 @@ SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( // : child(std::move(child_)) , storage(std::move(storage_)) , schema_snap(std::move(schema_snap_)) - , method(method_) , job_type(job_type_) , split_after_rows(split_after_rows_) , split_after_size(split_after_size_) @@ -124,21 +122,8 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() return false; } - DMFileBlockOutputStream::Flags flags{}; - switch (method) - { - case TiDB::SnapshotApplyMethod::DTFile_Directory: - flags.setSingleFile(false); - break; - case TiDB::SnapshotApplyMethod::DTFile_Single: - flags.setSingleFile(true); - break; - default: - break; - } - - auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile(), storage->createChecksumConfig(flags.isSingleFile())); - dt_stream = std::make_unique(context, dt_file, *(schema_snap->column_defines), flags); + auto dt_file = DMFile::create(file_id, parent_path, storage->createChecksumConfig()); + dt_stream = std::make_unique(context, dt_file, *(schema_snap->column_defines)); dt_stream->writePrefix(); ingest_files.emplace_back(dt_file); ingest_files_range.emplace_back(std::nullopt); diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h index 59c80b1ade6..18a9bab7eec 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h @@ -74,7 +74,6 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable ChildStream child_, StorageDeltaMergePtr storage_, DecodingStorageSchemaSnapshotConstPtr schema_snap_, - TiDB::SnapshotApplyMethod method_, FileConvertJobType job_type_, UInt64 split_after_rows_, UInt64 split_after_size_, @@ -112,7 +111,6 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable ChildStream child; StorageDeltaMergePtr storage; DecodingStorageSchemaSnapshotConstPtr schema_snap; - const TiDB::SnapshotApplyMethod method; const FileConvertJobType job_type; const UInt64 split_after_rows; const UInt64 split_after_size; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e1d2e87013a..bbeed541f47 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -110,11 +110,10 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags) + const String & parent_path) { - auto dmfile = DMFile::create(file_id, parent_path, flags.isSingleFile(), dm_context.createChecksumConfig(flags.isSingleFile())); - auto output_stream = std::make_shared(dm_context.db_context, dmfile, *schema_snap, flags); + auto dmfile = DMFile::create(file_id, parent_path, dm_context.createChecksumConfig()); + auto output_stream = std::make_shared(dm_context.db_context, dmfile, *schema_snap); const auto * mvcc_stream = typeid_cast *>(input_stream.get()); input_stream->readPrefix(); @@ -175,11 +174,8 @@ StableValueSpacePtr createNewStable( // auto delegator = context.path_pool.getStableDiskDelegator(); auto store_path = delegator.choosePath(); - DMFileBlockOutputStream::Flags flags; - flags.setSingleFile(context.db_context.getSettingsRef().dt_enable_single_file_mode_dmfile); - PageId dtfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); - auto dtfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path, flags); + auto dtfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path); auto stable = std::make_shared(stable_id); stable->setFiles({dtfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size)); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp index 7066ea7e2b4..ba0358e67e9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp @@ -86,7 +86,7 @@ TEST_F(ColumnFileTest, ColumnFileBigRead) try { auto table_columns = DMTestEnv::getDefaultColumns(); - auto dm_file = DMFile::create(1, parent_path, false, std::make_optional()); + auto dm_file = DMFile::create(1, parent_path, std::make_optional()); const size_t num_rows_write_per_batch = 8192; const size_t batch_num = 3; const UInt64 tso_value = 100; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 68c95d07fa6..77a132e92d7 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -37,8 +37,7 @@ extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags); + const String & parent_path); namespace tests { // Simple test suit for DeltaMergeStore. @@ -157,16 +156,12 @@ class DeltaMergeStoreRWTest auto input_stream = std::make_shared(block); auto [store_path, file_id] = store->preAllocateIngestFile(); - DMFileBlockOutputStream::Flags flags; - flags.setSingleFile(DMTestEnv::getPseudoRandomNumber() % 2); - auto dmfile = writeIntoNewDMFile( context, std::make_shared(store->getTableColumns()), input_stream, file_id, - store_path, - flags); + store_path); store->preIngestFile(store_path, file_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp index 17b5d79febf..ddaa1b9c103 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp @@ -41,8 +41,7 @@ extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags); + const String & parent_path); namespace tests { void assertBlocksEqual(const Blocks & blocks1, const Blocks & blocks2) @@ -153,7 +152,7 @@ Block appendColumnFileBigToDeltaValueSpace(DMContext & context, ColumnDefinesPtr auto input_stream = std::make_shared(block); auto store_path = delegator.choosePath(); auto dmfile - = writeIntoNewDMFile(context, std::make_shared(*column_defines), input_stream, file_id, store_path, {}); + = writeIntoNewDMFile(context, std::make_shared(*column_defines), input_stream, file_id, store_path); delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path); auto & pk_column = block.getByPosition(0).column; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 517594d9a50..0de81b1b7d6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -42,24 +42,8 @@ namespace DM { namespace tests { -TEST(DMFileWriterFlagsTest, SetClearFlags) -{ - using Flags = DMFileWriter::Flags; - - Flags flags; - - bool f = false; - flags.setSingleFile(f); - EXPECT_FALSE(flags.isSingleFile()); - - f = true; - flags.setSingleFile(f); - EXPECT_TRUE(flags.isSingleFile()); -} - enum class DMFileMode { - SingleFile, DirectoryLegacy, DirectoryChecksum }; @@ -71,9 +55,6 @@ String paramToString(const ::testing::TestParamInfo & info) String name; switch (mode) { - case DMFileMode::SingleFile: - name = "single_file"; - break; case DMFileMode::DirectoryLegacy: name = "folder"; break; @@ -103,13 +84,12 @@ class DMFileTest TiFlashStorageTestBasic::SetUp(); auto mode = GetParam(); - bool single_file_mode = (mode == DMFileMode::SingleFile); auto configuration = (mode == DMFileMode::DirectoryChecksum ? std::make_optional() : std::nullopt); parent_path = TiFlashStorageTestBasic::getTemporaryPath(); path_pool = std::make_unique(db_context->getPathPool().withTable("test", "DMFileTest", false)); storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *path_pool, "test.t1"); - dm_file = DMFile::create(1, parent_path, single_file_mode, std::move(configuration)); + dm_file = DMFile::create(1, parent_path, std::move(configuration)); table_columns = std::make_shared(); column_cache = std::make_shared(); @@ -252,10 +232,9 @@ try dm_file.reset(); auto mode = GetParam(); - bool single_file_mode = mode == DMFileMode::SingleFile; auto configuration = mode == DMFileMode::DirectoryChecksum ? std::make_optional() : std::nullopt; - dm_file = DMFile::create(id, parent_path, single_file_mode, std::move(configuration)); + dm_file = DMFile::create(id, parent_path, std::move(configuration)); // Right after created, the fil is not abled to GC and it is ignored by `listAllInPath` EXPECT_FALSE(dm_file->canGC()); DMFile::ListOptions options; @@ -903,7 +882,7 @@ CATCH INSTANTIATE_TEST_CASE_P(DTFileMode, // DMFileTest, - testing::Values(DMFileMode::SingleFile, DMFileMode::DirectoryLegacy, DMFileMode::DirectoryChecksum), + testing::Values(DMFileMode::DirectoryLegacy, DMFileMode::DirectoryChecksum), paramToString); @@ -923,12 +902,11 @@ class DMFileClusteredIndexTest path = TiFlashStorageTestBasic::getTemporaryPath(); auto mode = GetParam(); - bool single_file_mode = mode == DMFileMode::SingleFile; auto configuration = mode == DMFileMode::DirectoryChecksum ? std::make_optional() : std::nullopt; path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t", false)); storage_pool = std::make_unique(*db_context, table_id, *path_pool, "test.t1"); - dm_file = DMFile::create(0, path, single_file_mode, std::move(configuration)); + dm_file = DMFile::create(0, path, std::move(configuration)); table_columns = std::make_shared(); column_cache = std::make_shared(); @@ -1127,12 +1105,6 @@ try } CATCH -INSTANTIATE_TEST_CASE_P(DTFileMode, // - DMFileClusteredIndexTest, - testing::Values(DMFile::Mode::FOLDER, DMFile::Mode::SINGLE_FILE), - paramToString); - - /// DDL test cases class DMFileDDLTest : public DMFileTest { @@ -1339,7 +1311,7 @@ CATCH INSTANTIATE_TEST_CASE_P(DTFileMode, // DMFileDDLTest, - testing::Values(DMFileMode::SingleFile, DMFileMode::DirectoryLegacy, DMFileMode::DirectoryChecksum), + testing::Values(DMFileMode::DirectoryLegacy, DMFileMode::DirectoryChecksum), paramToString); } // namespace tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index f266fbee02f..cf71819ce00 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -52,8 +52,7 @@ extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags); + const String & parent_path); namespace tests { class SegmentTest : public DB::base::TiFlashStorageTestBasic @@ -1045,11 +1044,8 @@ class SegmentTest2 : public SegmentTest auto input_stream = std::make_shared(block); auto store_path = delegator.choosePath(); - DMFileBlockOutputStream::Flags flags; - flags.setSingleFile(DMTestEnv::getPseudoRandomNumber() % 2); - auto dmfile - = writeIntoNewDMFile(context, std::make_shared(*tableColumns()), input_stream, file_id, store_path, flags); + = writeIntoNewDMFile(context, std::make_shared(*tableColumns()), input_stream, file_id, store_path); delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp index 9fd6351ca8d..bb7f325f40a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp @@ -33,8 +33,7 @@ extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags); + const String & parent_path); namespace tests { @@ -314,8 +313,7 @@ ExternalDTFileInfo genDMFile(DeltaMergeStorePtr store, DMContext & context, cons std::make_shared(store->getTableColumns()), input_stream, file_id, - store_path, - /* flags */ {}); + store_path); store->preIngestFile(store_path, file_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp index 1c308c6a192..f54350d94a1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp @@ -40,8 +40,7 @@ extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags); + const String & parent_path); namespace tests { @@ -176,8 +175,7 @@ try table_columns, input_stream, file_id, - delegator.choosePath(), - DMFileBlockOutputStream::Flags{}); + delegator.choosePath()); ingest_wbs.data.putExternal(file_id, /* tag */ 0); ingest_wbs.writeLogAndData(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 7c2334d74f8..064ed0c4387 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -40,8 +40,7 @@ extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, UInt64 file_id, - const String & parent_path, - DMFileBlockOutputStream::Flags flags); + const String & parent_path); namespace tests { @@ -420,8 +419,8 @@ void SegmentTestBasic::ingestDTFileIntoDelta(PageId segment_id, UInt64 write_row auto parent_path = delegator.choosePath(); auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); auto input_stream = std::make_shared(block); - DMFileBlockOutputStream::Flags flags; - auto dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, parent_path, flags); + + auto dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, parent_path); ingest_wbs.data.putExternal(file_id, /* tag */ 0); ingest_wbs.writeLogAndData(); delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), parent_path); @@ -461,8 +460,7 @@ void SegmentTestBasic::ingestDTFileByReplace(PageId segment_id, UInt64 write_row auto parent_path = delegator.choosePath(); auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); auto input_stream = std::make_shared(block); - DMFileBlockOutputStream::Flags flags; - auto dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, parent_path, flags); + auto dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, parent_path); ingest_wbs.data.putExternal(file_id, /* tag */ 0); ingest_wbs.writeLogAndData(); delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), parent_path); @@ -543,7 +541,7 @@ void SegmentTestBasic::replaceSegmentData(PageId segment_id, const Block & block auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); auto input_stream = std::make_shared(block); - auto dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, parent_path, {}); + auto dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, parent_path); ingest_wbs.data.putExternal(file_id, /* tag */ 0); ingest_wbs.writeLogAndData(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp index 25c2a55dd59..95e0dcda952 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp @@ -177,7 +177,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 0, /* split_after_size */ 0, @@ -205,7 +204,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 0, /* split_after_size */ 0, @@ -235,7 +233,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 1, /* split_after_size */ 1, @@ -266,7 +263,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 10, /* split_after_size */ 0, @@ -303,7 +299,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 10, /* split_after_size */ 0, @@ -336,7 +331,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 10000, /* split_after_size */ 0, @@ -370,7 +364,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 20, /* split_after_size */ 0, @@ -410,7 +403,6 @@ try mock_stream, storage, schema_snapshot, - TiDB::SnapshotApplyMethod::DTFile_Directory, FileConvertJobType::ApplySnapshot, /* split_after_rows */ 20, /* split_after_size */ 0, diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index e4db7207541..f98d7b703d5 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -159,9 +159,9 @@ class StorageDeltaMerge bool initStoreIfDataDirExist() override; - DM::DMConfigurationOpt createChecksumConfig(bool is_single_file) const + DM::DMConfigurationOpt createChecksumConfig() const { - return DM::DMChecksumConfig::fromDBContext(global_context, is_single_file); + return DM::DMChecksumConfig::fromDBContext(global_context); } #ifndef DBMS_PUBLIC_GTEST diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 28fb0d307ce..3591e49900c 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -39,7 +39,6 @@ namespace DB namespace FailPoints { extern const char force_set_sst_to_dtfile_block_size[]; -extern const char force_set_sst_decode_rand[]; extern const char pause_until_apply_raft_snapshot[]; } // namespace FailPoints @@ -350,7 +349,6 @@ std::vector KVStore::preHandleSSTsToDTFiles( bounded_stream, storage, schema_snap, - snapshot_apply_method, job_type, /* split_after_rows */ global_settings.dt_segment_limit_rows, /* split_after_size */ global_settings.dt_segment_limit_size, @@ -491,22 +489,6 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec return EngineStoreApplyRes::NotFound; } - fiu_do_on(FailPoints::force_set_sst_decode_rand, { - static int num_call = 0; - switch (num_call++ % 2) - { - case 0: - snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Directory; - break; - case 1: - snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; - break; - default: - break; - } - LOG_INFO(log, "{} ingest sst by method {}", region->toString(true), applyMethodToString(snapshot_apply_method)); - }); - const auto func_try_flush = [&]() { if (!region->writeCFCount()) return; diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index fb21be2d2b7..72ec8b70726 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -43,10 +43,9 @@ namespace FailPoints extern const char force_fail_in_flush_region_data[]; } // namespace FailPoints -KVStore::KVStore(Context & context, TiDB::SnapshotApplyMethod snapshot_apply_method_) +KVStore::KVStore(Context & context) : region_persister(std::make_unique(context, region_manager)) , raft_cmd_res(std::make_unique()) - , snapshot_apply_method(snapshot_apply_method_) , log(Logger::get()) , region_compact_log_period(120) , region_compact_log_min_rows(40 * 1024) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index ec3494701b4..ac8aa285a20 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -78,7 +78,7 @@ class RegionPersister; class KVStore final : private boost::noncopyable { public: - KVStore(Context & context, TiDB::SnapshotApplyMethod snapshot_apply_method_); + KVStore(Context & context); void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *); RegionPtr getRegion(RegionID region_id) const; @@ -135,8 +135,6 @@ class KVStore final : private boost::noncopyable // Exported only for tests. TiFlashRaftProxyHelper * mutProxyHelperUnsafe() { return const_cast(proxy_helper); } - TiDB::SnapshotApplyMethod applyMethod() const { return snapshot_apply_method; } - void addReadIndexEvent(Int64 f) { read_index_event_flag += f; } Int64 getReadIndexEvent() const { return read_index_event_flag; } @@ -256,8 +254,6 @@ class KVStore final : private boost::noncopyable // raft_cmd_res stores the result of applying raft cmd. It must be protected by task_mutex. std::unique_ptr raft_cmd_res; - TiDB::SnapshotApplyMethod snapshot_apply_method; - LoggerPtr log; std::atomic region_compact_log_period; diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index d1ec51a5cff..7cf4ec40ec8 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -379,19 +379,11 @@ RawCppPtr PreHandleSnapshot( } #endif - switch (kvstore->applyMethod()) - { - case TiDB::SnapshotApplyMethod::DTFile_Directory: - case TiDB::SnapshotApplyMethod::DTFile_Single: - { - // Pre-decode and save as DTFiles - auto ingest_ids = kvstore->preHandleSnapshotToFiles(new_region, snaps, index, term, tmt); - auto * res = new PreHandledSnapshotWithFiles{new_region, std::move(ingest_ids)}; - return GenRawCppPtr(res, RawCppPtrTypeImpl::PreHandledSnapshotWithFiles); - } - default: - throw Exception("Unknow Region apply method: " + applyMethodToString(kvstore->applyMethod())); - } + + // Pre-decode and save as DTFiles + auto ingest_ids = kvstore->preHandleSnapshotToFiles(new_region, snaps, index, term, tmt); + auto * res = new PreHandledSnapshotWithFiles{new_region, std::move(ingest_ids)}; + return GenRawCppPtr(res, RawCppPtrTypeImpl::PreHandledSnapshotWithFiles); } catch (...) { diff --git a/dbms/src/Storages/Transaction/StorageEngineType.h b/dbms/src/Storages/Transaction/StorageEngineType.h index 3d103ca60c1..3cbe4f6bff3 100644 --- a/dbms/src/Storages/Transaction/StorageEngineType.h +++ b/dbms/src/Storages/Transaction/StorageEngineType.h @@ -31,26 +31,4 @@ enum class StorageEngine UNSUPPORTED_ENGINES = 128, }; -enum class SnapshotApplyMethod : std::int32_t -{ - DEPRECATED_Block = 1, - // Invalid if the storage engine is not DeltaTree - DTFile_Directory, - DTFile_Single, -}; - -inline const std::string applyMethodToString(SnapshotApplyMethod method) -{ - switch (method) - { - case SnapshotApplyMethod::DTFile_Directory: - return "file1"; - case SnapshotApplyMethod::DTFile_Single: - return "file2"; - default: - return "unknown(" + std::to_string(static_cast(method)) + ")"; - } - return "unknown"; -} - } // namespace TiDB diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 17e07180232..f22d2e83a42 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -67,7 +67,7 @@ static SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config) : context(context_) - , kvstore(context_.isDisaggregatedComputeMode() && context_.useAutoScaler() ? nullptr : std::make_shared(context, raft_config.snapshot_apply_method)) + , kvstore(context_.isDisaggregatedComputeMode() && context_.useAutoScaler() ? nullptr : std::make_shared(context)) , region_table(context) , background_service(nullptr) , gc_manager(context) diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 80b901751df..2e1f7bb18cf 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -969,13 +969,6 @@ TEST_F(RegionKVStoreTest, KVStore) testRaftChangePeer(kvs, ctx.getTMTContext()); } { - auto ori_snapshot_apply_method = kvs.snapshot_apply_method; - kvs.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; - SCOPE_EXIT({ - kvs.snapshot_apply_method = ori_snapshot_apply_method; - }); - - auto region_id = 19; auto region = makeRegion(region_id, RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)); auto region_id_str = std::to_string(19); diff --git a/dbms/src/Storages/Transaction/tests/kvstore_helper.h b/dbms/src/Storages/Transaction/tests/kvstore_helper.h index 9eb9c9f90e5..cb179cb11c3 100644 --- a/dbms/src/Storages/Transaction/tests/kvstore_helper.h +++ b/dbms/src/Storages/Transaction/tests/kvstore_helper.h @@ -102,7 +102,7 @@ class RegionKVStoreTest : public ::testing::Test { kvstore.reset(); auto & global_ctx = TiFlashTestEnv::getGlobalContext(); - kvstore = std::make_unique(global_ctx, TiDB::SnapshotApplyMethod::DTFile_Directory); + kvstore = std::make_unique(global_ctx); // only recreate kvstore and restore data from disk, don't recreate proxy instance kvstore->restore(*path_pool, proxy_helper.get()); return *kvstore;