diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index d3a5ccbfd4d..1503fc9c088 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -53,6 +53,7 @@ #include #include #include +#include #include #include #include @@ -206,6 +207,8 @@ struct ContextShared Context::ConfigReloadCallback config_reload_callback; + std::shared_ptr shared_block_schemas; + explicit ContextShared(std::shared_ptr runtime_components_factory_) : runtime_components_factory(std::move(runtime_components_factory_)) , storage_run_mode(PageStorageRunMode::ONLY_V3) @@ -1843,6 +1846,16 @@ SharedQueriesPtr Context::getSharedQueries() return shared->shared_queries; } +const std::shared_ptr & Context::getSharedBlockSchemas() const +{ + return shared->shared_block_schemas; +} + +void Context::initializeSharedBlockSchemas() +{ + shared->shared_block_schemas = std::make_shared(*this); +} + size_t Context::getMaxStreams() const { size_t max_streams = settings.max_threads; diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 63c722dbc12..08a678de2e4 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -109,6 +109,7 @@ namespace DM class MinMaxIndexCache; class DeltaIndexManager; class GlobalStoragePool; +class SharedBlockSchemas; using GlobalStoragePoolPtr = std::shared_ptr; } // namespace DM @@ -178,7 +179,6 @@ class Context DAGContext * dag_context = nullptr; using DatabasePtr = std::shared_ptr; using Databases = std::map>; - /// Use copy constructor or createGlobal() instead Context(); @@ -511,6 +511,9 @@ class Context return disaggregated_mode == DisaggregatedMode::Storage; } + const std::shared_ptr & getSharedBlockSchemas() const; + void initializeSharedBlockSchemas(); + // todo: remove after AutoScaler is stable. void setUseAutoScaler(bool use) { diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 4dac558477b..30c3e522eba 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -67,6 +67,7 @@ #include #include #include +#include #include #include #include @@ -1200,6 +1201,8 @@ int Server::main(const std::vector & /*args*/) DM::SegmentReaderPoolManager::instance().init(server_info); DM::SegmentReadTaskScheduler::instance(); + global_context->initializeSharedBlockSchemas(); + { // Note that this must do before initialize schema sync service. do diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h index cb8f007756e..47f7a52d841 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h @@ -133,7 +133,7 @@ class ColumnFile /// been persisted in the disk and their data will be immutable. virtual bool isAppendable() const { return false; } virtual void disableAppend() {} - virtual bool append(DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/) + virtual bool append(const DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/) { throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR); } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 9cfbafb1ad4..7295e57b038 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -62,7 +62,7 @@ void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) c writeIntBinary(valid_bytes, buf); } -ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, // +ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(const DMContext & context, // const RowKeyRange & segment_range, ReadBuffer & buf) { diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index 2ac97ee8b55..c2187f96f39 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -85,7 +85,7 @@ class ColumnFileBig : public ColumnFilePersisted void serializeMetadata(WriteBuffer & buf, bool save_schema) const override; - static ColumnFilePersistedPtr deserializeMetadata(DMContext & context, // + static ColumnFilePersistedPtr deserializeMetadata(const DMContext & context, // const RowKeyRange & segment_range, ReadBuffer & buf); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp index fff0d964f42..aecce09c9af 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp @@ -32,6 +32,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_ Columns read_cols; std::scoped_lock lock(cache->mutex); + const auto & colid_to_offset = schema->getColIdToOffset(); for (size_t i = col_start; i < col_end; ++i) { const auto & cd = col_defs[i]; @@ -61,7 +62,7 @@ ColumnFileInMemory::getReader(const DMContext & /*context*/, const StorageSnapsh return std::make_shared(*this, col_defs); } -bool ColumnFileInMemory::append(DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes) +bool ColumnFileInMemory::append(const DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes) { if (disable_append) return false; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h index 74c408efdb2..9440d2c9d73 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h @@ -15,6 +15,7 @@ #pragma once #include +#include namespace DB { @@ -29,7 +30,7 @@ class ColumnFileInMemory : public ColumnFile friend class ColumnFileInMemoryReader; private: - BlockPtr schema; + ColumnFileSchemaPtr schema; UInt64 rows = 0; UInt64 bytes = 0; @@ -39,28 +40,20 @@ class ColumnFileInMemory : public ColumnFile // The cache data in memory. CachePtr cache; - // Used to map column id to column instance in a Block. - ColIdToOffset colid_to_offset; private: void fillColumns(const ColumnDefines & col_defs, size_t col_count, Columns & result) const; const DataTypePtr & getDataType(ColId column_id) const { - // Note that column_id must exist - auto index = colid_to_offset.at(column_id); - return schema->getByPosition(index).type; + return schema->getDataType(column_id); } public: - explicit ColumnFileInMemory(const BlockPtr & schema_, const CachePtr & cache_ = nullptr) + explicit ColumnFileInMemory(const ColumnFileSchemaPtr & schema_, const CachePtr & cache_ = nullptr) : schema(schema_) - , cache(cache_ ? cache_ : std::make_shared(*schema_)) - { - colid_to_offset.clear(); - for (size_t i = 0; i < schema->columns(); ++i) - colid_to_offset.emplace(schema->getByPosition(i).column_id, i); - } + , cache(cache_ ? cache_ : std::make_shared(schema_->getSchema())) + {} Type getType() const override { return Type::INMEMORY_FILE; } @@ -70,9 +63,7 @@ class ColumnFileInMemory : public ColumnFile CachePtr getCache() { return cache; } /// The schema of this pack. - BlockPtr getSchema() const { return schema; } - /// Replace the schema with a new schema, and the new schema instance should be exactly the same as the previous one. - void resetIdenticalSchema(BlockPtr schema_) { schema = schema_; } + ColumnFileSchemaPtr getSchema() const { return schema; } ColumnInMemoryFilePtr clone() { @@ -90,7 +81,7 @@ class ColumnFileInMemory : public ColumnFile { disable_append = true; } - bool append(DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes) override; + bool append(const DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes) override; Block readDataForFlush() const; @@ -101,7 +92,7 @@ class ColumnFileInMemory : public ColumnFile String s = "{in_memory_file,rows:" + DB::toString(rows) // + ",bytes:" + DB::toString(bytes) // + ",disable_append:" + DB::toString(disable_append) // - + ",schema:" + (schema ? schema->dumpStructure() : "none") // + + ",schema:" + (schema ? schema->toString() : "none") // + ",cache_block:" + (cache ? cache->block.dumpStructure() : "none") + "}"; return s; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index 31388c909e8..54bdd1e17c1 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -25,12 +25,12 @@ namespace DB { namespace DM { -void serializeSchema(WriteBuffer & buf, const BlockPtr & schema) +void serializeSchema(WriteBuffer & buf, const Block & schema) { if (schema) { - writeIntBinary(static_cast(schema->columns()), buf); - for (auto & col : *schema) + writeIntBinary(static_cast(schema.columns()), buf); + for (const auto & col : schema) { writeIntBinary(col.column_id, buf); writeStringBinary(col.name, buf); @@ -105,7 +105,7 @@ void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & c } } -ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf) +ColumnFilePersisteds deserializeSavedColumnFiles(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf) { // Check binary version DeltaFormat::Version version; @@ -117,7 +117,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowK // V1 and V2 share the same deserializer. case DeltaFormat::V1: case DeltaFormat::V2: - column_files = deserializeSavedColumnFilesInV2Format(buf, version); + column_files = deserializeSavedColumnFilesInV2Format(context, buf, version); break; case DeltaFormat::V3: column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h index 887ba75ca10..bade97e346e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h @@ -35,7 +35,7 @@ class ColumnFilePersisted : public ColumnFile virtual void serializeMetadata(WriteBuffer & buf, bool save_schema) const = 0; }; -void serializeSchema(WriteBuffer & buf, const BlockPtr & schema); +void serializeSchema(WriteBuffer & buf, const Block & schema); BlockPtr deserializeSchema(ReadBuffer & buf); void serializeColumn(MemoryWriteBuffer & buf, const IColumn & column, const DataTypePtr & type, size_t offset, size_t limit, CompressionMethod compression_method, Int64 compression_level); @@ -44,13 +44,13 @@ void deserializeColumn(IColumn & column, const DataTypePtr & type, const ByteBuf /// Serialize those column files' metadata into buf. void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & column_files); /// Recreate column file instances from buf. -ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf); +ColumnFilePersisteds deserializeSavedColumnFiles(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf); void serializeSavedColumnFilesInV2Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files); -ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version); +ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(const DMContext & context, ReadBuffer & buf, UInt64 version); void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files); -ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf); +ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf); } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp new file mode 100644 index 00000000000..60378550add --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp @@ -0,0 +1,101 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ +namespace DM +{ + +ColumnFileSchema::ColumnFileSchema(const Block & block) + : schema(block.cloneEmpty()) +{ + for (size_t i = 0; i < schema.columns(); ++i) + colid_to_offset.emplace(schema.getByPosition(i).column_id, i); +} + +const DataTypePtr & ColumnFileSchema::getDataType(ColId column_id) const +{ + /// Returns the data type of a column. + /// The specified column id must exist, otherwise something unexpected will happen. + auto index = colid_to_offset.at(column_id); + return schema.getByPosition(index).type; +} + +String ColumnFileSchema::toString() const +{ + return "{schema:" + (schema ? schema.dumpJsonStructure() : "none") + "}"; +} + +SharedBlockSchemas::SharedBlockSchemas(DB::Context & context) + : background_pool(context.getBackgroundPool()) +{ + handle = background_pool.addTask([&, this] { + std::lock_guard lock(mutex); + for (auto iter = column_file_schemas.begin(); iter != column_file_schemas.end();) + { + if (iter->second.expired()) + { + iter = column_file_schemas.erase(iter); + } + else + { + ++iter; + } + } + return true; + }, + /*multi*/ false, + /*interval_ms*/ 60000); +} + +SharedBlockSchemas::~SharedBlockSchemas() +{ + if (handle) + { + background_pool.removeTask(handle); + } +} + +ColumnFileSchemaPtr SharedBlockSchemas::find(const Digest & digest) +{ + std::lock_guard lock(mutex); + auto it = column_file_schemas.find(digest); + if (it == column_file_schemas.end()) + return nullptr; + return it->second.lock(); +} + +ColumnFileSchemaPtr SharedBlockSchemas::getOrCreate(const Block & block) +{ + Digest digest = hashSchema(block); + std::lock_guard lock(mutex); + auto it = column_file_schemas.find(digest); + if (it == column_file_schemas.end() || it->second.expired()) + { + auto schema = std::make_shared(block); + column_file_schemas.emplace(digest, schema); + return schema; + } + else + return it->second.lock(); +} + +std::shared_ptr getSharedBlockSchemas(const DMContext & context) +{ + return context.db_context.getSharedBlockSchemas(); +} +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h new file mode 100644 index 00000000000..9cfffc8ec83 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h @@ -0,0 +1,93 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace std +{ +using Digest = UInt256; +template <> +struct hash +{ + size_t operator()(const Digest & digest) const + { + size_t seed = 0; + boost::hash_combine(seed, boost::hash_value(digest.a)); + boost::hash_combine(seed, boost::hash_value(digest.b)); + boost::hash_combine(seed, boost::hash_value(digest.c)); + boost::hash_combine(seed, boost::hash_value(digest.d)); + return seed; + } +}; +} // namespace std + +namespace DB +{ +namespace DM +{ +using Digest = UInt256; +class ColumnFileSchema +{ +private: + Block schema; + + using ColIdToOffset = std::unordered_map; + ColIdToOffset colid_to_offset; + +public: + explicit ColumnFileSchema(const Block & block); + + const DataTypePtr & getDataType(ColId column_id) const; + + String toString() const; + + const Block & getSchema() const { return schema; } + const ColIdToOffset & getColIdToOffset() const { return colid_to_offset; } +}; + +using ColumnFileSchemaPtr = std::shared_ptr; + +class SharedBlockSchemas +{ +private: + // we use sha256 to generate Digest for each ColumnFileSchema as the key of column_file_schemas, + // to minimize the possibility of two different schemas having the same key in column_file_schemas. + // Besides, we use weak_ptr to ensure we can remove the ColumnFileSchema, + // when no one use it, to avoid too much memory usage. + std::unordered_map> column_file_schemas; + std::mutex mutex; + BackgroundProcessingPool::TaskHandle handle; + BackgroundProcessingPool & background_pool; + +public: + explicit SharedBlockSchemas(DB::Context & context); + ~SharedBlockSchemas(); + + ColumnFileSchemaPtr find(const Digest & digest); + + ColumnFileSchemaPtr getOrCreate(const Block & block); +}; + +std::shared_ptr getSharedBlockSchemas(const DMContext & context); +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp index f340696e16d..d6b2a26897a 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp @@ -13,10 +13,13 @@ // limitations under the License. #include +#include +#include #include #include #include +#include namespace DB { @@ -28,6 +31,7 @@ Columns ColumnFileTiny::readFromCache(const ColumnDefines & column_defines, size return {}; Columns columns; + const auto & colid_to_offset = schema->getColIdToOffset(); for (size_t i = col_start; i < col_end; ++i) { const auto & cd = column_defines[i]; @@ -62,6 +66,7 @@ Columns ColumnFileTiny::readFromDisk(const PageReader & page_reader, // PageStorage::PageReadFields fields; fields.first = data_page_id; + const auto & colid_to_offset = schema->getColIdToOffset(); for (size_t index = col_start; index < col_end; ++index) { const auto & cd = column_defines[index]; @@ -132,18 +137,26 @@ ColumnFileTiny::getReader(const DMContext & /*context*/, const StorageSnapshotPt void ColumnFileTiny::serializeMetadata(WriteBuffer & buf, bool save_schema) const { - serializeSchema(buf, save_schema ? schema : BlockPtr{}); + serializeSchema(buf, save_schema ? schema->getSchema() : Block{}); writeIntBinary(data_page_id, buf); writeIntBinary(rows, buf); writeIntBinary(bytes, buf); } -std::tuple ColumnFileTiny::deserializeMetadata(ReadBuffer & buf, const BlockPtr & last_schema) +ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(const DMContext & context, ReadBuffer & buf, ColumnFileSchemaPtr & last_schema) { - auto schema = deserializeSchema(buf); - if (!schema) + auto schema_block = deserializeSchema(buf); + std::shared_ptr schema; + + if (!schema_block) schema = last_schema; + else + { + schema = getSharedBlockSchemas(context)->getOrCreate(*schema_block); + last_schema = schema; + } + if (unlikely(!schema)) throw Exception("Cannot deserialize DeltaPackBlock's schema", ErrorCodes::LOGICAL_ERROR); @@ -154,7 +167,7 @@ std::tuple ColumnFileTiny::deserializeMetadata readIntBinary(rows, buf); readIntBinary(bytes, buf); - return {std::make_shared(schema, rows, bytes, data_page_id), std::move(schema)}; + return std::make_shared(schema, rows, bytes, data_page_id); } Block ColumnFileTiny::readBlockForMinorCompaction(const PageReader & page_reader) const @@ -171,7 +184,7 @@ Block ColumnFileTiny::readBlockForMinorCompaction(const PageReader & page_reader } else { - const auto & schema_ref = *schema; + const auto & schema_ref = schema->getSchema(); auto page = page_reader.read(data_page_id); auto columns = schema_ref.cloneEmptyColumns(); @@ -190,15 +203,17 @@ Block ColumnFileTiny::readBlockForMinorCompaction(const PageReader & page_reader } } -ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema, const CachePtr & cache) +ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const CachePtr & cache) { auto page_id = writeColumnFileData(context, block, offset, limit, wbs); - auto new_column_file_schema = schema ? schema : std::make_shared(block.cloneEmpty()); + + auto schema = getSharedBlockSchemas(context)->getOrCreate(block); + auto bytes = block.bytes(offset, limit); - return std::make_shared(new_column_file_schema, limit, bytes, page_id, cache); + return std::make_shared(schema, limit, bytes, page_id, cache); } -PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs) +PageId ColumnFileTiny::writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs) { auto page_id = context.storage_pool.newLogPageId(); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index 4ccf641cc37..7680139e4b3 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -14,7 +14,9 @@ #pragma once +#include #include +#include namespace DB { @@ -32,7 +34,7 @@ class ColumnFileTiny : public ColumnFilePersisted friend class ColumnFileTinyReader; private: - BlockPtr schema; + ColumnFileSchemaPtr schema; UInt64 rows = 0; UInt64 bytes = 0; @@ -45,8 +47,6 @@ class ColumnFileTiny : public ColumnFilePersisted /// The cache data in memory. /// Currently this field is unused. CachePtr cache; - /// Used to map column id to column instance in a Block. - ColIdToOffset colid_to_offset; private: /// Read a block of columns in `column_defines` from cache / disk, @@ -58,22 +58,17 @@ class ColumnFileTiny : public ColumnFilePersisted const DataTypePtr & getDataType(ColId column_id) const { - // Note that column_id must exist - auto index = colid_to_offset.at(column_id); - return schema->getByPosition(index).type; + return schema->getDataType(column_id); } public: - ColumnFileTiny(const BlockPtr & schema_, UInt64 rows_, UInt64 bytes_, PageId data_page_id_, const CachePtr & cache_ = nullptr) + ColumnFileTiny(const ColumnFileSchemaPtr & schema_, UInt64 rows_, UInt64 bytes_, PageId data_page_id_, const CachePtr & cache_ = nullptr) : schema(schema_) , rows(rows_) , bytes(bytes_) , data_page_id(data_page_id_) , cache(cache_) - { - for (size_t i = 0; i < schema->columns(); ++i) - colid_to_offset.emplace(schema->getByPosition(i).column_id, i); - } + {} Type getType() const override { return Type::TINY_FILE; } @@ -84,9 +79,7 @@ class ColumnFileTiny : public ColumnFilePersisted void clearCache() { cache = {}; } /// The schema of this pack. Could be empty, i.e. a DeleteRange does not have a schema. - BlockPtr getSchema() const { return schema; } - /// Replace the schema with a new schema, and the new schema instance should be exactly the same as the previous one. - void resetIdenticalSchema(BlockPtr schema_) { schema = schema_; } + ColumnFileSchemaPtr getSchema() const { return schema; } ColumnFileTinyPtr cloneWith(PageId new_data_page_id) { @@ -109,11 +102,11 @@ class ColumnFileTiny : public ColumnFilePersisted Block readBlockForMinorCompaction(const PageReader & page_reader) const; - static ColumnFileTinyPtr writeColumnFile(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const BlockPtr & schema = nullptr, const CachePtr & cache = nullptr); + static ColumnFileTinyPtr writeColumnFile(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const CachePtr & cache = nullptr); - static PageId writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs); + static PageId writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs); - static std::tuple deserializeMetadata(ReadBuffer & buf, const BlockPtr & last_schema); + static ColumnFilePersistedPtr deserializeMetadata(const DMContext & context, ReadBuffer & buf, ColumnFileSchemaPtr & last_schema); bool mayBeFlushedFrom(ColumnFile * from_file) const override { @@ -135,7 +128,7 @@ class ColumnFileTiny : public ColumnFilePersisted String s = "{tiny_file,rows:" + DB::toString(rows) // + ",bytes:" + DB::toString(bytes) // + ",data_page_id:" + DB::toString(data_page_id) // - + ",schema:" + (schema ? schema->dumpStructure() : "none") // + + ",schema:" + (schema ? schema->toString() : "none") // + ",cache_block:" + (cache ? cache->block.dumpStructure() : "none") + "}"; return s; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V2.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V2.cpp index 09a705bb22d..51240795c3c 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V2.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V2.cpp @@ -14,6 +14,7 @@ #include #include +#include #include namespace DB @@ -33,7 +34,7 @@ struct ColumnFileV2 using ColumnFileV2Ptr = std::shared_ptr; using ColumnFileV2s = std::vector; -inline ColumnFilePersisteds transform_V2_to_V3(const ColumnFileV2s & column_files_v2) +inline ColumnFilePersisteds transform_V2_to_V3(const DMContext & context, const ColumnFileV2s & column_files_v2) { ColumnFilePersisteds column_files_v3; for (const auto & f : column_files_v2) @@ -42,7 +43,10 @@ inline ColumnFilePersisteds transform_V2_to_V3(const ColumnFileV2s & column_file if (f->isDeleteRange()) f_v3 = std::make_shared(std::move(f->delete_range)); else - f_v3 = std::make_shared(f->schema, f->rows, f->bytes, f->data_page_id); + { + auto schema = getSharedBlockSchemas(context)->getOrCreate(*(f->schema)); + f_v3 = std::make_shared(schema, f->rows, f->bytes, f->data_page_id); + } column_files_v3.push_back(f_v3); } @@ -64,7 +68,7 @@ inline ColumnFileV2s transformSaved_V3_to_V2(const ColumnFilePersisteds & column { f_v2->rows = f_tiny_file->getRows(); f_v2->bytes = f_tiny_file->getBytes(); - f_v2->schema = f_tiny_file->getSchema(); + f_v2->schema = std::make_shared(f_tiny_file->getSchema()->getSchema()); f_v2->data_page_id = f_tiny_file->getDataPageId(); } else @@ -152,12 +156,11 @@ inline ColumnFileV2Ptr deserializeColumnFile_V2(ReadBuffer & buf, UInt64 version } readIntBinary(column_file->data_page_id, buf); - column_file->schema = deserializeSchema(buf); return column_file; } -ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version) +ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(const DMContext & context, ReadBuffer & buf, UInt64 version) { size_t size; readIntBinary(size, buf); @@ -175,7 +178,7 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UIn } column_files.push_back(column_file); } - return transform_V2_to_V3(column_files); + return transform_V2_to_V3(context, column_files); } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp index dcf063b2fe1..cb7378c5d24 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include namespace DB @@ -24,7 +25,7 @@ namespace DM void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files) { writeIntBinary(column_files.size(), buf); - BlockPtr last_schema; + ColumnFileSchemaPtr last_schema; for (const auto & column_file : column_files) { @@ -61,13 +62,13 @@ void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePers } } -ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf) +ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf) { size_t column_file_count; readIntBinary(column_file_count, buf); ColumnFilePersisteds column_files; column_files.reserve(column_file_count); - BlockPtr last_schema; + ColumnFileSchemaPtr last_schema; for (size_t i = 0; i < column_file_count; ++i) { std::underlying_type::type column_file_type; @@ -80,7 +81,7 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, break; case ColumnFile::Type::TINY_FILE: { - std::tie(column_file, last_schema) = ColumnFileTiny::deserializeMetadata(buf, last_schema); + column_file = ColumnFileTiny::deserializeMetadata(context, buf, last_schema); break; } case ColumnFile::Type::BIG_FILE: diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index d3a6845f80d..b6b34df3542 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -106,17 +107,6 @@ void ColumnFilePersistedSet::recordRemoveColumnFilesPages(WriteBatches & wbs) co file->removeData(wbs); } -BlockPtr ColumnFilePersistedSet::getLastSchema() -{ - for (auto it = persisted_files.rbegin(); it != persisted_files.rend(); ++it) - { - if (auto * t_file = (*it)->tryToTinyFile(); t_file) - return t_file->getSchema(); - } - return {}; -} - - ColumnFilePersisteds ColumnFilePersistedSet::diffColumnFiles(const ColumnFiles & previous_column_files) const { // It should not be not possible that files in the snapshots are removed when calling this diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index e7796ac55dc..7f191bfa0ca 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -36,7 +36,6 @@ #include #include - namespace DB { namespace DM @@ -108,8 +107,6 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this(id_, persisted_files)) - , mem_table_set(std::make_shared(persisted_file_set->getLastSchema(), in_memory_files)) + , mem_table_set(std::make_shared(in_memory_files)) , delta_index(std::make_shared()) , log(Logger::get()) {} DeltaValueSpace::DeltaValueSpace(ColumnFilePersistedSetPtr && persisted_file_set_) : persisted_file_set(std::move(persisted_file_set_)) - , mem_table_set(std::make_shared(persisted_file_set->getLastSchema())) + , mem_table_set(std::make_shared()) , delta_index(std::make_shared()) , log(Logger::get()) {} diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp index 47d2f67f1e0..476d59f550d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp @@ -28,25 +28,6 @@ namespace DM { void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file) { - // If this column file's schema is identical to last_schema, then use the last_schema instance (instead of the one in `column_file`), - // so that we don't have to serialize my_schema instance. - if (auto * m_file = column_file->tryToInMemoryFile(); m_file) - { - auto my_schema = m_file->getSchema(); - if (last_schema && my_schema && last_schema != my_schema && isSameSchema(*my_schema, *last_schema)) - m_file->resetIdenticalSchema(last_schema); - else - last_schema = my_schema; - } - else if (auto * t_file = column_file->tryToTinyFile(); t_file) - { - auto my_schema = t_file->getSchema(); - if (last_schema && my_schema && last_schema != my_schema && isSameSchema(*my_schema, *last_schema)) - t_file->resetIdenticalSchema(last_schema); - else - last_schema = my_schema; - } - if (!column_files.empty()) { // As we are now appending a new column file (which can be used for new appends), @@ -212,9 +193,10 @@ void MemTableSet::appendToCache(DMContext & context, const Block & block, size_t if (!success) { + auto schema = getSharedBlockSchemas(context)->getOrCreate(block); + // Create a new column file. - auto my_schema = (last_schema && isSameSchema(block, *last_schema)) ? last_schema : std::make_shared(block.cloneEmpty()); - auto new_column_file = std::make_shared(my_schema); + auto new_column_file = std::make_shared(schema); // Must append the empty `new_column_file` to `column_files` before appending data to it, // because `appendColumnFileInner` will update stats related to `column_files` but we will update stats relate to `new_column_file` here. appendColumnFileInner(new_column_file); diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index 89f1e620559..a6a308fde1b 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -34,9 +34,6 @@ class MemTableSet : public std::enable_shared_from_this , private boost::noncopyable { private: - /// To avoid serialize the same schema between continuous ColumnFileInMemory and ColumnFileTiny instance. - BlockPtr last_schema; - // Note that we must update `column_files_count` for outer thread-safe after `column_files` changed ColumnFiles column_files; // TODO: check the proper memory_order when use this atomic variable @@ -52,9 +49,8 @@ class MemTableSet : public std::enable_shared_from_this void appendColumnFileInner(const ColumnFilePtr & column_file); public: - explicit MemTableSet(const BlockPtr & last_schema_, const ColumnFiles & in_memory_files = {}) - : last_schema(last_schema_) - , column_files(in_memory_files) + explicit MemTableSet(const ColumnFiles & in_memory_files = {}) + : column_files(in_memory_files) , log(Logger::get()) { column_files_count = column_files.size(); @@ -63,14 +59,6 @@ class MemTableSet : public std::enable_shared_from_this rows += file->getRows(); bytes += file->getBytes(); deletes += file->getDeletes(); - if (auto * m_file = file->tryToInMemoryFile(); m_file) - { - last_schema = m_file->getSchema(); - } - else if (auto * t_file = file->tryToTinyFile(); t_file) - { - last_schema = t_file->getSchema(); - } } } diff --git a/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp b/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp index 6d6164279fa..90959527d30 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp @@ -35,7 +35,7 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag if (task.is_trivial_move) continue; - auto & schema = *(task.to_compact[0]->tryToTinyFile()->getSchema()); + const auto & schema = task.to_compact[0]->tryToTinyFile()->getSchema()->getSchema(); auto compact_columns = schema.cloneEmptyColumns(); for (auto & file : task.to_compact) { @@ -55,7 +55,7 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag } Block compact_block = schema.cloneWithColumns(std::move(compact_columns)); auto compact_rows = compact_block.rows(); - auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs, task.to_compact.front()->tryToTinyFile()->getSchema()); + auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs); wbs.writeLogAndData(); task.result = compact_column_file; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.cpp index 1ec0785f29b..db6d9c9f814 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.cpp @@ -14,11 +14,42 @@ #include #include +#include namespace DB { namespace DM { +using Digest = UInt256; +Digest hashSchema(const Block & schema) +{ + SHA256_CTX ctx; + SHA256_Init(&ctx); + unsigned char digest_bytes[32]; + + const auto & data = schema.getColumnsWithTypeAndName(); + for (const auto & column_with_type_and_name : data) + { + // for type infos, we should use getName() instead of getTypeId(), + // because for all nullable types, getTypeId() will always return TypeIndex::Nullable in getTypeId() + // but getName() will return the real type name, e.g. Nullable(UInt64), Nullable(datetime(6)) + const auto & type = column_with_type_and_name.type->getName(); + SHA256_Update(&ctx, reinterpret_cast(type.c_str()), type.size()); + + const auto & name = column_with_type_and_name.name; + SHA256_Update(&ctx, reinterpret_cast(name.c_str()), name.size()); + + const auto & column_id = column_with_type_and_name.column_id; + SHA256_Update(&ctx, reinterpret_cast(&column_id), sizeof(column_id)); + + const auto & default_value = column_with_type_and_name.default_value.toString(); + SHA256_Update(&ctx, reinterpret_cast(default_value.c_str()), default_value.size()); + } + + SHA256_Final(digest_bytes, &ctx); + return *(reinterpret_cast(&digest_bytes)); +} + void convertColumn(Block & block, size_t pos, const DataTypePtr & to_type, const Context & context) { const IDataType * to_type_ptr = to_type.get(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h index ad585d684c7..24b6f5efd06 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #pragma once @@ -245,6 +247,9 @@ inline bool isSameSchema(const Block & a, const Block & b) return true; } +using Digest = UInt256; +Digest hashSchema(const Block & schema); + /// This method guarantees that the returned valid block is not empty. inline Block readNextBlock(const BlockInputStreamPtr & in) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp index ba0358e67e9..be3766c9b8d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp @@ -177,12 +177,11 @@ try ColumnFilePersisteds column_file_persisteds; size_t rows = 100; // arbitrary value auto block = DMTestEnv::prepareSimpleWriteBlock(0, rows, false); - auto schema = std::make_shared(block.cloneEmpty()); - column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema)); + column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs)); column_file_persisteds.emplace_back(std::make_shared(RowKeyRange::newAll(false, 1))); - column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema)); + column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs)); column_file_persisteds.emplace_back(std::make_shared(RowKeyRange::newAll(false, 1))); - column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema)); + column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs)); serializeSavedColumnFilesInV3Format(buff, column_file_persisteds); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index a3d6517a2ff..fb80ec58dd5 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -392,7 +392,7 @@ try auto [memory_cf, persisted_cf] = delta->cloneAllColumnFiles(lock, *dm_context, segment->getRowKeyRange(), wbs); ASSERT_FALSE(memory_cf.empty()); ASSERT_TRUE(persisted_cf.empty()); - BlockPtr last_schema; + ColumnFileSchemaPtr last_schema; for (const auto & column_file : memory_cf) { if (auto * t_file = column_file->tryToTinyFile(); t_file) diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 93b58916443..7c0e5cc49f4 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -960,7 +961,29 @@ std::pair StorageDeltaMerg { if (cache_blocks.empty()) { - return std::make_pair(decoding_schema_snapshot, std::make_unique(createBlockSortByColumnID(decoding_schema_snapshot))); + BlockUPtr block = std::make_unique(createBlockSortByColumnID(decoding_schema_snapshot)); + auto digest = hashSchema(*block); + auto schema = global_context.getSharedBlockSchemas()->find(digest); + if (schema) + { + // Because we use sha256 to calculate the hash of schema, so schemas has extremely low probability of collision + // while we can't guarantee that there will be no collision forever, + // so (when schema changes) we will check if this schema causes a hash collision, i.e. + // the two different schemas have the same digest. + // Considering there is extremely low probability for same digest but different schema, + // we choose just throw exception when this happens. + // If unfortunately it happens, + // we can rename some columns in this table and then restart tiflash to workaround. + RUNTIME_CHECK_MSG( + isSameSchema(*block, schema->getSchema()), + "new table's schema's digest is the same as one previous table schemas' digest, \ + but schema info is not the same .So please change the new tables' schema, \ + whose table_info is {}. The collisioned schema is {}", + tidb_table_info.serialize(), + schema->toString()); + } + + return std::make_pair(decoding_schema_snapshot, std::move(block)); } else { diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 019b1d5a6a5..8ae2aa4c35a 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -137,6 +138,8 @@ void TiFlashTestEnv::addGlobalContext(Strings testdata_path, PageStorageRunMode auto & path_pool = global_context->getPathPool(); global_context->getTMTContext().restore(path_pool); + + global_context->initializeSharedBlockSchemas(); } Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testdata_path)