diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index bfb5d2cc054..b8c7e83102c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -30,12 +30,7 @@ DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & con DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges) { - if (dmfile->getStatus() != DMFile::Status::READABLE) - throw Exception(fmt::format( - "DMFile [{}] is expected to be in READABLE status, but: {}", - dmfile->fileId(), - DMFile::statusString(dmfile->getStatus())), - ErrorCodes::LOGICAL_ERROR); + RUNTIME_CHECK(dmfile->getStatus() == DMFile::Status::READABLE, dmfile->fileId(), DMFile::statusString(dmfile->getStatus())); // if `rowkey_ranges` is empty, we unconditionally read all packs // `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode. diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 2eb14c6d19e..bffa0ef59ce 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -102,13 +102,11 @@ class DMFileReader /// Return false if it is the end of stream. bool getSkippedRows(size_t & skip_rows); Block read(); - UInt64 fileId() const - { - return dmfile->fileId(); - } std::string path() const { - return dmfile->path(); + // Status of DMFile can be updated when DMFileReader in used and the pathname will be changed. + // For DMFileReader, always use the readable path. + return DMFile::getPathByStatus(dmfile->parentPath(), dmfile->fileId(), DMFile::Status::READABLE); } void addCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, ColumnPtr & col); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp index 13b187167b6..fc541aa9124 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp @@ -60,4 +60,11 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st r->addCachedPacks(col_id, start, count, col); } } + +DMFileReader * DMFileReaderPool::get(const std::string & name) +{ + std::lock_guard lock(mtx); + auto itr = readers.find(name); + return itr != readers.end() && !itr->second.empty() ? *(itr->second.begin()) : nullptr; +} } // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h index 57d60eae77b..9bf93ade499 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -224,6 +224,8 @@ class DMFileReaderPool void add(DMFileReader & reader); void del(DMFileReader & reader); void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col); + // `get` is just for test. + DMFileReader * get(const std::string & name); private: DMFileReaderPool() = default; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp index c06b74757d5..eb81a9edb29 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp @@ -20,8 +20,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -127,6 +129,98 @@ try } CATCH + +TEST_P(DeltaMergeStoreRWTest, DMFileNameChangedInDMFileReadPool) +try +{ + const ColumnDefine col_str_define(2, "col2", std::make_shared()); + const ColumnDefine col_i8_define(3, "i8", std::make_shared()); + { + auto table_column_defines = DMTestEnv::getDefaultColumns(); + table_column_defines->emplace_back(col_str_define); + table_column_defines->emplace_back(col_i8_define); + store = reload(table_column_defines); + } + + constexpr size_t num_rows_write = 128; + // Ensure stable is not empty. + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + block.insert(DB::tests::createColumn( + createNumberStrings(0, num_rows_write), + col_str_define.name, + col_str_define.id)); + block.insert(DB::tests::createColumn( + createSignedNumbers(0, num_rows_write), + col_i8_define.name, + col_i8_define.id)); + store->write(*db_context, db_context->getSettingsRef(), block); + ASSERT_TRUE(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()))); + store->mergeDeltaAll(*db_context); + auto stable = store->id_to_segment.begin()->second->getStable(); + ASSERT_EQ(stable->getRows(), num_rows_write); + } + + // Ensure delta is not empty. + { + auto beg = num_rows_write; + auto end = num_rows_write + num_rows_write; + auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false); + block.insert(DB::tests::createColumn( + createNumberStrings(beg, end), + col_str_define.name, + col_str_define.id)); + block.insert(DB::tests::createColumn( + createSignedNumbers(beg, end), + col_i8_define.name, + col_i8_define.id)); + store->write(*db_context, db_context->getSettingsRef(), block); + auto delta = store->id_to_segment.begin()->second->getDelta(); + ASSERT_EQ(delta->getRows(), num_rows_write); + } + + // Check DMFile + const auto & dmfiles = store->id_to_segment.begin()->second->getStable()->getDMFiles(); + ASSERT_EQ(dmfiles.size(), 1); + auto dmfile = dmfiles.front(); + auto readable_path = DMFile::getPathByStatus(dmfile->parentPath(), dmfile->fileId(), DMFile::Status::READABLE); + ASSERT_EQ(dmfile->path(), readable_path); + ASSERT_EQ(DMFileReaderPool::instance().get(readable_path), nullptr); + + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + TRACING_NAME, + /* keep_order= */ false, + /* is_fast_scan= */ false, + /* expected_block_size= */ 128)[0]; + auto blk = in->read(); + // DMFileReader is created and add to DMFileReaderPool. + auto * reader = DMFileReaderPool::instance().get(readable_path); + ASSERT_NE(reader, nullptr); + ASSERT_EQ(reader->path(), readable_path); + + // Update DMFile. + ASSERT_TRUE(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()))); + store->mergeDeltaAll(*db_context); + auto stable = store->id_to_segment.begin()->second->getStable(); + ASSERT_EQ(stable->getRows(), 2 * num_rows_write); + + dmfile->remove(db_context->getFileProvider()); + ASSERT_NE(dmfile->path(), readable_path); + + in = nullptr; + ASSERT_EQ(DMFileReaderPool::instance().get(readable_path), nullptr); + } +} +CATCH + } // namespace tests } // namespace DM } // namespace DB