From a33782f24129674a9cc6108c484944c383eb1108 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 3 Apr 2019 23:14:11 +0800 Subject: [PATCH 01/11] Refactor Region Data. --- dbms/src/Debug/dbgFuncRegion.cpp | 45 +- .../Storages/Transaction/PartitionStreams.cpp | 17 +- dbms/src/Storages/Transaction/Region.cpp | 225 +----- dbms/src/Storages/Transaction/Region.h | 78 +-- .../Transaction/RegionBlockReader.cpp | 39 +- .../Storages/Transaction/RegionBlockReader.h | 6 +- dbms/src/Storages/Transaction/RegionData.h | 646 ++++++++++++++++++ dbms/src/Storages/Transaction/RegionTable.cpp | 2 +- dbms/src/Storages/Transaction/RegionTable.h | 4 +- dbms/src/Storages/Transaction/TiKVKeyValue.h | 227 +++--- .../Storages/Transaction/tests/kvstore.cpp | 36 +- .../Transaction/tests/region_persister.cpp | 12 +- 12 files changed, 866 insertions(+), 471 deletions(-) create mode 100644 dbms/src/Storages/Transaction/RegionData.h diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 1f9205bc9d7..15dad94e0d0 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -3,16 +3,15 @@ #include #include +#include #include #include -#include -// TODO: Remove this #include -#include #include #include +#include #include @@ -21,9 +20,9 @@ namespace DB namespace ErrorCodes { - extern const int BAD_ARGUMENTS; - extern const int UNKNOWN_TABLE; -} +extern const int BAD_ARGUMENTS; +extern const int UNKNOWN_TABLE; +} // namespace ErrorCodes TableID getTableID(Context & context, const std::string & database_name, const std::string & table_name) { @@ -49,8 +48,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer { if (args.size() != 5) { - throw Exception("Args not matched, should be: region-id, start-key, end-key, database-name, table-name", - ErrorCodes::BAD_ARGUMENTS); + throw Exception("Args not matched, should be: region-id, start-key, end-key, database-name, table-name", ErrorCodes::BAD_ARGUMENTS); } RegionID region_id = (RegionID)safeGet(typeid_cast(*args[0]).value); @@ -66,8 +64,8 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer tmt.kvstore->onSnapshot(region, &context); std::stringstream ss; - ss << "put region #" << region_id << ", range[" << start << ", " << end << ")" << - " to table #" << table_id << " with kvstore.onSnapshot"; + ss << "put region #" << region_id << ", range[" << start << ", " << end << ")" + << " to table #" << table_id << " with kvstore.onSnapshot"; output(ss.str()); } @@ -75,8 +73,7 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri { if (args.size() < 5) { - throw Exception("Args not matched, should be: region-id, start-key, end-key, database-name, table-name", - ErrorCodes::BAD_ARGUMENTS); + throw Exception("Args not matched, should be: region-id, start-key, end-key, database-name, table-name", ErrorCodes::BAD_ARGUMENTS); } RegionID region_id = (RegionID)safeGet(typeid_cast(*args[0]).value); @@ -106,8 +103,7 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri // TODO: Put data into snapshot cmd - auto reader = [&] (enginepb::SnapshotRequest * out) - { + auto reader = [&](enginepb::SnapshotRequest * out) { if (is_readed) return false; *out = req; @@ -117,8 +113,8 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri applySnapshot(tmt.kvstore, reader, &context); std::stringstream ss; - ss << "put region #" << region_id << ", range[" << start << ", " << end << ")" << - " to table #" << table_id << " with raft commands"; + ss << "put region #" << region_id << ", range[" << start << ", " << end << ")" + << " to table #" << table_id << " with raft commands"; output(ss.str()); } @@ -128,7 +124,7 @@ std::string getRegionKeyString(const HandleID s, const TiKVKey & k) { if (s == std::numeric_limits::min() || s == std::numeric_limits::max()) { - String raw_key = k.empty() ? "" : std::get<0>(RecordKVFormat::decodeTiKVKey(k)); + String raw_key = k.empty() ? "" : RecordKVFormat::decodeTiKVKey(k); bool is_record = RecordKVFormat::isRecord(raw_key); std::stringstream ss; if (is_record) @@ -159,7 +155,7 @@ std::string getStartKeyString(TableID table_id, const TiKVKey & start_key) } catch (...) { - return"e: " + start_key.toHex(); + return "e: " + start_key.toHex(); } } @@ -172,16 +168,15 @@ std::string getEndKeyString(TableID table_id, const TiKVKey & end_key) } catch (...) { - return"e: " + end_key.toHex(); + return "e: " + end_key.toHex(); } } -void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer output) +void dbgFuncDumpRegion(Context & context, const ASTs & args, DBGInvoker::Printer output) { if (args.size() > 1) { - throw Exception("Args not matched, should be: [show-region-range=false]", - ErrorCodes::BAD_ARGUMENTS); + throw Exception("Args not matched, should be: [show-region-range=false]", ErrorCodes::BAD_ARGUMENTS); } bool show_region = false; @@ -193,7 +188,7 @@ void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer o RegionTable::RegionMap regions; tmt.region_table.dumpRegionMap(regions); - for (const auto & it: regions) + for (const auto & it : regions) { auto region_id = it.first; const auto & table_ids = it.second.tables; @@ -254,7 +249,7 @@ void dbgFuncRegionRmData(Context & /*context*/, const ASTs & /*args*/, DBGInvoke */ } -size_t executeQueryAndCountRows(Context & context,const std::string & query) +size_t executeQueryAndCountRows(Context & context, const std::string & query) { size_t count = 0; Context query_context = context; @@ -272,4 +267,4 @@ size_t executeQueryAndCountRows(Context & context,const std::string & query) return count; } -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index b9ca1f60a41..8992b6ef7f3 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -17,10 +17,19 @@ std::tuple RegionTab const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, - std::vector * keys) + std::vector * keys) { - return getBlockInputStreamByRegion( - table_id, tmt.kvstore->getRegion(region_id), InvalidRegionVersion, InvalidRegionVersion, table_info, columns, ordered_columns, false, false, 0, keys); + return getBlockInputStreamByRegion(table_id, + tmt.kvstore->getRegion(region_id), + InvalidRegionVersion, + InvalidRegionVersion, + table_info, + columns, + ordered_columns, + false, + false, + 0, + keys); } std::tuple RegionTable::getBlockInputStreamByRegion(TableID table_id, @@ -33,7 +42,7 @@ std::tuple RegionTab bool learner_read, bool resolve_locks, UInt64 start_ts, - std::vector * keys) + std::vector * keys) { if (!region) return {nullptr, NOT_FOUND, 0}; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 993c012fbb5..bf47bd5fb03 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -19,86 +19,16 @@ const String Region::lock_cf_name = "lock"; const String Region::default_cf_name = "default"; const String Region::write_cf_name = "write"; -Region::KVMap::iterator Region::removeDataByWriteIt(const KVMap::iterator & write_it) -{ - auto & write_key = write_it->first; - auto & write_value = write_it->second; - auto [write_type, prewrite_ts, short_str] = RecordKVFormat::decodeWriteCfValue(write_value); - - auto bare_key = RecordKVFormat::truncateTs(write_key); - auto data_key = RecordKVFormat::appendTs(bare_key, prewrite_ts); - auto data_it = data_cf.find(data_key); - - if (write_type == PutFlag && !short_str) - { - if (unlikely(data_it == data_cf.end())) - { - throw Exception( - toString() + " key [" + data_key.toString() + "] not found in data cf when removing", ErrorCodes::LOGICAL_ERROR); - } - - cf_data_size -= data_it->first.dataSize() + data_it->second.dataSize(); - data_cf.erase(data_it); - } - - cf_data_size -= write_it->first.dataSize() + write_it->second.dataSize(); +RegionData::WriteCFIter Region::removeDataByWriteIt(const RegionData::WriteCFIter & write_it) { return data.removeDataByWriteIt(write_it); } - return write_cf.erase(write_it); -} - -Region::ReadInfo Region::readDataByWriteIt(const KVMap::const_iterator & write_it, std::vector * keys) +RegionData::ReadInfo Region::readDataByWriteIt(const RegionData::ConstWriteCFIter & write_it, std::vector * keys) { - auto & write_key = write_it->first; - auto & write_value = write_it->second; - - if (keys) - (*keys).push_back(write_key); - - auto [write_type, prewrite_ts, short_value] = RecordKVFormat::decodeWriteCfValue(write_value); - - String decode_key = std::get<0>(RecordKVFormat::decodeTiKVKey(write_key)); - auto commit_ts = RecordKVFormat::getTs(write_key); - auto handle = RecordKVFormat::getHandle(decode_key); - - if (write_type != PutFlag) - return std::make_tuple(handle, write_type, commit_ts, TiKVValue()); - - auto bare_key = RecordKVFormat::truncateTs(write_key); - auto data_key = RecordKVFormat::appendTs(bare_key, prewrite_ts); - - if (short_value) - return std::make_tuple(handle, write_type, commit_ts, TiKVValue(std::move(*short_value))); - - auto data_it = data_cf.find(data_key); - if (unlikely(data_it == data_cf.end())) - { - throw Exception(toString() + " key [" + data_key.toString() + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); - } - - return std::make_tuple(handle, write_type, commit_ts, data_it->second); + return data.readDataByWriteIt(write_it, keys); } Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) { - for (auto && [key, value] : lock_cf) - { - auto decode_key = std::get<0>(RecordKVFormat::decodeTiKVKey(key)); - auto table_id = RecordKVFormat::getTableId(decode_key); - if (expected_table_id != table_id) - { - continue; - } - auto [lock_type, primary, ts, ttl, data] = RecordKVFormat::decodeLockCfValue(value); - std::ignore = data; - if (lock_type == DelFlag || ts > start_ts) - { - continue; - } - std::cout << "got primary lock: " << primary << std::endl; - return std::make_unique(LockInfo{primary, ts, decode_key, ttl}); - } - - return nullptr; + return data.getLockInfo(expected_table_id, start_ts); } TableID Region::insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) @@ -122,10 +52,10 @@ void Region::batchInsert(std::function && f) } } -TableID Region::doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) +TableID Region::doInsert(const String & cf, const TiKVKey & key, const TiKVValue & value) { // Ignoring all keys other than records. - String raw_key = std::get<0>(RecordKVFormat::decodeTiKVKey(key)); + String raw_key = RecordKVFormat::decodeTiKVKey(key); if (!RecordKVFormat::isRecord(raw_key)) return InvalidTableID; @@ -133,27 +63,20 @@ TableID Region::doInsert(const std::string & cf, const TiKVKey & key, const TiKV if (isTiDBSystemTable(table_id)) return InvalidTableID; - auto & map = getCf(cf); - auto p = map.try_emplace(key, value); - if (!p.second) - throw Exception(toString() + " found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - - if (cf != lock_cf_name) - cf_data_size += key.dataSize() + value.dataSize(); - - return table_id; + auto type = getCf(cf); + return data.insert(type, key, raw_key, value); } -TableID Region::remove(const std::string & cf, const TiKVKey & key) +TableID Region::remove(const String & cf, const TiKVKey & key) { std::unique_lock lock(mutex); return doRemove(cf, key); } -TableID Region::doRemove(const std::string & cf, const TiKVKey & key) +TableID Region::doRemove(const String & cf, const TiKVKey & key) { // Ignoring all keys other than records. - String raw_key = std::get<0>(RecordKVFormat::decodeTiKVKey(key)); + String raw_key = RecordKVFormat::decodeTiKVKey(key); if (!RecordKVFormat::isRecord(raw_key)) return InvalidTableID; @@ -161,21 +84,8 @@ TableID Region::doRemove(const std::string & cf, const TiKVKey & key) if (isTiDBSystemTable(table_id)) return InvalidTableID; - auto & map = getCf(cf); - auto it = map.find(key); - // TODO following exception could throw currently. - if (it == map.end()) - { - // tikv gc will delete useless data in write & default cf. - if (unlikely(&map == &lock_cf)) - LOG_WARNING(log, toString() << " key not found [" << key.toString() << "] in cf " << cf); - return table_id; - } - - map.erase(it); - - if (cf != lock_cf_name) - cf_data_size -= key.dataSize() + it->second.dataSize(); + auto type = getCf(cf); + data.remove(type, key, raw_key); return table_id; } @@ -189,7 +99,6 @@ UInt64 Region::getProbableIndex() const { return meta.appliedIndex(); } RegionPtr Region::splitInto(const RegionMeta & meta) { - auto [start_key, end_key] = meta.getRange(); RegionPtr new_region; if (client != nullptr) new_region = std::make_shared(meta, [&](pingcap::kv::RegionVerID) { @@ -198,50 +107,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta) else new_region = std::make_shared(meta); - for (auto it = data_cf.begin(); it != data_cf.end();) - { - bool ok = start_key ? it->first >= start_key : true; - ok = ok && (end_key ? it->first < end_key : true); - if (ok) - { - cf_data_size -= it->first.dataSize() + it->second.dataSize(); - new_region->cf_data_size += it->first.dataSize() + it->second.dataSize(); - - new_region->data_cf.insert(std::move(*it)); - it = data_cf.erase(it); - } - else - ++it; - } - - for (auto it = write_cf.begin(); it != write_cf.end();) - { - bool ok = start_key ? it->first >= start_key : true; - ok = ok && (end_key ? it->first < end_key : true); - if (ok) - { - cf_data_size -= it->first.dataSize() + it->second.dataSize(); - new_region->cf_data_size += it->first.dataSize() + it->second.dataSize(); - - new_region->write_cf.insert(std::move(*it)); - it = write_cf.erase(it); - } - else - ++it; - } - - for (auto it = lock_cf.begin(); it != lock_cf.end();) - { - bool ok = start_key ? it->first >= start_key : true; - ok = ok && (end_key ? it->first < end_key : true); - if (ok) - { - new_region->lock_cf.insert(std::move(*it)); - it = lock_cf.erase(it); - } - else - ++it; - } + data.splitInto(meta.getRange(), new_region->data); return new_region; } @@ -433,26 +299,7 @@ size_t Region::serialize(WriteBuffer & buf, enginepb::CommandResponse * response total_size += meta.serialize(buf); - total_size += writeBinary2(data_cf.size(), buf); - for (auto && [key, value] : data_cf) - { - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } - - total_size += writeBinary2(write_cf.size(), buf); - for (auto && [key, value] : write_cf) - { - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } - - total_size += writeBinary2(lock_cf.size(), buf); - for (auto && [key, value] : lock_cf) - { - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } + total_size += data.serialize(buf); if (response != nullptr) *response = toCommandResponse(); @@ -470,31 +317,7 @@ RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc * r auto region = region_client_create == nullptr ? std::make_shared(RegionMeta::deserialize(buf)) : std::make_shared(RegionMeta::deserialize(buf), *region_client_create); - auto size = readBinary2(buf); - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); - region->data_cf.emplace(key, value); - region->cf_data_size += key.dataSize() + value.dataSize(); - } - - size = readBinary2(buf); - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); - region->write_cf.emplace(key, value); - region->cf_data_size += key.dataSize() + value.dataSize(); - } - - size = readBinary2(buf); - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); - region->lock_cf.emplace(key, value); - } + RegionData::deserialize(buf, region->data); region->persist_parm = 0; @@ -517,14 +340,14 @@ bool Region::checkIndex(UInt64 index) return true; } -Region::KVMap & Region::getCf(const std::string & cf) +ColumnFamilyType Region::getCf(const std::string & cf) { if (cf.empty() || cf == default_cf_name) - return data_cf; + return ColumnFamilyType::Default; else if (cf == write_cf_name) - return write_cf; + return ColumnFamilyType::Write; else if (cf == lock_cf_name) - return lock_cf; + return ColumnFamilyType::Lock; else throw Exception("Illegal cf: " + cf, ErrorCodes::LOGICAL_ERROR); } @@ -539,7 +362,7 @@ void Region::setPendingRemove() meta.notifyAll(); } -size_t Region::dataSize() const { return cf_data_size; } +size_t Region::dataSize() const { return data.dataSize(); } void Region::markPersisted() { last_persist_time = Clock::now(); } @@ -613,11 +436,7 @@ void Region::reset(Region && new_region) { std::unique_lock lock(mutex); - data_cf = std::move(new_region.data_cf); - write_cf = std::move(new_region.write_cf); - lock_cf = std::move(new_region.lock_cf); - - cf_data_size = new_region.cf_data_size.load(); + data.reset(std::move(new_region.data)); incPersistParm(); diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 271d83fbe62..efefb06e791 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -2,14 +2,11 @@ #include #include -#include - -#include -#include +#include #include -#include #include +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -38,39 +35,18 @@ class Region : public std::enable_shared_from_this const static String default_cf_name; const static String write_cf_name; - // In both lock_cf and write_cf. - enum CFModifyFlag : UInt8 - { - PutFlag = 'P', - DelFlag = 'D', - // useless for TiFLASH - /* - LockFlag = 'L', - // In write_cf, only raft leader will use RollbackFlag in txn mode. Learner should ignore it. - RollbackFlag = 'R', - */ - }; - - // This must be an ordered map. Many logics rely on it, like iterating. - using KVMap = std::map; + static const auto PutFlag = RegionData::CFModifyFlag::PutFlag; + static const auto DelFlag = RegionData::CFModifyFlag::DelFlag; - /// A quick-and-dirty copy of LockInfo structure in kvproto. - /// Used to transmit to client using non-ProtoBuf protocol. - struct LockInfo - { - std::string primary_lock; - UInt64 lock_version; - std::string key; - UInt64 lock_ttl; - }; - using LockInfoPtr = std::unique_ptr; + using LockInfo = RegionData::LockInfo; + using LockInfoPtr = RegionData::LockInfoPtr; using LockInfos = std::vector; class CommittedScanner : private boost::noncopyable { public: CommittedScanner(const RegionPtr & store_, TableID expected_table_id_) - : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_), write_map_it(store->write_cf.cbegin()) + : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_), write_map_it(store->data.write_cf.map.cbegin()) {} /// Check if next kv exists. @@ -79,21 +55,21 @@ class Region : public std::enable_shared_from_this { if (expected_table_id != InvalidTableID) { - for (; write_map_it != store->write_cf.cend(); ++write_map_it) + for (; write_map_it != store->data.write_cf.map.cend(); ++write_map_it) { - if (likely(RecordKVFormat::getTableId(write_map_it->first) == expected_table_id)) + if (likely(std::get<0>(write_map_it->first) == expected_table_id)) return expected_table_id; } } else { - if (write_map_it != store->write_cf.cend()) - return RecordKVFormat::getTableId(write_map_it->first); + if (write_map_it != store->data.write_cf.map.cend()) + return std::get<0>(write_map_it->first); } return InvalidTableID; } - auto next(std::vector * keys = nullptr) { return store->readDataByWriteIt(write_map_it++, keys); } + auto next(std::vector * keys = nullptr) { return store->readDataByWriteIt(write_map_it++, keys); } LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } @@ -102,7 +78,7 @@ class Region : public std::enable_shared_from_this std::shared_lock lock; TableID expected_table_id; - KVMap::const_iterator write_map_it; + RegionData::ConstWriteCFIter write_map_it; }; class CommittedRemover : private boost::noncopyable @@ -110,9 +86,9 @@ class Region : public std::enable_shared_from_this public: CommittedRemover(const RegionPtr & store_) : store(store_), lock(store_->mutex) {} - void remove(const TiKVKey & key) + void remove(const RegionWriteCFData::Key & key) { - if (auto it = store->write_cf.find(key); it != store->write_cf.end()) + if (auto it = store->data.write_cf.map.find(key); it != store->data.write_cf.map.end()) store->removeDataByWriteIt(it); } @@ -173,8 +149,7 @@ class Region : public std::enable_shared_from_this std::shared_lock lock1(region1.mutex); std::shared_lock lock2(region2.mutex); - return region1.meta == region2.meta && region1.data_cf == region2.data_cf && region1.write_cf == region2.write_cf - && region1.lock_cf == region2.lock_cf && region1.cf_data_size == region2.cf_data_size; + return region1.meta == region2.meta && region1.data == region2.data; } UInt64 learnerRead(); @@ -194,15 +169,15 @@ class Region : public std::enable_shared_from_this private: // Private methods no need to lock mutex, normally - TableID doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); - TableID doRemove(const std::string & cf, const TiKVKey & key); + TableID doInsert(const String & cf, const TiKVKey & key, const TiKVValue & value); + TableID doRemove(const String & cf, const TiKVKey & key); bool checkIndex(UInt64 index); - KVMap & getCf(const std::string & cf); + ColumnFamilyType getCf(const String & cf); - using ReadInfo = std::tuple; - ReadInfo readDataByWriteIt(const KVMap::const_iterator & write_it, std::vector * keys = nullptr); - KVMap::iterator removeDataByWriteIt(const KVMap::iterator & write_it); + RegionData::ReadInfo readDataByWriteIt( + const RegionData::ConstWriteCFIter & write_it, std::vector * keys = nullptr); + RegionData::WriteCFIter removeDataByWriteIt(const RegionData::WriteCFIter & write_it); LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts); @@ -211,20 +186,13 @@ class Region : public std::enable_shared_from_this void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); private: - // TODO: We should later change to lock free structure if needed. - KVMap data_cf; - KVMap write_cf; - KVMap lock_cf; - + RegionData data; mutable std::shared_mutex mutex; RegionMeta meta; pingcap::kv::RegionClientPtr client; - // Size of data cf & write cf, without lock cf. - std::atomic cf_data_size = 0; - std::atomic last_persist_time = Clock::now(); std::atomic persist_parm = 1; diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index b1c2c0d3ef6..72dd08cf9cd 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -5,7 +5,7 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; +extern const int LOGICAL_ERROR; } static const Field MockDecodeRow(TiDB::CodecFlag flag) @@ -35,8 +35,8 @@ static const Field MockDecodeRow(TiDB::CodecFlag flag) } } -Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, - const Names & ordered_columns_, ScannerPtr & scanner, std::vector * keys) +Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns_, + ScannerPtr & scanner, std::vector * keys) { // Note: this code below is mostly ported from RegionBlockInputStream. Names ordered_columns = ordered_columns_; @@ -54,9 +54,9 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti std::map> column_map; for (const auto & column_info : table_info.columns) { - Int64 col_id = column_info.id; - String col_name = column_info.name; - auto ch_col = columns.getPhysical(col_name); + Int64 col_id = column_info.id; + String col_name = column_info.name; + auto ch_col = columns.getPhysical(col_name); column_map[col_id] = std::make_pair(ch_col.type->createColumn(), ch_col); if (table_info.pk_is_handle && column_info.hasPriKeyFlag()) { @@ -66,7 +66,7 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti if (!table_info.pk_is_handle) { - auto ch_col = columns.getPhysical(MutableSupport::tidb_pk_column_name); + auto ch_col = columns.getPhysical(MutableSupport::tidb_pk_column_name); column_map[handle_id] = std::make_pair(ch_col.type->createColumn(), ch_col); } @@ -74,8 +74,8 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti // TODO: lock region, to avoid region adding/droping while writing data - TableID my_table_id = table_info.id; - TableID next_table_id = InvalidTableID; + TableID my_table_id = table_info.id; + TableID next_table_id = InvalidTableID; // Here we use do{}while() instead of while(){}, // Because the first check of scanner.hasNext() already been done outside of this function. @@ -88,8 +88,8 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti // TODO: optimize columns' insertion - ColumnUInt8::Container &delmark_data = delmark_col->getData(); - ColumnUInt64::Container &version_data = version_col->getData(); + ColumnUInt8::Container & delmark_data = delmark_col->getData(); + ColumnUInt64::Container & version_data = version_col->getData(); // `write_type` does not equal `Op` in proto delmark_data.resize(delmark_data.size() + 1); @@ -117,14 +117,14 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti for (size_t i = 0; i < row.size(); i += 2) { - Field &col_id = row[i]; + Field & col_id = row[i]; auto it = column_map.find(col_id.get()); if (it == column_map.end()) continue; - std::string tp = it->second.second.type->getName(); + const auto & tp = it->second.second.type->getName(); if (tp == "Nullable(DateTime)" || tp == "Nullable(Date)" || tp == "DateTime" || tp == "Date") { - Field &field = row[i + 1]; + Field & field = row[i + 1]; UInt64 packed = field.get(); UInt64 ymdhms = packed >> 24; UInt64 ymd = ymdhms >> 17; @@ -138,19 +138,24 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti int minute = int((hms >> 6) & ((1 << 6) - 1)); int hour = int(hms >> 12); - if (tp == "Nullable(DateTime)" || tp == "DataTime") { + if (tp == "Nullable(DateTime)" || tp == "DataTime") + { time_t datetime; if (unlikely(year == 0)) datetime = 0; else datetime = date_lut.makeDateTime(year, month, day, hour, minute, second); it->second.first->insert(static_cast(datetime)); - } else { + } + else + { auto date = date_lut.makeDayNum(year, month, day); Field date_field(static_cast(date)); it->second.first->insert(date_field); } - } else { + } + else + { it->second.first->insert(row[i + 1]); } } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index 24e05e62102..9d356ef8524 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -19,12 +19,12 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; +extern const int LOGICAL_ERROR; } using ScannerPtr = std::unique_ptr; -Block RegionBlockRead(const TiDB::TableInfo & table_info, - const ColumnsDescription & columns, const Names & ordered_columns_, ScannerPtr & curr_scanner, std::vector * keys= nullptr); +Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns_, + ScannerPtr & curr_scanner, std::vector * keys = nullptr); } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h new file mode 100644 index 00000000000..09ad1840fca --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -0,0 +1,646 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace DB +{ + +enum ColumnFamilyType +{ + Write, + Default, + Lock, +}; + +struct RegionWriteCFDataTrait +{ + using DecodedWriteCFValue = RecordKVFormat::DecodedWriteCFValue; + using Key = std::tuple; + using Value = std::tuple; + std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return {Key{handle_id, ts}, Value{key, value, RecordKVFormat::decodeWriteCfValue(value)}}; + } +}; + +struct RegionDefaultCFDataTrait +{ + using Key = std::tuple; + using Value = std::tuple; + std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return {Key{handle_id, ts}, Value{key, value}}; + } +}; + +struct RegionLockCFDataTrait +{ + using DecodedLockCFValue = RecordKVFormat::DecodedLockCFValue; + using Key = std::tuple; + using Value = std::tuple; + std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return {Key{handle_id, ts}, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}}; + } +}; + +template +struct RegionCFDataBase +{ + using Key = typename Trait::Key; + using Value = typename Trait::Value; + using Map = typename std::map; + + TableID insert(const TiKVKey & key, const TiKVValue & value) + { + const String & raw_key = RecordKVFormat::decodeTiKVKey(key); + return insert(key, value, raw_key); + } + + TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + auto & map = data[table_id]; + auto [it, ok] = map.insert(Trait::genKVPair(key, raw_key, value)); + std::ignore = it; + if (!ok) + throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + return table_id; + } + + static size_t calcTiKVKeyValueSize(const Value & value) + { + const auto & tikv_key = std::get<0>(value); + const auto & tikv_val = std::get<1>(value); + return tikv_key.dataSize() + tikv_val.dataSize(); + } + + size_t remove(TableID table_id, const Key & key) + { + auto & map = data[table_id]; + + if (auto it = map.find(key); it != map.end()) + { + const Value & value = it->second; + size_t size = calcTiKVKeyValueSize(value); + map.erase(it); + return size; + } + else + { + const auto & [handle_id, ts] = key; + auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + throw Exception(" key not found [" + tikv_key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return 0; + } + } + + RegionCFDataBase(){} + RegionCFDataBase(RegionCFDataBase && region): data(std::move(region.data)) {} + RegionCFDataBase & operator = (RegionCFDataBase && region) + { + data = std::move(region.data); + return *this; + } + + std::unordered_map data; +}; + +template +bool CFDataCmp(const Map & a, const Map & b) +{ + if (a.size() != b.size()) + return false; + for (const auto & [key, value] : a) + { + if (auto it = b.find(key); it != b.end()) + { + if (std::get<0>(value) != std::get<0>(it->second) || std::get<1>(value) != std::get<1>(it->second)) + return false; + } + else + return false; + } + return true; +} + +struct RegionWriteCFData +{ + using DecodedWriteCFValue = RecordKVFormat::DecodedWriteCFValue; + using Key = std::tuple; + using Value = std::tuple; + using WriteCFDataMap = std::map; + + TableID insert(TableID table_id, HandleID handle_id, Timestamp ts, const TiKVKey & key, const TiKVValue & value) + { + auto [it, ok] = map.try_emplace({table_id, handle_id, ts}, Value{key, value, RecordKVFormat::decodeWriteCfValue(value)}); + std::ignore = it; + if (!ok) + throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return table_id; + } + + TableID insert(const TiKVKey & key, const TiKVValue & value) + { + const String & raw_key = RecordKVFormat::decodeTiKVKey(key); + return insert(key, value, raw_key); + } + + TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return insert(table_id, handle_id, ts, key, value); + } + + size_t remove(TableID table_id, HandleID handle_id, Timestamp ts) + { + if (auto it = map.find({table_id, handle_id, ts}); it != map.end()) + { + const auto & [key, value, _] = it->second; + std::ignore = _; + size_t size = key.dataSize() + value.dataSize(); + map.erase(it); + return size; + } + else + { + auto key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + throw Exception(" key not found [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return 0; + } + } + + friend bool operator==(const RegionWriteCFData & cf1, const RegionWriteCFData & cf2) + { + return CFDataCmp(cf1.map, cf2.map); + } + + RegionWriteCFData(){} + RegionWriteCFData(RegionWriteCFData && data): map(std::move(data.map)) {} + RegionWriteCFData & operator = (RegionWriteCFData && data) + { + map = std::move(data.map); + return *this; + } + + WriteCFDataMap map; +}; + +struct RegionDefaultCFData +{ + using Key = std::tuple; + using Value = std::tuple; + using DefaultCFDataMap = std::map; + + TableID insert(TableID table_id, HandleID handle_id, Timestamp ts, const TiKVKey & key, const TiKVValue & value) + { + auto [it, ok] = map.try_emplace({table_id, handle_id, ts}, Value{key, value}); + std::ignore = it; + if (!ok) + throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return table_id; + } + + TableID insert(const TiKVKey & key, const TiKVValue & value) + { + const String & raw_key = RecordKVFormat::decodeTiKVKey(key); + return insert(key, value, raw_key); + } + + TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + return insert(table_id, handle_id, ts, key, value); + } + + size_t remove(TableID table_id, HandleID handle_id, Timestamp ts) + { + if (auto it = map.find({table_id, handle_id, ts}); it != map.end()) + { + const auto & [key, value] = it->second; + size_t size = key.dataSize() + value.dataSize(); + map.erase(it); + return size; + } + else + { + auto key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + throw Exception(" key not found [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return 0; + } + } + + friend bool operator==(const RegionDefaultCFData & cf1, const RegionDefaultCFData & cf2) + { + return CFDataCmp(cf1.map, cf2.map); + } + + RegionDefaultCFData(){} + RegionDefaultCFData(RegionDefaultCFData && data): map(std::move(data.map)) {} + RegionDefaultCFData & operator = (RegionDefaultCFData && data) + { + map = std::move(data.map); + return *this; + } + + DefaultCFDataMap map; +}; + +struct RegionLockCFData +{ + using DecodedLockCFValue = std::tuple>; + using Key = std::tuple; + using Value = std::tuple; + using LockCFDataMap = std::map; + + TableID insert(TableID table_id, HandleID handle_id, const TiKVKey & key, const TiKVValue & value) + { + auto [it, ok] = map.try_emplace({table_id, handle_id}, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}); + std::ignore = it; + + if (!ok) + throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + + return table_id; + } + + TableID insert(const TiKVKey & key, const TiKVValue & value) + { + const String & raw_key = RecordKVFormat::decodeTiKVKey(key); + return insert(key, value, raw_key); + } + + TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + return insert(table_id, handle_id, key, value); + } + + void remove(TableID table_id, HandleID handle_id) + { + if (auto it = map.find({table_id, handle_id}); it != map.end()) + { + map.erase(it); + } + else + { + auto key = RecordKVFormat::genKey(table_id, handle_id); + throw Exception(" key not found [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + } + } + + friend bool operator==(const RegionLockCFData & cf1, const RegionLockCFData & cf2) + { + return CFDataCmp(cf1.map, cf2.map); + } + + RegionLockCFData(){} + RegionLockCFData(RegionLockCFData && data): map(std::move(data.map)) {} + RegionLockCFData & operator = (RegionLockCFData && data) + { + map = std::move(data.map); + return *this; + } + + LockCFDataMap map; +}; + +class RegionData +{ +public: + // In both lock_cf and write_cf. + enum CFModifyFlag : UInt8 + { + PutFlag = 'P', + DelFlag = 'D', + // useless for TiFLASH + /* + LockFlag = 'L', + // In write_cf, only raft leader will use RollbackFlag in txn mode. Learner should ignore it. + RollbackFlag = 'R', + */ + }; + + /// A quick-and-dirty copy of LockInfo structure in kvproto. + /// Used to transmit to client using non-ProtoBuf protocol. + struct LockInfo + { + std::string primary_lock; + UInt64 lock_version; + std::string key; + UInt64 lock_ttl; + }; + + using ReadInfo = std::tuple; + using WriteCFIter = RegionWriteCFData::WriteCFDataMap::iterator; + using ConstWriteCFIter = RegionWriteCFData::WriteCFDataMap::const_iterator; + + using LockInfoPtr = std::unique_ptr; + + TableID insert(ColumnFamilyType cf, const TiKVKey & key, const String & raw_key, const TiKVValue & value) + { + switch(cf) + { + case Write: + { + cf_data_size += key.dataSize() + value.dataSize(); + return write_cf.insert(key, value, raw_key); + } + case Default: + { + cf_data_size += key.dataSize() + value.dataSize(); + return default_cf.insert(key, value, raw_key); + } + case Lock: + { + return lock_cf.insert(key, value, raw_key); + } + default: + throw Exception(" should not happen", ErrorCodes::LOGICAL_ERROR); + } + } + + TableID remove(ColumnFamilyType cf, const TiKVKey & key, const String & raw_key) + { + switch(cf) + { + case Write: + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + cf_data_size -= write_cf.remove(table_id, handle_id, ts); + return table_id; + } + case Default: + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + Timestamp ts = RecordKVFormat::getTs(key); + cf_data_size -= default_cf.remove(table_id, handle_id, ts); + return table_id; + } + case Lock: + { + TableID table_id = RecordKVFormat::getTableId(raw_key); + HandleID handle_id = RecordKVFormat::getHandle(raw_key); + lock_cf.remove(table_id, handle_id); + return table_id; + } + default: + throw Exception(" should not happen", ErrorCodes::LOGICAL_ERROR); + } + } + + WriteCFIter removeDataByWriteIt(const WriteCFIter & write_it) + { + const auto & [key, value, decoded_val] = write_it->second; + const auto & [table, handle, ts] = write_it->first; + std::ignore = ts; + const auto & [write_type, prewrite_ts, short_str] = decoded_val; + + auto data_it = default_cf.map.find({table, handle, prewrite_ts}); + + if (write_type == PutFlag && !short_str) + { + if (unlikely(data_it == default_cf.map.end())) + throw Exception(" key [" + key.toString() + "] not found in data cf when removing", ErrorCodes::LOGICAL_ERROR); + + const auto & [key, value] = data_it->second; + + cf_data_size -= key.dataSize() + value.dataSize(); + default_cf.map.erase(data_it); + } + + cf_data_size -= key.dataSize() + value.dataSize(); + + return write_cf.map.erase(write_it); + } + + ReadInfo readDataByWriteIt(const ConstWriteCFIter & write_it, std::vector * keys) + { + const auto & [key, value, decoded_val] = write_it->second; + const auto & [table, handle, ts] = write_it->first; + + std::ignore = value; + + if (keys) + keys->push_back(write_it->first); + + const auto & [write_type, prewrite_ts, short_value] = decoded_val; + + if (write_type != PutFlag) + return std::make_tuple(handle, write_type, ts, TiKVValue()); + + if (short_value) + return std::make_tuple(handle, write_type, ts, TiKVValue(*short_value)); + + auto data_it = default_cf.map.find({table, handle, prewrite_ts}); + if (unlikely(data_it == default_cf.map.end())) + throw Exception("key [" + key.toString() + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); + + return std::make_tuple(handle, write_type, ts, std::get<1>(data_it->second)); + } + + LockInfoPtr getLockInfo(TableID expected_table_id, Timestamp start_ts) + { + for (const auto & [key, value] : lock_cf.map) + { + const auto & [table_id, handle] = key; + if (expected_table_id != table_id) + continue; + std::ignore = handle; + + const auto & [tikv_key, tikv_val, decoded_val] = value; + const auto & [lock_type, primary, ts, ttl, data] = decoded_val; + std::ignore = tikv_val; + std::ignore = data; + + if (lock_type == DelFlag || ts > start_ts) + continue; + + return std::make_unique(LockInfo{primary, ts, RecordKVFormat::decodeTiKVKey(tikv_key), ttl}); + } + + return nullptr; + } + + void splitInto(const RegionRange & range, RegionData & new_region_data) + { + const auto & [start_key, end_key] = range; + + for (auto it = default_cf.map.begin(); it != default_cf.map.end();) + { + const auto & [key, val] = it->second; + + bool ok = start_key ? key >= start_key : true; + ok = ok && (end_key ? key < end_key : true); + if (ok) + { + cf_data_size -= key.dataSize() + val.dataSize(); + new_region_data.cf_data_size += key.dataSize() + val.dataSize(); + + new_region_data.default_cf.map.insert(std::move(*it)); + it = default_cf.map.erase(it); + } + else + ++it; + } + + for (auto it = write_cf.map.begin(); it != write_cf.map.end();) + { + const auto & [key, val, _] = it->second; + std::ignore = _; + + bool ok = start_key ? key >= start_key : true; + ok = ok && (end_key ? key < end_key : true); + if (ok) + { + cf_data_size -= key.dataSize() + val.dataSize(); + new_region_data.cf_data_size += key.dataSize() + val.dataSize(); + + new_region_data.write_cf.map.insert(std::move(*it)); + it = write_cf.map.erase(it); + } + else + ++it; + } + + for (auto it = lock_cf.map.begin(); it != lock_cf.map.end();) + { + const auto & [key, val, _] = it->second; + std::ignore = val; + std::ignore = _; + + bool ok = start_key ? key >= start_key : true; + ok = ok && (end_key ? key < end_key : true); + if (ok) + { + new_region_data.lock_cf.map.insert(std::move(*it)); + it = lock_cf.map.erase(it); + } + else + ++it; + } + } + + size_t dataSize() const { return cf_data_size; } + + void reset(RegionData && new_region_data) + { + default_cf = std::move(new_region_data.default_cf); + write_cf = std::move(new_region_data.write_cf); + lock_cf = std::move(new_region_data.lock_cf); + + cf_data_size = new_region_data.cf_data_size.load(); + } + + size_t serialize(WriteBuffer & buf) + { + size_t total_size = 0; + + total_size += writeBinary2(default_cf.map.size(), buf); + for (const auto & ele : default_cf.map) + { + const auto & [key, value] = ele.second; + total_size += key.serialize(buf); + total_size += value.serialize(buf); + } + + total_size += writeBinary2(write_cf.map.size(), buf); + for (const auto & ele : write_cf.map) + { + const auto & [key, value, _] = ele.second; + std::ignore = _; + + total_size += key.serialize(buf); + total_size += value.serialize(buf); + } + + total_size += writeBinary2(lock_cf.map.size(), buf); + for (const auto & ele : lock_cf.map) + { + const auto & [key, value, _] = ele.second; + std::ignore = _; + + total_size += key.serialize(buf); + total_size += value.serialize(buf); + } + + return total_size; + } + + static void deserialize(ReadBuffer & buf, RegionData & region_data) + { + auto size = readBinary2(buf); + for (size_t i = 0; i < size; ++i) + { + auto key = TiKVKey::deserialize(buf); + auto value = TiKVValue::deserialize(buf); + + region_data.default_cf.insert(key, value); + region_data.cf_data_size += key.dataSize() + value.dataSize(); + } + + size = readBinary2(buf); + for (size_t i = 0; i < size; ++i) + { + auto key = TiKVKey::deserialize(buf); + auto value = TiKVValue::deserialize(buf); + + region_data.write_cf.insert(key, value); + region_data.cf_data_size += key.dataSize() + value.dataSize(); + } + + size = readBinary2(buf); + for (size_t i = 0; i < size; ++i) + { + auto key = TiKVKey::deserialize(buf); + auto value = TiKVValue::deserialize(buf); + + region_data.lock_cf.insert(key, value); + } + } + + friend bool operator==(const RegionData & r1, const RegionData & r2) + { + return r1.default_cf == r2.default_cf && r1.write_cf == r2.write_cf + && r1.lock_cf == r2.lock_cf && r1.cf_data_size == r2.cf_data_size; + } + + RegionData() {} + + RegionData(RegionData && data):write_cf(std::move(data.write_cf)),default_cf(std::move(data.default_cf)),lock_cf(std::move(data.lock_cf)) {} + +public: + RegionWriteCFData write_cf; + RegionDefaultCFData default_cf; + RegionLockCFData lock_cf; + + // Size of data cf & write cf, without lock cf. + std::atomic cf_data_size = 0; +}; + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 635ca2e8ae0..9464a7d6fc1 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -170,7 +170,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac TMTContext & tmt = context.getTMTContext(); - std::vector keys_to_remove; + std::vector keys_to_remove; { auto merge_tree = std::dynamic_pointer_cast(storage); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 201fb39d473..275f50a5bf8 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -189,7 +189,7 @@ class RegionTable : private boost::noncopyable const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, - std::vector * keys); + std::vector * keys); static std::tuple getBlockInputStreamByRegion(TableID table_id, RegionPtr region, @@ -201,7 +201,7 @@ class RegionTable : private boost::noncopyable bool learner_read, bool resolve_locks, UInt64 start_ts, - std::vector * keys = nullptr); + std::vector * keys = nullptr); static TableIDSet getRegionTableIds(const RegionPtr & region); diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.h b/dbms/src/Storages/Transaction/TiKVKeyValue.h index 1100fe3ce07..97e61564912 100644 --- a/dbms/src/Storages/Transaction/TiKVKeyValue.h +++ b/dbms/src/Storages/Transaction/TiKVKeyValue.h @@ -1,20 +1,17 @@ #pragma once -#include -#include #include #include -#include #include #include #include +#include #include -#include #include -#include +#include namespace DB @@ -22,7 +19,7 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; +extern const int LOGICAL_ERROR; } template @@ -33,12 +30,12 @@ struct StringObject struct Hash { - std::size_t operator()(const T & x) const { return std::hash()(x.str); } + std::size_t operator()(const T & x) const { return std::hash()(x.str); } }; StringObject() = default; - explicit StringObject(std::string && str_) : str(std::move(str_)) {} - explicit StringObject(const std::string & str_) : str(str_) {} + explicit StringObject(String && str_) : str(std::move(str_)) {} + explicit StringObject(const String & str_) : str(str_) {} StringObject(StringObject && obj) : str(std::move(obj.str)) {} StringObject(const StringObject & obj) : str(obj.str) {} StringObject & operator=(const StringObject & a) @@ -52,15 +49,15 @@ struct StringObject return *this; } - const std::string & getStr() const { return str; } - std::string & getStrRef() { return str; } - const char * data() const { return str.data(); } - size_t dataSize() const { return str.size(); } + const String & getStr() const { return str; } + String & getStrRef() { return str; } + const char * data() const { return str.data(); } + size_t dataSize() const { return str.size(); } - std::string toString() const { return str; } + String toString() const { return str; } // For debug - std::string toHex() const + String toHex() const { std::stringstream ss; ss << str.size() << "[" << std::hex; @@ -70,34 +67,34 @@ struct StringObject return ss.str(); } - bool empty() const { return str.empty(); } + bool empty() const { return str.empty(); } explicit operator bool() const { return !str.empty(); } - bool operator==(const T & rhs) const { return str == rhs.str; } - bool operator!=(const T & rhs) const { return str != rhs.str; } - bool operator<(const T & rhs) const { return str < rhs.str; } - bool operator<=(const T & rhs) const { return str <= rhs.str; } - bool operator>(const T & rhs) const { return str > rhs.str; } - bool operator>=(const T & rhs) const { return str >= rhs.str; } + bool operator==(const T & rhs) const { return str == rhs.str; } + bool operator!=(const T & rhs) const { return str != rhs.str; } + bool operator<(const T & rhs) const { return str < rhs.str; } + bool operator<=(const T & rhs) const { return str <= rhs.str; } + bool operator>(const T & rhs) const { return str > rhs.str; } + bool operator>=(const T & rhs) const { return str >= rhs.str; } size_t serialize(WriteBuffer & buf) const { return writeBinary2(str, buf); } - static T deserialize(ReadBuffer & buf) { return T(readBinary2(buf)); } + static T deserialize(ReadBuffer & buf) { return T(readBinary2(buf)); } private: - std::string str; + String str; }; -using TiKVKey = StringObject; -using TiKVValue = StringObject; +using TiKVKey = StringObject; +using TiKVValue = StringObject; using TiKVKeyValue = std::pair; namespace RecordKVFormat { -static const char TABLE_PREFIX = 't'; -static const char * RECORD_PREFIX_SEP = "_r"; -static const char SHORT_VALUE_PREFIX = 'v'; +static const char TABLE_PREFIX = 't'; +static const char * RECORD_PREFIX_SEP = "_r"; +static const char SHORT_VALUE_PREFIX = 'v'; static const size_t SHORT_VALUE_MAX_LEN = 64; @@ -107,27 +104,19 @@ static const size_t RAW_KEY_NO_HANDLE_SIZE = 1 + 8 + 2; static const size_t RAW_KEY_SIZE = RAW_KEY_NO_HANDLE_SIZE + 8; -inline TiKVKeyValue genKV(const raft_cmdpb::PutRequest & req) -{ - return {TiKVKey(req.key()), TiKVValue(req.value())}; -} +inline TiKVKeyValue genKV(const raft_cmdpb::PutRequest & req) { return {TiKVKey(req.key()), TiKVValue(req.value())}; } -inline TiKVKey genKey(const raft_cmdpb::GetRequest & req) -{ - return TiKVKey {req.key()}; -} +inline TiKVKey genKey(const raft_cmdpb::GetRequest & req) { return TiKVKey{req.key()}; } -inline TiKVKey genKey(const raft_cmdpb::DeleteRequest & req) -{ - return TiKVKey {req.key()}; -} +inline TiKVKey genKey(const raft_cmdpb::DeleteRequest & req) { return TiKVKey{req.key()}; } inline std::vector DecodeRow(const TiKVValue & value) { std::vector vec; - const String & raw_value = value.toString(); + const String & raw_value = value.getStr(); size_t cursor = 0; - while(cursor < raw_value.size()) { + while (cursor < raw_value.size()) + { vec.push_back(DecodeDatum(cursor, raw_value)); } return vec; @@ -136,48 +125,26 @@ inline std::vector DecodeRow(const TiKVValue & value) // Key format is here: // https://docs.google.com/document/d/1J9Dsp8l5Sbvzjth77hK8yx3SzpEJ4SXaR_wIvswRhro/edit // https://github.com/tikv/tikv/blob/289ce2ddac505d7883ec616c078e184c00844d17/src/util/codec/bytes.rs#L33-L63 -// TODO support desc is true -inline void encodeAsTiKVKey(const String& ori_str, std::stringstream & ss) -{ - EncodeBytes(ori_str, ss); -} +inline void encodeAsTiKVKey(const String & ori_str, std::stringstream & ss) { EncodeBytes(ori_str, ss); } -inline TiKVKey encodeAsTiKVKey(const String& ori_str) +inline TiKVKey encodeAsTiKVKey(const String & ori_str) { std::stringstream ss; encodeAsTiKVKey(ori_str, ss); return TiKVKey(ss.str()); } -inline UInt64 encodeUInt64(const UInt64 x) -{ - return toBigEndian(x); -} +inline UInt64 encodeUInt64(const UInt64 x) { return toBigEndian(x); } -inline UInt64 encodeInt64(const Int64 x) -{ - return encodeUInt64(static_cast(x) ^ SIGN_MARK); -} +inline UInt64 encodeInt64(const Int64 x) { return encodeUInt64(static_cast(x) ^ SIGN_MARK); } -inline UInt64 encodeUInt64Desc(const UInt64 x) -{ - return encodeUInt64(~x); -} +inline UInt64 encodeUInt64Desc(const UInt64 x) { return encodeUInt64(~x); } -inline UInt64 decodeUInt64(const UInt64 x) -{ - return toBigEndian(x); -} +inline UInt64 decodeUInt64(const UInt64 x) { return toBigEndian(x); } -inline UInt64 decodeUInt64Desc(const UInt64 x) -{ - return ~decodeUInt64(x); -} +inline UInt64 decodeUInt64Desc(const UInt64 x) { return ~decodeUInt64(x); } -inline Int64 decodeInt64(const UInt64 x) -{ - return static_cast(decodeUInt64(x) ^ SIGN_MARK); -} +inline Int64 decodeInt64(const UInt64 x) { return static_cast(decodeUInt64(x) ^ SIGN_MARK); } inline TiKVValue EncodeRow(const TiDB::TableInfo & table_info, const std::vector & fields) { @@ -194,7 +161,8 @@ inline TiKVValue EncodeRow(const TiDB::TableInfo & table_info, const std::vector } template -inline T read(const char* s){ +inline T read(const char * s) +{ return *(reinterpret_cast(s)); } @@ -210,22 +178,19 @@ inline String genRawKey(const TableID tableId, const HandleID handleId) return key.str(); } -inline TiKVKey genKey(const TableID tableId, const HandleID handleId) -{ - return encodeAsTiKVKey(genRawKey(tableId, handleId)); -} +inline TiKVKey genKey(const TableID tableId, const HandleID handleId) { return encodeAsTiKVKey(genRawKey(tableId, handleId)); } -inline std::tuple decodeTiKVKey(const TiKVKey & key) +inline std::tuple decodeTiKVKeyFull(const TiKVKey & key) { std::stringstream res; - const char* ptr = key.data(); + const char * ptr = key.data(); const size_t chunk_len = ENC_GROUP_SIZE + 1; - for (const char* next_ptr = ptr; ; next_ptr += chunk_len) + for (const char * next_ptr = ptr;; next_ptr += chunk_len) { ptr = next_ptr; if (ptr + chunk_len > key.dataSize() + key.data()) throw Exception("Unexpected eof", ErrorCodes::LOGICAL_ERROR); - auto marker = (UInt8)*(ptr + ENC_GROUP_SIZE); + auto marker = (UInt8) * (ptr + ENC_GROUP_SIZE); size_t pad_size = (ENC_MARKER - marker); if (pad_size == 0) { @@ -235,7 +200,7 @@ inline std::tuple decodeTiKVKey(const TiKVKey & key) if (pad_size > ENC_GROUP_SIZE) throw Exception("Key padding", ErrorCodes::LOGICAL_ERROR); res.write(ptr, ENC_GROUP_SIZE - pad_size); - for (const char *p = ptr + ENC_GROUP_SIZE - pad_size; p < ptr + ENC_GROUP_SIZE; ++p) + for (const char * p = ptr + ENC_GROUP_SIZE - pad_size; p < ptr + ENC_GROUP_SIZE; ++p) { if (*p != 0) throw Exception("Key padding", ErrorCodes::LOGICAL_ERROR); @@ -245,42 +210,24 @@ inline std::tuple decodeTiKVKey(const TiKVKey & key) } } -inline Timestamp getTs(const TiKVKey & key) -{ - return decodeUInt64Desc(read(key.data() + key.dataSize() - 8)); -} +inline String decodeTiKVKey(const TiKVKey & key) { return std::get<0>(decodeTiKVKeyFull(key)); } -inline TableID getTableId(const String & key) -{ - return decodeInt64(read(key.data() + 1)); -} +inline Timestamp getTs(const TiKVKey & key) { return decodeUInt64Desc(read(key.data() + key.dataSize() - 8)); } -inline HandleID getHandle(const String & key) -{ - return decodeInt64(read(key.data() + RAW_KEY_NO_HANDLE_SIZE)); -} +inline TableID getTableId(const String & key) { return decodeInt64(read(key.data() + 1)); } -inline TableID getTableId(const TiKVKey & key) -{ - return getTableId(std::get<0>(decodeTiKVKey(key))); -} +inline HandleID getHandle(const String & key) { return decodeInt64(read(key.data() + RAW_KEY_NO_HANDLE_SIZE)); } -inline HandleID getHandle(const TiKVKey & key) -{ - return getHandle(std::get<0>(decodeTiKVKey(key))); -} +inline TableID getTableId(const TiKVKey & key) { return getTableId(decodeTiKVKey(key)); } + +inline HandleID getHandle(const TiKVKey & key) { return getHandle(decodeTiKVKey(key)); } inline bool isRecord(const String & raw_key) { - return raw_key.size() >= RAW_KEY_SIZE - && raw_key[0] == TABLE_PREFIX - && memcmp(raw_key.data() + 9, RECORD_PREFIX_SEP, 2) == 0; + return raw_key.size() >= RAW_KEY_SIZE && raw_key[0] == TABLE_PREFIX && memcmp(raw_key.data() + 9, RECORD_PREFIX_SEP, 2) == 0; } -inline TiKVKey truncateTs(const TiKVKey & key) -{ - return TiKVKey(std::string(key.data(), key.dataSize() - sizeof(Timestamp))); -} +inline TiKVKey truncateTs(const TiKVKey & key) { return TiKVKey(String(key.data(), key.dataSize() - sizeof(Timestamp))); } inline TiKVKey appendTs(const TiKVKey & key, Timestamp ts) { @@ -293,7 +240,7 @@ inline void changeTs(TiKVKey & key, Timestamp ts) { auto big_endian_ts = encodeUInt64Desc(ts); auto str = key.getStrRef(); - *(reinterpret_cast(str.data() + str.size() - sizeof(Timestamp))) = big_endian_ts; + *(reinterpret_cast(str.data() + str.size() - sizeof(Timestamp))) = big_endian_ts; } inline TiKVKey genKey(TableID tableId, HandleID handleId, Timestamp ts) @@ -302,8 +249,7 @@ inline TiKVKey genKey(TableID tableId, HandleID handleId, Timestamp ts) return appendTs(key, ts); } -inline TiKVValue internalEncodeLockCfValue(UInt8 lock_type, const String& primary, UInt64 ts, UInt64 ttl, - const String* short_value) +inline TiKVValue internalEncodeLockCfValue(UInt8 lock_type, const String & primary, Timestamp ts, UInt64 ttl, const String * short_value) { std::stringstream res; res.put(lock_type); @@ -321,44 +267,45 @@ inline TiKVValue internalEncodeLockCfValue(UInt8 lock_type, const String& primar } -inline TiKVValue encodeLockCfValue(UInt8 lock_type, const String& primary, UInt64 ts, UInt64 ttl, const String& short_value) +inline TiKVValue encodeLockCfValue(UInt8 lock_type, const String & primary, Timestamp ts, UInt64 ttl, const String & short_value) { return internalEncodeLockCfValue(lock_type, primary, ts, ttl, &short_value); } -inline TiKVValue encodeLockCfValue(UInt8 lock_type, const String& primary, UInt64 ts, UInt64 ttl) +inline TiKVValue encodeLockCfValue(UInt8 lock_type, const String & primary, Timestamp ts, UInt64 ttl) { return internalEncodeLockCfValue(lock_type, primary, ts, ttl, nullptr); } +using DecodedLockCFValue = std::tuple>; -inline std::tuple> decodeLockCfValue(const TiKVValue& value) +inline std::tuple> decodeLockCfValue(const TiKVValue & value) { UInt8 lock_type; - std::string primary; + String primary; UInt64 ts; UInt64 ttl = 0; const char * data = value.data(); size_t len = value.dataSize(); lock_type = static_cast(*data); - data += 1, len -= 1; //lock type + data += 1, len -= 1; //lock type Int64 primary_len = 0; - auto cur = TiKV::readVarInt(primary_len, data, len); // primary + auto cur = TiKV::readVarInt(primary_len, data, len); // primary len -= cur - data, data = cur; primary.append(data, static_cast(primary_len)); len -= primary_len, data += primary_len; - cur = TiKV::readVarUInt(ts, data, len); // ts + cur = TiKV::readVarUInt(ts, data, len); // ts len -= cur - data, data = cur; if (len == 0) return std::make_tuple(lock_type, primary, ts, ttl, nullptr); - cur = TiKV::readVarUInt(ttl, data, len); // ttl + cur = TiKV::readVarUInt(ttl, data, len); // ttl len -= cur - data, data = cur; if (len == 0) return std::make_tuple(lock_type, primary, ts, ttl, nullptr); char flag = *data; - data += 1, len -= 1; // SHORT_VALUE_PREFIX + data += 1, len -= 1; // SHORT_VALUE_PREFIX assert(flag == SHORT_VALUE_PREFIX); (void)flag; auto slen = (size_t)*data; @@ -368,18 +315,19 @@ inline std::tuple> d return std::make_tuple(lock_type, primary, ts, ttl, std::make_unique(data, len)); } +using DecodedWriteCFValue = std::tuple>; -inline std::tuple> decodeWriteCfValue(const TiKVValue& value) +inline DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value) { const char * data = value.data(); size_t len = value.dataSize(); auto write_type = static_cast(*data); - data += 1, len -= 1; //write type + data += 1, len -= 1; //write type - UInt64 ts; + Timestamp ts; const char * res = TiKV::readVarUInt(ts, data, len); - len -= res - data, data = res; // ts + len -= res - data, data = res; // ts if (len == 0) return std::make_tuple(write_type, ts, nullptr); @@ -393,7 +341,7 @@ inline std::tuple> decodeWriteCfValue(con } -inline TiKVValue internalEncodeWriteCfValue(UInt8 write_type, UInt64 ts, const String* short_value) +inline TiKVValue internalEncodeWriteCfValue(UInt8 write_type, Timestamp ts, const String * short_value) { std::stringstream res; res.put(write_type); @@ -408,23 +356,13 @@ inline TiKVValue internalEncodeWriteCfValue(UInt8 write_type, UInt64 ts, const S } -inline TiKVValue encodeWriteCfValue(UInt8 write_type, UInt64 ts, const String& short_value) +inline TiKVValue encodeWriteCfValue(UInt8 write_type, Timestamp ts, const String & short_value) { return internalEncodeWriteCfValue(write_type, ts, &short_value); } -inline TiKVValue encodeWriteCfValue(UInt8 write_type, UInt64 ts) -{ - return internalEncodeWriteCfValue(write_type, ts, nullptr); -} - - -inline UInt64 getTsFromWriteCf(const TiKVValue & value) -{ - return std::get<1>(decodeWriteCfValue(value)); -} - +inline TiKVValue encodeWriteCfValue(UInt8 write_type, Timestamp ts) { return internalEncodeWriteCfValue(write_type, ts, nullptr); } } // namespace RecordKVFormat @@ -446,11 +384,10 @@ inline HandleID getRangeHandle(const TiKVKey & tikv_key, const TableID table_id) } String key; - if constexpr (decoded) { + if constexpr (decoded) key = tikv_key.getStr(); - } else { - key = std::get<0>(RecordKVFormat::decodeTiKVKey(tikv_key)); - } + else + key = RecordKVFormat::decodeTiKVKey(tikv_key); if (key <= RecordKVFormat::genRawKey(table_id, min)) return min; @@ -474,7 +411,7 @@ inline HandleID getRangeHandle(const TiKVKey & tikv_key, const TableID table_id) return RecordKVFormat::getHandle(key); } -inline bool checkTableInvolveRange(const TableID table_id, const std::pair& range) +inline bool checkTableInvolveRange(const TableID table_id, const std::pair & range) { const TiKVKey start_key = RecordKVFormat::genKey(table_id, std::numeric_limits::min()); const TiKVKey end_key = RecordKVFormat::genKey(table_id, std::numeric_limits::max()); @@ -483,6 +420,6 @@ inline bool checkTableInvolveRange(const TableID table_id, const std::pairmutable_cf()) = "default"; auto * kvs = r.mutable_data()->mutable_data(); - *(kvs->Add()) = kv(table_id, 1, 1, "v1"); - *(kvs->Add()) = kv(table_id, 2, 2, "v3"); - *(kvs->Add()) = kv(table_id, 3, 3, "v3"); + *(kvs->Add()) = default_kv(table_id, 1, 1, "v1"); + *(kvs->Add()) = default_kv(table_id, 2, 2, "v3"); + *(kvs->Add()) = default_kv(table_id, 3, 3, "v3"); reqs.push_back(r); } @@ -65,9 +81,9 @@ int main(int, char **) enginepb::SnapshotRequest r; *(r.mutable_data()->mutable_cf()) = "lock"; auto * kvs = r.mutable_data()->mutable_data(); - *(kvs->Add()) = kv(table_id, 1, 1, "v1"); - *(kvs->Add()) = kv(table_id, 2, 2, "v3"); - *(kvs->Add()) = kv(table_id, 3, 3, "v3"); + *(kvs->Add()) = lock_kv(table_id, 111); + *(kvs->Add()) = lock_kv(table_id, 222); + *(kvs->Add()) = lock_kv(table_id, 333); reqs.push_back(r); } @@ -76,9 +92,9 @@ int main(int, char **) enginepb::SnapshotRequest r; *(r.mutable_data()->mutable_cf()) = "write"; auto * kvs = r.mutable_data()->mutable_data(); - *(kvs->Add()) = kv(table_id, 1, 1, "v1"); - *(kvs->Add()) = kv(table_id, 2, 2, "v3"); - *(kvs->Add()) = kv(table_id, 3, 3, "v3"); + *(kvs->Add()) = write_kv(table_id, 1, 1, 1); + *(kvs->Add()) = write_kv(table_id, 2, 2, 2); + *(kvs->Add()) = write_kv(table_id, 3, 3, 3); reqs.push_back(r); } diff --git a/dbms/src/Storages/Transaction/tests/region_persister.cpp b/dbms/src/Storages/Transaction/tests/region_persister.cpp index d8784aba1d3..5a6d700d67e 100644 --- a/dbms/src/Storages/Transaction/tests/region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/region_persister.cpp @@ -61,8 +61,8 @@ int main(int, char **) auto region = std::make_shared(createRegionMeta(100)); TiKVKey key = RecordKVFormat::genKey(100, 323, 9983); region->insert("default", key, TiKVValue("value1")); - region->insert("write", key, TiKVValue("value1")); - region->insert("lock", key, TiKVValue("value1")); + region->insert("write", key, RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", key, RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); auto path = dir_path + "region.test"; WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); @@ -101,8 +101,8 @@ int main(int, char **) auto region = std::make_shared(createRegionMeta(i)); TiKVKey key = RecordKVFormat::genKey(100, i, diff++); region->insert("default", key, TiKVValue("value1")); - region->insert("write", key, TiKVValue("value1")); - region->insert("lock", key, TiKVValue("value1")); + region->insert("write", key, RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", key, RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); persister.persist(region); @@ -150,8 +150,8 @@ int main(int, char **) auto region = std::make_shared(createRegionMeta(i)); TiKVKey key = RecordKVFormat::genKey(100, i, diff++); region->insert("default", key, TiKVValue("value1")); - region->insert("write", key, TiKVValue("value1")); - region->insert("lock", key, TiKVValue("value1")); + region->insert("write", key, RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", key, RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); persister.persist(region); From 8595a25ccfec9e6acdd88ef25b1e7c2c8c45c4c7 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 4 Apr 2019 14:57:53 +0800 Subject: [PATCH 02/11] use template to make code better. --- .../Storages/Transaction/PartitionStreams.cpp | 2 +- dbms/src/Storages/Transaction/Region.cpp | 14 +- dbms/src/Storages/Transaction/Region.h | 66 ++- dbms/src/Storages/Transaction/RegionData.h | 548 +++++++----------- dbms/src/Storages/Transaction/RegionTable.cpp | 2 +- 5 files changed, 263 insertions(+), 369 deletions(-) diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 8992b6ef7f3..942acafb4b8 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -66,7 +66,7 @@ std::tuple RegionTab { Region::LockInfoPtr lock_info = nullptr; if (resolve_locks) - lock_info = scanner->getLockInfo(table_id, start_ts); + lock_info = scanner->getLockInfo(start_ts); if (lock_info) { Region::LockInfos lock_infos; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index bf47bd5fb03..1043398ffff 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -19,11 +19,15 @@ const String Region::lock_cf_name = "lock"; const String Region::default_cf_name = "default"; const String Region::write_cf_name = "write"; -RegionData::WriteCFIter Region::removeDataByWriteIt(const RegionData::WriteCFIter & write_it) { return data.removeDataByWriteIt(write_it); } +RegionData::WriteCFIter Region::removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it) +{ + return data.removeDataByWriteIt(table_id, write_it); +} -RegionData::ReadInfo Region::readDataByWriteIt(const RegionData::ConstWriteCFIter & write_it, std::vector * keys) +RegionData::ReadInfo Region::readDataByWriteIt( + const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys) { - return data.readDataByWriteIt(write_it, keys); + return data.readDataByWriteIt(table_id, write_it, keys); } Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) @@ -379,9 +383,9 @@ std::unique_ptr Region::createCommittedScanner(TableID return std::make_unique(this->shared_from_this(), expected_table_id); } -std::unique_ptr Region::createCommittedRemover() +std::unique_ptr Region::createCommittedRemover(TableID expected_table_id) { - return std::make_unique(this->shared_from_this()); + return std::make_unique(this->shared_from_this(), expected_table_id); } std::string Region::toString(bool dump_status) const { return meta.toString(dump_status); } diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index efefb06e791..1fcf6d17005 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -46,55 +46,71 @@ class Region : public std::enable_shared_from_this { public: CommittedScanner(const RegionPtr & store_, TableID expected_table_id_) - : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_), write_map_it(store->data.write_cf.map.cbegin()) - {} - - /// Check if next kv exists. - /// Return InvalidTableID if not. - TableID hasNext() + : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_) { - if (expected_table_id != InvalidTableID) + const auto & data = store->data.write_cf.getData(); + if (auto it = data.find(expected_table_id); it != data.end()) { - for (; write_map_it != store->data.write_cf.map.cend(); ++write_map_it) - { - if (likely(std::get<0>(write_map_it->first) == expected_table_id)) - return expected_table_id; - } + found = true; + write_map_it = it->second.begin(); + write_map_it_end = it->second.end(); } else - { - if (write_map_it != store->data.write_cf.map.cend()) - return std::get<0>(write_map_it->first); - } - return InvalidTableID; + found = false; } - auto next(std::vector * keys = nullptr) { return store->readDataByWriteIt(write_map_it++, keys); } + bool hasNext() const { return found && write_map_it != write_map_it_end; } - LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } + auto next(RegionWriteCFDataTrait::Keys * keys = nullptr) + { + if (!found) + throw Exception(String() + "table: " + DB::toString(expected_table_id) + " is not found", ErrorCodes::LOGICAL_ERROR); + return store->readDataByWriteIt(expected_table_id, write_map_it++, keys); + } + + LockInfoPtr getLockInfo(UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } private: RegionPtr store; std::shared_lock lock; + bool found; TableID expected_table_id; RegionData::ConstWriteCFIter write_map_it; + RegionData::ConstWriteCFIter write_map_it_end; }; class CommittedRemover : private boost::noncopyable { public: - CommittedRemover(const RegionPtr & store_) : store(store_), lock(store_->mutex) {} + CommittedRemover(const RegionPtr & store_, TableID expected_table_id_) + : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_) + { + auto & data = store->data.write_cf.getDataMut(); + if (auto it = data.find(expected_table_id); it != data.end()) + { + found = true; + data_map = &(it->second); + } + else + found = false; + } void remove(const RegionWriteCFData::Key & key) { - if (auto it = store->data.write_cf.map.find(key); it != store->data.write_cf.map.end()) - store->removeDataByWriteIt(it); + if (!found) + return; + if (auto it = data_map->find(key); it != data_map->end()) + store->removeDataByWriteIt(expected_table_id, it); } private: RegionPtr store; std::unique_lock lock; + + bool found; + TableID expected_table_id; + RegionWriteCFData::Map * data_map; }; public: @@ -121,7 +137,7 @@ class Region : public std::enable_shared_from_this std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd); std::unique_ptr createCommittedScanner(TableID expected_table_id); - std::unique_ptr createCommittedRemover(); + std::unique_ptr createCommittedRemover(TableID expected_table_id); size_t serialize(WriteBuffer & buf, enginepb::CommandResponse * response = nullptr); static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create = nullptr); @@ -176,8 +192,8 @@ class Region : public std::enable_shared_from_this ColumnFamilyType getCf(const String & cf); RegionData::ReadInfo readDataByWriteIt( - const RegionData::ConstWriteCFIter & write_it, std::vector * keys = nullptr); - RegionData::WriteCFIter removeDataByWriteIt(const RegionData::WriteCFIter & write_it); + const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys = nullptr); + RegionData::WriteCFIter removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it); LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts); diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h index 09ad1840fca..58dcf29c265 100644 --- a/dbms/src/Storages/Transaction/RegionData.h +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -23,36 +23,59 @@ struct RegionWriteCFDataTrait using DecodedWriteCFValue = RecordKVFormat::DecodedWriteCFValue; using Key = std::tuple; using Value = std::tuple; - std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + using Map = std::map; + using Keys = std::vector; + + static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) { HandleID handle_id = RecordKVFormat::getHandle(raw_key); Timestamp ts = RecordKVFormat::getTs(key); return {Key{handle_id, ts}, Value{key, value, RecordKVFormat::decodeWriteCfValue(value)}}; } + static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) + { + const auto & [handle_id, ts] = key; + auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + return tikv_key; + } }; struct RegionDefaultCFDataTrait { using Key = std::tuple; using Value = std::tuple; - std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + using Map = std::map; + + static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) { HandleID handle_id = RecordKVFormat::getHandle(raw_key); Timestamp ts = RecordKVFormat::getTs(key); return {Key{handle_id, ts}, Value{key, value}}; } + static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) + { + const auto & [handle_id, ts] = key; + auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + return tikv_key; + } }; struct RegionLockCFDataTrait { using DecodedLockCFValue = RecordKVFormat::DecodedLockCFValue; - using Key = std::tuple; + using Key = HandleID; using Value = std::tuple; - std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) + using Map = std::map; + + static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) { HandleID handle_id = RecordKVFormat::getHandle(raw_key); - Timestamp ts = RecordKVFormat::getTs(key); - return {Key{handle_id, ts}, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}}; + return {handle_id, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}}; + } + static TiKVKey genTiKVKey(const TableID & table_id, const Key & key) + { + auto tikv_key = RecordKVFormat::genKey(table_id, key); + return tikv_key; } }; @@ -61,7 +84,17 @@ struct RegionCFDataBase { using Key = typename Trait::Key; using Value = typename Trait::Value; - using Map = typename std::map; + using Map = typename Trait::Map; + + static const TiKVKey & getTiKVKey(const Value & val) + { + return std::get<0>(val); + } + + static const TiKVValue & getTiKVValue(const Value & val) + { + return std::get<1>(val); + } TableID insert(const TiKVKey & key, const TiKVValue & value) { @@ -82,9 +115,15 @@ struct RegionCFDataBase static size_t calcTiKVKeyValueSize(const Value & value) { - const auto & tikv_key = std::get<0>(value); - const auto & tikv_val = std::get<1>(value); - return tikv_key.dataSize() + tikv_val.dataSize(); + return calcTiKVKeyValueSize(getTiKVKey(value), getTiKVValue(value)); + } + + static size_t calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value) + { + if constexpr (std::is_same::value) + return 0; + else + return key.dataSize() + value.dataSize(); } size_t remove(TableID table_id, const Key & key) @@ -100,233 +139,152 @@ struct RegionCFDataBase } else { - const auto & [handle_id, ts] = key; - auto tikv_key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); + auto tikv_key = Trait::genTiKVKey(table_id, key); throw Exception(" key not found [" + tikv_key.toString() + "]", ErrorCodes::LOGICAL_ERROR); return 0; } } - RegionCFDataBase(){} - RegionCFDataBase(RegionCFDataBase && region): data(std::move(region.data)) {} - RegionCFDataBase & operator = (RegionCFDataBase && region) - { - data = std::move(region.data); - return *this; - } - - std::unordered_map data; -}; - -template -bool CFDataCmp(const Map & a, const Map & b) -{ - if (a.size() != b.size()) - return false; - for (const auto & [key, value] : a) + static bool cmp(const Map & a, const Map & b) { - if (auto it = b.find(key); it != b.end()) + if (a.size() != b.size()) + return false; + for (const auto & [key, value] : a) { - if (std::get<0>(value) != std::get<0>(it->second) || std::get<1>(value) != std::get<1>(it->second)) + if (auto it = b.find(key); it != b.end()) + { + if (getTiKVKey(value) != getTiKVKey(it->second) || getTiKVValue(value) != getTiKVValue(it->second)) + return false; + } + else return false; } - else - return false; - } - return true; -} - -struct RegionWriteCFData -{ - using DecodedWriteCFValue = RecordKVFormat::DecodedWriteCFValue; - using Key = std::tuple; - using Value = std::tuple; - using WriteCFDataMap = std::map; - - TableID insert(TableID table_id, HandleID handle_id, Timestamp ts, const TiKVKey & key, const TiKVValue & value) - { - auto [it, ok] = map.try_emplace({table_id, handle_id, ts}, Value{key, value, RecordKVFormat::decodeWriteCfValue(value)}); - std::ignore = it; - if (!ok) - throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - - return table_id; - } - - TableID insert(const TiKVKey & key, const TiKVValue & value) - { - const String & raw_key = RecordKVFormat::decodeTiKVKey(key); - return insert(key, value, raw_key); - } - - TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) - { - TableID table_id = RecordKVFormat::getTableId(raw_key); - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - Timestamp ts = RecordKVFormat::getTs(key); - return insert(table_id, handle_id, ts, key, value); + return true; } - size_t remove(TableID table_id, HandleID handle_id, Timestamp ts) + bool operator == (const RegionCFDataBase & cf) const { - if (auto it = map.find({table_id, handle_id, ts}); it != map.end()) - { - const auto & [key, value, _] = it->second; - std::ignore = _; - size_t size = key.dataSize() + value.dataSize(); - map.erase(it); - return size; - } - else + const auto & cf_data = cf.data; + if (data.size() != cf_data.size()) + return false; + for (const auto & [table_id, map] : data) { - auto key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); - throw Exception(" key not found [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - - return 0; + if (auto it = cf_data.find(table_id); it != cf_data.end()) + { + if (!cmp(map, it->second)) + return false; + } + else + return false; } + return true; } - friend bool operator==(const RegionWriteCFData & cf1, const RegionWriteCFData & cf2) - { - return CFDataCmp(cf1.map, cf2.map); - } - - RegionWriteCFData(){} - RegionWriteCFData(RegionWriteCFData && data): map(std::move(data.map)) {} - RegionWriteCFData & operator = (RegionWriteCFData && data) + RegionCFDataBase(){} + RegionCFDataBase(RegionCFDataBase && region): data(std::move(region.data)) {} + RegionCFDataBase & operator = (RegionCFDataBase && region) { - map = std::move(data.map); + data = std::move(region.data); return *this; } - WriteCFDataMap map; -}; - -struct RegionDefaultCFData -{ - using Key = std::tuple; - using Value = std::tuple; - using DefaultCFDataMap = std::map; - - TableID insert(TableID table_id, HandleID handle_id, Timestamp ts, const TiKVKey & key, const TiKVValue & value) - { - auto [it, ok] = map.try_emplace({table_id, handle_id, ts}, Value{key, value}); - std::ignore = it; - if (!ok) - throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - - return table_id; - } - - TableID insert(const TiKVKey & key, const TiKVValue & value) + size_t splitInto(const RegionRange & range, RegionCFDataBase & new_region_data) { - const String & raw_key = RecordKVFormat::decodeTiKVKey(key); - return insert(key, value, raw_key); - } - - TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) - { - TableID table_id = RecordKVFormat::getTableId(raw_key); - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - Timestamp ts = RecordKVFormat::getTs(key); - return insert(table_id, handle_id, ts, key, value); - } + const auto & [start_key, end_key] = range; + size_t size_changed = 0; - size_t remove(TableID table_id, HandleID handle_id, Timestamp ts) - { - if (auto it = map.find({table_id, handle_id, ts}); it != map.end()) + for (auto data_it = data.begin(); data_it != data.end(); ) { - const auto & [key, value] = it->second; - size_t size = key.dataSize() + value.dataSize(); - map.erase(it); - return size; - } - else - { - auto key = RecordKVFormat::appendTs(RecordKVFormat::genKey(table_id, handle_id), ts); - throw Exception(" key not found [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + const auto & table_id = data_it->first; + auto & ori_map = data_it->second; + if (ori_map.empty()) + { + data_it = data.erase(data_it); + continue; + } - return 0; - } - } + auto & tar_map = new_region_data.data[table_id]; - friend bool operator==(const RegionDefaultCFData & cf1, const RegionDefaultCFData & cf2) - { - return CFDataCmp(cf1.map, cf2.map); - } + for (auto it = ori_map.begin(); it != ori_map.end();) + { + const auto & key = getTiKVKey(it->second); + + bool ok = start_key ? key >= start_key : true; + ok = ok && (end_key ? key < end_key : true); + if (ok) + { + size_changed += calcTiKVKeyValueSize(it->second); + tar_map.insert(std::move(*it)); + it = ori_map.erase(it); + } + else + ++it; + } - RegionDefaultCFData(){} - RegionDefaultCFData(RegionDefaultCFData && data): map(std::move(data.map)) {} - RegionDefaultCFData & operator = (RegionDefaultCFData && data) - { - map = std::move(data.map); - return *this; + ++data_it; + } + return size_changed; } - DefaultCFDataMap map; -}; - -struct RegionLockCFData -{ - using DecodedLockCFValue = std::tuple>; - using Key = std::tuple; - using Value = std::tuple; - using LockCFDataMap = std::map; - - TableID insert(TableID table_id, HandleID handle_id, const TiKVKey & key, const TiKVValue & value) + size_t serialize(WriteBuffer & buf) const { - auto [it, ok] = map.try_emplace({table_id, handle_id}, Value{key, value, RecordKVFormat::decodeLockCfValue(value)}); - std::ignore = it; + size_t total_size = 0; - if (!ok) - throw Exception(" found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + size_t size = 0; + for (auto data_it = data.begin(); data_it != data.end(); ++data_it) + size += data_it->second.size(); - return table_id; - } + total_size += writeBinary2(size, buf); - TableID insert(const TiKVKey & key, const TiKVValue & value) - { - const String & raw_key = RecordKVFormat::decodeTiKVKey(key); - return insert(key, value, raw_key); - } + for (const auto & [table_id, map] : data) + { + std::ignore = table_id; + for (const auto & ele : map) + { + const auto & key = getTiKVKey(ele.second); + const auto & value = getTiKVValue(ele.second); + total_size += key.serialize(buf); + total_size += value.serialize(buf); + } + } - TableID insert(const TiKVKey & key, const TiKVValue & value, const String & raw_key) - { - TableID table_id = RecordKVFormat::getTableId(raw_key); - HandleID handle_id = RecordKVFormat::getHandle(raw_key); - return insert(table_id, handle_id, key, value); + return total_size; } - void remove(TableID table_id, HandleID handle_id) + static size_t deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data) { - if (auto it = map.find({table_id, handle_id}); it != map.end()) - { - map.erase(it); - } - else + size_t size = readBinary2(buf); + size_t cf_data_size = 0; + for (size_t i = 0; i < size; ++i) { - auto key = RecordKVFormat::genKey(table_id, handle_id); - throw Exception(" key not found [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); + auto key = TiKVKey::deserialize(buf); + auto value = TiKVValue::deserialize(buf); + + new_region_data.insert(key, value); + cf_data_size += calcTiKVKeyValueSize(key, value); } + return cf_data_size; } - friend bool operator==(const RegionLockCFData & cf1, const RegionLockCFData & cf2) + const auto & getData() const { - return CFDataCmp(cf1.map, cf2.map); + return data; } - RegionLockCFData(){} - RegionLockCFData(RegionLockCFData && data): map(std::move(data.map)) {} - RegionLockCFData & operator = (RegionLockCFData && data) + auto & getDataMut() { - map = std::move(data.map); - return *this; + return data; } - LockCFDataMap map; +private: + std::unordered_map data; }; +using RegionWriteCFData = RegionCFDataBase; +using RegionDefaultCFData = RegionCFDataBase; +using RegionLockCFData = RegionCFDataBase; + class RegionData { public: @@ -354,8 +312,8 @@ class RegionData }; using ReadInfo = std::tuple; - using WriteCFIter = RegionWriteCFData::WriteCFDataMap::iterator; - using ConstWriteCFIter = RegionWriteCFData::WriteCFDataMap::const_iterator; + using WriteCFIter = RegionWriteCFData::Map::iterator; + using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator; using LockInfoPtr = std::unique_ptr; @@ -365,13 +323,15 @@ class RegionData { case Write: { + auto table_id = write_cf.insert(key, value, raw_key); cf_data_size += key.dataSize() + value.dataSize(); - return write_cf.insert(key, value, raw_key); + return table_id; } case Default: { + auto table_id = default_cf.insert(key, value, raw_key); cf_data_size += key.dataSize() + value.dataSize(); - return default_cf.insert(key, value, raw_key); + return table_id; } case Lock: { @@ -391,7 +351,7 @@ class RegionData TableID table_id = RecordKVFormat::getTableId(raw_key); HandleID handle_id = RecordKVFormat::getHandle(raw_key); Timestamp ts = RecordKVFormat::getTs(key); - cf_data_size -= write_cf.remove(table_id, handle_id, ts); + cf_data_size -= write_cf.remove(table_id, RegionWriteCFData::Key{handle_id, ts}); return table_id; } case Default: @@ -399,7 +359,7 @@ class RegionData TableID table_id = RecordKVFormat::getTableId(raw_key); HandleID handle_id = RecordKVFormat::getHandle(raw_key); Timestamp ts = RecordKVFormat::getTs(key); - cf_data_size -= default_cf.remove(table_id, handle_id, ts); + cf_data_size -= default_cf.remove(table_id, RegionDefaultCFData::Key{handle_id, ts}); return table_id; } case Lock: @@ -414,35 +374,37 @@ class RegionData } } - WriteCFIter removeDataByWriteIt(const WriteCFIter & write_it) + WriteCFIter removeDataByWriteIt(const TableID & table_id, const WriteCFIter & write_it) { const auto & [key, value, decoded_val] = write_it->second; - const auto & [table, handle, ts] = write_it->first; - std::ignore = ts; + const auto & [handle, ts] = write_it->first; const auto & [write_type, prewrite_ts, short_str] = decoded_val; - auto data_it = default_cf.map.find({table, handle, prewrite_ts}); + std::ignore = ts; + std::ignore = value; if (write_type == PutFlag && !short_str) { - if (unlikely(data_it == default_cf.map.end())) - throw Exception(" key [" + key.toString() + "] not found in data cf when removing", ErrorCodes::LOGICAL_ERROR); - - const auto & [key, value] = data_it->second; + auto & map = default_cf.getDataMut()[table_id]; - cf_data_size -= key.dataSize() + value.dataSize(); - default_cf.map.erase(data_it); + if (auto data_it = map.find({handle, prewrite_ts}); data_it != map.end()) + { + cf_data_size -= RegionDefaultCFData::calcTiKVKeyValueSize(data_it->second); + map.erase(data_it); + } + else + throw Exception(" key [" + key.toString() + "] not found in data cf when removing", ErrorCodes::LOGICAL_ERROR); } - cf_data_size -= key.dataSize() + value.dataSize(); + cf_data_size -= RegionWriteCFData::calcTiKVKeyValueSize(write_it->second); - return write_cf.map.erase(write_it); + return write_cf.getDataMut()[table_id].erase(write_it); } - ReadInfo readDataByWriteIt(const ConstWriteCFIter & write_it, std::vector * keys) + ReadInfo readDataByWriteIt(const TableID & table_id, const ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys) const { const auto & [key, value, decoded_val] = write_it->second; - const auto & [table, handle, ts] = write_it->first; + const auto & [handle, ts] = write_it->first; std::ignore = value; @@ -457,93 +419,51 @@ class RegionData if (short_value) return std::make_tuple(handle, write_type, ts, TiKVValue(*short_value)); - auto data_it = default_cf.map.find({table, handle, prewrite_ts}); - if (unlikely(data_it == default_cf.map.end())) - throw Exception("key [" + key.toString() + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); - - return std::make_tuple(handle, write_type, ts, std::get<1>(data_it->second)); - } - - LockInfoPtr getLockInfo(TableID expected_table_id, Timestamp start_ts) - { - for (const auto & [key, value] : lock_cf.map) + if (auto map_it = default_cf.getData().find(table_id); map_it != default_cf.getData().end()) { - const auto & [table_id, handle] = key; - if (expected_table_id != table_id) - continue; - std::ignore = handle; - - const auto & [tikv_key, tikv_val, decoded_val] = value; - const auto & [lock_type, primary, ts, ttl, data] = decoded_val; - std::ignore = tikv_val; - std::ignore = data; - - if (lock_type == DelFlag || ts > start_ts) - continue; - - return std::make_unique(LockInfo{primary, ts, RecordKVFormat::decodeTiKVKey(tikv_key), ttl}); + const auto & map = map_it->second; + if (auto data_it = map.find({handle, prewrite_ts}); data_it != map.end()) + return std::make_tuple(handle, write_type, ts, RegionDefaultCFData::getTiKVValue(data_it->second)); + else + throw Exception(" key [" + key.toString() + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); } - - return nullptr; + else + throw Exception(" table [" + toString(table_id) + "] not found in data cf", ErrorCodes::LOGICAL_ERROR); } - void splitInto(const RegionRange & range, RegionData & new_region_data) + LockInfoPtr getLockInfo(TableID expected_table_id, Timestamp start_ts) const { - const auto & [start_key, end_key] = range; - - for (auto it = default_cf.map.begin(); it != default_cf.map.end();) + if (auto it = lock_cf.getData().find(expected_table_id); it != lock_cf.getData().end()) { - const auto & [key, val] = it->second; - - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); - if (ok) + for (const auto & [handle, value] : it->second) { - cf_data_size -= key.dataSize() + val.dataSize(); - new_region_data.cf_data_size += key.dataSize() + val.dataSize(); - - new_region_data.default_cf.map.insert(std::move(*it)); - it = default_cf.map.erase(it); - } - else - ++it; - } + std::ignore = handle; - for (auto it = write_cf.map.begin(); it != write_cf.map.end();) - { - const auto & [key, val, _] = it->second; - std::ignore = _; + const auto & [tikv_key, tikv_val, decoded_val] = value; + const auto & [lock_type, primary, ts, ttl, data] = decoded_val; + std::ignore = tikv_val; + std::ignore = data; - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); - if (ok) - { - cf_data_size -= key.dataSize() + val.dataSize(); - new_region_data.cf_data_size += key.dataSize() + val.dataSize(); + if (lock_type == DelFlag || ts > start_ts) + continue; - new_region_data.write_cf.map.insert(std::move(*it)); - it = write_cf.map.erase(it); + return std::make_unique(LockInfo{primary, ts, RecordKVFormat::decodeTiKVKey(tikv_key), ttl}); } - else - ++it; - } - for (auto it = lock_cf.map.begin(); it != lock_cf.map.end();) - { - const auto & [key, val, _] = it->second; - std::ignore = val; - std::ignore = _; - - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); - if (ok) - { - new_region_data.lock_cf.map.insert(std::move(*it)); - it = lock_cf.map.erase(it); - } - else - ++it; + return nullptr; } + else + return nullptr; + } + + void splitInto(const RegionRange & range, RegionData & new_region_data) + { + size_t size_changed = 0; + size_changed += default_cf.splitInto(range, new_region_data.default_cf); + size_changed += write_cf.splitInto(range, new_region_data.write_cf); + size_changed += lock_cf.splitInto(range, new_region_data.lock_cf); + cf_data_size -= size_changed; + new_region_data.cf_data_size += size_changed; } size_t dataSize() const { return cf_data_size; } @@ -557,71 +477,25 @@ class RegionData cf_data_size = new_region_data.cf_data_size.load(); } - size_t serialize(WriteBuffer & buf) + size_t serialize(WriteBuffer & buf) const { size_t total_size = 0; - total_size += writeBinary2(default_cf.map.size(), buf); - for (const auto & ele : default_cf.map) - { - const auto & [key, value] = ele.second; - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } - - total_size += writeBinary2(write_cf.map.size(), buf); - for (const auto & ele : write_cf.map) - { - const auto & [key, value, _] = ele.second; - std::ignore = _; - - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } - - total_size += writeBinary2(lock_cf.map.size(), buf); - for (const auto & ele : lock_cf.map) - { - const auto & [key, value, _] = ele.second; - std::ignore = _; - - total_size += key.serialize(buf); - total_size += value.serialize(buf); - } + total_size += default_cf.serialize(buf); + total_size += write_cf.serialize(buf); + total_size += lock_cf.serialize(buf); return total_size; } static void deserialize(ReadBuffer & buf, RegionData & region_data) { - auto size = readBinary2(buf); - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); - - region_data.default_cf.insert(key, value); - region_data.cf_data_size += key.dataSize() + value.dataSize(); - } - - size = readBinary2(buf); - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); - - region_data.write_cf.insert(key, value); - region_data.cf_data_size += key.dataSize() + value.dataSize(); - } - - size = readBinary2(buf); - for (size_t i = 0; i < size; ++i) - { - auto key = TiKVKey::deserialize(buf); - auto value = TiKVValue::deserialize(buf); + size_t total_size = 0; + total_size += RegionDefaultCFData::deserialize(buf, region_data.default_cf); + total_size += RegionWriteCFData::deserialize(buf, region_data.write_cf); + total_size += RegionLockCFData::deserialize(buf, region_data.lock_cf); - region_data.lock_cf.insert(key, value); - } + region_data.cf_data_size += total_size; } friend bool operator==(const RegionData & r1, const RegionData & r2) diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 9464a7d6fc1..0a75daf4862 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -206,7 +206,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac auto region = tmt.kvstore->getRegion(region_id); if (!region) return; - auto remover = region->createCommittedRemover(); + auto remover = region->createCommittedRemover(table_id); for (const auto & key : keys_to_remove) remover->remove(key); cache_size = region->dataSize(); From 347821539dae707e2bfd8e5e0e536956f3295788 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 4 Apr 2019 16:55:57 +0800 Subject: [PATCH 03/11] Fix. --- .../Storages/Transaction/PartitionStreams.cpp | 9 ++++---- .../Transaction/RegionBlockReader.cpp | 16 +++---------- .../Storages/Transaction/RegionBlockReader.h | 2 +- dbms/src/Storages/Transaction/RegionData.h | 23 ++++++++++++++----- dbms/src/Storages/Transaction/RegionTable.cpp | 2 +- dbms/src/Storages/Transaction/RegionTable.h | 4 ++-- 6 files changed, 28 insertions(+), 28 deletions(-) diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 942acafb4b8..ab72678d6d8 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -17,7 +17,7 @@ std::tuple RegionTab const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, - std::vector * keys) + RegionWriteCFDataTrait::Keys * keys) { return getBlockInputStreamByRegion(table_id, tmt.kvstore->getRegion(region_id), @@ -42,7 +42,7 @@ std::tuple RegionTab bool learner_read, bool resolve_locks, UInt64 start_ts, - std::vector * keys) + RegionWriteCFDataTrait::Keys * keys) { if (!region) return {nullptr, NOT_FOUND, 0}; @@ -75,11 +75,10 @@ std::tuple RegionTab } } - auto next_table_id = scanner->hasNext(); - if (next_table_id == InvalidTableID) + if (!scanner->hasNext()) return {nullptr, OK, 0}; - const auto [table_info, columns, ordered_columns] = schema_fetcher(next_table_id); + const auto [table_info, columns, ordered_columns] = schema_fetcher(table_id); auto block = RegionBlockRead(*table_info, *columns, *ordered_columns, scanner, keys); size_t tol = block.rows(); diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index 72dd08cf9cd..06d6ab4e14d 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -36,7 +36,7 @@ static const Field MockDecodeRow(TiDB::CodecFlag flag) } Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns_, - ScannerPtr & scanner, std::vector * keys) + ScannerPtr & scanner, RegionWriteCFDataTrait::Keys * keys) { // Note: this code below is mostly ported from RegionBlockInputStream. Names ordered_columns = ordered_columns_; @@ -73,13 +73,8 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti const auto & date_lut = DateLUT::instance(); // TODO: lock region, to avoid region adding/droping while writing data - - TableID my_table_id = table_info.id; - TableID next_table_id = InvalidTableID; - - // Here we use do{}while() instead of while(){}, // Because the first check of scanner.hasNext() already been done outside of this function. - while (true) + do { // TODO: comfirm all this mess auto [handle, write_type, commit_ts, value] = scanner->next(keys); @@ -161,12 +156,7 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti } column_map[handle_id].first->insert(Field(handle)); } - - next_table_id = scanner->hasNext(); - - if (next_table_id == InvalidTableID || next_table_id != my_table_id) - break; - } + } while (scanner->hasNext()); Block block; for (const auto & name : ordered_columns) diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index 9d356ef8524..c3f93022e69 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -25,6 +25,6 @@ extern const int LOGICAL_ERROR; using ScannerPtr = std::unique_ptr; Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns_, - ScannerPtr & curr_scanner, std::vector * keys = nullptr); + ScannerPtr & curr_scanner, RegionWriteCFDataTrait::Keys * keys = nullptr); } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h index 58dcf29c265..1dc714aafce 100644 --- a/dbms/src/Storages/Transaction/RegionData.h +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -24,7 +25,7 @@ struct RegionWriteCFDataTrait using Key = std::tuple; using Value = std::tuple; using Map = std::map; - using Keys = std::vector; + using Keys = std::list; static std::pair genKVPair(const TiKVKey & key, const String & raw_key, const TiKVValue & value) { @@ -165,11 +166,15 @@ struct RegionCFDataBase bool operator == (const RegionCFDataBase & cf) const { - const auto & cf_data = cf.data; - if (data.size() != cf_data.size()) + if (getSize() != cf.getSize()) return false; + + const auto & cf_data = cf.data; for (const auto & [table_id, map] : data) { + if (map.empty()) + continue; + if (auto it = cf_data.find(table_id); it != cf_data.end()) { if (!cmp(map, it->second)) @@ -181,6 +186,14 @@ struct RegionCFDataBase return true; } + size_t getSize() const + { + size_t size = 0; + for (auto data_it = data.begin(); data_it != data.end(); ++data_it) + size += data_it->second.size(); + return size; + } + RegionCFDataBase(){} RegionCFDataBase(RegionCFDataBase && region): data(std::move(region.data)) {} RegionCFDataBase & operator = (RegionCFDataBase && region) @@ -231,9 +244,7 @@ struct RegionCFDataBase { size_t total_size = 0; - size_t size = 0; - for (auto data_it = data.begin(); data_it != data.end(); ++data_it) - size += data_it->second.size(); + size_t size = getSize(); total_size += writeBinary2(size, buf); diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 0a75daf4862..851859f1c5b 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -170,7 +170,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac TMTContext & tmt = context.getTMTContext(); - std::vector keys_to_remove; + RegionWriteCFDataTrait::Keys keys_to_remove; { auto merge_tree = std::dynamic_pointer_cast(storage); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 275f50a5bf8..c286a52bf08 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -189,7 +189,7 @@ class RegionTable : private boost::noncopyable const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, - std::vector * keys); + RegionWriteCFDataTrait::Keys * keys); static std::tuple getBlockInputStreamByRegion(TableID table_id, RegionPtr region, @@ -201,7 +201,7 @@ class RegionTable : private boost::noncopyable bool learner_read, bool resolve_locks, UInt64 start_ts, - std::vector * keys = nullptr); + RegionWriteCFDataTrait::Keys * keys = nullptr); static TableIDSet getRegionTableIds(const RegionPtr & region); From 1d196672a77cb8fd5f8a61a6b32144a3c80dea84 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 4 Apr 2019 18:11:36 +0800 Subject: [PATCH 04/11] test --- dbms/src/Debug/DBGInvoker.cpp | 1 + dbms/src/Debug/dbgFuncRegion.cpp | 16 ++++++++++++++++ dbms/src/Debug/dbgFuncRegion.h | 2 ++ 3 files changed, 19 insertions(+) diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index ee3c5726b77..a08d899d91c 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -52,6 +52,7 @@ DBGInvoker::DBGInvoker() regFunc("rm_region_data", dbgFuncRegionRmData); regFunc("dump_region", dbgFuncDumpRegion); + regFunc("dump_all_region", dbgFuncDumpAllRegion); } void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement) diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 15dad94e0d0..b43132f1f19 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -172,6 +172,22 @@ std::string getEndKeyString(TableID table_id, const TiKVKey & end_key) } } +void dbgFuncDumpAllRegion(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + auto & tmt = context.getTMTContext(); + TableID table_id = (TableID)safeGet(typeid_cast(*args[0]).value); + size_t size = 0; + tmt.kvstore->traverseRegions([&](const RegionID region_id, const RegionPtr & region) { + std::ignore = region_id; + auto range = region->getHandleRangeByTable(table_id); + size += 1; + std::stringstream ss; + ss << "table #" << table_id << " " << region->toString() << " ranges: " << range.first << ", " << range.second; + output(ss.str()); + }); + output("total size: " + toString(size)); +} + void dbgFuncDumpRegion(Context & context, const ASTs & args, DBGInvoker::Printer output) { if (args.size() > 1) diff --git a/dbms/src/Debug/dbgFuncRegion.h b/dbms/src/Debug/dbgFuncRegion.h index 3e1ff2a7c0a..fef43461f69 100644 --- a/dbms/src/Debug/dbgFuncRegion.h +++ b/dbms/src/Debug/dbgFuncRegion.h @@ -30,4 +30,6 @@ void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer o // ./storage-client.sh "DBGInvoke rm_region_data(region_id)" void dbgFuncRegionRmData(Context & context, const ASTs & args, DBGInvoker::Printer output); +void dbgFuncDumpAllRegion(Context& context, const ASTs& args, DBGInvoker::Printer output); + } From c7e73a4756044ac4d4386845b9cf1b34f01b8b3f Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 4 Apr 2019 19:52:56 +0800 Subject: [PATCH 05/11] Fix bug: getRegionTableIds. --- dbms/src/Storages/Transaction/KVStore.cpp | 2 +- dbms/src/Storages/Transaction/Region.cpp | 2 ++ dbms/src/Storages/Transaction/Region.h | 6 ++-- dbms/src/Storages/Transaction/RegionData.h | 29 +++++++++++++++- dbms/src/Storages/Transaction/RegionTable.cpp | 33 ++----------------- dbms/src/Storages/Transaction/RegionTable.h | 2 -- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index a5b725728b2..ce636be2815 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -55,7 +55,7 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context) { TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr; - auto table_ids = RegionTable::getRegionTableIds(new_region); + auto table_ids = new_region->getCommittedRecordTableID(); { std::lock_guard lock(task_mutex); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 1043398ffff..406b52fed19 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -450,4 +450,6 @@ void Region::reset(Region && new_region) bool Region::isPeerRemoved() const { return meta.isPeerRemoved(); } +TableIDSet Region::getCommittedRecordTableID() { return data.getCommittedRecordTableID(); } + } // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 1fcf6d17005..010896c3030 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -48,7 +48,7 @@ class Region : public std::enable_shared_from_this CommittedScanner(const RegionPtr & store_, TableID expected_table_id_) : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_) { - const auto & data = store->data.write_cf.getData(); + const auto & data = store->data.writeCF().getData(); if (auto it = data.find(expected_table_id); it != data.end()) { found = true; @@ -86,7 +86,7 @@ class Region : public std::enable_shared_from_this CommittedRemover(const RegionPtr & store_, TableID expected_table_id_) : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_) { - auto & data = store->data.write_cf.getDataMut(); + auto & data = store->data.writeCFMute().getDataMut(); if (auto it = data.find(expected_table_id); it != data.end()) { found = true; @@ -182,6 +182,8 @@ class Region : public std::enable_shared_from_this void reset(Region && new_region); + TableIDSet getCommittedRecordTableID(); + private: // Private methods no need to lock mutex, normally diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h index 1dc714aafce..3b2abe87b32 100644 --- a/dbms/src/Storages/Transaction/RegionData.h +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -288,6 +288,18 @@ struct RegionCFDataBase return data; } + TableIDSet getAllRecordTableID() const + { + TableIDSet tables; + for (const auto & [table_id, map] : data) + { + if (map.empty()) + continue; + tables.insert(table_id); + } + return tables; + } + private: std::unordered_map data; }; @@ -515,11 +527,26 @@ class RegionData && r1.lock_cf == r2.lock_cf && r1.cf_data_size == r2.cf_data_size; } + RegionWriteCFData & writeCFMute() + { + return write_cf; + } + + const RegionWriteCFData & writeCF() const + { + return write_cf; + } + + TableIDSet getCommittedRecordTableID() const + { + return writeCF().getAllRecordTableID(); + } + RegionData() {} RegionData(RegionData && data):write_cf(std::move(data.write_cf)),default_cf(std::move(data.default_cf)),lock_cf(std::move(data.lock_cf)) {} -public: +private: RegionWriteCFData write_cf; RegionDefaultCFData default_cf; RegionLockCFData lock_cf; diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 851859f1c5b..27df2c892dd 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -15,31 +15,6 @@ extern const int LOGICAL_ERROR; extern const int UNKNOWN_TABLE; } // namespace ErrorCodes -// ============================================================= -// Static methods. -// ============================================================= - -TableIDSet RegionTable::getRegionTableIds(const RegionPtr & region) -{ - TableIDSet table_ids; - { - auto scanner = region->createCommittedScanner(InvalidTableID); - while (true) - { - TableID table_id = scanner->hasNext(); - if (table_id == InvalidTableID) - break; - table_ids.emplace(table_id); - scanner->next(); - } - } - return table_ids; -} - -// ============================================================= -// Private member functions. -// ============================================================= - RegionTable::Table & RegionTable::getOrCreateTable(TableID table_id) { auto it = tables.find(table_id); @@ -219,10 +194,6 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac } } -// ============================================================= -// Public member functions. -// ============================================================= - static const Int64 FTH_BYTES_1 = 1024; // 1 KB static const Int64 FTH_BYTES_2 = 1024 * 1024; // 1 MB static const Int64 FTH_BYTES_3 = 1024 * 1024 * 10; // 10 MBs @@ -307,7 +278,7 @@ void RegionTable::updateRegion(const RegionPtr & region, const TableIDSet & rela void RegionTable::applySnapshotRegion(const RegionPtr & region) { - auto table_ids = getRegionTableIds(region); + auto table_ids = region->getCommittedRecordTableID(); return applySnapshotRegion(region, table_ids); } @@ -320,7 +291,7 @@ void RegionTable::applySnapshotRegions(const ::DB::RegionMap & region_map) { std::ignore = id; size_t cache_bytes = region->dataSize(); - auto table_ids = getRegionTableIds(region); + auto table_ids = region->getCommittedRecordTableID(); for (auto table_id : table_ids) { auto & internal_region = getOrInsertRegion(table_id, region, table_to_persist); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index c286a52bf08..0edfd2a2bb1 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -203,8 +203,6 @@ class RegionTable : private boost::noncopyable UInt64 start_ts, RegionWriteCFDataTrait::Keys * keys = nullptr); - static TableIDSet getRegionTableIds(const RegionPtr & region); - // For debug void dumpRegionMap(RegionTable::RegionMap & res); void dropRegionsInTable(TableID table_id); From 6c0d67a7b2d0c1f9fd290d24d7ba06699cc7a155 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 5 Apr 2019 15:38:52 +0800 Subject: [PATCH 06/11] format. --- dbms/src/DataStreams/RangesFilterBlockInputStream.h | 6 ++++-- dbms/src/Storages/Transaction/Region.cpp | 8 ++++---- dbms/src/Storages/Transaction/Region.h | 10 +++++----- dbms/src/Storages/Transaction/RegionMeta.cpp | 2 +- dbms/src/Storages/Transaction/RegionMeta.h | 2 +- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/dbms/src/DataStreams/RangesFilterBlockInputStream.h b/dbms/src/DataStreams/RangesFilterBlockInputStream.h index 6a98f791d3b..9dda89577c7 100644 --- a/dbms/src/DataStreams/RangesFilterBlockInputStream.h +++ b/dbms/src/DataStreams/RangesFilterBlockInputStream.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include namespace DB @@ -10,7 +10,9 @@ namespace DB class RangesFilterBlockInputStream : public IProfilingBlockInputStream { public: - RangesFilterBlockInputStream(const BlockInputStreamPtr & input_, const HandleRange & ranges_, const String & handle_col_name_) : input(input_), ranges(ranges_), handle_col_name(handle_col_name_) {} + RangesFilterBlockInputStream(const BlockInputStreamPtr & input_, const HandleRange & ranges_, const String & handle_col_name_) + : input(input_), ranges(ranges_), handle_col_name(handle_col_name_) + {} protected: Block getHeader() const override { return input->getHeader(); } diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 406b52fed19..e10964e2670 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -25,12 +25,12 @@ RegionData::WriteCFIter Region::removeDataByWriteIt(const TableID & table_id, co } RegionData::ReadInfo Region::readDataByWriteIt( - const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys) + const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys) const { return data.readDataByWriteIt(table_id, write_it, keys); } -Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) +Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) const { return data.getLockInfo(expected_table_id, start_ts); } @@ -295,7 +295,7 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng return {split_regions, table_ids, sync_log}; } -size_t Region::serialize(WriteBuffer & buf, enginepb::CommandResponse * response) +size_t Region::serialize(WriteBuffer & buf, enginepb::CommandResponse * response) const { std::shared_lock lock(mutex); @@ -450,6 +450,6 @@ void Region::reset(Region && new_region) bool Region::isPeerRemoved() const { return meta.isPeerRemoved(); } -TableIDSet Region::getCommittedRecordTableID() { return data.getCommittedRecordTableID(); } +TableIDSet Region::getCommittedRecordTableID() const { return data.getCommittedRecordTableID(); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 010896c3030..9ae55aeb64a 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -139,7 +139,7 @@ class Region : public std::enable_shared_from_this std::unique_ptr createCommittedScanner(TableID expected_table_id); std::unique_ptr createCommittedRemover(TableID expected_table_id); - size_t serialize(WriteBuffer & buf, enginepb::CommandResponse * response = nullptr); + size_t serialize(WriteBuffer & buf, enginepb::CommandResponse * response = nullptr) const; static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create = nullptr); RegionID id() const; @@ -182,7 +182,7 @@ class Region : public std::enable_shared_from_this void reset(Region && new_region); - TableIDSet getCommittedRecordTableID(); + TableIDSet getCommittedRecordTableID() const; private: // Private methods no need to lock mutex, normally @@ -191,13 +191,13 @@ class Region : public std::enable_shared_from_this TableID doRemove(const String & cf, const TiKVKey & key); bool checkIndex(UInt64 index); - ColumnFamilyType getCf(const String & cf); + static ColumnFamilyType getCf(const String & cf); RegionData::ReadInfo readDataByWriteIt( - const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys = nullptr); + const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys = nullptr) const; RegionData::WriteCFIter removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it); - LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts); + LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) const; RegionPtr splitInto(const RegionMeta & meta); Regions execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index e09b0f6f283..7001a410c70 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -19,7 +19,7 @@ size_t RegionMeta::serializeSize() const return peer_size + region_size + apply_state_size + sizeof(UInt64) + sizeof(bool); } -size_t RegionMeta::serialize(WriteBuffer & buf) +size_t RegionMeta::serialize(WriteBuffer & buf) const { std::lock_guard lock(mutex); diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index baebbe935e0..caf80f3a6fb 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -63,7 +63,7 @@ class RegionMeta enginepb::CommandResponse toCommandResponse() const; size_t serializeSize() const; - size_t serialize(WriteBuffer & buf); + size_t serialize(WriteBuffer & buf) const; static RegionMeta deserialize(ReadBuffer & buf); From f32412c8452d7ea684eaccc9114799c4fc5b6c96 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 5 Apr 2019 18:22:25 +0800 Subject: [PATCH 07/11] refactor some code. --- .../ReplacingTMTSortedBlockInputStream.h | 7 +++---- dbms/src/Storages/Transaction/Region.h | 19 ++++++------------- dbms/src/Storages/Transaction/RegionData.h | 3 ++- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.h index 2f21c63ba1f..c4a010a8d4f 100644 --- a/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingTMTSortedBlockInputStream.h @@ -3,7 +3,7 @@ #include #include -#include +#include namespace DB { @@ -11,8 +11,7 @@ namespace DB class ReplacingTMTSortedBlockInputStream : public MergingSortedBlockInputStream { public: - ReplacingTMTSortedBlockInputStream( - const std::vector> & ranges_, + ReplacingTMTSortedBlockInputStream(const std::vector> & ranges_, const BlockInputStreams & inputs_, const SortDescription & description_, const String & version_column, @@ -73,4 +72,4 @@ class ReplacingTMTSortedBlockInputStream : public MergingSortedBlockInputStream UInt64 gc_tso; }; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 9ae55aeb64a..9bdaf551ce9 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -83,25 +83,19 @@ class Region : public std::enable_shared_from_this class CommittedRemover : private boost::noncopyable { public: - CommittedRemover(const RegionPtr & store_, TableID expected_table_id_) - : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_) + CommittedRemover(const RegionPtr & store_, TableID expected_table_id_) : store(store_), lock(store_->mutex) { auto & data = store->data.writeCFMute().getDataMut(); - if (auto it = data.find(expected_table_id); it != data.end()) - { - found = true; - data_map = &(it->second); - } - else - found = false; + write_cf_data_it = data.find(expected_table_id_); + found = write_cf_data_it != data.end(); } void remove(const RegionWriteCFData::Key & key) { if (!found) return; - if (auto it = data_map->find(key); it != data_map->end()) - store->removeDataByWriteIt(expected_table_id, it); + if (auto it = write_cf_data_it->second.find(key); it != write_cf_data_it->second.end()) + store->removeDataByWriteIt(write_cf_data_it->first, it); } private: @@ -109,8 +103,7 @@ class Region : public std::enable_shared_from_this std::unique_lock lock; bool found; - TableID expected_table_id; - RegionWriteCFData::Map * data_map; + RegionWriteCFData::Data::iterator write_cf_data_it; }; public: diff --git a/dbms/src/Storages/Transaction/RegionData.h b/dbms/src/Storages/Transaction/RegionData.h index 3b2abe87b32..361662753e3 100644 --- a/dbms/src/Storages/Transaction/RegionData.h +++ b/dbms/src/Storages/Transaction/RegionData.h @@ -86,6 +86,7 @@ struct RegionCFDataBase using Key = typename Trait::Key; using Value = typename Trait::Value; using Map = typename Trait::Map; + using Data = std::unordered_map; static const TiKVKey & getTiKVKey(const Value & val) { @@ -301,7 +302,7 @@ struct RegionCFDataBase } private: - std::unordered_map data; + Data data; }; using RegionWriteCFData = RegionCFDataBase; From 46a65640fd7c7212deaec9bdcf9c4ce737233664 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 5 Apr 2019 23:37:50 +0800 Subject: [PATCH 08/11] Fix bug: spark read no region. --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 698bf7fcf73..9869a50edec 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -276,19 +276,27 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( TMTContext & tmt = context.getTMTContext(); - if (!select.no_kvstore && regions_query_info.empty()) + if (select.no_kvstore) + regions_query_info.clear(); + else { - tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](std::vector> & regions) { - for (const auto & [id, region]: regions) - { - kvstore_region.emplace(id, region); - if (region == nullptr) - // maybe region is removed. - regions_query_info.push_back({id, InvalidRegionVersion, InvalidRegionVersion, {0, 0}}); - else - regions_query_info.push_back({id, region->version(), region->confVer(), region->getHandleRangeByTable(data.table_info.id)}); - } - }); + for (const auto & query_info : regions_query_info) + kvstore_region.emplace(query_info.region_id, tmt.kvstore->getRegion(query_info.region_id)); + + if (regions_query_info.empty()) + { + tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](std::vector> & regions) { + for (const auto & [id, region]: regions) + { + kvstore_region.emplace(id, region); + if (region == nullptr) + // maybe region is removed. + regions_query_info.push_back({id, InvalidRegionVersion, InvalidRegionVersion, {0, 0}}); + else + regions_query_info.push_back({id, region->version(), region->confVer(), region->getHandleRangeByTable(data.table_info.id)}); + } + }); + } } std::sort(regions_query_info.begin(), regions_query_info.end()); From 4ff09af415a2bf4e4bc2aaa314f78cc1f96a4fae Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sun, 7 Apr 2019 10:37:35 +0800 Subject: [PATCH 09/11] fix: add lock. --- dbms/src/Storages/Transaction/Region.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index e10964e2670..6abe4ddd2aa 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -450,6 +450,10 @@ void Region::reset(Region && new_region) bool Region::isPeerRemoved() const { return meta.isPeerRemoved(); } -TableIDSet Region::getCommittedRecordTableID() const { return data.getCommittedRecordTableID(); } +TableIDSet Region::getCommittedRecordTableID() const +{ + std::shared_lock lock(mutex); + return data.getCommittedRecordTableID(); +} } // namespace DB From c18e421f538f81b106dbe0d8c83e2aece4447956 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sun, 7 Apr 2019 13:34:15 +0800 Subject: [PATCH 10/11] fix and optimize. --- dbms/src/Storages/Transaction/KVStore.cpp | 5 ++-- dbms/src/Storages/Transaction/Region.h | 2 +- .../Transaction/RegionBlockReader.cpp | 3 +-- dbms/src/Storages/Transaction/RegionTable.cpp | 23 ++----------------- dbms/src/Storages/Transaction/RegionTable.h | 1 - 5 files changed, 6 insertions(+), 28 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index ce636be2815..45dc4bcd2e1 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -55,8 +55,6 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context) { TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr; - auto table_ids = new_region->getCommittedRecordTableID(); - { std::lock_guard lock(task_mutex); @@ -89,8 +87,9 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context) region_persister.persist(new_region); + // if the operation about RegionTable is out of the protection of task_mutex, we should make sure that it can't delete any mapping relation. if (tmt_ctx) - tmt_ctx->region_table.applySnapshotRegion(new_region, table_ids); + tmt_ctx->region_table.applySnapshotRegion(new_region); } void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx) diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 9bdaf551ce9..2772effa254 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -64,7 +64,7 @@ class Region : public std::enable_shared_from_this auto next(RegionWriteCFDataTrait::Keys * keys = nullptr) { if (!found) - throw Exception(String() + "table: " + DB::toString(expected_table_id) + " is not found", ErrorCodes::LOGICAL_ERROR); + throw Exception("CommittedScanner table: " + DB::toString(expected_table_id) + " is not found", ErrorCodes::LOGICAL_ERROR); return store->readDataByWriteIt(expected_table_id, write_map_it++, keys); } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index 06d6ab4e14d..d807da025a0 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -72,11 +72,10 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti const auto & date_lut = DateLUT::instance(); - // TODO: lock region, to avoid region adding/droping while writing data // Because the first check of scanner.hasNext() already been done outside of this function. do { - // TODO: comfirm all this mess + // TODO: confirm all this mess auto [handle, write_type, commit_ts, value] = scanner->next(keys); if (write_type == Region::PutFlag || write_type == Region::DelFlag) { diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 27df2c892dd..6c750f54d8d 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -278,8 +278,9 @@ void RegionTable::updateRegion(const RegionPtr & region, const TableIDSet & rela void RegionTable::applySnapshotRegion(const RegionPtr & region) { + // make operation about snapshot can only add mapping relations rather than delete. auto table_ids = region->getCommittedRecordTableID(); - return applySnapshotRegion(region, table_ids); + updateRegion(region, table_ids); } void RegionTable::applySnapshotRegions(const ::DB::RegionMap & region_map) @@ -305,26 +306,6 @@ void RegionTable::applySnapshotRegions(const ::DB::RegionMap & region_map) tables.find(table_id)->second.persist(); } -void RegionTable::applySnapshotRegion(const RegionPtr & region, const TableIDSet & table_ids) -{ - TableIDSet table_to_persist; - size_t cache_bytes = region->dataSize(); - - std::lock_guard lock(mutex); - - for (auto table_id : table_ids) - { - auto & internal_region = getOrInsertRegion(table_id, region, table_to_persist); - internal_region.updated = true; - internal_region.cache_bytes = cache_bytes; - } - - updateRegionRange(region, table_to_persist); - - for (auto table_id : table_to_persist) - tables.find(table_id)->second.persist(); -} - void RegionTable::splitRegion(const RegionPtr & kvstore_region, const std::vector & split_regions) { std::lock_guard lock(mutex); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 0edfd2a2bb1..b48a161aeea 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -165,7 +165,6 @@ class RegionTable : private boost::noncopyable /// A new region arrived by apply snapshot command, this function store the region into selected partitions. void applySnapshotRegion(const RegionPtr & region); void applySnapshotRegions(const ::DB::RegionMap & regions); - void applySnapshotRegion(const RegionPtr & region, const TableIDSet & table_ids); /// Manage data after region split into split_regions. /// i.e. split_regions could have assigned to another partitions, we need to move the data belong with them. From 24e29ec55d66f4ded1f80911607da75403ba1505 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sun, 7 Apr 2019 21:42:45 +0800 Subject: [PATCH 11/11] add more info log. --- dbms/src/Storages/Transaction/RegionPersister.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 4e145036d61..558e235bb16 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -60,6 +60,8 @@ void RegionPersister::restore(RegionMap & regions, Region::RegionClientCreateFun regions.emplace(page.page_id, region); }; page_storage.traverse(acceptor); + + LOG_INFO(log, "restore " << regions.size() << " regions"); } bool RegionPersister::gc() { return page_storage.gc(); }