diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index b96794bfaf8..8c07700d751 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -384,6 +384,7 @@ namespace ErrorCodes extern const int THEFLASH_ENCODER_ERROR = 9002; extern const int THEFLASH_SESSION_ERROR = 9003; extern const int DECIMAL_OVERFLOW_ERROR = 9004; + extern const int FILE_SIZE_NOT_MATCH = 9005; extern const int LOCK_EXCEPTION = 10000; } diff --git a/dbms/src/Common/PersistedContainer.h b/dbms/src/Common/PersistedContainer.h index edbde2d9f51..d057460eae1 100644 --- a/dbms/src/Common/PersistedContainer.h +++ b/dbms/src/Common/PersistedContainer.h @@ -10,34 +10,71 @@ #include #include +#include +#include #include #include #include #include -// TODO find a way to unify PersistedContainerSetOrVector and PersistedContainerMap, and Write & Read should be able to use lambda. +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CHECKSUM_DOESNT_MATCH; +} + +constexpr UInt8 PERSISTED_CONTAINER_MAGIC_WORD = 0xFF; +constexpr size_t HASH_CODE_LENGTH = sizeof(DB::HashingWriteBuffer::uint128); -template class Container, class Write, class Read> -struct PersistedContainerSetOrVector +template +struct PersistedContainer : public Trait { public: - PersistedContainerSetOrVector(const std::string & path_) : path(path_) {} + using Container = typename Trait::Container; + using Write = typename Trait::Write; + using Read = typename Trait::Read; - Container & get() { return container; } + explicit PersistedContainer(const std::string & path_) : path(path_) {} + + auto & get() { return container; } void persist() { std::string tmp_file_path = path + ".tmp." + DB::toString(Poco::Timestamp().epochMicroseconds()); - DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT); - size_t size = container.size(); - writeIntBinary(size, file_buf); - for (const T & t : container) { - write(t, file_buf); - } - file_buf.next(); - file_buf.sync(); + DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT); + file_buf.seek(HASH_CODE_LENGTH); + DB::HashingWriteBuffer hash_buf(file_buf); + size_t size = container.size(); + writeIntBinary(size, hash_buf); + if constexpr (is_map) + { + for (auto && [k, v] : container) + { + writeIntBinary(PERSISTED_CONTAINER_MAGIC_WORD, hash_buf); + write(k, v, hash_buf); + } + } + else + { + for (const auto & t : container) + { + writeIntBinary(PERSISTED_CONTAINER_MAGIC_WORD, hash_buf); + write(t, hash_buf); + } + } + hash_buf.next(); + + auto hashcode = hash_buf.getHash(); + file_buf.seek(0); + writeIntBinary(hashcode.first, file_buf); + writeIntBinary(hashcode.second, file_buf); + + file_buf.sync(); + } Poco::File(tmp_file_path).renameTo(path); } @@ -46,19 +83,38 @@ struct PersistedContainerSetOrVector if (!Poco::File(path).exists()) return; DB::ReadBufferFromFile file_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); + + DB::HashingReadBuffer::uint128 expected_hashcode; + readIntBinary(expected_hashcode.first, file_buf); + readIntBinary(expected_hashcode.second, file_buf); + + DB::HashingReadBuffer hash_buf(file_buf); size_t size; - readIntBinary(size, file_buf); + readIntBinary(size, hash_buf); + UInt8 word; for (size_t i = 0; i < size; ++i) { - if constexpr (is_set) + readIntBinary(word, hash_buf); + if (word != PERSISTED_CONTAINER_MAGIC_WORD) + throw DB::Exception("Magic word does not match!", DB::ErrorCodes::CHECKSUM_DOESNT_MATCH); + + if constexpr (is_map) + { + const auto && [k, v] = read(hash_buf); + container.emplace(k, v); + } + else if constexpr (is_set) { - container.insert(std::move(read(file_buf))); + container.insert(std::move(read(hash_buf))); } else { - container.push_back(std::move(read(file_buf))); + container.push_back(std::move(read(hash_buf))); } } + auto hashcode = hash_buf.getHash(); + if (hashcode != expected_hashcode) + throw DB::Exception("Hashcode does not match!", DB::ErrorCodes::CHECKSUM_DOESNT_MATCH); } void drop() @@ -66,80 +122,41 @@ struct PersistedContainerSetOrVector Poco::File f(path); if (f.exists()) f.remove(false); - Container tmp; + Container tmp; container.swap(tmp); } private: std::string path; - Container container; + Container container; Write write{}; Read read{}; }; -template class Container, - class Write, - class Read> -struct PersistedContainerMap +template class C, typename W, typename R> +struct MapTrait { -public: - PersistedContainerMap(const std::string & path_) : path(path_) {} - - Container & get() { return container; } - - void persist() - { - std::string tmp_file_path = path + ".tmp." + DB::toString(Poco::Timestamp().epochMicroseconds()); - DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT); - size_t size = container.size(); - writeIntBinary(size, file_buf); - for (auto && [k, v] : container) - { - write(k, v, file_buf); - } - file_buf.next(); - file_buf.sync(); - - Poco::File(tmp_file_path).renameTo(path); - } - - void restore() - { - if (!Poco::File(path).exists()) - return; - DB::ReadBufferFromFile file_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); - size_t size; - readIntBinary(size, file_buf); - for (size_t i = 0; i < size; ++i) - { - const auto && [k, v] = read(file_buf); - container.emplace(k, v); - } - } - - void drop() - { - Poco::File f(path); - if (f.exists()) - f.remove(false); - Container tmp; - container.swap(tmp); - } + using Container = C; + using Write = W; + using Read = R; +}; -private: - std::string path; - Container container; - Write write{}; - Read read{}; +template class C, typename W, typename R> +struct VecSetTrait +{ + using Container = C; + using Write = W; + using Read = R; }; -template class Container, class Write, class Read> -using PersistedContainerSet = PersistedContainerSetOrVector; +template class C, class Write, class Read> +using PersistedContainerMap = PersistedContainer>; + +template class C, class Write, class Read> +using PersistedContainerSet = PersistedContainer>; -template class Container, class Write, class Read> -using PersistedContainerVector = PersistedContainerSetOrVector; +template class C, class Write, class Read> +using PersistedContainerVector = PersistedContainer>; struct UInt64Write { @@ -177,3 +194,5 @@ struct UInt64StringRead }; using PersistedUnorderedUInt64ToStringMap = PersistedContainerMap; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Common/tests/persisted_container.cpp b/dbms/src/Common/tests/persisted_container.cpp index 564a9a1c258..5da2c2ff075 100644 --- a/dbms/src/Common/tests/persisted_container.cpp +++ b/dbms/src/Common/tests/persisted_container.cpp @@ -5,15 +5,21 @@ #include +using namespace DB; + int main(int, char **) { + auto clear_file = [=](std::string path) { + Poco::File file(path); + if (file.exists()) + file.remove(); + }; + { std::string file_path = "persisted_container_set_test.dat"; - SCOPE_EXIT({ - Poco::File file(file_path); - if (file.exists()) - file.remove(); - }); + clear_file(file_path); + SCOPE_EXIT({ clear_file(file_path); }); + { PersistedUnorderedUInt64Set set(file_path); set.restore(); @@ -38,11 +44,9 @@ int main(int, char **) { std::string file_path = "persisted_container_map_test.dat"; - SCOPE_EXIT({ - Poco::File file(file_path); - if (file.exists()) - file.remove(); - }); + clear_file(file_path); + SCOPE_EXIT({ clear_file(file_path); }); + { PersistedUnorderedUInt64ToStringMap map(file_path); map.restore(); diff --git a/dbms/src/Storages/Transaction/HashCheckHelper.cpp b/dbms/src/Storages/Transaction/HashCheckHelper.cpp new file mode 100644 index 00000000000..e960d5c746d --- /dev/null +++ b/dbms/src/Storages/Transaction/HashCheckHelper.cpp @@ -0,0 +1,96 @@ +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int FILE_SIZE_NOT_MATCH; +extern const int FILE_DOESNT_EXIST; +extern const int CANNOT_OPEN_FILE; +extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; +extern const int UNEXPECTED_END_OF_FILE; +extern const int CHECKSUM_DOESNT_MATCH; +extern const int CANNOT_SEEK_THROUGH_FILE; +} // namespace ErrorCodes + + +namespace FileHashCheck +{ +void readFileFully(const std::string & path, int fd, off_t file_offset, size_t read_size, char * data) +{ + + if (-1 == ::lseek(fd, file_offset, SEEK_SET)) + throwFromErrno("Cannot seek through file " + path, ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + char * pos = data; + size_t remain = read_size; + while (remain) + { + auto res = ::read(fd, pos, remain); + if (-1 == res && errno != EINTR) + throwFromErrno("Cannot read from file " + path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + if (!res && errno != EINTR) + throwFromErrno("End of file", ErrorCodes::UNEXPECTED_END_OF_FILE); + + remain -= res; + pos += res; + } +} + +void checkObjectHashInFile(const std::string & path, const std::vector & object_bytes, const std::vector & use, + const std::vector & expected_hash_codes, size_t block_size) +{ + Poco::File file(path); + size_t file_size = file.getSize(); + size_t total_size = 0; + size_t max_size = 0; + for (auto b : object_bytes) + { + total_size += b; + max_size = std::max(max_size, b); + } + if (total_size != file_size) + throw Exception("File size not match! Expected: " + DB::toString(total_size) + ", got: " + DB::toString(file_size), + ErrorCodes::FILE_SIZE_NOT_MATCH); + + char * object_data_buf = (char *)malloc(max_size); + SCOPE_EXIT({ free(object_data_buf); }); + + auto fd = open(path.c_str(), O_RDONLY); + if (-1 == fd) + throwFromErrno("Cannot open file " + path, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); + SCOPE_EXIT({ ::close(fd); }); + + off_t file_offset = 0; + for (size_t index = 0; index < object_bytes.size(); ++index) + { + if (use[index]) + { + uint128 hashcode{0, 0}; + size_t bytes = object_bytes[index]; + char * pos = object_data_buf; + + readFileFully(path, fd, file_offset, bytes, object_data_buf); + + while (bytes) + { + auto to_cal_bytes = std::min(bytes, block_size); + hashcode = CityHash_v1_0_2::CityHash128WithSeed(pos, to_cal_bytes, hashcode); + pos += to_cal_bytes; + bytes -= to_cal_bytes; + } + + if (hashcode != expected_hash_codes[index]) + throw Exception( + "File " + path + " hash code not match at object index: " + DB::toString(index), ErrorCodes::CHECKSUM_DOESNT_MATCH); + } + + file_offset += object_bytes[index]; + } +} +} // namespace FileHashCheck + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/HashCheckHelper.h b/dbms/src/Storages/Transaction/HashCheckHelper.h new file mode 100644 index 00000000000..c3b1e7e62cd --- /dev/null +++ b/dbms/src/Storages/Transaction/HashCheckHelper.h @@ -0,0 +1,26 @@ +#pragma once + +#include + +#include + +#include +#include +#include + +namespace DB +{ + +namespace FileHashCheck +{ + +using uint128 = CityHash_v1_0_2::uint128; + +char * readFileFully(const std::string & path, size_t file_size); +void checkObjectHashInFile(const std::string & path, const std::vector & object_bytes, const std::vector & use, + const std::vector & expected_hash_codes, size_t block_size = DBMS_DEFAULT_HASHING_BLOCK_SIZE); + + +} // namespace FileHashCheck + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 87b90389b7d..fa40d1d2a97 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -10,7 +10,10 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; -} +extern const int UNKNOWN_FORMAT_VERSION; +} // namespace ErrorCodes + +const UInt32 Region::CURRENT_VERSION = 0; const String Region::lock_cf_name = "lock"; const String Region::default_cf_name = "default"; @@ -399,7 +402,9 @@ size_t Region::serialize(WriteBuffer & buf) { std::lock_guard lock(mutex); - size_t total_size = meta.serialize(buf); + size_t total_size = writeBinary2(Region::CURRENT_VERSION, buf); + + total_size += meta.serialize(buf); total_size += writeBinary2(data_cf.size(), buf); for (auto && [key, value] : data_cf) @@ -426,6 +431,11 @@ size_t Region::serialize(WriteBuffer & buf) RegionPtr Region::deserialize(ReadBuffer & buf) { + auto version = readBinary2(buf); + if (version != Region::CURRENT_VERSION) + throw Exception("Unexpected region version: " + DB::toString(version) + ", expected: " + DB::toString(CURRENT_VERSION), + ErrorCodes::UNKNOWN_FORMAT_VERSION); + auto region = std::make_shared(RegionMeta::deserialize(buf)); auto size = readBinary2(buf); diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 4e2d6138b0c..137c3723db6 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -27,6 +27,8 @@ using Regions = std::vector; class Region : public std::enable_shared_from_this { public: + const static UInt32 CURRENT_VERSION; + const static String lock_cf_name; const static String default_cf_name; const static String write_cf_name; diff --git a/dbms/src/Storages/Transaction/RegionFile.cpp b/dbms/src/Storages/Transaction/RegionFile.cpp index cecd4dcadeb..3d02bc2c363 100644 --- a/dbms/src/Storages/Transaction/RegionFile.cpp +++ b/dbms/src/Storages/Transaction/RegionFile.cpp @@ -1,14 +1,24 @@ +#include #include namespace DB { +namespace ErrorCodes +{ +extern const int UNKNOWN_FORMAT_VERSION; +extern const int CHECKSUM_DOESNT_MATCH; +} // namespace ErrorCodes + +static constexpr size_t REGION_FILE_INDEX_BUFFER_SIZE = 24576; + +const UInt32 RegionFile::CURRENT_VERSION = 0; + RegionFile::Writer::Writer(RegionFile & region_file) : data_file_size(region_file.file_size), data_file_buf(region_file.dataPath(), DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_WRONLY | O_CREAT), - index_file_buf(region_file.indexPath(), DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_WRONLY | O_CREAT) -{ -} + index_file_buf(region_file.indexPath(), REGION_FILE_INDEX_BUFFER_SIZE, O_APPEND | O_WRONLY | O_CREAT) +{} RegionFile::Writer::~Writer() { @@ -19,12 +29,16 @@ RegionFile::Writer::~Writer() size_t RegionFile::Writer::write(const RegionPtr & region) { - // index file format: [ region_id(8 bytes), region_size(8 bytes), reserve(8 bytes) ] , [ ... ] - size_t region_size = region->serialize(data_file_buf); + HashingWriteBuffer hash_buf(data_file_buf); + size_t region_size = region->serialize(hash_buf); + auto hashcode = hash_buf.getHash(); + // index file format: [ version(4 bytes), region_id(8 bytes), region_size(8 bytes), region hash(16 bytes] , [ ... ] ... + writeIntBinary(RegionFile::CURRENT_VERSION, index_file_buf); writeIntBinary(region->id(), index_file_buf); writeIntBinary(region_size, index_file_buf); - writeIntBinary((UInt64)0, index_file_buf); // reserve 8 bytes + writeIntBinary(hashcode.first, index_file_buf); + writeIntBinary(hashcode.second, index_file_buf); data_file_size += region_size; @@ -32,43 +46,57 @@ size_t RegionFile::Writer::write(const RegionPtr & region) } RegionFile::Reader::Reader(RegionFile & region_file) - : data_file_buf(region_file.dataPath(), std::min(region_file.file_size, static_cast(DBMS_DEFAULT_BUFFER_SIZE)), O_RDONLY) + : data_path(region_file.dataPath()), + data_file_buf(data_path, std::min(region_file.file_size, static_cast(DBMS_DEFAULT_BUFFER_SIZE)), O_RDONLY) { - // TODO: remove number 24576 - ReadBufferFromFile index_file_buf(region_file.indexPath(), 24576, O_RDONLY); - + ReadBufferFromFile index_file_buf(region_file.indexPath(), REGION_FILE_INDEX_BUFFER_SIZE, O_RDONLY); while (!index_file_buf.eof()) { - auto region_id = readBinary2(index_file_buf); + auto version = readBinary2(index_file_buf); + if (version != RegionFile::CURRENT_VERSION) + throw Exception( + "Unexpected region file index version: " + DB::toString(version) + ", expected: " + DB::toString(CURRENT_VERSION)); + auto region_id = readBinary2(index_file_buf); auto region_size = readBinary2(index_file_buf); - // reserve 8 bytes - readBinary2(index_file_buf); + HashingWriteBuffer::uint128 hashcode; + readIntBinary(hashcode.first, index_file_buf); + readIntBinary(hashcode.second, index_file_buf); + + metas.emplace_back(region_id, region_size, hashcode); + } +} - metas.emplace_back(region_id, region_size); +void RegionFile::Reader::checkHash(const std::vector & use) +{ + std::vector region_bytes(use.size()); + std::vector expected_hashcodes(use.size()); + for (size_t index = 0; index < use.size(); ++index) + { + region_bytes[index] = metas[index].region_size; + expected_hashcodes[index] = metas[index].hashcode; } + FileHashCheck::checkObjectHashInFile(data_path, region_bytes, use, expected_hashcodes); } RegionID RegionFile::Reader::hasNext() { - if (cur_region_index >= metas.size()) + if (next_region_index >= metas.size()) return InvalidRegionID; - // TODO: hasNext should be a const method - auto & meta = metas[cur_region_index++]; - cur_region_size = meta.region_size; - return meta.region_id; + next_region_meta = &metas[next_region_index++]; + return next_region_meta->region_id; } RegionPtr RegionFile::Reader::next() { - cur_region_offset += cur_region_size; + next_region_offset += next_region_meta->region_size; return Region::deserialize(data_file_buf); } void RegionFile::Reader::skipNext() { - data_file_buf.seek(cur_region_offset + cur_region_size); - cur_region_offset += cur_region_size; + next_region_offset += next_region_meta->region_size; + data_file_buf.seek(next_region_offset); } bool RegionFile::tryCoverRegion(RegionID region_id, RegionFile & other) @@ -85,15 +113,9 @@ bool RegionFile::tryCoverRegion(RegionID region_id, RegionFile & other) return false; } -bool RegionFile::addRegion(RegionID region_id, size_t region_size) -{ - return !regions.insert_or_assign(region_id, region_size).second; -} +bool RegionFile::addRegion(RegionID region_id, size_t region_size) { return !regions.insert_or_assign(region_id, region_size).second; } -bool RegionFile::dropRegion(RegionID region_id) -{ - return regions.erase(region_id) != 0; -} +bool RegionFile::dropRegion(RegionID region_id) { return regions.erase(region_id) != 0; } /// Remove underlying file and clean up resources. void RegionFile::destroy() @@ -140,24 +162,12 @@ Float64 RegionFile::useRate() return ((Float64)size) / file_size; } -std::string RegionFile::dataPath() -{ - return dataPath(file_id); -} +std::string RegionFile::dataPath() { return dataPath(file_id); } -std::string RegionFile::indexPath() -{ - return indexPath(file_id); -} +std::string RegionFile::indexPath() { return indexPath(file_id); } -std::string RegionFile::dataPath(UInt64 the_file_id) -{ - return parent_path + DB::toString(the_file_id) + REGION_DATA_FILE_SUFFIX; -} +std::string RegionFile::dataPath(UInt64 the_file_id) { return parent_path + DB::toString(the_file_id) + REGION_DATA_FILE_SUFFIX; } -std::string RegionFile::indexPath(UInt64 the_file_id) -{ - return parent_path + DB::toString(the_file_id) + REGION_INDEX_FILE_SUFFIX; -} +std::string RegionFile::indexPath(UInt64 the_file_id) { return parent_path + DB::toString(the_file_id) + REGION_INDEX_FILE_SUFFIX; } } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionFile.h b/dbms/src/Storages/Transaction/RegionFile.h index fe8ca6e3091..5ed83e4acb9 100644 --- a/dbms/src/Storages/Transaction/RegionFile.h +++ b/dbms/src/Storages/Transaction/RegionFile.h @@ -7,6 +7,7 @@ #include +#include #include #include @@ -15,7 +16,7 @@ namespace DB { -static const std::string REGION_DATA_FILE_SUFFIX = ".rgn"; +static const std::string REGION_DATA_FILE_SUFFIX = ".rgn"; static const std::string REGION_INDEX_FILE_SUFFIX = ".idx"; class RegionFile; @@ -27,6 +28,8 @@ class RegionFile public: using RegionAndSizeMap = std::unordered_map; + static const UInt32 CURRENT_VERSION; + class Writer : private boost::noncopyable { public: @@ -38,7 +41,7 @@ class RegionFile private: // It is a reference to file_size in RegionFile, will be updated after write. - off_t & data_file_size; + off_t & data_file_size; WriteBufferFromFile data_file_buf; WriteBufferFromFile index_file_buf; }; @@ -50,12 +53,17 @@ class RegionFile struct PersistMeta { - PersistMeta(RegionID region_id_, UInt64 region_size_) : region_id(region_id_), region_size(region_size_) {} + PersistMeta(RegionID region_id_, UInt64 region_size_, HashingWriteBuffer::uint128 hashcode_) + : region_id(region_id_), region_size(region_size_), hashcode(hashcode_) + {} RegionID region_id; UInt64 region_size; + HashingWriteBuffer::uint128 hashcode; }; + void checkHash(const std::vector & use); + RegionID hasNext(); RegionPtr next(); void skipNext(); @@ -63,13 +71,14 @@ class RegionFile const std::vector & regionMetas() { return metas; } private: + std::string data_path; ReadBufferFromFile data_file_buf; std::vector metas; - size_t cur_region_index = 0; - size_t cur_region_offset = 0; - size_t cur_region_size = 0; + size_t next_region_index = 0; + size_t next_region_offset = 0; + PersistMeta * next_region_meta = nullptr; }; RegionFile(UInt64 file_id_, const std::string & parent_path_) @@ -110,7 +119,7 @@ class RegionFile std::string indexPath(UInt64 the_file_id); private: - UInt64 file_id; + UInt64 file_id; std::string parent_path; RegionAndSizeMap regions; diff --git a/dbms/src/Storages/Transaction/RegionPartition.cpp b/dbms/src/Storages/Transaction/RegionPartition.cpp index ba269567e52..56d6a667228 100644 --- a/dbms/src/Storages/Transaction/RegionPartition.cpp +++ b/dbms/src/Storages/Transaction/RegionPartition.cpp @@ -76,11 +76,10 @@ RegionPartition::Table & RegionPartition::getOrCreateTable(TableID table_id) return it->second; } -size_t RegionPartition::selectPartitionId(Table & table, RegionID region_id) +size_t RegionPartition::selectPartitionId(Table & table, RegionID /*region_id*/) { // size_t partition_id = rng() % table.partitions.get().size(); size_t partition_id = (next_partition_id++) % table.partitions.get().size(); - LOG_DEBUG(log, "Table " << table.table_id << " assign region " << region_id << " to partition " << partition_id); return partition_id; } @@ -115,13 +114,13 @@ std::pair RegionPartition::getOrInser // Currently region_id does not exist in any regions in this table, let's insert into one. size_t partition_id = selectPartitionId(table, region_id); + LOG_DEBUG(log, "Table " << table_id << " assign region " << region_id << " to partition " << partition_id); + return insertRegion(table, partition_id, region_id); } void RegionPartition::updateRegionRange(const RegionPtr & region) { - std::lock_guard lock(mutex); - auto region_id = region->id(); const auto range = region->getRange(); @@ -272,12 +271,24 @@ RegionPartition::RegionPartition(Context & context_, const std::string & parent_ for (PartitionID partition_id = 0; partition_id < table.partitions.get().size(); ++partition_id) { auto & partition = table.partitions.get()[partition_id]; - for (auto region_id : partition.region_ids) + auto it = partition.region_ids.begin(); + while (it != partition.region_ids.end()) { // Update cache infos + auto region_id = *it; auto region_ptr = region_fetcher(region_id); if (!region_ptr) - throw Exception("Region with id " + DB::toString(region_id) + " not found", ErrorCodes::LOGICAL_ERROR); + { + // It could happen that process crash after region split or region snapshot apply, + // and region has not been persisted, but region <-> partition mapping does. + it = partition.region_ids.erase(it); + LOG_WARNING(log, "Region " << region_id << " not found from KVStore, dropped."); + continue; + } + else + { + ++it; + } partition.cache_bytes += region_ptr->dataSize(); // Update region_id -> table_id & partition_id @@ -288,6 +299,8 @@ RegionPartition::RegionPartition(Context & context_, const std::string & parent_ region_info.table_to_partition.emplace(table_id, partition_id); } } + + table.partitions.persist(); } } @@ -372,6 +385,7 @@ void RegionPartition::splitRegion(const RegionPtr & region, std::vectorid(); size_t new_partition_id; while ((new_partition_id = selectPartitionId(table, split_region_id)) == current_partition_id) {} + LOG_DEBUG(log, "Table " << table.table_id << " assign region " << split_region_id << " to partition " << new_partition_id); auto [start_field, end_field] = getRegionRangeField(range.first, range.second, table_id); move_actions.push_back({merge_tree_storage, current_partition_id, new_partition_id, start_field, end_field}); diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 9ee2e037116..fd8fa4453aa 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -159,6 +159,8 @@ void RegionPersister::restore(RegionMap & regions) } } + reader.checkHash(use); + UInt64 region_id; size_t index = 0; while ((region_id = reader.hasNext()) != InvalidRegionID) @@ -225,16 +227,20 @@ bool RegionPersister::gc() LOG_DEBUG(log, "GC decide to merge " << merge_files.size() << " files, containing " << migrate_region_count << " regions"); - if (!merge_files.empty()) + if (!merge_files.empty() && migrate_region_count) { + // Create the GC file if needed. RegionFile * gc_file = createGCFile(); - auto writer = gc_file->createWriter(); + RegionFile::Writer gc_file_writer = gc_file->createWriter(); + for (auto & [file, migrate_region_ids] : merge_files) { auto reader = file->createReader(); auto metas = reader.regionMetas(); std::vector use = valid_regions_in_file(metas); + reader.checkHash(use); + UInt64 region_id; size_t index = 0; while ((region_id = reader.hasNext()) != InvalidRegionID) @@ -242,7 +248,7 @@ bool RegionPersister::gc() if (use[index] && migrate_region_ids.count(region_id)) { auto region = reader.next(); - auto region_size = writer.write(region); + auto region_size = gc_file_writer.write(region); { std::lock_guard map_lock(region_map_mutex); @@ -262,6 +268,7 @@ bool RegionPersister::gc() } } + if (!merge_files.empty()) { std::lock_guard map_lock(region_map_mutex); diff --git a/dbms/src/Storages/Transaction/tests/region_persister.cpp b/dbms/src/Storages/Transaction/tests/region_persister.cpp index f5747845997..34d3bbf12cf 100644 --- a/dbms/src/Storages/Transaction/tests/region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/region_persister.cpp @@ -90,7 +90,7 @@ int main(int, char **) } { - RegionPersister persister([](UInt64) -> RegionPtr { return {}; }, dir_path); + RegionPersister persister(dir_path); persister.persist(region1); persister.persist(region1); persister.persist(region2); @@ -114,7 +114,7 @@ int main(int, char **) regions.emplace(3, region3); { - RegionPersister persister([](UInt64) -> RegionPtr { return {}; }, dir_path, config); + RegionPersister persister(dir_path, config); persister.persist(region1); persister.persist(region1); persister.persist(region1); @@ -131,7 +131,7 @@ int main(int, char **) } { - RegionPersister persister([&](UInt64 id) -> RegionPtr { return regions.find(id)->second; }, dir_path, config); + RegionPersister persister(dir_path, config); RegionMap restore_regions; persister.restore(restore_regions); ASSERT_CHECK_EQUAL(3, restore_regions.size(), suc); @@ -155,7 +155,7 @@ int main(int, char **) regions.emplace(2, region2); regions.emplace(3, region3); - RegionPersister persister([&](UInt64 id) -> RegionPtr { return regions.find(id)->second; }, dir_path, config); + RegionPersister persister(dir_path, config); persister.persist(region1); persister.persist(region1); persister.persist(region1);