diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h
index 7832bcba4f7..fac1f97cf44 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h
@@ -39,12 +39,13 @@ class BlockOrDelete
         , block_offset(offset_)
     {}
     explicit BlockOrDelete(const RowKeyRange & delete_range_)
-        : delete_range(delete_range_)
+        : block_offset(0)
+        , delete_range(delete_range_)
     {}
 
-    bool isBlock() { return (bool)block; }
+    bool isBlock() { return static_cast<bool>(block); }
     auto & getBlock() { return block; };
-    auto getBlockOffset() { return block_offset; }
+    auto getBlockOffset() const { return block_offset; }
     auto & getDeleteRange() { return delete_range; }
 };
 
@@ -60,12 +61,12 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetSnapshot>
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
 #include
 #include
 #include
@@ -71,11 +72,17 @@ Columns ColumnFileTiny::readFromDisk(const PageReader & page_reader, //
         }
         else
         {
-            // New column after ddl is not exist in this pack, fill with default value
+            // A new column added by DDL does not exist in this CFTiny, fill it with the default value
            columns[index - col_start] = createColumnWithDefaultValue(cd, rows);
         }
     }
 
+    // All columns to be read do not exist in this CFTiny and have been filled with
+    // default values, so we can skip reading from disk
+    if (fields.second.empty())
+        return columns;
+
+    // Read the columns from disk and apply the DDL cast if needed
     auto page_map = page_reader.read({fields});
     Page page = page_map[data_page_id];
     for (size_t index = col_start; index < col_end; ++index)
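The early return above is the read-side optimization of this patch. `fields` pairs the data page id with the field indices that still have to be fetched; when every requested column was added by DDL after this CFTiny was persisted, the loop fills all of them with default values and the index list stays empty, so the PageStorage read can be skipped entirely. A minimal sketch of the control flow, with simplified types: the real `fields` is a PageStorage field-read request, and `column_exists_in_file` stands in for the schema lookup done via the column define `cd`:

    // Sketch only: the "all requested columns are defaulted" fast path.
    std::pair<PageId, std::vector<size_t>> fields{data_page_id, {}};
    for (size_t index = col_start; index < col_end; ++index)
    {
        if (column_exists_in_file(index))
            fields.second.push_back(index); // must be read from PageStorage
        else
            columns[index - col_start] = createColumnWithDefaultValue(cd, rows);
    }
    if (fields.second.empty())
        return columns; // nothing left to read from disk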
@@ -183,7 +190,7 @@ Block ColumnFileTiny::readBlockForMinorCompaction(const PageReader & page_reader
     }
 }
 
-ColumnTinyFilePtr ColumnFileTiny::writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema, const CachePtr & cache)
+ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema, const CachePtr & cache)
 {
     auto page_id = writeColumnFileData(context, block, offset, limit, wbs);
     auto new_column_file_schema = schema ? schema : std::make_shared<Block>(block.cloneEmpty());
@@ -201,7 +208,9 @@ PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & bl
     {
         auto last_buf_size = write_buf.count();
         serializeColumn(write_buf, *col.column, col.type, offset, limit, context.db_context.getSettingsRef().dt_compression_method, context.db_context.getSettingsRef().dt_compression_level);
-        col_data_sizes.push_back(write_buf.count() - last_buf_size);
+        size_t serialized_size = write_buf.count() - last_buf_size;
+        RUNTIME_CHECK_MSG(serialized_size != 0, "try to persist a block with an empty column, colname={} colid={} block={}", col.name, col.column_id, block.dumpJsonStructure());
+        col_data_sizes.push_back(serialized_size);
     }
 
     auto data_size = write_buf.count();
@@ -252,4 +261,4 @@ ColumnFileReaderPtr ColumnFileTinyReader::createNewReader(const ColumnDefinesPtr
 }
 } // namespace DM
-} // namespace DB
\ No newline at end of file
+} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
index 4a2f494b712..4ccf641cc37 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
@@ -21,7 +21,7 @@ namespace DB
 namespace DM
 {
 class ColumnFileTiny;
-using ColumnTinyFilePtr = std::shared_ptr<ColumnFileTiny>;
+using ColumnFileTinyPtr = std::shared_ptr<ColumnFileTiny>;
 
 /// A column file which data is stored in PageStorage.
 /// It may be created in two ways:
@@ -88,7 +88,7 @@ class ColumnFileTiny : public ColumnFilePersisted
     /// Replace the schema with a new schema, and the new schema instance should be exactly the same as the previous one.
     void resetIdenticalSchema(BlockPtr schema_) { schema = schema_; }
 
-    ColumnTinyFilePtr cloneWith(PageId new_data_page_id)
+    ColumnFileTinyPtr cloneWith(PageId new_data_page_id)
     {
         auto new_tiny_file = std::make_shared<ColumnFileTiny>(*this);
         new_tiny_file->data_page_id = new_data_page_id;
@@ -109,7 +109,7 @@ class ColumnFileTiny : public ColumnFilePersisted
 
     Block readBlockForMinorCompaction(const PageReader & page_reader) const;
 
-    static ColumnTinyFilePtr writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema = nullptr, const CachePtr & cache = nullptr);
+    static ColumnFileTinyPtr writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema = nullptr, const CachePtr & cache = nullptr);
 
     static PageId writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs);
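The RUNTIME_CHECK_MSG added to writeColumnFileData above is the write-side half of this fix: serializing zero rows yields zero bytes, so a zero serialized_size means the caller tried to persist an empty block, and rejecting it guarantees that a persisted CFTiny never carries a zero-size field. That invariant is what lets the BlobStore read path below treat a zero-size field read as an anomaly. A sketch of the caller-side contract this implies (a hypothetical guard, mirroring the cur_limit != 0 check added to DeltaMergeStore::write in the next file):

    // Sketch: callers must not ask to persist an empty slice of a block.
    if (limit > 0)
    {
        auto cf = ColumnFileTiny::writeColumnFile(context, block, offset, limit, wbs);
        wbs.writeAll();
    }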
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index e54c63bf283..753184af622 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -561,8 +561,15 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
             // The [offset, rows - offset] can be exceeding the Segment's rowkey_range. Cut the range
             // to fit the segment.
             auto [cur_offset, cur_limit] = rowkey_range.getPosRange(handle_column, offset, rows - offset);
-            if (unlikely(cur_offset != offset))
-                throw Exception(fmt::format("cur_offset does not equal to offset. is_common_handle {} start_key {} cur_offset {} cur_limit {} rows {} offset {} rowkey_range {}", is_common_handle, start_key.toRowKeyValue().toString(), cur_offset, cur_limit, rows, offset, rowkey_range.toDebugString()), ErrorCodes::LOGICAL_ERROR);
+            RUNTIME_CHECK_MSG(cur_offset == offset && cur_limit != 0,
+                              "invalid cur_offset or cur_limit. is_common_handle={} start_key={} cur_offset={} cur_limit={} rows={} offset={} rowkey_range={}",
+                              is_common_handle,
+                              start_key.toRowKeyValue().toString(),
+                              cur_offset,
+                              cur_limit,
+                              rows,
+                              offset,
+                              rowkey_range.toDebugString());
 
             limit = cur_limit;
             auto alloc_bytes = block.bytes(offset, limit);
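RUNTIME_CHECK_MSG replaces the hand-rolled if/throw and additionally asserts cur_limit != 0, so a write that would slice an empty range out of a segment now fails loudly here instead of flowing down to writeColumnFileData. A rough sketch of the shape of the macro; the exact definition lives in TiFlash's common headers, so this is an assumption about it, not a copy:

    // Sketch: fail-fast check that only formats its message on failure.
    #define RUNTIME_CHECK_MSG(condition, fmt_str, ...)                 \
        do                                                             \
        {                                                              \
            if (unlikely(!(condition)))                                \
                throw Exception(fmt::format(fmt_str, ##__VA_ARGS__),   \
                                ErrorCodes::LOGICAL_ERROR);            \
        } while (false)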
diff --git a/dbms/src/Storages/DeltaMerge/WriteBatches.h b/dbms/src/Storages/DeltaMerge/WriteBatches.h
index 83420f278d5..b5d2b00d728 100644
--- a/dbms/src/Storages/DeltaMerge/WriteBatches.h
+++ b/dbms/src/Storages/DeltaMerge/WriteBatches.h
@@ -41,7 +41,7 @@ struct WriteBatches : private boost::noncopyable
 
     WriteLimiterPtr write_limiter;
 
-    WriteBatches(StoragePool & storage_pool_, const WriteLimiterPtr & write_limiter_ = nullptr)
+    explicit WriteBatches(StoragePool & storage_pool_, const WriteLimiterPtr & write_limiter_ = nullptr)
         : ns_id(storage_pool_.getNamespaceId())
         , log(ns_id)
         , data(ns_id)
@@ -58,12 +58,11 @@ struct WriteBatches : private boost::noncopyable
     {
         if constexpr (DM_RUN_CHECK)
         {
-            Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
             auto check_empty = [&](const WriteBatch & wb, const String & name) {
                 if (!wb.empty())
                 {
                     StackTrace trace;
-                    LOG_ERROR(logger,
+                    LOG_ERROR(Logger::get(),
                               "!!!=========================Modifications in {} haven't persisted=========================!!! Stack trace: {}",
                               name,
                               trace.toString());
@@ -91,8 +90,7 @@ struct WriteBatches : private boost::noncopyable
 
         if constexpr (DM_RUN_CHECK)
         {
-            Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
-            auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
+            auto check = [](const WriteBatch & wb, const String & what) {
                 if (wb.empty())
                     return;
                 for (const auto & w : wb.getWrites())
@@ -100,11 +98,11 @@ struct WriteBatches : private boost::noncopyable
                     if (unlikely(w.type == WriteBatchWriteType::DEL))
                         throw Exception("Unexpected deletes in " + what);
                 }
-                LOG_TRACE(logger, "Write into {} : {}", what, wb.toString());
+                LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
             };
-            check(log, "log", logger);
-            check(data, "data", logger);
+            check(log, "log");
+            check(data, "data");
         }
 
         for (auto & w : log.getWrites())
@@ -135,9 +133,7 @@ struct WriteBatches : private boost::noncopyable
 
         if constexpr (DM_RUN_CHECK)
         {
-            Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
-
-            auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
+            auto check = [](const WriteBatch & wb, const String & what) {
                 if (wb.empty())
                     return;
                 for (const auto & w : wb.getWrites())
@@ -145,11 +141,11 @@ struct WriteBatches : private boost::noncopyable
                     if (unlikely(w.type != WriteBatchWriteType::DEL))
                         throw Exception("Expected deletes in " + what);
                 }
-                LOG_TRACE(logger, "Rollback remove from {} : {}", what, wb.toString());
+                LOG_TRACE(Logger::get(), "Rollback remove from {} : {}", what, wb.toString());
             };
-            check(log_wb, "log_wb", logger);
-            check(data_wb, "data_wb", logger);
+            check(log_wb, "log_wb");
+            check(data_wb, "data_wb");
         }
 
         storage_pool.logWriter()->write(std::move(log_wb), write_limiter);
@@ -163,9 +159,7 @@ struct WriteBatches : private boost::noncopyable
     {
         if constexpr (DM_RUN_CHECK)
         {
-            Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
-
-            auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
+            auto check = [](const WriteBatch & wb, const String & what) {
                 if (wb.empty())
                     return;
                 for (const auto & w : wb.getWrites())
@@ -173,10 +167,10 @@ struct WriteBatches : private boost::noncopyable
                     if (unlikely(w.type != WriteBatchWriteType::PUT))
                         throw Exception("Expected puts in " + what);
                 }
-                LOG_TRACE(logger, "Write into {} : {}", what, wb.toString());
+                LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
             };
-            check(meta, "meta", logger);
+            check(meta, "meta");
         }
 
         storage_pool.metaWriter()->write(std::move(meta), write_limiter);
@@ -187,9 +181,7 @@ struct WriteBatches : private boost::noncopyable
     {
         if constexpr (DM_RUN_CHECK)
        {
-            Poco::Logger * logger = &Poco::Logger::get("WriteBatches");
-
-            auto check = [](const WriteBatch & wb, const String & what, Poco::Logger * logger) {
+            auto check = [](const WriteBatch & wb, const String & what) {
                 if (wb.empty())
                     return;
                 for (const auto & w : wb.getWrites())
@@ -197,12 +189,12 @@ struct WriteBatches : private boost::noncopyable
                     if (unlikely(w.type != WriteBatchWriteType::DEL))
                         throw Exception("Expected deletes in " + what);
                 }
-                LOG_TRACE(logger, "Write into {} : {}", what, wb.toString());
+                LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
             };
-            check(removed_log, "removed_log", logger);
-            check(removed_data, "removed_data", logger);
-            check(removed_meta, "removed_meta", logger);
+            check(removed_log, "removed_log");
+            check(removed_data, "removed_data");
+            check(removed_meta, "removed_meta");
         }
 
         storage_pool.logWriter()->write(std::move(removed_log), write_limiter);
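All five DM_RUN_CHECK blocks in WriteBatches.h receive the same two mechanical changes: the per-call Poco::Logger registry lookup is dropped in favor of TiFlash's Logger::get() wrapper, and the check lambdas lose their now-unused Poco::Logger * parameter. Because the blocks sit under `if constexpr (DM_RUN_CHECK)`, none of this verification code is even compiled when the flag is off. A condensed sketch of the resulting pattern, assuming Logger::get() resolves a shared logger instance so there is nothing worth caching at each call site:

    if constexpr (DM_RUN_CHECK)
    {
        // The lambda now takes only what it uses; no captured logger pointer.
        auto check = [](const WriteBatch & wb, const String & what) {
            if (wb.empty())
                return;
            LOG_TRACE(Logger::get(), "Write into {} : {}", what, wb.toString());
        };
        check(log, "log");
        check(data, "data");
    }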
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
index ffdf019d504..7066ea7e2b4 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include
+#include
 #include
 #include
 #include
@@ -23,9 +25,12 @@
 #include
 #include
 #include
+#include
 #include
 #include
+#include
+#include
 #include
 
 namespace DB
@@ -191,6 +196,53 @@ try
 }
 CATCH
 
+TEST_F(ColumnFileTest, SerializeEmptyBlock)
+try
+{
+    size_t num_rows_write = 0;
+    Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
+    WriteBatches wbs(dmContext().storage_pool);
+    EXPECT_THROW(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, num_rows_write, wbs), DB::Exception);
+}
+CATCH
+
+TEST_F(ColumnFileTest, ReadColumns)
+try
+{
+    size_t num_rows_write = 10;
+    Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
+    ColumnFileTinyPtr cf;
+    {
+        WriteBatches wbs(dmContext().storage_pool);
+        cf = ColumnFileTiny::writeColumnFile(dmContext(), block, 0, num_rows_write, wbs);
+        wbs.writeAll();
+    }
+    auto storage_snap = std::make_shared<StorageSnapshot>(dmContext().storage_pool, nullptr, "", true);
+
+    {
+        // Read back exactly the columns we have written
+        auto columns_to_read = std::make_shared<ColumnDefines>(getColumnDefinesFromBlock(block));
+        auto reader = cf->getReader(dmContext(), storage_snap, columns_to_read);
+        auto block_read = reader->readNextBlock();
+        ASSERT_BLOCK_EQ(block_read, block);
+    }
+
+    {
+        // Only read a column that does not exist in the ColumnFileTiny
+        ColumnID added_colid = 100;
+        String added_colname = "added_col";
+        auto columns_to_read = std::make_shared<ColumnDefines>(ColumnDefines{ColumnDefine(added_colid, added_colname, typeFromString("Int64"))});
+        auto reader = cf->getReader(dmContext(), storage_snap, columns_to_read);
+        auto block_read = reader->readNextBlock();
+        ASSERT_COLUMNS_EQ_R(
+            ColumnsWithTypeAndName({
+                createColumn<Int64>(std::vector<Int64>(num_rows_write, 0)),
+            }),
+            block_read.getColumnsWithTypeAndName());
+    }
+}
+CATCH
+
 } // namespace tests
 } // namespace DM
 } // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
index a2ec3da3cef..f266fbee02f 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
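The two tests pin the new contract from both sides: SerializeEmptyBlock asserts that persisting a zero-row block now throws (the RUNTIME_CHECK_MSG in writeColumnFileData), and ReadColumns asserts that reading only DDL-added columns yields default-filled results without touching disk. The BlobStore change below then makes the storage layer defensive about the same invariant, using the standard NDEBUG split: fail fast in debug builds, degrade to a warning in release builds. A sketch of that pattern, with a placeholder message:

    // Sketch: debug builds surface the violated invariant immediately,
    // release builds log it and continue with an empty result.
#ifndef NDEBUG
    throw Exception("invariant violated: zero-size field read", ErrorCodes::LOGICAL_ERROR);
#endif
    LOG_WARNING(log, "invariant violated: zero-size field read, returning empty pages");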
diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp
index 799d0c2cfe8..85eb79457f1 100644
--- a/dbms/src/Storages/Page/V3/BlobStore.cpp
+++ b/dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -272,7 +272,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
         }
         case WriteBatchWriteType::PUT:
         case WriteBatchWriteType::UPSERT:
-            throw Exception(fmt::format("write batch have a invalid total size [write_type={}]", static_cast<Int32>(write.type)),
+            throw Exception(fmt::format("write batch has an invalid total size == 0 while this kind of entry exists, write_type={}", static_cast<Int32>(write.type)),
                             ErrorCodes::LOGICAL_ERROR);
         }
     }
@@ -579,20 +579,49 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
         }
     }
 
-    // Read with `FieldReadInfos`, buf_size must not be 0.
-    if (buf_size == 0)
+    PageMap page_map;
+    if (unlikely(buf_size == 0))
     {
-        throw Exception("Reading with fields but entry size is 0.", ErrorCodes::LOGICAL_ERROR);
+        // We should never persist an empty column inside a block. If the buf size is 0,
+        // this read with `FieldReadInfos` could have been eliminated entirely in the
+        // upper layer. Log a warning so we can check whether it happens.
+        {
+            FmtBuffer buf;
+            buf.joinStr(
+                to_read.begin(),
+                to_read.end(),
+                [](const FieldReadInfo & info, FmtBuffer & fb) {
+                    fb.fmtAppend("{{page_id: {}, fields: {}, entry: {}}}", info.page_id, info.fields, toDebugString(info.entry));
+                },
+                ",");
+#ifndef NDEBUG
+            // Throw an exception in debug mode so the upper-layer logic gets noticed and fixed
+            throw Exception(fmt::format("Reading with fields but entry size is 0, read_info=[{}]", buf.toString()), ErrorCodes::LOGICAL_ERROR);
+#endif
+            // Under a production release, only log a warning
+            LOG_WARNING(log, "Reading with fields but entry size is 0, read_info=[{}]", buf.toString());
+        }
+
+        // Allocating a buffer with size == 0 could lead to unexpected behavior, so skip the allocation and return
+        for (const auto & [page_id, entry, fields] : to_read)
+        {
+            UNUSED(entry, fields);
+            Page page(page_id);
+            page.data = ByteBuffer(nullptr, nullptr);
+            page_map.emplace(page_id.low, std::move(page));
+        }
+        return page_map;
     }
-    char * data_buf = static_cast<char *>(alloc(buf_size));
-    MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) {
+
+    // Allocate one shared buffer for holding the data of all pages
+    char * shared_data_buf = static_cast<char *>(alloc(buf_size));
+    MemHolder shared_mem_holder = createMemHolder(shared_data_buf, [&, buf_size](char * p) {
         free(p, buf_size);
     });
 
     std::set<FieldOffsetInsidePage> fields_offset_in_page;
-    char * pos = data_buf;
-    PageMap page_map;
+    char * pos = shared_data_buf;
     for (const auto & [page_id_v3, entry, fields] : to_read)
     {
         size_t read_size_this_entry = 0;
@@ -636,7 +665,7 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
 
         Page page(page_id_v3);
         page.data = ByteBuffer(pos, write_offset);
-        page.mem_holder = mem_holder;
+        page.mem_holder = shared_mem_holder;
         page.field_offsets.swap(fields_offset_in_page);
         fields_offset_in_page.clear();
         page_map.emplace(page_id_v3.low, std::move(page));
@@ -644,11 +673,22 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
         pos = write_offset;
     }
 
-    if (unlikely(pos != data_buf + buf_size))
-        throw Exception(fmt::format("[end_position={}] not match the [current_position={}]",
-                        data_buf + buf_size,
-                        pos),
+    if (unlikely(pos != shared_data_buf + buf_size))
+    {
+        FmtBuffer buf;
+        buf.joinStr(
+            to_read.begin(),
+            to_read.end(),
+            [](const FieldReadInfo & info, FmtBuffer & fb) {
+                fb.fmtAppend("{{page_id: {}, fields: {}, entry: {}}}", info.page_id, info.fields, toDebugString(info.entry));
+            },
+            ",");
+        throw Exception(fmt::format("unexpected read size, end_pos={} current_pos={} read_info=[{}]",
+                                    shared_data_buf + buf_size,
+                                    pos,
+                                    buf.toString()),
                        ErrorCodes::LOGICAL_ERROR);
+    }
 
     return page_map;
 }
@@ -734,10 +774,21 @@ PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & rea
     }
 
     if (unlikely(pos != data_buf + buf_size))
-        throw Exception(fmt::format("[end_position={}] not match the [current_position={}]",
+    {
+        FmtBuffer buf;
+        buf.joinStr(
+            entries.begin(),
+            entries.end(),
+            [](const PageIDAndEntryV3 & id_entry, FmtBuffer & fb) {
+                fb.fmtAppend("{{page_id: {}, entry: {}}}", id_entry.first, toDebugString(id_entry.second));
+            },
+            ",");
+        throw Exception(fmt::format("unexpected read size, end_pos={} current_pos={} read_info=[{}]",
                        data_buf + buf_size,
-                        pos),
+                        pos,
+                        buf.toString()),
                        ErrorCodes::LOGICAL_ERROR);
+    }
 
     return page_map;
 }
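Both mismatch checks now attach the full read request to the exception via FmtBuffer::joinStr, which formats each element through a callback and joins the pieces with a delimiter. This turns a bare pointer-comparison failure into a message that names every page id, field list, and entry involved. A usage sketch of the same helper outside BlobStore; the container and variable names here are illustrative:

    // Sketch: join a list of page ids into "1,2,3" style output.
    FmtBuffer fmt_buf;
    fmt_buf.joinStr(
        page_ids.begin(),
        page_ids.end(),
        [](const PageId & id, FmtBuffer & fb) { fb.fmtAppend("{}", id); },
        ",");
    const String joined = fmt_buf.toString();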
diff --git a/dbms/src/Storages/Page/V3/PageEntry.h b/dbms/src/Storages/Page/V3/PageEntry.h
index 65a9b3955d3..3b441bd094a 100644
--- a/dbms/src/Storages/Page/V3/PageEntry.h
+++ b/dbms/src/Storages/Page/V3/PageEntry.h
@@ -82,13 +82,21 @@ using PageIDAndEntriesV3 = std::vector<PageIDAndEntryV3>;
 
 inline String toDebugString(const PageEntryV3 & entry)
 {
-    return fmt::format("PageEntryV3{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets_size: {}}}",
+    FmtBuffer fmt_buf;
+    fmt_buf.joinStr(
+        entry.field_offsets.begin(),
+        entry.field_offsets.end(),
+        [](const auto & offset_checksum, FmtBuffer & fb) {
+            fb.fmtAppend("{}", offset_checksum.first);
+        },
+        ",");
+    return fmt::format("PageEntryV3{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets: [{}]}}",
                        entry.file_id,
                        entry.offset,
                        entry.size,
                        entry.checksum,
                        entry.tag,
-                       entry.field_offsets.size());
+                       fmt_buf.toString());
 }
 } // namespace PS::V3
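With this change, toDebugString lists the offset component of every field_offsets entry instead of only the count. A hypothetical entry with three fields might now render as:

    PageEntryV3{file: 5, offset: 0x1000, size: 300, checksum: 0xA1B2, tag: 0, field_offsets: [0,100,200]}

so a zero-size or mismatched field read reported by BlobStore can be correlated directly with the offsets recorded in the entry.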